Some improvements to process tracking in the scheduler.
* Have all AgentTasks declare how many processes they will create, as an instance attribute. This is really where the information belongs.
* Have Agent read its num_processes from its AgentTask, rather than requiring clients to pass it into the constructor.
* Have AgentTasks pass this num_processes value to the DroneManager when executing commands, and have the DroneManager use that value rather than the old hack of parsing it out of the command line. This required various changes to the DroneManager code which actually fix some small bugs and make the code cleaner in my opinion. A rough sketch of the resulting flow follows below.
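As an illustration of the resulting flow, here is a minimal, hypothetical sketch (stand-in classes named FakeAgentTask, FakeAgent and FakeDroneManager, not the real scheduler objects; bodies are trimmed to the process-accounting path only):

    # Sketch only: mirrors how num_processes now travels from the task,
    # through the agent, into DroneManager.execute_command().

    class FakeDroneManager(object):
        def __init__(self):
            self.active_processes = 0

        def execute_command(self, command, working_directory, pidfile_name,
                            num_processes, log_file=None,
                            paired_with_pidfile=None, username=None):
            # The caller now states its process count explicitly, so there is
            # no need to guess it by parsing '-m host1,host2' out of the
            # command line.
            self.active_processes += num_processes

    class FakeAgentTask(object):
        def __init__(self, cmd, num_processes=1):
            self.cmd = cmd
            self.num_processes = num_processes  # declared by the task itself

    class FakeAgent(object):
        def __init__(self, task):
            # Agent no longer takes num_processes in its constructor; it
            # reads the value off its task.
            self.num_processes = task.num_processes

    manager = FakeDroneManager()
    task = FakeAgentTask(cmd=['autoserv', '-m', 'host1,host2'],
                         num_processes=2)
    agent = FakeAgent(task)
    manager.execute_command(task.cmd, 'results/1-debug_user', 'autoserv.pid',
                            num_processes=agent.num_processes)
    assert manager.active_processes == 2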
Signed-off-by: Steve Howard <showard@google.com>
git-svn-id: http://test.kernel.org/svn/autotest/trunk@3971 592f7852-d20e-0410-864c-8624ca9c26a4
diff --git a/scheduler/drone_manager.py b/scheduler/drone_manager.py
index 10d1f09..1e424a5 100644
--- a/scheduler/drone_manager.py
+++ b/scheduler/drone_manager.py
@@ -88,6 +88,20 @@
return self.error
+class _DroneHeapWrapper(object):
+ """Wrapper to compare drones based on used_capacity().
+
+ These objects can be used to keep a heap of drones by capacity.
+ """
+ def __init__(self, drone):
+ self.drone = drone
+
+
+ def __cmp__(self, other):
+ assert isinstance(other, _DroneHeapWrapper)
+ return cmp(self.drone.used_capacity(), other.drone.used_capacity())
+
+
class DroneManager(object):
"""
This class acts as an interface from the scheduler to drones, whether it be
@@ -107,7 +121,7 @@
self._drones = {}
self._results_drone = None
self._attached_files = {}
- # heapq of tuples (used_capacity, drone)
+ # heapq of _DroneHeapWrappers
self._drone_queue = []
@@ -191,6 +205,8 @@
if allowed_users is not None:
drone.allowed_users = set(allowed_users)
+ self._reorder_drone_queue() # max_processes may have changed
+
def get_drones(self):
return self._drones.itervalues()
@@ -278,7 +294,11 @@
def _enqueue_drone(self, drone):
- heapq.heappush(self._drone_queue, (drone.used_capacity(), drone))
+ heapq.heappush(self._drone_queue, _DroneHeapWrapper(drone))
+
+
+ def _reorder_drone_queue(self):
+ heapq.heapify(self._drone_queue)
def refresh(self):
@@ -369,16 +389,6 @@
os.makedirs(path)
- def _extract_num_processes(self, command):
- try:
- machine_list_index = command.index('-m') + 1
- except ValueError:
- return 1
- assert machine_list_index < len(command)
- machine_list = command[machine_list_index].split(',')
- return len(machine_list)
-
-
def total_running_processes(self):
return sum(drone.active_processes for drone in self.get_drones())
@@ -392,9 +402,9 @@
if not self._drone_queue:
# all drones disabled
return 0
- return max(drone.max_processes - drone.active_processes
- for _, drone in self._drone_queue
- if drone.usable_by(username))
+ return max(wrapper.drone.max_processes - wrapper.drone.active_processes
+ for wrapper in self._drone_queue
+ if wrapper.drone.usable_by(username))
def _least_loaded_drone(self, drones):
@@ -411,7 +421,7 @@
checked_drones = []
drone_to_use = None
while self._drone_queue:
- used_capacity, drone = heapq.heappop(self._drone_queue)
+ drone = heapq.heappop(self._drone_queue).drone
checked_drones.append(drone)
if not drone.usable_by(username):
continue
@@ -428,8 +438,6 @@
num_processes, drone_summary)
drone_to_use = self._least_loaded_drone(checked_drones)
- drone_to_use.active_processes += num_processes
-
# refill _drone_queue
for drone in checked_drones:
self._enqueue_drone(drone)
@@ -445,7 +453,7 @@
def execute_command(self, command, working_directory, pidfile_name,
- log_file=None, paired_with_pidfile=None,
+ num_processes, log_file=None, paired_with_pidfile=None,
username=None):
"""
Execute the given command, taken as an argv list.
@@ -474,13 +482,14 @@
if paired_with_pidfile:
drone = self._get_drone_for_pidfile_id(paired_with_pidfile)
else:
- num_processes = self._extract_num_processes(command)
drone = self._choose_drone_for_execution(num_processes, username)
logging.info("command = %s" % command)
logging.info('log file = %s:%s' % (drone.hostname, log_file))
self._write_attached_files(working_directory, drone)
drone.queue_call('execute_command', command, abs_working_directory,
log_file, pidfile_name)
+ drone.active_processes += num_processes
+ self._reorder_drone_queue()
pidfile_path = os.path.join(abs_working_directory, pidfile_name)
pidfile_id = PidfileId(pidfile_path)
diff --git a/scheduler/drone_manager_unittest.py b/scheduler/drone_manager_unittest.py
index a44e729..4364bfb 100755
--- a/scheduler/drone_manager_unittest.py
+++ b/scheduler/drone_manager_unittest.py
@@ -170,6 +170,7 @@
command=['test', drone_manager.WORKING_DIRECTORY],
working_directory=self._WORKING_DIRECTORY,
pidfile_name=pidfile_name,
+ num_processes=1,
log_file=log_file)
full_working_directory = os.path.join(self._DRONE_RESULTS_DIR,
@@ -190,7 +191,8 @@
self._WORKING_DIRECTORY, contents)
self.manager.execute_command(command=['test'],
working_directory=self._WORKING_DIRECTORY,
- pidfile_name='mypidfile')
+ pidfile_name='mypidfile',
+ num_processes=1)
self.assert_(self.mock_drone.was_call_queued(
'write_to_file',
diff --git a/scheduler/monitor_db.py b/scheduler/monitor_db.py
index 52f5b96..ce33add 100755
--- a/scheduler/monitor_db.py
+++ b/scheduler/monitor_db.py
@@ -785,8 +785,7 @@
def recover_entries(job, queue_entries, run_monitor):
queue_task = QueueTask(job=job, queue_entries=queue_entries,
recover_run_monitor=run_monitor)
- self.add_agent(Agent(task=queue_task,
- num_processes=len(queue_entries)))
+ self.add_agent(Agent(task=queue_task))
self._recover_entries_with_status(models.HostQueueEntry.Status.RUNNING,
orphans, _AUTOSERV_PID_FILE,
@@ -808,7 +807,7 @@
def recover_entries(job, queue_entries, run_monitor):
reparse_task = FinalReparseTask(queue_entries,
recover_run_monitor=run_monitor)
- self.add_agent(Agent(reparse_task, num_processes=0))
+ self.add_agent(Agent(reparse_task))
self._recover_entries_with_status(models.HostQueueEntry.Status.PARSING,
orphans, _PARSER_PID_FILE,
@@ -1072,7 +1071,7 @@
'entry with invalid status %s: %s'
% (entry.status, entry))
- self.add_agent(Agent(agent_task, num_processes=len(task_entries)))
+ self.add_agent(Agent(agent_task))
def _schedule_delay_tasks(self):
@@ -1080,7 +1079,7 @@
models.HostQueueEntry.Status.WAITING):
task = entry.job.schedule_delayed_callback_task(entry)
if task:
- self.add_agent(Agent(task, num_processes=0))
+ self.add_agent(Agent(task))
def _run_queue_entry(self, queue_entry, host):
@@ -1210,16 +1209,17 @@
self._start_time = time.time()
- def run(self, command, working_directory, nice_level=None, log_file=None,
- pidfile_name=None, paired_with_pidfile=None, username=None):
+ def run(self, command, working_directory, num_processes, nice_level=None,
+ log_file=None, pidfile_name=None, paired_with_pidfile=None,
+ username=None):
assert command is not None
if nice_level is not None:
command = ['nice', '-n', str(nice_level)] + command
self._set_start_time()
self.pidfile_id = _drone_manager.execute_command(
command, working_directory, pidfile_name=pidfile_name,
- log_file=log_file, paired_with_pidfile=paired_with_pidfile,
- username=username)
+ num_processes=num_processes, log_file=log_file,
+ paired_with_pidfile=paired_with_pidfile, username=username)
def attach_to_existing_process(self, execution_path,
@@ -1375,7 +1375,7 @@
"""
- def __init__(self, task, num_processes=1):
+ def __init__(self, task):
"""
@param task: A task as described in the class docstring.
@param num_processes: The number of subprocesses the Agent represents.
@@ -1387,7 +1387,7 @@
# This is filled in by Dispatcher.add_agent()
self.dispatcher = None
- self.num_processes = num_processes
+ self.num_processes = task.num_processes
self.queue_entry_ids = task.queue_entry_ids
self.host_ids = task.host_ids
@@ -1452,6 +1452,7 @@
self.host_ids = ()
self.success = False
self.queue_entry_ids = ()
+ self.num_processes = 0
# This is filled in by Agent.add_task().
self.agent = None
@@ -1472,14 +1473,18 @@
class AgentTask(object):
def __init__(self, cmd=None, working_directory=None,
- recover_run_monitor=None, username=None):
+ recover_run_monitor=None, username=None, num_processes=1):
"""
- username: login of user responsible for this task. may be None.
+ @param username: login of user responsible for this task. may be None.
+ @param num_processes: number of autoserv processes launched by this
+ AgentTask. this includes any forking the autoserv process may do.
+ it may be only approximate.
"""
self.done = False
self.cmd = cmd
self._working_directory = working_directory
self.username = username
+ self.num_processes = num_processes
self.agent = None
self.monitor = recover_run_monitor
self.started = bool(recover_run_monitor)
@@ -1603,6 +1608,7 @@
if self.cmd:
self.monitor = PidfileRunMonitor()
self.monitor.run(self.cmd, self._working_directory,
+ num_processes=self.num_processes,
nice_level=AUTOSERV_NICE_LEVEL,
log_file=self.log_file,
pidfile_name=pidfile_name,
@@ -1882,7 +1888,7 @@
super(QueueTask, self).__init__(
cmd=cmd, working_directory=self._execution_path(),
recover_run_monitor=recover_run_monitor,
- username=job.owner)
+ username=job.owner, num_processes=len(queue_entries))
self._set_ids(queue_entries=queue_entries)
@@ -1993,7 +1999,7 @@
class PostJobTask(AgentTask):
- def __init__(self, queue_entries, pidfile_name, logfile_name,
+ def __init__(self, queue_entries, pidfile_name, logfile_name, num_processes,
recover_run_monitor=None):
self._queue_entries = queue_entries
self._pidfile_name = pidfile_name
@@ -2013,7 +2019,8 @@
super(PostJobTask, self).__init__(
cmd=command, working_directory=self._execution_path,
recover_run_monitor=recover_run_monitor,
- username=queue_entries[0].job.owner)
+ username=queue_entries[0].job.owner,
+ num_processes=num_processes)
self.log_file = os.path.join(self._execution_path, logfile_name)
self._final_status = self._determine_final_status()
@@ -2089,6 +2096,7 @@
super(GatherLogsTask, self).__init__(
queue_entries, pidfile_name=_CRASHINFO_PID_FILE,
logfile_name='.collect_crashinfo.log',
+ num_processes=len(queue_entries),
recover_run_monitor=recover_run_monitor)
self._set_ids(queue_entries=queue_entries)
@@ -2214,6 +2222,7 @@
super(FinalReparseTask, self).__init__(
queue_entries, pidfile_name=_PARSER_PID_FILE,
logfile_name='.parse.log',
+ num_processes=0, # don't include parser processes in accounting
recover_run_monitor=recover_run_monitor)
# don't use _set_ids, since we don't want to set the host_ids
self.queue_entry_ids = [entry.id for entry in queue_entries]
diff --git a/scheduler/monitor_db_functional_test.py b/scheduler/monitor_db_functional_test.py
index f4cdcbb..858bb52 100644
--- a/scheduler/monitor_db_functional_test.py
+++ b/scheduler/monitor_db_functional_test.py
@@ -67,17 +67,22 @@
Object to represent pidfile IDs that is opaque to the scheduler code but
still debugging-friendly for us.
"""
- def __init__(self, debug_string):
+ def __init__(self, debug_string, num_processes):
self._debug_string = debug_string
+ self._num_processes = num_processes
def __str__(self):
return self._debug_string
+ def __repr__(self):
+ return '<_DummyPidfileId: %s>' % str(self)
+
+
def __init__(self):
super(MockDroneManager, self).__init__()
- self.max_runnable_processes_value = 100
+ self.process_capacity = 100
# maps result_dir to set of tuples (file_path, file_contents)
self._attached_files = {}
@@ -122,11 +127,15 @@
return pidfile_id in self._killed_pidfiles
- def running_pidfile_ids(self):
- return [str(pidfile_id) for pidfile_id, pidfile_contents
+ def nonfinished_pidfile_ids(self):
+ return [pidfile_id for pidfile_id, pidfile_contents
in self._pidfiles.iteritems()
- if pidfile_contents.process is not None
- and pidfile_contents.exit_status is None]
+ if pidfile_contents.exit_status is None]
+
+
+ def running_pidfile_ids(self):
+ return [pidfile_id for pidfile_id in self.nonfinished_pidfile_ids()
+ if self._pidfiles[pidfile_id].process is not None]
# DroneManager emulation APIs for use by monitor_db
@@ -136,11 +145,12 @@
def total_running_processes(self):
- return 0
+ return sum(pidfile_id._num_processes
+ for pidfile_id in self.nonfinished_pidfile_ids())
def max_runnable_processes(self, username):
- return self.max_runnable_processes_value
+ return self.process_capacity - self.total_running_processes()
def refresh(self):
@@ -190,10 +200,11 @@
def execute_command(self, command, working_directory, pidfile_name,
- log_file=None, paired_with_pidfile=None,
+ num_processes, log_file=None, paired_with_pidfile=None,
username=None):
pidfile_id = self._DummyPidfileId(
- self._get_pidfile_debug_string(working_directory, pidfile_name))
+ self._get_pidfile_debug_string(working_directory, pidfile_name),
+ num_processes)
self._future_pidfiles.append(pidfile_id)
self._initialize_pidfile(pidfile_id)
self._pidfile_index[(working_directory, pidfile_name)] = pidfile_id
@@ -237,7 +248,8 @@
+ self._get_pidfile_debug_string(execution_tag,
pidfile_name))
return self._pidfile_index.get((execution_tag, pidfile_name),
- self._DummyPidfileId(debug_string))
+ self._DummyPidfileId(debug_string,
+ num_processes=0))
def kill_process(self, process):
@@ -318,11 +330,13 @@
'%s/%s not executed' % (working_directory, pidfile_name))
- def _check_statuses(self, queue_entry, queue_entry_status, host_status):
+ def _check_statuses(self, queue_entry, queue_entry_status,
+ host_status=None):
# update from DB
queue_entry = models.HostQueueEntry.objects.get(id=queue_entry.id)
self.assertEquals(queue_entry.status, queue_entry_status)
- self.assertEquals(queue_entry.host.status, host_status)
+ if host_status:
+ self.assertEquals(queue_entry.host.status, host_status)
def _check_host_status(self, host, status):
@@ -686,10 +700,10 @@
job1, queue_entry1 = self._make_job_and_queue_entry()
job2, queue_entry2 = self._make_job_and_queue_entry()
- self.mock_drone_manager.max_runnable_processes_value = 0
+ self.mock_drone_manager.process_capacity = 0
self._run_dispatcher() # schedule job1, but won't start verify
job1.hostqueueentry_set.update(aborted=True)
- self.mock_drone_manager.max_runnable_processes_value = 100
+ self.mock_drone_manager.process_capacity = 100
self._run_dispatcher() # cleanup must run here, not verify for job2
self._check_statuses(queue_entry1, HqeStatus.ABORTED,
HostStatus.CLEANING)
@@ -699,29 +713,6 @@
HostStatus.VERIFYING)
- def _test_job_scheduled_just_after_abort_2(self):
- # test a pretty obscure corner case where a job is aborted while queued,
- # another job is ready to run, and throttling is active. the post-abort
- # cleanup must not be pre-empted by the second job.
- job1, _ = self._make_job_and_queue_entry()
- job2 = self._create_job(hosts=[1,2])
- job2.synch_count = 2
- job2.save()
-
- self.mock_drone_manager.max_runnable_processes_value = 0
- self._run_dispatcher() # schedule job1, but won't start verify
- job1.hostqueueentry_set.update(aborted=True)
- self.mock_drone_manager.max_runnable_processes_value = 100
- self._run_dispatcher()
- self.mock_drone_manager.finish_process(_PidfileType.VERIFY)
- self._run_dispatcher()
- self.mock_drone_manager.finish_process(_PidfileType.CLEANUP)
- self._run_dispatcher()
- self.mock_drone_manager.finish_specific_process(
- 'hosts/host2/2-verify', monitor_db._AUTOSERV_PID_FILE)
- self._run_dispatcher()
-
-
def test_reverify_interrupting_pre_job(self):
# ensure things behave sanely if a reverify is scheduled in the middle
# of pre-job actions
@@ -806,5 +797,57 @@
self._check_host_status(queue_entry.host, HostStatus.READY)
+ def test_throttling(self):
+ job = self._create_job(hosts=[1,2,3])
+ job.synch_count = 3
+ job.save()
+
+ queue_entries = list(job.hostqueueentry_set.all())
+ def _check_hqe_statuses(*statuses):
+ for queue_entry, status in zip(queue_entries, statuses):
+ self._check_statuses(queue_entry, status)
+
+ self.mock_drone_manager.process_capacity = 2
+ self._run_dispatcher() # verify runs on 1 and 2
+ _check_hqe_statuses(HqeStatus.VERIFYING, HqeStatus.VERIFYING,
+ HqeStatus.VERIFYING)
+ self.assertEquals(len(self.mock_drone_manager.running_pidfile_ids()), 2)
+
+ self.mock_drone_manager.finish_specific_process(
+ 'hosts/host1/1-verify', monitor_db._AUTOSERV_PID_FILE)
+ self.mock_drone_manager.finish_process(_PidfileType.VERIFY)
+ self._run_dispatcher() # verify runs on 3
+ _check_hqe_statuses(HqeStatus.PENDING, HqeStatus.PENDING,
+ HqeStatus.VERIFYING)
+
+ self.mock_drone_manager.finish_process(_PidfileType.VERIFY)
+ self._run_dispatcher() # job won't run due to throttling
+ _check_hqe_statuses(HqeStatus.STARTING, HqeStatus.STARTING,
+ HqeStatus.STARTING)
+ self._assert_nothing_is_running()
+
+ self.mock_drone_manager.process_capacity = 3
+ self._run_dispatcher() # now job runs
+ _check_hqe_statuses(HqeStatus.RUNNING, HqeStatus.RUNNING,
+ HqeStatus.RUNNING)
+
+ self.mock_drone_manager.process_capacity = 2
+ self.mock_drone_manager.finish_process(_PidfileType.JOB,
+ exit_status=271)
+ self._run_dispatcher() # gathering won't run due to throttling
+ _check_hqe_statuses(HqeStatus.GATHERING, HqeStatus.GATHERING,
+ HqeStatus.GATHERING)
+ self._assert_nothing_is_running()
+
+ self.mock_drone_manager.process_capacity = 3
+ self._run_dispatcher() # now gathering runs
+
+ self.mock_drone_manager.process_capacity = 0
+ self.mock_drone_manager.finish_process(_PidfileType.GATHER)
+ self._run_dispatcher() # parsing runs despite throttling
+ _check_hqe_statuses(HqeStatus.PARSING, HqeStatus.PARSING,
+ HqeStatus.PARSING)
+
+
if __name__ == '__main__':
unittest.main()
diff --git a/scheduler/monitor_db_unittest.py b/scheduler/monitor_db_unittest.py
index f9fb081..57e2cd5 100755
--- a/scheduler/monitor_db_unittest.py
+++ b/scheduler/monitor_db_unittest.py
@@ -1102,6 +1102,7 @@
def _create_mock_task(self, name):
task = self.god.create_mock_class(monitor_db.AgentTask, name)
+ task.num_processes = 1
_set_host_and_qe_ids(task)
return task
@@ -1174,7 +1175,7 @@
delay_seconds=2, callback=test_callback,
now_func=test_time) # time 33
self.assertEqual(35, delay_task.end_time)
- agent = monitor_db.Agent(delay_task, num_processes=0)
+ agent = monitor_db.Agent(delay_task)
self.assert_(not agent.started)
agent.tick() # activates the task and polls it once, time 34.01
self.assertEqual(0, test_callback.calls, "callback called early")
@@ -1192,7 +1193,7 @@
def test_delayed_call_abort(self):
delay_task = monitor_db.DelayedCallTask(
delay_seconds=987654, callback=lambda : None)
- agent = monitor_db.Agent(delay_task, num_processes=0)
+ agent = monitor_db.Agent(delay_task)
agent.abort()
agent.tick()
self.assert_(agent.is_done())
@@ -1330,6 +1331,7 @@
monitor_db.PidfileRunMonitor.run.expect_call(
mock.is_instance_comparator(list),
self.BASE_TASK_DIR + task_tag,
+ num_processes=1,
nice_level=monitor_db.AUTOSERV_NICE_LEVEL,
log_file=mock.anything_comparator(),
pidfile_name=monitor_db._AUTOSERV_PID_FILE,
@@ -1650,13 +1652,14 @@
self._expect_copy_results()
- def _setup_post_job_run_monitor(self, pidfile_name):
+ def _setup_post_job_run_monitor(self, pidfile_name, num_processes=1):
self.pidfile_monitor.has_process.expect_call().and_return(True)
autoserv_pidfile_id = object()
self.monitor = monitor_db.PidfileRunMonitor.expect_new()
self.monitor.run.expect_call(
mock.is_instance_comparator(list),
'tag',
+ num_processes=num_processes,
nice_level=monitor_db.AUTOSERV_NICE_LEVEL,
log_file=mock.anything_comparator(),
pidfile_name=pidfile_name,
@@ -1676,7 +1679,8 @@
def _test_final_reparse_task_helper(self, autoserv_success=True):
self._setup_pre_parse_expects(autoserv_success)
- self._setup_post_job_run_monitor(monitor_db._PARSER_PID_FILE)
+ self._setup_post_job_run_monitor(monitor_db._PARSER_PID_FILE,
+ num_processes=0)
self._setup_post_parse_expects(autoserv_success)
task = monitor_db.FinalReparseTask([self.queue_entry])
@@ -1704,7 +1708,8 @@
False)
monitor_db.FinalReparseTask._can_run_new_parse.expect_call().and_return(
True)
- self._setup_post_job_run_monitor(monitor_db._PARSER_PID_FILE)
+ self._setup_post_job_run_monitor(monitor_db._PARSER_PID_FILE,
+ num_processes=0)
self._setup_post_parse_expects(True)
task = monitor_db.FinalReparseTask([self.queue_entry])