[autotest] Remove site_rpc_interface

BUG=chromium:672727
TEST=Run unittests

Change-Id: I8535b121ba7f2317cccec981bf2a8a36ca2639cf
Reviewed-on: https://chromium-review.googlesource.com/435850
Commit-Ready: Allen Li <ayatane@chromium.org>
Tested-by: Allen Li <ayatane@chromium.org>
Reviewed-by: Prathmesh Prabhu <pprabhu@chromium.org>
Reviewed-by: Allen Li <ayatane@chromium.org>
diff --git a/frontend/afe/control_file.py b/frontend/afe/control_file.py
index 8971a88..ff0e12a 100644
--- a/frontend/afe/control_file.py
+++ b/frontend/afe/control_file.py
@@ -7,8 +7,11 @@
 import re, os
 
 import common
+from autotest_lib.client.common_lib import error
+from autotest_lib.client.common_lib.cros import dev_server
 from autotest_lib.frontend.afe import model_logic
-from autotest_lib.frontend.afe import site_rpc_interface
+from autotest_lib.server.cros.dynamic_suite import control_file_getter
+from autotest_lib.server.cros.dynamic_suite import suite as SuiteBase
 import frontend.settings
 
 AUTOTEST_DIR = os.path.abspath(os.path.join(
@@ -85,7 +88,7 @@
     if not append:
         append = []
     if test_source_build:
-        raw_control_files = site_rpc_interface.get_test_control_files_by_build(
+        raw_control_files = _get_test_control_files_by_build(
                 tests, test_source_build)
     else:
         raw_control_files = [_read_control_file(test) for test in tests]
@@ -196,3 +199,58 @@
                                            client_control_file,
                                            test_source_build)
     return control_file_text
+
+
+def _get_test_control_files_by_build(tests, build, ignore_invalid_tests=False):
+    """Get the test control files that are available for the specified build.
+
+    @param tests: A sequence of test objects to run.
+    @param build: unique name by which to refer to the image, e.g.
+            'link-release/R36-5812.0.0'.
+    @param ignore_invalid_tests: if True, ignore tests whose control files
+            cannot be parsed.
+
+    @return: A list of the raw control file contents for the given tests.
+    """
+    raw_control_files = []
+    # Shortcut to avoid staging the image when no tests were requested.
+    if not tests:
+        return raw_control_files
+
+    cfile_getter = _initialize_control_file_getter(build)
+    if SuiteBase.ENABLE_CONTROLS_IN_BATCH:
+        control_file_info_list = cfile_getter.get_suite_info()
+
+    for test in tests:
+        # Read the contents of the test's control file.
+        if SuiteBase.ENABLE_CONTROLS_IN_BATCH:
+            control_file = control_file_info_list[test.path]
+        else:
+            control_file = cfile_getter.get_control_file_contents(
+                    test.path)
+        raw_control_files.append(control_file)
+    return raw_control_files
+
+
+def _initialize_control_file_getter(build):
+    """Get the remote control file getter.
+
+    @param build: unique name by which to refer to a remote build image.
+
+    @return: A control file getter object.
+    """
+    # Stage the test artifacts.
+    try:
+        ds = dev_server.ImageServer.resolve(build)
+        ds_name = ds.hostname
+        build = ds.translate(build)
+    except dev_server.DevServerException as e:
+        raise ValueError('Could not resolve build %s: %s' %
+                         (build, e))
+
+    try:
+        ds.stage_artifacts(image=build, artifacts=['test_suites'])
+    except dev_server.DevServerException as e:
+        raise error.StageControlFileFailure(
+                'Failed to stage %s on %s: %s' % (build, ds_name, e))
+
+    # Collect the control files specified in this build.
+    return control_file_getter.DevServerGetter.create(build, ds)
diff --git a/frontend/afe/direct_afe.py b/frontend/afe/direct_afe.py
index 568e7f3..aecae61 100644
--- a/frontend/afe/direct_afe.py
+++ b/frontend/afe/direct_afe.py
@@ -5,14 +5,13 @@
 
 import common
 import autotest_lib.server.frontend as frontend
-from autotest_lib.frontend.afe import site_rpc_interface
 from autotest_lib.frontend.afe import rpc_interface
 
 class directAFE(frontend.AFE):
     """
     A wrapper for frontend.AFE which exposes all of the AFE
-    functionality, but makes direct calls to site_rpc_interface and
-    rpc_interface rather than making RPC calls to an RPC server.
+    functionality, but makes direct calls to rpc_interface rather than
+    making RPC calls to an RPC server.
     """
     def run(self, call, **dargs):
         func = None
@@ -22,13 +21,7 @@
         except AttributeError:
             pass
 
-        try:
-            func = site_rpc_interface.__getattribute__(call)
-        except AttributeError:
-            pass
-
         if not func:
-            raise AttributeError('No function named %s in either '
-                                 'rpc_interface or site_rpc_interface' % call)
+            raise AttributeError('No function %s in rpc_interface' % call)
 
-        return func(**dargs)
\ No newline at end of file
+        return func(**dargs)
diff --git a/frontend/afe/rpc_interface.py b/frontend/afe/rpc_interface.py
index 6b4c8e9..8f5ae7b 100644
--- a/frontend/afe/rpc_interface.py
+++ b/frontend/afe/rpc_interface.py
@@ -34,30 +34,49 @@
 import ast
 import datetime
 import logging
+import os
 import sys
 
 from django.db.models import Count
+
 import common
-from autotest_lib.client.common_lib import control_data
-from autotest_lib.client.common_lib import priorities
 # TODO(akeshet): Replace with monarch stats once we know how to instrument rpc
 # server with ts_mon.
 from autotest_lib.client.common_lib.cros.graphite import autotest_stats
+from autotest_lib.client.common_lib import control_data
+from autotest_lib.client.common_lib import error
+from autotest_lib.client.common_lib import global_config
+from autotest_lib.client.common_lib import priorities
+from autotest_lib.client.common_lib import time_utils
+from autotest_lib.client.common_lib.cros import dev_server
 from autotest_lib.frontend.afe import control_file as control_file_lib
+from autotest_lib.frontend.afe import model_attributes
+from autotest_lib.frontend.afe import model_logic
+from autotest_lib.frontend.afe import models
 from autotest_lib.frontend.afe import rpc_utils
-from autotest_lib.frontend.afe import models, model_logic, model_attributes
-from autotest_lib.frontend.afe import site_rpc_interface
 from autotest_lib.frontend.tko import models as tko_models
 from autotest_lib.frontend.tko import rpc_interface as tko_rpc_interface
 from autotest_lib.server import frontend
 from autotest_lib.server import utils
 from autotest_lib.server.cros import provision
+from autotest_lib.server.cros.dynamic_suite import constants
+from autotest_lib.server.cros.dynamic_suite import control_file_getter
+from autotest_lib.server.cros.dynamic_suite import suite as SuiteBase
 from autotest_lib.server.cros.dynamic_suite import tools
+from autotest_lib.server.cros.dynamic_suite.suite import Suite
 from autotest_lib.server.lib import status_history
+from autotest_lib.site_utils import host_history
+from autotest_lib.site_utils import job_history
+from autotest_lib.site_utils import server_manager_utils
+from autotest_lib.site_utils import stable_version_utils
 
 
 _timer = autotest_stats.Timer('rpc_interface')
 
+_CONFIG = global_config.global_config
+
+# Relevant CrosDynamicSuiteExceptions are defined in
+# client/common_lib/error.py.
+
+
 def get_parameterized_autoupdate_image_url(job):
     """Get the parameterized autoupdate image url from a parameterized job."""
     known_test_obj = models.Test.smart_get('autoupdate_ParameterizedJob')
@@ -628,8 +647,8 @@
 def get_tests_status_counts_by_job_name_label(job_name_prefix, label_name):
     """Gets the counts of all passed and failed tests from the matching jobs.
 
-    @param job_name_prefix: Name prefix of the jobs to get the summary from, e.g.,
-            'butterfly-release/R40-6457.21.0/bvt-cq/'.
+    @param job_name_prefix: Name prefix of the jobs to get the summary
+           from, e.g., 'butterfly-release/R40-6457.21.0/bvt-cq/'.
     @param label_name: Label that must be set in the jobs, e.g.,
             'cros-version:butterfly-release/R40-6457.21.0'.
 
@@ -973,7 +992,7 @@
             builds[provision.FW_RW_VERSION_PREFIX] = firmware_rw_build
         if firmware_ro_build:
             builds[provision.FW_RO_VERSION_PREFIX] = firmware_ro_build
-        return site_rpc_interface.create_suite_job(
+        return create_suite_job(
                 name=name, control_file=control_file, priority=priority,
                 builds=builds, test_source_build=test_source_build,
                 is_cloning=is_cloning, **kwargs)
@@ -1688,7 +1707,6 @@
             informative description.
     """
 
-    job_fields = models.Job.get_field_dict()
     default_drone_set_name = models.DroneSet.default_drone_set_name()
     drone_sets = ([default_drone_set_name] +
                   sorted(drone_set.name for drone_set in
@@ -1697,7 +1715,6 @@
 
     result = {}
     result['priorities'] = priorities.Priority.choices()
-    default_priority = priorities.Priority.DEFAULT
     result['default_priority'] = 'Default'
     result['max_schedulable_priority'] = priorities.Priority.DEFAULT
     result['users'] = get_users(sort_by=['login'])
@@ -1770,3 +1787,683 @@
     hosts = models.HostAttribute.query_objects({'attribute': attribute,
                                                 'value': value})
     return [row.host.hostname for row in hosts if row.host.invalid == 0]
+
+
+def canonicalize_suite_name(suite_name):
+    """Canonicalize the suite's name.
+
+    @param suite_name: the name of the suite.
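+
+    For example, 'bvt' is canonicalized to 'test_suites/control.bvt'.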
+    """
+    # Do not change this naming convention without updating
+    # site_utils.parse_job_name.
+    return 'test_suites/control.%s' % suite_name
+
+
+def formatted_now():
+    """Format the current datetime."""
+    return datetime.datetime.now().strftime(time_utils.TIME_FMT)
+
+
+def _get_control_file_by_build(build, ds, suite_name):
+    """Return control file contents for |suite_name|.
+
+    Query the dev server at |ds| for the control file |suite_name|, included
+    in |build|.
+
+    @param build: unique name by which to refer to the image.
+    @param ds: a dev_server.DevServer instance to fetch control file with.
+    @param suite_name: canonicalized suite name, e.g. test_suites/control.bvt.
+    @raises ControlFileNotFound if a unique suite control file doesn't exist.
+    @raises NoControlFileList if we can't list the control files at all.
+    @raises ControlFileEmpty if the control file exists on the server, but
+                             can't be read.
+
+    @return the contents of the desired control file.
+    """
+    getter = control_file_getter.DevServerGetter.create(build, ds)
+    devserver_name = ds.hostname
+    timer = autotest_stats.Timer('control_files.parse.%s.%s' %
+                                 (devserver_name.replace('.', '_'),
+                                  suite_name.rsplit('.')[-1]))
+    # Get the control file for the suite.
+    try:
+        with timer:
+            control_file_in = getter.get_control_file_contents_by_name(
+                    suite_name)
+    except error.CrosDynamicSuiteException as e:
+        raise type(e)('Failed to get control file for %s '
+                      '(devserver: %s) (error: %s)' %
+                      (build, devserver_name, e))
+    if not control_file_in:
+        raise error.ControlFileEmpty(
+            "Fetching %s returned no data. (devserver: %s)" %
+            (suite_name, devserver_name))
+    # Force control files to only contain ascii characters.
+    try:
+        control_file_in.encode('ascii')
+    except UnicodeDecodeError as e:
+        raise error.ControlFileMalformed(str(e))
+
+    return control_file_in
+
+
+def _get_control_file_by_suite(suite_name):
+    """Get control file contents by suite name.
+
+    @param suite_name: Suite name as string.
+    @returns: Control file contents as string.
+    """
+    getter = control_file_getter.FileSystemGetter(
+        [_CONFIG.get_config_value('SCHEDULER',
+                                  'drone_installation_directory')])
+    return getter.get_control_file_contents_by_name(suite_name)
+
+
+def _stage_build_artifacts(build, hostname=None):
+    """
+    Ensure components of |build| necessary for installing images are staged.
+
+    @param build: image we want to stage.
+    @param hostname: hostname of a DUT that may run the test. This helps
+        locate a devserver closer to the DUTs if needed. Default is None.
+
+    @raises StageControlFileFailure: if the dev server throws 500 while staging
+        suite control files.
+
+    @return: a tuple of (dev_server.ImageServer instance to use with this
+        build, timings dictionary containing staging start/end times).
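+        The timings dictionary is keyed by constants.DOWNLOAD_STARTED_TIME
+        and constants.PAYLOAD_FINISHED_TIME, with timestamps produced by
+        formatted_now() as values.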
+    """
+    timings = {}
+    # Ensure components of |build| necessary for installing images are staged
+    # on the dev server. However set synchronous to False to allow other
+    # components to be downloaded in the background.
+    ds = dev_server.resolve(build, hostname=hostname)
+    ds_name = ds.hostname
+    timings[constants.DOWNLOAD_STARTED_TIME] = formatted_now()
+    timer = autotest_stats.Timer('control_files.stage.%s' % (
+            ds_name.replace('.', '_')))
+    try:
+        with timer:
+            ds.stage_artifacts(image=build, artifacts=['test_suites'])
+    except dev_server.DevServerException as e:
+        raise error.StageControlFileFailure(
+                "Failed to stage %s on %s: %s" % (build, ds_name, e))
+    timings[constants.PAYLOAD_FINISHED_TIME] = formatted_now()
+    return (ds, timings)
+
+
+@rpc_utils.route_rpc_to_master
+def create_suite_job(
+        name='',
+        board='',
+        pool='',
+        control_file='',
+        check_hosts=True,
+        num=None,
+        file_bugs=False,
+        timeout=24,
+        timeout_mins=None,
+        priority=priorities.Priority.DEFAULT,
+        suite_args=None,
+        wait_for_results=True,
+        job_retry=False,
+        max_retries=None,
+        max_runtime_mins=None,
+        suite_min_duts=0,
+        offload_failures_only=False,
+        builds=None,
+        test_source_build=None,
+        run_prod_code=False,
+        delay_minutes=0,
+        is_cloning=False,
+        **kwargs
+):
+    """
+    Create a job to run a test suite on the given device with the given image.
+
+    When the timeout specified in the control file is reached, the
+    job is guaranteed to have completed and results will be available.
+
+    @param name: The test name if control_file is supplied, otherwise the name
+                 of the test suite to run, e.g. 'bvt'.
+    @param board: the kind of device to run the tests on.
+    @param builds: the builds to install e.g.
+                   {'cros-version:': 'x86-alex-release/R18-1655.0.0',
+                    'fwrw-version:':  'x86-alex-firmware/R36-5771.50.0',
+                    'fwro-version:':  'x86-alex-firmware/R36-5771.49.0'}
+                   Defaults to an empty dictionary if not given.
+    @param test_source_build: Build that contains the server-side test code.
+    @param pool: Specify the pool of machines to use for scheduling
+            purposes.
+    @param control_file: the control file of the job.
+    @param check_hosts: require appropriate live hosts to exist in the lab.
+    @param num: Specify the number of machines to schedule across (integer).
+                Leave unspecified or use None to use the default sharding
+                factor.
+    @param file_bugs: File a bug on each test failure in this suite.
+    @param timeout: The max lifetime of this suite, in hours.
+    @param timeout_mins: The max lifetime of this suite, in minutes. Takes
+                         priority over timeout.
+    @param priority: Integer denoting priority. Higher is more important.
+    @param suite_args: Optional arguments which will be parsed by the suite
+                       control file. Used by control.test_that_wrapper to
+                       determine which tests to run.
+    @param wait_for_results: Set to False to run the suite job without waiting
+                             for test jobs to finish. Default is True.
+    @param job_retry: Set to True to enable job-level retry. Default is False.
+    @param max_retries: Integer, maximum job retries allowed at suite level.
+                        None for no max.
+    @param max_runtime_mins: Maximum amount of time a job can be running in
+                             minutes.
+    @param suite_min_duts: Integer. Scheduler will prioritize getting the
+                           minimum number of machines for the suite when it is
+                           competing with another suite that has a higher
+                           priority but has already got the minimum machines
+                           it needs.
+    @param offload_failures_only: Only enable gs_offloading for failed jobs.
+    @param run_prod_code: If True, the suite will run the test code that
+                          lives in prod, i.e. the test code currently on the
+                          lab servers. If False, the control files and test
+                          code for this suite run will be retrieved from the
+                          build artifacts.
+    @param delay_minutes: Delay the creation of test jobs for a given number of
+                          minutes.
+    @param is_cloning: True if creating a cloning job.
+    @param kwargs: extra keyword args. NOT USED.
+
+    @raises ControlFileNotFound: if a unique suite control file doesn't exist.
+    @raises NoControlFileList: if we can't list the control files at all.
+    @raises StageControlFileFailure: If the dev server throws 500 while
+                                     staging test_suites.
+    @raises ControlFileEmpty: if the control file exists on the server, but
+                              can't be read.
+
+    @return: the job ID of the suite; -1 on error.
+    """
+    if type(num) is not int and num is not None:
+        raise error.SuiteArgumentException('Ill-specified num argument %r. '
+                                           'Must be an integer or None.' % num)
+    if num == 0:
+        logging.warning("Can't run on 0 hosts; using default.")
+        num = None
+
+    if builds is None:
+        builds = {}
+
+    # Default test source build to CrOS build if it's not specified and
+    # run_prod_code is set to False.
+    if not run_prod_code:
+        test_source_build = Suite.get_test_source_build(
+                builds, test_source_build=test_source_build)
+
+    sample_dut = rpc_utils.get_sample_dut(board, pool)
+
+    suite_name = canonicalize_suite_name(name)
+    if run_prod_code:
+        ds = dev_server.resolve(test_source_build, hostname=sample_dut)
+        keyvals = {}
+    else:
+        (ds, keyvals) = _stage_build_artifacts(
+                test_source_build, hostname=sample_dut)
+    keyvals[constants.SUITE_MIN_DUTS_KEY] = suite_min_duts
+
+    # Do not change this naming convention without updating
+    # site_utils.parse_job_name.
+    if run_prod_code:
+        # If run_prod_code is True, test_source_build is not set; use the
+        # first build in the builds list for the suite job name.
+        name = '%s-%s' % (builds.values()[0], suite_name)
+    else:
+        name = '%s-%s' % (test_source_build, suite_name)
+
+    timeout_mins = timeout_mins or timeout * 60
+    max_runtime_mins = max_runtime_mins or timeout * 60
+
+    if not board:
+        board = utils.ParseBuildName(builds[provision.CROS_VERSION_PREFIX])[0]
+
+    if run_prod_code:
+        control_file = _get_control_file_by_suite(suite_name)
+
+    if not control_file:
+        # No control file was supplied so look it up from the build artifacts.
+        control_file = _get_control_file_by_build(
+                test_source_build, ds, suite_name)
+
+    if is_cloning:
+        # Strip the variables injected by the original job before
+        # re-injecting fresh values below.
+        control_file = tools.remove_injection(control_file)
+
+    # Variables such as builds and board to prepend to the control file.
+    inject_dict = {
+        'board': board,
+        # `build` is needed for suites like AU to stage image inside suite
+        # control file.
+        'build': test_source_build,
+        'builds': builds,
+        'check_hosts': check_hosts,
+        'pool': pool,
+        'num': num,
+        'file_bugs': file_bugs,
+        'timeout': timeout,
+        'timeout_mins': timeout_mins,
+        'devserver_url': ds.url(),
+        'priority': priority,
+        'suite_args': suite_args,
+        'wait_for_results': wait_for_results,
+        'job_retry': job_retry,
+        'max_retries': max_retries,
+        'max_runtime_mins': max_runtime_mins,
+        'offload_failures_only': offload_failures_only,
+        'test_source_build': test_source_build,
+        'run_prod_code': run_prod_code,
+        'delay_minutes': delay_minutes,
+    }
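+    # tools.inject_vars is expected to prepend these values to the control
+    # file as Python variable assignments so the suite control file can read
+    # them directly; see server/cros/dynamic_suite/tools.py.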
+    control_file = tools.inject_vars(inject_dict, control_file)
+
+    return rpc_utils.create_job_common(name,
+                                       priority=priority,
+                                       timeout_mins=timeout_mins,
+                                       max_runtime_mins=max_runtime_mins,
+                                       control_type='Server',
+                                       control_file=control_file,
+                                       hostless=True,
+                                       keyvals=keyvals)
+
+
+def get_job_history(**filter_data):
+    """Get history of the job, including the special tasks executed for the job
+
+    @param filter_data: filter for the call, should at least include
+                        {'job_id': [job id]}
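+                        For example: get_job_history(job_id=1234).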
+    @returns: JSON string of the job's history, including information such
+              as the hosts that ran the job and the special tasks executed
+              before and after it.
+    """
+    job_id = filter_data['job_id']
+    job_info = job_history.get_job_info(job_id)
+    return rpc_utils.prepare_for_serialization(job_info.get_history())
+
+
+def get_host_history(start_time, end_time, hosts=None, board=None, pool=None):
+    """Get history of a list of host.
+
+    The return is a JSON string of host history for each host, for example:
+    {'172.22.33.51': [{'status': 'Resetting',
+                       'start_time': '2014-08-07 10:02:16',
+                       'end_time': '2014-08-07 10:03:16',
+                       'log_url': 'http://autotest/reset-546546/debug',
+                       'dbg_str': 'Task: Special Task 19441991 (host ...)'},
+                      {'status': 'Running',
+                       'start_time': '2014-08-07 10:03:18',
+                       'end_time': '2014-08-07 10:13:00',
+                       'log_url': 'http://autotest/reset-546546/debug',
+                       'dbg_str': 'HQE: 15305005, for job: 14995562'}]}
+
+    @param start_time: start time to search for history, can be string value or
+                       epoch time.
+    @param end_time: end time to search for history, can be string value or
+                     epoch time.
+    @param hosts: A list of hosts to search for history. Default is None.
+    @param board: board type of hosts. Default is None.
+    @param pool: pool type of hosts. Default is None.
+    @returns: JSON string of the host history.
+    """
+    return rpc_utils.prepare_for_serialization(
+            host_history.get_history_details(
+                    start_time=start_time, end_time=end_time,
+                    hosts=hosts, board=board, pool=pool,
+                    process_pool_size=4))
+
+
+def shard_heartbeat(shard_hostname, jobs=(), hqes=(), known_job_ids=(),
+                    known_host_ids=(), known_host_statuses=()):
+    """Receive updates for job statuses from shards and assign hosts and jobs.
+
+    @param shard_hostname: Hostname of the calling shard.
+    @param jobs: Jobs in serialized form that should be updated with newer
+                 status from a shard.
+    @param hqes: Hostqueueentries in serialized form that should be updated with
+                 newer status from a shard. Note that for every hostqueueentry
+                 the corresponding job must be in jobs.
+    @param known_job_ids: List of ids of jobs the shard already has.
+    @param known_host_ids: List of ids of hosts the shard already has.
+    @param known_host_statuses: List of statuses of hosts the shard already has.
+
+    @returns: Serialized representations of hosts, jobs, suite job keyvals
+              and their dependencies to be inserted into a shard's database.
+    """
+    # The following alternatives to sending host and job ids in every heartbeat
+    # have been considered:
+    # 1. Sending the highest known job and host ids. This would work for jobs:
+    #    Newer jobs always have larger ids. Also, if a job is not assigned to a
+    #    particular shard during a heartbeat, it never will be assigned to this
+    #    shard later.
+    #    This is not true for hosts though: A host that is leased won't be sent
+    #    to the shard now, but might be sent in a future heartbeat. This means
+    #    sometimes hosts with a lower id than the maximum host id the shard
+    #    knows should still be transferred.
+    # 2. Send the number of jobs/hosts the shard knows to the master in each
+    #    heartbeat. Compare these to the number of records that already have
+    #    the shard_id set to this shard. In the normal case, they should match.
+    #    In case they don't, resend all entities of that type.
+    #    This would work well for hosts, because there aren't that many.
+    #    Resending all jobs is quite a big overhead though.
+    #    Also, this approach might run into edge cases if entities are
+    #    ever deleted.
+    # 3. Mixtures of the above: Use 1 for jobs and 2 for hosts.
+    #    Using two different approaches isn't consistent and might cause
+    #    confusion. Also the issues with the case of deletions might still
+    #    occur.
+    #
+    # The overhead of sending all job and host ids in every heartbeat is low:
+    # At peaks one board has about 1200 created but unfinished jobs.
+    # See the numbers here: http://goo.gl/gQCGWH
+    # Assuming that job ids have 6 digits and that json serialization takes a
+    # comma and a space as overhead, the traffic per id sent is about 8 bytes.
+    # If 5000 ids need to be sent, this means 40 kilobytes of traffic.
+    # A NOT IN query with 5000 ids took about 30ms in our tests.
+    # These numbers seem low enough to outweigh the disadvantages of the
+    # solutions described above.
+    timer = autotest_stats.Timer('shard_heartbeat')
+    with timer:
+        shard_obj = rpc_utils.retrieve_shard(shard_hostname=shard_hostname)
+        rpc_utils.persist_records_sent_from_shard(shard_obj, jobs, hqes)
+        assert len(known_host_ids) == len(known_host_statuses)
+        for host_id, host_status in zip(known_host_ids, known_host_statuses):
+            host_model = models.Host.objects.get(pk=host_id)
+            if host_model.status != host_status:
+                host_model.status = host_status
+                host_model.save()
+
+        hosts, jobs, suite_keyvals = rpc_utils.find_records_for_shard(
+                shard_obj, known_job_ids=known_job_ids,
+                known_host_ids=known_host_ids)
+        return {
+            'hosts': [host.serialize() for host in hosts],
+            'jobs': [job.serialize() for job in jobs],
+            'suite_keyvals': [kv.serialize() for kv in suite_keyvals],
+        }
+
+
+def get_shards(**filter_data):
+    """Return a list of all shards.
+
+    @returns: A sequence of nested dictionaries of shard information.
+    """
+    shards = models.Shard.query_objects(filter_data)
+    serialized_shards = rpc_utils.prepare_rows_as_nested_dicts(shards, ())
+    for serialized, shard in zip(serialized_shards, shards):
+        serialized['labels'] = [label.name for label in shard.labels.all()]
+
+    return serialized_shards
+
+
+def _assign_board_to_shard_precheck(labels):
+    """Verify whether board labels are valid to be added to a given shard.
+
+    First, check whether the board label is in the correct format. Second,
+    check whether the board label exists. Third, check whether the board has
+    already been assigned to a shard.
+
+    @param labels: Board labels separated by comma.
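+                   For example: 'board:lumpy' or 'board:lumpy,board:daisy'.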
+
+    @raises error.RPCException: If label provided doesn't start with `board:`
+            or board has been added to shard already.
+    @raises models.Label.DoesNotExist: If the label specified doesn't exist.
+
+    @returns: A list of label models that are ready to be added to the
+            shard.
+    """
+    labels = labels.split(',')
+    label_models = []
+    for label in labels:
+        # Check whether the board label is in the correct format.
+        if not label.startswith('board:'):
+            raise error.RPCException('Sharding only supports `board:.*` label.')
+        # Check whether the board label exists. If not, an exception will be
+        # thrown by the smart_get function.
+        label = models.Label.smart_get(label)
+        # Check whether the board has already been assigned to a shard.
+        try:
+            shard = models.Shard.objects.get(labels=label)
+            raise error.RPCException(
+                    '%s is already on shard %s' % (label, shard.hostname))
+        except models.Shard.DoesNotExist:
+            # board is not on any shard, so it's valid.
+            label_models.append(label)
+    return label_models
+
+
+def add_shard(hostname, labels):
+    """Add a shard and start running jobs on it.
+
+    @param hostname: The hostname of the shard to be added; needs to be unique.
+    @param labels: Board labels separated by comma. Jobs of one of the labels
+                   will be assigned to the shard.
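+                   For example: add_shard('shard1', 'board:lumpy').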
+
+    @raises error.RPCException: If label provided doesn't start with `board:` or
+            board has been added to shard already.
+    @raises model_logic.ValidationError: If a shard with the given hostname
+            already exist.
+    @raises models.Label.DoesNotExist: If the label specified doesn't exist.
+
+    @returns: The id of the added shard.
+    """
+    labels = _assign_board_to_shard_precheck(labels)
+    shard = models.Shard.add_object(hostname=hostname)
+    for label in labels:
+        shard.labels.add(label)
+    return shard.id
+
+
+def add_board_to_shard(hostname, labels):
+    """Add boards to a given shard
+
+    @param hostname: The hostname of the shard to be changed.
+    @param labels: Board labels separated by comma.
+
+    @raises error.RPCException: If label provided doesn't start with `board:` or
+            board has been added to shard already.
+    @raises models.Label.DoesNotExist: If the label specified doesn't exist.
+
+    @returns: The id of the changed shard.
+    """
+    labels = _assign_board_to_shard_precheck(labels)
+    shard = models.Shard.objects.get(hostname=hostname)
+    for label in labels:
+        shard.labels.add(label)
+    return shard.id
+
+
+def delete_shard(hostname):
+    """Delete a shard and reclaim all resources from it.
+
+    This claims back all assigned hosts from the shard. To ensure all DUTs are
+    in a sane state, a Reboot task with highest priority is scheduled for them.
+    This reboots the DUTs; any remaining tasks then continue to run on the
+    master's drones.
+
+    The procedure for deleting a shard:
+        * Lock all unlocked hosts on that shard.
+        * Remove shard information.
+        * Assign a reboot task with highest priority to these hosts.
+        * Unlock these hosts; the reboot tasks then run ahead of all other
+          tasks.
+
+    The status of jobs that haven't been reported as finished yet will be
+    lost. The master scheduler will pick up the jobs and execute them.
+
+    @param hostname: Hostname of the shard to delete.
+    """
+    shard = rpc_utils.retrieve_shard(shard_hostname=hostname)
+    hostnames_to_lock = [h.hostname for h in
+                         models.Host.objects.filter(shard=shard, locked=False)]
+
+    # TODO(beeps): Power off shard
+    # For ChromeOS hosts, a reboot test with the highest priority is added to
+    # the DUT. After a reboot it should be guaranteed that no processes from
+    # prior tests that were run by a shard are still running on it.
+
+    # Lock all unlocked hosts.
+    dicts = {'locked': True, 'lock_time': datetime.datetime.now()}
+    models.Host.objects.filter(hostname__in=hostnames_to_lock).update(**dicts)
+
+    # Remove shard information.
+    models.Host.objects.filter(shard=shard).update(shard=None)
+    models.Job.objects.filter(shard=shard).update(shard=None)
+    shard.labels.clear()
+    shard.delete()
+
+    # Assign a reboot task with highest priority: Super.
+    t = models.Test.objects.get(name='platform_BootPerfServer:shard')
+    c = utils.read_file(os.path.join(common.autotest_dir, t.path))
+    if hostnames_to_lock:
+        rpc_utils.create_job_common(
+                'reboot_dut_for_shard_deletion',
+                priority=priorities.Priority.SUPER,
+                control_type='Server',
+                control_file=c, hosts=hostnames_to_lock)
+
+    # Unlock these shard-related hosts.
+    dicts = {'locked': False, 'lock_time': None}
+    models.Host.objects.filter(hostname__in=hostnames_to_lock).update(**dicts)
+
+
+def get_servers(hostname=None, role=None, status=None):
+    """Get a list of servers with matching role and status.
+
+    @param hostname: FQDN of the server.
+    @param role: Name of the server role, e.g., drone, scheduler. Default to
+                 None to match any role.
+    @param status: Status of the server, e.g., primary, backup, repair_required.
+                   Default to None to match any server status.
+
+    @raises error.RPCException: If server database is not used.
+    @return: A list of server names for servers with matching role and status.
+    """
+    if not server_manager_utils.use_server_db():
+        raise error.RPCException('Server database is not enabled. Please try '
+                                 'retrieve servers from global config.')
+    servers = server_manager_utils.get_servers(hostname=hostname, role=role,
+                                               status=status)
+    return [s.get_details() for s in servers]
+
+
+@rpc_utils.route_rpc_to_master
+def get_stable_version(board=stable_version_utils.DEFAULT, android=False):
+    """Get stable version for the given board.
+
+    @param board: Name of the board.
+    @param android: If True, the given board is an Android-based device. If
+                    False, assume it's a Chrome OS-based device.
+
+    @return: Stable version of the given board. Returns the global config
+             value of CROS.stable_cros_version if the stable_versions table
+             does not have an entry for board DEFAULT.
+    """
+    return stable_version_utils.get(board=board, android=android)
+
+
+@rpc_utils.route_rpc_to_master
+def get_all_stable_versions():
+    """Get stable versions for all boards.
+
+    @return: A dictionary of board:version.
+    """
+    return stable_version_utils.get_all()
+
+
+@rpc_utils.route_rpc_to_master
+def set_stable_version(version, board=stable_version_utils.DEFAULT):
+    """Modify stable version for the given board.
+
+    @param version: The new value of stable version for given board.
+    @param board: Name of the board, default to value `DEFAULT`.
+    """
+    stable_version_utils.set(version=version, board=board)
+
+
+@rpc_utils.route_rpc_to_master
+def delete_stable_version(board):
+    """Modify stable version for the given board.
+
+    Delete a stable version entry in afe_stable_versions table for a given
+    board, so default stable version will be used.
+
+    @param board: Name of the board.
+    """
+    stable_version_utils.delete(board=board)
+
+
+def get_tests_by_build(build, ignore_invalid_tests=True):
+    """Get the tests that are available for the specified build.
+
+    @param build: unique name by which to refer to the image.
+    @param ignore_invalid_tests: if True, ignore tests whose control files
+            cannot be parsed.
+
+    @return: A sorted list of all tests that are in the build specified.
+    """
+    # Collect the control files specified in this build
+    cfile_getter = control_file_lib._initialize_control_file_getter(build)
+    if SuiteBase.ENABLE_CONTROLS_IN_BATCH:
+        control_file_info_list = cfile_getter.get_suite_info()
+        control_file_list = control_file_info_list.keys()
+    else:
+        control_file_list = cfile_getter.get_control_file_list()
+
+    test_objects = []
+    _id = 0
+    for control_file_path in control_file_list:
+        # Read and parse the control file
+        if SuiteBase.ENABLE_CONTROLS_IN_BATCH:
+            control_file = control_file_info_list[control_file_path]
+        else:
+            control_file = cfile_getter.get_control_file_contents(
+                    control_file_path)
+        try:
+            control_obj = control_data.parse_control_string(control_file)
+        except Exception:
+            logging.info('Failed to parse control file: %s', control_file_path)
+            if not ignore_invalid_tests:
+                raise
+            # Skip this test entirely rather than reusing a stale control_obj
+            # from a previous iteration.
+            continue
+
+        # Extract the values needed for the AFE from the control_obj.
+        # The keys list represents attributes in the control_obj that
+        # are required by the AFE.
+        keys = ['author', 'doc', 'name', 'time', 'test_type', 'experimental',
+                'test_category', 'test_class', 'dependencies', 'run_verify',
+                'sync_count', 'job_retries', 'retries', 'path']
+
+        test_object = {}
+        for key in keys:
+            test_object[key] = getattr(control_obj, key) if hasattr(
+                    control_obj, key) else ''
+
+        # Unfortunately, the AFE expects different key names for certain
+        # values; these must be corrected to avoid the risk of tests
+        # being omitted by the AFE.
+        # The 'id' is an additional value used in the AFE.
+        # The control_data parsing does not reference 'run_reset', but it
+        # is also used in the AFE and defaults to True.
+        test_object['id'] = _id
+        test_object['run_reset'] = True
+        test_object['description'] = test_object.get('doc', '')
+        test_object['test_time'] = test_object.get('time', 0)
+        test_object['test_retry'] = test_object.get('retries', 0)
+
+        # Fix the test name to be consistent with the current presentation
+        # of test names in the AFE.
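+        # For example, 'login_Foo/control.stress' becomes 'login_Foo:stress',
+        # while a bare 'login_Foo/control' remains 'login_Foo'.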
+        testpath, subname = os.path.split(control_file_path)
+        testname = os.path.basename(testpath)
+        subname = subname.split('.')[1:]
+        if subname:
+            testname = '%s:%s' % (testname, ':'.join(subname))
+
+        test_object['name'] = testname
+
+        # Correct the test path as parse_control_string sets an empty string.
+        test_object['path'] = control_file_path
+
+        _id += 1
+        test_objects.append(test_object)
+
+    test_objects = sorted(test_objects, key=lambda x: x.get('name'))
+    return rpc_utils.prepare_for_serialization(test_objects)
diff --git a/frontend/afe/rpc_interface_unittest.py b/frontend/afe/rpc_interface_unittest.py
index a4b7db1..89a7144 100755
--- a/frontend/afe/rpc_interface_unittest.py
+++ b/frontend/afe/rpc_interface_unittest.py
@@ -2,20 +2,28 @@
 #pylint: disable-msg=C0111
 
 import datetime
+import mox
 import unittest
 
 import common
-from autotest_lib.frontend import setup_django_environment
-from autotest_lib.frontend.afe import frontend_test_utils
-from autotest_lib.frontend.afe import models, rpc_interface, frontend_test_utils
-from autotest_lib.frontend.afe import model_logic, model_attributes
-from autotest_lib.client.common_lib import global_config
 from autotest_lib.client.common_lib import control_data
 from autotest_lib.client.common_lib import error
+from autotest_lib.client.common_lib import global_config
 from autotest_lib.client.common_lib import priorities
+from autotest_lib.client.common_lib.cros import dev_server
 from autotest_lib.client.common_lib.test_utils import mock
+from autotest_lib.frontend import setup_django_environment
+from autotest_lib.frontend.afe import frontend_test_utils
+from autotest_lib.frontend.afe import model_attributes
+from autotest_lib.frontend.afe import model_logic
+from autotest_lib.frontend.afe import models
+from autotest_lib.frontend.afe import rpc_interface
+from autotest_lib.frontend.afe import rpc_utils
 from autotest_lib.server import frontend
 from autotest_lib.server import utils as server_utils
+from autotest_lib.server.cros import provision
+from autotest_lib.server.cros.dynamic_suite import constants
+from autotest_lib.server.cros.dynamic_suite import control_file_getter
 from autotest_lib.server.cros.dynamic_suite import frontend_wrappers
 
 CLIENT = control_data.CONTROL_TYPE_NAMES.CLIENT
@@ -705,5 +713,648 @@
         self.assertEquals('cool-image', image)
 
 
+class ExtraRpcInterfaceTest(mox.MoxTestBase,
+                            frontend_test_utils.FrontendTestMixin):
+    """Unit tests for functions originally in site_rpc_interface.py.
+
+    @var _NAME: fake suite name.
+    @var _BOARD: fake board to reimage.
+    @var _BUILD: fake build with which to reimage.
+    @var _PRIORITY: fake priority with which to reimage.
+    """
+    _NAME = 'name'
+    _BOARD = 'link'
+    _BUILD = 'link-release/R36-5812.0.0'
+    _BUILDS = {provision.CROS_VERSION_PREFIX: _BUILD}
+    _PRIORITY = priorities.Priority.DEFAULT
+    _TIMEOUT = 24
+
+
+    def setUp(self):
+        super(ExtraRpcInterfaceTest, self).setUp()
+        self._SUITE_NAME = rpc_interface.canonicalize_suite_name(
+            self._NAME)
+        self.dev_server = self.mox.CreateMock(dev_server.ImageServer)
+        self._frontend_common_setup(fill_data=False)
+
+
+    def tearDown(self):
+        self._frontend_common_teardown()
+
+
+    def _setupDevserver(self):
+        self.mox.StubOutClassWithMocks(dev_server, 'ImageServer')
+        dev_server.resolve(self._BUILD).AndReturn(self.dev_server)
+
+
+    def _mockDevServerGetter(self, get_control_file=True):
+        self._setupDevserver()
+        if get_control_file:
+            self.getter = self.mox.CreateMock(
+                    control_file_getter.DevServerGetter)
+            self.mox.StubOutWithMock(control_file_getter.DevServerGetter,
+                                     'create')
+            control_file_getter.DevServerGetter.create(
+                    mox.IgnoreArg(), mox.IgnoreArg()).AndReturn(self.getter)
+
+
+    def _mockRpcUtils(self, to_return, control_file_substring=''):
+        """Fake out the autotest rpc_utils module with a mockable class.
+
+        @param to_return: the value that rpc_utils.create_job_common() should
+                          be mocked out to return.
+        @param control_file_substring: A substring that is expected to appear
+                                       in the control file output string that
+                                       is passed to create_job_common.
+                                       Default: ''
+        """
+        download_started_time = constants.DOWNLOAD_STARTED_TIME
+        payload_finished_time = constants.PAYLOAD_FINISHED_TIME
+        self.mox.StubOutWithMock(rpc_utils, 'create_job_common')
+        rpc_utils.create_job_common(mox.And(mox.StrContains(self._NAME),
+                                    mox.StrContains(self._BUILD)),
+                            priority=self._PRIORITY,
+                            timeout_mins=self._TIMEOUT*60,
+                            max_runtime_mins=self._TIMEOUT*60,
+                            control_type='Server',
+                            control_file=mox.And(mox.StrContains(self._BOARD),
+                                                 mox.StrContains(self._BUILD),
+                                                 mox.StrContains(
+                                                     control_file_substring)),
+                            hostless=True,
+                            keyvals=mox.And(mox.In(download_started_time),
+                                            mox.In(payload_finished_time))
+                            ).AndReturn(to_return)
+
+
+    def testStageBuildFail(self):
+        """Ensure that a failure to stage the desired build fails the RPC."""
+        self._setupDevserver()
+
+        self.dev_server.hostname = 'mox_url'
+        self.dev_server.stage_artifacts(
+                image=self._BUILD, artifacts=['test_suites']).AndRaise(
+                dev_server.DevServerException())
+        self.mox.ReplayAll()
+        self.assertRaises(error.StageControlFileFailure,
+                          rpc_interface.create_suite_job,
+                          name=self._NAME,
+                          board=self._BOARD,
+                          builds=self._BUILDS,
+                          pool=None)
+
+
+    def testGetControlFileFail(self):
+        """Ensure that a failure to get needed control file fails the RPC."""
+        self._mockDevServerGetter()
+
+        self.dev_server.hostname = 'mox_url'
+        self.dev_server.stage_artifacts(
+                image=self._BUILD, artifacts=['test_suites']).AndReturn(True)
+
+        self.getter.get_control_file_contents_by_name(
+            self._SUITE_NAME).AndReturn(None)
+        self.mox.ReplayAll()
+        self.assertRaises(error.ControlFileEmpty,
+                          rpc_interface.create_suite_job,
+                          name=self._NAME,
+                          board=self._BOARD,
+                          builds=self._BUILDS,
+                          pool=None)
+
+
+    def testGetControlFileListFail(self):
+        """Ensure that a failure to get needed control file fails the RPC."""
+        self._mockDevServerGetter()
+
+        self.dev_server.hostname = 'mox_url'
+        self.dev_server.stage_artifacts(
+                image=self._BUILD, artifacts=['test_suites']).AndReturn(True)
+
+        self.getter.get_control_file_contents_by_name(
+            self._SUITE_NAME).AndRaise(error.NoControlFileList())
+        self.mox.ReplayAll()
+        self.assertRaises(error.NoControlFileList,
+                          rpc_interface.create_suite_job,
+                          name=self._NAME,
+                          board=self._BOARD,
+                          builds=self._BUILDS,
+                          pool=None)
+
+
+    def testBadNumArgument(self):
+        """Ensure we handle bad values for the |num| argument."""
+        self.assertRaises(error.SuiteArgumentException,
+                          rpc_interface.create_suite_job,
+                          name=self._NAME,
+                          board=self._BOARD,
+                          builds=self._BUILDS,
+                          pool=None,
+                          num='goo')
+        self.assertRaises(error.SuiteArgumentException,
+                          rpc_interface.create_suite_job,
+                          name=self._NAME,
+                          board=self._BOARD,
+                          builds=self._BUILDS,
+                          pool=None,
+                          num=[])
+        self.assertRaises(error.SuiteArgumentException,
+                          rpc_interface.create_suite_job,
+                          name=self._NAME,
+                          board=self._BOARD,
+                          builds=self._BUILDS,
+                          pool=None,
+                          num='5')
+
+
+    def testCreateSuiteJobFail(self):
+        """Ensure that failure to schedule the suite job fails the RPC."""
+        self._mockDevServerGetter()
+
+        self.dev_server.hostname = 'mox_url'
+        self.dev_server.stage_artifacts(
+                image=self._BUILD, artifacts=['test_suites']).AndReturn(True)
+
+        self.getter.get_control_file_contents_by_name(
+            self._SUITE_NAME).AndReturn('f')
+
+        self.dev_server.url().AndReturn('mox_url')
+        self._mockRpcUtils(-1)
+        self.mox.ReplayAll()
+        self.assertEquals(
+            rpc_interface.create_suite_job(name=self._NAME,
+                                           board=self._BOARD,
+                                           builds=self._BUILDS, pool=None),
+            -1)
+
+
+    def testCreateSuiteJobSuccess(self):
+        """Ensures that success results in a successful RPC."""
+        self._mockDevServerGetter()
+
+        self.dev_server.hostname = 'mox_url'
+        self.dev_server.stage_artifacts(
+                image=self._BUILD, artifacts=['test_suites']).AndReturn(True)
+
+        self.getter.get_control_file_contents_by_name(
+            self._SUITE_NAME).AndReturn('f')
+
+        self.dev_server.url().AndReturn('mox_url')
+        job_id = 5
+        self._mockRpcUtils(job_id)
+        self.mox.ReplayAll()
+        self.assertEquals(
+            rpc_interface.create_suite_job(name=self._NAME,
+                                           board=self._BOARD,
+                                           builds=self._BUILDS,
+                                           pool=None),
+            job_id)
+
+
+    def testCreateSuiteJobNoHostCheckSuccess(self):
+        """Ensures that success results in a successful RPC."""
+        self._mockDevServerGetter()
+
+        self.dev_server.hostname = 'mox_url'
+        self.dev_server.stage_artifacts(
+                image=self._BUILD, artifacts=['test_suites']).AndReturn(True)
+
+        self.getter.get_control_file_contents_by_name(
+            self._SUITE_NAME).AndReturn('f')
+
+        self.dev_server.url().AndReturn('mox_url')
+        job_id = 5
+        self._mockRpcUtils(job_id)
+        self.mox.ReplayAll()
+        self.assertEquals(
+          rpc_interface.create_suite_job(name=self._NAME,
+                                         board=self._BOARD,
+                                         builds=self._BUILDS,
+                                         pool=None, check_hosts=False),
+          job_id)
+
+
+    def testCreateSuiteIntegerNum(self):
+        """Ensures that success results in a successful RPC."""
+        self._mockDevServerGetter()
+
+        self.dev_server.hostname = 'mox_url'
+        self.dev_server.stage_artifacts(
+                image=self._BUILD, artifacts=['test_suites']).AndReturn(True)
+
+        self.getter.get_control_file_contents_by_name(
+            self._SUITE_NAME).AndReturn('f')
+
+        self.dev_server.url().AndReturn('mox_url')
+        job_id = 5
+        self._mockRpcUtils(job_id, control_file_substring='num=17')
+        self.mox.ReplayAll()
+        self.assertEquals(
+            rpc_interface.create_suite_job(name=self._NAME,
+                                           board=self._BOARD,
+                                           builds=self._BUILDS,
+                                           pool=None,
+                                           check_hosts=False,
+                                           num=17),
+            job_id)
+
+
+    def testCreateSuiteJobControlFileSupplied(self):
+        """Ensure we can supply the control file to create_suite_job."""
+        self._mockDevServerGetter(get_control_file=False)
+
+        self.dev_server.hostname = 'mox_url'
+        self.dev_server.stage_artifacts(
+                image=self._BUILD, artifacts=['test_suites']).AndReturn(True)
+        self.dev_server.url().AndReturn('mox_url')
+        job_id = 5
+        self._mockRpcUtils(job_id)
+        self.mox.ReplayAll()
+        self.assertEquals(
+            rpc_interface.create_suite_job(name='%s/%s' % (self._NAME,
+                                                           self._BUILD),
+                                           board=None,
+                                           builds=self._BUILDS,
+                                           pool=None,
+                                           control_file='CONTROL FILE'),
+            job_id)
+
+
+    def _get_records_for_sending_to_master(self):
+        return [{'control_file': 'foo',
+                 'control_type': 1,
+                 'created_on': datetime.datetime(2014, 8, 21),
+                 'drone_set': None,
+                 'email_list': '',
+                 'max_runtime_hrs': 72,
+                 'max_runtime_mins': 1440,
+                 'name': 'dummy',
+                 'owner': 'autotest_system',
+                 'parse_failed_repair': True,
+                 'priority': 40,
+                 'reboot_after': 0,
+                 'reboot_before': 1,
+                 'run_reset': True,
+                 'run_verify': False,
+                 'synch_count': 0,
+                 'test_retry': 10,
+                 'timeout': 24,
+                 'timeout_mins': 1440,
+                 'id': 1
+                 }], [{
+                    'aborted': False,
+                    'active': False,
+                    'complete': False,
+                    'deleted': False,
+                    'execution_subdir': '',
+                    'finished_on': None,
+                    'started_on': None,
+                    'status': 'Queued',
+                    'id': 1
+                }]
+
+
+    def _do_heartbeat_and_assert_response(self, shard_hostname='shard1',
+                                          upload_jobs=(), upload_hqes=(),
+                                          known_jobs=(), known_hosts=(),
+                                          **kwargs):
+        known_job_ids = [job.id for job in known_jobs]
+        known_host_ids = [host.id for host in known_hosts]
+        known_host_statuses = [host.status for host in known_hosts]
+
+        retval = rpc_interface.shard_heartbeat(
+            shard_hostname=shard_hostname,
+            jobs=upload_jobs, hqes=upload_hqes,
+            known_job_ids=known_job_ids, known_host_ids=known_host_ids,
+            known_host_statuses=known_host_statuses)
+
+        self._assert_shard_heartbeat_response(shard_hostname, retval,
+                                              **kwargs)
+
+        return shard_hostname
+
+
+    def _assert_shard_heartbeat_response(self, shard_hostname, retval, jobs=[],
+                                         hosts=[], hqes=[]):
+
+        retval_hosts, retval_jobs = retval['hosts'], retval['jobs']
+
+        expected_jobs = [
+            (job.id, job.name, shard_hostname) for job in jobs]
+        returned_jobs = [(job['id'], job['name'], job['shard']['hostname'])
+                         for job in retval_jobs]
+        self.assertEqual(returned_jobs, expected_jobs)
+
+        expected_hosts = [(host.id, host.hostname) for host in hosts]
+        returned_hosts = [(host['id'], host['hostname'])
+                          for host in retval_hosts]
+        self.assertEqual(returned_hosts, expected_hosts)
+
+        retval_hqes = []
+        for job in retval_jobs:
+            retval_hqes += job['hostqueueentry_set']
+
+        expected_hqes = [(hqe.id) for hqe in hqes]
+        returned_hqes = [(hqe['id']) for hqe in retval_hqes]
+        self.assertEqual(returned_hqes, expected_hqes)
+
+
+    def _send_records_to_master_helper(
+        self, jobs, hqes, shard_hostname='host1',
+        exception_to_throw=error.UnallowedRecordsSentToMaster, aborted=False):
+        job_id = rpc_interface.create_job(
+                name='dummy',
+                priority=self._PRIORITY,
+                control_file='foo',
+                control_type=SERVER,
+                test_retry=10, hostless=True)
+        job = models.Job.objects.get(pk=job_id)
+        shard = models.Shard.objects.create(hostname='host1')
+        job.shard = shard
+        job.save()
+
+        if aborted:
+            job.hostqueueentry_set.update(aborted=True)
+            job.shard = None
+            job.save()
+
+        hqe = job.hostqueueentry_set.all()[0]
+        if not exception_to_throw:
+            self._do_heartbeat_and_assert_response(
+                shard_hostname=shard_hostname,
+                upload_jobs=jobs, upload_hqes=hqes)
+        else:
+            self.assertRaises(
+                exception_to_throw,
+                self._do_heartbeat_and_assert_response,
+                shard_hostname=shard_hostname,
+                upload_jobs=jobs, upload_hqes=hqes)
+
+
+    def testSendingRecordsToMaster(self):
+        """Send records to the master and ensure they are persisted."""
+        jobs, hqes = self._get_records_for_sending_to_master()
+        hqes[0]['status'] = 'Completed'
+        self._send_records_to_master_helper(
+            jobs=jobs, hqes=hqes, exception_to_throw=None)
+
+        # Check the entry was actually written to db
+        self.assertEqual(models.HostQueueEntry.objects.all()[0].status,
+                         'Completed')
+
+
+    def testSendingRecordsToMasterAbortedOnMaster(self):
+        """Send records to the master and ensure they are persisted."""
+        jobs, hqes = self._get_records_for_sending_to_master()
+        hqes[0]['status'] = 'Completed'
+        self._send_records_to_master_helper(
+            jobs=jobs, hqes=hqes, exception_to_throw=None, aborted=True)
+
+        # Check the entry was actually written to db
+        self.assertEqual(models.HostQueueEntry.objects.all()[0].status,
+                         'Completed')
+
+
+    def testSendingRecordsToMasterJobAssignedToDifferentShard(self):
+        """Ensure records that belong to a different shard are rejected."""
+        jobs, hqes = self._get_records_for_sending_to_master()
+        models.Shard.objects.create(hostname='other_shard')
+        self._send_records_to_master_helper(
+            jobs=jobs, hqes=hqes, shard_hostname='other_shard')
+
+
+    def testSendingRecordsToMasterJobHqeWithoutJob(self):
+        """Ensure update for hqe without update for it's job gets rejected."""
+        _, hqes = self._get_records_for_sending_to_master()
+        self._send_records_to_master_helper(
+            jobs=[], hqes=hqes)
+
+
+    def testSendingRecordsToMasterNotExistingJob(self):
+        """Ensure update for non existing job gets rejected."""
+        jobs, hqes = self._get_records_for_sending_to_master()
+        jobs[0]['id'] = 3
+
+        self._send_records_to_master_helper(
+            jobs=jobs, hqes=hqes)
+
+
+    def _createShardAndHostWithLabel(self, shard_hostname='shard1',
+                                     host_hostname='host1',
+                                     label_name='board:lumpy'):
+        label = models.Label.objects.create(name=label_name)
+
+        shard = models.Shard.objects.create(hostname=shard_hostname)
+        shard.labels.add(label)
+
+        host = models.Host.objects.create(hostname=host_hostname, leased=False)
+        host.labels.add(label)
+
+        return shard, host, label
+
+
+    def _createJobForLabel(self, label):
+        job_id = rpc_interface.create_job(name='dummy', priority=self._PRIORITY,
+                                          control_file='foo',
+                                          control_type=CLIENT,
+                                          meta_hosts=[label.name],
+                                          dependencies=(label.name,))
+        return models.Job.objects.get(id=job_id)
+
+
+    def testShardHeartbeatFetchHostlessJob(self):
+        """Create a hostless job and ensure it's not assigned to a shard."""
+        shard1, host1, lumpy_label = self._createShardAndHostWithLabel(
+            'shard1', 'host1', 'board:lumpy')
+
+        label2 = models.Label.objects.create(name='bluetooth', platform=False)
+
+        job1 = self._create_job(hostless=True)
+
+        # Hostless jobs should be executed by the global scheduler.
+        self._do_heartbeat_and_assert_response(hosts=[host1])
+
+
+    def testShardRetrieveJobs(self):
+        """Create jobs and retrieve them."""
+        # A leased host should never be returned by the heartbeat.
+        leased_host = models.Host.objects.create(hostname='leased_host',
+                                                 leased=True)
+
+        shard1, host1, lumpy_label = self._createShardAndHostWithLabel()
+        shard2, host2, grumpy_label = self._createShardAndHostWithLabel(
+            'shard2', 'host2', 'board:grumpy')
+
+        leased_host.labels.add(lumpy_label)
+
+        job1 = self._createJobForLabel(lumpy_label)
+
+        job2 = self._createJobForLabel(grumpy_label)
+
+        job_completed = self._createJobForLabel(lumpy_label)
+        # Job is already completed, so don't sync it
+        job_completed.hostqueueentry_set.update(complete=True)
+        job_completed.hostqueueentry_set.create(complete=False)
+
+        job_active = self._createJobForLabel(lumpy_label)
+        # Job is already started, so don't sync it
+        job_active.hostqueueentry_set.update(active=True)
+        job_active.hostqueueentry_set.create(complete=False, active=False)
+
+        self._do_heartbeat_and_assert_response(
+            jobs=[job1], hosts=[host1], hqes=job1.hostqueueentry_set.all())
+
+        self._do_heartbeat_and_assert_response(
+            shard_hostname=shard2.hostname,
+            jobs=[job2], hosts=[host2], hqes=job2.hostqueueentry_set.all())
+
+        host3 = models.Host.objects.create(hostname='host3', leased=False)
+        host3.labels.add(lumpy_label)
+
+        self._do_heartbeat_and_assert_response(
+            known_jobs=[job1], known_hosts=[host1], hosts=[host3])
+
+
+    def testResendJobsAfterFailedHeartbeat(self):
+        """Create jobs, retrieve them, fail on client, fetch them again."""
+        shard1, host1, lumpy_label = self._createShardAndHostWithLabel()
+
+        job1 = self._createJobForLabel(lumpy_label)
+
+        self._do_heartbeat_and_assert_response(
+            jobs=[job1],
+            hqes=job1.hostqueueentry_set.all(), hosts=[host1])
+
+        # Simulate a failed persist on the shard: the job is again missing
+        # from known_jobs, so the master must resend it.
+        self._do_heartbeat_and_assert_response(
+            known_hosts=[host1],
+            jobs=[job1], hqes=job1.hostqueueentry_set.all(), hosts=[])
+
+        # Now it worked, make sure it's not sent again
+        self._do_heartbeat_and_assert_response(
+            known_jobs=[job1], known_hosts=[host1])
+
+        job1 = models.Job.objects.get(pk=job1.id)
+        job1.hostqueueentry_set.all().update(complete=True)
+
+        # Job is completed, make sure it's not sent again
+        self._do_heartbeat_and_assert_response(
+            known_hosts=[host1])
+
+        job2 = self._createJobForLabel(lumpy_label)
+
+        # job2 was created later; it should be returned now.
+        self._do_heartbeat_and_assert_response(
+            known_hosts=[host1],
+            jobs=[job2], hqes=job2.hostqueueentry_set.all())
+
+        self._do_heartbeat_and_assert_response(
+            known_jobs=[job2], known_hosts=[host1])
+
+        job2 = models.Job.objects.get(pk=job2.pk)
+        job2.hostqueueentry_set.update(aborted=True)
+        # Setting a job to a complete status will set the shard_id to None in
+        # scheduler_models. We have to emulate that here, because we use Django
+        # models in tests.
+        job2.shard = None
+        job2.save()
+
+        self._do_heartbeat_and_assert_response(
+            known_jobs=[job2], known_hosts=[host1],
+            jobs=[job2],
+            hqes=job2.hostqueueentry_set.all())
+
+        models.Test.objects.create(name='platform_BootPerfServer:shard',
+                                   test_type=1)
+        self.mox.StubOutWithMock(server_utils, 'read_file')
+        server_utils.read_file(mox.IgnoreArg()).AndReturn('')
+        self.mox.ReplayAll()
+        rpc_interface.delete_shard(hostname=shard1.hostname)
+
+        self.assertRaises(
+            models.Shard.DoesNotExist, models.Shard.objects.get, pk=shard1.id)
+
+        job1 = models.Job.objects.get(pk=job1.id)
+        lumpy_label = models.Label.objects.get(pk=lumpy_label.id)
+        host1 = models.Host.objects.get(pk=host1.id)
+        super_job = models.Job.objects.get(priority=priorities.Priority.SUPER)
+        super_job_host = models.HostQueueEntry.objects.get(
+                job_id=super_job.id)
+
+        self.assertIsNone(job1.shard)
+        self.assertEqual(len(lumpy_label.shard_set.all()), 0)
+        self.assertIsNone(host1.shard)
+        self.assertIsNotNone(super_job)
+        self.assertEqual(super_job_host.host_id, host1.id)
+
+
+    def testCreateListShard(self):
+        """Retrieve a list of all shards."""
+        lumpy_label = models.Label.objects.create(name='board:lumpy',
+                                                  platform=True)
+        stumpy_label = models.Label.objects.create(name='board:stumpy',
+                                                   platform=True)
+        peppy_label = models.Label.objects.create(name='board:peppy',
+                                                  platform=True)
+
+        shard_id = rpc_interface.add_shard(
+            hostname='host1', labels='board:lumpy,board:stumpy')
+        self.assertRaises(error.RPCException,
+                          rpc_interface.add_shard,
+                          hostname='host1', labels='board:lumpy,board:stumpy')
+        self.assertRaises(model_logic.ValidationError,
+                          rpc_interface.add_shard,
+                          hostname='host1', labels='board:peppy')
+        shard = models.Shard.objects.get(pk=shard_id)
+        self.assertEqual(shard.hostname, 'host1')
+        self.assertEqual(shard.labels.values_list('pk')[0], (lumpy_label.id,))
+        self.assertEqual(shard.labels.values_list('pk')[1], (stumpy_label.id,))
+
+        self.assertEqual(rpc_interface.get_shards(),
+                         [{'labels': ['board:lumpy', 'board:stumpy'],
+                           'hostname': 'host1',
+                           'id': 1}])
+
+
+    def testAddBoardsToShard(self):
+        """Add boards to a given shard."""
+        shard1, host1, lumpy_label = self._createShardAndHostWithLabel()
+        stumpy_label = models.Label.objects.create(name='board:stumpy',
+                                                   platform=True)
+        shard_id = rpc_interface.add_board_to_shard(
+            hostname='shard1', labels='board:stumpy')
+        # Test that an exception is raised if the board label does not exist.
+        self.assertRaises(models.Label.DoesNotExist,
+                          rpc_interface.add_board_to_shard,
+                          hostname='shard1', labels='board:test')
+        # Test that an exception is raised if the board is already sharded.
+        self.assertRaises(error.RPCException,
+                          rpc_interface.add_board_to_shard,
+                          hostname='shard1', labels='board:lumpy')
+        shard = models.Shard.objects.get(pk=shard_id)
+        self.assertEqual(shard.hostname, 'shard1')
+        self.assertEqual(shard.labels.values_list('pk')[0], (lumpy_label.id,))
+        self.assertEqual(shard.labels.values_list('pk')[1], (stumpy_label.id,))
+
+        self.assertEqual(rpc_interface.get_shards(),
+                         [{'labels': ['board:lumpy', 'board:stumpy'],
+                           'hostname': 'shard1',
+                           'id': 1}])
+
+
+    def testResendHostsAfterFailedHeartbeat(self):
+        """Check that master accepts resending updated records after failure."""
+        shard1, host1, lumpy_label = self._createShardAndHostWithLabel()
+
+        # Send the host
+        self._do_heartbeat_and_assert_response(hosts=[host1])
+
+        # Send it again because the previous one didn't persist correctly
+        self._do_heartbeat_and_assert_response(hosts=[host1])
+
+        # Now it worked, make sure it isn't sent again
+        self._do_heartbeat_and_assert_response(known_hosts=[host1])
+
+
 if __name__ == '__main__':
     unittest.main()
diff --git a/frontend/afe/site_rpc_interface.py b/frontend/afe/site_rpc_interface.py
deleted file mode 100644
index 5fd7b77..0000000
--- a/frontend/afe/site_rpc_interface.py
+++ /dev/null
@@ -1,773 +0,0 @@
-# Copyright (c) 2012 The Chromium OS Authors. All rights reserved.
-# Use of this source code is governed by a BSD-style license that can be
-# found in the LICENSE file.
-
-__author__ = 'cmasone@chromium.org (Chris Masone)'
-
-import common
-import datetime
-import logging
-import os
-
-from autotest_lib.frontend.afe import models
-from autotest_lib.client.common_lib import control_data
-from autotest_lib.client.common_lib import error
-from autotest_lib.client.common_lib import global_config
-from autotest_lib.client.common_lib import priorities
-from autotest_lib.client.common_lib import time_utils
-from autotest_lib.client.common_lib.cros import dev_server
-# TODO(akeshet): Replace with monarch.
-from autotest_lib.client.common_lib.cros.graphite import autotest_stats
-from autotest_lib.frontend.afe import rpc_utils
-from autotest_lib.server import utils
-from autotest_lib.server.cros import provision
-from autotest_lib.server.cros.dynamic_suite import constants
-from autotest_lib.server.cros.dynamic_suite import control_file_getter
-from autotest_lib.server.cros.dynamic_suite import tools
-from autotest_lib.server.cros.dynamic_suite import suite as SuiteBase
-from autotest_lib.server.cros.dynamic_suite.suite import Suite
-from autotest_lib.site_utils import host_history
-from autotest_lib.site_utils import job_history
-from autotest_lib.site_utils import server_manager_utils
-from autotest_lib.site_utils import stable_version_utils
-
-
-_CONFIG = global_config.global_config
-
-# Relevant CrosDynamicSuiteExceptions are defined in client/common_lib/error.py.
-
-
-def canonicalize_suite_name(suite_name):
-    """Canonicalize the suite's name.
-
-    @param suite_name: the name of the suite.
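-
-    E.g. 'bvt' is canonicalized to 'test_suites/control.bvt'.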
-    """
-    # Do not change this naming convention without updating
-    # site_utils.parse_job_name.
-    return 'test_suites/control.%s' % suite_name
-
-
-def formatted_now():
-    """Format the current datetime."""
-    return datetime.datetime.now().strftime(time_utils.TIME_FMT)
-
-
-def _get_control_file_by_build(build, ds, suite_name):
-    """Return control file contents for |suite_name|.
-
-    Query the dev server at |ds| for the control file |suite_name| included
-    in |build|.
-
-    @param build: unique name by which to refer to the image from now on.
-    @param ds: a dev_server.DevServer instance to fetch control file with.
-    @param suite_name: canonicalized suite name, e.g. test_suites/control.bvt.
-    @raises ControlFileNotFound if a unique suite control file doesn't exist.
-    @raises NoControlFileList if we can't list the control files at all.
-    @raises ControlFileEmpty if the control file exists on the server, but
-                             can't be read.
-
-    @return the contents of the desired control file.
-    """
-    getter = control_file_getter.DevServerGetter.create(build, ds)
-    devserver_name = ds.hostname
-    timer = autotest_stats.Timer('control_files.parse.%s.%s' %
-                                 (devserver_name.replace('.', '_'),
-                                  suite_name.rsplit('.')[-1]))
-    # Get the control file for the suite.
-    try:
-        with timer:
-            control_file_in = getter.get_control_file_contents_by_name(
-                    suite_name)
-    except error.CrosDynamicSuiteException as e:
-        raise type(e)('Failed to get control file for %s '
-                      '(devserver: %s) (error: %s)' %
-                      (build, devserver_name, e))
-    if not control_file_in:
-        raise error.ControlFileEmpty(
-            "Fetching %s returned no data. (devserver: %s)" %
-            (suite_name, devserver_name))
-    # Force control files to only contain ascii characters.
-    try:
-        control_file_in.encode('ascii')
-    except UnicodeDecodeError as e:
-        raise error.ControlFileMalformed(str(e))
-
-    return control_file_in
-
-
-def _get_control_file_by_suite(suite_name):
-    """Get control file contents by suite name.
-
-    @param suite_name: Suite name as string.
-    @returns: Control file contents as string.
-    """
-    getter = control_file_getter.FileSystemGetter(
-        [_CONFIG.get_config_value('SCHEDULER',
-                                  'drone_installation_directory')])
-    return getter.get_control_file_contents_by_name(suite_name)
-
-
-def _stage_build_artifacts(build, hostname=None):
-    """
-    Ensure components of |build| necessary for installing images are staged.
-
-    @param build: the image we want to stage.
-    @param hostname: hostname of a DUT that may run the test. This helps
-        locate a devserver closer to the DUTs if needed. Default is None.
-
-    @raises StageControlFileFailure: if the dev server throws 500 while staging
-        suite control files.
-
-    @return: A tuple of (dev_server.ImageServer instance to use with this
-             build, timings dictionary containing staging start/end times).
-    """
-    timings = {}
-    # Ensure components of |build| necessary for installing images are staged
-    # on the dev server. However set synchronous to False to allow other
-    # components to be downloaded in the background.
-    ds = dev_server.resolve(build, hostname=hostname)
-    ds_name = ds.hostname
-    timings[constants.DOWNLOAD_STARTED_TIME] = formatted_now()
-    timer = autotest_stats.Timer('control_files.stage.%s' % (
-            ds_name.replace('.', '_')))
-    try:
-        with timer:
-            ds.stage_artifacts(image=build, artifacts=['test_suites'])
-    except dev_server.DevServerException as e:
-        raise error.StageControlFileFailure(
-                "Failed to stage %s on %s: %s" % (build, ds_name, e))
-    timings[constants.PAYLOAD_FINISHED_TIME] = formatted_now()
-    return (ds, timings)
-
-
-@rpc_utils.route_rpc_to_master
-def create_suite_job(
-        name='',
-        board='',
-        pool='',
-        control_file='',
-        check_hosts=True,
-        num=None,
-        file_bugs=False,
-        timeout=24,
-        timeout_mins=None,
-        priority=priorities.Priority.DEFAULT,
-        suite_args=None,
-        wait_for_results=True,
-        job_retry=False,
-        max_retries=None,
-        max_runtime_mins=None,
-        suite_min_duts=0,
-        offload_failures_only=False,
-        builds=None,
-        test_source_build=None,
-        run_prod_code=False,
-        delay_minutes=0,
-        is_cloning=False,
-        **kwargs
-):
-    """
-    Create a job to run a test suite on the given device with the given image.
-
-    When the timeout specified in the control file is reached, the
-    job is guaranteed to have completed and results will be available.
-
-    @param name: The test name if control_file is supplied, otherwise the name
-                 of the test suite to run, e.g. 'bvt'.
-    @param board: the kind of device to run the tests on.
-    @param builds: the builds to install e.g.
-                   {'cros-version:': 'x86-alex-release/R18-1655.0.0',
-                    'fwrw-version:':  'x86-alex-firmware/R36-5771.50.0',
-                    'fwro-version:':  'x86-alex-firmware/R36-5771.49.0'}
-                   If builds is given a value, it overrides argument build.
-    @param test_source_build: Build that contains the server-side test code.
-    @param pool: Specify the pool of machines to use for scheduling
-            purposes.
-    @param control_file: the control file of the job.
-    @param check_hosts: require appropriate live hosts to exist in the lab.
-    @param num: Specify the number of machines to schedule across (integer).
-                Leave unspecified or use None to use default sharding factor.
-    @param file_bugs: File a bug on each test failure in this suite.
-    @param timeout: The max lifetime of this suite, in hours.
-    @param timeout_mins: The max lifetime of this suite, in minutes. Takes
-                         priority over timeout.
-    @param priority: Integer denoting priority. Higher is more important.
-    @param suite_args: Optional arguments which will be parsed by the suite
-                       control file. Used by control.test_that_wrapper to
-                       determine which tests to run.
-    @param wait_for_results: Set to False to run the suite job without waiting
-                             for test jobs to finish. Default is True.
-    @param job_retry: Set to True to enable job-level retry. Default is False.
-    @param max_retries: Integer, maximum job retries allowed at suite level.
-                        None for no max.
-    @param max_runtime_mins: Maximum amount of time a job can be running in
-                             minutes.
-    @param suite_min_duts: Integer. Scheduler will prioritize getting the
-                           minimum number of machines for the suite when it is
-                           competing with another suite that has a higher
-                           priority but already got minimum machines it needs.
-    @param offload_failures_only: Only enable gs_offloading for failed jobs.
-    @param run_prod_code: If True, the suite will run the test code that
-                          lives in prod aka the test code currently on the
-                          lab servers. If False, the control files and test
-                          code for this suite run will be retrieved from the
-                          build artifacts.
-    @param delay_minutes: Delay the creation of test jobs for a given number of
-                          minutes.
-    @param is_cloning: True if creating a cloning job.
-    @param kwargs: extra keyword args. NOT USED.
-
-    @raises ControlFileNotFound: if a unique suite control file doesn't exist.
-    @raises NoControlFileList: if we can't list the control files at all.
-    @raises StageControlFileFailure: If the dev server throws 500 while
-                                     staging test_suites.
-    @raises ControlFileEmpty: if the control file exists on the server, but
-                              can't be read.
-
-    @return: the job ID of the suite; -1 on error.
-    """
-    if type(num) is not int and num is not None:
-        raise error.SuiteArgumentException('Ill specified num argument %r. '
-                                           'Must be an integer or None.' % num)
-    if num == 0:
-        logging.warning("Can't run on 0 hosts; using default.")
-        num = None
-
-    if builds is None:
-        builds = {}
-
-    # Default test source build to CrOS build if it's not specified and
-    # run_prod_code is set to False.
-    if not run_prod_code:
-        test_source_build = Suite.get_test_source_build(
-                builds, test_source_build=test_source_build)
-
-    sample_dut = rpc_utils.get_sample_dut(board, pool)
-
-    suite_name = canonicalize_suite_name(name)
-    if run_prod_code:
-        ds = dev_server.resolve(test_source_build, hostname=sample_dut)
-        keyvals = {}
-    else:
-        (ds, keyvals) = _stage_build_artifacts(
-                test_source_build, hostname=sample_dut)
-    keyvals[constants.SUITE_MIN_DUTS_KEY] = suite_min_duts
-
-    # Do not change this naming convention without updating
-    # site_utils.parse_job_name.
-    if run_prod_code:
-        # If run_prod_code is True, test_source_build is not set; use the
-        # first build in the builds list for the suite job name.
-        name = '%s-%s' % (builds.values()[0], suite_name)
-    else:
-        name = '%s-%s' % (test_source_build, suite_name)
-
-    timeout_mins = timeout_mins or timeout * 60
-    max_runtime_mins = max_runtime_mins or timeout * 60
-
-    if not board:
-        board = utils.ParseBuildName(builds[provision.CROS_VERSION_PREFIX])[0]
-
-    if run_prod_code:
-        control_file = _get_control_file_by_suite(suite_name)
-
-    if not control_file:
-        # No control file was supplied so look it up from the build artifacts.
-        control_file = _get_control_file_by_build(
-                test_source_build, ds, suite_name)
-
-    # Prepend builds and board to the control file.
-    if is_cloning:
-        control_file = tools.remove_injection(control_file)
-
-    inject_dict = {
-        'board': board,
-        # `build` is needed for suites like AU to stage image inside suite
-        # control file.
-        'build': test_source_build,
-        'builds': builds,
-        'check_hosts': check_hosts,
-        'pool': pool,
-        'num': num,
-        'file_bugs': file_bugs,
-        'timeout': timeout,
-        'timeout_mins': timeout_mins,
-        'devserver_url': ds.url(),
-        'priority': priority,
-        'suite_args' : suite_args,
-        'wait_for_results': wait_for_results,
-        'job_retry': job_retry,
-        'max_retries': max_retries,
-        'max_runtime_mins': max_runtime_mins,
-        'offload_failures_only': offload_failures_only,
-        'test_source_build': test_source_build,
-        'run_prod_code': run_prod_code,
-        'delay_minutes': delay_minutes,
-    }
-    control_file = tools.inject_vars(inject_dict, control_file)
-
-    return rpc_utils.create_job_common(name,
-                                       priority=priority,
-                                       timeout_mins=timeout_mins,
-                                       max_runtime_mins=max_runtime_mins,
-                                       control_type='Server',
-                                       control_file=control_file,
-                                       hostless=True,
-                                       keyvals=keyvals)
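-
-
-# A minimal invocation sketch for create_suite_job; the values are
-# illustrative, echoing the docstring above and the unit tests in this change:
-#
-#   job_id = create_suite_job(
-#           name='bvt', board='link',
-#           builds={'cros-version:': 'link-release/R36-5812.0.0'},
-#           pool=None)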
-
-
-def get_job_history(**filter_data):
-    """Get history of the job, including the special tasks executed for the job
-
-    @param filter_data: filter for the call, should at least include
-                        {'job_id': [job id]}
-    @returns: JSON string of the job's history, including information such
-              as the hosts that ran the job and the special tasks executed
-              before and after the job.
-    """
-    job_id = filter_data['job_id']
-    job_info = job_history.get_job_info(job_id)
-    return rpc_utils.prepare_for_serialization(job_info.get_history())
-
-
-def get_host_history(start_time, end_time, hosts=None, board=None, pool=None):
-    """Get history of a list of host.
-
-    The return is a JSON string of host history for each host, for example,
-    {'172.22.33.51': [{'status': 'Resetting',
-                       'start_time': '2014-08-07 10:02:16',
-                       'end_time': '2014-08-07 10:03:16',
-                       'log_url': 'http://autotest/reset-546546/debug',
-                       'dbg_str': 'Task: Special Task 19441991 (host ...)'},
-                       {'status': 'Running',
-                       'start_time': '2014-08-07 10:03:18',
-                       'end_time': '2014-08-07 10:13:00',
-                       'log_url': 'http://autotest/reset-546546/debug',
-                       'dbg_str': 'HQE: 15305005, for job: 14995562'}
-                     ]
-    }
-    @param start_time: start time to search for history, can be string value or
-                       epoch time.
-    @param end_time: end time to search for history, can be string value or
-                     epoch time.
-    @param hosts: A list of hosts to search for history. Default is None.
-    @param board: board type of hosts. Default is None.
-    @param pool: pool type of hosts. Default is None.
-    @returns: JSON string of the host history.
-    """
-    return rpc_utils.prepare_for_serialization(
-            host_history.get_history_details(
-                    start_time=start_time, end_time=end_time,
-                    hosts=hosts, board=board, pool=pool,
-                    process_pool_size=4))
-
-
-def shard_heartbeat(shard_hostname, jobs=(), hqes=(), known_job_ids=(),
-                    known_host_ids=(), known_host_statuses=()):
-    """Receive updates for job statuses from shards and assign hosts and jobs.
-
-    @param shard_hostname: Hostname of the calling shard
-    @param jobs: Jobs in serialized form that should be updated with newer
-                 status from a shard.
-    @param hqes: Hostqueueentries in serialized form that should be updated with
-                 newer status from a shard. Note that for every hostqueueentry
-                 the corresponding job must be in jobs.
-    @param known_job_ids: List of ids of jobs the shard already has.
-    @param known_host_ids: List of ids of hosts the shard already has.
-    @param known_host_statuses: List of statuses of hosts the shard already has.
-
-    @returns: Serialized representations of hosts, jobs, suite job keyvals
-              and their dependencies to be inserted into a shard's database.
-    """
-    # The following alternatives to sending host and job ids in every heartbeat
-    # have been considered:
-    # 1. Sending the highest known job and host ids. This would work for jobs:
-    #    Newer jobs always have larger ids. Also, if a job is not assigned to a
-    #    particular shard during a heartbeat, it never will be assigned to this
-    #    shard later.
-    #    This is not true for hosts though: A host that is leased won't be sent
-    #    to the shard now, but might be sent in a future heartbeat. This means
-    #    sometimes hosts should be transferred that have a lower id than the
-    #    maximum host id the shard knows.
-    # 2. Send the number of jobs/hosts the shard knows to the master in each
-    #    heartbeat. Compare these to the number of records that already have
-    #    the shard_id set to this shard. In the normal case, they should match.
-    #    In case they don't, resend all entities of that type.
-    #    This would work well for hosts, because there aren't that many.
-    #    Resending all jobs is quite a big overhead though.
-    #    Also, this approach might run into edge cases if entities are
-    #    ever deleted.
-    # 3. Mixtures of the above: Use 1 for jobs and 2 for hosts.
-    #    Using two different approaches isn't consistent and might cause
-    #    confusion. Also the issues with the case of deletions might still
-    #    occur.
-    #
-    # The overhead of sending all job and host ids in every heartbeat is low:
-    # At peaks one board has about 1200 created but unfinished jobs.
-    # See the numbers here: http://goo.gl/gQCGWH
-    # Assuming that job ids have 6 digits and that json serialization takes a
-    # comma and a space as overhead, the traffic per id sent is about 8 bytes.
-    # If 5000 ids need to be sent, this means 40 kilobytes of traffic.
-    # A NOT IN query with 5000 ids took about 30ms in our tests.
-    # These numbers seem low enough to outweigh the disadvantages of the
-    # solutions described above. (A worked check of this arithmetic follows
-    # this function.)
-    timer = autotest_stats.Timer('shard_heartbeat')
-    with timer:
-        shard_obj = rpc_utils.retrieve_shard(shard_hostname=shard_hostname)
-        rpc_utils.persist_records_sent_from_shard(shard_obj, jobs, hqes)
-        assert len(known_host_ids) == len(known_host_statuses)
-        for i in range(len(known_host_ids)):
-            host_model = models.Host.objects.get(pk=known_host_ids[i])
-            if host_model.status != known_host_statuses[i]:
-                host_model.status = known_host_statuses[i]
-                host_model.save()
-
-        hosts, jobs, suite_keyvals = rpc_utils.find_records_for_shard(
-                shard_obj, known_job_ids=known_job_ids,
-                known_host_ids=known_host_ids)
-        return {
-            'hosts': [host.serialize() for host in hosts],
-            'jobs': [job.serialize() for job in jobs],
-            'suite_keyvals': [kv.serialize() for kv in suite_keyvals],
-        }
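-
-
-# A worked check of the traffic arithmetic in shard_heartbeat's comment; the
-# 6-digit ids are that comment's assumption, not real job ids:
-#
-#   >>> import json
-#   >>> len(json.dumps(list(range(100000, 105000))))  # 5000 six-digit ids
-#   40000  # ~8 bytes per id, i.e. about 40 kilobytes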
-
-
-def get_shards(**filter_data):
-    """Return a list of all shards.
-
-    @returns A sequence of nested dictionaries of shard information.
-    """
-    shards = models.Shard.query_objects(filter_data)
-    serialized_shards = rpc_utils.prepare_rows_as_nested_dicts(shards, ())
-    for serialized, shard in zip(serialized_shards, shards):
-        serialized['labels'] = [label.name for label in shard.labels.all()]
-
-    return serialized_shards
-
-
-def _assign_board_to_shard_precheck(labels):
-    """Verify whether board labels are valid to be added to a given shard.
-
-    First check whether the board label is in the correct format. Second,
-    check whether the board label exists. Third, check whether the board has
-    already been assigned to a shard.
-
-    @param labels: Board labels separated by comma.
-
-    @raises error.RPCException: If label provided doesn't start with `board:`
-            or board has been added to shard already.
-    @raises models.Label.DoesNotExist: If the label specified doesn't exist.
-
-    @returns: A list of label models that are ready to be added to the shard.
-    """
-    labels = labels.split(',')
-    label_models = []
-    for label in labels:
-        # Check whether the board label is in correct format.
-        if not label.startswith('board:'):
-            raise error.RPCException('Sharding only supports `board:.*` label.')
-        # Check whether the board label exists. If not, an exception will be
-        # thrown by the smart_get function.
-        label = models.Label.smart_get(label)
-        label_id = models.Label.list_objects({'name':label})[0].get('id')
-        # Check whether the board has been sharded already
-        try:
-            shard = models.Shard.objects.get(labels=label)
-            raise error.RPCException(
-                    '%s is already on shard %s' % (label, shard.hostname))
-        except models.Shard.DoesNotExist:
-            # board is not on any shard, so it's valid.
-            label_models.append(label)
-    return label_models
-
-
-def add_shard(hostname, labels):
-    """Add a shard and start running jobs on it.
-
-    @param hostname: The hostname of the shard to be added; needs to be unique.
-    @param labels: Board labels separated by comma. Jobs of one of the labels
-                   will be assigned to the shard.
-
-    @raises error.RPCException: If label provided doesn't start with `board:` or
-            board has been added to shard already.
-    @raises model_logic.ValidationError: If a shard with the given hostname
-            already exist.
-    @raises models.Label.DoesNotExist: If the label specified doesn't exist.
-
-    @returns: The id of the added shard.
-    """
-    labels = _assign_board_to_shard_precheck(labels)
-    shard = models.Shard.add_object(hostname=hostname)
-    for label in labels:
-        shard.labels.add(label)
-    return shard.id
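-
-
-# Illustrative usage; the hostname and labels echo the unit tests in this
-# change:
-#
-#   shard_id = add_shard(hostname='host1',
-#                        labels='board:lumpy,board:stumpy')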
-
-
-def add_board_to_shard(hostname, labels):
-    """Add boards to a given shard
-
-    @param hostname: The hostname of the shard to be changed.
-    @param labels: Board labels separated by comma.
-
-    @raises error.RPCException: If label provided doesn't start with `board:` or
-            board has been added to shard already.
-    @raises models.Label.DoesNotExist: If the label specified doesn't exist.
-
-    @returns: The id of the changed shard.
-    """
-    labels = _assign_board_to_shard_precheck(labels)
-    shard = models.Shard.objects.get(hostname=hostname)
-    for label in labels:
-        shard.labels.add(label)
-    return shard.id
-
-
-def delete_shard(hostname):
-    """Delete a shard and reclaim all resources from it.
-
-    This claims back all assigned hosts from the shard. To ensure all DUTs are
-    in a sane state, a Reboot task with highest priority is scheduled for them.
-    This reboots the DUTs and then all remaining tasks continue to run on
-    the master's drones.
-
-    The procedure for deleting a shard:
-        * Lock all unlocked hosts on that shard.
-        * Remove shard information.
-        * Assign a reboot task with highest priority to these hosts.
-        * Unlock these hosts; the reboot tasks then run ahead of all other
-        tasks.
-
-    The status of jobs that haven't been reported as finished yet will be
-    lost. The master scheduler will pick up the jobs and execute them.
-
-    @param hostname: Hostname of the shard to delete.
-    """
-    shard = rpc_utils.retrieve_shard(shard_hostname=hostname)
-    hostnames_to_lock = [h.hostname for h in
-                         models.Host.objects.filter(shard=shard, locked=False)]
-
-    # TODO(beeps): Power off shard
-    # For ChromeOS hosts, a reboot test with the highest priority is added to
-    # the DUT. After a reboot it should be guaranteed that no processes from
-    # prior tests that were run by a shard are still running on it.
-
-    # Lock all unlocked hosts.
-    dicts = {'locked': True, 'lock_time': datetime.datetime.now()}
-    models.Host.objects.filter(hostname__in=hostnames_to_lock).update(**dicts)
-
-    # Remove shard information.
-    models.Host.objects.filter(shard=shard).update(shard=None)
-    models.Job.objects.filter(shard=shard).update(shard=None)
-    shard.labels.clear()
-    shard.delete()
-
-    # Assign a reboot task with highest priority: Super.
-    t = models.Test.objects.get(name='platform_BootPerfServer:shard')
-    c = utils.read_file(os.path.join(common.autotest_dir, t.path))
-    if hostnames_to_lock:
-        rpc_utils.create_job_common(
-                'reboot_dut_for_shard_deletion',
-                priority=priorities.Priority.SUPER,
-                control_type='Server',
-                control_file=c, hosts=hostnames_to_lock)
-
-    # Unlock these shard-related hosts.
-    dicts = {'locked': False, 'lock_time': None}
-    models.Host.objects.filter(hostname__in=hostnames_to_lock).update(**dicts)
-
-
-def get_servers(hostname=None, role=None, status=None):
-    """Get a list of servers with matching role and status.
-
-    @param hostname: FQDN of the server.
-    @param role: Name of the server role, e.g., drone, scheduler. Default to
-                 None to match any role.
-    @param status: Status of the server, e.g., primary, backup, repair_required.
-                   Default to None to match any server status.
-
-    @raises error.RPCException: If server database is not used.
-    @return: A list of server names for servers with matching role and status.
-    """
-    if not server_manager_utils.use_server_db():
-        raise error.RPCException('Server database is not enabled. Please try '
-                                 'retrieving servers from global config.')
-    servers = server_manager_utils.get_servers(hostname=hostname, role=role,
-                                               status=status)
-    return [s.get_details() for s in servers]
-
-
-@rpc_utils.route_rpc_to_master
-def get_stable_version(board=stable_version_utils.DEFAULT, android=False):
-    """Get stable version for the given board.
-
-    @param board: Name of the board.
-    @param android: If True, the given board is an Android-based device. If
-                    False, assume it's a Chrome OS-based device.
-
-    @return: Stable version of the given board. Return the global config
-             value of CROS.stable_cros_version if the stable_versions table
-             does not have an entry for board DEFAULT.
-    """
-    return stable_version_utils.get(board=board, android=android)
-
-
-@rpc_utils.route_rpc_to_master
-def get_all_stable_versions():
-    """Get stable versions for all boards.
-
-    @return: A dictionary of board:version.
-    """
-    return stable_version_utils.get_all()
-
-
-@rpc_utils.route_rpc_to_master
-def set_stable_version(version, board=stable_version_utils.DEFAULT):
-    """Modify stable version for the given board.
-
-    @param version: The new value of stable version for given board.
-    @param board: Name of the board, default to value `DEFAULT`.
-    """
-    stable_version_utils.set(version=version, board=board)
-
-
-@rpc_utils.route_rpc_to_master
-def delete_stable_version(board):
-    """Modify stable version for the given board.
-
-    Delete a stable version entry in afe_stable_versions table for a given
-    board, so default stable version will be used.
-
-    @param board: Name of the board.
-    """
-    stable_version_utils.delete(board=board)
-
-
-def _initialize_control_file_getter(build):
-    """Get the remote control file getter.
-
-    @param build: unique name by which to refer to a remote build image.
-
-    @return: A control file getter object.
-    """
-    # Stage the test artifacts.
-    try:
-        ds = dev_server.ImageServer.resolve(build)
-        ds_name = ds.hostname
-        build = ds.translate(build)
-    except dev_server.DevServerException as e:
-        raise ValueError('Could not resolve build %s: %s' %
-                         (build, e))
-
-    try:
-        ds.stage_artifacts(image=build, artifacts=['test_suites'])
-    except dev_server.DevServerException as e:
-        raise error.StageControlFileFailure(
-                'Failed to stage %s on %s: %s' % (build, ds_name, e))
-
-    # Collect the control files specified in this build
-    return control_file_getter.DevServerGetter.create(build, ds)
-
-
-def get_tests_by_build(build, ignore_invalid_tests=True):
-    """Get the tests that are available for the specified build.
-
-    @param build: unique name by which to refer to the image.
-    @param ignore_invalid_tests: whether unparsable tests are ignored.
-
-    @return: A sorted list of all tests that are in the build specified.
-    """
-    # Collect the control files specified in this build
-    cfile_getter = _initialize_control_file_getter(build)
-    if SuiteBase.ENABLE_CONTROLS_IN_BATCH:
-        control_file_info_list = cfile_getter.get_suite_info()
-        control_file_list = control_file_info_list.keys()
-    else:
-        control_file_list = cfile_getter.get_control_file_list()
-
-    test_objects = []
-    _id = 0
-    for control_file_path in control_file_list:
-        # Read and parse the control file
-        if SuiteBase.ENABLE_CONTROLS_IN_BATCH:
-            control_file = control_file_info_list[control_file_path]
-        else:
-            control_file = cfile_getter.get_control_file_contents(
-                    control_file_path)
-        try:
-            control_obj = control_data.parse_control_string(control_file)
-        except:
-            logging.info('Failed to parse control file: %s', control_file_path)
-            if not ignore_invalid_tests:
-                raise
-            # Skip the unparsable test instead of reusing a stale control_obj.
-            continue
-
-        # Extract the values needed for the AFE from the control_obj.
-        # The keys list represents attributes in the control_obj that
-        # are required by the AFE
-        keys = ['author', 'doc', 'name', 'time', 'test_type', 'experimental',
-                'test_category', 'test_class', 'dependencies', 'run_verify',
-                'sync_count', 'job_retries', 'retries', 'path']
-
-        test_object = {}
-        for key in keys:
-            test_object[key] = getattr(control_obj, key) if hasattr(
-                    control_obj, key) else ''
-
-        # Unfortunately, the AFE expects different key names for certain
-        # values; these must be corrected to avoid the risk of tests
-        # being omitted by the AFE.
-        # The 'id' is an additional value used in the AFE.
-        # The control_data parsing does not reference 'run_reset', but it
-        # is also used in the AFE and defaults to True.
-        test_object['id'] = _id
-        test_object['run_reset'] = True
-        test_object['description'] = test_object.get('doc', '')
-        test_object['test_time'] = test_object.get('time', 0)
-        test_object['test_retry'] = test_object.get('retries', 0)
-
-        # Fix the test name to be consistent with the current presentation
-        # of test names in the AFE.
-        testpath, subname = os.path.split(control_file_path)
-        testname = os.path.basename(testpath)
-        subname = subname.split('.')[1:]
-        if subname:
-            testname = '%s:%s' % (testname, ':'.join(subname))
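-        # E.g. (illustrative): a control file at 'dummy_Pass/control.bluetooth'
-        # yields the AFE-style name 'dummy_Pass:bluetooth'.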
-
-        test_object['name'] = testname
-
-        # Correct the test path as parse_control_string sets an empty string.
-        test_object['path'] = control_file_path
-
-        _id += 1
-        test_objects.append(test_object)
-
-    test_objects = sorted(test_objects, key=lambda x: x.get('name'))
-    return rpc_utils.prepare_for_serialization(test_objects)
-
-
-def get_test_control_files_by_build(tests, build, ignore_invalid_tests=False):
-    """Get the test control files that are available for the specified build.
-
-    @param tests: A sequence of test objects to run.
-    @param build: unique name by which to refer to the image.
-    @param ignore_invalid_tests: whether unparsable tests are ignored.
-
-    @return: A list of control file contents for the given tests.
-    """
-    raw_control_files = []
-    # shortcut to avoid staging the image.
-    if not tests:
-        return raw_control_files
-
-    cfile_getter = _initialize_control_file_getter(build)
-    if SuiteBase.ENABLE_CONTROLS_IN_BATCH:
-        control_file_info_list = cfile_getter.get_suite_info()
-
-    for test in tests:
-        # Read and parse the control file
-        if SuiteBase.ENABLE_CONTROLS_IN_BATCH:
-            control_file = control_file_info_list[test.path]
-        else:
-            control_file = cfile_getter.get_control_file_contents(
-                    test.path)
-        raw_control_files.append(control_file)
-    return raw_control_files
diff --git a/frontend/afe/site_rpc_interface_unittest.py b/frontend/afe/site_rpc_interface_unittest.py
deleted file mode 100755
index 2421d00..0000000
--- a/frontend/afe/site_rpc_interface_unittest.py
+++ /dev/null
@@ -1,681 +0,0 @@
-#!/usr/bin/python
-#
-# Copyright (c) 2012 The Chromium OS Authors. All rights reserved.
-# Use of this source code is governed by a BSD-style license that can be
-# found in the LICENSE file.
-
-"""Unit tests for frontend/afe/site_rpc_interface.py."""
-
-
-import datetime
-import mox
-import unittest
-
-import common
-
-from autotest_lib.client.common_lib import control_data
-from autotest_lib.client.common_lib import error
-from autotest_lib.client.common_lib import priorities
-from autotest_lib.client.common_lib.cros import dev_server
-from autotest_lib.frontend import setup_django_environment
-from autotest_lib.frontend.afe import frontend_test_utils
-from autotest_lib.frontend.afe import models
-from autotest_lib.frontend.afe import model_logic
-from autotest_lib.frontend.afe import rpc_utils
-from autotest_lib.frontend.afe import rpc_interface
-from autotest_lib.frontend.afe import site_rpc_interface
-from autotest_lib.server import utils
-from autotest_lib.server.cros import provision
-from autotest_lib.server.cros.dynamic_suite import constants
-from autotest_lib.server.cros.dynamic_suite import control_file_getter
-
-
-CLIENT = control_data.CONTROL_TYPE_NAMES.CLIENT
-SERVER = control_data.CONTROL_TYPE_NAMES.SERVER
-
-
-class SiteRpcInterfaceTest(mox.MoxTestBase,
-                           frontend_test_utils.FrontendTestMixin):
-    """Unit tests for functions in site_rpc_interface.py.
-
-    @var _NAME: fake suite name.
-    @var _BOARD: fake board to reimage.
-    @var _BUILD: fake build with which to reimage.
-    @var _PRIORITY: fake priority with which to reimage.
-    """
-    _NAME = 'name'
-    _BOARD = 'link'
-    _BUILD = 'link-release/R36-5812.0.0'
-    _BUILDS = {provision.CROS_VERSION_PREFIX: _BUILD}
-    _PRIORITY = priorities.Priority.DEFAULT
-    _TIMEOUT = 24
-
-
-    def setUp(self):
-        super(SiteRpcInterfaceTest, self).setUp()
-        self._SUITE_NAME = site_rpc_interface.canonicalize_suite_name(
-            self._NAME)
-        self.dev_server = self.mox.CreateMock(dev_server.ImageServer)
-        self._frontend_common_setup(fill_data=False)
-
-
-    def tearDown(self):
-        self._frontend_common_teardown()
-
-
-    def _setupDevserver(self):
-        self.mox.StubOutClassWithMocks(dev_server, 'ImageServer')
-        dev_server.resolve(self._BUILD).AndReturn(self.dev_server)
-
-
-    def _mockDevServerGetter(self, get_control_file=True):
-        self._setupDevserver()
-        if get_control_file:
-            self.getter = self.mox.CreateMock(
-                    control_file_getter.DevServerGetter)
-            self.mox.StubOutWithMock(control_file_getter.DevServerGetter,
-                                     'create')
-            control_file_getter.DevServerGetter.create(
-                    mox.IgnoreArg(), mox.IgnoreArg()).AndReturn(self.getter)
-
-
-    def _mockRpcUtils(self, to_return, control_file_substring=''):
-        """Fake out the autotest rpc_utils module with a mockable class.
-
-        @param to_return: the value that rpc_utils.create_job_common() should
-                          be mocked out to return.
-        @param control_file_substring: A substring that is expected to appear
-                                       in the control file output string that
-                                       is passed to create_job_common.
-                                       Default: ''
-        """
-        download_started_time = constants.DOWNLOAD_STARTED_TIME
-        payload_finished_time = constants.PAYLOAD_FINISHED_TIME
-        self.mox.StubOutWithMock(rpc_utils, 'create_job_common')
-        rpc_utils.create_job_common(mox.And(mox.StrContains(self._NAME),
-                                    mox.StrContains(self._BUILD)),
-                            priority=self._PRIORITY,
-                            timeout_mins=self._TIMEOUT*60,
-                            max_runtime_mins=self._TIMEOUT*60,
-                            control_type='Server',
-                            control_file=mox.And(mox.StrContains(self._BOARD),
-                                                 mox.StrContains(self._BUILD),
-                                                 mox.StrContains(
-                                                     control_file_substring)),
-                            hostless=True,
-                            keyvals=mox.And(mox.In(download_started_time),
-                                            mox.In(payload_finished_time))
-                            ).AndReturn(to_return)
-
-
-    def testStageBuildFail(self):
-        """Ensure that a failure to stage the desired build fails the RPC."""
-        self._setupDevserver()
-
-        self.dev_server.hostname = 'mox_url'
-        self.dev_server.stage_artifacts(
-                image=self._BUILD, artifacts=['test_suites']).AndRaise(
-                dev_server.DevServerException())
-        self.mox.ReplayAll()
-        self.assertRaises(error.StageControlFileFailure,
-                          site_rpc_interface.create_suite_job,
-                          name=self._NAME,
-                          board=self._BOARD,
-                          builds=self._BUILDS,
-                          pool=None)
-
-
-    def testGetControlFileFail(self):
-        """Ensure that a failure to get needed control file fails the RPC."""
-        self._mockDevServerGetter()
-
-        self.dev_server.hostname = 'mox_url'
-        self.dev_server.stage_artifacts(
-                image=self._BUILD, artifacts=['test_suites']).AndReturn(True)
-
-        self.getter.get_control_file_contents_by_name(
-            self._SUITE_NAME).AndReturn(None)
-        self.mox.ReplayAll()
-        self.assertRaises(error.ControlFileEmpty,
-                          site_rpc_interface.create_suite_job,
-                          name=self._NAME,
-                          board=self._BOARD,
-                          builds=self._BUILDS,
-                          pool=None)
-
-
-    def testGetControlFileListFail(self):
-        """Ensure that a failure to get needed control file fails the RPC."""
-        self._mockDevServerGetter()
-
-        self.dev_server.hostname = 'mox_url'
-        self.dev_server.stage_artifacts(
-                image=self._BUILD, artifacts=['test_suites']).AndReturn(True)
-
-        self.getter.get_control_file_contents_by_name(
-            self._SUITE_NAME).AndRaise(error.NoControlFileList())
-        self.mox.ReplayAll()
-        self.assertRaises(error.NoControlFileList,
-                          site_rpc_interface.create_suite_job,
-                          name=self._NAME,
-                          board=self._BOARD,
-                          builds=self._BUILDS,
-                          pool=None)
-
-
-    def testBadNumArgument(self):
-        """Ensure we handle bad values for the |num| argument."""
-        self.assertRaises(error.SuiteArgumentException,
-                          site_rpc_interface.create_suite_job,
-                          name=self._NAME,
-                          board=self._BOARD,
-                          builds=self._BUILDS,
-                          pool=None,
-                          num='goo')
-        self.assertRaises(error.SuiteArgumentException,
-                          site_rpc_interface.create_suite_job,
-                          name=self._NAME,
-                          board=self._BOARD,
-                          builds=self._BUILDS,
-                          pool=None,
-                          num=[])
-        self.assertRaises(error.SuiteArgumentException,
-                          site_rpc_interface.create_suite_job,
-                          name=self._NAME,
-                          board=self._BOARD,
-                          builds=self._BUILDS,
-                          pool=None,
-                          num='5')
-
-
-    def testCreateSuiteJobFail(self):
-        """Ensure that failure to schedule the suite job fails the RPC."""
-        self._mockDevServerGetter()
-
-        self.dev_server.hostname = 'mox_url'
-        self.dev_server.stage_artifacts(
-                image=self._BUILD, artifacts=['test_suites']).AndReturn(True)
-
-        self.getter.get_control_file_contents_by_name(
-            self._SUITE_NAME).AndReturn('f')
-
-        self.dev_server.url().AndReturn('mox_url')
-        self._mockRpcUtils(-1)
-        self.mox.ReplayAll()
-        self.assertEquals(
-            site_rpc_interface.create_suite_job(name=self._NAME,
-                                                board=self._BOARD,
-                                                builds=self._BUILDS, pool=None),
-            -1)
-
-
-    def testCreateSuiteJobSuccess(self):
-        """Ensures that success results in a successful RPC."""
-        self._mockDevServerGetter()
-
-        self.dev_server.hostname = 'mox_url'
-        self.dev_server.stage_artifacts(
-                image=self._BUILD, artifacts=['test_suites']).AndReturn(True)
-
-        self.getter.get_control_file_contents_by_name(
-            self._SUITE_NAME).AndReturn('f')
-
-        self.dev_server.url().AndReturn('mox_url')
-        job_id = 5
-        self._mockRpcUtils(job_id)
-        self.mox.ReplayAll()
-        self.assertEquals(
-            site_rpc_interface.create_suite_job(name=self._NAME,
-                                                board=self._BOARD,
-                                                builds=self._BUILDS,
-                                                pool=None),
-            job_id)
-
-
-    def testCreateSuiteJobNoHostCheckSuccess(self):
-        """Ensures that success results in a successful RPC."""
-        self._mockDevServerGetter()
-
-        self.dev_server.hostname = 'mox_url'
-        self.dev_server.stage_artifacts(
-                image=self._BUILD, artifacts=['test_suites']).AndReturn(True)
-
-        self.getter.get_control_file_contents_by_name(
-            self._SUITE_NAME).AndReturn('f')
-
-        self.dev_server.url().AndReturn('mox_url')
-        job_id = 5
-        self._mockRpcUtils(job_id)
-        self.mox.ReplayAll()
-        self.assertEquals(
-          site_rpc_interface.create_suite_job(name=self._NAME,
-                                              board=self._BOARD,
-                                              builds=self._BUILDS,
-                                              pool=None, check_hosts=False),
-          job_id)
-
-
-    def testCreateSuiteIntegerNum(self):
-        """Ensures that success results in a successful RPC."""
-        self._mockDevServerGetter()
-
-        self.dev_server.hostname = 'mox_url'
-        self.dev_server.stage_artifacts(
-                image=self._BUILD, artifacts=['test_suites']).AndReturn(True)
-
-        self.getter.get_control_file_contents_by_name(
-            self._SUITE_NAME).AndReturn('f')
-
-        self.dev_server.url().AndReturn('mox_url')
-        job_id = 5
-        self._mockRpcUtils(job_id, control_file_substring='num=17')
-        self.mox.ReplayAll()
-        self.assertEquals(
-            site_rpc_interface.create_suite_job(name=self._NAME,
-                                                board=self._BOARD,
-                                                builds=self._BUILDS,
-                                                pool=None,
-                                                check_hosts=False,
-                                                num=17),
-            job_id)
-
-
-    def testCreateSuiteJobControlFileSupplied(self):
-        """Ensure we can supply the control file to create_suite_job."""
-        self._mockDevServerGetter(get_control_file=False)
-
-        self.dev_server.hostname = 'mox_url'
-        self.dev_server.stage_artifacts(
-                image=self._BUILD, artifacts=['test_suites']).AndReturn(True)
-        self.dev_server.url().AndReturn('mox_url')
-        job_id = 5
-        self._mockRpcUtils(job_id)
-        self.mox.ReplayAll()
-        self.assertEquals(
-            site_rpc_interface.create_suite_job(name='%s/%s' % (self._NAME,
-                                                                self._BUILD),
-                                                board=None,
-                                                builds=self._BUILDS,
-                                                pool=None,
-                                                control_file='CONTROL FILE'),
-            job_id)
-
-
-    def _get_records_for_sending_to_master(self):
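-        """Return job and HQE record dicts as a shard would upload them."""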
-        return [{'control_file': 'foo',
-                 'control_type': 1,
-                 'created_on': datetime.datetime(2014, 8, 21),
-                 'drone_set': None,
-                 'email_list': '',
-                 'max_runtime_hrs': 72,
-                 'max_runtime_mins': 1440,
-                 'name': 'dummy',
-                 'owner': 'autotest_system',
-                 'parse_failed_repair': True,
-                 'priority': 40,
-                 'reboot_after': 0,
-                 'reboot_before': 1,
-                 'run_reset': True,
-                 'run_verify': False,
-                 'synch_count': 0,
-                 'test_retry': 10,
-                 'timeout': 24,
-                 'timeout_mins': 1440,
-                 'id': 1
-                 }], [{
-                    'aborted': False,
-                    'active': False,
-                    'complete': False,
-                    'deleted': False,
-                    'execution_subdir': '',
-                    'finished_on': None,
-                    'started_on': None,
-                    'status': 'Queued',
-                    'id': 1
-                }]
-
-
-    def _do_heartbeat_and_assert_response(self, shard_hostname='shard1',
-                                          upload_jobs=(), upload_hqes=(),
-                                          known_jobs=(), known_hosts=(),
-                                          **kwargs):
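-        """Run a shard_heartbeat RPC and verify the returned records."""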
-        known_job_ids = [job.id for job in known_jobs]
-        known_host_ids = [host.id for host in known_hosts]
-        known_host_statuses = [host.status for host in known_hosts]
-
-        retval = site_rpc_interface.shard_heartbeat(
-            shard_hostname=shard_hostname,
-            jobs=upload_jobs, hqes=upload_hqes,
-            known_job_ids=known_job_ids, known_host_ids=known_host_ids,
-            known_host_statuses=known_host_statuses)
-
-        self._assert_shard_heartbeat_response(shard_hostname, retval,
-                                              **kwargs)
-
-        return shard_hostname
-
-
-    def _assert_shard_heartbeat_response(self, shard_hostname, retval, jobs=[],
-                                         hosts=[], hqes=[]):
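-        """Assert the heartbeat response contains exactly the given records."""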
-
-        retval_hosts, retval_jobs = retval['hosts'], retval['jobs']
-
-        expected_jobs = [
-            (job.id, job.name, shard_hostname) for job in jobs]
-        returned_jobs = [(job['id'], job['name'], job['shard']['hostname'])
-                         for job in retval_jobs]
-        self.assertEqual(returned_jobs, expected_jobs)
-
-        expected_hosts = [(host.id, host.hostname) for host in hosts]
-        returned_hosts = [(host['id'], host['hostname'])
-                          for host in retval_hosts]
-        self.assertEqual(returned_hosts, expected_hosts)
-
-        retval_hqes = []
-        for job in retval_jobs:
-            retval_hqes += job['hostqueueentry_set']
-
-        expected_hqes = [hqe.id for hqe in hqes]
-        returned_hqes = [hqe['id'] for hqe in retval_hqes]
-        self.assertEqual(returned_hqes, expected_hqes)
-
-
-    def _send_records_to_master_helper(
-        self, jobs, hqes, shard_hostname='host1',
-        exception_to_throw=error.UnallowedRecordsSentToMaster, aborted=False):
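-        """Create a sharded job, then upload the given records via heartbeat."""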
-        job_id = rpc_interface.create_job(
-                name='dummy',
-                priority=self._PRIORITY,
-                control_file='foo',
-                control_type=SERVER,
-                test_retry=10, hostless=True)
-        job = models.Job.objects.get(pk=job_id)
-        shard = models.Shard.objects.create(hostname='host1')
-        job.shard = shard
-        job.save()
-
-        if aborted:
-            job.hostqueueentry_set.update(aborted=True)
-            job.shard = None
-            job.save()
-
-        hqe = job.hostqueueentry_set.all()[0]
-        if not exception_to_throw:
-            self._do_heartbeat_and_assert_response(
-                shard_hostname=shard_hostname,
-                upload_jobs=jobs, upload_hqes=hqes)
-        else:
-            self.assertRaises(
-                exception_to_throw,
-                self._do_heartbeat_and_assert_response,
-                shard_hostname=shard_hostname,
-                upload_jobs=jobs, upload_hqes=hqes)
-
-
-    def testSendingRecordsToMaster(self):
-        """Send records to the master and ensure they are persisted."""
-        jobs, hqes = self._get_records_for_sending_to_master()
-        hqes[0]['status'] = 'Completed'
-        self._send_records_to_master_helper(
-            jobs=jobs, hqes=hqes, exception_to_throw=None)
-
-        # Check the entry was actually written to db
-        self.assertEqual(models.HostQueueEntry.objects.all()[0].status,
-                         'Completed')
-
-
-    def testSendingRecordsToMasterAbortedOnMaster(self):
-        """Send records to the master and ensure they are persisted."""
-        jobs, hqes = self._get_records_for_sending_to_master()
-        hqes[0]['status'] = 'Completed'
-        self._send_records_to_master_helper(
-            jobs=jobs, hqes=hqes, exception_to_throw=None, aborted=True)
-
-        # Check the entry was actually written to db
-        self.assertEqual(models.HostQueueEntry.objects.all()[0].status,
-                         'Completed')
-
-
-    def testSendingRecordsToMasterJobAssignedToDifferentShard(self):
-        """Ensure records that belong to a different shard are rejected."""
-        jobs, hqes = self._get_records_for_sending_to_master()
-        models.Shard.objects.create(hostname='other_shard')
-        self._send_records_to_master_helper(
-            jobs=jobs, hqes=hqes, shard_hostname='other_shard')
-
-
-    def testSendingRecordsToMasterJobHqeWithoutJob(self):
-        """Ensure update for hqe without update for it's job gets rejected."""
-        _, hqes = self._get_records_for_sending_to_master()
-        self._send_records_to_master_helper(
-            jobs=[], hqes=hqes)
-
-
-    def testSendingRecordsToMasterNotExistingJob(self):
-        """Ensure update for non existing job gets rejected."""
-        jobs, hqes = self._get_records_for_sending_to_master()
-        jobs[0]['id'] = 3
-
-        self._send_records_to_master_helper(
-            jobs=jobs, hqes=hqes)
-
-
-    def _createShardAndHostWithLabel(self, shard_hostname='shard1',
-                                     host_hostname='host1',
-                                     label_name='board:lumpy'):
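-        """Create a shard and an unleased host sharing the given label."""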
-        label = models.Label.objects.create(name=label_name)
-
-        shard = models.Shard.objects.create(hostname=shard_hostname)
-        shard.labels.add(label)
-
-        host = models.Host.objects.create(hostname=host_hostname, leased=False)
-        host.labels.add(label)
-
-        return shard, host, label
-
-
-    def _createJobForLabel(self, label):
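-        """Create a job whose meta-host and dependency are the given label."""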
-        job_id = rpc_interface.create_job(name='dummy', priority=self._PRIORITY,
-                                          control_file='foo',
-                                          control_type=CLIENT,
-                                          meta_hosts=[label.name],
-                                          dependencies=(label.name,))
-        return models.Job.objects.get(id=job_id)
-
-
-    def testShardHeartbeatFetchHostlessJob(self):
-        """Create a hostless job and ensure it's not assigned to a shard."""
-        shard1, host1, lumpy_label = self._createShardAndHostWithLabel(
-            'shard1', 'host1', 'board:lumpy')
-
-        label2 = models.Label.objects.create(name='bluetooth', platform=False)
-
-        job1 = self._create_job(hostless=True)
-
-        # Hostless jobs should be executed by the global scheduler.
-        self._do_heartbeat_and_assert_response(hosts=[host1])
-
-
-    def testShardRetrieveJobs(self):
-        """Create jobs and retrieve them."""
-        # should never be returned by heartbeat
-        leased_host = models.Host.objects.create(hostname='leased_host',
-                                                 leased=True)
-
-        shard1, host1, lumpy_label = self._createShardAndHostWithLabel()
-        shard2, host2, grumpy_label = self._createShardAndHostWithLabel(
-            'shard2', 'host2', 'board:grumpy')
-
-        leased_host.labels.add(lumpy_label)
-
-        job1 = self._createJobForLabel(lumpy_label)
-
-        job2 = self._createJobForLabel(grumpy_label)
-
-        job_completed = self._createJobForLabel(lumpy_label)
-        # Job is already complete, so don't sync it
-        job_completed.hostqueueentry_set.update(complete=True)
-        job_completed.hostqueueentry_set.create(complete=False)
-
-        job_active = self._createJobForLabel(lumpy_label)
-        # Job is already started, so don't sync it
-        job_active.hostqueueentry_set.update(active=True)
-        job_active.hostqueueentry_set.create(complete=False, active=False)
-
-        self._do_heartbeat_and_assert_response(
-            jobs=[job1], hosts=[host1], hqes=job1.hostqueueentry_set.all())
-
-        self._do_heartbeat_and_assert_response(
-            shard_hostname=shard2.hostname,
-            jobs=[job2], hosts=[host2], hqes=job2.hostqueueentry_set.all())
-
-        host3 = models.Host.objects.create(hostname='host3', leased=False)
-        host3.labels.add(lumpy_label)
-
-        self._do_heartbeat_and_assert_response(
-            known_jobs=[job1], known_hosts=[host1], hosts=[host3])
-
-
-    def testResendJobsAfterFailedHeartbeat(self):
-        """Create jobs, retrieve them, fail on client, fetch them again."""
-        shard1, host1, lumpy_label = self._createShardAndHostWithLabel()
-
-        job1 = self._createJobForLabel(lumpy_label)
-
-        self._do_heartbeat_and_assert_response(
-            jobs=[job1],
-            hqes=job1.hostqueueentry_set.all(), hosts=[host1])
-
-        # Make sure it's resubmitted by omitting it from known_jobs again
-        self._do_heartbeat_and_assert_response(
-            known_hosts=[host1],
-            jobs=[job1], hqes=job1.hostqueueentry_set.all(), hosts=[])
-
-        # Now it worked, make sure it's not sent again
-        self._do_heartbeat_and_assert_response(
-            known_jobs=[job1], known_hosts=[host1])
-
-        job1 = models.Job.objects.get(pk=job1.id)
-        job1.hostqueueentry_set.all().update(complete=True)
-
-        # Job is completed, make sure it's not sent again
-        self._do_heartbeat_and_assert_response(
-            known_hosts=[host1])
-
-        job2 = self._createJobForLabel(lumpy_label)
-
-        # job2 was created later, so it should be returned now.
-        self._do_heartbeat_and_assert_response(
-            known_hosts=[host1],
-            jobs=[job2], hqes=job2.hostqueueentry_set.all())
-
-        self._do_heartbeat_and_assert_response(
-            known_jobs=[job2], known_hosts=[host1])
-
-        job2 = models.Job.objects.get(pk=job2.pk)
-        job2.hostqueueentry_set.update(aborted=True)
-        # Setting a job to a complete status will set the shard_id to None in
-        # scheduler_models. We have to emulate that here, because we use Django
-        # models in tests.
-        job2.shard = None
-        job2.save()
-
-        self._do_heartbeat_and_assert_response(
-            known_jobs=[job2], known_hosts=[host1],
-            jobs=[job2],
-            hqes=job2.hostqueueentry_set.all())
-
-        models.Test.objects.create(name='platform_BootPerfServer:shard',
-                                   test_type=1)
-        self.mox.StubOutWithMock(utils, 'read_file')
-        utils.read_file(mox.IgnoreArg()).AndReturn('')
-        self.mox.ReplayAll()
-        site_rpc_interface.delete_shard(hostname=shard1.hostname)
-
-        self.assertRaises(
-            models.Shard.DoesNotExist, models.Shard.objects.get, pk=shard1.id)
-
-        job1 = models.Job.objects.get(pk=job1.id)
-        lumpy_label = models.Label.objects.get(pk=lumpy_label.id)
-        host1 = models.Host.objects.get(pk=host1.id)
-        super_job = models.Job.objects.get(priority=priorities.Priority.SUPER)
-        super_job_host = models.HostQueueEntry.objects.get(
-                job_id=super_job.id)
-
-        self.assertIsNone(job1.shard)
-        self.assertEqual(len(lumpy_label.shard_set.all()), 0)
-        self.assertIsNone(host1.shard)
-        self.assertIsNotNone(super_job)
-        self.assertEqual(super_job_host.host_id, host1.id)
-
-
-    def testCreateListShard(self):
-        """Retrieve a list of all shards."""
-        lumpy_label = models.Label.objects.create(name='board:lumpy',
-                                                  platform=True)
-        stumpy_label = models.Label.objects.create(name='board:stumpy',
-                                                   platform=True)
-        peppy_label = models.Label.objects.create(name='board:peppy',
-                                                  platform=True)
-
-        shard_id = site_rpc_interface.add_shard(
-            hostname='host1', labels='board:lumpy,board:stumpy')
-        self.assertRaises(error.RPCException,
-                          site_rpc_interface.add_shard,
-                          hostname='host1', labels='board:lumpy,board:stumpy')
-        self.assertRaises(model_logic.ValidationError,
-                          site_rpc_interface.add_shard,
-                          hostname='host1', labels='board:peppy')
-        shard = models.Shard.objects.get(pk=shard_id)
-        self.assertEqual(shard.hostname, 'host1')
-        self.assertEqual(shard.labels.values_list('pk')[0], (lumpy_label.id,))
-        self.assertEqual(shard.labels.values_list('pk')[1], (stumpy_label.id,))
-
-        self.assertEqual(site_rpc_interface.get_shards(),
-                         [{'labels': ['board:lumpy', 'board:stumpy'],
-                           'hostname': 'host1',
-                           'id': 1}])
-
-
-    def testAddBoardsToShard(self):
-        """Add boards to a given shard."""
-        shard1, host1, lumpy_label = self._createShardAndHostWithLabel()
-        stumpy_label = models.Label.objects.create(name='board:stumpy',
-                                                   platform=True)
-        shard_id = site_rpc_interface.add_board_to_shard(
-            hostname='shard1', labels='board:stumpy')
-        # Test that a nonexistent board label raises an exception.
-        self.assertRaises(models.Label.DoesNotExist,
-                          site_rpc_interface.add_board_to_shard,
-                          hostname='shard1', labels='board:test')
-        # Test that an already-sharded board raises an exception.
-        self.assertRaises(error.RPCException,
-                          site_rpc_interface.add_board_to_shard,
-                          hostname='shard1', labels='board:lumpy')
-        shard = models.Shard.objects.get(pk=shard_id)
-        self.assertEqual(shard.hostname, 'shard1')
-        self.assertEqual(shard.labels.values_list('pk')[0], (lumpy_label.id,))
-        self.assertEqual(shard.labels.values_list('pk')[1], (stumpy_label.id,))
-
-        self.assertEqual(site_rpc_interface.get_shards(),
-                         [{'labels': ['board:lumpy', 'board:stumpy'],
-                           'hostname': 'shard1',
-                           'id': 1}])
-
-
-    def testResendHostsAfterFailedHeartbeat(self):
-        """Check that master accepts resending updated records after failure."""
-        shard1, host1, lumpy_label = self._createShardAndHostWithLabel()
-
-        # Send the host
-        self._do_heartbeat_and_assert_response(hosts=[host1])
-
-        # Send it again because previous one didn't persist correctly
-        self._do_heartbeat_and_assert_response(hosts=[host1])
-
-        # Now it worked, make sure it isn't sent again
-        self._do_heartbeat_and_assert_response(known_hosts=[host1])
-
-
-if __name__ == '__main__':
-    unittest.main()
diff --git a/frontend/afe/views.py b/frontend/afe/views.py
index 5c59aad..b2acfeb 100644
--- a/frontend/afe/views.py
+++ b/frontend/afe/views.py
@@ -8,17 +8,13 @@
 from autotest_lib.frontend.afe import models, rpc_handler, rpc_interface
 from autotest_lib.frontend.afe import rpc_utils
 
-site_rpc_interface = utils.import_site_module(
-        __file__, 'autotest_lib.frontend.afe.site_rpc_interface',
-        dummy=object())
-
 moblab_rpc_interface = utils.import_site_module(
         __file__, 'autotest_lib.frontend.afe.moblab_rpc_interface',
         dummy=object())
 
-# since site_rpc_interface is later in the list, its methods will override those
-# of rpc_interface
-rpc_handler_obj = rpc_handler.RpcHandler((rpc_interface, site_rpc_interface,
+# since moblab_rpc_interface is later in the list, its methods will
+# override those of rpc_interface
+rpc_handler_obj = rpc_handler.RpcHandler((rpc_interface,
                                           moblab_rpc_interface),
                                          document_module=rpc_interface)