Move google.cloud.iam (core) to google.api_core.iam (#6740)
* move google.cloud.iam (core) to google.api.core.iam
diff --git a/google/api_core/iam.py b/google/api_core/iam.py
new file mode 100644
index 0000000..c17bddc
--- /dev/null
+++ b/google/api_core/iam.py
@@ -0,0 +1,248 @@
+# Copyright 2017 Google LLC
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+"""Non-API-specific IAM policy definitions
+
+For allowed roles / permissions, see:
+https://cloud.google.com/iam/docs/understanding-roles
+"""
+
+import collections
+try:
+ from collections import abc as collections_abc
+except ImportError: # Python 2.7
+ import collections as collections_abc
+import warnings
+
+# Generic IAM roles
+
+OWNER_ROLE = "roles/owner"
+"""Generic role implying all rights to an object."""
+
+EDITOR_ROLE = "roles/editor"
+"""Generic role implying rights to modify an object."""
+
+VIEWER_ROLE = "roles/viewer"
+"""Generic role implying rights to access an object."""
+
+_ASSIGNMENT_DEPRECATED_MSG = """\
+Assigning to '{}' is deprecated. Replace with 'policy[{}] = members."""
+
+
+class Policy(collections_abc.MutableMapping):
+ """IAM Policy
+
+ See
+ https://cloud.google.com/iam/reference/rest/v1/Policy
+
+ :type etag: str
+ :param etag: ETag used to identify a unique of the policy
+
+ :type version: int
+ :param version: unique version of the policy
+ """
+
+ _OWNER_ROLES = (OWNER_ROLE,)
+ """Roles mapped onto our ``owners`` attribute."""
+
+ _EDITOR_ROLES = (EDITOR_ROLE,)
+ """Roles mapped onto our ``editors`` attribute."""
+
+ _VIEWER_ROLES = (VIEWER_ROLE,)
+ """Roles mapped onto our ``viewers`` attribute."""
+
+ def __init__(self, etag=None, version=None):
+ self.etag = etag
+ self.version = version
+ self._bindings = collections.defaultdict(set)
+
+ def __iter__(self):
+ return iter(self._bindings)
+
+ def __len__(self):
+ return len(self._bindings)
+
+ def __getitem__(self, key):
+ return self._bindings[key]
+
+ def __setitem__(self, key, value):
+ self._bindings[key] = set(value)
+
+ def __delitem__(self, key):
+ del self._bindings[key]
+
+ @property
+ def owners(self):
+ """Legacy access to owner role."""
+ result = set()
+ for role in self._OWNER_ROLES:
+ for member in self._bindings.get(role, ()):
+ result.add(member)
+ return frozenset(result)
+
+ @owners.setter
+ def owners(self, value):
+ """Update owners."""
+ warnings.warn(
+ _ASSIGNMENT_DEPRECATED_MSG.format("owners", OWNER_ROLE), DeprecationWarning
+ )
+ self[OWNER_ROLE] = value
+
+ @property
+ def editors(self):
+ """Legacy access to editor role."""
+ result = set()
+ for role in self._EDITOR_ROLES:
+ for member in self._bindings.get(role, ()):
+ result.add(member)
+ return frozenset(result)
+
+ @editors.setter
+ def editors(self, value):
+ """Update editors."""
+ warnings.warn(
+ _ASSIGNMENT_DEPRECATED_MSG.format("editors", EDITOR_ROLE),
+ DeprecationWarning,
+ )
+ self[EDITOR_ROLE] = value
+
+ @property
+ def viewers(self):
+ """Legacy access to viewer role."""
+ result = set()
+ for role in self._VIEWER_ROLES:
+ for member in self._bindings.get(role, ()):
+ result.add(member)
+ return frozenset(result)
+
+ @viewers.setter
+ def viewers(self, value):
+ """Update viewers."""
+ warnings.warn(
+ _ASSIGNMENT_DEPRECATED_MSG.format("viewers", VIEWER_ROLE),
+ DeprecationWarning,
+ )
+ self[VIEWER_ROLE] = value
+
+ @staticmethod
+ def user(email):
+ """Factory method for a user member.
+
+ :type email: str
+ :param email: E-mail for this particular user.
+
+ :rtype: str
+ :returns: A member string corresponding to the given user.
+ """
+ return "user:%s" % (email,)
+
+ @staticmethod
+ def service_account(email):
+ """Factory method for a service account member.
+
+ :type email: str
+ :param email: E-mail for this particular service account.
+
+ :rtype: str
+ :returns: A member string corresponding to the given service account.
+ """
+ return "serviceAccount:%s" % (email,)
+
+ @staticmethod
+ def group(email):
+ """Factory method for a group member.
+
+ :type email: str
+ :param email: An id or e-mail for this particular group.
+
+ :rtype: str
+ :returns: A member string corresponding to the given group.
+ """
+ return "group:%s" % (email,)
+
+ @staticmethod
+ def domain(domain):
+ """Factory method for a domain member.
+
+ :type domain: str
+ :param domain: The domain for this member.
+
+ :rtype: str
+ :returns: A member string corresponding to the given domain.
+ """
+ return "domain:%s" % (domain,)
+
+ @staticmethod
+ def all_users():
+ """Factory method for a member representing all users.
+
+ :rtype: str
+ :returns: A member string representing all users.
+ """
+ return "allUsers"
+
+ @staticmethod
+ def authenticated_users():
+ """Factory method for a member representing all authenticated users.
+
+ :rtype: str
+ :returns: A member string representing all authenticated users.
+ """
+ return "allAuthenticatedUsers"
+
+ @classmethod
+ def from_api_repr(cls, resource):
+ """Create a policy from the resource returned from the API.
+
+ :type resource: dict
+ :param resource: resource returned from the ``getIamPolicy`` API.
+
+ :rtype: :class:`Policy`
+ :returns: the parsed policy
+ """
+ version = resource.get("version")
+ etag = resource.get("etag")
+ policy = cls(etag, version)
+ for binding in resource.get("bindings", ()):
+ role = binding["role"]
+ members = sorted(binding["members"])
+ policy[role] = members
+ return policy
+
+ def to_api_repr(self):
+ """Construct a Policy resource.
+
+ :rtype: dict
+ :returns: a resource to be passed to the ``setIamPolicy`` API.
+ """
+ resource = {}
+
+ if self.etag is not None:
+ resource["etag"] = self.etag
+
+ if self.version is not None:
+ resource["version"] = self.version
+
+ if self._bindings:
+ bindings = resource["bindings"] = []
+ for role, members in sorted(self._bindings.items()):
+ if members:
+ bindings.append({"role": role, "members": sorted(set(members))})
+
+ if not bindings:
+ del resource["bindings"]
+
+ return resource
+
+
+collections_abc.MutableMapping.register(Policy)
diff --git a/tests/unit/test_iam.py b/tests/unit/test_iam.py
new file mode 100644
index 0000000..59f3b2c
--- /dev/null
+++ b/tests/unit/test_iam.py
@@ -0,0 +1,275 @@
+# Copyright 2017 Google LLC
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import pytest
+
+
+class TestPolicy:
+ @staticmethod
+ def _get_target_class():
+ from google.api_core.iam import Policy
+
+ return Policy
+
+ def _make_one(self, *args, **kw):
+ return self._get_target_class()(*args, **kw)
+
+ def test_ctor_defaults(self):
+ empty = frozenset()
+ policy = self._make_one()
+ assert policy.etag is None
+ assert policy.version is None
+ assert policy.owners == empty
+ assert policy.editors == empty
+ assert policy.viewers == empty
+ assert len(policy) == 0
+ assert dict(policy) == {}
+
+ def test_ctor_explicit(self):
+ VERSION = 17
+ ETAG = "ETAG"
+ empty = frozenset()
+ policy = self._make_one(ETAG, VERSION)
+ assert policy.etag == ETAG
+ assert policy.version == VERSION
+ assert policy.owners == empty
+ assert policy.editors == empty
+ assert policy.viewers == empty
+ assert len(policy) == 0
+ assert dict(policy) == {}
+
+ def test___getitem___miss(self):
+ policy = self._make_one()
+ assert policy["nonesuch"] == set()
+
+ def test___setitem__(self):
+ USER = "user:phred@example.com"
+ PRINCIPALS = set([USER])
+ policy = self._make_one()
+ policy["rolename"] = [USER]
+ assert policy["rolename"] == PRINCIPALS
+ assert len(policy) == 1
+ assert dict(policy) == {"rolename": PRINCIPALS}
+
+ def test___delitem___hit(self):
+ policy = self._make_one()
+ policy._bindings["rolename"] = ["phred@example.com"]
+ del policy["rolename"]
+ assert len(policy) == 0
+ assert dict(policy) == {}
+
+ def test___delitem___miss(self):
+ policy = self._make_one()
+ with pytest.raises(KeyError):
+ del policy["nonesuch"]
+
+ def test_owners_getter(self):
+ from google.api_core.iam import OWNER_ROLE
+
+ MEMBER = "user:phred@example.com"
+ expected = frozenset([MEMBER])
+ policy = self._make_one()
+ policy[OWNER_ROLE] = [MEMBER]
+ assert policy.owners == expected
+
+ def test_owners_setter(self):
+ import warnings
+ from google.api_core.iam import OWNER_ROLE
+
+ MEMBER = "user:phred@example.com"
+ expected = set([MEMBER])
+ policy = self._make_one()
+ with warnings.catch_warnings():
+ warnings.simplefilter("always")
+ policy.owners = [MEMBER]
+ assert policy[OWNER_ROLE] == expected
+
+ def test_editors_getter(self):
+ from google.api_core.iam import EDITOR_ROLE
+
+ MEMBER = "user:phred@example.com"
+ expected = frozenset([MEMBER])
+ policy = self._make_one()
+ policy[EDITOR_ROLE] = [MEMBER]
+ assert policy.editors == expected
+
+ def test_editors_setter(self):
+ import warnings
+ from google.api_core.iam import EDITOR_ROLE
+
+ MEMBER = "user:phred@example.com"
+ expected = set([MEMBER])
+ policy = self._make_one()
+ with warnings.catch_warnings():
+ warnings.simplefilter("always")
+ policy.editors = [MEMBER]
+ assert policy[EDITOR_ROLE] == expected
+
+ def test_viewers_getter(self):
+ from google.api_core.iam import VIEWER_ROLE
+
+ MEMBER = "user:phred@example.com"
+ expected = frozenset([MEMBER])
+ policy = self._make_one()
+ policy[VIEWER_ROLE] = [MEMBER]
+ assert policy.viewers == expected
+
+ def test_viewers_setter(self):
+ import warnings
+ from google.api_core.iam import VIEWER_ROLE
+
+ MEMBER = "user:phred@example.com"
+ expected = set([MEMBER])
+ policy = self._make_one()
+ with warnings.catch_warnings():
+ warnings.simplefilter("always")
+ policy.viewers = [MEMBER]
+ assert policy[VIEWER_ROLE] == expected
+
+ def test_user(self):
+ EMAIL = "phred@example.com"
+ MEMBER = "user:%s" % (EMAIL,)
+ policy = self._make_one()
+ assert policy.user(EMAIL) == MEMBER
+
+ def test_service_account(self):
+ EMAIL = "phred@example.com"
+ MEMBER = "serviceAccount:%s" % (EMAIL,)
+ policy = self._make_one()
+ assert policy.service_account(EMAIL) == MEMBER
+
+ def test_group(self):
+ EMAIL = "phred@example.com"
+ MEMBER = "group:%s" % (EMAIL,)
+ policy = self._make_one()
+ assert policy.group(EMAIL) == MEMBER
+
+ def test_domain(self):
+ DOMAIN = "example.com"
+ MEMBER = "domain:%s" % (DOMAIN,)
+ policy = self._make_one()
+ assert policy.domain(DOMAIN) == MEMBER
+
+ def test_all_users(self):
+ policy = self._make_one()
+ assert policy.all_users() == "allUsers"
+
+ def test_authenticated_users(self):
+ policy = self._make_one()
+ assert policy.authenticated_users() == "allAuthenticatedUsers"
+
+ def test_from_api_repr_only_etag(self):
+ empty = frozenset()
+ RESOURCE = {"etag": "ACAB"}
+ klass = self._get_target_class()
+ policy = klass.from_api_repr(RESOURCE)
+ assert policy.etag == "ACAB"
+ assert policy.version is None
+ assert policy.owners == empty
+ assert policy.editors == empty
+ assert policy.viewers == empty
+ assert dict(policy) == {}
+
+ def test_from_api_repr_complete(self):
+ from google.api_core.iam import OWNER_ROLE, EDITOR_ROLE, VIEWER_ROLE
+
+ OWNER1 = "group:cloud-logs@google.com"
+ OWNER2 = "user:phred@example.com"
+ EDITOR1 = "domain:google.com"
+ EDITOR2 = "user:phred@example.com"
+ VIEWER1 = "serviceAccount:1234-abcdef@service.example.com"
+ VIEWER2 = "user:phred@example.com"
+ RESOURCE = {
+ "etag": "DEADBEEF",
+ "version": 17,
+ "bindings": [
+ {"role": OWNER_ROLE, "members": [OWNER1, OWNER2]},
+ {"role": EDITOR_ROLE, "members": [EDITOR1, EDITOR2]},
+ {"role": VIEWER_ROLE, "members": [VIEWER1, VIEWER2]},
+ ],
+ }
+ klass = self._get_target_class()
+ policy = klass.from_api_repr(RESOURCE)
+ assert policy.etag == "DEADBEEF"
+ assert policy.version == 17
+ assert policy.owners, frozenset([OWNER1 == OWNER2])
+ assert policy.editors, frozenset([EDITOR1 == EDITOR2])
+ assert policy.viewers, frozenset([VIEWER1 == VIEWER2])
+ assert dict(policy) == {
+ OWNER_ROLE: set([OWNER1, OWNER2]),
+ EDITOR_ROLE: set([EDITOR1, EDITOR2]),
+ VIEWER_ROLE: set([VIEWER1, VIEWER2]),
+ }
+
+ def test_from_api_repr_unknown_role(self):
+ USER = "user:phred@example.com"
+ GROUP = "group:cloud-logs@google.com"
+ RESOURCE = {
+ "etag": "DEADBEEF",
+ "version": 17,
+ "bindings": [{"role": "unknown", "members": [USER, GROUP]}],
+ }
+ klass = self._get_target_class()
+ policy = klass.from_api_repr(RESOURCE)
+ assert policy.etag == "DEADBEEF"
+ assert policy.version == 17
+ assert dict(policy), {"unknown": set([GROUP == USER])}
+
+ def test_to_api_repr_defaults(self):
+ policy = self._make_one()
+ assert policy.to_api_repr() == {}
+
+ def test_to_api_repr_only_etag(self):
+ policy = self._make_one("DEADBEEF")
+ assert policy.to_api_repr() == {"etag": "DEADBEEF"}
+
+ def test_to_api_repr_binding_wo_members(self):
+ policy = self._make_one()
+ policy["empty"] = []
+ assert policy.to_api_repr() == {}
+
+ def test_to_api_repr_binding_w_duplicates(self):
+ from google.api_core.iam import OWNER_ROLE
+
+ OWNER = "group:cloud-logs@google.com"
+ policy = self._make_one()
+ policy.owners = [OWNER, OWNER]
+ assert policy.to_api_repr() == {
+ "bindings": [{"role": OWNER_ROLE, "members": [OWNER]}]
+ }
+
+ def test_to_api_repr_full(self):
+ import operator
+ from google.api_core.iam import OWNER_ROLE, EDITOR_ROLE, VIEWER_ROLE
+
+ OWNER1 = "group:cloud-logs@google.com"
+ OWNER2 = "user:phred@example.com"
+ EDITOR1 = "domain:google.com"
+ EDITOR2 = "user:phred@example.com"
+ VIEWER1 = "serviceAccount:1234-abcdef@service.example.com"
+ VIEWER2 = "user:phred@example.com"
+ BINDINGS = [
+ {"role": OWNER_ROLE, "members": [OWNER1, OWNER2]},
+ {"role": EDITOR_ROLE, "members": [EDITOR1, EDITOR2]},
+ {"role": VIEWER_ROLE, "members": [VIEWER1, VIEWER2]},
+ ]
+ policy = self._make_one("DEADBEEF", 17)
+ policy.owners = [OWNER1, OWNER2]
+ policy.editors = [EDITOR1, EDITOR2]
+ policy.viewers = [VIEWER1, VIEWER2]
+ resource = policy.to_api_repr()
+ assert resource["etag"] == "DEADBEEF"
+ assert resource["version"] == 17
+ key = operator.itemgetter("role")
+ assert sorted(resource["bindings"], key=key) == sorted(BINDINGS, key=key)