[autotest] Endpoints for syncing jobs and hqes back to master

The status of jobs that were completed on a shard should be visible
in the master AFE, so a human can easily see which jobs have been run in one
central place.

This adds functionality to the AFE to accept newer versions of
jobs and hqes uploaded by shards during their heartbeats.

This is a newer version of CL:213901 which was reverted because of
http://crbug.com/418022.

BUG=418022
DEPLOY=apache, afe
TEST=Ran Suites

Change-Id: I3c9a35c78585a43d193874decbb7b16216b2c69a
Reviewed-on: https://chromium-review.googlesource.com/220141
Reviewed-by: Fang Deng <fdeng@chromium.org>
Tested-by: Jakob Jülich <jakobjuelich@chromium.org>
Commit-Queue: Jakob Jülich <jakobjuelich@chromium.org>
diff --git a/frontend/afe/json_rpc/proxy.py b/frontend/afe/json_rpc/proxy.py
index b9e1121..2c51727 100644
--- a/frontend/afe/json_rpc/proxy.py
+++ b/frontend/afe/json_rpc/proxy.py
@@ -19,9 +19,34 @@
   Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
 """
 
+import os
 import urllib2
 from autotest_lib.client.common_lib import error as exceptions
 
+from json import decoder
+
+from json import encoder as json_encoder
+json_encoder_class = json_encoder.JSONEncoder  # May be replaced below.
+
+
+# Try to upgrade to the Django JSON encoder. It uses the standard json encoder
+# but can also handle datetime objects.
+try:
+    # See http://crbug.com/418022 to see why the try/except is needed here.
+    from django import conf as django_conf
+    # The serializers can't be imported if django isn't configured.
+    # Using try/except here doesn't work, as test_that initializes its own
+    # django environment (setup_django_lite_environment) which raises import
+    # errors if the django dbutils have been previously imported, as importing
+    # them leaves some state behind.
+    # Hence only import them if the django settings env variable is non-empty.
+    if os.environ.get(django_conf.ENVIRONMENT_VARIABLE, None):
+        from django.core.serializers import json as django_encoder
+        json_encoder_class = django_encoder.DjangoJSONEncoder
+except ImportError:
+    pass  # e.g. Django isn't installed; keep the standard json encoder.
+
+
 class JSONRPCException(Exception):
     pass
 
@@ -74,13 +99,12 @@
         return ServiceProxy(self.__serviceURL, name, self.__headers)
 
     def __call__(self, *args, **kwargs):
-        # pull in json imports lazily so that the library isn't required
-        # unless you actually need to do encoding and decoding
-        from json import decoder, encoder
-
-        postdata = encoder.JSONEncoder().encode({"method": self.__serviceName,
-                                                'params': args + (kwargs,),
-                                                'id':'jsonrpc'})
+        # Keyword arguments travel as a trailing dict after the positional
+        # ones; json_encoder_class is picked once, at import time.
+        request_body = {'method': self.__serviceName,
+                        'params': args + (kwargs,),
+                        'id': 'jsonrpc'}
+        postdata = json_encoder_class().encode(request_body)
         request = urllib2.Request(self.__serviceURL, data=postdata,
                                   headers=self.__headers)
         respdata = urllib2.urlopen(request).read()
diff --git a/frontend/afe/model_logic.py b/frontend/afe/model_logic.py
index 6329bd0..c77c80a 100644
--- a/frontend/afe/model_logic.py
+++ b/frontend/afe/model_logic.py
@@ -1117,6 +1117,26 @@
         return instance
 
 
+    def sanity_check_update_from_shard(self, shard, updated_serialized,
+                                       *args, **kwargs):
+        """Check if an update sent from a shard is legitimate.
+
+        Subclasses that can be updated from shards must override this.
+
+        @param shard: The shard the update was sent from.
+        @param updated_serialized: The serialized form of the updated record.
+        @param args: Additional positional arguments for subclass checks.
+        @param kwargs: Additional keyword arguments for subclass checks.
+
+        @raises error.UnallowedRecordsSentToMaster if an update is not
+                legitimate.
+        @raises NotImplementedError if not overridden by a subclass.
+        """
+        raise NotImplementedError(
+            'sanity_check_update_from_shard must be implemented by subclass '
+            '%s' % type(self))
+
+
     def update_from_serialized(self, serialized):
         """Updates local fields of an existing object from a serialized form.
 
