Add the concept of an Atomic Group to the scheduler and database.

Scheduling a job on an atomic group means that all of the Ready machines
(up to a maximum specified in the atomic group) in a single label associated
with that atomic group will be used to run the job.

When scheduling on an atomic group, the job's synch_count is treated as the
minimum number of machines required rather than an exact count.

Both HostQueueEntries and Labels may have an AtomicGroup associated with
them:

* A HostQueueEntry with an AtomicGroup schedules its job on all Ready
  machines (up to the AtomicGroup's maximum) of a single Label associated
  with that AtomicGroup.

* A Label with an AtomicGroup means that any Hosts bearing that Label
  may only be scheduled together as a group with other hosts of that Label
  to satisfy a Job's HostQueueEntry bearing the same AtomicGroup.
  Such Hosts will never be scheduled as normal metahosts.

Future patches will add the ability to schedule jobs using this feature
through the RPC interface, CLI and GUI.

Signed-off-by: Gregory Smith <gps@google.com>



git-svn-id: http://test.kernel.org/svn/autotest/trunk@2878 592f7852-d20e-0410-864c-8624ca9c26a4
diff --git a/scheduler/monitor_db.py b/scheduler/monitor_db.py
index 02356d6..388dfe5 100644
--- a/scheduler/monitor_db.py
+++ b/scheduler/monitor_db.py
@@ -5,7 +5,7 @@
 """
 
 
-import datetime, errno, optparse, os, pwd, Queue, re, shutil, signal
+import datetime, errno, optparse, os, pwd, Queue, random, re, shutil, signal
 import smtplib, socket, stat, subprocess, sys, tempfile, time, traceback
 import itertools, logging, weakref
 import common
@@ -193,6 +193,10 @@
     return qe
 
 
+class SchedulerError(Exception):
+    """Raised by HostScheduler when an inconsistent state occurs."""
+
+
 class HostScheduler(object):
     def _get_ready_hosts(self):
         # avoid any host with a currently active queue entry against it
@@ -224,7 +228,7 @@
     def _process_many2many_dict(rows, flip=False):
         result = {}
         for row in rows:
-            left_id, right_id = long(row[0]), long(row[1])
+            left_id, right_id = int(row[0]), int(row[1])
             if flip:
                 left_id, right_id = right_id, left_id
             result.setdefault(left_id, set()).add(right_id)
@@ -317,7 +321,7 @@
 
     def _check_job_dependencies(self, job_dependencies, host_labels):
         missing = job_dependencies - host_labels
-        return len(job_dependencies - host_labels) == 0
+        return len(missing) == 0
 
 
     def _check_only_if_needed_labels(self, job_dependencies, host_labels,
@@ -339,15 +343,79 @@
         return True
 
 
+    def _check_atomic_group_labels(self, host_labels, queue_entry):
+        """
+        Determine if the given HostQueueEntry's atomic group settings are okay
+        to schedule on a host with the given labels.
+
+        @param host_labels - A list of label ids that the host has.
+        @param queue_entry - The HostQueueEntry being considered for the host.
+
+        @returns True if atomic group settings are okay, False otherwise.
+        """
+        return (self._get_host_atomic_group_id(host_labels) ==
+                queue_entry.atomic_group_id)
+
+
+    def _get_host_atomic_group_id(self, host_labels):
+        """
+        Return the atomic group id for a host with the given set of
+        labels if any, or None otherwise.  Raises an exception if more than
+        one atomic group label is found in the set of labels.
+
+        @param host_labels - A list of label ids that the host has.
+
+        @returns The id of the atomic group found on a label in host_labels
+                or None if no atomic group label is found.
+        @raises SchedulerError - If more than one atomic group label is found.
+        """
+        atomic_ids = [self._labels[label_id].atomic_group_id
+                      for label_id in host_labels
+                      if self._labels[label_id].atomic_group_id is not None]
+        if not atomic_ids:
+            return None
+        if len(atomic_ids) > 1:
+            raise SchedulerError('More than one atomic label on host.')
+        return atomic_ids[0]
+
+
+    def _get_atomic_group_labels(self, atomic_group_id):
+        """
+        Lookup the label ids that an atomic_group is associated with.
+
+        @param atomic_group_id - The id of the AtomicGroup to look up.
+
+        @returns A generator yielding Label ids for this atomic group.
+        """
+        return (id for id, label in self._labels.iteritems()
+                if label.atomic_group_id == atomic_group_id
+                and not label.invalid)
+
+
+    def _get_eligible_hosts_in_group(self, group_hosts, queue_entry):
+        """
+        @param group_hosts - A sequence of Host ids to test for usability
+                and eligibility against the Job associated with queue_entry.
+        @param queue_entry - The HostQueueEntry that these hosts are being
+                tested for eligibility against.
+
+        @returns A subset of group_hosts Host ids that are eligible for the
+                supplied queue_entry.
+        """
+        return set(host_id for host_id in group_hosts
+                   if self._is_host_usable(host_id)
+                   and self._is_host_eligible_for_job(host_id, queue_entry))
+
+
     def _is_host_eligible_for_job(self, host_id, queue_entry):
         job_dependencies = self._job_dependencies.get(queue_entry.job_id, set())
         host_labels = self._host_labels.get(host_id, set())
 
-        acl = self._is_acl_accessible(host_id, queue_entry)
-        deps = self._check_job_dependencies(job_dependencies, host_labels)
-        only_if = self._check_only_if_needed_labels(job_dependencies,
-                                                    host_labels, queue_entry)
-        return acl and deps and only_if
+        return (self._is_acl_accessible(host_id, queue_entry) and
+                self._check_job_dependencies(job_dependencies, host_labels) and
+                self._check_only_if_needed_labels(
+                    job_dependencies, host_labels, queue_entry) and
+                self._check_atomic_group_labels(host_labels, queue_entry))
 
 
     def _schedule_non_metahost(self, queue_entry):
@@ -383,6 +451,8 @@
             if not self._is_host_eligible_for_job(host_id, queue_entry):
                 continue
 
+            # Remove the host from our cached internal state before returning
+            # the host object.
             hosts_in_label.remove(host_id)
             return self._hosts_available.pop(host_id)
         return None
@@ -390,10 +460,88 @@
 
     def find_eligible_host(self, queue_entry):
         if not queue_entry.meta_host:
+            assert queue_entry.host_id is not None
             return self._schedule_non_metahost(queue_entry)
+        assert queue_entry.atomic_group_id is None
         return self._schedule_metahost(queue_entry)
 
 
+    def find_eligible_atomic_group(self, queue_entry):
+        """
+        Given an atomic group host queue entry, locate an appropriate group
+        of hosts for the associated job to run on.
+
+        The caller is responsible for creating new HQEs for the additional
+        hosts returned in order to run the actual job on them.
+
+        @returns A list of Host instances in a ready state to satisfy this
+                atomic group scheduling.  Hosts will all belong to the same
+                atomic group label as specified by the queue_entry.
+                An empty list will be returned if no suitable atomic
+                group could be found.
+
+        TODO(gps): what is responsible for kicking off any attempted repairs on
+        a group of hosts?  not this function, but something needs to.  We do
+        not communicate that reason for returning [] outside of here...
+        For now, we'll just be unschedulable if enough hosts within one group
+        enter Repair Failed state.
+        """
+        assert queue_entry.atomic_group_id is not None
+        job = queue_entry.job
+        assert job.synch_count and job.synch_count > 0
+        atomic_group = AtomicGroup(id=queue_entry.atomic_group_id)
+        if job.synch_count > atomic_group.max_number_of_machines:
+            # Such a Job and HostQueueEntry should never be possible to
+            # create using the frontend.  Regardless, we can't process it.
+            # Abort it immediately and log an error on the scheduler.
+            queue_entry.set_status(models.HostQueueEntry.Status.ABORTED)
+            bprint('Error: job %d synch_count=%d > requested atomic_group %d '
+                   'max_number_of_machines=%d.  Aborted host_queue_entry %d.' %
+                   (job.id, job.synch_count, atomic_group.id,
+                    atomic_group.max_number_of_machines, queue_entry.id))
+            return []
+        hosts_in_label = self._label_hosts.get(queue_entry.meta_host, set())
+        ineligible_host_ids = self._ineligible_hosts.get(queue_entry.job_id,
+                                                         set())
+
+        # Look in each label associated with atomic_group until we find one with
+        # enough hosts to satisfy the job.
+        for atomic_label_id in self._get_atomic_group_labels(atomic_group.id):
+            group_hosts = set(self._label_hosts.get(atomic_label_id, set()))
+            if queue_entry.meta_host is not None:
+                # If we have a metahost label, only allow its hosts.
+                group_hosts.intersection_update(hosts_in_label)
+            group_hosts -= ineligible_host_ids
+            eligible_hosts_in_group = self._get_eligible_hosts_in_group(
+                    group_hosts, queue_entry)
+
+            # Job.synch_count is treated as "minimum synch count" when
+            # scheduling for an atomic group of hosts.  The atomic group
+            # number of machines is the maximum to pick out of a single
+            # atomic group label for scheduling at one time.
+            min_hosts = job.synch_count
+            max_hosts = atomic_group.max_number_of_machines
+
+            if len(eligible_hosts_in_group) < min_hosts:
+                # Not enough eligible hosts in this atomic group label.
+                continue
+
+            # Limit ourselves to scheduling the atomic group size.
+            if len(eligible_hosts_in_group) > max_hosts:
+                eligible_hosts_in_group = random.sample(
+                        eligible_hosts_in_group, max_hosts)
+
+            # Remove the selected hosts from our cached internal state
+            # of available hosts in order to return the Host objects.
+            host_list = []
+            for host_id in eligible_hosts_in_group:
+                hosts_in_label.discard(host_id)
+                host_list.append(self._hosts_available.pop(host_id))
+            return host_list
+
+        return []
+
+
 class Dispatcher(object):
     def __init__(self):
         self._agents = []
@@ -677,18 +825,64 @@
             order_by='jobs.priority DESC, meta_host, job_id'))
 
 
-    def _schedule_new_jobs(self):
+    def _refresh_pending_queue_entries(self):
+        """
+        Lookup the pending HostQueueEntries and call our HostScheduler
+        refresh() method given that list.  Return the list.
+
+        @returns A list of pending HostQueueEntries sorted in priority order.
+        """
         queue_entries = self._get_pending_queue_entries()
         if not queue_entries:
-            return
+            return []
 
         self._host_scheduler.refresh(queue_entries)
 
+        return queue_entries
+
+
+    def _schedule_atomic_group(self, queue_entry):
+        """
+        Schedule the given queue_entry on an atomic group of hosts.
+
+        Returns immediately if there are insufficient available hosts.
+
+        Creates new HostQueueEntries based off of queue_entry for the
+        scheduled hosts and starts them all running.
+        """
+        # This is a virtual host queue entry representing an entire
+        # atomic group, find a group and schedule their hosts.
+        group_hosts = self._host_scheduler.find_eligible_atomic_group(
+                queue_entry)
+        if not group_hosts:
+            return
+        # The first assigned host uses the original HostQueueEntry
+        group_queue_entries = [queue_entry]
+        for assigned_host in group_hosts[1:]:
+            # Create a new HQE for every additional assigned_host.
+            new_hqe = HostQueueEntry.clone(queue_entry)
+            new_hqe.save()
+            group_queue_entries.append(new_hqe)
+        assert len(group_queue_entries) == len(group_hosts)
+        for queue_entry, host in itertools.izip(group_queue_entries,
+                                                group_hosts):
+            self._run_queue_entry(queue_entry, host)
+
+
+    def _schedule_new_jobs(self):
+        queue_entries = self._refresh_pending_queue_entries()
+        if not queue_entries:
+            return
+
         for queue_entry in queue_entries:
-            assigned_host = self._host_scheduler.find_eligible_host(queue_entry)
-            if not assigned_host:
-                continue
-            self._run_queue_entry(queue_entry, assigned_host)
+            if (queue_entry.atomic_group_id is None or
+                queue_entry.host_id is not None):
+                assigned_host = self._host_scheduler.find_eligible_host(
+                        queue_entry)
+                if assigned_host:
+                    self._run_queue_entry(queue_entry, assigned_host)
+            else:
+                self._schedule_atomic_group(queue_entry)
 
 
     def _run_queue_entry(self, queue_entry, host):
@@ -1707,10 +1901,12 @@
             keys = self._fields[1:] # avoid id
             columns = ','.join([str(key) for key in keys])
             values = ['"%s"' % self.__dict__[key] for key in keys]
-            values = ','.join(values)
-            query = """INSERT INTO %s (%s) VALUES (%s)""" % \
-                                    (self.__table, columns, values)
+            values_str = ','.join(values)
+            query = ('INSERT INTO %s (%s) VALUES (%s)' %
+                     (self.__table, columns, values_str))
             _db.execute(query)
+            # Update our id to the one the database just assigned to us.
+            self.id = _db.execute('SELECT LAST_INSERT_ID()')[0][0]
 
 
     def delete(self):
@@ -1730,6 +1926,11 @@
 
     @classmethod
     def fetch(cls, where='', params=(), joins='', order_by=''):
+        """
+        Construct instances of our class based on the given database query.
+
+        @yields One class instance for each row fetched.
+        """
         order_by = cls._prefix_with(order_by, 'ORDER BY ')
         where = cls._prefix_with(where, 'WHERE ')
         query = ('SELECT %(table)s.* FROM %(table)s %(joins)s '
@@ -1747,10 +1948,15 @@
     _fields = ('id', 'job_id', 'host_id')
 
 
+class AtomicGroup(DBObject):
+    _table_name = 'atomic_groups'
+    _fields = ('id', 'name', 'description', 'max_number_of_machines')
+
+
 class Label(DBObject):
     _table_name = 'labels'
     _fields = ('id', 'name', 'kernel_config', 'platform', 'invalid',
-               'only_if_needed')
+               'only_if_needed', 'atomic_group_id')
 
 
 class Host(DBObject):
@@ -1814,7 +2020,8 @@
 class HostQueueEntry(DBObject):
     _table_name = 'host_queue_entries'
     _fields = ('id', 'job_id', 'host_id', 'status', 'meta_host',
-               'active', 'complete', 'deleted', 'execution_subdir')
+               'active', 'complete', 'deleted', 'execution_subdir',
+               'atomic_group_id')
 
 
     def __init__(self, id=None, row=None, **kwargs):
@@ -1831,6 +2038,21 @@
                                            'queue.log.' + str(self.id))
 
 
+    @classmethod
+    def clone(cls, template):
+        """
+        Creates a new row using the values from a template instance.
+
+        The new instance will not exist in the database or have a valid
+        id attribute until its save() method is called.
+        """
+        assert isinstance(template, cls)
+        new_row = [getattr(template, field) for field in cls._fields]
+        clone = cls(row=new_row, new_record=True)
+        clone.id = None
+        return clone
+
+
     def _view_job_url(self):
         return "%s#tab_id=view_job&object_id=%s" % (_base_url, self.job.id)
 
@@ -1959,14 +2181,15 @@
         email_manager.manager.send_email(self.job.email_list, subject, body)
 
 
-    def run(self,assigned_host=None):
-        if self.meta_host:
+    def run(self, assigned_host=None):
+        if self.meta_host is not None or self.atomic_group_id is not None:
             assert assigned_host
             # ensure results dir exists for the queue log
             self.set_host(assigned_host)
 
-        print "%s/%s scheduled on %s, status=%s" % (self.job.name,
-                        self.meta_host, self.host.hostname, self.status)
+        print "%s/%s/%s scheduled on %s, status=%s" % (
+                self.job.name, self.meta_host, self.atomic_group_id,
+                self.host.hostname, self.status)
 
         return self.job.run(queue_entry=self)