[autotest] Split host acquisition and job scheduling.
This is phase one of two in the plan to split host acquisition out of the
scheduler's tick. The idea is to have the host scheduler use a job query
manager to query the database for new jobs without hosts and assign
hosts to them, while the main scheduler uses the same query managers to
look for hostless jobs.
Currently the main scheduler uses the host scheduler class to acquire
hosts inline, like it always has, and will continue to do so until the
inline_host_acquisition feature flag (default: True) is turned off via
the shadow_config.
TEST=Ran the scheduler, suites, unittests.
BUG=chromium:344613
DEPLOY=Scheduler
Change-Id: I542e4d1e509c16cac7354810416ee18ac940a7cf
Reviewed-on: https://chromium-review.googlesource.com/199383
Reviewed-by: Prashanth B <beeps@chromium.org>
Commit-Queue: Prashanth B <beeps@chromium.org>
Tested-by: Prashanth B <beeps@chromium.org>
diff --git a/scheduler/monitor_db.py b/scheduler/monitor_db.py
index 31efe83..4afd8c5 100755
--- a/scheduler/monitor_db.py
+++ b/scheduler/monitor_db.py
@@ -21,11 +21,11 @@
from autotest_lib.scheduler import email_manager, gc_stats, host_scheduler
from autotest_lib.scheduler import monitor_db_cleanup, prejob_task
from autotest_lib.scheduler import postjob_task
-from autotest_lib.scheduler import rdb_lib
-from autotest_lib.scheduler import rdb_utils
+from autotest_lib.scheduler import query_managers
from autotest_lib.scheduler import scheduler_lib
from autotest_lib.scheduler import scheduler_models
from autotest_lib.scheduler import status_server, scheduler_config
+from autotest_lib.scheduler import scheduler_lib
from autotest_lib.server import autoserv_utils
from autotest_lib.site_utils.graphite import stats
@@ -33,7 +33,6 @@
PID_FILE_PREFIX = 'monitor_db'
RESULTS_DIR = '.'
-DB_CONFIG_SECTION = 'AUTOTEST_WEB'
AUTOTEST_PATH = os.path.join(os.path.dirname(__file__), '..')
if os.environ.has_key('AUTOTEST_DIR'):
@@ -59,6 +58,10 @@
_autoserv_path = autoserv_utils.autoserv_path
_testing_mode = False
_drone_manager = None
+_inline_host_acquisition = global_config.global_config.get_config_value(
+ scheduler_config.CONFIG_SECTION, 'inline_host_acquisition', type=bool,
+ default=True)
+
def _parser_path_default(install_dir):
return os.path.join(install_dir, 'tko', 'parse')
@@ -183,12 +186,12 @@
if _testing_mode:
global_config.global_config.override_config_value(
- DB_CONFIG_SECTION, 'database', 'stresstest_autotest_web')
+ scheduler_lib.DB_CONFIG_SECTION, 'database',
+ 'stresstest_autotest_web')
os.environ['PATH'] = AUTOTEST_SERVER_DIR + ':' + os.environ['PATH']
global _db_manager
_db_manager = scheduler_lib.ConnectionManager()
-
logging.info("Setting signal handler")
signal.signal(signal.SIGINT, handle_sigint)
@@ -235,8 +238,6 @@
def __init__(self):
self._agents = []
self._last_clean_time = time.time()
- self._host_scheduler = host_scheduler.HostScheduler(
- _db_manager.get_connection())
user_cleanup_time = scheduler_config.config.clean_interval
self._periodic_cleanup = monitor_db_cleanup.UserCleanup(
_db_manager.get_connection(), user_cleanup_time)
@@ -257,6 +258,15 @@
scheduler_config.CONFIG_SECTION, 'extra_debugging', type=bool,
default=False)
+ # If _inline_host_acquisition is set the scheduler will acquire and
+ # release hosts against jobs inline, with the tick. Otherwise the
+ # scheduler will only focus on jobs that already have hosts, and
+ # will not explicitly unlease a host when a job finishes using it.
+ self._job_query_manager = query_managers.AFEJobQueryManager()
+ self._host_scheduler = (host_scheduler.BaseHostScheduler()
+ if _inline_host_acquisition else
+ host_scheduler.DummyHostScheduler())
+
def initialize(self, recover_hosts=True):
self._periodic_cleanup.initialize()
@@ -268,8 +278,6 @@
if recover_hosts:
self._recover_hosts()
- self._host_scheduler.recovery_on_startup()
-
def _log_tick_msg(self, msg):
if self._tick_debug:
@@ -623,38 +631,6 @@
(len(unrecovered_hqes), message))
- def _get_prioritized_special_tasks(self):
- """
- Returns all queued SpecialTasks prioritized for repair first, then
- cleanup, then verify.
-
- @return: list of afe.models.SpecialTasks sorted according to priority.
- """
- queued_tasks = models.SpecialTask.objects.filter(is_active=False,
- is_complete=False,
- host__locked=False)
- # exclude hosts with active queue entries unless the SpecialTask is for
- # that queue entry
- queued_tasks = models.SpecialTask.objects.add_join(
- queued_tasks, 'afe_host_queue_entries', 'host_id',
- join_condition='afe_host_queue_entries.active',
- join_from_key='host_id', force_left_join=True)
- queued_tasks = queued_tasks.extra(
- where=['(afe_host_queue_entries.id IS NULL OR '
- 'afe_host_queue_entries.id = '
- 'afe_special_tasks.queue_entry_id)'])
-
- # reorder tasks by priority
- task_priority_order = [models.SpecialTask.Task.REPAIR,
- models.SpecialTask.Task.CLEANUP,
- models.SpecialTask.Task.VERIFY,
- models.SpecialTask.Task.RESET,
- models.SpecialTask.Task.PROVISION]
- def task_priority_key(task):
- return task_priority_order.index(task.task)
- return sorted(queued_tasks, key=task_priority_key)
-
-
def _schedule_special_tasks(self):
"""
Execute queued SpecialTasks that are ready to run on idle hosts.
@@ -665,7 +641,7 @@
adds them to the dispatchers agents list, so _handle_agents can execute
them.
"""
- for task in self._get_prioritized_special_tasks():
+ for task in self._job_query_manager.get_prioritized_special_tasks():
if self.host_has_agent(task.host):
continue
self.add_agent_task(self._get_agent_task_for_special_task(task))
@@ -705,60 +681,6 @@
print_message=message)
-
- def _get_pending_queue_entries(self):
- """
- Fetch a list of new host queue entries.
-
- The ordering of this list is important, as every new agent
- we schedule can potentially contribute to the process count
- on the drone, which has a static limit. The sort order
- prioritizes jobs as follows:
- 1. High priority jobs: Based on the afe_job's priority
- 2. With hosts and metahosts: This will only happen if we don't
- activate the hqe after assigning a host to it in
- schedule_new_jobs.
- 3. With hosts but without metahosts: When tests are scheduled
- through the frontend the owner of the job would have chosen
- a host for it.
- 4. Without hosts but with metahosts: This is the common case of
- a new test that needs a DUT. We assign a host and set it to
- active so it shouldn't show up in case 2 on the next tick.
- 5. Without hosts and without metahosts: Hostless suite jobs, that
- will result in new jobs that fall under category 4.
-
- A note about the ordering of cases 3 and 4:
- Prioritizing one case above the other leads to earlier acquisition
- of the following resources: 1. process slots on the drone 2. machines.
- - When a user schedules a job through the afe they choose a specific
- host for it. Jobs with metahost can utilize any host that satisfies
- the metahost criterion. This means that if we had scheduled 4 before
- 3 there is a good chance that a job which could've used another host,
- will now use the host assigned to a metahost-less job. Given the
- availability of machines in pool:suites, this almost guarantees
- starvation for jobs scheduled through the frontend.
- - Scheduling 4 before 3 also has its pros however, since a suite
- has the concept of a time out, whereas users can wait. If we hit the
- process count on the drone a suite can timeout waiting on the test,
- but a user job generally has a much longer timeout, and relatively
- harmless consequences.
- The current ordering was chosed because it is more likely that we will
- run out of machines in pool:suites than processes on the drone.
-
- @returns A list of HQEs ordered according to sort_order.
- """
- sort_order = ('afe_jobs.priority DESC, '
- 'ISNULL(host_id), '
- 'ISNULL(meta_host), '
- 'parent_job_id, '
- 'job_id')
- query=('NOT complete AND NOT active AND status="Queued"'
- 'AND NOT aborted')
- return list(scheduler_models.HostQueueEntry.fetch(
- joins='INNER JOIN afe_jobs ON (job_id=afe_jobs.id)',
- where=query, order_by=sort_order))
-
-
def _refresh_pending_queue_entries(self):
"""
Lookup the pending HostQueueEntries and call our HostScheduler
@@ -766,47 +688,13 @@
@returns A list of pending HostQueueEntries sorted in priority order.
"""
- queue_entries = self._get_pending_queue_entries()
+ queue_entries = self._job_query_manager.get_pending_queue_entries(
+ only_hostless=not _inline_host_acquisition)
if not queue_entries:
return []
-
- self._host_scheduler.refresh(queue_entries)
-
return queue_entries
- def _schedule_atomic_group(self, queue_entry):
- """
- Schedule the given queue_entry on an atomic group of hosts.
-
- Returns immediately if there are insufficient available hosts.
-
- Creates new HostQueueEntries based off of queue_entry for the
- scheduled hosts and starts them all running.
- """
- # This is a virtual host queue entry representing an entire
- # atomic group, find a group and schedule their hosts.
- group_hosts = self._host_scheduler.find_eligible_atomic_group(
- queue_entry)
- if not group_hosts:
- return
-
- logging.info('Expanding atomic group entry %s with hosts %s',
- queue_entry,
- ', '.join(host.hostname for host in group_hosts))
-
- for assigned_host in group_hosts[1:]:
- # Create a new HQE for every additional assigned_host.
- new_hqe = scheduler_models.HostQueueEntry.clone(queue_entry)
- new_hqe.save()
- new_hqe.set_host(assigned_host)
- self._run_queue_entry(new_hqe)
-
- # The first assigned host uses the original HostQueueEntry
- queue_entry.set_host(group_hosts[0])
- self._run_queue_entry(queue_entry)
-
-
def _schedule_hostless_job(self, queue_entry):
"""Schedule a hostless (suite) job.
@@ -836,16 +724,7 @@
host_agent_task.queue_entry))
email_manager.manager.enqueue_notify_email(subject, message)
else:
- if queue_entry.host_id is None:
- queue_entry.set_host(host)
- else:
- if host.id != queue_entry.host_id:
- raise rdb_utils.RDBException('The rdb returned host: %s '
- 'but the job:%s was already assigned a host: %s. ' %
- (host.hostname, queue_entry.job_id,
- queue_entry.host.hostname))
- queue_entry.update_field('active', True)
- self._run_queue_entry(queue_entry)
+ self._host_scheduler.schedule_host_job(host, queue_entry)
def _schedule_new_jobs(self):
@@ -878,12 +757,17 @@
stats.Gauge(key).send('new_hostless_jobs', new_hostless_jobs)
if not host_jobs:
return
-
- hosts = rdb_lib.acquire_hosts(self._host_scheduler, host_jobs)
- for host, queue_entry in zip(hosts, host_jobs):
- if host:
- self._schedule_host_job(host, queue_entry)
- new_jobs_with_hosts = new_jobs_with_hosts + 1
+ if not _inline_host_acquisition:
+ message = ('Found %s jobs that need hosts though '
+ '_inline_host_acquisition=%s. Will acquire hosts.' %
+ ([str(job) for job in host_jobs],
+ _inline_host_acquisition))
+ email_manager.manager.enqueue_notify_email(
+ 'Processing unexpected host acquisition requests', message)
+ jobs_with_hosts = self._host_scheduler.find_hosts_for_jobs(host_jobs)
+ for host_assignment in jobs_with_hosts:
+ self._schedule_host_job(host_assignment.host, host_assignment.job)
+ new_jobs_with_hosts = new_jobs_with_hosts + 1
stats.Gauge(key).send('new_jobs_with_hosts', new_jobs_with_hosts)
stats.Gauge(key).send('new_jobs_without_hosts',
@@ -917,12 +801,6 @@
self.add_agent_task(task)
- def _run_queue_entry(self, queue_entry):
- self._log_extra_msg('Scheduling pre job tasks for queue_entry: %s' %
- queue_entry)
- queue_entry.schedule_pre_job_tasks()
-
-
def _find_aborting(self):
"""
Looks through the afe_host_queue_entries for an aborted entry.