[autotest] Support sharding for getting HQEs and special tasks
The following two RPCs, which are used by the AFE Java client, are
improved to support sharding:
get_host_queue_entries_and_special_tasks
get_num_host_queue_entries_and_special_tasks
BUG=chromium:462819
DEPLOY=afe,apache
TEST=puppylab. Get HQEs and special tasks for a lumpy host
which is sharded from the master AFE.
Change-Id: I68c0d1a70fb6b61034c755d3a2b7f26475994bb0
Reviewed-on: https://chromium-review.googlesource.com/268523
Reviewed-by: Mungyung Ryu <mkryu@google.com>
Commit-Queue: Mungyung Ryu <mkryu@google.com>
Tested-by: Mungyung Ryu <mkryu@google.com>
diff --git a/frontend/afe/rpc_interface.py b/frontend/afe/rpc_interface.py
index d9fe281..12b3a28 100644
--- a/frontend/afe/rpc_interface.py
+++ b/frontend/afe/rpc_interface.py
@@ -1140,7 +1140,8 @@
database query.
"""
- host = models.Host.smart_get(host_id)
+ # Retrieve host data even if the host is in an invalid state.
+ host = models.Host.smart_get(host_id, False)
if not host.shard:
return get_special_tasks(host_id=host_id, **filter_data)
else:
@@ -1153,6 +1154,40 @@
host_id=host_id, **filter_data)
+def get_num_special_tasks(**kwargs):
+ """Get the number of special task entries from the local database.
+
+ Query the special tasks table for tasks matching the given 'kwargs',
+ and return the number of the results. No attempt is made to forward
+ the call to shards; the buck will stop here.
+
+ @param kwargs Filter keywords to pass to the underlying database query.
+
+ """
+ return models.SpecialTask.query_count(kwargs)
+
+
+def get_host_num_special_tasks(host, **kwargs):
+ """Get the number of special task entries for a given host.
+
+ Query the special tasks table for tasks that ran on the host
+ given by 'host' and matching the given 'kwargs'.
+ Return the number of the results. If the host is assigned to a
+ shard, forward this call to that shard.
+
+ @param host id or name of a host. More often a hostname.
+ @param kwargs Filter keywords to pass to the underlying database query.
+
+ """
+ # Retrieve host data even if the host is in an invalid state.
+ host_model = models.Host.smart_get(host, False)
+ if not host_model.shard:
+ return get_num_special_tasks(host=host, **kwargs)
+ else:
+ shard_afe = frontend.AFE(server=host_model.shard.rpc_hostname())
+ return shard_afe.run('get_num_special_tasks', host=host, **kwargs)
+
+
def get_status_task(host_id, end_time):
"""Get the "status task" for a host from the local shard.
@@ -1210,7 +1245,7 @@
# support for host detail view
-def get_host_queue_entries_and_special_tasks(host_id, query_start=None,
+def get_host_queue_entries_and_special_tasks(host, query_start=None,
query_limit=None, start_time=None,
end_time=None):
"""
@@ -1221,18 +1256,17 @@
total_limit = None
if query_limit is not None:
total_limit = query_start + query_limit
- filter_data_common = {'host': host_id,
+ filter_data_common = {'host': host,
'query_limit': total_limit,
'sort_by': ['-id']}
- filter_data_queue_entries, filter_data_special_tasks = (
- rpc_utils.inject_times_to_hqe_special_tasks_filters(
- filter_data_common, start_time, end_time))
+ filter_data_special_tasks = rpc_utils.inject_times_to_filter(
+ 'time_started__gte', 'time_started__lte', start_time, end_time,
+ **filter_data_common)
- queue_entries = list(models.HostQueueEntry.query_objects(
- filter_data_queue_entries))
- special_tasks = list(models.SpecialTask.query_objects(
- filter_data_special_tasks))
+ queue_entries = get_host_queue_entries(
+ start_time, end_time, **filter_data_common)
+ special_tasks = get_host_special_tasks(host, **filter_data_special_tasks)
interleaved_entries = rpc_utils.interleave_entries(queue_entries,
special_tasks)
@@ -1240,19 +1274,20 @@
interleaved_entries = interleaved_entries[query_start:]
if query_limit is not None:
interleaved_entries = interleaved_entries[:query_limit]
- return rpc_utils.prepare_for_serialization(interleaved_entries)
+ return rpc_utils.prepare_host_queue_entries_and_special_tasks(
+ interleaved_entries, queue_entries)
-def get_num_host_queue_entries_and_special_tasks(host_id, start_time=None,
+def get_num_host_queue_entries_and_special_tasks(host, start_time=None,
end_time=None):
- filter_data_common = {'host': host_id}
+ filter_data_common = {'host': host}
filter_data_queue_entries, filter_data_special_tasks = (
rpc_utils.inject_times_to_hqe_special_tasks_filters(
filter_data_common, start_time, end_time))
return (models.HostQueueEntry.query_count(filter_data_queue_entries)
- + models.SpecialTask.query_count(filter_data_special_tasks))
+ + get_host_num_special_tasks(**filter_data_special_tasks))
# recurring run