diff --git a/frontend/afe/models.py b/frontend/afe/models.py
index 5eea6d8..0b03110 100644
--- a/frontend/afe/models.py
+++ b/frontend/afe/models.py
@@ -16,10 +16,10 @@
 from autotest_lib.frontend.afe import model_logic, model_attributes
 from autotest_lib.frontend.afe import rdb_model_extensions
 from autotest_lib.frontend import settings, thread_local
-from autotest_lib.client.common_lib import enum, host_protections, global_config
+from autotest_lib.client.common_lib import enum, error, host_protections
+from autotest_lib.client.common_lib import global_config
 from autotest_lib.client.common_lib import host_queue_entry_states
-from autotest_lib.client.common_lib import control_data, priorities
-from autotest_lib.client.common_lib import decorators
+from autotest_lib.client.common_lib import control_data, priorities, decorators
 
 # job options and user preferences
 DEFAULT_REBOOT_BEFORE = model_attributes.RebootBefore.IF_DIRTY
@@ -1100,6 +1100,22 @@
         self.shard = Shard.deserialize(data)
 
 
+    def sanity_check_update_from_shard(self, shard, updated_serialized):
+        """Check that this job may be updated by the given shard.
+
+        @param shard: The shard the update was sent from.
+        @param updated_serialized: The serialized form of the updated job.
+
+        @raises error.UnallowedRecordsSentToMaster if the job isn't assigned
+                to shard.
+        """
+        if self.shard_id != shard.id:
+            raise error.UnallowedRecordsSentToMaster(
+                'Job id=%s is assigned to shard (%s). Cannot update it with %s '
+                'from shard %s.' % (self.id, self.shard_id, updated_serialized,
+                                    shard.id))
+
+
     # TIMEOUT is deprecated.
     DEFAULT_TIMEOUT = global_config.global_config.get_config_value(
         'AUTOTEST_WEB', 'job_timeout_default', default=24)
@@ -1426,6 +1442,24 @@
         self.meta_host = Label.deserialize(data)
 
 
+    def sanity_check_update_from_shard(self, shard, updated_serialized,
+                                       job_ids_sent):
+        """Check that this entry's job was sent along with it.
+
+        @param shard: The shard the update was sent from.
+        @param updated_serialized: The serialized form of the updated entry.
+        @param job_ids_sent: Ids of the jobs sent by the shard in the same
+                             heartbeat.
+
+        @raises error.UnallowedRecordsSentToMaster if this entry's job is not
+                in job_ids_sent.
+        """
+        if self.job_id not in job_ids_sent:
+            raise error.UnallowedRecordsSentToMaster(
+                'Sent HostQueueEntry without corresponding '
+                'job entry: %s' % updated_serialized)
+
+
     Status = host_queue_entry_states.Status
     ACTIVE_STATUSES = host_queue_entry_states.ACTIVE_STATUSES
     COMPLETE_STATUSES = host_queue_entry_states.COMPLETE_STATUSES
diff --git a/frontend/afe/rpc_utils.py b/frontend/afe/rpc_utils.py
index 80756e3..dd8c963 100644
--- a/frontend/afe/rpc_utils.py
+++ b/frontend/afe/rpc_utils.py
@@ -945,9 +945,91 @@
 def find_records_for_shard(shard):
     """Find records that should be sent to a shard.
 
-    @returns: Tuple of two lists for hosts and jobs: (hosts, jobs)
+    @param shard: Shard to find records for.
+
+    @returns: Tuple of two lists for hosts and jobs: (hosts, jobs).
     """
     hosts = models.Host.assign_to_shard(shard)
     jobs = models.Job.assign_to_shard(shard)
 
     return hosts, jobs
