Host scheduler refactoring. Move HostScheduler out of monitor_db.

In order to facilitate site extensibility of HostScheduler, we need to factor out its dependence on global variables in monitor_db. I modeled this refactoring on monitor_db_cleanup.

The main changes I've made are as follows:
1. Move BaseHostScheduler, the site import, and SchedulerError out of monitor_db. SchedulerError must be moved as well to prevent a circular import between monitor_db and host_scheduler.
2. Convert the staticmethods/classmethods in BaseHostScheduler to normal methods.
3. Fix unit tests and monitor_db to import SchedulerError from host_scheduler.

Change-Id: I0c10b79e70064b73121bbb347bb71ba15e0353d1

BUG=chromium-os:12654
TEST=Ran unit tests. Tested with private Autotest instance.

Review URL: http://codereview.chromium.org/6597047
diff --git a/scheduler/monitor_db.py b/scheduler/monitor_db.py
index 556eb17..cd1f91e 100755
--- a/scheduler/monitor_db.py
+++ b/scheduler/monitor_db.py
@@ -23,9 +23,8 @@
 from autotest_lib.frontend.afe import models, rpc_utils, readonly_connection
 from autotest_lib.frontend.afe import model_attributes
 from autotest_lib.scheduler import drone_manager, drones, email_manager
-from autotest_lib.scheduler import monitor_db_cleanup
+from autotest_lib.scheduler import gc_stats, host_scheduler, monitor_db_cleanup
 from autotest_lib.scheduler import status_server, scheduler_config
-from autotest_lib.scheduler import gc_stats, metahost_scheduler
 from autotest_lib.scheduler import scheduler_models
 BABYSITTER_PID_FILE_PREFIX = 'monitor_db_babysitter'
 PID_FILE_PREFIX = 'monitor_db'
@@ -75,15 +74,11 @@
     return {}
 
 
-get_site_metahost_schedulers = utils.import_site_function(
-        __file__, 'autotest_lib.scheduler.site_metahost_scheduler',
-        'get_metahost_schedulers', lambda : ())
-
-
 def _verify_default_drone_set_exists():
     if (models.DroneSet.drone_sets_enabled() and
             not models.DroneSet.default_drone_set_name()):
-        raise SchedulerError('Drone sets are enabled, but no default is set')
+        raise host_scheduler.SchedulerError(
+                'Drone sets are enabled, but no default is set')
 
 
 def _sanity_check():
@@ -258,427 +253,11 @@
     return autoserv_argv + extra_args
 
 
