[autotest] Add heartbeat AFE endpoint to shard Autotest

To increase the workload the Autotest setup can handle, jobs should
be executed by multiple shards. Jobs are sharded by board type.

This adds an HTTP-accessible endpoint to AFE. It assigns jobs and
hosts to moblabs and returns them as records to insert into a shard's
local database.

TEST=Ran suites and tried manually retrieving jobs
DEPLOY=scheduler,apache,host-scheduler

Change-Id: I20ac41364e4c67e17883729181af798e18d8093e
Reviewed-on: https://chromium-review.googlesource.com/217968
Reviewed-by: Jakob Jülich <jakobjuelich@chromium.org>
Reviewed-by: Fang Deng <fdeng@chromium.org>
Commit-Queue: Jakob Jülich <jakobjuelich@chromium.org>
Tested-by: Jakob Jülich <jakobjuelich@chromium.org>
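
For illustration, a minimal sketch of one heartbeat as introduced below;
the in-process call stands in for the HTTP round trip, and the hostname
is made up:

    from autotest_lib.frontend.afe import site_rpc_interface

    # One heartbeat: registers the shard on first contact, then claims
    # unassigned hosts and jobs whose labels match the shard's labels.
    payload = site_rpc_interface.shard_heartbeat(shard_hostname='shard1')
    # payload['hosts'] and payload['jobs'] are serialized model records
    # ready to be inserted into the shard's local database.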
diff --git a/frontend/afe/json_rpc/serviceHandler.py b/frontend/afe/json_rpc/serviceHandler.py
index b0d9e84..a66bb0d 100644
--- a/frontend/afe/json_rpc/serviceHandler.py
+++ b/frontend/afe/json_rpc/serviceHandler.py
@@ -21,9 +21,23 @@
 
 import traceback
 
-from json import decoder, encoder
+from json import decoder
+
+try:
+    from django.core import exceptions as django_exceptions
+    # Django JSON encoder uses the standard json encoder but can handle DateTime
+    from django.core.serializers import json as django_encoder
+    json_encoder = django_encoder.DjangoJSONEncoder()
+except django_exceptions.ImproperlyConfigured:
+    from json import encoder
+    json_encoder = encoder.JSONEncoder()
+
 from autotest_lib.client.common_lib.cros.graphite import stats
 
+
+json_decoder = decoder.JSONDecoder()
+
+
 def customConvertJson(value):
     """\
     Recursively process JSON values and do type conversions.
@@ -46,9 +60,6 @@
     else:
         return value
 
-json_encoder = encoder.JSONEncoder()
-json_decoder = decoder.JSONDecoder()
-
 
 def ServiceMethod(fn):
     fn.IsServiceMethod = True
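
The encoder change above is what lets the heartbeat serialize Django model
records: those carry datetime fields, which the stock json encoder rejects.
A minimal sketch, with an illustrative field name:

    import datetime
    import json
    from django.core.serializers.json import DjangoJSONEncoder

    record = {'hostname': 'host1', 'created': datetime.datetime(2014, 9, 11)}
    try:
        json.dumps(record)  # stock encoder raises TypeError on datetime
    except TypeError:
        pass
    json.dumps(record, cls=DjangoJSONEncoder)  # datetime becomes an ISO 8601 string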
diff --git a/frontend/afe/json_rpc/serviceHandler_unittest.py b/frontend/afe/json_rpc/serviceHandler_unittest.py
index b8d24be..1429f45 100755
--- a/frontend/afe/json_rpc/serviceHandler_unittest.py
+++ b/frontend/afe/json_rpc/serviceHandler_unittest.py
@@ -2,6 +2,7 @@
 
 import unittest
 import common
+from autotest_lib.frontend import setup_django_environment
 import serviceHandler
 
 
diff --git a/frontend/afe/models.py b/frontend/afe/models.py
index fbfc521..60ddb52 100644
--- a/frontend/afe/models.py
+++ b/frontend/afe/models.py
@@ -431,6 +431,43 @@
         return host
 
 
+    @classmethod
+    def assign_to_shard(cls, shard):
+        """Assigns hosts to a shard.
+
+        This function will check which labels are associated with the given
+        shard. It will assign those hosts that have one of the shard's
+        labels and that haven't been assigned to a shard before.
+
+        @param shard: The shard object to assign labels/hosts for.
+        @returns The host objects that should be sent to the shard.
+        """
+
+        # Disclaimer: Concurrent heartbeats should not occur in the current
+        # setup. As they may be introduced in the near future, this comment
+        # is left here anyway.
+
+        # Sending a record twice is acceptable, but dropping one is not.
+        # Detecting duplicates on the client is easy, but here it's harder.
+        # The following options were considered:
+        # - SELECT ... WHERE and then UPDATE ... WHERE: Update might update more
+        #   than select returned, as concurrently more hosts might have been
+        #   inserted
+        # - UPDATE and then SELECT WHERE shard=shard: select always returns all
+        #   hosts for the shard, this is overhead
+        # - SELECT and then UPDATE only selected without requerying afterwards:
+        #   returns the old state of the records.
+        host_ids = list(Host.objects.filter(
+            shard=None,
+            labels=shard.labels.all(),
+            leased=False
+            ).values_list('pk', flat=True))
+
+        if host_ids:
+            Host.objects.filter(pk__in=host_ids).update(shard=shard)
+            return list(Host.objects.filter(pk__in=host_ids).all())
+        return []
+
     def resurrect_object(self, old_object):
         super(Host, self).resurrect_object(old_object)
         # invalid hosts can be in use by the scheduler (as one-time hosts), so
@@ -1246,6 +1283,33 @@
         return job
 
 
+    @classmethod
+    def assign_to_shard(cls, shard):
+        """Assigns unassigned jobs to a shard.
+
+        All jobs that have the platform label that was assigned to the given
+        shard are assigned to the shard and returned.
+        @param shard: The shard to assign jobs to.
+        @returns The job objects that should be sent to the shard.
+        """
+        # Disclaimer: Concurrent heartbeats should not occur in today's setup.
+        # If this changes or they are triggered manually, this applies:
+        # Jobs may be returned more than once by concurrent calls of this
+        # function, as there is a race condition between SELECT and UPDATE.
+        job_ids = list(Job.objects.filter(
+            shard=None,
+            dependency_labels=shard.labels.all()
+            ).exclude(
+            hostqueueentry__complete=True
+            ).exclude(
+            hostqueueentry__active=True
+            ).values_list('pk', flat=True))
+        if job_ids:
+            Job.objects.filter(pk__in=job_ids).update(shard=shard)
+            return list(Job.objects.filter(pk__in=job_ids).all())
+        return []
+
+
     def save(self, *args, **kwargs):
         # The current implementation of parameterized jobs requires that only
         # control files or parameterized jobs are used. Using the image
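
The SELECT-then-UPDATE-then-re-SELECT pattern above deliberately errs
toward re-sending a record on a race rather than losing one; the shard
can then drop duplicates cheaply, as the comment notes. A sketch of such
client-side dedupe, with hypothetical names not part of this change:

    def filter_unseen_records(serialized_records, known_ids):
        """Keep only records this shard has not inserted yet.

        Duplicate records across heartbeats are expected (the master may
        re-send after a race); missing records are not.
        """
        unseen = [r for r in serialized_records if r['id'] not in known_ids]
        known_ids.update(r['id'] for r in unseen)
        return unseen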
diff --git a/frontend/afe/rpc_utils.py b/frontend/afe/rpc_utils.py
index 687ab15..f1f6ec5 100644
--- a/frontend/afe/rpc_utils.py
+++ b/frontend/afe/rpc_utils.py
@@ -926,4 +926,30 @@
                                    start_time, end_time, **filter_data_common),
            inject_times_to_filter('time_started__gte', 'time_started__lte',
                                   start_time, end_time,
-                                  **filter_data_special_tasks))
\ No newline at end of file
+                                  **filter_data_special_tasks))
+
+
+def retrieve_shard(shard_hostname):
+    """
+    Retrieves the shard with the given hostname from the database or creates it.
+
+    @param shard_hostname: Hostname of the shard to retrieve
+
+    @returns: Shard object
+    """
+    try:
+        shard = models.Shard.smart_get(shard_hostname)
+    except models.Shard.DoesNotExist:
+        shard = models.Shard.objects.create(hostname=shard_hostname)
+    return shard
+
+
+def find_records_for_shard(shard):
+    """Find records that should be sent to a shard.
+    @param shard: Shard to find records for.
+    @returns: Tuple of two lists for hosts and jobs: (hosts, jobs)
+    """
+    hosts = models.Host.assign_to_shard(shard)
+    jobs = models.Job.assign_to_shard(shard)
+
+    return hosts, jobs
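
retrieve_shard above is a get-or-create keyed on hostname; the try/except
form presumably stays with smart_get because it also resolves numeric ids.
With the plain Django ORM the equivalent would be (for comparison only):

    shard, _created = models.Shard.objects.get_or_create(hostname=shard_hostname)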
diff --git a/frontend/afe/site_rpc_interface.py b/frontend/afe/site_rpc_interface.py
index 47810d4..765748d 100644
--- a/frontend/afe/site_rpc_interface.py
+++ b/frontend/afe/site_rpc_interface.py
@@ -11,13 +11,13 @@
 import logging
 import os
 import shutil
-import utils
 
 from autotest_lib.client.common_lib import error
 from autotest_lib.client.common_lib import global_config
 from autotest_lib.client.common_lib import priorities
 from autotest_lib.client.common_lib import time_utils
 from autotest_lib.client.common_lib.cros import dev_server
+from autotest_lib.client.common_lib.cros.graphite import stats
 from autotest_lib.frontend.afe import rpc_utils
 from autotest_lib.server import utils
 from autotest_lib.server.cros.dynamic_suite import constants
@@ -186,13 +186,13 @@
     control_file = tools.inject_vars(inject_dict, control_file)
 
     return rpc_utils.create_job_common(name,
-                                          priority=priority,
-                                          timeout_mins=timeout_mins,
-                                          max_runtime_mins=max_runtime_mins,
-                                          control_type='Server',
-                                          control_file=control_file,
-                                          hostless=True,
-                                          keyvals=timings)
+                                       priority=priority,
+                                       timeout_mins=timeout_mins,
+                                       max_runtime_mins=max_runtime_mins,
+                                       control_type='Server',
+                                       control_file=control_file,
+                                       hostless=True,
+                                       keyvals=timings)
 
 
 # TODO: hide the following rpcs under is_moblab
@@ -302,3 +302,20 @@
                     start_time=start_time, end_time=end_time,
                     hosts=hosts, board=board, pool=pool,
                     process_pool_size=4))
+
+
+def shard_heartbeat(shard_hostname):
+    """Register shard if it doesn't exist, then assign hosts and jobs.
+
+    @param shard_hostname: Hostname of the calling shard
+    @returns: Serialized representations of hosts, jobs and their dependencies
+              to be inserted into a shard's database.
+    """
+    timer = stats.Timer('shard_heartbeat')
+    with timer:
+        shard_obj = rpc_utils.retrieve_shard(shard_hostname=shard_hostname)
+        hosts, jobs = rpc_utils.find_records_for_shard(shard_obj)
+        return {
+            'hosts': [host.serialize() for host in hosts],
+            'jobs': [job.serialize() for job in jobs],
+        }
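
For reference, the tests below assert this shape for the heartbeat result;
the values are illustrative and real payloads carry more fields:

    {
        'hosts': [{'id': 1, 'hostname': 'host1'}],
        'jobs': [{'id': 3, 'name': 'dummy', 'shard': {'id': 1},
                  'hostqueueentry_set': [{'id': 7}]}],
    }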
diff --git a/frontend/afe/site_rpc_interface_unittest.py b/frontend/afe/site_rpc_interface_unittest.py
index 0048eca..c720d57 100644
--- a/frontend/afe/site_rpc_interface_unittest.py
+++ b/frontend/afe/site_rpc_interface_unittest.py
@@ -17,18 +17,24 @@
 import common
 
 from autotest_lib.frontend import setup_django_environment
-from autotest_lib.frontend.afe import rpc_utils
-from autotest_lib.client.common_lib import error
+from autotest_lib.frontend.afe import frontend_test_utils
+from autotest_lib.frontend.afe import models, rpc_utils
+from autotest_lib.client.common_lib import control_data, error
 from autotest_lib.client.common_lib import global_config
 from autotest_lib.client.common_lib import priorities
 from autotest_lib.client.common_lib.cros import dev_server
-from autotest_lib.frontend.afe import site_rpc_interface
+from autotest_lib.frontend.afe import rpc_interface, site_rpc_interface
 from autotest_lib.server import utils
 from autotest_lib.server.cros.dynamic_suite import control_file_getter
 from autotest_lib.server.cros.dynamic_suite import constants
 
 
-class SiteRpcInterfaceTest(mox.MoxTestBase):
+CLIENT = control_data.CONTROL_TYPE_NAMES.CLIENT
+SERVER = control_data.CONTROL_TYPE_NAMES.SERVER
+
+
+class SiteRpcInterfaceTest(mox.MoxTestBase,
+                           frontend_test_utils.FrontendTestMixin):
     """Unit tests for functions in site_rpc_interface.py.
 
     @var _NAME: fake suite name.
@@ -48,6 +54,11 @@
         self._SUITE_NAME = site_rpc_interface.canonicalize_suite_name(
             self._NAME)
         self.dev_server = self.mox.CreateMock(dev_server.ImageServer)
+        self._frontend_common_setup(fill_data=False)
+
+
+    def tearDown(self):
+        self._frontend_common_teardown()
 
 
     def _setupDevserver(self):
@@ -367,5 +378,135 @@
         site_rpc_interface.set_boto_key(boto_key)
 
 
+    def _do_heartbeat_and_assert_response(self, shard_hostname=None, **kwargs):
+        expected_shard_hostname = shard_hostname or str(
+            models.Shard.objects.count() + 1)
+        retval = site_rpc_interface.shard_heartbeat(
+            shard_hostname=expected_shard_hostname)
+
+        self._assert_shard_heartbeat_response(expected_shard_hostname, retval,
+                                              **kwargs)
+
+        return expected_shard_hostname
+
+
+    def _assert_shard_heartbeat_response(self, shard_hostname, retval, jobs=[],
+                                         hosts=[], hqes=[]):
+
+        retval_hosts, retval_jobs = retval['hosts'], retval['jobs']
+
+        expected_jobs = [
+            (job.id, job.name, int(shard_hostname)) for job in jobs]
+        returned_jobs = [(job['id'], job['name'], job['shard']['id'])
+                         for job in retval_jobs]
+        self.assertEqual(returned_jobs, expected_jobs)
+
+        expected_hosts = [(host.id, host.hostname) for host in hosts]
+        returned_hosts = [(host['id'], host['hostname'])
+                          for host in retval_hosts]
+        self.assertEqual(returned_hosts, expected_hosts)
+
+        retval_hqes = []
+        for job in retval_jobs:
+            retval_hqes += job['hostqueueentry_set']
+
+        expected_hqes = [hqe.id for hqe in hqes]
+        returned_hqes = [hqe['id'] for hqe in retval_hqes]
+        self.assertEqual(returned_hqes, expected_hqes)
+
+
+    def testShardHeartbeatFetchHostlessJob(self):
+        models.Label.objects.create(name='board:lumpy', platform=True)
+        label2 = models.Label.objects.create(name='bluetooth', platform=False)
+
+        shard_hostname = self._do_heartbeat_and_assert_response()
+        shard = models.Shard.smart_get(shard_hostname)
+        shard.labels.add(models.Label.smart_get('board:lumpy'))
+
+        job1 = self._create_job(hostless=True)
+
+        # Hostless jobs should be executed by the global scheduler.
+        self._do_heartbeat_and_assert_response(
+            shard_hostname=shard_hostname)
+
+
+    def testShardRetrieveJobs(self):
+        host1, host2 = [models.Host.objects.create(
+            hostname=hostname, leased=False) for hostname in ['host1', 'host2']]
+
+        # should never be returned by heartbeat
+        leased_host = models.Host.objects.create(hostname='leased_host',
+                                                 leased=True)
+
+        lumpy_label = models.Label.objects.create(name='board:lumpy',
+                                                  platform=True)
+        grumpy_label = models.Label.objects.create(name='board:grumpy',
+                                                   platform=True)
+
+
+        host1.labels.add(lumpy_label)
+        leased_host.labels.add(lumpy_label)
+        host2.labels.add(grumpy_label)
+
+        shard_hostname1 = self._do_heartbeat_and_assert_response()
+        shard_hostname2 = self._do_heartbeat_and_assert_response()
+
+        shard1 = models.Shard.smart_get(shard_hostname1)
+        shard2 = models.Shard.smart_get(shard_hostname2)
+
+        shard1.labels.add(lumpy_label)
+        shard2.labels.add(grumpy_label)
+
+        job_id = rpc_interface.create_job(name='dummy', priority='Medium',
+                                          control_file='foo',
+                                          control_type=CLIENT,
+                                          meta_hosts=['board:lumpy'],
+                                          dependencies=('board:lumpy',),
+                                          test_retry=10)
+        job1 = models.Job.objects.get(id=job_id)
+        job_id = rpc_interface.create_job(name='dummy', priority='Medium',
+                                          control_file='foo',
+                                          control_type=CLIENT,
+                                          meta_hosts=['board:grumpy'],
+                                          dependencies=('board:grumpy',),
+                                          test_retry=10)
+
+        job2 = models.Job.objects.get(id=job_id)
+        job_id = rpc_interface.create_job(name='dummy', priority='Medium',
+                                          control_file='foo',
+                                          control_type=CLIENT,
+                                          meta_hosts=['board:lumpy'],
+                                          dependencies=('board:lumpy',),
+                                          test_retry=10)
+        job_completed = models.Job.objects.get(id=job_id)
+        # Job is obviously already run, so don't sync it
+        job_completed.hostqueueentry_set.update(complete=True)
+        job_completed.hostqueueentry_set.create(complete=False)
+        job_id = rpc_interface.create_job(name='dummy', priority='Medium',
+                                          control_file='foo',
+                                          control_type=CLIENT,
+                                          meta_hosts=['board:lumpy'],
+                                          dependencies=('board:lumpy',),
+                                          test_retry=10)
+        job_active = models.Job.objects.get(id=job_id)
+        # Job is obviously already started, so don't sync it
+        job_active.hostqueueentry_set.update(active=True)
+        job_active.hostqueueentry_set.create(complete=False, active=False)
+
+        self._do_heartbeat_and_assert_response(
+            shard_hostname=shard_hostname1, jobs=[job1], hosts=[host1],
+            hqes=job1.hostqueueentry_set.all())
+
+        self._do_heartbeat_and_assert_response(
+            shard_hostname=shard_hostname2, jobs=[job2], hosts=[host2],
+            hqes=job2.hostqueueentry_set.all())
+
+        host3 = models.Host.objects.create(hostname='host3', leased=False)
+        host3.labels.add(lumpy_label)
+
+        self._do_heartbeat_and_assert_response(
+            shard_hostname=shard_hostname1, jobs=[], hosts=[host3])
+
+
 if __name__ == '__main__':
   unittest.main()