[autotest] Support sharding for getting HQEs and special tasks

The following two RPCs, which are used by the AFE Java client, are
improved to support sharding:

get_host_queue_entries_and_special_tasks
get_num_host_queue_entries_and_special_tasks
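
For reference, a minimal sketch of how a client might invoke these
RPCs through the AFE RPC proxy (frontend.AFE and its run() method are
used the same way in the diff below; the server and host names here
are hypothetical):

    from autotest_lib.server import frontend

    # Talk to the master AFE; the RPC forwards to the host's shard
    # when the host is assigned to one.
    afe = frontend.AFE(server='cautotest')  # hypothetical server
    entries = afe.run('get_host_queue_entries_and_special_tasks',
                      host='lumpy-host1')   # hypothetical hostname
    count = afe.run('get_num_host_queue_entries_and_special_tasks',
                    host='lumpy-host1')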

BUG=chromium:462819
DEPLOY=afe,apache
TEST=puppylab. Get HQEs and special tasks for a lumpy host
     that is sharded from the master AFE.

Change-Id: I68c0d1a70fb6b61034c755d3a2b7f26475994bb0
Reviewed-on: https://chromium-review.googlesource.com/268523
Reviewed-by: Mungyung Ryu <mkryu@google.com>
Commit-Queue: Mungyung Ryu <mkryu@google.com>
Tested-by: Mungyung Ryu <mkryu@google.com>
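
The shard-forwarding pattern this change applies is: resolve the host
locally, answer from the local database if the host is not on a shard,
and otherwise replay the same RPC against that shard's AFE. A condensed
sketch of the pattern (names match the new get_host_num_special_tasks
in the diff below):

    host_model = models.Host.smart_get(host, False)
    if not host_model.shard:
        # Host lives on this instance; count from the local database.
        return get_num_special_tasks(host=host, **kwargs)
    # Host is sharded; forward the identical RPC to its shard.
    shard_afe = frontend.AFE(server=host_model.shard.rpc_hostname())
    return shard_afe.run('get_num_special_tasks', host=host, **kwargs)
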
diff --git a/frontend/afe/rpc_interface.py b/frontend/afe/rpc_interface.py
index d9fe281..12b3a28 100644
--- a/frontend/afe/rpc_interface.py
+++ b/frontend/afe/rpc_interface.py
@@ -1140,7 +1140,8 @@
                         database query.
 
     """
-    host = models.Host.smart_get(host_id)
+    # Retrieve host data even if the host is in an invalid state.
+    host = models.Host.smart_get(host_id, False)
     if not host.shard:
         return get_special_tasks(host_id=host_id, **filter_data)
     else:
@@ -1153,6 +1154,40 @@
                              host_id=host_id, **filter_data)
 
 
+def get_num_special_tasks(**kwargs):
+    """Get the number of special task entries from the local database.
+
+    Query the special tasks table for tasks matching the given 'kwargs',
+    and return the number of results. No attempt is made to forward
+    the call to shards; the buck stops here.
+
+    @param kwargs    Filter keywords to pass to the underlying database query.
+
+    """
+    return models.SpecialTask.query_count(kwargs)
+
+
+def get_host_num_special_tasks(host, **kwargs):
+    """Get special task entries for a given host.
+
+    Query the special tasks table for tasks that ran on the host
+    given by 'host' and matching the given 'kwargs'.
+    Return a list of the results.  If the host is assigned to a
+    shard, forward this call to that shard.
+
+    @param host      Id or name of a host; more often a hostname.
+    @param kwargs    Filter keywords to pass to the underlying database query.
+
+    """
+    # Retrieve host data even if the host is in an invalid state.
+    host_model = models.Host.smart_get(host, False)
+    if not host_model.shard:
+        return get_num_special_tasks(host=host, **kwargs)
+    else:
+        shard_afe = frontend.AFE(server=host_model.shard.rpc_hostname())
+        return shard_afe.run('get_num_special_tasks', host=host, **kwargs)
+
+
 def get_status_task(host_id, end_time):
     """Get the "status task" for a host from the local shard.
 
@@ -1210,7 +1245,7 @@
 
 # support for host detail view
 
-def get_host_queue_entries_and_special_tasks(host_id, query_start=None,
+def get_host_queue_entries_and_special_tasks(host, query_start=None,
                                              query_limit=None, start_time=None,
                                              end_time=None):
     """
@@ -1221,18 +1256,17 @@
     total_limit = None
     if query_limit is not None:
         total_limit = query_start + query_limit
-    filter_data_common = {'host': host_id,
+    filter_data_common = {'host': host,
                           'query_limit': total_limit,
                           'sort_by': ['-id']}
 
-    filter_data_queue_entries, filter_data_special_tasks = (
-            rpc_utils.inject_times_to_hqe_special_tasks_filters(
-                    filter_data_common, start_time, end_time))
+    filter_data_special_tasks = rpc_utils.inject_times_to_filter(
+            'time_started__gte', 'time_started__lte', start_time, end_time,
+            **filter_data_common)
 
-    queue_entries = list(models.HostQueueEntry.query_objects(
-            filter_data_queue_entries))
-    special_tasks = list(models.SpecialTask.query_objects(
-            filter_data_special_tasks))
+    queue_entries = get_host_queue_entries(
+            start_time, end_time, **filter_data_common)
+    special_tasks = get_host_special_tasks(host, **filter_data_special_tasks)
 
     interleaved_entries = rpc_utils.interleave_entries(queue_entries,
                                                        special_tasks)
@@ -1240,19 +1274,20 @@
         interleaved_entries = interleaved_entries[query_start:]
     if query_limit is not None:
         interleaved_entries = interleaved_entries[:query_limit]
-    return rpc_utils.prepare_for_serialization(interleaved_entries)
+    return rpc_utils.prepare_host_queue_entries_and_special_tasks(
+            interleaved_entries, queue_entries)
 
 
-def get_num_host_queue_entries_and_special_tasks(host_id, start_time=None,
+def get_num_host_queue_entries_and_special_tasks(host, start_time=None,
                                                  end_time=None):
-    filter_data_common = {'host': host_id}
+    filter_data_common = {'host': host}
 
     filter_data_queue_entries, filter_data_special_tasks = (
             rpc_utils.inject_times_to_hqe_special_tasks_filters(
                     filter_data_common, start_time, end_time))
 
     return (models.HostQueueEntry.query_count(filter_data_queue_entries)
-            + models.SpecialTask.query_count(filter_data_special_tasks))
+            + get_host_num_special_tasks(**filter_data_special_tasks))
 
 
 # recurring run