[autotest] use parent_job_id to reduce number of db queries in suite.wait
This CL reduces the number of queries required when waiting for a
suite's job results. When suite_job_id is known, the per-job loop over
afe.get_jobs(id=job.id, finished=True) for all remaining jobs in the
suite is replaced with a single call to
afe.get_jobs(parent_job_id=parent_job_id, finished=True).
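In rough terms (a sketch only, not the exact code), each polling pass
changes from one get_jobs RPC per remaining child job:

    for job in list(local_jobs):
        if not afe.get_jobs(id=job.id, finished=True):
            continue
        local_jobs.remove(job)
        # yield this job's results

to a single get_jobs RPC that covers all children of the suite job:

    finished = afe.get_jobs(parent_job_id=parent_job_id, finished=True)
    for job in finished:
        if job.id in remaining_child_jobs:
            remaining_child_jobs.remove(job.id)
            # yield this job's results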
I have not yet done a performance benchmark with this change, nor
examined whether the underlying SQL queries could be significantly
optimized with an intelligently chosen new database index.
CQ-DEPEND=CL:I4b407fd0cc1a769ad0d4829bdffed87213e2da04
BUG=chromium:276471
TEST=Ran run_suite locally and verified that the suite waited on its
child jobs and that results were reported correctly. Added a unit test,
which passes.
Change-Id: I47dbc5f1a3348bdd5297ffbd2e26b82dc9d705f6
Reviewed-on: https://gerrit.chromium.org/gerrit/66443
Reviewed-by: Aviv Keshet <akeshet@chromium.org>
Tested-by: Aviv Keshet <akeshet@chromium.org>
Commit-Queue: Aviv Keshet <akeshet@chromium.org>
diff --git a/server/cros/dynamic_suite/fakes.py b/server/cros/dynamic_suite/fakes.py
index cd18645..3a5d8d9 100644
--- a/server/cros/dynamic_suite/fakes.py
+++ b/server/cros/dynamic_suite/fakes.py
@@ -26,12 +26,13 @@
class FakeJob(object):
"""Faked out RPC-client-side Job object."""
- def __init__(self, id=0, statuses=[], hostnames=[]):
+ def __init__(self, id=0, statuses=[], hostnames=[], parent_job_id=None):
self.id = id
self.hostnames = hostnames if hostnames else ['host%d' % id]
self.owner = 'tester'
self.name = 'Fake Job %d' % self.id
self.statuses = statuses
+ self.parent_job_id = parent_job_id
class FakeHost(object):
diff --git a/server/cros/dynamic_suite/job_status.py b/server/cros/dynamic_suite/job_status.py
index 08b5eee..2119bc5 100644
--- a/server/cros/dynamic_suite/job_status.py
+++ b/server/cros/dynamic_suite/job_status.py
@@ -309,6 +309,67 @@
status.test_name.startswith('CLIENT_JOB'))
+def _yield_job_results(afe, tko, job):
+ """
+ Yields the results of an individual job.
+
+ Yields one Status object per test.
+
+ @param afe: an instance of AFE as defined in server/frontend.py.
+ @param tko: an instance of TKO as defined in server/frontend.py.
+ @param job: Job object to get results from, as defined in
+ server/frontend.py
+ @yields an iterator of Statuses, one per test.
+ """
+ entries = afe.run('get_host_queue_entries', job=job.id)
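+ # Report a single ABORT for the whole job if any of its host queue entries was aborted.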
+ if reduce(_collate_aborted, entries, False):
+ yield Status('ABORT', job.name)
+ else:
+ statuses = tko.get_job_test_statuses_from_db(job.id)
+ for s in statuses:
+ if _status_for_test(s):
+ yield Status(s.status, s.test_name, s.reason,
+ s.test_started_time, s.test_finished_time,
+ job.id, job.owner, s.hostname, job.name)
+ else:
+ if s.status != 'GOOD':
+ yield Status(s.status,
+ '%s_%s' % (entries[0]['job']['name'],
+ s.test_name),
+ s.reason, s.test_started_time,
+ s.test_finished_time, job.id,
+ job.owner, s.hostname, job.name)
+
+
+def wait_for_child_results(afe, tko, parent_job_id):
+ """
+ Wait for results of all tests in jobs with given parent id.
+
+ Currently polls for results every 5s. Yields one Status object per test
+ as results become available.
+
+ @param afe: an instance of AFE as defined in server/frontend.py.
+ @param tko: an instance of TKO as defined in server/frontend.py.
+ @param parent_job_id: Parent job id for the jobs to wait on.
+ @yields an iterator of Statuses, one per test.
+ """
+ remaining_child_jobs = set(job.id for job in
+ afe.get_jobs(parent_job_id=parent_job_id))
+ while remaining_child_jobs:
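+ # One get_jobs RPC per poll; keep only finished children we have not yet processed.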
+ new_finished_jobs = [job for job in
+ afe.get_jobs(parent_job_id=parent_job_id,
+ finished=True)
+ if job.id in remaining_child_jobs]
+
+ for job in new_finished_jobs:
+ remaining_child_jobs.remove(job.id)
+ for result in _yield_job_results(afe, tko, job):
+ yield result
+
+ time.sleep(5)
+
+
def wait_for_results(afe, tko, jobs):
"""
Wait for results of all tests in all jobs in |jobs|.
@@ -319,7 +380,7 @@
@param afe: an instance of AFE as defined in server/frontend.py.
@param tko: an instance of TKO as defined in server/frontend.py.
@param jobs: a list of Job objects, as defined in server/frontend.py.
- @return a list of Statuses, one per test.
+ @yields an iterator of Statuses, one per test.
"""
local_jobs = list(jobs)
while local_jobs:
@@ -328,25 +389,9 @@
continue
local_jobs.remove(job)
+ for result in _yield_job_results(afe, tko, job):
+ yield result
- entries = afe.run('get_host_queue_entries', job=job.id)
- if reduce(_collate_aborted, entries, False):
- yield Status('ABORT', job.name)
- else:
- statuses = tko.get_job_test_statuses_from_db(job.id)
- for s in statuses:
- if _status_for_test(s):
- yield Status(s.status, s.test_name, s.reason,
- s.test_started_time, s.test_finished_time,
- job.id, job.owner, s.hostname, job.name)
- else:
- if s.status != 'GOOD':
- yield Status(s.status,
- '%s_%s' % (entries[0]['job']['name'],
- s.test_name),
- s.reason, s.test_started_time,
- s.test_finished_time, job.id,
- job.owner, s.hostname, job.name)
time.sleep(5)
diff --git a/server/cros/dynamic_suite/job_status_unittest.py b/server/cros/dynamic_suite/job_status_unittest.py
index d2f6214..e223bf7 100644
--- a/server/cros/dynamic_suite/job_status_unittest.py
+++ b/server/cros/dynamic_suite/job_status_unittest.py
@@ -4,6 +4,8 @@
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
+# pylint: disable-msg=C0111
+
"""Unit tests for server/cros/dynamic_suite/job_status.py."""
import mox
@@ -407,6 +409,10 @@
def expect_result_gathering(self, job):
self.afe.get_jobs(id=job.id, finished=True).AndReturn(job)
+ self.expect_yield_job_entries(job)
+
+
+ def expect_yield_job_entries(self, job):
entries = [s.entry for s in job.statuses]
self.afe.run('get_host_queue_entries',
job=job.id).AndReturn(entries)
@@ -414,6 +420,7 @@
self.tko.get_job_test_statuses_from_db(job.id).AndReturn(
job.statuses)
+
def testWaitForResults(self):
"""Should gather status and return records for job summaries."""
jobs = [FakeJob(0, [FakeStatus('GOOD', 'T0', ''),
@@ -451,6 +458,65 @@
self.assertTrue(True in map(status.equals_record, results))
+ def testWaitForChildResults(self):
+ """Should gather status and return records for job summaries."""
+ parent_job_id = 54321
+ jobs = [FakeJob(0, [FakeStatus('GOOD', 'T0', ''),
+ FakeStatus('GOOD', 'T1', '')],
+ parent_job_id=parent_job_id),
+ FakeJob(1, [FakeStatus('ERROR', 'T0', 'err', False),
+ FakeStatus('GOOD', 'T1', '')],
+ parent_job_id=parent_job_id),
+ FakeJob(2, [FakeStatus('TEST_NA', 'T0', 'no')],
+ parent_job_id=parent_job_id),
+ FakeJob(3, [FakeStatus('FAIL', 'T0', 'broken')],
+ parent_job_id=parent_job_id),
+ FakeJob(4, [FakeStatus('ERROR', 'SERVER_JOB', 'server error'),
+ FakeStatus('GOOD', 'T0', '')],
+ parent_job_id=parent_job_id),
+ FakeJob(5, [FakeStatus('ERROR', 'T0', 'gah', True)],
+ parent_job_id=parent_job_id),
+ # The next job shouldn't be recorded in the results.
+ FakeJob(6, [FakeStatus('GOOD', 'SERVER_JOB', '')],
+ parent_job_id=12345)]
+ for status in jobs[4].statuses:
+ status.entry['job'] = {'name': 'broken_infra_job'}
+
+ # Expect one call to get a list of all child jobs.
+ self.afe.get_jobs(parent_job_id=parent_job_id).AndReturn(jobs[:6])
+
+ # Have the first poll return only job 1 as finished, the second poll
+ # return jobs 0 and 1, and the third poll return all jobs except #6.
+ self.afe.get_jobs(parent_job_id=parent_job_id,
+ finished=True).AndReturn([jobs[1]])
+ self.expect_yield_job_entries(jobs[1])
+
+ self.afe.get_jobs(parent_job_id=parent_job_id,
+ finished=True).AndReturn(jobs[:2])
+ self.expect_yield_job_entries(jobs[0])
+
+ self.afe.get_jobs(parent_job_id=parent_job_id,
+ finished=True).AndReturn(jobs[:6])
+ for job in jobs[2:6]:
+ self.expect_yield_job_entries(job)
+
+ # Expect us to poll thrice
+ self.mox.StubOutWithMock(time, 'sleep')
+ time.sleep(5)
+ time.sleep(5)
+ time.sleep(5)
+ self.mox.ReplayAll()
+
+ results = [result for result in job_status.wait_for_child_results(
+ self.afe,
+ self.tko,
+ parent_job_id)]
+ for job in jobs[:6]: # job 6 has a different parent and shouldn't appear.
+ for status in job.statuses:
+ self.assertTrue(True in map(status.equals_record, results))
+
+
def testGatherPerHostResults(self):
"""Should gather per host results."""
# For the 0th job, the 1st entry is more bad/specific.
diff --git a/server/cros/dynamic_suite/suite.py b/server/cros/dynamic_suite/suite.py
index 2f35ed0..1bd6ef3 100644
--- a/server/cros/dynamic_suite/suite.py
+++ b/server/cros/dynamic_suite/suite.py
@@ -441,9 +441,16 @@
if self._file_bugs:
bug_reporter = reporting.Reporter()
try:
- for result in job_status.wait_for_results(self._afe,
- self._tko,
- self._jobs):
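+ # When the suite job id is known, a single parent_job_id query per poll replaces per-job queries.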
+ if self._suite_job_id:
+ results_generator = job_status.wait_for_child_results(
+ self._afe, self._tko, self._suite_job_id)
+ else:
+ logging.warn('Unknown suite_job_id, falling back to less '
+ 'efficient results_generator.')
+ results_generator = job_status.wait_for_results(self._afe,
+ self._tko,
+ self._jobs)
+ for result in results_generator:
result.record_all(record)
if (self._results_dir and
job_status.is_for_infrastructure_fail(result)):