[autotest] RDB Refactor II + Request/Response API.
Scheduler Refactor:
1. Batched processing of jobs.
2. Rdb hits the database instead of going through host_scheduler.
3. Migration to add a leased column. The scheduler releases hosts
back to the rdb every tick.
4. Client rdb host that queue_entries use to track a host, instead
of a database model.
Establishes a basic request/response API for the rdb:
rdb_utils:
1. Requests: Assert the format and fields of some basic request types.
2. Helper client/server modules to communicate with the rdb.
rdb_lib:
1. Request managers for rdb methods:
a. Match request-response
b. Abstract the batching of requests.
2. JobQueryManager: Regulates database access for job information.
rdb:
1. QueryManagers: Regulate database access
2. RequestHandlers: Use query managers to get things done.
3. Dispatchers: Send incoming requests to the appropriate handlers.
Ignores wire formats.
TEST=unittests, functional verification.
BUG=chromium:314081, chromium:314083, chromium:314084
DEPLOY=scheduler, migrate
Change-Id: Id174c663c6e78295d365142751053eae4023116d
Reviewed-on: https://chromium-review.googlesource.com/183385
Reviewed-by: Prashanth B <beeps@chromium.org>
Commit-Queue: Prashanth B <beeps@chromium.org>
Tested-by: Prashanth B <beeps@chromium.org>
diff --git a/scheduler/monitor_db.py b/scheduler/monitor_db.py
index 3cacbfa..4c320e7 100755
--- a/scheduler/monitor_db.py
+++ b/scheduler/monitor_db.py
@@ -22,6 +22,8 @@
from autotest_lib.scheduler import email_manager, gc_stats, host_scheduler
from autotest_lib.scheduler import monitor_db_cleanup, prejob_task
from autotest_lib.scheduler import postjob_task, scheduler_logging_config
+from autotest_lib.scheduler import rdb_lib
+from autotest_lib.scheduler import rdb_utils
from autotest_lib.scheduler import scheduler_models
from autotest_lib.scheduler import status_server, scheduler_config
from autotest_lib.server import autoserv_utils
@@ -817,10 +819,49 @@
def _schedule_hostless_job(self, queue_entry):
+ """Schedule a hostless (suite) job.
+
+ @param queue_entry: The queue_entry representing the hostless job.
+ """
self.add_agent_task(HostlessQueueTask(queue_entry))
queue_entry.set_status(models.HostQueueEntry.Status.STARTING)
+ def _schedule_host_job(self, host, queue_entry):
+ """Schedules a job on the given host.
+
+ 1. Assign the host to the hqe, if it isn't already assigned.
+ 2. Create a SpecialAgentTask for the hqe.
+ 3. Activate the hqe.
+
+ @param host: The host to schedule the job on.
+ @param queue_entry: The job to schedule.
+ """
+ if self.host_has_agent(host):
+ host_agent_task = list(self._host_agents.get(host.id))[0].task
+ subject = 'Host with agents assigned to an HQE'
+ message = ('HQE: %s assigned host %s, but the host has '
+ 'agent: %s for queue_entry %s. The HQE '
+ 'will remain in a queued state till the '
+ 'host is usable.' %
+ (queue_entry, host.hostname, host_agent_task,
+ host_agent_task.queue_entry))
+ email_manager.manager.enqueue_notify_email(subject, message)
+ queue_entry.set_host(None)
+ queue_entry.update_field('active', False)
+ else:
+ if queue_entry.host_id is None:
+ queue_entry.set_host(host)
+ else:
+ if host.id != queue_entry.host_id:
+ raise rdb_utils.RDBException('The rdb returned host: %s '
+ 'but the job:%s was already assigned a host: %s. ' %
+ (host.hostname, queue_entry.job_id,
+ queue_entry.host.hostname))
+ queue_entry.update_field('active', True)
+ self._run_queue_entry(queue_entry)
+
+
def _schedule_new_jobs(self):
"""
Find any new HQEs and call schedule_pre_job_tasks for it.
@@ -833,55 +874,31 @@
"""
queue_entries = self._refresh_pending_queue_entries()
+ key = 'scheduler.jobs_per_tick'
new_hostless_jobs = 0
- new_atomic_groups = 0
new_jobs_with_hosts = 0
new_jobs_need_hosts = 0
-
+ host_jobs = []
logging.debug('Processing %d queue_entries', len(queue_entries))
- for queue_entry in queue_entries:
- self._log_extra_msg('Processing queue_entry: %s' % queue_entry)
- is_unassigned_atomic_group = (
- queue_entry.atomic_group_id is not None
- and queue_entry.host_id is None)
+ for queue_entry in queue_entries:
if queue_entry.is_hostless():
- self._log_extra_msg('Scheduling hostless job.')
self._schedule_hostless_job(queue_entry)
new_hostless_jobs = new_hostless_jobs + 1
- elif is_unassigned_atomic_group:
- self._schedule_atomic_group(queue_entry)
- new_atmoic_groups = new_atomic_groups + 1
else:
+ host_jobs.append(queue_entry)
new_jobs_need_hosts = new_jobs_need_hosts + 1
- assigned_host = self._host_scheduler.schedule_entry(queue_entry)
- if assigned_host:
- # If we ever find ourselves in a position where a ready host
- # has an agent, roll back the host assignment and try again
- # next tick.
- if self.host_has_agent(assigned_host):
- host_agent_task = [host_agent.task for host_agent in
- list(self._host_agents.get(
- assigned_host.id))][0]
- subject = 'Host with agents assigned to an HQE'
- message = ('HQE: %s assigned host %s, but the host has '
- 'agent: %s for queue_entry %s. The HQE '
- 'will remain in a queued state till the '
- 'the host is usable.' %
- (queue_entry, assigned_host.hostname,
- host_agent_task,
- host_agent_task.queue_entry))
- email_manager.manager.enqueue_notify_email(subject, message)
- queue_entry.set_host(None)
- queue_entry.update_field('active', False)
- else:
- assert assigned_host.id == queue_entry.host_id
- self._run_queue_entry(queue_entry)
- new_jobs_with_hosts = new_jobs_with_hosts + 1
- key = 'scheduler.jobs_per_tick'
stats.Gauge(key).send('new_hostless_jobs', new_hostless_jobs)
- stats.Gauge(key).send('new_atomic_groups', new_atomic_groups)
+ if not host_jobs:
+ return
+
+ hosts = rdb_lib.acquire_hosts(self._host_scheduler, host_jobs)
+ for host, queue_entry in zip(hosts, host_jobs):
+ if host:
+ self._schedule_host_job(host, queue_entry)
+ new_jobs_with_hosts = new_jobs_with_hosts + 1
+
stats.Gauge(key).send('new_jobs_with_hosts', new_jobs_with_hosts)
stats.Gauge(key).send('new_jobs_without_hosts',
new_jobs_need_hosts - new_jobs_with_hosts)