[autotest] RDB Refactor II + Request/Response API.

Scheduler Refactor:
1. Batched processing of jobs.
2. Rdb hits the database instead of going through host_scheduler.
3. Migration to add a leased column.The scheduler released hosts
    every tick, back to the rdb.
4. Client rdb host that queue_entries use to track a host, instead
    of a database model.

Establishes a basic request/response api for the rdb:
rdb_utils:
    1. Requests: Assert the format and fields of some basic request types.
    2. Helper client/server modules to communicate with the rdb.
rdb_lib:
    1. Request managers for rdb methods:
        a. Match request-response
        b. Abstract the batching of requests.
    2. JobQueryManager: Regulates database access for job information.
rdb:
    1. QueryManagers: Regulate database access
    2. RequestHandlers: Use query managers to get things done.
    3. Dispatchers: Send incoming requests to the appropriate handlers.
Ignores wire formats.

TEST=unittests, functional verification.
BUG=chromium:314081, chromium:314083, chromium:314084
DEPLOY=scheduler, migrate

Change-Id: Id174c663c6e78295d365142751053eae4023116d
Reviewed-on: https://chromium-review.googlesource.com/183385
Reviewed-by: Prashanth B <beeps@chromium.org>
Commit-Queue: Prashanth B <beeps@chromium.org>
Tested-by: Prashanth B <beeps@chromium.org>
diff --git a/scheduler/monitor_db.py b/scheduler/monitor_db.py
index 3cacbfa..4c320e7 100755
--- a/scheduler/monitor_db.py
+++ b/scheduler/monitor_db.py
@@ -22,6 +22,8 @@
 from autotest_lib.scheduler import email_manager, gc_stats, host_scheduler
 from autotest_lib.scheduler import monitor_db_cleanup, prejob_task
 from autotest_lib.scheduler import postjob_task, scheduler_logging_config
+from autotest_lib.scheduler import rdb_lib
+from autotest_lib.scheduler import rdb_utils
 from autotest_lib.scheduler import scheduler_models
 from autotest_lib.scheduler import status_server, scheduler_config
 from autotest_lib.server import autoserv_utils
@@ -817,10 +819,49 @@
 
 
     def _schedule_hostless_job(self, queue_entry):
+        """Schedule a hostless (suite) job.
+
+        @param queue_entry: The queue_entry representing the hostless job.
+        """
         self.add_agent_task(HostlessQueueTask(queue_entry))
         queue_entry.set_status(models.HostQueueEntry.Status.STARTING)
 
 
+    def _schedule_host_job(self, host, queue_entry):
+        """Schedules a job on the given host.
+
+        1. Assign the host to the hqe, if it isn't already assigned.
+        2. Create a SpecialAgentTask for the hqe.
+        3. Activate the hqe.
+
+        @param queue_entry: The job to schedule.
+        @param host: The host to schedule the job on.
+        """
+        if self.host_has_agent(host):
+            host_agent_task = list(self._host_agents.get(host.id))[0].task
+            subject = 'Host with agents assigned to an HQE'
+            message = ('HQE: %s assigned host %s, but the host has '
+                       'agent: %s for queue_entry %s. The HQE '
+                       'will remain in a queued state till the '
+                       'the host is usable.' %
+                       (queue_entry, host.hostname, host_agent_task,
+                        host_agent_task.queue_entry))
+            email_manager.manager.enqueue_notify_email(subject, message)
+            queue_entry.set_host(None)
+            queue_entry.update_field('active', False)
+        else:
+            if queue_entry.host_id is None:
+                queue_entry.set_host(host)
+            else:
+                if host.id != queue_entry.host_id:
+                    raise rdb_utils.RDBException('The rdb returned host: %s '
+                            'but the job:%s was already assigned a host: %s. ' %
+                            (host.hostname, queue_entry.job_id,
+                            queue_entry.host.hostname))
+            queue_entry.update_field('active', True)
+            self._run_queue_entry(queue_entry)
+
+
     def _schedule_new_jobs(self):
         """
         Find any new HQEs and call schedule_pre_job_tasks for it.
@@ -833,55 +874,31 @@
         """
         queue_entries = self._refresh_pending_queue_entries()
 
+        key = 'scheduler.jobs_per_tick'
         new_hostless_jobs = 0
-        new_atomic_groups = 0
         new_jobs_with_hosts = 0
         new_jobs_need_hosts = 0
-
+        host_jobs = []
         logging.debug('Processing %d queue_entries', len(queue_entries))
-        for queue_entry in queue_entries:
-            self._log_extra_msg('Processing queue_entry: %s' % queue_entry)
-            is_unassigned_atomic_group = (
-                    queue_entry.atomic_group_id is not None
-                    and queue_entry.host_id is None)
 
+        for queue_entry in queue_entries:
             if queue_entry.is_hostless():
-                self._log_extra_msg('Scheduling hostless job.')
                 self._schedule_hostless_job(queue_entry)
                 new_hostless_jobs = new_hostless_jobs + 1
-            elif is_unassigned_atomic_group:
-                self._schedule_atomic_group(queue_entry)
-                new_atmoic_groups = new_atomic_groups + 1
             else:
+                host_jobs.append(queue_entry)
                 new_jobs_need_hosts = new_jobs_need_hosts + 1
-                assigned_host = self._host_scheduler.schedule_entry(queue_entry)
-                if assigned_host:
-                    # If we ever find ourselves in a position where a ready host
-                    # has an agent, roll back the host assignment and try again
-                    # next tick.
-                    if self.host_has_agent(assigned_host):
-                        host_agent_task = [host_agent.task for host_agent in
-                                           list(self._host_agents.get(
-                                                       assigned_host.id))][0]
-                        subject = 'Host with agents assigned to an HQE'
-                        message = ('HQE: %s assigned host %s, but the host has '
-                                   'agent: %s for queue_entry %s. The HQE '
-                                   'will remain in a queued state till the '
-                                   'the host is usable.' %
-                                   (queue_entry, assigned_host.hostname,
-                                    host_agent_task,
-                                    host_agent_task.queue_entry))
-                        email_manager.manager.enqueue_notify_email(subject, message)
-                        queue_entry.set_host(None)
-                        queue_entry.update_field('active', False)
-                    else:
-                        assert assigned_host.id == queue_entry.host_id
-                        self._run_queue_entry(queue_entry)
-                        new_jobs_with_hosts = new_jobs_with_hosts + 1
 
-        key = 'scheduler.jobs_per_tick'
         stats.Gauge(key).send('new_hostless_jobs', new_hostless_jobs)
-        stats.Gauge(key).send('new_atomic_groups', new_atomic_groups)
+        if not host_jobs:
+            return
+
+        hosts = rdb_lib.acquire_hosts(self._host_scheduler, host_jobs)
+        for host, queue_entry in zip(hosts, host_jobs):
+            if host:
+                self._schedule_host_job(host, queue_entry)
+                new_jobs_with_hosts = new_jobs_with_hosts + 1
+
         stats.Gauge(key).send('new_jobs_with_hosts', new_jobs_with_hosts)
         stats.Gauge(key).send('new_jobs_without_hosts',
                               new_jobs_need_hosts - new_jobs_with_hosts)