[autotest] Send labels to shards preserving their ids.
We were allowing deviations between a shard and the master
by not specifying the label id for forwarded label create
rpcs. This cl breaks down the forwarding into 2 phases,
the first to create the label and the second to add the label
to the host, and sends the label id as part of the packet for
label creation.
TEST=atest label add -m host_not_on_shard new_label1
atest label add -m host_on_shard new_label2
checked that new_label2 have the same id on the master and shard.
BUG=chromium:448367
DEPLOY=apache
Change-Id: I9773def43b94ad77e85bfeadcb69bbddf9e02460
Reviewed-on: https://chromium-review.googlesource.com/240348
Tested-by: Mungyung Ryu <mkryu@google.com>
Reviewed-by: Dan Shi <dshi@chromium.org>
Commit-Queue: Mungyung Ryu <mkryu@google.com>
diff --git a/frontend/afe/rpc_utils.py b/frontend/afe/rpc_utils.py
index 02f425f..3a41ca6 100644
--- a/frontend/afe/rpc_utils.py
+++ b/frontend/afe/rpc_utils.py
@@ -1077,6 +1077,32 @@
return replacement
+def fanout_rpc(host_objs, rpc_name, include_hostnames=True, **kwargs):
+ """Fanout the give rpc to all shards.
+
+ @param host_objs: Host objects for the rpc.
+ @param rpc_name: The name of the rpc.
+ @param include_hostnames: If True, include the hostnames in the kwargs.
+ Hostnames are not always necessary, this functions is designed to
+ send rpcs to the shard a host is on, the rpcs themselves could be
+ related to labels, acls etc.
+ @param kwargs: The kwargs for the rpc.
+ """
+ # Fanout should only happen from the master to the shards.
+ if server_utils.is_shard():
+ return
+
+ # Figure out which hosts are on which shards.
+ shard_host_map = bucket_hosts_by_shard(
+ host_objs, rpc_hostnames=True)
+
+ # Execute the rpc against the appropriate shards.
+ for shard, hostnames in shard_host_map.iteritems():
+ if include_hostnames:
+ kwargs['hosts'] = hostnames
+ run_rpc_on_multiple_hostnames(rpc_name, [shard], **kwargs)
+
+
def forward_multi_host_rpc_to_shards(func):
"""This decorator forwards rpc calls that modify multiple hosts.
@@ -1099,18 +1125,9 @@
@returns: The function to replace func with.
"""
def replacement(**kwargs):
- if not server_utils.is_shard():
-
- # Figure out which hosts are on which shards.
- shard_host_map = bucket_hosts_by_shard(
- models.Host.smart_get_bulk(kwargs['hosts']),
- rpc_hostnames=True)
-
- # Execute the rpc against the appropriate shards.
- for shard, hostnames in shard_host_map.iteritems():
- kwargs['hosts'] = hostnames
- run_rpc_on_multiple_hostnames(func.func_name, [shard],
- **kwargs)
+ fanout_rpc(
+ models.Host.smart_get_bulk(kwargs['hosts']),
+ func.func_name, **kwargs)
return func(**kwargs)
return replacement
@@ -1129,3 +1146,18 @@
for shard_hostname in shard_hostnames:
afe = frontend.AFE(server=shard_hostname)
afe.run(rpc_call, **kwargs)
+
+
+def get_label(name):
+ """Gets a label object using a given name.
+
+ @param name: Label name.
+ @raises model.Label.DoesNotExist: when there is no label matching
+ the given name.
+ @return: a label object matching the given name.
+ """
+ try:
+ label = models.Label.smart_get(name)
+ except models.Label.DoesNotExist:
+ return None
+ return label