+
+
+def _check_records_with_type_sent_from_shard(
+    shard, records, record_type, *args, **kwargs):
+    """
+    Sanity check records of a specified type that were sent from a shard.
+
+    Records are only looked up and checked here; none of them is updated.
+
+    @param shard: The shard the records were sent from.
+    @param records: The records sent in their serialized format.
+    @param record_type: Type of the objects represented by records.
+    @param args: Additional arguments that will be passed on to the sanity
+                 checks.
+    @param kwargs: Additional arguments that will be passed on to the sanity
+                  checks.
+
+    @raises error.UnallowedRecordsSentToMaster if any of the sanity checks fail.
+
+    @returns: List of (current_record, serialized_record) tuples, one for each
+              record sent, in the order they were sent.
+    """
+    checked_records = []
+    for serialized_record in records:
+        pk = serialized_record['id']
+        try:
+            current_record = record_type.objects.get(pk=pk)
+        except record_type.DoesNotExist:
+            raise error.UnallowedRecordsSentToMaster(
+                'Object with pk %s of type %s does not exist on master.' % (
+                    pk, record_type))
+
+        current_record.sanity_check_update_from_shard(
+            shard, serialized_record, *args, **kwargs)
+        checked_records.append((current_record, serialized_record))
+    return checked_records
+
+
+def _persist_checked_records(checked_records):
+    """
+    Update existing records with the serialized versions sent from a shard.
+
+    @param checked_records: (current_record, serialized_record) tuples that
+                            already passed the sanity checks.
+    """
+    for current_record, serialized_record in checked_records:
+        current_record.update_from_serialized(serialized_record)
+
+
+def persist_records_sent_from_shard(shard, jobs, hqes):
+    """
+    Sanity checking then saving serialized records sent to master from shard.
+
+    During heartbeats shards upload jobs and hostqueueentries. This performs
+    some sanity checks on these and then updates the existing records for those
+    entries with the updated ones from the heartbeat.
+
+    The sanity checks include:
+    - Checking if the objects sent already exist on the master.
+    - Checking if the objects sent were assigned to this shard.
+    - hostqueueentries must be sent together with their jobs.
+
+    All records are checked before any of them is updated, so a heartbeat
+    rejected by a sanity check doesn't leave a partial update behind.
+
+    @param shard: The shard the records were sent from.
+    @param jobs: The jobs the shard sent.
+    @param hqes: The hostqueueentries the shard sent.
+
+    @raises error.UnallowedRecordsSentToMaster if any of the sanity checks fail.
+    """
+    checked_jobs = _check_records_with_type_sent_from_shard(
+        shard, jobs, models.Job)
+    # A hqe may only be updated if its job was sent in the same heartbeat.
+    job_ids_sent = set(serialized['id'] for _, serialized in checked_jobs)
+    checked_hqes = _check_records_with_type_sent_from_shard(
+        shard, hqes, models.HostQueueEntry, job_ids_sent=job_ids_sent)
+
+    _persist_checked_records(checked_jobs)
+    _persist_checked_records(checked_hqes)
diff --git a/frontend/afe/site_rpc_interface.py b/frontend/afe/site_rpc_interface.py
index b7a945c..9e175d5 100644
--- a/frontend/afe/site_rpc_interface.py
+++ b/frontend/afe/site_rpc_interface.py
@@ -305,16 +305,23 @@
                     process_pool_size=4))
 
 
-def shard_heartbeat(shard_hostname):
-    """Register shard if it doesn't exist, then assign hosts and jobs.
+def shard_heartbeat(shard_hostname, jobs=(), hqes=()):
+    """Register shard if needed, persist its uploads, assign hosts and jobs.
 
     @param shard_hostname: Hostname of the calling shard
