[autotest] Add heartbeat AFE endpoint to shard Autotest
To increase the workload the Autotest setup can handle, jobs should
be executed by multiple shards. Jobs are sharded by board type.
This adds an HTTP-accessible heartbeat endpoint to the AFE. It assigns
jobs and hosts to moblabs and returns them as records to insert into a
shard's local database.
TEST=Ran suites and tried manually retrieving jobs
DEPLOY=scheduler,apache,host-scheduler
Change-Id: Ie1dfa7bb295f685c56bb1354675d83f82d933c65
Reviewed-on: https://chromium-review.googlesource.com/211214
Reviewed-by: Alex Miller <milleral@chromium.org>
Tested-by: Jakob Jülich <jakobjuelich@chromium.org>
Commit-Queue: Jakob Jülich <jakobjuelich@chromium.org>
diff --git a/frontend/afe/site_rpc_interface.py b/frontend/afe/site_rpc_interface.py
index 78048d1..75535fa 100644
--- a/frontend/afe/site_rpc_interface.py
+++ b/frontend/afe/site_rpc_interface.py
@@ -11,12 +11,12 @@
import logging
import os
import shutil
-import utils
from autotest_lib.client.common_lib import error
from autotest_lib.client.common_lib import global_config
from autotest_lib.client.common_lib import priorities
from autotest_lib.client.common_lib.cros import dev_server
+from autotest_lib.client.common_lib.cros.graphite import stats
from autotest_lib.frontend.afe import rpc_utils
from autotest_lib.server import utils
from autotest_lib.server.cros.dynamic_suite import constants
@@ -182,13 +182,13 @@
control_file = tools.inject_vars(inject_dict, control_file)
return rpc_utils.create_job_common(name,
- priority=priority,
- timeout_mins=timeout_mins,
- max_runtime_mins=timeout*60,
- control_type='Server',
- control_file=control_file,
- hostless=True,
- keyvals=timings)
+ priority=priority,
+ timeout_mins=timeout_mins,
+ max_runtime_mins=timeout*60,
+ control_type='Server',
+ control_file=control_file,
+ hostless=True,
+ keyvals=timings)
# TODO: hide the following rpcs under is_moblab
@@ -266,3 +266,20 @@
job_id = filter_data['job_id']
job_info = job_history.get_job_info(job_id)
return _rpc_utils().prepare_for_serialization(job_info.get_history())
+
+
+def shard_heartbeat(shard_hostname):
+ """Register shard if it doesn't exist, then assign hosts and jobs.
+
+ @param shard_hostname: Hostname of the calling shard.
+ @returns: Dict with 'hosts' and 'jobs' keys, each a list of serialized
+ records (and their dependencies) to insert into a shard's database.
+ """
+ timer = stats.Timer('shard_heartbeat')
+ with timer:
+ shard_obj = rpc_utils.retrieve_shard(shard_hostname=shard_hostname)
+ hosts, jobs = rpc_utils.find_records_for_shard(shard_obj)
+ return {
+ 'hosts': [host.serialize() for host in hosts],
+ 'jobs': [job.serialize() for job in jobs],
+ }