[autotest] Consult appropriate shard AFE for special tasks

Special tasks are updated in the local DB of a shard.
RPCs that hit a master AFE to get special tasks
should therefore consult the appropriate shard AFE.

BUG=chromium:461637
DEPLOY=apache
TEST=puppylab.
./site_utils/dut_status.py test_host41, where test_host41 is a DUT
on the master.
./site_utils/dut_status.py test_host51, where test_host51 is a DUT
on lumpyshard.

Change-Id: I118ed5f4f20eba14c2e4c2c056e217c31f334aac
Reviewed-on: https://chromium-review.googlesource.com/254420
Reviewed-by: Fang Deng <fdeng@chromium.org>
Commit-Queue: Mungyung Ryu <mkryu@google.com>
Tested-by: Mungyung Ryu <mkryu@google.com>
diff --git a/frontend/afe/rpc_utils.py b/frontend/afe/rpc_utils.py
index 70dd91a..dd24949 100644
--- a/frontend/afe/rpc_utils.py
+++ b/frontend/afe/rpc_utils.py
@@ -1178,11 +1178,114 @@
 
 
 def get_global_afe_hostname():
-    """Read the hostname of the global AFE from the global configuration."""
+    """Reads the hostname of the global AFE from the global configuration."""
     return global_config.global_config.get_config_value(
             'SHARD', 'global_afe_hostname')
 
 
 def route_rpc_to_master(rpc_name, **kwargs):
+    """Routes RPC to master AFE.
+
+    @param rpc_name: The name of the rpc.
+    @param **kwargs: The kwargs for the rpc.
+    """
     master_afe = frontend.AFE(server=get_global_afe_hostname())
-    master_afe.run(rpc_name, **kwargs)
+    return master_afe.run(rpc_name, **kwargs)
+
+
+def get_serialized_local_host_queue_entries(**kwargs):
+    """Gets serialized HQEs from local DB.
+
+    @param **kwargs: The kwargs for the rpc.
+    @return: List of serialized HQEs. Each HQE is represented by a dict.
+    """
+    return prepare_rows_as_nested_dicts(
+            models.HostQueueEntry.query_objects(kwargs),
+            ('host', 'atomic_group', 'job'))
+
+
+def get_serialized_local_special_tasks(**kwargs):
+    """Gets serialized special tasks from local DB.
+
+    @param **kwargs: The kwargs for the rpc.
+    @return: List of serialized special tasks.
+             Each task is represented by a dict.
+    """
+    return prepare_rows_as_nested_dicts(
+            models.SpecialTask.query_objects(kwargs), ('host', 'queue_entry'))
+
+
+def pack_in_list(obj, list):
+    """Packs objects into a list appropriately.
+
+    @param obj:  An object to insert into a list
+    @param list: List that will contain the input object.
+    """
+    if obj is None or not obj:
+        return
+    # In python 2.x, string does not have __iter__ attribute
+    # while it does in python 3.x.
+    # Check string type first to be safe.
+    if isinstance(obj, basestring):
+        list.append(obj)
+    elif hasattr(obj, '__iter__'):
+        list.extend(obj)
+    else:
+        list.append(obj)
+
+
+def get_from_shards(shard_hostnames, rpc_name, **kwargs):
+    """Gets data via RPC to shards.
+
+    @param shard_hostnames: Host names for shards
+    @param rpc_name:        Name of the RPC endpoint to call.
+    @param **kwargs:        Keyword arguments to pass in the RPCs.
+    @return: List of all RPC results from the shards.
+    """
+    results = list()
+    for shard in shard_hostnames:
+        afe = frontend.AFE(server=shard)
+        pack_in_list(afe.run(rpc_name, **kwargs), results)
+    return results
+
+
+def get_data(serialized_db_access_func, rpc_name, **kwargs):
+    """Gets data according to the role of a node (master/shard).
+
+    Master:
+    When |kwargs| has host_id or host_id__in and the hosts are in
+    shards, calls |rpc_name| on those shards, and gets the results.
+    When |kwargs| doesn't have host info, contacts all shards.
+    Retrieves data from the local DB and coalesces it with the results
+    from the shards.
+
+    Shard:
+    Retrieves data from local DB.
+
+    @param serialized_db_access_func: Function object that accesses local
+           DB and serializes the returned records.
+    @param rpc_name: Name of the RPC endpoint to call.
+    @param **kwargs: Keyword arguments to pass in the RPCs.
+    @return: List of objects
+    """
+    results = list()
+    pack_in_list(serialized_db_access_func(**kwargs), results)
+
+    if not server_utils.is_shard():
+        shard_hostnames = list()
+
+        if 'host_id' in kwargs:
+            host_objs = [models.Host.smart_get(kwargs['host_id'])]
+            shard_host_map = bucket_hosts_by_shard(host_objs, True)
+            shard_hostnames.extend(shard_host_map.keys())
+        elif 'host_id__in' in kwargs:
+            host_objs = models.Host.smart_get_bulk(kwargs['host_id__in'])
+            shard_host_map = bucket_hosts_by_shard(host_objs, True)
+            shard_hostnames.extend(shard_host_map.keys())
+        else:
+            for shard in models.Shard.objects.all():
+                shard_hostnames.append(shard.rpc_hostname())
+
+        pack_in_list(get_from_shards(
+                shard_hostnames, rpc_name, **kwargs), results)
+    return results