[autotest] Forward calls to modify_host(s) to shards.

Currently, if a host gets locked after it has been assigned to a shard,
only the master knows about it.

With this commit, calls to modify_host and modify_hosts will be
forwarded to the relevant shards, so locking of hosts will propagate.

BUG=None
DEPLOY=apache
TEST=Ran suites and called rpcs manually, also with failing shard rpcs.

Change-Id: I90ca34a4cefbdf55acd47ee6e8df872527b27285
Reviewed-on: https://chromium-review.googlesource.com/220850
Tested-by: Jakob Jülich <jakobjuelich@chromium.org>
Reviewed-by: Fang Deng <fdeng@chromium.org>
Commit-Queue: Jakob Jülich <jakobjuelich@chromium.org>
diff --git a/frontend/afe/rpc_utils.py b/frontend/afe/rpc_utils.py
index dd8c963..7e53c7c 100644
--- a/frontend/afe/rpc_utils.py
+++ b/frontend/afe/rpc_utils.py
@@ -12,6 +12,7 @@
 from autotest_lib.client.common_lib import control_data, error
 from autotest_lib.client.common_lib import global_config, priorities
 from autotest_lib.server.cros import provision
+from autotest_lib.server import frontend
 
 NULL_DATETIME = datetime.datetime.max
 NULL_DATE = datetime.date.max
@@ -1014,3 +1015,55 @@
 
     _persist_records_with_type_sent_from_shard(
         shard, hqes, models.HostQueueEntry, job_ids_sent=job_ids_sent)
+
+
+def is_shard():
+    """Determines if this instance is running as a shard.
+
+    Reads the global_config value shard_hostname in the section SHARD.
+
+    @returns: True if shard_hostname is set and non-empty, False otherwise.
+    """
+    hostname = global_config.global_config.get_config_value(
+            'SHARD', 'shard_hostname', default=None)
+    return bool(hostname)
+
+
+def forward_single_host_rpc_to_shard(func):
+    """This decorator forwards rpc calls that modify a host to a shard.
+
+    If the host is assigned to a shard and this instance is not itself a shard,
+    the rpc is first sent to that shard. The rpc then always runs locally too.
+
+    The host is looked up via the keyword argument 'id'.
+
+    @param func: The function to decorate. It is called with keywords only.
+
+    @returns: The function to replace func with.
+    """
+    def replacement(**kwargs):
+        # Only keyword arguments can be accepted here, as we need the argument
+        # names to send the rpc. serviceHandler always provides arguments with
+        # their keywords, so this is not a problem.
+        host = models.Host.smart_get(kwargs['id'])
+        if host.shard and not is_shard():
+            run_rpc_on_multiple_hostnames(func.func_name, [host.shard.hostname],
+                                          **kwargs)
+        return func(**kwargs)
+
+    return replacement
+
+
+def run_rpc_on_multiple_hostnames(rpc_call, shard_hostnames, **kwargs):
+    """Runs an rpc on multiple AFEs, one after another.
+
+    This is used, e.g., to propagate changes made to hosts after they are
+    assigned to a shard.
+
+    @param rpc_call: Name of the rpc endpoint to call.
+    @param shard_hostnames: List of hostnames to run the rpcs on.
+    @param **kwargs: Keyword arguments to pass in the rpcs.
+    """
+    for shard_hostname in shard_hostnames:
+        afe = frontend.AFE(server=shard_hostname)
+        afe.run(rpc_call, **kwargs)