[autotest] Create the get_host_special_tasks() RPC.

The existing get_special_tasks() RPC has two distinct callers:
Callers that only need tasks from the current shard, and callers
that need tasks for a specific host (and therefore, require the
RPC to be forwarded to the host's shard).  This change splits that
function into two separate RPC calls:  get_special_tasks() is used
for the local shard case, and get_host_special_tasks() is used for
single-host queries.

The only caller affected by this change is the status_history
module.  To preserve compatibility with older versions of dut-status
this change is rolling out in three stages.  This change provides
temporary support for both old and new versions of dut-status until
all the old versions of dut-status can be updated.

BUG=None
TEST=Test dut-status on a local instance using the prod database

Change-Id: I06f239d011d20f5a6357c35121951f9ecf34f61e
Reviewed-on: https://chromium-review.googlesource.com/265486
Tested-by: Richard Barnette <jrbarnette@chromium.org>
Reviewed-by: Fang Deng <fdeng@chromium.org>
Commit-Queue: Richard Barnette <jrbarnette@chromium.org>
diff --git a/frontend/afe/rpc_interface.py b/frontend/afe/rpc_interface.py
index 6e2611d..16dd017 100644
--- a/frontend/afe/rpc_interface.py
+++ b/frontend/afe/rpc_interface.py
@@ -38,11 +38,12 @@
 import common
 from autotest_lib.client.common_lib import priorities
 from autotest_lib.client.common_lib.cros.graphite import autotest_stats
-from autotest_lib.frontend.afe import models, model_logic, model_attributes
 from autotest_lib.frontend.afe import control_file, rpc_utils
+from autotest_lib.frontend.afe import models, model_logic, model_attributes
 from autotest_lib.frontend.afe import site_rpc_interface
 from autotest_lib.frontend.tko import models as tko_models
 from autotest_lib.frontend.tko import rpc_interface as tko_rpc_interface
+from autotest_lib.server import frontend
 from autotest_lib.server import utils
 from autotest_lib.server.cros.dynamic_suite import tools
 
@@ -1066,7 +1067,9 @@
                                                    start_time,
                                                    end_time,
                                                    **filter_data)
-    return rpc_utils.get_serialized_local_host_queue_entries(**filter_data)
+    return rpc_utils.prepare_rows_as_nested_dicts(
+            models.HostQueueEntry.query_objects(filter_data),
+            ('host', 'atomic_group', 'job'))
 
 
 def get_num_host_queue_entries(start_time=None, end_time=None, **filter_data):
@@ -1096,14 +1099,71 @@
 
 # special tasks
 
+def _get_local_special_tasks(**filter_data):
+    # TODO(jrbarnette): This function represents the correct
+    # implementation of get_special_tasks().  It's being kept
+    # separate until we can fix get_special_tasks() (see below).
+    return rpc_utils.prepare_rows_as_nested_dicts(
+            models.SpecialTask.query_objects(filter_data),
+            ('host', 'queue_entry'))
+
+
 def get_special_tasks(**filter_data):
-    # Task id is not universally unique, the id passed in would only be
-    # applicable to local db.
+    """Get special task entries from the local database.
+
+    Query the special tasks table for tasks matching the given
+    `filter_data`, and return a list of the results.  No attempt is
+    made to forward the call to shards; the buck will stop here.
+    The caller is expected to know the target shard for such reasons
+    as:
+      * The caller is a service (such as gs_offloader) configured
+        to operate on behalf of one specific shard, and no other.
+      * The caller has a host as a parameter, and knows that this is
+        the shard assigned to that host.
+
+    @param filter_data  Filter keywords to pass to the underlying
+                        database query.
+
+    """
+    # TODO(jrbarnette): Currently, this code still forwards to
+    # shards despite the specification that says we don't.  This is
+    # a temporary measure to provide compatibility to dut-status
+    # clients that haven't been updated to use
+    # get_host_special_tasks().  This code must be fixed/deleted
+    # once all those clients have been updated.
     if 'id' in filter_data or 'id__in' in filter_data:
-        return rpc_utils.get_serialized_local_special_tasks(**filter_data)
+        return _get_local_special_tasks(**filter_data)
     else:
-        return rpc_utils.get_data(rpc_utils.get_serialized_local_special_tasks,
-                'get_special_tasks', **filter_data)
+        return rpc_utils.get_data(_get_local_special_tasks,
+                                  'get_special_tasks', **filter_data)
+
+
+def get_host_special_tasks(host_id, **filter_data):
+    """Get special task entries for a given host.
+
+    Query the special tasks table for tasks that ran on the host
+    given by `host_id` and matching the given `filter_data`.
+    Return a list of the results.  If the host is assigned to a
+    shard, forward this call to that shard.
+
+    @param host_id      Id in the database of the target host.
+    @param filter_data  Filter keywords to pass to the underlying
+                        database query.
+
+    """
+    host = models.Host.smart_get(host_id)
+    if not host.shard:
+        # TODO(jrbarnette): change to get_special_tasks() once
+        # all dut-status clients are updated.
+        return _get_local_special_tasks(host_id=host_id, **filter_data)
+    else:
+        # The return value from AFE.get_special_tasks() is a list of
+        # post-processed objects that aren't JSON-serializable.  So,
+        # we have to call AFE.run() to get the raw, serializable
+        # output from the shard.
+        shard_afe = frontend.AFE(server=host.shard.rpc_hostname())
+        return shard_afe.run('get_special_tasks',
+                             host_id=host_id, **filter_data)
 
 
 # support for host detail view