[autotest] Sync completed jobs back to master

Records of completed jobs should be sent back to the master
upon a heartbeat.

This changes the heartbeat to include completed jobs and their host queue entries.

BUG=None
CQ-DEPEND=CL:212725
DEPLOY=apache,afe
TEST=Ran suites, tried out manually

Change-Id: I25daed310dc1bad064bd0c4819a43d18ba04606d
Reviewed-on: https://chromium-review.googlesource.com/218730
Tested-by: Jakob Jülich <jakobjuelich@chromium.org>
Reviewed-by: Fang Deng <fdeng@chromium.org>
Commit-Queue: Jakob Jülich <jakobjuelich@chromium.org>
diff --git a/frontend/afe/models.py b/frontend/afe/models.py
index 0b03110..dffccfa 100644
--- a/frontend/afe/models.py
+++ b/frontend/afe/models.py
@@ -1101,7 +1101,7 @@
 
 
     def sanity_check_update_from_shard(self, shard, updated_serialized):
-        if not self.shard_id == shard.id:
+        if self.shard_id != shard.id:
             raise error.UnallowedRecordsSentToMaster(
                 'Job id=%s is assigned to shard (%s). Cannot update it with %s '
                 'from shard %s.' % (self.id, self.shard_id, updated_serialized,
@@ -1164,6 +1164,8 @@
 
     timeout_mins = dbmodels.IntegerField(default=DEFAULT_TIMEOUT_MINS)
 
+    # If this is None on the master, a slave still has to be found for it.
+    # If this is None on a slave, it should be synced back to the master.
     shard = dbmodels.ForeignKey(Shard, blank=True, null=True)
 
     # custom manager
diff --git a/global_config.ini b/global_config.ini
index 793a8e9..1468d88 100644
--- a/global_config.ini
+++ b/global_config.ini
@@ -32,12 +32,11 @@
 sql_debug_mode: False
 
 [SHARD]
-# Whether this AFE is running as master AFE or as slave shard.
-is_slave_shard: False
-# Hostname of the local shard, only used if is_slave_shard
-shard_hostname: SET IN SHADOW CONFIG
-# Ignored if is_slave_shard is False
-global_afe_hostname: SET IN SHADOW IF is_slave_shard=True
+# If this is set (non-empty), the instance is considered a shard.
+# The value should be the hostname of the local shard.
+shard_hostname:
+# Ignored if shard_hostname is unset or empty
+global_afe_hostname: SET IN SHADOW IF shard_hostname IS NOT NONE
 heartbeat_pause_sec: 60
 
 [AUTOSERV]
diff --git a/scheduler/shard/shard_client.py b/scheduler/shard/shard_client.py
index ce451aa..bae49ca 100644
--- a/scheduler/shard/shard_client.py
+++ b/scheduler/shard/shard_client.py
@@ -21,7 +21,7 @@
 from autotest_lib.client.common_lib.cros.graphite import stats
 from autotest_lib.frontend.afe import models, rpc_utils
 from autotest_lib.scheduler import email_manager
-from autotest_lib.server import frontend
+from autotest_lib.server.cros.dynamic_suite import frontend_wrappers
 from autotest_lib.shard import shard_logging_config
 
 
@@ -75,23 +75,29 @@
 
 
 HEARTBEAT_AFE_ENDPOINT = 'shard_heartbeat'
-STATS_KEY = 'shard_client.%s' % socket.gethostname()
 
+RPC_TIMEOUT_MIN = 5
+RPC_DELAY_SEC = 5
+
+STATS_KEY = 'shard_client.%s' % socket.gethostname()
 timer = stats.Timer(STATS_KEY)
 
 
 class ShardClient(object):
     """Performs client side tasks of sharding, i.e. the heartbeat.
 
-    This class contains the to do periodic heartbeats to a global AFE,
+    This class contains the logic to do periodic heartbeats to a global AFE,
     to retrieve new jobs from it and to report completed jobs back.
     """
 
     def __init__(self, global_afe_hostname, shard_hostname, tick_pause_sec):
-        self.afe = frontend.AFE(server=global_afe_hostname)
+        self.afe = frontend_wrappers.RetryingAFE(server=global_afe_hostname,
+                                                 timeout_min=RPC_TIMEOUT_MIN,
+                                                 delay_sec=RPC_DELAY_SEC)
         self.hostname = shard_hostname
         self.tick_pause_sec = tick_pause_sec
         self._shutdown = False
+        self._shard = None
 
 
     @timer.decorate
@@ -120,6 +126,53 @@
             models.Job.deserialize(job)
 
 
+    @property
+    def shard(self):
+        """Return this shard's own shard object, fetched from the database.
+
+        A shard's object is fetched from the master with the first jobs. It will
+        not exist before that time.
+
+        @returns: The shard object if it already exists, otherwise None
+        """
+        if self._shard is None:
+            try:
+                self._shard = models.Shard.smart_get(self.hostname)
+            except models.Shard.DoesNotExist:
+                # This might happen before any jobs are assigned to this shard.
+                # This is okay because then there is nothing to offload anyway.
+                pass
+        return self._shard
+
+
+    def _get_jobs_to_upload(self):
+        jobs = []
+        # The scheduler sets shard to None upon completion of the job.
+        # For more information on the shard field's semantic see
+        # models.Job.shard.
+        job_ids = list(models.Job.objects.filter(
+            shard=None).values_list('pk', flat=True))
+
+        for job_to_upload in models.Job.objects.filter(pk__in=job_ids).all():
+            jobs.append(job_to_upload)
+        return jobs
+
+
+    def _mark_jobs_as_uploaded(self, jobs):
+        job_ids = [job.id for job in jobs]
+        # self.shard might be None if no jobs were downloaded yet.
+        # But then job_ids is empty, so this is harmless.
+        # Even if there were jobs we'd in the worst case upload them twice.
+        models.Job.objects.filter(pk__in=job_ids).update(shard=self.shard)
+
+
+    def _get_hqes_for_jobs(self, jobs):
+        hqes = []
+        for job in jobs:
+            hqes.extend(job.hostqueueentry_set.all())
+        return hqes
+
+
     @timer.decorate
     def do_heartbeat(self):
         """Perform a heartbeat: Retreive new jobs.
@@ -129,8 +182,17 @@
         objects in the local database.
         """
         logging.info("Performing heartbeat.")
-        response = self.afe.run(HEARTBEAT_AFE_ENDPOINT,
-                                shard_hostname=self.hostname)
+
+        jobs = self._get_jobs_to_upload()
+        hqes = self._get_hqes_for_jobs(jobs)
+
+        response = self.afe.run(
+            HEARTBEAT_AFE_ENDPOINT, shard_hostname=self.hostname,
+            jobs=[job.serialize(include_dependencies=False) for job in jobs],
+            hqes=[hqe.serialize(include_dependencies=False) for hqe in hqes])
+
+        self._mark_jobs_as_uploaded(jobs)
+
         self.process_heartbeat_response(response)
         logging.info("Heartbeat completed.")
 
@@ -159,30 +221,27 @@
     _heartbeat_client.shutdown()
 
 
-def _ensure_running_on_shard():
-    """Raises an exception if run from elsewhere than a shard.
-
-    @raises error.HeartbeatOnlyAllowedInShardModeException if run from
-            elsewhere than from a shard.
-    """
-    is_shard = global_config.global_config.get_config_value(
-            'SHARD', 'is_slave_shard', type=bool)
-
-    if not is_shard:
-        raise error.HeartbeatOnlyAllowedInShardModeException(
-            'To run the shard client, is_slave_shard must be set to True')
-
-
 def _get_global_afe_hostname():
     """Read the hostname of the global AFE from the global configuration."""
     return global_config.global_config.get_config_value(
             'SHARD', 'global_afe_hostname')
 
 
-def _get_my_shard_hostname():
-    """Read the hostname the local shard from the global configuration."""
-    return global_config.global_config.get_config_value(
-        'SHARD', 'shard_hostname')
+def _get_shard_hostname_and_ensure_running_on_shard():
+    """Read the hostname of the local shard from the global configuration.
+
+    Raise an exception if run from elsewhere than a shard.
+
+    @raises error.HeartbeatOnlyAllowedInShardModeException if run from
+            elsewhere than from a shard.
+    """
+    hostname = global_config.global_config.get_config_value(
+        'SHARD', 'shard_hostname', default=None)
+    if not hostname:
+        raise error.HeartbeatOnlyAllowedInShardModeException(
+            'To run the shard client, shard_hostname must neither be None nor '
+            'empty.')
+    return hostname
 
 
 def _get_tick_pause_sec():
@@ -198,10 +257,8 @@
 
     @returns A shard client instance.
     """
-    _ensure_running_on_shard()
-
     global_afe_hostname = _get_global_afe_hostname()
-    shard_hostname = _get_my_shard_hostname()
+    shard_hostname = _get_shard_hostname_and_ensure_running_on_shard()
     tick_pause_sec = _get_tick_pause_sec()
     return ShardClient(global_afe_hostname, shard_hostname, tick_pause_sec)
 
diff --git a/scheduler/shard/shard_client_unittest.py b/scheduler/shard/shard_client_unittest.py
index 7270719..3c12302 100644
--- a/scheduler/shard/shard_client_unittest.py
+++ b/scheduler/shard/shard_client_unittest.py
@@ -2,6 +2,7 @@
 # Use of this source code is governed by a BSD-style license that can be
 # found in the LICENSE file.
 
+import datetime
 import mox
 
 import common
@@ -11,7 +12,7 @@
 from autotest_lib.frontend.afe import models
 from autotest_lib.client.common_lib import error
 from autotest_lib.client.common_lib import global_config
-from autotest_lib.server import frontend
+from autotest_lib.server.cros.dynamic_suite import frontend_wrappers
 from autotest_lib.shard import shard_client
 
 
@@ -19,14 +20,23 @@
                       frontend_test_utils.FrontendTestMixin):
     """Unit tests for functions in shard_client.py"""
 
+
+    GLOBAL_AFE_HOSTNAME = 'foo_autotest'
+
+
     def setUp(self):
         super(ShardClientTest, self).setUp()
+
+        global_config.global_config.override_config_value(
+                'SHARD', 'global_afe_hostname', self.GLOBAL_AFE_HOSTNAME)
+
         self._frontend_common_setup(fill_data=False)
 
 
     def setupMocks(self):
-        self.mox.StubOutClassWithMocks(frontend, 'AFE')
-        self.afe = frontend.AFE(server=mox.IgnoreArg())
+        self.mox.StubOutClassWithMocks(frontend_wrappers, 'RetryingAFE')
+        self.afe = frontend_wrappers.RetryingAFE(
+                delay_sec=5, server=self.GLOBAL_AFE_HOSTNAME, timeout_min=5)
 
 
     def tearDown(self):
@@ -53,30 +63,85 @@
                 'synch_id': None}
 
 
+    def _get_sample_serialized_job(self):
+        return {'control_file': u'control',
+                'control_type': 2,
+                'created_on': datetime.datetime(2008, 1, 1, 0, 0),
+                'dependency_labels': [],
+                'email_list': u'',
+                'hostqueueentry_set': [{'aborted': False,
+                                        'active': False,
+                                        'complete': False,
+                                        'deleted': False,
+                                        'execution_subdir': u'',
+                                        'finished_on': None,
+                                        u'id': 2,
+                                        'meta_host': {u'id': 10,
+                                                      'invalid': False,
+                                                      'kernel_config': u'',
+                                                      'name': u'myplatform',
+                                                      'only_if_needed': False,
+                                                      'platform': True},
+                                        'started_on': None,
+                                        'status': u'Queued'}],
+                u'id': 2,
+                'jobkeyval_set': [],
+                'max_runtime_hrs': 72,
+                'max_runtime_mins': 1440,
+                'name': u'test',
+                'owner': u'autotest_system',
+                'parse_failed_repair': True,
+                'priority': 0,
+                'reboot_after': 0,
+                'reboot_before': 0,
+                'run_reset': True,
+                'run_verify': False,
+                'shard': {'hostname': 'host1',
+                          'id': 4},
+                'synch_count': 1,
+                'test_retry': 0,
+                'timeout': 24,
+                'timeout_mins': 1440}
+
+
     def testHeartbeat(self):
         """Trigger heartbeat, verify RPCs and persisting of the responses."""
         self.setupMocks()
 
         global_config.global_config.override_config_value(
-                'SHARD', 'is_slave_shard', 'True')
-        global_config.global_config.override_config_value(
                 'SHARD', 'shard_hostname', 'host1')
 
         self.afe.run(
-            'shard_heartbeat', shard_hostname='host1',
+            'shard_heartbeat', shard_hostname='host1', jobs=[], hqes=[],
             ).AndReturn({
                 'hosts': [self._get_sample_serialized_host()],
-                'jobs': [],
+                'jobs': [self._get_sample_serialized_job()],
             })
         modified_sample_host = self._get_sample_serialized_host()
         modified_sample_host['hostname'] = 'host2'
         self.afe.run(
-            'shard_heartbeat', shard_hostname='host1',
+            'shard_heartbeat', shard_hostname='host1', jobs=[], hqes=[]
             ).AndReturn({
                 'hosts': [modified_sample_host],
                 'jobs': [],
             })
 
+
+        def verify_upload_jobs_and_hqes(name, shard_hostname, jobs, hqes):
+            self.assertEqual(len(jobs), 1)
+            self.assertEqual(len(hqes), 1)
+            job, hqe = jobs[0], hqes[0]
+            self.assertEqual(hqe['status'], 'Completed')
+
+
+        self.afe.run(
+            'shard_heartbeat', shard_hostname='host1', jobs=mox.IgnoreArg(),
+            hqes=mox.IgnoreArg()
+            ).WithSideEffects(verify_upload_jobs_and_hqes).AndReturn({
+                'hosts': [],
+                'jobs': [],
+            })
+
         self.mox.ReplayAll()
         sut = shard_client.get_shard_client()
 
@@ -88,16 +153,25 @@
 
         sut.do_heartbeat()
 
+        # Ensure it wasn't overwritten
         host = models.Host.objects.get(id=2)
         self.assertEqual(host.hostname, 'host1')
 
+        job = models.Job.objects.all()[0]
+        job.shard = None
+        job.save()
+        hqe = job.hostqueueentry_set.all()[0]
+        hqe.status = 'Completed'
+        hqe.save()
+
+        sut.do_heartbeat()
+
+
         self.mox.VerifyAll()
 
 
     def testHeartbeatNoShardMode(self):
         """Ensure an exception is thrown when run on a non-shard machine."""
-        global_config.global_config.override_config_value(
-                'SHARD', 'is_slave_shard', 'False')
         self.mox.ReplayAll()
 
         self.assertRaises(error.HeartbeatOnlyAllowedInShardModeException,
@@ -111,14 +185,12 @@
         self.setupMocks()
 
         global_config.global_config.override_config_value(
-                'SHARD', 'is_slave_shard', 'True')
-        global_config.global_config.override_config_value(
                 'SHARD', 'heartbeat_pause_sec', '0.01')
         global_config.global_config.override_config_value(
                 'SHARD', 'shard_hostname', 'host1')
 
         self.afe.run(
-            'shard_heartbeat', shard_hostname='host1',
+            'shard_heartbeat', shard_hostname='host1', jobs=[], hqes=[]
             ).AndReturn({
                 'hosts': [],
                 'jobs': [],
@@ -130,7 +202,7 @@
             sut.shutdown()
 
         self.afe.run(
-            'shard_heartbeat', shard_hostname='host1',
+            'shard_heartbeat', shard_hostname='host1', jobs=[], hqes=[]
             ).WithSideEffects(shutdown_sut).AndReturn({
                 'hosts': [],
                 'jobs': [],