[autotest] Remove site_rpc_interface

BUG=chromium:672727
TEST=Run unittests

Change-Id: I8535b121ba7f2317cccec981bf2a8a36ca2639cf
Reviewed-on: https://chromium-review.googlesource.com/435850
Commit-Ready: Allen Li <ayatane@chromium.org>
Tested-by: Allen Li <ayatane@chromium.org>
Reviewed-by: Prathmesh Prabhu <pprabhu@chromium.org>
Reviewed-by: Allen Li <ayatane@chromium.org>
diff --git a/frontend/afe/rpc_interface.py b/frontend/afe/rpc_interface.py
index 6b4c8e9..8f5ae7b 100644
--- a/frontend/afe/rpc_interface.py
+++ b/frontend/afe/rpc_interface.py
@@ -34,30 +34,49 @@
 import ast
 import datetime
 import logging
+import os
 import sys
 
 from django.db.models import Count
+
 import common
-from autotest_lib.client.common_lib import control_data
-from autotest_lib.client.common_lib import priorities
 # TODO(akeshet): Replace with monarch stats once we know how to instrument rpc
 # server with ts_mon.
 from autotest_lib.client.common_lib.cros.graphite import autotest_stats
+from autotest_lib.client.common_lib import control_data
+from autotest_lib.client.common_lib import error
+from autotest_lib.client.common_lib import global_config
+from autotest_lib.client.common_lib import priorities
+from autotest_lib.client.common_lib import time_utils
+from autotest_lib.client.common_lib.cros import dev_server
 from autotest_lib.frontend.afe import control_file as control_file_lib
+from autotest_lib.frontend.afe import model_attributes
+from autotest_lib.frontend.afe import model_logic
+from autotest_lib.frontend.afe import models
 from autotest_lib.frontend.afe import rpc_utils
-from autotest_lib.frontend.afe import models, model_logic, model_attributes
-from autotest_lib.frontend.afe import site_rpc_interface
 from autotest_lib.frontend.tko import models as tko_models
 from autotest_lib.frontend.tko import rpc_interface as tko_rpc_interface
 from autotest_lib.server import frontend
 from autotest_lib.server import utils
 from autotest_lib.server.cros import provision
+from autotest_lib.server.cros.dynamic_suite import constants
+from autotest_lib.server.cros.dynamic_suite import control_file_getter
+from autotest_lib.server.cros.dynamic_suite import suite as SuiteBase
 from autotest_lib.server.cros.dynamic_suite import tools
+from autotest_lib.server.cros.dynamic_suite.suite import Suite
 from autotest_lib.server.lib import status_history
+from autotest_lib.site_utils import host_history
+from autotest_lib.site_utils import job_history
+from autotest_lib.site_utils import server_manager_utils
+from autotest_lib.site_utils import stable_version_utils
 
 
 _timer = autotest_stats.Timer('rpc_interface')
 
+_CONFIG = global_config.global_config
+
+# Relevant CrosDynamicSuiteExceptions are defined in client/common_lib/error.py.
+
 def get_parameterized_autoupdate_image_url(job):
     """Get the parameterized autoupdate image url from a parameterized job."""
     known_test_obj = models.Test.smart_get('autoupdate_ParameterizedJob')
@@ -628,8 +647,8 @@
 def get_tests_status_counts_by_job_name_label(job_name_prefix, label_name):
     """Gets the counts of all passed and failed tests from the matching jobs.
 
-    @param job_name_prefix: Name prefix of the jobs to get the summary from, e.g.,
-            'butterfly-release/R40-6457.21.0/bvt-cq/'.
+    @param job_name_prefix: Name prefix of the jobs to get the summary
+           from, e.g., 'butterfly-release/R40-6457.21.0/bvt-cq/'.
     @param label_name: Label that must be set in the jobs, e.g.,
             'cros-version:butterfly-release/R40-6457.21.0'.
 
@@ -973,7 +992,7 @@
             builds[provision.FW_RW_VERSION_PREFIX] = firmware_rw_build
         if firmware_ro_build:
             builds[provision.FW_RO_VERSION_PREFIX] = firmware_ro_build
-        return site_rpc_interface.create_suite_job(
+        return create_suite_job(
                 name=name, control_file=control_file, priority=priority,
                 builds=builds, test_source_build=test_source_build,
                 is_cloning=is_cloning, **kwargs)
@@ -1688,7 +1707,6 @@
             informative description.
     """
 
