[autotest] Retransmit hosts and jobs if heartbeat failed on client.
If the client fails while performing a heartbeat, the master will
have already set the shard_id and therefore won't ever send jobs or
hosts again.
This changes this behavior: The shard client sends ids of hosts and
incomplete jobs it already has in the heartbeat.
Objects with id's that weren't sent, will be returned by the
heartbeat regardless of already having a shard_id set.
BUG=None
DEPLOY=apache
TEST=Ran suites
Change-Id: I46bbb13a81886476ec48c6f879f123290769b659
Reviewed-on: https://chromium-review.googlesource.com/220692
Tested-by: Jakob Jülich <jakobjuelich@chromium.org>
Reviewed-by: Prashanth B <beeps@chromium.org>
Commit-Queue: Jakob Jülich <jakobjuelich@chromium.org>
diff --git a/frontend/afe/models.py b/frontend/afe/models.py
index a813a4e..1b8aeb4 100644
--- a/frontend/afe/models.py
+++ b/frontend/afe/models.py
@@ -433,14 +433,24 @@
@classmethod
- def assign_to_shard(cls, shard):
+ def assign_to_shard(cls, shard, known_ids):
"""Assigns hosts to a shard.
- This function will check which labels are associated with the given
- shard. It will assign those hosts, that have labels that are assigned
- to the shard and haven't been returned to shard earlier.
+ For all labels that have been assigned to this shard, all hosts that
+ have this label, are assigned to this shard.
+
+ Hosts that are assigned to the shard but aren't already present on the
+ shard are returned.
@param shard: The shard object to assign labels/hosts for.
+ @param known_ids: List of all host-ids the shard already knows.
+ This is used to figure out which hosts should be sent
+ to the shard. If shard_ids were used instead, hosts
+ would only be transferred once, even if the client
+ failed persisting them.
+ The number of hosts usually lies in O(100), so the
+ overhead is acceptable.
+
@returns the hosts objects that should be sent to the shard.
"""
@@ -459,9 +469,10 @@
# - SELECT and then UPDATE only selected without requerying afterwards:
# returns the old state of the records.
host_ids = list(Host.objects.filter(
- shard=None,
labels=shard.labels.all(),
leased=False
+ ).exclude(
+ id__in=known_ids,
).values_list('pk', flat=True))
if host_ids:
@@ -1316,24 +1327,39 @@
@classmethod
- def assign_to_shard(cls, shard):
+ def assign_to_shard(cls, shard, known_ids):
"""Assigns unassigned jobs to a shard.
- All jobs that have the platform label that was assigned to the given
- shard are assigned to the shard and returned.
+ For all labels that have been assigned to this shard, all jobs that
+ have this label, are assigned to this shard.
+
+ Jobs that are assigned to the shard but aren't already present on the
+ shard are returned.
+
@param shard: The shard to assign jobs to.
+ @param known_ids: List of all ids of incomplete jobs, the shard already
+ knows about.
+ This is used to figure out which jobs should be sent
+ to the shard. If shard_ids were used instead, jobs
+ would only be transferred once, even if the client
+ failed persisting them.
+ The number of unfinished jobs usually lies in O(1000).
+ Assuming one id takes 8 chars in the json, this means
+ overhead that lies in the lower kilobyte range.
+ A not in query with 5000 id's takes about 30ms.
+
@returns The job objects that should be sent to the shard.
"""
# Disclaimer: Concurrent heartbeats should not occur in today's setup.
# If this changes or they are triggered manually, this applies:
# Jobs may be returned more than once by concurrent calls of this
# function, as there is a race condition between SELECT and UPDATE.
- unassigned_or_aborted_query = (
- dbmodels.Q(shard=None) | dbmodels.Q(hostqueueentry__aborted=True))
job_ids = list(Job.objects.filter(
- unassigned_or_aborted_query,
dependency_labels=shard.labels.all()
).exclude(
+ id__in=known_ids,
+ hostqueueentry__aborted=False
+ ).exclude(
hostqueueentry__complete=True
).exclude(
hostqueueentry__active=True
diff --git a/frontend/afe/rpc_utils.py b/frontend/afe/rpc_utils.py
index 7e53c7c..5c0deb1 100644
--- a/frontend/afe/rpc_utils.py
+++ b/frontend/afe/rpc_utils.py
@@ -943,15 +943,17 @@
return models.Shard.smart_get(shard_hostname)
-def find_records_for_shard(shard):
+def find_records_for_shard(shard, known_job_ids, known_host_ids):
"""Find records that should be sent to a shard.
@param shard: Shard to find records for.
+ @param known_job_ids: List of ids of jobs the shard already has.
+ @param known_host_ids: List of ids of hosts the shard already has.
@returns: Tuple of two lists for hosts and jobs: (hosts, jobs).
"""
- hosts = models.Host.assign_to_shard(shard)
- jobs = models.Job.assign_to_shard(shard)
+ hosts = models.Host.assign_to_shard(shard, known_host_ids)
+ jobs = models.Job.assign_to_shard(shard, known_job_ids)
return hosts, jobs
diff --git a/frontend/afe/site_rpc_interface.py b/frontend/afe/site_rpc_interface.py
index 547e022..50a43c3 100644
--- a/frontend/afe/site_rpc_interface.py
+++ b/frontend/afe/site_rpc_interface.py
@@ -305,8 +305,9 @@
process_pool_size=4))
-def shard_heartbeat(shard_hostname, jobs=(), hqes=()):
- """Register shard if not existing, then assign hosts and jobs.
+def shard_heartbeat(shard_hostname, jobs=(), hqes=(),
+ known_job_ids=(), known_host_ids=()):
+ """Receive updates for job statuses from shards and assign hosts and jobs.
@param shard_hostname: Hostname of the calling shard
@param jobs: Jobs in serialized form that should be updated with newer
@@ -314,15 +315,51 @@
@param hqes: Hostqueueentries in serialized form that should be updated with
newer status from a shard. Note that for every hostqueueentry
the corresponding job must be in jobs.
+ @param known_job_ids: List of ids of jobs the shard already has.
+ @param known_host_ids: List of ids of hosts the shard already has.
@returns: Serialized representations of hosts, jobs and their dependencies
to be inserted into a shard's database.
"""
+ # The following alternatives to sending host and job ids in every heartbeat
+ # have been considered:
+ # 1. Sending the highest known job and host ids. This would work for jobs:
+ # Newer jobs always have larger ids. Also, if a job is not assigned to a
+ # particular shard during a heartbeat, it never will be assigned to this
+ # shard later.
+ # This is not true for hosts though: A host that is leased won't be sent
+ # to the shard now, but might be sent in a future heartbeat. This means
+ # sometimes hosts should be transfered that have a lower id than the
+ # maximum host id the shard knows.
+ # 2. Send the number of jobs/hosts the shard knows to the master in each
+ # heartbeat. Compare these to the number of records that already have
+ # the shard_id set to this shard. In the normal case, they should match.
+ # In case they don't, resend all entities of that type.
+ # This would work well for hosts, because there aren't that many.
+ # Resending all jobs is quite a big overhead though.
+ # Also, this approach might run into edge cases when entities are
+ # ever deleted.
+ # 3. Mixtures of the above: Use 1 for jobs and 2 for hosts.
+ # Using two different approaches isn't consistent and might cause
+ # confusion. Also the issues with the case of deletions might still
+ # occur.
+ #
+ # The overhead of sending all job and host ids in every heartbeat is low:
+ # At peaks one board has about 1200 created but unfinished jobs.
+ # See the numbers here: http://goo.gl/gQCGWH
+ # Assuming that job id's have 6 digits and that json serialization takes a
+ # comma and a space as overhead, the traffic per id sent is about 8 bytes.
+ # If 5000 ids need to be sent, this means 40 kilobytes of traffic.
+ # A NOT IN query with 5000 ids took about 30ms in tests made.
+ # These numbers seem low enough to outweigh the disadvantages of the
+ # solutions described above.
timer = stats.Timer('shard_heartbeat')
with timer:
shard_obj = rpc_utils.retrieve_shard(shard_hostname=shard_hostname)
rpc_utils.persist_records_sent_from_shard(shard_obj, jobs, hqes)
- hosts, jobs = rpc_utils.find_records_for_shard(shard_obj)
+ hosts, jobs = rpc_utils.find_records_for_shard(
+ shard_obj,
+ known_job_ids=known_job_ids, known_host_ids=known_host_ids)
return {
'hosts': [host.serialize() for host in hosts],
'jobs': [job.serialize() for job in jobs],
diff --git a/frontend/afe/site_rpc_interface_unittest.py b/frontend/afe/site_rpc_interface_unittest.py
index 2600920..a49f8ee 100644
--- a/frontend/afe/site_rpc_interface_unittest.py
+++ b/frontend/afe/site_rpc_interface_unittest.py
@@ -414,12 +414,17 @@
}]
- def _do_heartbeat_and_assert_response(self, shard_hostname=None, upload_jobs=[],
- upload_hqes=[], **kwargs):
- shard_hostname = shard_hostname or str(
- models.Shard.objects.count() + 1)
+ def _do_heartbeat_and_assert_response(self, shard_hostname='shard1',
+ upload_jobs=(), upload_hqes=(),
+ known_jobs=(), known_hosts=(),
+ **kwargs):
+ known_job_ids = [job.id for job in known_jobs]
+ known_host_ids = [host.id for host in known_hosts]
+
retval = site_rpc_interface.shard_heartbeat(
- shard_hostname=shard_hostname, jobs=upload_jobs, hqes=upload_hqes)
+ shard_hostname=shard_hostname,
+ jobs=upload_jobs, hqes=upload_hqes,
+ known_job_ids=known_job_ids, known_host_ids=known_host_ids)
self._assert_shard_heartbeat_response(shard_hostname, retval,
**kwargs)
@@ -477,6 +482,7 @@
def testSendingRecordsToMaster(self):
+ """Send records to the master and ensure they are persisted."""
jobs, hqes = self._get_records_for_sending_to_master()
hqes[0]['status'] = 'Completed'
self._send_records_to_master_helper(
@@ -488,6 +494,7 @@
def testSendingRecordsToMasterJobAssignedToDifferentShard(self):
+ """Ensure records that belong to a different shard are rejected."""
jobs, hqes = self._get_records_for_sending_to_master()
models.Shard.objects.create(hostname='other_shard')
self._send_records_to_master_helper(
@@ -495,12 +502,14 @@
def testSendingRecordsToMasterJobHqeWithoutJob(self):
+ """Ensure update for hqe without update for it's job gets rejected."""
_, hqes = self._get_records_for_sending_to_master()
self._send_records_to_master_helper(
jobs=[], hqes=hqes)
def testSendingRecordsToMasterNotExistingJob(self):
+ """Ensure update for non existing job gets rejected."""
jobs, hqes = self._get_records_for_sending_to_master()
jobs[0]['id'] = 3
@@ -508,105 +517,125 @@
jobs=jobs, hqes=hqes)
- def testShardHeartbeatFetchHostlessJob(self):
- models.Label.objects.create(name='board:lumpy', platform=True)
- label2 = models.Label.objects.create(name='bluetooth', platform=False)
+ def _createShardAndHostWithLabel(self, shard_hostname='shard1',
+ host_hostname='host1',
+ label_name='board:lumpy'):
+ label = models.Label.objects.create(name=label_name)
- shard_hostname = 'host1'
shard = models.Shard.objects.create(hostname=shard_hostname)
- shard.labels.add(models.Label.smart_get('board:lumpy'))
+ shard.labels.add(label)
+
+ host = models.Host.objects.create(hostname=host_hostname, leased=False)
+ host.labels.add(label)
+
+ return shard, host, label
+
+
+ def _createJobForLabel(self, label):
+ job_id = rpc_interface.create_job(name='dummy', priority='Medium',
+ control_file='foo',
+ control_type=CLIENT,
+ meta_hosts=[label.name],
+ dependencies=(label.name,))
+ return models.Job.objects.get(id=job_id)
+
+
+ def testShardHeartbeatFetchHostlessJob(self):
+ """Create a hostless job and ensure it's not assigned to a shard."""
+ shard1, host1, lumpy_label = self._createShardAndHostWithLabel(
+ 'shard1', 'host1', 'board:lumpy')
+
+ label2 = models.Label.objects.create(name='bluetooth', platform=False)
job1 = self._create_job(hostless=True)
# Hostless jobs should be executed by the global scheduler.
- self._do_heartbeat_and_assert_response(
- shard_hostname=shard_hostname)
+ self._do_heartbeat_and_assert_response(hosts=[host1])
def testShardRetrieveJobs(self):
- host1, host2 = [models.Host.objects.create(
- hostname=hostname, leased=False) for hostname in ['host1', 'host2']]
-
+ """Create jobs and retrieve them."""
# should never be returned by heartbeat
leased_host = models.Host.objects.create(hostname='leased_host',
leased=True)
- lumpy_label = models.Label.objects.create(name='board:lumpy',
- platform=True)
- grumpy_label = models.Label.objects.create(name='board:grumpy',
- platform=True)
+ shard1, host1, lumpy_label = self._createShardAndHostWithLabel()
+ shard2, host2, grumpy_label = self._createShardAndHostWithLabel(
+ 'shard2', 'host2', 'board:grumpy')
-
- host1.labels.add(lumpy_label)
leased_host.labels.add(lumpy_label)
- host2.labels.add(grumpy_label)
- shard_hostname1 = 'host1'
- shard_hostname2 = 'host2'
+ job1 = self._createJobForLabel(lumpy_label)
- shard1 = models.Shard.objects.create(hostname=shard_hostname1)
- shard2 = models.Shard.objects.create(hostname=shard_hostname2)
+ job2 = self._createJobForLabel(grumpy_label)
- shard1.labels.add(lumpy_label)
- shard2.labels.add(grumpy_label)
-
- job_id = rpc_interface.create_job(name='dummy', priority='Medium',
- control_file='foo',
- control_type=CLIENT,
- meta_hosts=['board:lumpy'],
- dependencies=('board:lumpy',),
- test_retry=10)
- job1 = models.Job.objects.get(id=job_id)
- job_id = rpc_interface.create_job(name='dummy', priority='Medium',
- control_file='foo',
- control_type=CLIENT,
- meta_hosts=['board:grumpy'],
- dependencies=('board:grumpy',),
- test_retry=10)
-
- job2 = models.Job.objects.get(id=job_id)
- job_id = rpc_interface.create_job(name='dummy', priority='Medium',
- control_file='foo',
- control_type=CLIENT,
- meta_hosts=['board:lumpy'],
- dependencies=('board:lumpy',),
- test_retry=10)
- job_completed = models.Job.objects.get(id=job_id)
- # Job is obviously already run, so don't sync it
+ job_completed = self._createJobForLabel(lumpy_label)
+ # Job is already being run, so don't sync it
job_completed.hostqueueentry_set.update(complete=True)
job_completed.hostqueueentry_set.create(complete=False)
- job_id = rpc_interface.create_job(name='dummy', priority='Medium',
- control_file='foo',
- control_type=CLIENT,
- meta_hosts=['board:lumpy'],
- dependencies=('board:lumpy',),
- test_retry=10)
- job_active = models.Job.objects.get(id=job_id)
- # Job is obviously already started, so don't sync it
+
+ job_active = self._createJobForLabel(lumpy_label)
+ # Job is already started, so don't sync it
job_active.hostqueueentry_set.update(active=True)
job_active.hostqueueentry_set.create(complete=False, active=False)
self._do_heartbeat_and_assert_response(
- shard_hostname=shard_hostname1, jobs=[job1], hosts=[host1],
- hqes=job1.hostqueueentry_set.all())
+ jobs=[job1], hosts=[host1], hqes=job1.hostqueueentry_set.all())
self._do_heartbeat_and_assert_response(
- shard_hostname=shard_hostname2, jobs=[job2], hosts=[host2],
- hqes=job2.hostqueueentry_set.all())
+ shard_hostname=shard2.hostname,
+ jobs=[job2], hosts=[host2], hqes=job2.hostqueueentry_set.all())
host3 = models.Host.objects.create(hostname='host3', leased=False)
host3.labels.add(lumpy_label)
self._do_heartbeat_and_assert_response(
- shard_hostname=shard_hostname1, jobs=[], hosts=[host3])
+ known_jobs=[job1], known_hosts=[host1], hosts=[host3])
- job1.hostqueueentry_set.update(aborted=True)
+
+ def testResendJobsAfterFailedHeartbeat(self):
+ """Create jobs, retrieve them, fail on client, fetch them again."""
+ shard1, host1, lumpy_label = self._createShardAndHostWithLabel()
+
+ job1 = self._createJobForLabel(lumpy_label)
+
self._do_heartbeat_and_assert_response(
- shard_hostname=shard_hostname1, jobs=[job1],
- hqes=job1.hostqueueentry_set.all())
+ jobs=[job1],
+ hqes=job1.hostqueueentry_set.all(), hosts=[host1])
+ # Make sure it's resubmitted by sending last_job=None again
+ self._do_heartbeat_and_assert_response(
+ known_hosts=[host1],
+ jobs=[job1], hqes=job1.hostqueueentry_set.all(), hosts=[])
- site_rpc_interface.delete_shard(hostname=shard_hostname1)
+ # Now it worked, make sure it's not sent again
+ self._do_heartbeat_and_assert_response(
+ known_jobs=[job1], known_hosts=[host1])
+
+ job1 = models.Job.objects.get(pk=job1.id)
+ job1.hostqueueentry_set.all().update(complete=True)
+
+ # Job is completed, make sure it's not sent again
+ self._do_heartbeat_and_assert_response(
+ known_hosts=[host1])
+
+ job2 = self._createJobForLabel(lumpy_label)
+
+ # job2's creation was later, it should be returned now.
+ self._do_heartbeat_and_assert_response(
+ known_hosts=[host1],
+ jobs=[job2], hqes=job2.hostqueueentry_set.all())
+
+ self._do_heartbeat_and_assert_response(
+ known_jobs=[job2], known_hosts=[host1])
+
+ job2.hostqueueentry_set.update(aborted=True)
+ self._do_heartbeat_and_assert_response(
+ known_jobs=[job2], known_hosts=[host1],
+ jobs=[job2],
+ hqes=job2.hostqueueentry_set.all())
+
+ site_rpc_interface.delete_shard(hostname=shard1.hostname)
self.assertRaises(
models.Shard.DoesNotExist, models.Shard.objects.get, pk=shard1.id)
@@ -623,6 +652,7 @@
def testCreateListShard(self):
+ """Retrieve a list of all shards."""
lumpy_label = models.Label.objects.create(name='board:lumpy',
platform=True)
@@ -641,5 +671,19 @@
'id': 1}])
+ def testResendHostsAfterFailedHeartbeat(self):
+ """Check that master accepts resending updated records after failure."""
+ shard1, host1, lumpy_label = self._createShardAndHostWithLabel()
+
+ # Send the host
+ self._do_heartbeat_and_assert_response(hosts=[host1])
+
+ # Send it again because previous one didn't persist correctly
+ self._do_heartbeat_and_assert_response(hosts=[host1])
+
+ # Now it worked, make sure it isn't sent again
+ self._do_heartbeat_and_assert_response(known_hosts=[host1])
+
+
if __name__ == '__main__':
unittest.main()