Implementation of Test Planner execution engine. Is currently able to
schedule single-host tests and place the results in the proper planner
tables for analysis.

TODO: global support object, execution_engine.py unit tests

Signed-off-by: James Ren <jamesren@google.com>


git-svn-id: http://test.kernel.org/svn/autotest/trunk@4301 592f7852-d20e-0410-864c-8624ca9c26a4
diff --git a/frontend/afe/model_logic.py b/frontend/afe/model_logic.py
index 59e5ffb..2f03fef 100644
--- a/frontend/afe/model_logic.py
+++ b/frontend/afe/model_logic.py
@@ -947,7 +947,7 @@
 
         if isinstance(id_or_name, (int, long)):
             return manager.get(pk=id_or_name)
-        if isinstance(id_or_name, basestring):
+        if isinstance(id_or_name, basestring) and hasattr(cls, 'name_field'):
             return manager.get(**{cls.name_field : id_or_name})
         raise ValueError(
             'Invalid positional argument: %s (%s)' % (id_or_name,
diff --git a/frontend/afe/resources.py b/frontend/afe/resources.py
index b31426a..9527acb 100644
--- a/frontend/afe/resources.py
+++ b/frontend/afe/resources.py
@@ -554,6 +554,8 @@
         for label_name in meta_hosts:
             entry = Label.from_uri_args(self._request, label_name)
             entries.append({'meta_host': entry.link()})
+        if atomic_group_class:
+            entries.append({'atomic_group_class': atomic_group_class})
 
         result = self.link()
         result['queue_entries'] = entries
@@ -675,9 +677,9 @@
                 label_entry = containing_collection.resolve_link(
                         queue_entry['meta_host'])
                 metahost_label_objects.append(label_entry.instance)
-            if 'atomic_group' in queue_entry:
+            if 'atomic_group_class' in queue_entry:
                 atomic_group_entry = containing_collection.resolve_link(
-                        queue_entry['atomic_group'])
+                        queue_entry['atomic_group_class'])
                 if atomic_group:
                     assert atomic_group_entry.instance.id == atomic_group.id
                 else:
diff --git a/frontend/planner/execution_engine.py b/frontend/planner/execution_engine.py
index 30016c9..c0d1643 100644
--- a/frontend/planner/execution_engine.py
+++ b/frontend/planner/execution_engine.py
@@ -1,10 +1,26 @@
+import time, logging
+from autotest_lib.frontend.afe import model_attributes as afe_model_attributes
+from autotest_lib.frontend.shared import rest_client
+from autotest_lib.frontend.planner import model_attributes
+from autotest_lib.server import frontend
+
+
+TICK_INTERVAL_SECS = 10
+
 class ExecutionEngine(object):
     """
     Provides the Test Planner execution engine
     """
 
-    def __init__(self, plan_id):
-        self.plan_id = plan_id
+    _planner_rpc = frontend.Planner()
+    _tko_rpc = frontend.TKO()
+
+    def __init__(self, plan_id, server, label_name):
+        self._plan_id = plan_id
+        self._server = server
+        self._afe_rest = rest_client.Resource.load(
+                'http://%s/afe/server/resources' % server)
+        self._label_name = label_name
 
 
     def start(self):
@@ -13,4 +29,173 @@
 
         Thread remains in this method until the execution engine is complete.
         """
-        pass
+        self._initialize_plan()
+
+        while True:
+            if self._tick():
+                break
+            time.sleep(TICK_INTERVAL_SECS)
+
+        self._cleanup()
+
+
+    def _initialize_plan(self):
+        """
+        Performs actions necessary to start a test plan.
+
+        Adds the hosts into the proper atomic group, and waits for the plan to
+        be ready to start before returning
+        """
+        plan = self._planner_rpc.run('get_plan', id=self._plan_id)
+        name = plan['name'] + '_set_atomic_group'
+        if not self._afe_rest.jobs.get(name=name).total_results:
+            self._launch_set_atomic_group_job(name)
+
+        self._wait_for_initialization()
+
+
+    def _launch_set_atomic_group_job(self, name):
+        """
+        Launch the job to set the hosts' atomic group, and initate the plan
+
+        If the hosts are already part of an atomic group, wait for a tick and
+        try again. Return when successful
+        """
+        while True:
+            hosts = self._planner_rpc.run('get_hosts', plan_id=self._plan_id)
+            control = self._planner_rpc.run('get_atomic_group_control_file')
+
+            info = self._afe_rest.execution_info.get().execution_info
+            info['control_file'] = control
+            info['cleanup_before_job'] = afe_model_attributes.RebootBefore.NEVER
+            info['cleanup_after_job'] = afe_model_attributes.RebootAfter.NEVER
+            info['run_verify'] = False
+            info['machines_per_execution'] = len(hosts)
+
+            entries = self._afe_rest.queue_entries_request.get(
+                    hosts=hosts).queue_entries
+
+            keyvals = {'server': self._server,
+                       'label_name': self._label_name,
+                       'plan_id': self._plan_id}
+
+            job_req = {'name' : name,
+                       'execution_info' : info,
+                       'queue_entries' : entries,
+                       'keyvals' : keyvals}
+
+            try:
+                self._afe_rest.jobs.post(job_req)
+                logging.info('created job to set atomic group')
+                break
+            except rest_client.ClientError, e:
+                logging.info('hosts already in atomic group')
+                logging.info('(error was %s)' % e.message)
+                logging.info('waiting...')
+            time.sleep(TICK_INTERVAL_SECS)
+
+
+    def _wait_for_initialization(self):
+        while True:
+            plan = self._planner_rpc.run('get_plan', id=self._plan_id)
+            if plan['initialized']:
+                break
+            logging.info('waiting for initialization...')
+            time.sleep(TICK_INTERVAL_SECS)
+
+
+    def _cleanup(self):
+        self._afe_rest.labels.get(name=self._label_name).members[0].delete()
+
+
+    def _tick(self):
+        """
+        Processes one tick of the execution engine.
+
+        Returns True if the engine has completed the plan.
+        """
+        logging.info('tick')
+        self._process_finished_runs()
+        self._check_tko_jobs()
+        return self._schedule_new_runs()
+
+
+    def _process_finished_runs(self):
+        """
+        Finalize the test runs that have finished.
+
+        Look for runs that are in PASSED or FAILED, perform any additional
+        processing required, and set the entry to 'finalized'.
+        """
+        Status = model_attributes.TestRunStatus
+        runs = self._planner_rpc.run('get_test_runs', plan__id=self._plan_id,
+                                     status__in=(Status.PASSED, Status.FAILED),
+                                     finalized=False)
+        for run in runs:
+            logging.info('finalizing test run %s', run)
+            if run['status'] == Status.FAILED:
+                self._planner_rpc.run('modify_host', id=run['host'],
+                                      blocked=True)
+            self._planner_rpc.run('modify_test_run', id=run['id'],
+                                  finalized=True)
+
+
+    def _check_tko_jobs(self):
+        """
+        Instructs the server to update the Planner test runs table
+
+        Sends an RPC to have the server pull the proper TKO tests and add them
+        to the Planner tables. Logs information about what was added.
+        """
+        test_runs_updated = self._planner_rpc.run('update_test_runs',
+                                                  plan_id=self._plan_id)
+        for update in test_runs_updated:
+            logging.info('added %s test run for tko test id %s (%s)',
+                         update['status'], update['tko_test_idx'],
+                         update['hostname'])
+
+
+    def _schedule_new_runs(self):
+        next_configs = self._planner_rpc.run('get_next_test_configs',
+                                             plan_id=self._plan_id)
+        if next_configs['complete']:
+            return True
+
+        for config in next_configs['next_configs']:
+            self._run_job(hostname=config['host'],
+                          test_config_id=config['next_test_config_id'])
+
+        return False
+
+
+    def _run_job(self, hostname, test_config_id):
+        test_config = self._planner_rpc.run('get_test_config',
+                                            id=test_config_id)
+
+        info = self._afe_rest.execution_info.get().execution_info
+        info['control_file'] = test_config['control_file']['contents']
+        info['is_server'] = test_config['is_server']
+
+        atomic_group_class = self._afe_rest.labels.get(
+                name=self._label_name).members[0].get().atomic_group_class.href
+
+        request = self._afe_rest.queue_entries_request.get(
+                hosts=(hostname,), atomic_group_class=atomic_group_class)
+        entries = request.queue_entries
+
+        plan = self._planner_rpc.run('get_plan', id=self._plan_id)
+        prefix = plan['label_override']
+        if prefix is None:
+            prefix = plan['name']
+        job_req = {'name' : '%s_%s_%s' % (prefix, test_config['alias'],
+                                          hostname),
+                   'execution_info' : info,
+                   'queue_entries' : entries}
+
+        logging.info('starting test alias %s for host %s',
+                     test_config['alias'], hostname)
+        job = self._afe_rest.jobs.post(job_req)
+        self._planner_rpc.run('add_job',
+                              plan_id=self._plan_id,
+                              test_config_id=test_config_id,
+                              afe_job_id=job.get().id)
diff --git a/frontend/planner/execution_engine_control.srv b/frontend/planner/execution_engine_control.srv
index 96a4250..915196b 100644
--- a/frontend/planner/execution_engine_control.srv
+++ b/frontend/planner/execution_engine_control.srv
@@ -1,60 +1,9 @@
-import time
 from autotest_lib.client.common_lib import utils
-from autotest_lib.frontend.afe import model_attributes
-from autotest_lib.frontend.shared import rest_client
 from autotest_lib.frontend.planner import execution_engine
-from autotest_lib.server import frontend
-
-
-TICK_INTERVAL_SECS = 10
 
 keyvals = utils.read_keyval(job.resultdir)
 
-planner_rpc = frontend.Planner()
-afe_rest = rest_client.Resource.load(
-        'http://%s/afe/server/resources' % keyvals['server'])
-
-
-def _launch_set_atomic_group_job(plan, name):
-    """Launch the job to set the hosts' atomic group, and initate the plan"""
-    hosts = planner_rpc.run('get_hosts', plan_id=keyvals['plan_id'])
-    control = planner_rpc.run('get_atomic_group_control_file')
-
-    info = afe_rest.execution_info.get().execution_info
-    info['control_file'] = control
-    info['cleanup_before_job'] = model_attributes.RebootBefore.NEVER
-    info['cleanup_after_job'] = model_attributes.RebootAfter.NEVER
-    info['run_verify'] = False
-    info['machines_per_execution'] = len(hosts)
-
-    entries = afe_rest.queue_entries_request.get(hosts=hosts).queue_entries
-
-    job_req = {'name' : name,
-               'execution_info' : info,
-               'queue_entries' : entries,
-               'keyvals' : keyvals}
-
-    afe_rest.jobs.post(job_req)
-
-
-# Check if the plan is already being initialized, and launch the initialization
-# job if not
-plan = planner_rpc.run('get_plan', id=keyvals['plan_id'])
-name = plan['name'] + '_set_atomic_group'
-if not afe_rest.jobs.get(name=name).total_results:
-    _launch_set_atomic_group_job(plan, name)
-
-
-# Wait for the plan to be initialized
-while True:
-    if planner_rpc.run('get_plan', id=keyvals['plan_id'])['initialized']:
-        break
-    time.sleep(TICK_INTERVAL_SECS)
-
-
-# Execution engine main loop
-execution_engine.ExecutionEngine(plan_id=keyvals['plan_id']).start()
-
-
-# Cleanup
-afe_rest.labels.get(name=keyvals['label_name']).members[0].delete()
+engine = execution_engine.ExecutionEngine(plan_id=keyvals['plan_id'],
+                                          server=keyvals['server'],
+                                          label_name=keyvals['label_name'])
+engine.start()
diff --git a/frontend/planner/models.py b/frontend/planner/models.py
index 52ff1bc..f69aa28 100644
--- a/frontend/planner/models.py
+++ b/frontend/planner/models.py
@@ -3,7 +3,8 @@
 from autotest_lib.frontend.afe import models as afe_models
 from autotest_lib.frontend.afe import model_logic, rpc_utils
 from autotest_lib.frontend.tko import models as tko_models
-from autotest_lib.client.common_lib import enum, utils
+from autotest_lib.frontend.planner import model_attributes
+from autotest_lib.client.common_lib import utils
 
 
 class Plan(dbmodels.Model, model_logic.ModelExtensions):
@@ -75,12 +76,13 @@
         host: The AFE host
         complete: True if and only if this host is finished in the test plan
         blocked: True if and only if the host is blocked (not executing tests)
+        added_by_label: True if and only if the host was added because of a host
+                        label (as opposed to being explicitly added)
     """
     host = dbmodels.ForeignKey(afe_models.Host)
     complete = dbmodels.BooleanField(default=False)
     blocked = dbmodels.BooleanField(default=False)
-
-    Status = enum.Enum('Finished', 'Running', 'Blocked', string_values=True)
+    added_by_label = dbmodels.BooleanField(default=False)
 
     class Meta:
         db_table = 'planner_hosts'
@@ -88,30 +90,26 @@
 
     def status(self):
         if self.complete:
-            return Host.Status.FINISHED
+            return model_attributes.HostStatus.FINISHED
         if self.blocked:
-            return Host.Status.BLOCKED
-        return Host.Status.RUNNING
+            return model_attributes.HostStatus.BLOCKED
+        return model_attributes.HostStatus.RUNNING
 
 
     def _get_details_unicode(self):
         return 'Host: %s' % self.host.hostname
 
 
-    @classmethod
-    def smart_get(cls, id):
-        raise NotImplementedError('Planner hosts do not support smart_get()')
-
-
-class ControlFile(model_logic.ModelWithHash):
+class ControlFile(model_logic.ModelWithHash,
+                  model_logic.ModelExtensions):
     """A control file. Immutable once added to the table
 
     Required:
         contents: The text of the control file
 
     Others:
-        control_hash: The SHA1 hash of the control file, for duplicate detection
-                      and fast search
+        the_hash: The SHA1 hash of the control file, for duplicate detection
+                  and fast search
     """
     contents = dbmodels.TextField()
 
@@ -128,12 +126,13 @@
         return u'Control file id %s (SHA1: %s)' % (self.id, self.control_hash)
 
 
-class Test(ModelWithPlan):
+class TestConfig(ModelWithPlan, model_logic.ModelExtensions):
     """A planned test
 
     Required:
         alias: The name to give this test within the plan. Unique with plan id
         test_control_file: The control file to run
+        is_server: True if this control file is a server-side test
         execution_order: An integer describing when this test should be run in
                          the test plan
         estimated_runtime: Time in hours that the test is expected to run. Will
@@ -142,27 +141,28 @@
     """
     alias = dbmodels.CharField(max_length=255)
     control_file = dbmodels.ForeignKey(ControlFile)
+    is_server = dbmodels.BooleanField(default=True)
     execution_order = dbmodels.IntegerField(blank=True)
     estimated_runtime = dbmodels.IntegerField()
 
     class Meta:
-        db_table = 'planner_tests'
+        db_table = 'planner_test_configs'
         ordering = ('execution_order',)
         unique_together = (('plan', 'alias'),)
 
 
     def _get_details_unicode(self):
-        return 'Planned test - Control file id %s' % self.control_file.id
+        return 'Planned test config - Control file id %s' % self.control_file.id
 
 
-class Job(ModelWithPlan):
+class Job(ModelWithPlan, model_logic.ModelExtensions):
     """Represents an Autotest job initiated for a test plan
 
     Required:
-        test: The Test associated with this Job
+        test: The TestConfig associated with this Job
         afe_job: The Autotest job
     """
-    test = dbmodels.ForeignKey(Test)
+    test_config = dbmodels.ForeignKey(TestConfig)
     afe_job = dbmodels.ForeignKey(afe_models.Job)
 
     class Meta:
@@ -189,7 +189,7 @@
         return u'Bug external ID %s' % self.external_uid
 
 
-class TestRun(ModelWithPlan):
+class TestRun(ModelWithPlan, model_logic.ModelExtensions):
     """An individual test run from an Autotest job for the test plan.
 
     Each Job object may have multiple TestRun objects associated with it.
@@ -209,12 +209,13 @@
     Optional:
         bugs: Bugs filed that a relevant to this run
     """
-    Status = enum.Enum('Active', 'Passed', 'Failed', string_values=True)
-
     test_job = dbmodels.ForeignKey(Job)
     tko_test = dbmodels.ForeignKey(tko_models.Test)
     host = dbmodels.ForeignKey(Host)
-    status = dbmodels.CharField(max_length=16, choices=Status.choices())
+    status = dbmodels.CharField(
+            max_length=16,
+            choices=model_attributes.TestRunStatus.choices(),
+            default=model_attributes.TestRunStatus.ACTIVE)
     finalized = dbmodels.BooleanField(default=False)
     seen = dbmodels.BooleanField(default=False)
     triaged = dbmodels.BooleanField(default=False)
@@ -224,6 +225,7 @@
 
     class Meta:
         db_table = 'planner_test_runs'
+        unique_together = (('plan', 'test_job', 'tko_test', 'host'),)
 
 
     def _get_details_unicode(self):
@@ -294,12 +296,11 @@
         name: The name given to the object
         encoded_object: The actual object
     """
-    Type = enum.Enum('support', 'triage', 'autoprocess', 'custom_query',
-                     string_values=True)
-
     user = dbmodels.ForeignKey(afe_models.User)
-    object_type = dbmodels.CharField(max_length=16,
-                                     choices=Type.choices(), db_column='type')
+    object_type = dbmodels.CharField(
+            max_length=16,
+            choices=model_attributes.SavedObjectType.choices(),
+            db_column='type')
     name = dbmodels.CharField(max_length=255)
     encoded_object = dbmodels.TextField()
 
@@ -337,8 +338,8 @@
         value: The value
 
     Others:
-        keyval_hash: The result of SHA1(SHA1(key) ++ value), for duplicate
-                     detection and fast search.
+        the_hash: The result of SHA1(SHA1(key) ++ value), for duplicate
+                  detection and fast search.
     """
     key = dbmodels.CharField(max_length=1024)
     value = dbmodels.CharField(max_length=1024)
diff --git a/frontend/planner/rpc_interface.py b/frontend/planner/rpc_interface.py
index eaf4de9..219436f 100644
--- a/frontend/planner/rpc_interface.py
+++ b/frontend/planner/rpc_interface.py
@@ -11,6 +11,7 @@
 from autotest_lib.frontend import thread_local
 from autotest_lib.frontend.afe import model_logic, models as afe_models
 from autotest_lib.frontend.afe import rpc_utils as afe_rpc_utils
+from autotest_lib.frontend.tko import models as tko_models
 from autotest_lib.frontend.planner import models, rpc_utils
 from autotest_lib.client.common_lib import utils
 
@@ -26,6 +27,32 @@
     models.Plan.smart_get(id).update_object(data)
 
 
+def get_test_runs(**filter_data):
+    return afe_rpc_utils.prepare_for_serialization(
+            [test_run.get_object_dict() for test_run
+             in models.TestRun.objects.filter(**filter_data)])
+
+
+def modify_test_run(id, **data):
+    models.TestRun.objects.get(id=id).update_object(data)
+
+
+def modify_host(id, **data):
+    models.Host.objects.get(id=id).update_object(data)
+
+
+def get_test_config(id):
+    return afe_rpc_utils.prepare_rows_as_nested_dicts(
+            models.TestConfig.objects.filter(id=id), ('control_file',))[0]
+
+
+def add_job(plan_id, test_config_id, afe_job_id):
+    models.Job.objects.create(
+            plan=models.Plan.objects.get(id=plan_id),
+            test_config=models.TestConfig.objects.get(id=test_config_id),
+            afe_job=afe_models.Job.objects.get(id=afe_job_id))
+
+
 # more advanced calls
 
 def submit_plan(name, hosts, host_labels, tests,
@@ -37,7 +64,12 @@
     @param hosts: a list of hostnames
     @param host_labels: a list of host labels. The hosts under test will update
                         to reflect changes in the label
-    @param tests: a list of test control files to run
+    @param tests: an ordered list of dictionaries:
+                      alias: an alias for the test
+                      control_file: the test control file
+                      is_server: True if is a server-side control file
+                      estimated_runtime: estimated number of hours this test
+                                         will run
     @param support: the global support object
     @param label_override: label to prepend to all AFE jobs for this test plan.
                            Defaults to the plan name.
@@ -60,6 +92,21 @@
             raise model_logic.ValidationError(
                     {'host_labels': 'host label %s does not exist' % label})
 
+    aliases_seen = set()
+    test_required_fields = (
+            'alias', 'control_file', 'is_server', 'estimated_runtime')
+    for test in tests:
+        for field in test_required_fields:
+            if field not in test:
+                raise model_logic.ValidationError(
+                        {'tests': 'field %s is required' % field})
+
+        alias = test['alias']
+        if alias in aliases_seen:
+            raise model_logic.Validationerror(
+                    {'tests': 'alias %s occurs more than once' % alias})
+        aliases_seen.add(alias)
+
     plan, created = models.Plan.objects.get_or_create(name=name)
     if not created:
         raise model_logic.ValidationError(
@@ -67,25 +114,36 @@
 
     try:
         label = rpc_utils.create_plan_label(plan)
+        try:
+            for i, test in enumerate(tests):
+                control, _ = models.ControlFile.objects.get_or_create(
+                        contents=test['control_file'])
+                models.TestConfig.objects.create(
+                        plan=plan, alias=test['alias'], control_file=control,
+                        is_server=test['is_server'], execution_order=i,
+                        estimated_runtime=test['estimated_runtime'])
+
+            plan.label_override = label_override
+            plan.support = support or ''
+            plan.save()
+
+            plan.owners.add(afe_models.User.current_user())
+
+            for host in host_objects:
+                planner_host = models.Host.objects.create(plan=plan, host=host)
+
+            plan.host_labels.add(*label_objects)
+
+            rpc_utils.start_plan(plan, label)
+
+            return plan.id
+        except:
+            label.delete()
+            raise
     except:
         plan.delete()
         raise
 
-    plan.label_override = label_override
-    plan.support = support or ''
-    plan.save()
-
-    plan.owners.add(afe_models.User.current_user())
-
-    for host in host_objects:
-        planner_host = models.Host.objects.create(plan=plan, host=host)
-
-    plan.host_labels.add(*label_objects)
-
-    rpc_utils.start_plan(plan, label)
-
-    return plan.id
-
 
 def get_hosts(plan_id):
     """
@@ -108,3 +166,72 @@
     """
     return rpc_utils.lazy_load(os.path.join(os.path.dirname(__file__),
                                             'set_atomic_group_control.srv'))
+
+
+def get_next_test_configs(plan_id):
+    """
+    Gets information about the next planner test configs that need to be run
+
+    @param plan_id: the ID or name of the test plan
+    @return a dictionary:
+                complete: True or False, shows test plan completion
+                next_configs: a list of dictionaries:
+                    host: ID of the host
+                    next_test_config_id: ID of the next Planner test to run
+    """
+    plan = models.Plan.smart_get(plan_id)
+
+    result = {'next_configs': []}
+
+    rpc_utils.update_hosts_table(plan)
+    for host in models.Host.objects.filter(plan=plan):
+        next_test_config_id = rpc_utils.compute_next_test_config(plan, host)
+        if next_test_config_id:
+            config = {'next_test_config_id': next_test_config_id,
+                      'host': host.host.hostname}
+            result['next_configs'].append(config)
+
+    rpc_utils.check_for_completion(plan)
+    result['complete'] = plan.complete
+
+    return result
+
+
+def update_test_runs(plan_id):
+    """
+    Add all applicable TKO jobs to the Planner DB tables
+
+    Looks for tests in the TKO tables that were started as a part of the test
+    plan, and add them to the Planner tables.
+
+    Also updates the status of the test run if the underlying TKO test move from
+    an active status to a completed status.
+
+    @return a list of dictionaries:
+                status: the status of the new (or updated) test run
+                tko_test_idx: the ID of the TKO test added
+                hostname: the host added
+    """
+    plan = models.Plan.objects.get(id=plan_id)
+    updated = []
+
+    for planner_job in plan.job_set.all():
+        known_statuses = dict((test_run.tko_test.test_idx, test_run.status)
+                              for test_run in planner_job.testrun_set.all())
+        tko_tests_for_job = tko_models.Test.objects.filter(
+                job__afe_job_id=planner_job.afe_job.id)
+
+        for tko_test in tko_tests_for_job:
+            status = rpc_utils.compute_test_run_status(tko_test.status.word)
+            needs_update = (tko_test.test_idx not in known_statuses or
+                            status != known_statuses[tko_test.test_idx])
+            if needs_update:
+                hostnames = tko_test.machine.hostname.split(',')
+                for hostname in hostnames:
+                    rpc_utils.add_test_run(
+                            plan, planner_job, tko_test, hostname, status)
+                    updated.append({'status': status,
+                                    'tko_test_idx': tko_test.test_idx,
+                                    'hostname': hostname})
+
+    return updated
diff --git a/frontend/planner/rpc_interface_unittest.py b/frontend/planner/rpc_interface_unittest.py
index 0f1c71b..081e433 100644
--- a/frontend/planner/rpc_interface_unittest.py
+++ b/frontend/planner/rpc_interface_unittest.py
@@ -3,10 +3,11 @@
 import unittest
 import common
 from autotest_lib.frontend import setup_django_environment
-from autotest_lib.frontend.planner import planner_test_utils
+from autotest_lib.frontend.planner import planner_test_utils, model_attributes
+from autotest_lib.frontend.planner import rpc_interface, models, rpc_utils
 from autotest_lib.frontend.afe import model_logic
 from autotest_lib.frontend.afe import models as afe_models
-from autotest_lib.frontend.planner import rpc_interface, models, rpc_utils
+from autotest_lib.frontend.tko import models as tko_models
 
 
 class RpcInterfaceTest(unittest.TestCase,
@@ -72,5 +73,110 @@
         self.assertEqual(set(('host1', 'host2')), set(hosts))
 
 
+    def test_get_next_test_configs(self):
+        DUMMY_CONFIGS = {'host1': object(),
+                         'host2': object()}
+        DUMMY_COMPLETE = object()
+        self.god.stub_function(rpc_utils, 'compute_next_test_config')
+
+        for host in models.Host.objects.filter(plan=self._plan):
+            rpc_utils.compute_next_test_config.expect_call(
+                    self._plan, host).and_return(
+                    DUMMY_CONFIGS[host.host.hostname])
+
+        def _dummy_check_for_completion(plan):
+            plan.complete = DUMMY_COMPLETE
+        rpc_utils.check_for_completion = _dummy_check_for_completion
+
+        result = rpc_interface.get_next_test_configs(self._plan.id)
+
+        self.god.check_playback()
+        self.assertEqual(result['complete'], DUMMY_COMPLETE)
+        for config in result['next_configs']:
+            self.assertTrue(config['host'] in DUMMY_CONFIGS)
+            self.assertEqual(config['next_test_config_id'],
+                             DUMMY_CONFIGS[config['host']])
+
+
+    def test_update_test_runs(self):
+        GOOD_STATUS_WORD = 'GOOD'
+        RUNNING_STATUS_WORD = 'RUNNING'
+        hostname = self.hosts[0].hostname
+
+        self.god.stub_function(rpc_utils, 'compute_test_run_status')
+        self.god.stub_function(rpc_utils, 'add_test_run')
+
+        control, _ = models.ControlFile.objects.get_or_create(
+                contents='test_control')
+        test_config = models.TestConfig.objects.create(plan=self._plan,
+                                                       alias='config',
+                                                       control_file=control,
+                                                       execution_order=1,
+                                                       estimated_runtime=1)
+        afe_job = self._create_job(hosts=(1,))
+        planner_host = models.Host.objects.create(plan=self._plan,
+                                                  host=self.hosts[0])
+        planner_job = models.Job.objects.create(plan=self._plan,
+                                                test_config=test_config,
+                                                afe_job=afe_job)
+        tko_machine = tko_models.Machine.objects.create(hostname=hostname)
+        tko_job = tko_models.Job.objects.create(tag='job',
+                                                machine=tko_machine,
+                                                afe_job_id=afe_job.id)
+        tko_kernel = tko_models.Kernel.objects.create()
+        running_status = tko_models.Status.objects.create(
+                word=RUNNING_STATUS_WORD)
+        good_status = tko_models.Status.objects.create(word=GOOD_STATUS_WORD)
+
+        # No TKO tests
+        self.assertEqual([], rpc_interface.update_test_runs(self._plan.id))
+        self.god.check_playback()
+
+        # active TKO test
+        tko_test = tko_models.Test.objects.create(job=tko_job,
+                                                  machine=tko_machine,
+                                                  kernel=tko_kernel,
+                                                  status=running_status)
+
+        rpc_utils.compute_test_run_status.expect_call(
+                RUNNING_STATUS_WORD).and_return(
+                model_attributes.TestRunStatus.ACTIVE)
+        rpc_utils.add_test_run.expect_call(
+                self._plan, planner_job, tko_test, hostname,
+                model_attributes.TestRunStatus.ACTIVE)
+        self.assertEqual(rpc_interface.update_test_runs(self._plan.id),
+                         [{'status': model_attributes.TestRunStatus.ACTIVE,
+                           'tko_test_idx': tko_test.test_idx,
+                           'hostname': hostname}])
+        self.god.check_playback()
+        test_run = models.TestRun.objects.create(
+                plan=self._plan, test_job=planner_job,
+                tko_test=tko_test, host=planner_host,
+                status=model_attributes.TestRunStatus.ACTIVE)
+
+        # no change to TKO test
+        rpc_utils.compute_test_run_status.expect_call(
+                RUNNING_STATUS_WORD).and_return(
+                model_attributes.TestRunStatus.ACTIVE)
+        self.assertEqual([], rpc_interface.update_test_runs(self._plan.id))
+        self.god.check_playback()
+
+        # TKO test is now complete, passed
+        tko_test.status = good_status
+        tko_test.save()
+
+        rpc_utils.compute_test_run_status.expect_call(
+                GOOD_STATUS_WORD).and_return(
+                model_attributes.TestRunStatus.PASSED)
+        rpc_utils.add_test_run.expect_call(
+                self._plan, planner_job, tko_test, hostname,
+                model_attributes.TestRunStatus.PASSED)
+        self.assertEqual(rpc_interface.update_test_runs(self._plan.id),
+                         [{'status': model_attributes.TestRunStatus.PASSED,
+                           'tko_test_idx': tko_test.test_idx,
+                           'hostname': hostname}])
+        self.god.check_playback()
+
+
 if __name__ == '__main__':
     unittest.main()
diff --git a/frontend/planner/rpc_utils.py b/frontend/planner/rpc_utils.py
index 8d359bd..79a6dcc 100644
--- a/frontend/planner/rpc_utils.py
+++ b/frontend/planner/rpc_utils.py
@@ -1,8 +1,7 @@
 import common
 import os
 from autotest_lib.frontend.afe import models as afe_models, model_logic
-from autotest_lib.frontend.planner import models
-from autotest_lib.frontend.shared import rest_client
+from autotest_lib.frontend.planner import models, model_attributes
 from autotest_lib.client.common_lib import global_config, utils
 
 
@@ -36,23 +35,22 @@
     """
     Takes the necessary steps to start a test plan in Autotest
     """
-    afe_rest = rest_client.Resource.load(
-            'http://%s/afe/server/resources' % SERVER)
-
     keyvals = {'server': SERVER,
                'plan_id': plan.id,
                'label_name': label.name}
-
-    info = afe_rest.execution_info.get().execution_info
-    info['control_file'] = _get_execution_engine_control()
-    info['machines_per_execution'] = None
-
-    job_req = {'name': plan.name + '_execution_engine',
-               'execution_info': info,
-               'queue_entries': (),
+    options = {'name': plan.name + '_execution_engine',
+               'priority': afe_models.Job.Priority.MEDIUM,
+               'control_file': _get_execution_engine_control(),
+               'control_type': afe_models.Job.ControlType.SERVER,
+               'synch_count': None,
+               'run_verify': False,
+               'reboot_before': False,
+               'reboot_after': False,
+               'dependencies': (),
                'keyvals': keyvals}
-
-    afe_rest.jobs.post(job_req)
+    job = afe_models.Job.create(owner=afe_models.User.current_user().login,
+                                options=options, hosts=())
+    job.queue(hosts=())
 
 
 def _get_execution_engine_control():
@@ -71,3 +69,93 @@
         LAZY_LOADED_FILES[path] = utils.read_file(path)
 
     return LAZY_LOADED_FILES[path]
+
+
+def update_hosts_table(plan):
+    """
+    Resolves the host labels into host objects
+
+    Adds or removes hosts from the planner Hosts model based on changes to the
+    host label
+    """
+    label_hosts = set()
+
+    for label in plan.host_labels.all():
+        for afe_host in label.host_set.all():
+            host, created = models.Host.objects.get_or_create(plan=plan,
+                                                              host=afe_host)
+            if created:
+                host.added_by_label = True
+                host.save()
+
+            label_hosts.add(host.host.id)
+
+    deleted_hosts = models.Host.objects.filter(
+            plan=plan, added_by_label=True).exclude(host__id__in=label_hosts)
+    deleted_hosts.delete()
+
+
+def compute_next_test_config(plan, host):
+    """
+    Gets the next test config that should be run for this plan and host
+
+    Returns None if the host is already running a job. Also sets the host's
+    complete bit if the host is finished running tests.
+    """
+    if host.blocked:
+        return None
+
+    test_configs = plan.testconfig_set.order_by('execution_order')
+    for test_config in test_configs:
+        afe_jobs = plan.job_set.filter(test_config=test_config)
+        afe_job_ids = afe_jobs.values_list('afe_job', flat=True)
+        hqes = afe_models.HostQueueEntry.objects.filter(job__id__in=afe_job_ids,
+                                                        host=host.host)
+        if not hqes:
+            return test_config.id
+        for hqe in hqes:
+            if not hqe.complete:
+                # HostQueueEntry still active for this host,
+                # should not run another test
+                return None
+
+    # All HQEs related to this host are complete
+    host.complete = True
+    host.save()
+    return None
+
+
+def check_for_completion(plan):
+    """
+    Checks if a plan is actually complete. Sets complete=True if so
+    """
+    if not models.Host.objects.filter(plan=plan, complete=False):
+        plan.complete = True
+        plan.save()
+
+
+def compute_test_run_status(status):
+    """
+    Converts a TKO test status to a Planner test run status
+    """
+    Status = model_attributes.TestRunStatus
+    if status == 'GOOD':
+        return Status.PASSED
+    if status == 'RUNNING':
+        return Status.ACTIVE
+    return Status.FAILED
+
+
+def add_test_run(plan, planner_job, tko_test, hostname, status):
+    """
+    Adds a TKO test to the Planner Test Run tables
+    """
+    host = afe_models.Host.objects.get(hostname=hostname)
+
+    planner_host = models.Host.objects.get(plan=plan, host=host)
+    test_run, _ = models.TestRun.objects.get_or_create(plan=plan,
+                                                       test_job=planner_job,
+                                                       tko_test=tko_test,
+                                                       host=planner_host)
+    test_run.status = status
+    test_run.save()
diff --git a/frontend/planner/views.py b/frontend/planner/views.py
index 541e707..d2d4837 100644
--- a/frontend/planner/views.py
+++ b/frontend/planner/views.py
@@ -15,3 +15,10 @@
 
 def rpc_documentation(request):
     return rpc_handler_obj.get_rpc_documentation()
+
+
+def model_documentation(request):
+    model_names = ('Plan', 'Host', 'ControlFile', 'TestConfig', 'Job', 'Bug',
+                   'TestRun', 'DataType', 'History', 'SavedObject', 'KeyVal',
+                   'AutoProcess')
+    return views_common.model_documentation(models, model_names)