[autotest] Disallow frontend jobs on hosts given to shards.

TEST=Ran jobs and checked exceptions.
     Ran jobs on non-shard hosts.
BUG=chromium:431789
DEPLOY=apache
Change-Id: Ide385ed1db135a7e98ab0385df1f9a64d97bd631
Reviewed-on: https://chromium-review.googlesource.com/231735
Tested-by: Prashanth B <beeps@chromium.org>
Reviewed-by: Dan Shi <dshi@chromium.org>
Reviewed-by: Fang Deng <fdeng@chromium.org>
Commit-Queue: Prashanth B <beeps@chromium.org>
diff --git a/frontend/afe/models.py b/frontend/afe/models.py
index f8a83f8..c07f881 100644
--- a/frontend/afe/models.py
+++ b/frontend/afe/models.py
@@ -20,6 +20,7 @@
 from autotest_lib.client.common_lib import global_config
 from autotest_lib.client.common_lib import host_queue_entry_states
 from autotest_lib.client.common_lib import control_data, priorities, decorators
+from autotest_lib.client.common_lib import site_utils
 from autotest_lib.client.common_lib.cros.graphite import es_utils
 
 # job options and user preferences
@@ -157,6 +158,26 @@
         db_table = 'afe_shards'
 
 
+    def rpc_hostname(self):
+        """Get the rpc hostname of the shard.
+
+        @return: The shard hostname itself for all non-testing environments;
+                 the address of the default gateway for vm testing environments.
+        """
+        # TODO: Figure out a better solution for testing. Since no two shards
+        # can run on the same host, a shard hostname of localhost means the
+        # shard must be a vm in a test cluster. In that case a name of
+        # localhost:<port> is needed for the frontend to generate the correct
+        # afe links/redirection (this happens through the host), but rpcs
+        # performed *on* the shard need to use the address of the gateway
+        # instead.
+        hostname = self.hostname.split(':')[0]
+        if site_utils.is_localhost(hostname):
+            return self.hostname.replace(
+                    hostname, site_utils.DEFAULT_VM_GATEWAY)
+        return self.hostname
+
+
 class Drone(dbmodels.Model, model_logic.ModelExtensions):
     """
     A scheduler drone
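For illustration, here is a minimal standalone sketch of the hostname rewrite
that Shard.rpc_hostname() performs. The gateway value and the is_localhost()
helper below are stand-ins for site_utils.DEFAULT_VM_GATEWAY and
site_utils.is_localhost(); the real method lives on the Shard model above.

# Sketch only: DEFAULT_VM_GATEWAY and is_localhost() stand in for the
# site_utils helpers used by the real Shard.rpc_hostname() method.
DEFAULT_VM_GATEWAY = '10.0.2.2'  # assumed gateway address, illustrative only

def is_localhost(hostname):
    """Assumed helper: True if hostname refers to the local machine."""
    return hostname in ('localhost', '127.0.0.1')

def rpc_hostname(shard_hostname):
    """Return the address rpcs should use to reach the given shard."""
    hostname = shard_hostname.split(':')[0]
    if is_localhost(hostname):
        # Test cluster: keep the :<port> suffix, swap in the gateway address.
        return shard_hostname.replace(hostname, DEFAULT_VM_GATEWAY)
    return shard_hostname

assert rpc_hostname('localhost:8001') == '10.0.2.2:8001'
assert rpc_hostname('chromeos-shard1.cros') == 'chromeos-shard1.cros'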
diff --git a/frontend/afe/rpc_interface.py b/frontend/afe/rpc_interface.py
index a21bd06..76095c4 100644
--- a/frontend/afe/rpc_interface.py
+++ b/frontend/afe/rpc_interface.py
@@ -734,6 +734,11 @@
     @returns A list of hostnames that a special task was created for.
     """
     models.AclGroup.check_for_acl_violation_hosts(hosts)
+    shard_host_map = rpc_utils.bucket_hosts_by_shard(hosts)
+    if shard_host_map:
+        raise ValueError('The following hosts are on shards. Please '
+                         'follow the links to the shards and schedule the '
+                         'special tasks there instead: %s.' % shard_host_map)
     for host in hosts:
         models.SpecialTask.schedule_special_task(host, task)
     return list(sorted(host.hostname for host in hosts))
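A rough sketch of what this guard looks like from the caller's side, with
made-up host and shard names (the Host/Shard stand-ins below are not the
django models): only requests that touch shard-owned hosts are rejected, so
callers working purely with non-shard hosts see no change.

# Fake stand-ins for the django models, for illustration only.
class FakeShard(object):
    def __init__(self, hostname):
        self.hostname = hostname

class FakeHost(object):
    def __init__(self, hostname, shard=None):
        self.hostname = hostname
        self.shard = shard

def bucket_hosts_by_shard(host_objs):
    """Mirrors rpc_utils.bucket_hosts_by_shard (see the next file's diff)."""
    shard_host_map = {}
    for host in host_objs:
        if host.shard:
            shard_host_map.setdefault(
                    host.shard.hostname, []).append(host.hostname)
    return shard_host_map

hosts = [FakeHost('dut1', shard=FakeShard('shard1.example.com')),
         FakeHost('dut2')]
# dut1 lives on a shard, so the RPC above raises ValueError mentioning
# {'shard1.example.com': ['dut1']}; a request for dut2 alone goes through.
assert bucket_hosts_by_shard(hosts) == {'shard1.example.com': ['dut1']}
assert bucket_hosts_by_shard([FakeHost('dut2')]) == {}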
diff --git a/frontend/afe/rpc_utils.py b/frontend/afe/rpc_utils.py
index 1f4a694..deb8ba9 100644
--- a/frontend/afe/rpc_utils.py
+++ b/frontend/afe/rpc_utils.py
@@ -734,6 +734,25 @@
     return interleaved_entries
 
 
+def bucket_hosts_by_shard(host_objs, rpc_hostnames=False):
+    """Figure out which hosts are on which shards.
+
+    @param host_objs: A list of host objects.
+    @param rpc_hostnames: If True, the rpc_hostname() of each shard is used
+        as the key instead of its 'real' hostname. The two only differ in
+        testing environments.
+
+    @return: A dict mapping shard hostname to the hostnames of its hosts.
+    """
+    shard_host_map = {}
+    for host in host_objs:
+        if host.shard:
+            shard_name = (host.shard.rpc_hostname() if rpc_hostnames
+                          else host.shard.hostname)
+            shard_host_map.setdefault(shard_name, []).append(host.hostname)
+    return shard_host_map
+
+
 def get_create_job_common_args(local_args):
     """
     Returns a dict containing only the args that apply for create_job_common
@@ -799,6 +818,16 @@
 
     # convert hostnames & meta hosts to host/label objects
     host_objects = models.Host.smart_get_bulk(hosts)
+    # Never create jobs against hosts that live on shards. Though the host
+    # scheduler has a hook that prevents such jobs from even starting, a
+    # perpetually queued job will lead to a lot of confusion.
+    if not is_shard():
+        shard_host_map = bucket_hosts_by_shard(host_objects)
+        if shard_host_map:
+            raise ValueError('The following hosts are on shards. Please '
+                             'follow the links to the shards and create '
+                             'jobs there instead: %s.' % shard_host_map)
+
     metahost_objects = []
     meta_host_labels_by_name = {label.name: label for label in label_objects}
     for label_name in meta_hosts or []:
@@ -1081,11 +1110,9 @@
         if not is_shard():
 
             # Figure out which hosts are on which shards.
-            shard_host_map = {}
-            for host in models.Host.smart_get_bulk(kwargs['hosts']):
-                if host.shard:
-                    shard_host_map.setdefault(
-                            host.shard.hostname, []).append(host.hostname)
+            shard_host_map = bucket_hosts_by_shard(
+                    models.Host.smart_get_bulk(kwargs['hosts']),
+                    rpc_hostnames=True)
 
             # Execute the rpc against the appropriate shards.
             for shard, hostnames in shard_host_map.iteritems():
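The rpc_hostnames=True path above only matters for this forwarding case:
user-facing errors keep showing the shard hostname the user knows, while rpcs
forwarded to the shard target its rpc_hostname(). A small sketch of that
difference, again with fake stand-ins (the gateway address is assumed):

# Sketch of the rpc_hostnames flag with fake objects; in a vm test cluster
# hostname and rpc_hostname() differ, in production they are identical.
class FakeShard(object):
    def __init__(self, hostname, rpc_name):
        self.hostname = hostname
        self._rpc_name = rpc_name

    def rpc_hostname(self):
        return self._rpc_name

class FakeHost(object):
    def __init__(self, hostname, shard=None):
        self.hostname = hostname
        self.shard = shard

def bucket_hosts_by_shard(host_objs, rpc_hostnames=False):
    """Same grouping as rpc_utils.bucket_hosts_by_shard above."""
    shard_host_map = {}
    for host in host_objs:
        if host.shard:
            shard_name = (host.shard.rpc_hostname() if rpc_hostnames
                          else host.shard.hostname)
            shard_host_map.setdefault(shard_name, []).append(host.hostname)
    return shard_host_map

vm_shard = FakeShard('localhost:8001', '10.0.2.2:8001')  # assumed gateway
hosts = [FakeHost('dut1', shard=vm_shard)]
# Error messages use the shard hostname the user knows about ...
assert bucket_hosts_by_shard(hosts) == {'localhost:8001': ['dut1']}
# ... while the forwarding loop targets the address reachable for rpcs.
assert (bucket_hosts_by_shard(hosts, rpc_hostnames=True)
        == {'10.0.2.2:8001': ['dut1']})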