-    job_fields = models.Job.get_field_dict()
     default_drone_set_name = models.DroneSet.default_drone_set_name()
     drone_sets = ([default_drone_set_name] +
                   sorted(drone_set.name for drone_set in
@@ -1697,7 +1715,6 @@
 
     result = {}
     result['priorities'] = priorities.Priority.choices()
-    default_priority = priorities.Priority.DEFAULT
     result['default_priority'] = 'Default'
     result['max_schedulable_priority'] = priorities.Priority.DEFAULT
     result['users'] = get_users(sort_by=['login'])
@@ -1770,3 +1787,683 @@
     hosts = models.HostAttribute.query_objects({'attribute': attribute,
                                                 'value': value})
     return [row.host.hostname for row in hosts if row.host.invalid == 0]
+
+
+def canonicalize_suite_name(suite_name):
+    """Canonicalize the suite's name.
+
+    @param suite_name: the name of the suite.
+    """
+    # Do not change this naming convention without updating
+    # site_utils.parse_job_name.
+    return 'test_suites/control.%s' % suite_name
+
+
+def formatted_now():
+    """Format the current datetime."""
+    return datetime.datetime.now().strftime(time_utils.TIME_FMT)
+
+
+def _get_control_file_by_build(build, ds, suite_name):
+    """Return control file contents for |suite_name|.
+
+    Query the dev server at |ds| for the control file |suite_name|, included
+    in |build| for |board|.
+
+    @param build: unique name by which to refer to the image from now on.
+    @param ds: a dev_server.DevServer instance to fetch control file with.
+    @param suite_name: canonicalized suite name, e.g. test_suites/control.bvt.
+    @raises ControlFileNotFound if a unique suite control file doesn't exist.
+    @raises NoControlFileList if we can't list the control files at all.
+    @raises ControlFileEmpty if the control file exists on the server, but
+                             can't be read.
+
+    @return the contents of the desired control file.
+    """
+    getter = control_file_getter.DevServerGetter.create(build, ds)
+    devserver_name = ds.hostname
+    timer = autotest_stats.Timer('control_files.parse.%s.%s' %
+                                 (devserver_name.replace('.', '_'),
+                                  suite_name.rsplit('.')[-1]))
+    # Get the control file for the suite.
+    try:
+        with timer:
+            control_file_in = getter.get_control_file_contents_by_name(
+                    suite_name)
+    except error.CrosDynamicSuiteException as e:
+        raise type(e)('Failed to get control file for %s '
+                      '(devserver: %s) (error: %s)' %
+                      (build, devserver_name, e))
+    if not control_file_in:
+        raise error.ControlFileEmpty(
+            "Fetching %s returned no data. (devserver: %s)" %
+            (suite_name, devserver_name))
+    # Force control files to only contain ascii characters.
+    try:
+        control_file_in.encode('ascii')
+    except UnicodeDecodeError as e:
+        raise error.ControlFileMalformed(str(e))
+
+    return control_file_in
+
+
+def _get_control_file_by_suite(suite_name):
+    """Get control file contents by suite name.
+
+    @param suite_name: Suite name as string.
+    @returns: Control file contents as string.
+    """
+    getter = control_file_getter.FileSystemGetter(
+        [_CONFIG.get_config_value('SCHEDULER',
+                                  'drone_installation_directory')])
+    return getter.get_control_file_contents_by_name(suite_name)
+
+
+def _stage_build_artifacts(build, hostname=None):
+    """
+    Ensure components of |build| necessary for installing images are staged.
+
+    @param build image we want to stage.
+    @param hostname hostname of a dut may run test on. This is to help to locate
+        a devserver closer to duts if needed. Default is None.
+
+    @raises StageControlFileFailure: if the dev server throws 500 while staging
+        suite control files.
+
+    @return: dev_server.ImageServer instance to use with this build.
+    @return: timings dictionary containing staging start/end times.
+    """
+    timings = {}
+    # Ensure components of |build| necessary for installing images are staged
+    # on the dev server. However set synchronous to False to allow other
+    # components to be downloaded in the background.
+    ds = dev_server.resolve(build, hostname=hostname)
+    ds_name = ds.hostname
+    timings[constants.DOWNLOAD_STARTED_TIME] = formatted_now()
+    timer = autotest_stats.Timer('control_files.stage.%s' % (
+            ds_name.replace('.', '_')))
+    try:
+        with timer:
+            ds.stage_artifacts(image=build, artifacts=['test_suites'])
+    except dev_server.DevServerException as e:
+        raise error.StageControlFileFailure(
+                "Failed to stage %s on %s: %s" % (build, ds_name, e))
+    timings[constants.PAYLOAD_FINISHED_TIME] = formatted_now()
+    return (ds, timings)
+
+
+@rpc_utils.route_rpc_to_master
+def create_suite_job(
+        name='',
+        board='',
+        pool='',
+        control_file='',
+        check_hosts=True,
+        num=None,
+        file_bugs=False,
+        timeout=24,
+        timeout_mins=None,
+        priority=priorities.Priority.DEFAULT,
+        suite_args=None,
+        wait_for_results=True,
+        job_retry=False,
+        max_retries=None,
+        max_runtime_mins=None,
+        suite_min_duts=0,
+        offload_failures_only=False,
+        builds=None,
+        test_source_build=None,
+        run_prod_code=False,
+        delay_minutes=0,
+        is_cloning=False,
+        **kwargs
+):
+    """
+    Create a job to run a test suite on the given device with the given image.
+
+    When the timeout specified in the control file is reached, the
+    job is guaranteed to have completed and results will be available.
+
+    @param name: The test name if control_file is supplied, otherwise the name
+                 of the test suite to run, e.g. 'bvt'.
+    @param board: the kind of device to run the tests on.
+    @param builds: the builds to install e.g.
+                   {'cros-version:': 'x86-alex-release/R18-1655.0.0',
+                    'fwrw-version:':  'x86-alex-firmware/R36-5771.50.0',
+                    'fwro-version:':  'x86-alex-firmware/R36-5771.49.0'}
+                   If builds is given a value, it overrides argument build.
+    @param test_source_build: Build that contains the server-side test code.
+    @param pool: Specify the pool of machines to use for scheduling
+            purposes.
+    @param control_file: the control file of the job.
+    @param check_hosts: require appropriate live hosts to exist in the lab.
+    @param num: Specify the number of machines to schedule across (integer).
+                Leave unspecified or use None to use default sharding factor.
+    @param file_bugs: File a bug on each test failure in this suite.
+    @param timeout: The max lifetime of this suite, in hours.
+    @param timeout_mins: The max lifetime of this suite, in minutes. Takes
+                         priority over timeout.
+    @param priority: Integer denoting priority. Higher is more important.
+    @param suite_args: Optional arguments which will be parsed by the suite
+                       control file. Used by control.test_that_wrapper to
+                       determine which tests to run.
+    @param wait_for_results: Set to False to run the suite job without waiting
+                             for test jobs to finish. Default is True.
+    @param job_retry: Set to True to enable job-level retry. Default is False.
+    @param max_retries: Integer, maximum job retries allowed at suite level.
+                        None for no max.
+    @param max_runtime_mins: Maximum amount of time a job can be running in
+                             minutes.
+    @param suite_min_duts: Integer. Scheduler will prioritize getting the
+                           minimum number of machines for the suite when it is
+                           competing with another suite that has a higher
+                           priority but already got minimum machines it needs.
+    @param offload_failures_only: Only enable gs_offloading for failed jobs.
+    @param run_prod_code: If True, the suite will run the test code that
+                          lives in prod aka the test code currently on the
+                          lab servers. If False, the control files and test
+                          code for this suite run will be retrieved from the
+                          build artifacts.
+    @param delay_minutes: Delay the creation of test jobs for a given number of
+                          minutes.
+    @param is_cloning: True if creating a cloning job.
+    @param kwargs: extra keyword args. NOT USED.
+
+    @raises ControlFileNotFound: if a unique suite control file doesn't exist.
+    @raises NoControlFileList: if we can't list the control files at all.
+    @raises StageControlFileFailure: If the dev server throws 500 while
+                                     staging test_suites.
+    @raises ControlFileEmpty: if the control file exists on the server, but
+                              can't be read.
+
+    @return: the job ID of the suite; -1 on error.
+    """
+    if type(num) is not int and num is not None:
+        raise error.SuiteArgumentException('Ill specified num argument %r. '
+                                           'Must be an integer or None.' % num)
+    if num == 0:
+        logging.warning("Can't run on 0 hosts; using default.")
+        num = None
+
+    if builds is None:
+        builds = {}
+
+    # Default test source build to CrOS build if it's not specified and
+    # run_prod_code is set to False.
+    if not run_prod_code:
+        test_source_build = Suite.get_test_source_build(
+                builds, test_source_build=test_source_build)
+
+    sample_dut = rpc_utils.get_sample_dut(board, pool)
+
+    suite_name = canonicalize_suite_name(name)
+    if run_prod_code:
+        ds = dev_server.resolve(test_source_build, hostname=sample_dut)
+        keyvals = {}
+    else:
+        (ds, keyvals) = _stage_build_artifacts(
+                test_source_build, hostname=sample_dut)
+    keyvals[constants.SUITE_MIN_DUTS_KEY] = suite_min_duts
+
+    # Do not change this naming convention without updating
+    # site_utils.parse_job_name.
+    if run_prod_code:
+        # If run_prod_code is True, test_source_build is not set, use the
+        # first build in the builds list for the sutie job name.
+        name = '%s-%s' % (builds.values()[0], suite_name)
+    else:
+        name = '%s-%s' % (test_source_build, suite_name)
+
+    timeout_mins = timeout_mins or timeout * 60
+    max_runtime_mins = max_runtime_mins or timeout * 60
+
+    if not board:
+        board = utils.ParseBuildName(builds[provision.CROS_VERSION_PREFIX])[0]
+
+    if run_prod_code:
+        control_file = _get_control_file_by_suite(suite_name)
+
+    if not control_file:
+        # No control file was supplied so look it up from the build artifacts.
+        control_file = _get_control_file_by_build(
+                test_source_build, ds, suite_name)
+
+    # Prepend builds and board to the control file.
+    if is_cloning:
+        control_file = tools.remove_injection(control_file)
+
+    inject_dict = {
+        'board': board,
+        # `build` is needed for suites like AU to stage image inside suite
+        # control file.
+        'build': test_source_build,
+        'builds': builds,
+        'check_hosts': check_hosts,
+        'pool': pool,
+        'num': num,
+        'file_bugs': file_bugs,
+        'timeout': timeout,
+        'timeout_mins': timeout_mins,
+        'devserver_url': ds.url(),
+        'priority': priority,
+        'suite_args' : suite_args,
+        'wait_for_results': wait_for_results,
+        'job_retry': job_retry,
+        'max_retries': max_retries,
+        'max_runtime_mins': max_runtime_mins,
+        'offload_failures_only': offload_failures_only,
+        'test_source_build': test_source_build,
+        'run_prod_code': run_prod_code,
+        'delay_minutes': delay_minutes,
+    }
+    control_file = tools.inject_vars(inject_dict, control_file)
+
+    return rpc_utils.create_job_common(name,
+                                       priority=priority,
+                                       timeout_mins=timeout_mins,
+                                       max_runtime_mins=max_runtime_mins,
+                                       control_type='Server',
+                                       control_file=control_file,
+                                       hostless=True,
+                                       keyvals=keyvals)
+
+
+def get_job_history(**filter_data):
+    """Get history of the job, including the special tasks executed for the job
+
+    @param filter_data: filter for the call, should at least include
+                        {'job_id': [job id]}
+    @returns: JSON string of the job's history, including the information such
+              as the hosts run the job and the special tasks executed before
+              and after the job.
+    """
+    job_id = filter_data['job_id']
+    job_info = job_history.get_job_info(job_id)
+    return rpc_utils.prepare_for_serialization(job_info.get_history())
+
+
+def get_host_history(start_time, end_time, hosts=None, board=None, pool=None):
+    """Get history of a list of host.
+
+    The return is a JSON string of host history for each host, for example,
+    {'172.22.33.51': [{'status': 'Resetting'
+                       'start_time': '2014-08-07 10:02:16',
+                       'end_time': '2014-08-07 10:03:16',
+                       'log_url': 'http://autotest/reset-546546/debug',
+                       'dbg_str': 'Task: Special Task 19441991 (host ...)'},
+                       {'status': 'Running'
+                       'start_time': '2014-08-07 10:03:18',
+                       'end_time': '2014-08-07 10:13:00',
+                       'log_url': 'http://autotest/reset-546546/debug',
+                       'dbg_str': 'HQE: 15305005, for job: 14995562'}
+                     ]
+    }
+    @param start_time: start time to search for history, can be string value or
+                       epoch time.
+    @param end_time: end time to search for history, can be string value or
+                     epoch time.
+    @param hosts: A list of hosts to search for history. Default is None.
+    @param board: board type of hosts. Default is None.
+    @param pool: pool type of hosts. Default is None.
+    @returns: JSON string of the host history.
+    """
+    return rpc_utils.prepare_for_serialization(
+            host_history.get_history_details(
+                    start_time=start_time, end_time=end_time,
+                    hosts=hosts, board=board, pool=pool,
+                    process_pool_size=4))
+
+
+def shard_heartbeat(shard_hostname, jobs=(), hqes=(), known_job_ids=(),
+                    known_host_ids=(), known_host_statuses=()):
+    """Receive updates for job statuses from shards and assign hosts and jobs.
+
+    @param shard_hostname: Hostname of the calling shard
+    @param jobs: Jobs in serialized form that should be updated with newer
+                 status from a shard.
+    @param hqes: Hostqueueentries in serialized form that should be updated with
+                 newer status from a shard. Note that for every hostqueueentry
+                 the corresponding job must be in jobs.
+    @param known_job_ids: List of ids of jobs the shard already has.
+    @param known_host_ids: List of ids of hosts the shard already has.
+    @param known_host_statuses: List of statuses of hosts the shard already has.
+
+    @returns: Serialized representations of hosts, jobs, suite job keyvals
+              and their dependencies to be inserted into a shard's database.
+    """
+    # The following alternatives to sending host and job ids in every heartbeat
+    # have been considered:
+    # 1. Sending the highest known job and host ids. This would work for jobs:
+    #    Newer jobs always have larger ids. Also, if a job is not assigned to a
+    #    particular shard during a heartbeat, it never will be assigned to this
+    #    shard later.
+    #    This is not true for hosts though: A host that is leased won't be sent
+    #    to the shard now, but might be sent in a future heartbeat. This means
+    #    sometimes hosts should be transfered that have a lower id than the
+    #    maximum host id the shard knows.
+    # 2. Send the number of jobs/hosts the shard knows to the master in each
+    #    heartbeat. Compare these to the number of records that already have
+    #    the shard_id set to this shard. In the normal case, they should match.
+    #    In case they don't, resend all entities of that type.
+    #    This would work well for hosts, because there aren't that many.
+    #    Resending all jobs is quite a big overhead though.
+    #    Also, this approach might run into edge cases when entities are
+    #    ever deleted.
+    # 3. Mixtures of the above: Use 1 for jobs and 2 for hosts.
+    #    Using two different approaches isn't consistent and might cause
+    #    confusion. Also the issues with the case of deletions might still
+    #    occur.
+    #
+    # The overhead of sending all job and host ids in every heartbeat is low:
+    # At peaks one board has about 1200 created but unfinished jobs.
+    # See the numbers here: http://goo.gl/gQCGWH
+    # Assuming that job id's have 6 digits and that json serialization takes a
+    # comma and a space as overhead, the traffic per id sent is about 8 bytes.
+    # If 5000 ids need to be sent, this means 40 kilobytes of traffic.
+    # A NOT IN query with 5000 ids took about 30ms in tests made.
+    # These numbers seem low enough to outweigh the disadvantages of the
+    # solutions described above.
+    timer = autotest_stats.Timer('shard_heartbeat')
+    with timer:
+        shard_obj = rpc_utils.retrieve_shard(shard_hostname=shard_hostname)
+        rpc_utils.persist_records_sent_from_shard(shard_obj, jobs, hqes)
+        assert len(known_host_ids) == len(known_host_statuses)
+        for i in range(len(known_host_ids)):
+            host_model = models.Host.objects.get(pk=known_host_ids[i])
+            if host_model.status != known_host_statuses[i]:
+                host_model.status = known_host_statuses[i]
+                host_model.save()
+
+        hosts, jobs, suite_keyvals = rpc_utils.find_records_for_shard(
+                shard_obj, known_job_ids=known_job_ids,
+                known_host_ids=known_host_ids)
+        return {
+            'hosts': [host.serialize() for host in hosts],
+            'jobs': [job.serialize() for job in jobs],
+            'suite_keyvals': [kv.serialize() for kv in suite_keyvals],
+        }
+
+
+def get_shards(**filter_data):
+    """Return a list of all shards.
+
+    @returns A sequence of nested dictionaries of shard information.
+    """
+    shards = models.Shard.query_objects(filter_data)
+    serialized_shards = rpc_utils.prepare_rows_as_nested_dicts(shards, ())
+    for serialized, shard in zip(serialized_shards, shards):
+        serialized['labels'] = [label.name for label in shard.labels.all()]
+
+    return serialized_shards
+
+
+def _assign_board_to_shard_precheck(labels):
+    """Verify whether board labels are valid to be added to a given shard.
+
+    First check whether board label is in correct format. Second, check whether
+    the board label exist. Third, check whether the board has already been
+    assigned to shard.
+
+    @param labels: Board labels separated by comma.
+
+    @raises error.RPCException: If label provided doesn't start with `board:`
+            or board has been added to shard already.
+    @raises models.Label.DoesNotExist: If the label specified doesn't exist.
+
+    @returns: A list of label models that ready to be added to shard.
+    """
+    labels = labels.split(',')
+    label_models = []
+    for label in labels:
+        # Check whether the board label is in correct format.
+        if not label.startswith('board:'):
+            raise error.RPCException('Sharding only supports `board:.*` label.')
+        # Check whether the board label exist. If not, exception will be thrown
+        # by smart_get function.
+        label = models.Label.smart_get(label)
+        # Check whether the board has been sharded already
+        try:
+            shard = models.Shard.objects.get(labels=label)
+            raise error.RPCException(
+                    '%s is already on shard %s' % (label, shard.hostname))
+        except models.Shard.DoesNotExist:
+            # board is not on any shard, so it's valid.
+            label_models.append(label)
+    return label_models
+
+
+def add_shard(hostname, labels):
+    """Add a shard and start running jobs on it.
+
+    @param hostname: The hostname of the shard to be added; needs to be unique.
+    @param labels: Board labels separated by comma. Jobs of one of the labels
+                   will be assigned to the shard.
+
+    @raises error.RPCException: If label provided doesn't start with `board:` or
+            board has been added to shard already.
+    @raises model_logic.ValidationError: If a shard with the given hostname
+            already exist.
+    @raises models.Label.DoesNotExist: If the label specified doesn't exist.
+
+    @returns: The id of the added shard.
+    """
+    labels = _assign_board_to_shard_precheck(labels)
+    shard = models.Shard.add_object(hostname=hostname)
+    for label in labels:
+        shard.labels.add(label)
+    return shard.id
+
+
+def add_board_to_shard(hostname, labels):
+    """Add boards to a given shard
+
+    @param hostname: The hostname of the shard to be changed.
+    @param labels: Board labels separated by comma.
+
+    @raises error.RPCException: If label provided doesn't start with `board:` or
+            board has been added to shard already.
+    @raises models.Label.DoesNotExist: If the label specified doesn't exist.
+
+    @returns: The id of the changed shard.
+    """
+    labels = _assign_board_to_shard_precheck(labels)
+    shard = models.Shard.objects.get(hostname=hostname)
+    for label in labels:
+        shard.labels.add(label)
+    return shard.id
+
+
+def delete_shard(hostname):
+    """Delete a shard and reclaim all resources from it.
+
+    This claims back all assigned hosts from the shard. To ensure all DUTs are
+    in a sane state, a Reboot task with highest priority is scheduled for them.
+    This reboots the DUTs and then all left tasks continue to run in drone of
+    the master.
+
+    The procedure for deleting a shard:
+        * Lock all unlocked hosts on that shard.
+        * Remove shard information .
+        * Assign a reboot task with highest priority to these hosts.
+        * Unlock these hosts, then, the reboot tasks run in front of all other
+        tasks.
+
+    The status of jobs that haven't been reported to be finished yet, will be
+    lost. The master scheduler will pick up the jobs and execute them.
+
+    @param hostname: Hostname of the shard to delete.
+    """
+    shard = rpc_utils.retrieve_shard(shard_hostname=hostname)
+    hostnames_to_lock = [h.hostname for h in
+                         models.Host.objects.filter(shard=shard, locked=False)]
+
+    # TODO(beeps): Power off shard
+    # For ChromeOS hosts, a reboot test with the highest priority is added to
+    # the DUT. After a reboot it should be ganranteed that no processes from
+    # prior tests that were run by a shard are still running on.
+
+    # Lock all unlocked hosts.
+    dicts = {'locked': True, 'lock_time': datetime.datetime.now()}
+    models.Host.objects.filter(hostname__in=hostnames_to_lock).update(**dicts)
+
+    # Remove shard information.
+    models.Host.objects.filter(shard=shard).update(shard=None)
+    models.Job.objects.filter(shard=shard).update(shard=None)
+    shard.labels.clear()
+    shard.delete()
+
+    # Assign a reboot task with highest priority: Super.
+    t = models.Test.objects.get(name='platform_BootPerfServer:shard')
+    c = utils.read_file(os.path.join(common.autotest_dir, t.path))
+    if hostnames_to_lock:
+        rpc_utils.create_job_common(
+                'reboot_dut_for_shard_deletion',
+                priority=priorities.Priority.SUPER,
+                control_type='Server',
+                control_file=c, hosts=hostnames_to_lock)
+
+    # Unlock these shard-related hosts.
+    dicts = {'locked': False, 'lock_time': None}
+    models.Host.objects.filter(hostname__in=hostnames_to_lock).update(**dicts)
+
+
+def get_servers(hostname=None, role=None, status=None):
+    """Get a list of servers with matching role and status.
+
+    @param hostname: FQDN of the server.
+    @param role: Name of the server role, e.g., drone, scheduler. Default to
+                 None to match any role.
+    @param status: Status of the server, e.g., primary, backup, repair_required.
+                   Default to None to match any server status.
+
+    @raises error.RPCException: If server database is not used.
+    @return: A list of server names for servers with matching role and status.
+    """
+    if not server_manager_utils.use_server_db():
+        raise error.RPCException('Server database is not enabled. Please try '
+                                 'retrieve servers from global config.')
+    servers = server_manager_utils.get_servers(hostname=hostname, role=role,
+                                               status=status)
+    return [s.get_details() for s in servers]
+
+
+@rpc_utils.route_rpc_to_master
+def get_stable_version(board=stable_version_utils.DEFAULT, android=False):
+    """Get stable version for the given board.
+
+    @param board: Name of the board.
+    @param android: If True, the given board is an Android-based device. If
+                    False, assume its a Chrome OS-based device.
+
+    @return: Stable version of the given board. Return global configure value
+             of CROS.stable_cros_version if stable_versinos table does not have
+             entry of board DEFAULT.
+    """
+    return stable_version_utils.get(board=board, android=android)
+
+
+@rpc_utils.route_rpc_to_master
+def get_all_stable_versions():
+    """Get stable versions for all boards.
+
+    @return: A dictionary of board:version.
+    """
+    return stable_version_utils.get_all()
+
+
+@rpc_utils.route_rpc_to_master
+def set_stable_version(version, board=stable_version_utils.DEFAULT):
+    """Modify stable version for the given board.
+
+    @param version: The new value of stable version for given board.
+    @param board: Name of the board, default to value `DEFAULT`.
+    """
+    stable_version_utils.set(version=version, board=board)
+
+
+@rpc_utils.route_rpc_to_master
+def delete_stable_version(board):
+    """Modify stable version for the given board.
+
+    Delete a stable version entry in afe_stable_versions table for a given
+    board, so default stable version will be used.
+
+    @param board: Name of the board.
+    """
+    stable_version_utils.delete(board=board)
+
+
+def get_tests_by_build(build, ignore_invalid_tests=True):
+    """Get the tests that are available for the specified build.
+
+    @param build: unique name by which to refer to the image.
+    @param ignore_invalid_tests: flag on if unparsable tests are ignored.
+
+    @return: A sorted list of all tests that are in the build specified.
+    """
+    # Collect the control files specified in this build
+    cfile_getter = control_file_lib._initialize_control_file_getter(build)
+    if SuiteBase.ENABLE_CONTROLS_IN_BATCH:
+        control_file_info_list = cfile_getter.get_suite_info()
+        control_file_list = control_file_info_list.keys()
+    else:
+        control_file_list = cfile_getter.get_control_file_list()
+
+    test_objects = []
+    _id = 0
+    for control_file_path in control_file_list:
+        # Read and parse the control file
+        if SuiteBase.ENABLE_CONTROLS_IN_BATCH:
+            control_file = control_file_info_list[control_file_path]
+        else:
+            control_file = cfile_getter.get_control_file_contents(
+                    control_file_path)
+        try:
+            control_obj = control_data.parse_control_string(control_file)
+        except:
+            logging.info('Failed to parse control file: %s', control_file_path)
+            if not ignore_invalid_tests:
+                raise
+
+        # Extract the values needed for the AFE from the control_obj.
+        # The keys list represents attributes in the control_obj that
+        # are required by the AFE
+        keys = ['author', 'doc', 'name', 'time', 'test_type', 'experimental',
+                'test_category', 'test_class', 'dependencies', 'run_verify',
+                'sync_count', 'job_retries', 'retries', 'path']
+
+        test_object = {}
+        for key in keys:
+            test_object[key] = getattr(control_obj, key) if hasattr(
+                    control_obj, key) else ''
+
+        # Unfortunately, the AFE expects different key-names for certain
+        # values, these must be corrected to avoid the risk of tests
+        # being omitted by the AFE.
+        # The 'id' is an additional value used in the AFE.
+        # The control_data parsing does not reference 'run_reset', but it
+        # is also used in the AFE and defaults to True.
+        test_object['id'] = _id
+        test_object['run_reset'] = True
+        test_object['description'] = test_object.get('doc', '')
+        test_object['test_time'] = test_object.get('time', 0)
+        test_object['test_retry'] = test_object.get('retries', 0)
+
+        # Fix the test name to be consistent with the current presentation
+        # of test names in the AFE.
+        testpath, subname = os.path.split(control_file_path)
+        testname = os.path.basename(testpath)
+        subname = subname.split('.')[1:]
+        if subname:
+            testname = '%s:%s' % (testname, ':'.join(subname))
+
+        test_object['name'] = testname
+
+        # Correct the test path as parse_control_string sets an empty string.
+        test_object['path'] = control_file_path
+
+        _id += 1
+        test_objects.append(test_object)
+
+    test_objects = sorted(test_objects, key=lambda x: x.get('name'))
+    return rpc_utils.prepare_for_serialization(test_objects)