[autotest] Sanity check host assignments.

Check that we haven't violated any correctness constraints by
assigning the same host to 2 simultaneously active jobs. These
changes are in preparation for eventually breaking host assignment
out of the scheduler. The performance degradation should be negligible
since we're only querying for the host_ids of currently active jobs,
every 5 minutes.

TEST=Ran suites, unittests.
BUG=None
DEPLOY=Scheduler

Change-Id: Ie560a67861f9e4d1d59cda9828fb9d2ef433e5f4
Reviewed-on: https://chromium-review.googlesource.com/198196
Reviewed-by: Prashanth B <beeps@chromium.org>
Tested-by: Prashanth B <beeps@chromium.org>
Commit-Queue: Prashanth B <beeps@chromium.org>
diff --git a/scheduler/monitor_db_cleanup.py b/scheduler/monitor_db_cleanup.py
index 109096a..b8d1b32 100644
--- a/scheduler/monitor_db_cleanup.py
+++ b/scheduler/monitor_db_cleanup.py
@@ -3,7 +3,7 @@
 """
 
 
-import datetime, time, logging, random
+import collections, datetime, time, logging, random
 from autotest_lib.database import database_connection
 from autotest_lib.frontend.afe import models
 from autotest_lib.scheduler import email_manager, scheduler_config
@@ -58,6 +58,7 @@
         self._abort_jobs_past_max_runtime()
         self._clear_inactive_blocks()
         self._check_for_db_inconsistencies()
+        self._check_host_assignments()
         self._reverify_dead_hosts()
 
 
@@ -92,6 +93,42 @@
             queue_entry.abort()
 
 
+    @classmethod
+    def get_overlapping_jobs(cls):
+        """Find the active hqes whose host is claimed by more than one job.
+
+        @return: A list of dictionaries with the hqe id, job_id and host_id
+            of the currently active hqes on a host with overlapping jobs.
+        """
+        # Count hosts across active hqes and active stand alone special tasks.
+        # A special task that is incomplete but not active may share a host
+        # with an active hqe, eg: the cleanup scheduled in gathering.
+        hqe_hosts = list(models.HostQueueEntry.objects.filter(
+                active=1, complete=0, host_id__isnull=False).values_list(
+                'host_id', flat=True))
+        special_task_hosts = list(models.SpecialTask.objects.filter(
+            is_active=1, is_complete=0, host_id__isnull=False,
+            queue_entry_id__isnull=True).values_list('host_id', flat=True))
+        host_counts = collections.Counter(hqe_hosts + special_task_hosts)
+        multiple_hosts = [h for h, uses in host_counts.items() if uses > 1]
+        # Report active hqes only, not every past hqe of an offending host.
+        return list(models.HostQueueEntry.objects.filter(
+                active=1, complete=0, host_id__in=multiple_hosts).values(
+                'id', 'job_id', 'host_id'))
+
+
+    @timer.decorate
+    def _check_host_assignments(self):
+        """Enqueue an email listing the jobs from get_overlapping_jobs."""
+        # One line per overlapping entry; nothing is sent for an empty report.
+        subject = 'Unexpected host assignments'
+        report = ''.join(
+                'HQE %s is using a host in use by another job.\n' % overlap
+                for overlap in self.get_overlapping_jobs())
+        if report:
+            email_manager.manager.enqueue_notify_email(subject, report)
+
+
     @timer.decorate
     def _check_for_db_inconsistencies(self):
         logging.info('Cleaning db inconsistencies')