[autotest] Sync completed jobs back to master
Records of jobs that are completed should be sent back to the master
upon a heartbeat.
This changes the heartbeat to include these things.
BUG=None
CQ-DEPEND=CL:212725
DEPLOY=apache,afe
TEST=Ran suites, tried out manually
Change-Id: I25daed310dc1bad064bd0c4819a43d18ba04606d
Reviewed-on: https://chromium-review.googlesource.com/218730
Tested-by: Jakob Jülich <jakobjuelich@chromium.org>
Reviewed-by: Fang Deng <fdeng@chromium.org>
Commit-Queue: Jakob Jülich <jakobjuelich@chromium.org>
diff --git a/frontend/afe/models.py b/frontend/afe/models.py
index 0b03110..dffccfa 100644
--- a/frontend/afe/models.py
+++ b/frontend/afe/models.py
@@ -1101,7 +1101,7 @@
def sanity_check_update_from_shard(self, shard, updated_serialized):
- if not self.shard_id == shard.id:
+ if self.shard_id != shard.id:
raise error.UnallowedRecordsSentToMaster(
'Job id=%s is assigned to shard (%s). Cannot update it with %s '
'from shard %s.' % (self.id, self.shard_id, updated_serialized,
@@ -1164,6 +1164,8 @@
timeout_mins = dbmodels.IntegerField(default=DEFAULT_TIMEOUT_MINS)
+ # If this is None on the master, a slave should be found.
+ # If this is None on a slave, it should be synced back to the master
shard = dbmodels.ForeignKey(Shard, blank=True, null=True)
# custom manager
diff --git a/global_config.ini b/global_config.ini
index 793a8e9..1468d88 100644
--- a/global_config.ini
+++ b/global_config.ini
@@ -32,12 +32,11 @@
sql_debug_mode: False
[SHARD]
-# Whether this AFE is running as master AFE or as slave shard.
-is_slave_shard: False
-# Hostname of the local shard, only used if is_slave_shard
-shard_hostname: SET IN SHADOW CONFIG
-# Ignored if is_slave_shard is False
-global_afe_hostname: SET IN SHADOW IF is_slave_shard=True
+# If this is not None, the instance is considered a shard.
+# The value should be the hostname of the local shard.
+shard_hostname:
+# Ignored if shard_hostname is None
+global_afe_hostname: SET IN SHADOW IF shard_hostname IS NOT NONE
heartbeat_pause_sec: 60
[AUTOSERV]
diff --git a/scheduler/shard/shard_client.py b/scheduler/shard/shard_client.py
index ce451aa..bae49ca 100644
--- a/scheduler/shard/shard_client.py
+++ b/scheduler/shard/shard_client.py
@@ -21,7 +21,7 @@
from autotest_lib.client.common_lib.cros.graphite import stats
from autotest_lib.frontend.afe import models, rpc_utils
from autotest_lib.scheduler import email_manager
-from autotest_lib.server import frontend
+from autotest_lib.server.cros.dynamic_suite import frontend_wrappers
from autotest_lib.shard import shard_logging_config
@@ -75,23 +75,29 @@
HEARTBEAT_AFE_ENDPOINT = 'shard_heartbeat'
-STATS_KEY = 'shard_client.%s' % socket.gethostname()
+RPC_TIMEOUT_MIN = 5
+RPC_DELAY_SEC = 5
+
+STATS_KEY = 'shard_client.%s' % socket.gethostname()
timer = stats.Timer(STATS_KEY)
class ShardClient(object):
"""Performs client side tasks of sharding, i.e. the heartbeat.
- This class contains the to do periodic heartbeats to a global AFE,
+ This class contains the logic to do periodic heartbeats to a global AFE,
to retrieve new jobs from it and to report completed jobs back.
"""
def __init__(self, global_afe_hostname, shard_hostname, tick_pause_sec):
- self.afe = frontend.AFE(server=global_afe_hostname)
+ self.afe = frontend_wrappers.RetryingAFE(server=global_afe_hostname,
+ timeout_min=RPC_TIMEOUT_MIN,
+ delay_sec=RPC_DELAY_SEC)
self.hostname = shard_hostname
self.tick_pause_sec = tick_pause_sec
self._shutdown = False
+ self._shard = None
@timer.decorate
@@ -120,6 +126,53 @@
models.Job.deserialize(job)
+ @property
+ def shard(self):
+ """Return this shard's own shard object, fetched from the database.
+
+    The shard object is fetched from the master together with the first jobs
+    assigned to this shard. It will not exist before that time.
+
+ @returns: The shard object if it already exists, otherwise None
+ """
+ if self._shard is None:
+ try:
+ self._shard = models.Shard.smart_get(self.hostname)
+ except models.Shard.DoesNotExist:
+ # This might happen before any jobs are assigned to this shard.
+ # This is okay because then there is nothing to offload anyway.
+ pass
+ return self._shard
+
+
+ def _get_jobs_to_upload(self):
+ jobs = []
+ # The scheduler sets shard to None upon completion of the job.
+ # For more information on the shard field's semantic see
+ # models.Job.shard.
+ job_ids = list(models.Job.objects.filter(
+ shard=None).values_list('pk', flat=True))
+
+ for job_to_upload in models.Job.objects.filter(pk__in=job_ids).all():
+ jobs.append(job_to_upload)
+ return jobs
+
+
+ def _mark_jobs_as_uploaded(self, jobs):
+ job_ids = [job.id for job in jobs]
+ # self.shard might be None if no jobs were downloaded yet.
+ # But then job_ids is empty, so this is harmless.
+    # Even if there were jobs, in the worst case we'd upload them twice.
+ models.Job.objects.filter(pk__in=job_ids).update(shard=self.shard)
+
+
+ def _get_hqes_for_jobs(self, jobs):
+ hqes = []
+ for job in jobs:
+ hqes.extend(job.hostqueueentry_set.all())
+ return hqes
+
+
@timer.decorate
def do_heartbeat(self):
"""Perform a heartbeat: Retreive new jobs.
@@ -129,8 +182,17 @@
objects in the local database.
"""
logging.info("Performing heartbeat.")
- response = self.afe.run(HEARTBEAT_AFE_ENDPOINT,
- shard_hostname=self.hostname)
+
+ jobs = self._get_jobs_to_upload()
+ hqes = self._get_hqes_for_jobs(jobs)
+
+ response = self.afe.run(
+ HEARTBEAT_AFE_ENDPOINT, shard_hostname=self.hostname,
+ jobs=[job.serialize(include_dependencies=False) for job in jobs],
+ hqes=[hqe.serialize(include_dependencies=False) for hqe in hqes])
+
+ self._mark_jobs_as_uploaded(jobs)
+
self.process_heartbeat_response(response)
logging.info("Heartbeat completed.")
@@ -159,30 +221,27 @@
_heartbeat_client.shutdown()
-def _ensure_running_on_shard():
- """Raises an exception if run from elsewhere than a shard.
-
- @raises error.HeartbeatOnlyAllowedInShardModeException if run from
- elsewhere than from a shard.
- """
- is_shard = global_config.global_config.get_config_value(
- 'SHARD', 'is_slave_shard', type=bool)
-
- if not is_shard:
- raise error.HeartbeatOnlyAllowedInShardModeException(
- 'To run the shard client, is_slave_shard must be set to True')
-
-
def _get_global_afe_hostname():
"""Read the hostname of the global AFE from the global configuration."""
return global_config.global_config.get_config_value(
'SHARD', 'global_afe_hostname')
-def _get_my_shard_hostname():
- """Read the hostname the local shard from the global configuration."""
- return global_config.global_config.get_config_value(
- 'SHARD', 'shard_hostname')
+def _get_shard_hostname_and_ensure_running_on_shard():
+ """Read the hostname the local shard from the global configuration.
+
+ Raise an exception if run from elsewhere than a shard.
+
+ @raises error.HeartbeatOnlyAllowedInShardModeException if run from
+ elsewhere than from a shard.
+ """
+ hostname = global_config.global_config.get_config_value(
+ 'SHARD', 'shard_hostname', default=None)
+ if not hostname:
+ raise error.HeartbeatOnlyAllowedInShardModeException(
+ 'To run the shard client, shard_hostname must neither be None nor '
+ 'empty.')
+ return hostname
def _get_tick_pause_sec():
@@ -198,10 +257,8 @@
@returns A shard client instance.
"""
- _ensure_running_on_shard()
-
global_afe_hostname = _get_global_afe_hostname()
- shard_hostname = _get_my_shard_hostname()
+ shard_hostname = _get_shard_hostname_and_ensure_running_on_shard()
tick_pause_sec = _get_tick_pause_sec()
return ShardClient(global_afe_hostname, shard_hostname, tick_pause_sec)
diff --git a/scheduler/shard/shard_client_unittest.py b/scheduler/shard/shard_client_unittest.py
index 7270719..3c12302 100644
--- a/scheduler/shard/shard_client_unittest.py
+++ b/scheduler/shard/shard_client_unittest.py
@@ -2,6 +2,7 @@
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
+import datetime
import mox
import common
@@ -11,7 +12,7 @@
from autotest_lib.frontend.afe import models
from autotest_lib.client.common_lib import error
from autotest_lib.client.common_lib import global_config
-from autotest_lib.server import frontend
+from autotest_lib.server.cros.dynamic_suite import frontend_wrappers
from autotest_lib.shard import shard_client
@@ -19,14 +20,23 @@
frontend_test_utils.FrontendTestMixin):
"""Unit tests for functions in shard_client.py"""
+
+ GLOBAL_AFE_HOSTNAME = 'foo_autotest'
+
+
def setUp(self):
super(ShardClientTest, self).setUp()
+
+ global_config.global_config.override_config_value(
+ 'SHARD', 'global_afe_hostname', self.GLOBAL_AFE_HOSTNAME)
+
self._frontend_common_setup(fill_data=False)
def setupMocks(self):
- self.mox.StubOutClassWithMocks(frontend, 'AFE')
- self.afe = frontend.AFE(server=mox.IgnoreArg())
+ self.mox.StubOutClassWithMocks(frontend_wrappers, 'RetryingAFE')
+ self.afe = frontend_wrappers.RetryingAFE(
+ delay_sec=5, server=self.GLOBAL_AFE_HOSTNAME, timeout_min=5)
def tearDown(self):
@@ -53,30 +63,85 @@
'synch_id': None}
+ def _get_sample_serialized_job(self):
+ return {'control_file': u'control',
+ 'control_type': 2,
+ 'created_on': datetime.datetime(2008, 1, 1, 0, 0),
+ 'dependency_labels': [],
+ 'email_list': u'',
+ 'hostqueueentry_set': [{'aborted': False,
+ 'active': False,
+ 'complete': False,
+ 'deleted': False,
+ 'execution_subdir': u'',
+ 'finished_on': None,
+ u'id': 2,
+ 'meta_host': {u'id': 10,
+ 'invalid': False,
+ 'kernel_config': u'',
+ 'name': u'myplatform',
+ 'only_if_needed': False,
+ 'platform': True},
+ 'started_on': None,
+ 'status': u'Queued'}],
+ u'id': 2,
+ 'jobkeyval_set': [],
+ 'max_runtime_hrs': 72,
+ 'max_runtime_mins': 1440,
+ 'name': u'test',
+ 'owner': u'autotest_system',
+ 'parse_failed_repair': True,
+ 'priority': 0,
+ 'reboot_after': 0,
+ 'reboot_before': 0,
+ 'run_reset': True,
+ 'run_verify': False,
+ 'shard': {'hostname': 'host1',
+ 'id': 4},
+ 'synch_count': 1,
+ 'test_retry': 0,
+ 'timeout': 24,
+ 'timeout_mins': 1440}
+
+
def testHeartbeat(self):
"""Trigger heartbeat, verify RPCs and persisting of the responses."""
self.setupMocks()
global_config.global_config.override_config_value(
- 'SHARD', 'is_slave_shard', 'True')
- global_config.global_config.override_config_value(
'SHARD', 'shard_hostname', 'host1')
self.afe.run(
- 'shard_heartbeat', shard_hostname='host1',
+ 'shard_heartbeat', shard_hostname='host1', jobs=[], hqes=[],
).AndReturn({
'hosts': [self._get_sample_serialized_host()],
- 'jobs': [],
+ 'jobs': [self._get_sample_serialized_job()],
})
modified_sample_host = self._get_sample_serialized_host()
modified_sample_host['hostname'] = 'host2'
self.afe.run(
- 'shard_heartbeat', shard_hostname='host1',
+ 'shard_heartbeat', shard_hostname='host1', jobs=[], hqes=[]
).AndReturn({
'hosts': [modified_sample_host],
'jobs': [],
})
+
+ def verify_upload_jobs_and_hqes(name, shard_hostname, jobs, hqes):
+ self.assertEqual(len(jobs), 1)
+ self.assertEqual(len(hqes), 1)
+ job, hqe = jobs[0], hqes[0]
+ self.assertEqual(hqe['status'], 'Completed')
+
+
+ self.afe.run(
+ 'shard_heartbeat', shard_hostname='host1', jobs=mox.IgnoreArg(),
+ hqes=mox.IgnoreArg()
+ ).WithSideEffects(verify_upload_jobs_and_hqes).AndReturn({
+ 'hosts': [],
+ 'jobs': [],
+ })
+
self.mox.ReplayAll()
sut = shard_client.get_shard_client()
@@ -88,16 +153,25 @@
sut.do_heartbeat()
+ # Ensure it wasn't overwritten
host = models.Host.objects.get(id=2)
self.assertEqual(host.hostname, 'host1')
+ job = models.Job.objects.all()[0]
+ job.shard = None
+ job.save()
+ hqe = job.hostqueueentry_set.all()[0]
+ hqe.status = 'Completed'
+ hqe.save()
+
+ sut.do_heartbeat()
+
+
self.mox.VerifyAll()
def testHeartbeatNoShardMode(self):
"""Ensure an exception is thrown when run on a non-shard machine."""
- global_config.global_config.override_config_value(
- 'SHARD', 'is_slave_shard', 'False')
self.mox.ReplayAll()
self.assertRaises(error.HeartbeatOnlyAllowedInShardModeException,
@@ -111,14 +185,12 @@
self.setupMocks()
global_config.global_config.override_config_value(
- 'SHARD', 'is_slave_shard', 'True')
- global_config.global_config.override_config_value(
'SHARD', 'heartbeat_pause_sec', '0.01')
global_config.global_config.override_config_value(
'SHARD', 'shard_hostname', 'host1')
self.afe.run(
- 'shard_heartbeat', shard_hostname='host1',
+ 'shard_heartbeat', shard_hostname='host1', jobs=[], hqes=[]
).AndReturn({
'hosts': [],
'jobs': [],
@@ -130,7 +202,7 @@
sut.shutdown()
self.afe.run(
- 'shard_heartbeat', shard_hostname='host1',
+ 'shard_heartbeat', shard_hostname='host1', jobs=[], hqes=[]
).WithSideEffects(shutdown_sut).AndReturn({
'hosts': [],
'jobs': [],