rpc: Guarantee reset to good state after readonly

The kludge needed to query readonly backup DB could leave the connection
to master broken in case of an error. Remedy this.
Behind a feature flag, fetch_readonly_jobs, defaults to False.

BUG=chromium:810965
BUG=chromium:818271
TEST=Old and new unit tests, feature flag for canarying rollout

Change-Id: Idc5e3793f5dc5a2bd1022e468456b88b2f347ed3
Reviewed-on: https://chromium-review.googlesource.com/944041
Commit-Ready: Jacob Kopczynski <jkop@chromium.org>
Tested-by: Jacob Kopczynski <jkop@chromium.org>
Reviewed-by: Xixuan Wu <xixuan@chromium.org>
diff --git a/frontend/afe/models.py b/frontend/afe/models.py
index beb64c3..0d8b0ea 100644
--- a/frontend/afe/models.py
+++ b/frontend/afe/models.py
@@ -1487,6 +1487,8 @@
     DEFAULT_PARSE_FAILED_REPAIR = global_config.global_config.get_config_value(
         'AUTOTEST_WEB', 'parse_failed_repair_default', type=bool,
         default=False)
+    FETCH_READONLY_JOBS = global_config.global_config.get_config_value(
+        'AUTOTEST_WEB','shard_heartbeat_use_readonly_slave', default=False)
 
     owner = dbmodels.CharField(max_length=255)
     name = dbmodels.CharField(max_length=255)
@@ -1660,10 +1662,22 @@
             check_known_jobs_exclude = 'AND NOT ' + check_known_jobs
             check_known_jobs_include = 'OR ' + check_known_jobs
 
-        query = Job.objects.raw(cls.SQL_SHARD_JOBS % {
-                'check_known_jobs': check_known_jobs_exclude,
-                'shard_id': shard.id})
-        job_ids |= set([j.id for j in query])
+        raw_sql = cls.SQL_SHARD_JOBS % {
+            'check_known_jobs': check_known_jobs_exclude,
+            'shard_id': shard.id
+        }
+        if cls.FETCH_READONLY_JOBS:
+            #TODO(jkop): Get rid of this kludge when we update Django to >=1.7
+            #correct usage would be .raw(..., using='readonly')
+            old_db = Job.objects._db
+            try:
+                Job.objects._db = 'readonly'
+                job_ids |= set([j.id for j in Job.objects.raw(raw_sql)])
+            finally:
+                Job.objects._db = old_db
+        if not job_ids:
+            #If the replica is down or we're in a test, fetch from master.
+            job_ids |= set([j.id for j in Job.objects.raw(raw_sql)])
 
         static_labels, non_static_labels = Host.classify_label_objects(
                 shard.labels.all())
diff --git a/frontend/afe/rpc_interface_unittest.py b/frontend/afe/rpc_interface_unittest.py
index 8e74d0b..7616c39 100755
--- a/frontend/afe/rpc_interface_unittest.py
+++ b/frontend/afe/rpc_interface_unittest.py
@@ -24,6 +24,8 @@
 from autotest_lib.server.cros.dynamic_suite import constants
 from autotest_lib.server.cros.dynamic_suite import control_file_getter
 from autotest_lib.server.cros.dynamic_suite import frontend_wrappers
+from django.db.utils import DatabaseError
+
 
 CLIENT = control_data.CONTROL_TYPE_NAMES.CLIENT
 SERVER = control_data.CONTROL_TYPE_NAMES.SERVER
@@ -113,6 +115,38 @@
                                                incorrect_host_ids=[host2.id])
 
 
+    def _testShardHeartbeatBadReadonlyQueryHelper(self, shard1, host1, label1):
+        """Ensure recovery if query fails while reading from readonly."""
+        host2 = models.Host.objects.create(hostname='test_host2', leased=False)
+        host2.labels.add(label1)
+        self.assertEqual(host2.shard, None)
+
+        proper_master_db = models.Job.objects._db
+        # In the middle of the assign_to_shard call, remove label1 from shard1.
+        self.mox.StubOutWithMock(models.Host, '_assign_to_shard_nothing_helper')
+        def find_shard_job_query():
+            return models.Job.SQL_SHARD_JOBS
+
+        def break_shard_job_query():
+            set_shard_job_query("SELECT quux from foo%s malformed query;")
+
+        def set_shard_job_query(query):
+            models.Job.SQL_SHARD_JOBS = query
+
+        models.Host._assign_to_shard_nothing_helper().WithSideEffects(
+            break_shard_job_query)
+        self.mox.ReplayAll()
+
+        old_query = find_shard_job_query()
+        try:
+            self.assertRaises(DatabaseError,
+                              self._do_heartbeat_and_assert_response,
+                known_hosts=[host1], hosts=[], incorrect_host_ids=[host1.id])
+            self.assertEqual(models.Job.objects._db, proper_master_db)
+        finally:
+            set_shard_job_query(old_query)
+
+
     def _testShardHeartbeatLabelRemovalRaceHelper(self, shard1, host1, label1):
         """Ensure correctness if label removed during heartbeat."""
         host2 = models.Host.objects.create(hostname='test_host2', leased=False)
@@ -1411,10 +1445,13 @@
             self._NAME)
         self.dev_server = self.mox.CreateMock(dev_server.ImageServer)
         self._frontend_common_setup(fill_data=False)
+        self.stored_readonly_setting = models.Job.FETCH_READONLY_JOBS
+        models.Job.FETCH_READONLY_JOBS = True
 
 
     def tearDown(self):
         self._frontend_common_teardown()
+        models.Job.FETCH_READONLY_JOBS = self.stored_readonly_setting
 
 
     def _setupDevserver(self):
@@ -1875,5 +1912,12 @@
         self._testResendHostsAfterFailedHeartbeatHelper(host1)
 
 
+    def testShardHeartbeatBadReadonlyQuery(self):
+        old_readonly = models.Job.FETCH_READONLY_JOBS
+        shard1, host1, label1 = self._createShardAndHostWithLabel(
+                host_hostname='test_host1')
+        self._testShardHeartbeatBadReadonlyQueryHelper(shard1, host1, label1)
+
+
 if __name__ == '__main__':
     unittest.main()
diff --git a/global_config.ini b/global_config.ini
index 3166868..04871cb 100644
--- a/global_config.ini
+++ b/global_config.ini
@@ -27,6 +27,10 @@
 # is too much verbosity for 'production' systems, hence turned off by default.
 sql_debug_mode: False
 
+# Feature flag for testing rollout of fetching jobs from readonly slaves
+# during heartbeat
+shard_heartbeat_use_readonly_slave: False
+
 # Restricted user group. The users in the specified groups only have
 # access to master server. Will always direct them to google storage for logs
 # rather than drones or shards.