[autotest] Support sharding for getting HQEs and special tasks

Following 2 RPCs that are used by AFE java client are improved
to support sharding.

get_host_queue_entries_and_special_tasks
get_num_host_queue_entries_and_special_tasks

BUG=chromium:462819
DEPLOY=afe,apache
TEST=puppylab. Get HQEs and special tasks for a lumpy host
     which is sharded from the master AFE.

Change-Id: I68c0d1a70fb6b61034c755d3a2b7f26475994bb0
Reviewed-on: https://chromium-review.googlesource.com/268523
Reviewed-by: Mungyung Ryu <mkryu@google.com>
Commit-Queue: Mungyung Ryu <mkryu@google.com>
Tested-by: Mungyung Ryu <mkryu@google.com>
diff --git a/frontend/afe/rpc_utils.py b/frontend/afe/rpc_utils.py
index fd3b645..389e494 100644
--- a/frontend/afe/rpc_utils.py
+++ b/frontend/afe/rpc_utils.py
@@ -14,6 +14,7 @@
 from autotest_lib.frontend.afe import models, model_logic
 from autotest_lib.client.common_lib import control_data, error
 from autotest_lib.client.common_lib import global_config, priorities
+from autotest_lib.client.common_lib import time_utils
 from autotest_lib.server.cros import provision
 from autotest_lib.server import frontend
 from autotest_lib.server import utils as server_utils
@@ -658,27 +659,81 @@
 
 # support for get_host_queue_entries_and_special_tasks()
 
-def _common_entry_to_dict(entry, type, job_dict):
+def _common_entry_to_dict(entry, type, job_dict, exec_path, status, started_on):
     return dict(type=type,
-                host=entry.host.get_object_dict(),
+                host=entry['host'],
                 job=job_dict,
-                execution_path=entry.execution_path(),
-                status=entry.status,
-                started_on=entry.started_on,
-                id=str(entry.id) + type,
-                oid=entry.id)
+                execution_path=exec_path,
+                status=status,
+                started_on=started_on,
+                id=str(entry['id']) + type,
+                oid=entry['id'])
 
 
-def _special_task_to_dict(special_task):
+def _special_task_to_dict(task, queue_entries):
+    """Transforms a special task dictionary to another form of dictionary.
+
+    @param task           Special task as a dictionary type
+    @param queue_entries  Host queue entries as a list of dictionaries.
+
+    @return Transformed dictionary for a special task.
+    """
     job_dict = None
-    if special_task.queue_entry:
-        job_dict = special_task.queue_entry.job.get_object_dict()
-    return _common_entry_to_dict(special_task, special_task.task, job_dict)
+    if task['queue_entry']:
+        # Scan queue_entries to get the job detail info.
+        for qentry in queue_entries:
+            if task['queue_entry']['id'] == qentry['id']:
+                job_dict = qentry['job']
+                break
+        # If not found, get it from DB.
+        if job_dict is None:
+            job = models.Job.objects.get(id=task['queue_entry']['job'])
+            job_dict = job.get_object_dict()
+
+    exec_path = server_utils.get_special_task_exec_path(
+            task['host']['hostname'], task['id'], task['task'],
+            time_utils.time_string_to_datetime(task['time_requested']))
+    status = server_utils.get_special_task_status(
+            task['is_complete'], task['success'], task['is_active'])
+    return _common_entry_to_dict(task, task['task'], job_dict,
+            exec_path, status, task['time_started'])
 
 
 def _queue_entry_to_dict(queue_entry):
-    return _common_entry_to_dict(queue_entry, 'Job',
-                                 queue_entry.job.get_object_dict())
+    job_dict = queue_entry['job']
+    tag = server_utils.get_job_tag(job_dict['id'], job_dict['owner'])
+    exec_path = server_utils.get_hqe_exec_path(tag,
+                                               queue_entry['execution_subdir'])
+    return _common_entry_to_dict(queue_entry, 'Job', job_dict, exec_path,
+            queue_entry['status'], queue_entry['started_on'])
+
+
+def prepare_host_queue_entries_and_special_tasks(interleaved_entries,
+                                                 queue_entries):
+    """
+    Prepare for serialization the interleaved entries of host queue entries
+    and special tasks.
+    Each element in the entries is a dictionary type.
+    The special task dictionary has only a job id for a job and lacks
+    the detail of the job while the host queue entry dictionary has.
+    queue_entries is used to look up the job detail info.
+
+    @param interleaved_entries  Host queue entries and special tasks as a list
+                                of dictionaries.
+    @param queue_entries        Host queue entries as a list of dictionaries.
+
+    @return A post-processed list of dictionaries that is to be serialized.
+    """
+    dict_list = []
+    for e in interleaved_entries:
+        # Distinguish the two mixed entries based on the existence of
+        # the key "task". If an entry has the key, the entry is for
+        # special task. Otherwise, host queue entry.
+        if 'task' in e:
+            dict_list.append(_special_task_to_dict(e, queue_entries))
+        else:
+            dict_list.append(_queue_entry_to_dict(e))
+    return prepare_for_serialization(dict_list)
 
 
 def _compute_next_job_for_tasks(queue_entries, special_tasks):
@@ -691,26 +746,32 @@
       since queue_entries may also have null started_on values.
     * if the task has neither, or if use of time_started fails, just use the
       last computed job ID.
+
+    @param queue_entries    Host queue entries as a list of dictionaries.
+    @param special_tasks    Special tasks as a list of dictionaries.
     """
     next_job_id = None # most recently computed next job
     hqe_index = 0 # index for scanning by started_on times
     for task in special_tasks:
-        if task.queue_entry:
-            next_job_id = task.queue_entry.job.id
-        elif task.time_started is not None:
+        if task['queue_entry']:
+            next_job_id = task['queue_entry']['job']
+        elif task['time_started'] is not None:
             for queue_entry in queue_entries[hqe_index:]:
-                if queue_entry.started_on is None:
+                if queue_entry['started_on'] is None:
                     continue
-                if queue_entry.started_on < task.time_started:
+                t1 = time_utils.time_string_to_datetime(
+                        queue_entry['started_on'])
+                t2 = time_utils.time_string_to_datetime(task['time_started'])
+                if t1 < t2:
                     break
-                next_job_id = queue_entry.job.id
+                next_job_id = queue_entry['job']['id']
 
-        task.next_job_id = next_job_id
+        task['next_job_id'] = next_job_id
 
         # advance hqe_index to just after next_job_id
         if next_job_id is not None:
             for queue_entry in queue_entries[hqe_index:]:
-                if queue_entry.job.id < next_job_id:
+                if queue_entry['job']['id'] < next_job_id:
                     break
                 hqe_index += 1
 
@@ -724,19 +785,19 @@
     # start with all special tasks that've run since the last job
     interleaved_entries = []
     for task in special_tasks:
-        if task.next_job_id is not None:
+        if task['next_job_id'] is not None:
             break
-        interleaved_entries.append(_special_task_to_dict(task))
+        interleaved_entries.append(task)
 
     # now interleave queue entries with the remaining special tasks
     special_task_index = len(interleaved_entries)
     for queue_entry in queue_entries:
-        interleaved_entries.append(_queue_entry_to_dict(queue_entry))
+        interleaved_entries.append(queue_entry)
         # add all tasks that ran between this job and the previous one
         for task in special_tasks[special_task_index:]:
-            if task.next_job_id < queue_entry.job.id:
+            if task['next_job_id'] < queue_entry['job']['id']:
                 break
-            interleaved_entries.append(_special_task_to_dict(task))
+            interleaved_entries.append(task)
             special_task_index += 1
 
     return interleaved_entries