[autotest] cros-version mismatch
add_cros_version_labels_and_job_repo_url() sends an RPC, 'add_label',
to the AFE where the code is running (which could be a shard node).
As a result, the label is added only to the shard DB, and the master DB
doesn't have the label.
BUG=chromium:503440
TEST=Test provision on shard testing machines.
DEPLOY=apache
Change-Id: Iedf46190c3c998677b43642f86cb72a55ed4fb27
Reviewed-on: https://chromium-review.googlesource.com/283100
Trybot-Ready: Mungyung Ryu <mkryu@google.com>
Tested-by: Mungyung Ryu <mkryu@google.com>
Reviewed-by: Dan Shi <dshi@chromium.org>
Reviewed-by: Fang Deng <fdeng@chromium.org>
Commit-Queue: Mungyung Ryu <mkryu@google.com>
diff --git a/frontend/afe/rpc_utils.py b/frontend/afe/rpc_utils.py
index 6e03611..4b88bac 100644
--- a/frontend/afe/rpc_utils.py
+++ b/frontend/afe/rpc_utils.py
@@ -1185,38 +1185,6 @@
return replacement
-def fanout_rpc(host_objs, rpc_name, include_hostnames=True, **kwargs):
- """Fanout the given rpc to all shards.
-
- @param host_objs: Host objects for the rpc.
- @param rpc_name: The name of the rpc.
- @param include_hostnames: If True, include the hostnames in the kwargs.
- Hostnames are not always necessary, this functions is designed to
- send rpcs to the shard a host is on, the rpcs themselves could be
- related to labels, acls etc.
- @param kwargs: The kwargs for the rpc.
- """
- # Fanout should only happen from the master to the shards.
- if server_utils.is_shard():
- return
-
- # Figure out which hosts are on which shards.
- shard_host_map = bucket_hosts_by_shard(
- host_objs, rpc_hostnames=True)
-
- # Execute the rpc against the appropriate shards.
- for shard, hostnames in shard_host_map.iteritems():
- if include_hostnames:
- kwargs['hosts'] = hostnames
- try:
- run_rpc_on_multiple_hostnames(rpc_name, [shard], **kwargs)
- except:
- ei = sys.exc_info()
- new_exc = error.RPCException('RPC %s failed on shard %s due to '
- '%s: %s' % (rpc_name, shard, ei[0].__name__, ei[1]))
- raise new_exc.__class__, new_exc, ei[2]
-
-
def forward_multi_host_rpc_to_shards(func):
"""This decorator forwards rpc calls that modify multiple hosts.
@@ -1247,6 +1215,34 @@
return replacement
+def fanout_rpc(host_objs, rpc_name, include_hostnames=True, **kwargs):
+ """Fanout the given rpc to shards of given hosts.
+
+ @param host_objs: Host objects for the rpc.
+ @param rpc_name: The name of the rpc.
+ @param include_hostnames: If True, include the hostnames in the kwargs.
+ Hostnames are not always necessary, this functions is designed to
+ send rpcs to the shard a host is on, the rpcs themselves could be
+ related to labels, acls etc.
+ @param kwargs: The kwargs for the rpc.
+ """
+ # Figure out which hosts are on which shards.
+ shard_host_map = bucket_hosts_by_shard(
+ host_objs, rpc_hostnames=True)
+
+ # Execute the rpc against the appropriate shards.
+ for shard, hostnames in shard_host_map.iteritems():
+ if include_hostnames:
+ kwargs['hosts'] = hostnames
+ try:
+ run_rpc_on_multiple_hostnames(rpc_name, [shard], **kwargs)
+ except:
+ ei = sys.exc_info()
+ new_exc = error.RPCException('RPC %s failed on shard %s due to '
+ '%s: %s' % (rpc_name, shard, ei[0].__name__, ei[1]))
+ raise new_exc.__class__, new_exc, ei[2]
+
+
def run_rpc_on_multiple_hostnames(rpc_call, shard_hostnames, **kwargs):
"""Runs an rpc to multiple AFEs
@@ -1257,6 +1253,8 @@
@param shard_hostnames: List of hostnames to run the rpcs on.
@param **kwargs: Keyword arguments to pass in the rpcs.
"""
+ # Make sure this function is not called on shards but only on master.
+ assert not server_utils.is_shard()
for shard_hostname in shard_hostnames:
afe = frontend_wrappers.RetryingAFE(server=shard_hostname)
afe.run(rpc_call, **kwargs)