-class SchedulerError(Exception):
-    """Raised by HostScheduler when an inconsistent state occurs."""
-
-
-class BaseHostScheduler(metahost_scheduler.HostSchedulingUtility):
-    """Handles the logic for choosing when to run jobs and on which hosts.
-
-    This class makes several queries to the database on each tick, building up
-    some auxiliary data structures and using them to determine which hosts are
-    eligible to run which jobs, taking into account all the various factors that
-    affect that.
-
-    In the past this was done with one or two very large, complex database
-    queries.  It has proven much simpler and faster to build these auxiliary
-    data structures and perform the logic in Python.
-    """
-    def __init__(self):
-        self._metahost_schedulers = metahost_scheduler.get_metahost_schedulers()
-
-        # load site-specific scheduler selected in global_config
-        site_schedulers_str = global_config.global_config.get_config_value(
-                scheduler_config.CONFIG_SECTION, 'site_metahost_schedulers',
-                default='')
-        site_schedulers = set(site_schedulers_str.split(','))
-        for scheduler in get_site_metahost_schedulers():
-            if type(scheduler).__name__ in site_schedulers:
-                # always prepend, so site schedulers take precedence
-                self._metahost_schedulers = (
-                        [scheduler] + self._metahost_schedulers)
-        logging.info('Metahost schedulers: %s',
-                     ', '.join(type(scheduler).__name__ for scheduler
-                               in self._metahost_schedulers))
-
-
-    def _get_ready_hosts(self):
-        # avoid any host with a currently active queue entry against it
-        hosts = scheduler_models.Host.fetch(
-            joins='LEFT JOIN afe_host_queue_entries AS active_hqe '
-                  'ON (afe_hosts.id = active_hqe.host_id AND '
-                      'active_hqe.active)',
-            where="active_hqe.host_id IS NULL "
-                  "AND NOT afe_hosts.locked "
-                  "AND (afe_hosts.status IS NULL "
-                          "OR afe_hosts.status = 'Ready')")
-        return dict((host.id, host) for host in hosts)
-
-
-    @staticmethod
-    def _get_sql_id_list(id_list):
-        return ','.join(str(item_id) for item_id in id_list)
-
-
-    @classmethod
-    def _get_many2many_dict(cls, query, id_list, flip=False):
-        if not id_list:
-            return {}
-        query %= cls._get_sql_id_list(id_list)
-        rows = _db.execute(query)
-        return cls._process_many2many_dict(rows, flip)
-
-
-    @staticmethod
-    def _process_many2many_dict(rows, flip=False):
-        result = {}
-        for row in rows:
-            left_id, right_id = int(row[0]), int(row[1])
-            if flip:
-                left_id, right_id = right_id, left_id
-            result.setdefault(left_id, set()).add(right_id)
-        return result
-
-
-    @classmethod
-    def _get_job_acl_groups(cls, job_ids):
-        query = """
-        SELECT afe_jobs.id, afe_acl_groups_users.aclgroup_id
-        FROM afe_jobs
-        INNER JOIN afe_users ON afe_users.login = afe_jobs.owner
-        INNER JOIN afe_acl_groups_users ON
-                afe_acl_groups_users.user_id = afe_users.id
-        WHERE afe_jobs.id IN (%s)
-        """
-        return cls._get_many2many_dict(query, job_ids)
-
-
-    @classmethod
-    def _get_job_ineligible_hosts(cls, job_ids):
-        query = """
-        SELECT job_id, host_id
-        FROM afe_ineligible_host_queues
-        WHERE job_id IN (%s)
-        """
-        return cls._get_many2many_dict(query, job_ids)
-
-
-    @classmethod
-    def _get_job_dependencies(cls, job_ids):
-        query = """
-        SELECT job_id, label_id
-        FROM afe_jobs_dependency_labels
-        WHERE job_id IN (%s)
-        """
-        return cls._get_many2many_dict(query, job_ids)
-
-
-    @classmethod
-    def _get_host_acls(cls, host_ids):
-        query = """
-        SELECT host_id, aclgroup_id
-        FROM afe_acl_groups_hosts
-        WHERE host_id IN (%s)
-        """
-        return cls._get_many2many_dict(query, host_ids)
-
-
-    @classmethod
-    def _get_label_hosts(cls, host_ids):
-        if not host_ids:
-            return {}, {}
-        query = """
-        SELECT label_id, host_id
-        FROM afe_hosts_labels
-        WHERE host_id IN (%s)
-        """ % cls._get_sql_id_list(host_ids)
-        rows = _db.execute(query)
-        labels_to_hosts = cls._process_many2many_dict(rows)
-        hosts_to_labels = cls._process_many2many_dict(rows, flip=True)
-        return labels_to_hosts, hosts_to_labels
-
-
-    @classmethod
-    def _get_labels(cls):
-        return dict((label.id, label) for label
-                    in scheduler_models.Label.fetch())
-
-
-    def recovery_on_startup(self):
-        for metahost_scheduler in self._metahost_schedulers:
-            metahost_scheduler.recovery_on_startup()
-
-
-    def refresh(self, pending_queue_entries):
-        self._hosts_available = self._get_ready_hosts()
-
-        relevant_jobs = [queue_entry.job_id
-                         for queue_entry in pending_queue_entries]
-        self._job_acls = self._get_job_acl_groups(relevant_jobs)
-        self._ineligible_hosts = self._get_job_ineligible_hosts(relevant_jobs)
-        self._job_dependencies = self._get_job_dependencies(relevant_jobs)
-
-        host_ids = self._hosts_available.keys()
-        self._host_acls = self._get_host_acls(host_ids)
-        self._label_hosts, self._host_labels = self._get_label_hosts(host_ids)
-
-        self._labels = self._get_labels()
-
-
-    def tick(self):
-        for metahost_scheduler in self._metahost_schedulers:
-            metahost_scheduler.tick()
-
-
-    def hosts_in_label(self, label_id):
-        return set(self._label_hosts.get(label_id, ()))
-
-
-    def remove_host_from_label(self, host_id, label_id):
-        self._label_hosts[label_id].remove(host_id)
-
-
-    def pop_host(self, host_id):
-        return self._hosts_available.pop(host_id)
-
-
-    def ineligible_hosts_for_entry(self, queue_entry):
-        return set(self._ineligible_hosts.get(queue_entry.job_id, ()))
-
-
-    def _is_acl_accessible(self, host_id, queue_entry):
-        job_acls = self._job_acls.get(queue_entry.job_id, set())
-        host_acls = self._host_acls.get(host_id, set())
-        return len(host_acls.intersection(job_acls)) > 0
-
-
-    def _check_job_dependencies(self, job_dependencies, host_labels):
-        missing = job_dependencies - host_labels
-        return len(missing) == 0
-
-
-    def _check_only_if_needed_labels(self, job_dependencies, host_labels,
-                                     queue_entry):
-        if not queue_entry.meta_host:
-            # bypass only_if_needed labels when a specific host is selected
-            return True
-
-        for label_id in host_labels:
-            label = self._labels[label_id]
-            if not label.only_if_needed:
-                # we don't care about non-only_if_needed labels
-                continue
-            if queue_entry.meta_host == label_id:
-                # if the label was requested in a metahost it's OK
-                continue
-            if label_id not in job_dependencies:
-                return False
-        return True
-
-
-    def _check_atomic_group_labels(self, host_labels, queue_entry):
-        """
-        Determine if the given HostQueueEntry's atomic group settings are okay
-        to schedule on a host with the given labels.
-
-        @param host_labels: A list of label ids that the host has.
-        @param queue_entry: The HostQueueEntry being considered for the host.
-
-        @returns True if atomic group settings are okay, False otherwise.
-        """
-        return (self._get_host_atomic_group_id(host_labels, queue_entry) ==
-                queue_entry.atomic_group_id)
-
-
-    def _get_host_atomic_group_id(self, host_labels, queue_entry=None):
-        """
-        Return the atomic group label id for a host with the given set of
-        labels if any, or None otherwise.  Raises an exception if more than
-        one atomic group are found in the set of labels.
-
-        @param host_labels: A list of label ids that the host has.
-        @param queue_entry: The HostQueueEntry we're testing.  Only used for
-                extra info in a potential logged error message.
-
-        @returns The id of the atomic group found on a label in host_labels
-                or None if no atomic group label is found.
-        """
-        atomic_labels = [self._labels[label_id] for label_id in host_labels
-                         if self._labels[label_id].atomic_group_id is not None]
-        atomic_ids = set(label.atomic_group_id for label in atomic_labels)
-        if not atomic_ids:
-            return None
-        if len(atomic_ids) > 1:
-            logging.error('More than one Atomic Group on HQE "%s" via: %r',
-                          queue_entry, atomic_labels)
-        return atomic_ids.pop()
-
-
-    def _get_atomic_group_labels(self, atomic_group_id):
-        """
-        Lookup the label ids that an atomic_group is associated with.
-
-        @param atomic_group_id - The id of the AtomicGroup to look up.
-
-        @returns A generator yeilding Label ids for this atomic group.
-        """
-        return (id for id, label in self._labels.iteritems()
-                if label.atomic_group_id == atomic_group_id
-                and not label.invalid)
-
-
-    def _get_eligible_host_ids_in_group(self, group_hosts, queue_entry):
-        """
-        @param group_hosts - A sequence of Host ids to test for usability
-                and eligibility against the Job associated with queue_entry.
-        @param queue_entry - The HostQueueEntry that these hosts are being
-                tested for eligibility against.
-
-        @returns A subset of group_hosts Host ids that are eligible for the
-                supplied queue_entry.
-        """
-        return set(host_id for host_id in group_hosts
-                   if self.is_host_usable(host_id)
-                   and self.is_host_eligible_for_job(host_id, queue_entry))
-
-
-    def is_host_eligible_for_job(self, host_id, queue_entry):
-        if self._is_host_invalid(host_id):
-            # if an invalid host is scheduled for a job, it's a one-time host
-            # and it therefore bypasses eligibility checks. note this can only
-            # happen for non-metahosts, because invalid hosts have their label
-            # relationships cleared.
-            return True
-
-        job_dependencies = self._job_dependencies.get(queue_entry.job_id, set())
-        host_labels = self._host_labels.get(host_id, set())
-
-        return (self._is_acl_accessible(host_id, queue_entry) and
-                self._check_job_dependencies(job_dependencies, host_labels) and
-                self._check_only_if_needed_labels(
-                    job_dependencies, host_labels, queue_entry) and
-                self._check_atomic_group_labels(host_labels, queue_entry))
-
-
-    def _is_host_invalid(self, host_id):
-        host_object = self._hosts_available.get(host_id, None)
-        return host_object and host_object.invalid
-
-
-    def _schedule_non_metahost(self, queue_entry):
-        if not self.is_host_eligible_for_job(queue_entry.host_id, queue_entry):
-            return None
-        return self._hosts_available.pop(queue_entry.host_id, None)
-
-
-    def is_host_usable(self, host_id):
-        if host_id not in self._hosts_available:
-            # host was already used during this scheduling cycle
-            return False
-        if self._hosts_available[host_id].invalid:
-            # Invalid hosts cannot be used for metahosts.  They're included in
-            # the original query because they can be used by non-metahosts.
-            return False
-        return True
-
-
-    def schedule_entry(self, queue_entry):
-        if queue_entry.host_id is not None:
-            return self._schedule_non_metahost(queue_entry)
-
-        for scheduler in self._metahost_schedulers:
-            if scheduler.can_schedule_metahost(queue_entry):
-                scheduler.schedule_metahost(queue_entry, self)
-                return None
-
-        raise SchedulerError('No metahost scheduler to handle %s' % queue_entry)
-
-
-    def find_eligible_atomic_group(self, queue_entry):
-        """
-        Given an atomic group host queue entry, locate an appropriate group
-        of hosts for the associated job to run on.
-
-        The caller is responsible for creating new HQEs for the additional
-        hosts returned in order to run the actual job on them.
-
-        @returns A list of Host instances in a ready state to satisfy this
-                atomic group scheduling.  Hosts will all belong to the same
-                atomic group label as specified by the queue_entry.
-                An empty list will be returned if no suitable atomic
-                group could be found.
-
-        TODO(gps): what is responsible for kicking off any attempted repairs on
-        a group of hosts?  not this function, but something needs to.  We do
-        not communicate that reason for returning [] outside of here...
-        For now, we'll just be unschedulable if enough hosts within one group
-        enter Repair Failed state.
-        """
-        assert queue_entry.atomic_group_id is not None
-        job = queue_entry.job
-        assert job.synch_count and job.synch_count > 0
-        atomic_group = queue_entry.atomic_group
-        if job.synch_count > atomic_group.max_number_of_machines:
-            # Such a Job and HostQueueEntry should never be possible to
-            # create using the frontend.  Regardless, we can't process it.
-            # Abort it immediately and log an error on the scheduler.
-            queue_entry.set_status(models.HostQueueEntry.Status.ABORTED)
-            logging.error(
-                'Error: job %d synch_count=%d > requested atomic_group %d '
-                'max_number_of_machines=%d.  Aborted host_queue_entry %d.',
-                job.id, job.synch_count, atomic_group.id,
-                 atomic_group.max_number_of_machines, queue_entry.id)
-            return []
-        hosts_in_label = self.hosts_in_label(queue_entry.meta_host)
-        ineligible_host_ids = self.ineligible_hosts_for_entry(queue_entry)
-
-        # Look in each label associated with atomic_group until we find one with
-        # enough hosts to satisfy the job.
-        for atomic_label_id in self._get_atomic_group_labels(atomic_group.id):
-            group_hosts = set(self.hosts_in_label(atomic_label_id))
-            if queue_entry.meta_host is not None:
-                # If we have a metahost label, only allow its hosts.
-                group_hosts.intersection_update(hosts_in_label)
-            group_hosts -= ineligible_host_ids
-            eligible_host_ids_in_group = self._get_eligible_host_ids_in_group(
-                    group_hosts, queue_entry)
-
-            # Job.synch_count is treated as "minimum synch count" when
-            # scheduling for an atomic group of hosts.  The atomic group
-            # number of machines is the maximum to pick out of a single
-            # atomic group label for scheduling at one time.
-            min_hosts = job.synch_count
-            max_hosts = atomic_group.max_number_of_machines
-
-            if len(eligible_host_ids_in_group) < min_hosts:
-                # Not enough eligible hosts in this atomic group label.
-                continue
-
-            eligible_hosts_in_group = [self._hosts_available[id]
-                                       for id in eligible_host_ids_in_group]
-            # So that they show up in a sane order when viewing the job.
-            eligible_hosts_in_group.sort(cmp=scheduler_models.Host.cmp_for_sort)
-
-            # Limit ourselves to scheduling the atomic group size.
-            if len(eligible_hosts_in_group) > max_hosts:
-                eligible_hosts_in_group = eligible_hosts_in_group[:max_hosts]
-
-            # Remove the selected hosts from our cached internal state
-            # of available hosts in order to return the Host objects.
-            host_list = []
-            for host in eligible_hosts_in_group:
-                hosts_in_label.discard(host.id)
-                self._hosts_available.pop(host.id)
-                host_list.append(host)
-            return host_list
-
-        return []
-
-
-site_host_scheduler = utils.import_site_class(__file__,
-                                  "autotest_lib.scheduler.site_host_scheduler",
-                                  "site_host_scheduler", BaseHostScheduler)
-
-
-class HostScheduler(site_host_scheduler):
-    pass
-
-
 class Dispatcher(object):
     def __init__(self):
         self._agents = []
         self._last_clean_time = time.time()
