[autotest] Send labels to shards preserving their ids.

We were allowing deviations between a shard and the master
by not specifying the label id for forwarded label create
rpcs. This cl breaks down the forwarding into 2 phases,
the first to create the label and the second to add the label
to the host, and sends the label id as part of the packet for
label creation.

TEST=atest label add -m host_not_on_shard new_label1
     atest label add -m host_on_shard new_label2
     checked that new_label2 has the same id on the master and shard.
BUG=chromium:448367
DEPLOY=apache

Change-Id: I9773def43b94ad77e85bfeadcb69bbddf9e02460
Reviewed-on: https://chromium-review.googlesource.com/240348
Tested-by: Mungyung Ryu <mkryu@google.com>
Reviewed-by: Dan Shi <dshi@chromium.org>
Commit-Queue: Mungyung Ryu <mkryu@google.com>
diff --git a/frontend/afe/rpc_utils.py b/frontend/afe/rpc_utils.py
index 02f425f..3a41ca6 100644
--- a/frontend/afe/rpc_utils.py
+++ b/frontend/afe/rpc_utils.py
@@ -1077,6 +1077,32 @@
     return replacement
 
 
+def fanout_rpc(host_objs, rpc_name, include_hostnames=True, **kwargs):
+    """Fanout the given rpc to the shards of the given hosts.
+
+    @param host_objs: Host objects for the rpc.
+    @param rpc_name: The name of the rpc.
+    @param include_hostnames: If True, include the hostnames in the kwargs.
+        Hostnames are not always necessary; this function is designed to
+        send rpcs to the shard a host is on, the rpcs themselves could be
+        related to labels, acls etc.
+    @param kwargs: The kwargs for the rpc.
+    """
+    # Fanout should only happen from the master to the shards.
+    if server_utils.is_shard():
+        return
+
+    # Figure out which hosts are on which shards.
+    shard_host_map = bucket_hosts_by_shard(
+            host_objs, rpc_hostnames=True)
+
+    # Execute the rpc against the appropriate shards.
+    for shard, hostnames in shard_host_map.iteritems():
+        if include_hostnames:
+            kwargs['hosts'] = hostnames
+        run_rpc_on_multiple_hostnames(rpc_name, [shard], **kwargs)
+
+
 def forward_multi_host_rpc_to_shards(func):
     """This decorator forwards rpc calls that modify multiple hosts.
 
@@ -1099,18 +1125,9 @@
     @returns: The function to replace func with.
     """
     def replacement(**kwargs):
-        if not server_utils.is_shard():
-
-            # Figure out which hosts are on which shards.
-            shard_host_map = bucket_hosts_by_shard(
-                    models.Host.smart_get_bulk(kwargs['hosts']),
-                    rpc_hostnames=True)
-
-            # Execute the rpc against the appropriate shards.
-            for shard, hostnames in shard_host_map.iteritems():
-                kwargs['hosts'] = hostnames
-                run_rpc_on_multiple_hostnames(func.func_name, [shard],
-                                              **kwargs)
+        fanout_rpc(
+                models.Host.smart_get_bulk(kwargs['hosts']),
+                func.func_name, **kwargs)
         return func(**kwargs)
 
     return replacement
@@ -1129,3 +1146,18 @@
     for shard_hostname in shard_hostnames:
         afe = frontend.AFE(server=shard_hostname)
         afe.run(rpc_call, **kwargs)
+
+
+def get_label(name):
+    """Gets a label object using a given name.
+
+    @param name: Label name.
+    @return: a label object matching the given name, or None when
+             there is no label matching the given name (the
+             models.Label.DoesNotExist error is caught here).
+    """
+    try:
+        label = models.Label.smart_get(name)
+    except models.Label.DoesNotExist:
+        return None
+    return label