[autotest] Endpoints for syncing jobs and hqes back to master
The status of jobs that were completed on a shard should be visible
in the master AFE, so a human can easily see which jobs have been run in one
central place.
This adds the functionality to the AFE to retrieve newer versions of
jobs and hqes (host queue entries) from shards.
This is a newer version of CL:213901, which was reverted because of
http://crbug.com/418022.
BUG=418022
DEPLOY=apache, afe
TEST=Ran Suites
Change-Id: I3c9a35c78585a43d193874decbb7b16216b2c69a
Reviewed-on: https://chromium-review.googlesource.com/220141
Reviewed-by: Fang Deng <fdeng@chromium.org>
Tested-by: Jakob Jülich <jakobjuelich@chromium.org>
Commit-Queue: Jakob Jülich <jakobjuelich@chromium.org>
diff --git a/frontend/afe/rpc_utils.py b/frontend/afe/rpc_utils.py
index 80756e3..dd8c963 100644
--- a/frontend/afe/rpc_utils.py
+++ b/frontend/afe/rpc_utils.py
@@ -945,9 +945,72 @@
def find_records_for_shard(shard):
"""Find records that should be sent to a shard.
- @returns: Tuple of two lists for hosts and jobs: (hosts, jobs)
+ @param shard: Shard to find records for.
+
+ @returns: Tuple of two lists for hosts and jobs: (hosts, jobs).
"""
hosts = models.Host.assign_to_shard(shard)
jobs = models.Job.assign_to_shard(shard)
return hosts, jobs
+
+
+def _persist_records_with_type_sent_from_shard(
+ shard, records, record_type, *args, **kwargs):
+ """
+ Handle records of a specified type that were sent to the shard master.
+
+ @param shard: The shard the records were sent from.
+ @param records: The records sent in their serialized format.
+ @param record_type: Type of the objects represented by records.
+ @param args: Additional arguments that will be passed on to the sanity
+ checks.
+ @param kwargs: Additional arguments that will be passed on to the sanity
+ checks.
+
+ @raises error.UnallowedRecordsSentToMaster if any of the sanity checks fail.
+
+ @returns: List of primary keys of the processed records.
+ """
+ pks = []
+ for serialized_record in records:
+ pk = serialized_record['id']
+ try:
+ current_record = record_type.objects.get(pk=pk)
+ except record_type.DoesNotExist:
+ raise error.UnallowedRecordsSentToMaster(
+ 'Object with pk %s of type %s does not exist on master.' % (
+ pk, record_type))
+
+ current_record.sanity_check_update_from_shard(
+ shard, serialized_record, *args, **kwargs)
+
+ current_record.update_from_serialized(serialized_record)
+ pks.append(pk)
+ return pks
+
+
+def persist_records_sent_from_shard(shard, jobs, hqes):
+ """
+ Sanity-check and then save serialized records sent to the master from a shard.
+
+ During heartbeats, shards upload jobs and hostqueueentries. This performs
+ some sanity checks on these and then updates the existing records for those
+ entries with the updated ones from the heartbeat.
+
+ The sanity checks include:
+ - Checking if the objects sent already exist on the master.
+ - Checking if the objects sent were assigned to this shard.
+ - hostqueueentries must be sent together with their jobs.
+
+ @param shard: The shard the records were sent from.
+ @param jobs: The jobs the shard sent.
+ @param hqes: The hostqueueentries the shard sent.
+
+ @raises error.UnallowedRecordsSentToMaster if any of the sanity checks fail.
+ """
+ job_ids_sent = _persist_records_with_type_sent_from_shard(
+ shard, jobs, models.Job)
+
+ _persist_records_with_type_sent_from_shard(
+ shard, hqes, models.HostQueueEntry, job_ids_sent=job_ids_sent)