-        self._host_scheduler = HostScheduler()
+        self._host_scheduler = host_scheduler.HostScheduler(_db)
         user_cleanup_time = scheduler_config.config.clean_interval
         self._periodic_cleanup = monitor_db_cleanup.UserCleanup(
             _db, user_cleanup_time)
@@ -867,9 +446,9 @@
         if queue_entry.status == models.HostQueueEntry.Status.ARCHIVING:
             return ArchiveResultsTask(queue_entries=task_entries)
 
-        raise SchedulerError('_get_agent_task_for_queue_entry got entry with '
-                             'invalid status %s: %s'
-                             % (queue_entry.status, queue_entry))
+        raise host_scheduler.SchedulerError(
+                '_get_agent_task_for_queue_entry got entry with '
+                'invalid status %s: %s' % (queue_entry.status, queue_entry))
 
 
     def _check_for_duplicate_host_entries(self, task_entries):
@@ -888,7 +467,7 @@
         """
         if self.host_has_agent(entry.host):
             agent = tuple(self._host_agents.get(entry.host.id))[0]
-            raise SchedulerError(
+            raise host_scheduler.SchedulerError(
                     'While scheduling %s, host %s already has a host agent %s'
                     % (entry, entry.host, agent.task))
 
@@ -907,7 +486,8 @@
             if agent_task_class.TASK_TYPE == special_task.task:
                 return agent_task_class(task=special_task)
 
-        raise SchedulerError('No AgentTask class for task', str(special_task))
+        raise host_scheduler.SchedulerError(
+                'No AgentTask class for task', str(special_task))
 
 
     def _register_pidfiles(self, agent_tasks):
@@ -972,7 +552,7 @@
 
         if unrecovered_hqes:
             message = '\n'.join(str(hqe) for hqe in unrecovered_hqes)
-            raise SchedulerError(
+            raise host_scheduler.SchedulerError(
                     '%d unrecovered verifying host queue entries:\n%s' %
                     (len(unrecovered_hqes), message))
 
@@ -1781,16 +1361,17 @@
         class_name = self.__class__.__name__
         for entry in queue_entries:
             if entry.status not in allowed_hqe_statuses:
-                raise SchedulerError('%s attempting to start '
-                                     'entry with invalid status %s: %s'
-                                     % (class_name, entry.status, entry))
+                raise host_scheduler.SchedulerError(
+                        '%s attempting to start entry with invalid status %s: '
+                        '%s' % (class_name, entry.status, entry))
             invalid_host_status = (
                     allowed_host_statuses is not None
                     and entry.host.status not in allowed_host_statuses)
             if invalid_host_status:
-                raise SchedulerError('%s attempting to start on queue '
-                                     'entry with invalid host status %s: %s'
-                                     % (class_name, entry.host.status, entry))
+                raise host_scheduler.SchedulerError(
+                        '%s attempting to start on queue entry with invalid '
+                        'host status %s: %s'
+                        % (class_name, entry.host.status, entry))
 
 
 class TaskWithJobKeyvals(object):