[autotest] Support repair_hosts RPC in sharding

BUG=chromium:482192
TEST=Use testing master and shard machines.
Run Reverify and Repair tasks on both master DUTs
and shard DUTs via frontend web ui.
DEPLOY=apache

Change-Id: Iac88eb72e0fcc8993a6bccbf53a91575ce8623b2
Reviewed-on: https://chromium-review.googlesource.com/289304
Trybot-Ready: Mungyung Ryu <mkryu@google.com>
Tested-by: Mungyung Ryu <mkryu@google.com>
Reviewed-by: Fang Deng <fdeng@chromium.org>
Commit-Queue: Mungyung Ryu <mkryu@google.com>
diff --git a/frontend/afe/rpc_interface.py b/frontend/afe/rpc_interface.py
index e801872..c10f3b2 100644
--- a/frontend/afe/rpc_interface.py
+++ b/frontend/afe/rpc_interface.py
@@ -972,11 +972,19 @@
     return list(sorted(host.hostname for host in hosts))
 
 
-def reverify_hosts(**filter_data):
-    """\
-    Schedules a set of hosts for verify.
+def _forward_special_tasks_on_hosts(task, rpc, **filter_data):
+    """Forward special tasks to corresponding shards.
 
-    @returns A list of hostnames that a verify task was created for.
+    For master, when special tasks are fired on hosts that are sharded,
+    forward the RPC to corresponding shards.
+
+    For shard, create special task records in local DB.
+
+    @param task: Enum value of frontend.afe.models.SpecialTask.Task
+    @param rpc: RPC name to forward.
+    @param filter_data: Filter keywords to be used for DB query.
+
+    @return: A list of hostnames that a special task was created for.
     """
     hosts = models.Host.query_objects(filter_data)
     shard_host_map = rpc_utils.bucket_hosts_by_shard(hosts, rpc_hostnames=True)
@@ -993,11 +1001,12 @@
             # the 'hostname' filter should narrow down the list of hosts on
             # each shard even though we supply all the ids in filter_data.
             # This method uses hostname instead of id because it fits better
-            # with the overall architecture of redirection functions in rpc_utils.
+            # with the overall architecture of redirection functions in
+            # rpc_utils.
             shard_filter = filter_data.copy()
             shard_filter['hostname__in'] = hostnames
             rpc_utils.run_rpc_on_multiple_hostnames(
-                    'reverify_hosts', [shard], **shard_filter)
+                    rpc, [shard], **shard_filter)
 
     # There is a race condition here if someone assigns a shard to one of these
     # hosts before we create the task. The host will stay on the master if:
@@ -1008,10 +1017,20 @@
 
     # Given that we only rarely verify Ready hosts it isn't worth putting this
     # entire method in a transaction. The worst case scenario is that we have
-    # a verify running on a Ready host while the shard is using it, if the verify
-    # fails no subsequent tasks will be created against the host on the master,
-    # and verifies are safe enough that this is OK.
-    return _call_special_tasks_on_hosts(models.SpecialTask.Task.VERIFY, hosts)
+    # a verify running on a Ready host while the shard is using it, if the
+    # verify fails no subsequent tasks will be created against the host on the
+    # master, and verifies are safe enough that this is OK.
+    return _call_special_tasks_on_hosts(task, hosts)
+
+
+def reverify_hosts(**filter_data):
+    """\
+    Schedules a set of hosts for verify.
+
+    @returns A list of hostnames that a verify task was created for.
+    """
+    return _forward_special_tasks_on_hosts(
+            models.SpecialTask.Task.VERIFY, 'reverify_hosts', **filter_data)
 
 
 def repair_hosts(**filter_data):
@@ -1020,8 +1039,8 @@
 
     @returns A list of hostnames that a repair task was created for.
     """
-    return _call_special_tasks_on_hosts(models.SpecialTask.Task.REPAIR,
-            models.Host.query_objects(filter_data))
+    return _forward_special_tasks_on_hosts(
+            models.SpecialTask.Task.REPAIR, 'repair_hosts', **filter_data)
 
 
 def get_jobs(not_yet_run=False, running=False, finished=False,