[autotest] Support sharding for getting HQEs and special tasks
The following two RPCs, which are used by the AFE Java client, are
improved to support sharding:
get_host_queue_entries_and_special_tasks
get_num_host_queue_entries_and_special_tasks
BUG=chromium:462819
DEPLOY=afe,apache
TEST=puppylab. Get HQEs and special tasks for a lumpy host that is
sharded from the master AFE.
Change-Id: I68c0d1a70fb6b61034c755d3a2b7f26475994bb0
Reviewed-on: https://chromium-review.googlesource.com/268523
Reviewed-by: Mungyung Ryu <mkryu@google.com>
Commit-Queue: Mungyung Ryu <mkryu@google.com>
Tested-by: Mungyung Ryu <mkryu@google.com>
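
For reviewers reproducing the TEST= line: a minimal sketch of exercising
the two sharded RPCs through the AFE RPC client (frontend.AFE, which this
change already imports in rpc_utils.py). The shard and host names are
placeholders, and the 'host' keyword argument is an assumption about the
RPC signature, not something this change defines.

    # Sketch only, not part of this change. 'chromeos-shard1' and
    # 'lumpy-host1' are placeholder names; the 'host' keyword is an
    # assumed RPC parameter.
    from autotest_lib.server import frontend

    afe = frontend.AFE(server='chromeos-shard1')  # placeholder shard AFE
    entries = afe.run('get_host_queue_entries_and_special_tasks',
                      host='lumpy-host1')
    count = afe.run('get_num_host_queue_entries_and_special_tasks',
                    host='lumpy-host1')
    print count, len(entries)
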
diff --git a/frontend/afe/rpc_utils.py b/frontend/afe/rpc_utils.py
index fd3b645..389e494 100644
--- a/frontend/afe/rpc_utils.py
+++ b/frontend/afe/rpc_utils.py
@@ -14,6 +14,7 @@
from autotest_lib.frontend.afe import models, model_logic
from autotest_lib.client.common_lib import control_data, error
from autotest_lib.client.common_lib import global_config, priorities
+from autotest_lib.client.common_lib import time_utils
from autotest_lib.server.cros import provision
from autotest_lib.server import frontend
from autotest_lib.server import utils as server_utils
@@ -658,27 +659,81 @@
# support for get_host_queue_entries_and_special_tasks()
-def _common_entry_to_dict(entry, type, job_dict):
+def _common_entry_to_dict(entry, type, job_dict, exec_path, status, started_on):
     return dict(type=type,
-                host=entry.host.get_object_dict(),
+                host=entry['host'],
                 job=job_dict,
-                execution_path=entry.execution_path(),
-                status=entry.status,
-                started_on=entry.started_on,
-                id=str(entry.id) + type,
-                oid=entry.id)
+                execution_path=exec_path,
+                status=status,
+                started_on=started_on,
+                id=str(entry['id']) + type,
+                oid=entry['id'])
-def _special_task_to_dict(special_task):
+def _special_task_to_dict(task, queue_entries):
+    """Transform a special task dictionary into the common entry form.
+
+    @param task Special task as a dictionary.
+    @param queue_entries Host queue entries as a list of dictionaries.
+
+    @return Transformed dictionary for the special task.
+    """
     job_dict = None
-    if special_task.queue_entry:
-        job_dict = special_task.queue_entry.job.get_object_dict()
-    return _common_entry_to_dict(special_task, special_task.task, job_dict)
+    if task['queue_entry']:
+        # Scan queue_entries to get the job detail info.
+        for qentry in queue_entries:
+            if task['queue_entry']['id'] == qentry['id']:
+                job_dict = qentry['job']
+                break
+        # If not found there, get it from the DB.
+        if job_dict is None:
+            job = models.Job.objects.get(id=task['queue_entry']['job'])
+            job_dict = job.get_object_dict()
+
+    exec_path = server_utils.get_special_task_exec_path(
+            task['host']['hostname'], task['id'], task['task'],
+            time_utils.time_string_to_datetime(task['time_requested']))
+    status = server_utils.get_special_task_status(
+            task['is_complete'], task['success'], task['is_active'])
+    return _common_entry_to_dict(task, task['task'], job_dict,
+                                 exec_path, status, task['time_started'])
 def _queue_entry_to_dict(queue_entry):
-    return _common_entry_to_dict(queue_entry, 'Job',
-                                 queue_entry.job.get_object_dict())
+    job_dict = queue_entry['job']
+    tag = server_utils.get_job_tag(job_dict['id'], job_dict['owner'])
+    exec_path = server_utils.get_hqe_exec_path(tag,
+                                               queue_entry['execution_subdir'])
+    return _common_entry_to_dict(queue_entry, 'Job', job_dict, exec_path,
+                                 queue_entry['status'], queue_entry['started_on'])
+
+
+def prepare_host_queue_entries_and_special_tasks(interleaved_entries,
+                                                 queue_entries):
+    """
+    Prepare the interleaved entries of host queue entries and special
+    tasks for serialization.
+    Each element in the entries is a dictionary.
+    A special task dictionary carries only a job id and lacks the job
+    details that a host queue entry dictionary contains; queue_entries
+    is used to look up the job details.
+
+    @param interleaved_entries Host queue entries and special tasks as a list
+        of dictionaries.
+    @param queue_entries Host queue entries as a list of dictionaries.
+
+    @return A post-processed list of dictionaries to be serialized.
+    """
+    dict_list = []
+    for e in interleaved_entries:
+        # Distinguish the two kinds of entries by the presence of the
+        # key 'task': if an entry has the key, it is a special task;
+        # otherwise it is a host queue entry.
+        if 'task' in e:
+            dict_list.append(_special_task_to_dict(e, queue_entries))
+        else:
+            dict_list.append(_queue_entry_to_dict(e))
+    return prepare_for_serialization(dict_list)
def _compute_next_job_for_tasks(queue_entries, special_tasks):
@@ -691,26 +746,32 @@
       since queue_entries may also have null started_on values.
     * if the task has neither, or if use of time_started fails, just use the
       last computed job ID.
+
+    @param queue_entries Host queue entries as a list of dictionaries.
+    @param special_tasks Special tasks as a list of dictionaries.
     """
     next_job_id = None # most recently computed next job
     hqe_index = 0 # index for scanning by started_on times
     for task in special_tasks:
-        if task.queue_entry:
-            next_job_id = task.queue_entry.job.id
-        elif task.time_started is not None:
+        if task['queue_entry']:
+            next_job_id = task['queue_entry']['job']
+        elif task['time_started'] is not None:
             for queue_entry in queue_entries[hqe_index:]:
-                if queue_entry.started_on is None:
+                if queue_entry['started_on'] is None:
                     continue
-                if queue_entry.started_on < task.time_started:
+                t1 = time_utils.time_string_to_datetime(
+                        queue_entry['started_on'])
+                t2 = time_utils.time_string_to_datetime(task['time_started'])
+                if t1 < t2:
                     break
-                next_job_id = queue_entry.job.id
+                next_job_id = queue_entry['job']['id']
-        task.next_job_id = next_job_id
+        task['next_job_id'] = next_job_id
         # advance hqe_index to just after next_job_id
         if next_job_id is not None:
             for queue_entry in queue_entries[hqe_index:]:
-                if queue_entry.job.id < next_job_id:
+                if queue_entry['job']['id'] < next_job_id:
                     break
                 hqe_index += 1
@@ -724,19 +785,19 @@
     # start with all special tasks that've run since the last job
     interleaved_entries = []
     for task in special_tasks:
-        if task.next_job_id is not None:
+        if task['next_job_id'] is not None:
             break
-        interleaved_entries.append(_special_task_to_dict(task))
+        interleaved_entries.append(task)
     # now interleave queue entries with the remaining special tasks
     special_task_index = len(interleaved_entries)
     for queue_entry in queue_entries:
-        interleaved_entries.append(_queue_entry_to_dict(queue_entry))
+        interleaved_entries.append(queue_entry)
         # add all tasks that ran between this job and the previous one
         for task in special_tasks[special_task_index:]:
-            if task.next_job_id < queue_entry.job.id:
+            if task['next_job_id'] < queue_entry['job']['id']:
                 break
-            interleaved_entries.append(_special_task_to_dict(task))
+            interleaved_entries.append(task)
             special_task_index += 1
     return interleaved_entries
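
To illustrate the dict-based flow above: a small, self-contained sketch of
the interleave step operating on plain dictionaries. Field values are
invented for the example, and both lists are newest-first, matching the
ordering the surrounding code assumes; only the keys the interleave step
reads are shown.

    # Illustration only: interleave special tasks and HQEs as
    # dictionaries, the way the patched code does.
    special_tasks = [
        {'task': 'Verify', 'next_job_id': None},  # ran after the last job
        {'task': 'Reset', 'next_job_id': 42},     # ran just before job 42
    ]
    queue_entries = [{'job': {'id': 42}, 'id': 7}]

    interleaved_entries = []
    for task in special_tasks:
        if task['next_job_id'] is not None:
            break
        interleaved_entries.append(task)
    special_task_index = len(interleaved_entries)
    for queue_entry in queue_entries:
        interleaved_entries.append(queue_entry)
        for task in special_tasks[special_task_index:]:
            if task['next_job_id'] < queue_entry['job']['id']:
                break
            interleaved_entries.append(task)
            special_task_index += 1
    # Result: [Verify task, HQE for job 42, Reset task]; the new
    # prepare_host_queue_entries_and_special_tasks() then converts each
    # dictionary for serialization.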