[autotest] Consult appropriate shard AFE for special tasks
Special tasks are updated in the local DB of a shard, so RPCs that
hit the master AFE to get special tasks should consult the
appropriate shard AFE.
BUG=chromium:461637
DEPLOY=apache
TEST=puppylab.
./site_utils/dut_status.py test_host41, where test_host41 is a DUT
in the master.
./site_utils/dut_status.py test_host51, where test_host51 is a DUT
in lumpyshard.
Change-Id: I118ed5f4f20eba14c2e4c2c056e217c31f334aac
Reviewed-on: https://chromium-review.googlesource.com/254420
Reviewed-by: Fang Deng <fdeng@chromium.org>
Commit-Queue: Mungyung Ryu <mkryu@google.com>
Tested-by: Mungyung Ryu <mkryu@google.com>
diff --git a/frontend/afe/rpc_utils.py b/frontend/afe/rpc_utils.py
index 70dd91a..dd24949 100644
--- a/frontend/afe/rpc_utils.py
+++ b/frontend/afe/rpc_utils.py
@@ -1178,11 +1178,114 @@
def get_global_afe_hostname():
- """Read the hostname of the global AFE from the global configuration."""
+ """Reads the hostname of the global AFE from the global configuration."""
return global_config.global_config.get_config_value(
'SHARD', 'global_afe_hostname')
def route_rpc_to_master(rpc_name, **kwargs):
+ """Routes RPC to master AFE.
+
+ @param rpc_name: The name of the rpc.
+ @param **kwargs: The kwargs for the rpc.
+ """
master_afe = frontend.AFE(server=get_global_afe_hostname())
- master_afe.run(rpc_name, **kwargs)
+ return master_afe.run(rpc_name, **kwargs)
+
+
+def get_serialized_local_host_queue_entries(**kwargs):
+ """Gets serialized HQEs from local DB.
+
+ @param **kwargs: The kwargs for the rpc.
+ @return: List of serialized HQEs. Each HQE is represented by a dict.
+ """
+ return prepare_rows_as_nested_dicts(
+ models.HostQueueEntry.query_objects(kwargs),
+ ('host', 'atomic_group', 'job'))
+
+
+def get_serialized_local_special_tasks(**kwargs):
+ """Gets serialized special tasks from local DB.
+
+ @param **kwargs: The kwargs for the rpc.
+ @return: List of serialized special tasks.
+ Each task is represented by a dict.
+ """
+ return prepare_rows_as_nested_dicts(
+ models.SpecialTask.query_objects(kwargs), ('host', 'queue_entry'))
+
+
+def pack_in_list(obj, list):
+ """Packs objects into a list appropriately.
+
+ @param obj: Item to add; falsy is skipped, non-string iterables unpacked.
+ @param list: List that receives the input object(s).
+ """
+ if obj is None or not obj:
+ return
+ # In Python 2.x, strings do not have an __iter__ attribute,
+ # while they do in Python 3.x.
+ # Check the string type first to be safe.
+ if isinstance(obj, basestring):
+ list.append(obj)
+ elif hasattr(obj, '__iter__'):
+ list.extend(obj)
+ else:
+ list.append(obj)
+
+
+def get_from_shards(shard_hostnames, rpc_name, **kwargs):
+ """Gets data via RPC to shards.
+
+ @param shard_hostnames: Host names for shards
+ @param rpc_name: Name of the RPC endpoint to call.
+ @param **kwargs: Keyword arguments to pass in the RPCs.
+ @return: List of all RPC results from the shards.
+ """
+ results = list()
+ for shard in shard_hostnames:
+ afe = frontend.AFE(server=shard)
+ pack_in_list(afe.run(rpc_name, **kwargs), results)
+ return results
+
+
+def get_data(serialized_db_access_func, rpc_name, **kwargs):
+ """Gets data according to the role of a node (master/shard).
+
+ Master:
+ When |kwargs| has host_id or host_id__in and the hosts are in
+ shards, calls |rpc_name| on those shards and gets the results.
+ When |kwargs| doesn't have host info, contacts all shards.
+ Retrieves data from the local DB and coalesces it with the results
+ from the shards.
+
+ Shard:
+ Retrieves data from local DB.
+
+ @param serialized_db_access_func: Function object that accesses local
+ DB and serializes the returned records.
+ @param rpc_name: Name of the RPC endpoint to call.
+ @param **kwargs: Keyword arguments to pass in the RPCs.
+ @return: List of objects
+ """
+ results = list()
+ pack_in_list(serialized_db_access_func(**kwargs), results)
+
+ if not server_utils.is_shard():
+ shard_hostnames = list()
+
+ if 'host_id' in kwargs:
+ host_objs = [models.Host.smart_get(kwargs['host_id'])]
+ shard_host_map = bucket_hosts_by_shard(host_objs, True)
+ shard_hostnames.extend(shard_host_map.keys())
+ elif 'host_id__in' in kwargs:
+ host_objs = models.Host.smart_get_bulk(kwargs['host_id__in'])
+ shard_host_map = bucket_hosts_by_shard(host_objs, True)
+ shard_hostnames.extend(shard_host_map.keys())
+ else:
+ for shard in models.Shard.objects.all():
+ shard_hostnames.append(shard.rpc_hostname())
+
+ pack_in_list(get_from_shards(
+ shard_hostnames, rpc_name, **kwargs), results)
+ return results