[autotest] use parent_id to reduce number of db queries in suite.wait

This CL reduces the number of database queries required when waiting
for a suite's job results. When suite_job_id is known, the per-job loop
over afe.get_jobs(id=job.id, finished=True) for all remaining jobs in
the suite is replaced with a single call to
afe.get_jobs(parent_job_id=parent_job_id, finished=True) per polling
cycle.
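
Roughly, the change turns the polling loop from one get_jobs RPC per
remaining job per cycle into a single parent-keyed get_jobs RPC per
cycle. A simplified sketch (helper names are illustrative only, not the
exact code from the diff below; afe is the RPC client from
server/frontend.py):

    import time

    def _poll_per_job(afe, jobs):
        # Old pattern: one get_jobs RPC per remaining job, each cycle.
        remaining = list(jobs)
        while remaining:
            for job in list(remaining):
                if afe.get_jobs(id=job.id, finished=True):
                    remaining.remove(job)
            time.sleep(5)

    def _poll_by_parent(afe, parent_job_id):
        # New pattern: one get_jobs RPC per cycle, keyed on the parent
        # job id.
        remaining = set(j.id for j in
                        afe.get_jobs(parent_job_id=parent_job_id))
        while remaining:
            finished = afe.get_jobs(parent_job_id=parent_job_id,
                                    finished=True)
            remaining -= set(j.id for j in finished)
            time.sleep(5)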

I have not yet done a performance benchmark with this change, nor
examined whether the underlying SQL queries could be significantly
optimized with an intelligently chosen new database index.

CQ-DEPEND=CL:I4b407fd0cc1a769ad0d4829bdffed87213e2da04
BUG=chromium:276471
TEST=Ran run_suite locally; verified that the suite waits on its child
jobs and that results are reported correctly. Added a unit test, which
passes.

Change-Id: I47dbc5f1a3348bdd5297ffbd2e26b82dc9d705f6
Reviewed-on: https://gerrit.chromium.org/gerrit/66443
Reviewed-by: Aviv Keshet <akeshet@chromium.org>
Tested-by: Aviv Keshet <akeshet@chromium.org>
Commit-Queue: Aviv Keshet <akeshet@chromium.org>
diff --git a/server/cros/dynamic_suite/fakes.py b/server/cros/dynamic_suite/fakes.py
index cd18645..3a5d8d9 100644
--- a/server/cros/dynamic_suite/fakes.py
+++ b/server/cros/dynamic_suite/fakes.py
@@ -26,12 +26,13 @@
 
 class FakeJob(object):
     """Faked out RPC-client-side Job object."""
-    def __init__(self, id=0, statuses=[], hostnames=[]):
+    def __init__(self, id=0, statuses=[], hostnames=[], parent_job_id=None):
         self.id = id
         self.hostnames = hostnames if hostnames else ['host%d' % id]
         self.owner = 'tester'
         self.name = 'Fake Job %d' % self.id
         self.statuses = statuses
+        self.parent_job_id = parent_job_id
 
 
 class FakeHost(object):
diff --git a/server/cros/dynamic_suite/job_status.py b/server/cros/dynamic_suite/job_status.py
index 08b5eee..2119bc5 100644
--- a/server/cros/dynamic_suite/job_status.py
+++ b/server/cros/dynamic_suite/job_status.py
@@ -309,6 +309,67 @@
                 status.test_name.startswith('CLIENT_JOB'))
 
 
+def _yield_job_results(afe, tko, job):
+    """
+    Yields the results of an individual job.
+
+    Yields one Status object per test.
+
+    @param afe: an instance of AFE as defined in server/frontend.py.
+    @param tko: an instance of TKO as defined in server/frontend.py.
+    @param job: Job object to get results from, as defined in
+                server/frontend.py
+    @yields an iterator of Statuses, one per test.
+    """
+    entries = afe.run('get_host_queue_entries', job=job.id)
+    if reduce(_collate_aborted, entries, False):
+        yield Status('ABORT', job.name)
+    else:
+        statuses = tko.get_job_test_statuses_from_db(job.id)
+        for s in statuses:
+            if _status_for_test(s):
+                yield Status(s.status, s.test_name, s.reason,
+                             s.test_started_time, s.test_finished_time,
+                             job.id, job.owner, s.hostname, job.name)
+            else:
+                if s.status != 'GOOD':
+                    yield Status(s.status,
+                                 '%s_%s' % (entries[0]['job']['name'],
+                                            s.test_name),
+                                 s.reason, s.test_started_time,
+                                 s.test_finished_time, job.id,
+                                 job.owner, s.hostname, job.name)
+
+
+def wait_for_child_results(afe, tko, parent_job_id):
+    """
+    Wait for results of all tests in jobs with given parent id.
+
+    Currently polls for results every 5s.  Yields one Status object per test
+    as results become available.
+
+    @param afe: an instance of AFE as defined in server/frontend.py.
+    @param tko: an instance of TKO as defined in server/frontend.py.
+    @param parent_job_id: Parent job id for the jobs to wait on.
+    @yields an iterator of Statuses, one per test.
+    """
+    remaining_child_jobs = set(job.id for job in
+                               afe.get_jobs(parent_job_id=parent_job_id))
+    while remaining_child_jobs:
+        new_finished_jobs = [job for job in
+                             afe.get_jobs(parent_job_id=parent_job_id,
+                                          finished=True)
+                             if job.id in remaining_child_jobs]
+
+        for job in new_finished_jobs:
+
+            remaining_child_jobs.remove(job.id)
+            for result in _yield_job_results(afe, tko, job):
+                yield result
+
+        time.sleep(5)
+
+
 def wait_for_results(afe, tko, jobs):
     """
     Wait for results of all tests in all jobs in |jobs|.
@@ -319,7 +380,7 @@
     @param afe: an instance of AFE as defined in server/frontend.py.
     @param tko: an instance of TKO as defined in server/frontend.py.
     @param jobs: a list of Job objects, as defined in server/frontend.py.
-    @return a list of Statuses, one per test.
+    @yields an iterator of Statuses, one per test.
     """
     local_jobs = list(jobs)
     while local_jobs:
@@ -328,25 +389,9 @@
                 continue
 
             local_jobs.remove(job)
+            for result in _yield_job_results(afe, tko, job):
+                yield result
 
-            entries = afe.run('get_host_queue_entries', job=job.id)
-            if reduce(_collate_aborted, entries, False):
-                yield Status('ABORT', job.name)
-            else:
-                statuses = tko.get_job_test_statuses_from_db(job.id)
-                for s in statuses:
-                    if _status_for_test(s):
-                        yield Status(s.status, s.test_name, s.reason,
-                                     s.test_started_time, s.test_finished_time,
-                                     job.id, job.owner, s.hostname, job.name)
-                    else:
-                        if s.status != 'GOOD':
-                            yield Status(s.status,
-                                         '%s_%s' % (entries[0]['job']['name'],
-                                                    s.test_name),
-                                         s.reason, s.test_started_time,
-                                         s.test_finished_time, job.id,
-                                         job.owner, s.hostname, job.name)
         time.sleep(5)
 
 
diff --git a/server/cros/dynamic_suite/job_status_unittest.py b/server/cros/dynamic_suite/job_status_unittest.py
index d2f6214..e223bf7 100644
--- a/server/cros/dynamic_suite/job_status_unittest.py
+++ b/server/cros/dynamic_suite/job_status_unittest.py
@@ -4,6 +4,8 @@
 # Use of this source code is governed by a BSD-style license that can be
 # found in the LICENSE file.
 
+# pylint: disable-msg=C0111
+
 """Unit tests for server/cros/dynamic_suite/job_status.py."""
 
 import mox
@@ -407,6 +409,10 @@
 
     def expect_result_gathering(self, job):
         self.afe.get_jobs(id=job.id, finished=True).AndReturn(job)
+        self.expect_yield_job_entries(job)
+
+
+    def expect_yield_job_entries(self, job):
         entries = [s.entry for s in job.statuses]
         self.afe.run('get_host_queue_entries',
                      job=job.id).AndReturn(entries)
@@ -414,6 +420,7 @@
             self.tko.get_job_test_statuses_from_db(job.id).AndReturn(
                     job.statuses)
 
+
     def testWaitForResults(self):
         """Should gather status and return records for job summaries."""
         jobs = [FakeJob(0, [FakeStatus('GOOD', 'T0', ''),
@@ -451,6 +458,65 @@
                 self.assertTrue(True in map(status.equals_record, results))
 
 
+    def testWaitForChildResults(self):
+        """Should gather status and return records for job summaries."""
+        parent_job_id = 54321
+        jobs = [FakeJob(0, [FakeStatus('GOOD', 'T0', ''),
+                            FakeStatus('GOOD', 'T1', '')],
+                        parent_job_id=parent_job_id),
+                FakeJob(1, [FakeStatus('ERROR', 'T0', 'err', False),
+                            FakeStatus('GOOD', 'T1', '')],
+                        parent_job_id=parent_job_id),
+                FakeJob(2, [FakeStatus('TEST_NA', 'T0', 'no')],
+                        parent_job_id=parent_job_id),
+                FakeJob(3, [FakeStatus('FAIL', 'T0', 'broken')],
+                        parent_job_id=parent_job_id),
+                FakeJob(4, [FakeStatus('ERROR', 'SERVER_JOB', 'server error'),
+                            FakeStatus('GOOD', 'T0', '')],
+                        parent_job_id=parent_job_id),
+                FakeJob(5, [FakeStatus('ERROR', 'T0', 'gah', True)],
+                        parent_job_id=parent_job_id),
+                # The next job shouldn't be recorded in the results.
+                FakeJob(6, [FakeStatus('GOOD', 'SERVER_JOB', '')],
+                        parent_job_id=12345)]
+        for status in jobs[4].statuses:
+            status.entry['job'] = {'name': 'broken_infra_job'}
+
+        # Expect one call to get a list of all child jobs.
+        self.afe.get_jobs(parent_job_id=parent_job_id).AndReturn(jobs[:6])
+
+        # Have only jobs[1] finish by the first poll, jobs[0] by the
+        # second, and the remaining child jobs (excluding #6) by the third.
+        self.afe.get_jobs(parent_job_id=parent_job_id,
+                          finished=True).AndReturn([jobs[1]])
+        self.expect_yield_job_entries(jobs[1])
+
+        self.afe.get_jobs(parent_job_id=parent_job_id,
+                          finished=True).AndReturn(jobs[:2])
+        self.expect_yield_job_entries(jobs[0])
+
+        self.afe.get_jobs(parent_job_id=parent_job_id,
+                          finished=True).AndReturn(jobs[:6])
+        for job in jobs[2:6]:
+            self.expect_yield_job_entries(job)
+        # The final poll returns all of the remaining child jobs.
+
+        # Expect us to poll thrice
+        self.mox.StubOutWithMock(time, 'sleep')
+        time.sleep(5)
+        time.sleep(5)
+        time.sleep(5)
+        self.mox.ReplayAll()
+
+        results = [result for result in job_status.wait_for_child_results(
+                                                self.afe,
+                                                self.tko,
+                                                parent_job_id)]
+        for job in jobs[:6]:  # the 'GOOD' SERVER_JOB shouldn't be there.
+            for status in job.statuses:
+                self.assertTrue(True in map(status.equals_record, results))
+
+
     def testGatherPerHostResults(self):
         """Should gather per host results."""
         # For the 0th job, the 1st entry is more bad/specific.
diff --git a/server/cros/dynamic_suite/suite.py b/server/cros/dynamic_suite/suite.py
index 2f35ed0..1bd6ef3 100644
--- a/server/cros/dynamic_suite/suite.py
+++ b/server/cros/dynamic_suite/suite.py
@@ -441,9 +441,16 @@
         if self._file_bugs:
             bug_reporter = reporting.Reporter()
         try:
-            for result in job_status.wait_for_results(self._afe,
-                                                      self._tko,
-                                                      self._jobs):
+            if self._suite_job_id:
+                results_generator = job_status.wait_for_child_results(
+                        self._afe, self._tko, self._suite_job_id)
+            else:
+                logging.warn('Unknown suite_job_id, falling back to less '
+                             'efficient results_generator.')
+                results_generator = job_status.wait_for_results(self._afe,
+                                                                self._tko,
+                                                                self._jobs)
+            for result in results_generator:
                 result.record_all(record)
                 if (self._results_dir and
                     job_status.is_for_infrastructure_fail(result)):