+    @param jobs: Serialized jobs whose newer status the shard reports.
+    @param hqes: Serialized hostqueueentries whose newer status the shard
+                 reports. The job of every hostqueueentry must be in jobs.
+    @raises error.UnallowedRecordsSentToMaster if the uploaded jobs or hqes
+            fail the sanity checks.
+
     @returns: Serialized representations of hosts, jobs and their dependencies
               to be inserted into a shard's database.
     """
     timer = stats.Timer('shard_heartbeat')
     with timer:
         shard_obj = rpc_utils.retrieve_shard(shard_hostname=shard_hostname)
+        rpc_utils.persist_records_sent_from_shard(shard_obj, jobs, hqes)
         hosts, jobs = rpc_utils.find_records_for_shard(shard_obj)
         return {
             'hosts': [host.serialize() for host in hosts],
diff --git a/frontend/afe/site_rpc_interface_unittest.py b/frontend/afe/site_rpc_interface_unittest.py
index 198e614..9e7184b 100644
--- a/frontend/afe/site_rpc_interface_unittest.py
+++ b/frontend/afe/site_rpc_interface_unittest.py
@@ -14,6 +14,8 @@
 import StringIO
 import unittest
 
+from django.core import exceptions as django_exceptions
+
 import common
 
 from autotest_lib.frontend import setup_django_environment
@@ -378,11 +380,46 @@
         site_rpc_interface.set_boto_key(boto_key)
 
 
-    def _do_heartbeat_and_assert_response(self, shard_hostname=None, **kwargs):
+    def _get_records_for_sending_to_master(self):
+        return [{'control_file': 'foo',
+                 'control_type': 1,
+                 'created_on': datetime.datetime(2014, 8, 21),
+                 'drone_set': None,
+                 'email_list': '',
+                 'max_runtime_hrs': 72,
+                 'max_runtime_mins': 1440,
+                 'name': 'dummy',
+                 'owner': 'autotest_system',
+                 'parse_failed_repair': True,
+                 'priority': 40,
+                 'reboot_after': 0,
+                 'reboot_before': 1,
+                 'run_reset': True,
+                 'run_verify': False,
+                 'synch_count': 0,
+                 'test_retry': 10,
+                 'timeout': 24,
+                 'timeout_mins': 1440,
+                 'id': 1
+                 }], [{
+                    'aborted': False,
+                    'active': False,
+                    'complete': False,
+                    'deleted': False,
+                    'execution_subdir': '',
+                    'finished_on': None,
+                    'started_on': None,
+                    'status': 'Queued',
+                    'id': 1
+                }]
+
+
+    def _do_heartbeat_and_assert_response(
+        self, shard_hostname=None, upload_jobs=(), upload_hqes=(), **kwargs):
         shard_hostname = shard_hostname or str(
             models.Shard.objects.count() + 1)
         retval = site_rpc_interface.shard_heartbeat(
-            shard_hostname=shard_hostname)
+            shard_hostname=shard_hostname, jobs=upload_jobs, hqes=upload_hqes)
 
         self._assert_shard_heartbeat_response(shard_hostname, retval,
                                               **kwargs)
@@ -415,6 +452,62 @@
         self.assertEqual(returned_hqes, expected_hqes)
 
 
+    def _send_records_to_master_helper(
+        self, jobs, hqes, shard_hostname='host1',
+        exception_to_throw=error.UnallowedRecordsSentToMaster):
+        # Create a job assigned to shard 'host1' for the heartbeat to update.
+        job_id = rpc_interface.create_job(name='dummy', priority='Medium',
+                                          control_file='foo',
+                                          control_type=SERVER,
+                                          test_retry=10, hostless=True)
+        job = models.Job.objects.get(pk=job_id)
+        shard = models.Shard.objects.create(hostname='host1')
+        job.shard = shard
+        job.save()
+        if not exception_to_throw:
+            self._do_heartbeat_and_assert_response(
+                shard_hostname=shard_hostname,
+                upload_jobs=jobs, upload_hqes=hqes)
+        else:
+            self.assertRaises(
+                exception_to_throw,
+                self._do_heartbeat_and_assert_response,
+                shard_hostname=shard_hostname,
+                upload_jobs=jobs, upload_hqes=hqes)
+
+
+    def testSendingRecordsToMaster(self):
+        jobs, hqes = self._get_records_for_sending_to_master()
+        hqes[0]['status'] = 'Completed'
+        self._send_records_to_master_helper(
+            jobs=jobs, hqes=hqes, exception_to_throw=None)
+
+        # Check the entry was actually written to db
+        self.assertEqual(models.HostQueueEntry.objects.all()[0].status,
+                         'Completed')
+
+
+    def testSendingRecordsToMasterJobAssignedToDifferentShard(self):
+        jobs, hqes = self._get_records_for_sending_to_master()
+        models.Shard.objects.create(hostname='other_shard')
+        self._send_records_to_master_helper(
+            jobs=jobs, hqes=hqes, shard_hostname='other_shard')
+
+
+    def testSendingRecordsToMasterJobHqeWithoutJob(self):
+        _, hqes = self._get_records_for_sending_to_master()
+        self._send_records_to_master_helper(
+            jobs=[], hqes=hqes)
+
+
+    def testSendingRecordsToMasterNotExistingJob(self):
+        jobs, hqes = self._get_records_for_sending_to_master()
+        jobs[0]['id'] = 3
+
+        self._send_records_to_master_helper(
+            jobs=jobs, hqes=hqes)
+
+
     def testShardHeartbeatFetchHostlessJob(self):
         models.Label.objects.create(name='board:lumpy', platform=True)
         label2 = models.Label.objects.create(name='bluetooth', platform=False)