feat(api_core): support version 3 policy bindings (#9869)

* feat(api_core): support version 3 policy bindings

* fix(doc): fix documenting bindings structure

* try fixing docs

* fix pytype error

* fill test coverage

* indent docs

* fix docs

* improve test coverage

* linty

* remove unused variable
diff --git a/google/api_core/iam.py b/google/api_core/iam.py
index 04680eb..a7a1c00 100644
--- a/google/api_core/iam.py
+++ b/google/api_core/iam.py
@@ -21,19 +21,38 @@
 .. code-block:: python
 
    # ``get_iam_policy`` returns a :class:'~google.api_core.iam.Policy`.
-   policy = resource.get_iam_policy()
+   policy = resource.get_iam_policy(requested_policy_version=3)
 
-   phred = policy.user("phred@example.com")
-   admin_group = policy.group("admins@groups.example.com")
-   account = policy.service_account("account-1234@accounts.example.com")
-   policy["roles/owner"] = [phred, admin_group, account]
-   policy["roles/editor"] = policy.authenticated_users()
-   policy["roles/viewer"] = policy.all_users()
+   phred = "user:phred@example.com"
+   admin_group = "group:admins@groups.example.com"
+   account = "serviceAccount:account-1234@accounts.example.com"
+
+   policy.version = 3
+   policy.bindings = [
+       {
+           "role": "roles/owner",
+           "members": {phred, admin_group, account}
+       },
+       {
+           "role": "roles/editor",
+           "members": {"allAuthenticatedUsers"}
+       },
+       {
+           "role": "roles/viewer",
+           "members": {"allUsers"}
+           "condition": {
+               "title": "request_time",
+               "description": "Requests made before 2021-01-01T00:00:00Z",
+               "expression": "request.time < timestamp(\"2021-01-01T00:00:00Z\")"
+           }
+       }
+   ]
 
    resource.set_iam_policy(policy)
 """
 
 import collections
+import operator
 import warnings
 
 try:
@@ -53,18 +72,41 @@
 """Generic role implying rights to access an object."""
 
 _ASSIGNMENT_DEPRECATED_MSG = """\
-Assigning to '{}' is deprecated.  Replace with 'policy[{}] = members."""
+Assigning to '{}' is deprecated. Use the `policy.bindings` property to modify bindings instead."""
+
+_FACTORY_DEPRECATED_MSG = """\
+Factory method {0} is deprecated. Replace with '{0}'."""
+
+_DICT_ACCESS_MSG = """\
+Dict access is not supported on policies with version > 1 or with conditional bindings."""
+
+
+class InvalidOperationException(Exception):
+    """Raised when trying to use Policy class as a dict."""
+
+    pass
 
 
 class Policy(collections_abc.MutableMapping):
     """IAM Policy
 
-    See
-    https://cloud.google.com/iam/reference/rest/v1/Policy
-
     Args:
         etag (Optional[str]): ETag used to identify a unique of the policy
-        version (Optional[int]): unique version of the policy
+        version (Optional[int]): The syntax schema version of the policy.
+
+    Note:
+        Using conditions in bindings requires the policy's version to be set
+        to `3` or greater, depending on the versions that are currently supported.
+
+        Accessing the policy using dict operations will raise InvalidOperationException
+        when the policy's version is set to 3.
+
+        Use the policy.bindings getter/setter to retrieve and modify the policy's bindings.
+
+    See:
+        IAM Policy https://cloud.google.com/iam/reference/rest/v1/Policy
+        Policy versions https://cloud.google.com/iam/docs/policies#versions
+        Conditions overview https://cloud.google.com/iam/docs/conditions-overview.
     """
 
     _OWNER_ROLES = (OWNER_ROLE,)
@@ -79,31 +121,120 @@
     def __init__(self, etag=None, version=None):
         self.etag = etag
         self.version = version
-        self._bindings = collections.defaultdict(set)
+        self._bindings = []
 
     def __iter__(self):
-        return iter(self._bindings)
+        self.__check_version__()
+        return (binding["role"] for binding in self._bindings)
 
     def __len__(self):
+        self.__check_version__()
         return len(self._bindings)
 
     def __getitem__(self, key):
-        return self._bindings[key]
+        self.__check_version__()
+        for b in self._bindings:
+            if b["role"] == key:
+                return b["members"]
+        return set()
 
     def __setitem__(self, key, value):
-        self._bindings[key] = set(value)
+        self.__check_version__()
+        value = set(value)
+        for binding in self._bindings:
+            if binding["role"] == key:
+                binding["members"] = value
+                return
+        self._bindings.append({"role": key, "members": value})
 
     def __delitem__(self, key):
-        del self._bindings[key]
+        self.__check_version__()
+        for b in self._bindings:
+            if b["role"] == key:
+                self._bindings.remove(b)
+                return
+        raise KeyError(key)
+
+    def __check_version__(self):
+        """Raise InvalidOperationException if version is greater than 1 or policy contains conditions."""
+        raise_version = self.version is not None and self.version > 1
+
+        if raise_version or self._contains_conditions():
+            raise InvalidOperationException(_DICT_ACCESS_MSG)
+
+    def _contains_conditions(self):
+        for b in self._bindings:
+            if b.get("condition") is not None:
+                return True
+        return False
+
+    @property
+    def bindings(self):
+        """The policy's list of bindings.
+
+        A binding is specified by a dictionary with keys:
+
+        * role (str): Role that is assigned to `members`.
+
+        * members (:obj:`set` of str): Specifies the identities associated to this binding.
+
+        * condition (:obj:`dict` of str:str): Specifies a condition under which this binding will apply.
+
+          * title (str): Title for the condition.
+
+          * description (:obj:str, optional): Description of the condition.
+
+          * expression: A CEL expression.
+
+        Type:
+           :obj:`list` of :obj:`dict`
+
+        See:
+           Policy versions https://cloud.google.com/iam/docs/policies#versions
+           Conditions overview https://cloud.google.com/iam/docs/conditions-overview.
+
+        Example:
+
+        .. code-block:: python
+
+           USER = "user:phred@example.com"
+           ADMIN_GROUP = "group:admins@groups.example.com"
+           SERVICE_ACCOUNT = "serviceAccount:account-1234@accounts.example.com"
+           CONDITION = {
+               "title": "request_time",
+               "description": "Requests made before 2021-01-01T00:00:00Z", # Optional
+               "expression": "request.time < timestamp(\"2021-01-01T00:00:00Z\")"
+           }
+
+           # Set policy's version to 3 before setting bindings containing conditions.
+           policy.version = 3
+
+           policy.bindings = [
+           {
+               "role": "roles/viewer",
+               "members": {USER, ADMIN_GROUP, SERVICE_ACCOUNT},
+               "condition": CONDITION
+               },
+               ...
+           ]
+        """
+        return self._bindings
+
+    @bindings.setter
+    def bindings(self, bindings):
+        self._bindings = bindings
 
     @property
     def owners(self):
         """Legacy access to owner role.
 
-        DEPRECATED:  use ``policy["roles/owners"]`` instead."""
+        Raise InvalidOperationException if version is greater than 1 or policy contains conditions.
+
+        DEPRECATED:  use `policy.bindings` to access bindings instead.
+        """
         result = set()
         for role in self._OWNER_ROLES:
-            for member in self._bindings.get(role, ()):
+            for member in self.get(role, ()):
                 result.add(member)
         return frozenset(result)
 
@@ -111,7 +242,10 @@
     def owners(self, value):
         """Update owners.
 
-        DEPRECATED:  use ``policy["roles/owners"] = value`` instead."""
+        Raise InvalidOperationException if version is greater than 1 or policy contains conditions.
+
+        DEPRECATED:  use `policy.bindings` to access bindings instead.
+        """
         warnings.warn(
             _ASSIGNMENT_DEPRECATED_MSG.format("owners", OWNER_ROLE), DeprecationWarning
         )
@@ -121,10 +255,13 @@
     def editors(self):
         """Legacy access to editor role.
 
-        DEPRECATED:  use ``policy["roles/editors"]`` instead."""
+        Raise InvalidOperationException if version is greater than 1 or policy contains conditions.
+
+        DEPRECATED:  use `policy.bindings` to access bindings instead.
+        """
         result = set()
         for role in self._EDITOR_ROLES:
-            for member in self._bindings.get(role, ()):
+            for member in self.get(role, ()):
                 result.add(member)
         return frozenset(result)
 
@@ -132,7 +269,10 @@
     def editors(self, value):
         """Update editors.
 
-        DEPRECATED:  use ``policy["roles/editors"] = value`` instead."""
+        Raise InvalidOperationException if version is greater than 1 or policy contains conditions.
+
+        DEPRECATED:  use `policy.bindings` to modify bindings instead.
+        """
         warnings.warn(
             _ASSIGNMENT_DEPRECATED_MSG.format("editors", EDITOR_ROLE),
             DeprecationWarning,
@@ -143,11 +283,13 @@
     def viewers(self):
         """Legacy access to viewer role.
 
-        DEPRECATED:  use ``policy["roles/viewers"]`` instead
+        Raise InvalidOperationException if version is greater than 1 or policy contains conditions.
+
+        DEPRECATED:  use `policy.bindings` to modify bindings instead.
         """
         result = set()
         for role in self._VIEWER_ROLES:
-            for member in self._bindings.get(role, ()):
+            for member in self.get(role, ()):
                 result.add(member)
         return frozenset(result)
 
@@ -155,7 +297,9 @@
     def viewers(self, value):
         """Update viewers.
 
-        DEPRECATED:  use ``policy["roles/viewers"] = value`` instead.
+        Raise InvalidOperationException if version is greater than 1 or policy contains conditions.
+
+        DEPRECATED:  use `policy.bindings` to modify bindings instead.
         """
         warnings.warn(
             _ASSIGNMENT_DEPRECATED_MSG.format("viewers", VIEWER_ROLE),
@@ -172,7 +316,12 @@
 
         Returns:
             str: A member string corresponding to the given user.
+
+        DEPRECATED:  set the role `user:{email}` in the binding instead.
         """
+        warnings.warn(
+            _FACTORY_DEPRECATED_MSG.format("user:{email}"), DeprecationWarning,
+        )
         return "user:%s" % (email,)
 
     @staticmethod
@@ -184,7 +333,13 @@
 
         Returns:
             str: A member string corresponding to the given service account.
+
+        DEPRECATED:  set the role `serviceAccount:{email}` in the binding instead.
         """
+        warnings.warn(
+            _FACTORY_DEPRECATED_MSG.format("serviceAccount:{email}"),
+            DeprecationWarning,
+        )
         return "serviceAccount:%s" % (email,)
 
     @staticmethod
@@ -196,7 +351,12 @@
 
         Returns:
             str: A member string corresponding to the given group.
+
+        DEPRECATED:  set the role `group:{email}` in the binding instead.
         """
+        warnings.warn(
+            _FACTORY_DEPRECATED_MSG.format("group:{email}"), DeprecationWarning,
+        )
         return "group:%s" % (email,)
 
     @staticmethod
@@ -208,7 +368,12 @@
 
         Returns:
             str: A member string corresponding to the given domain.
+
+        DEPRECATED:  set the role `domain:{email}` in the binding instead.
         """
+        warnings.warn(
+            _FACTORY_DEPRECATED_MSG.format("domain:{email}"), DeprecationWarning,
+        )
         return "domain:%s" % (domain,)
 
     @staticmethod
@@ -217,7 +382,12 @@
 
         Returns:
             str: A member string representing all users.
+
+        DEPRECATED:  set the role `allUsers` in the binding instead.
         """
+        warnings.warn(
+            _FACTORY_DEPRECATED_MSG.format("allUsers"), DeprecationWarning,
+        )
         return "allUsers"
 
     @staticmethod
@@ -226,7 +396,12 @@
 
         Returns:
             str: A member string representing all authenticated users.
+
+        DEPRECATED:  set the role `allAuthenticatedUsers` in the binding instead.
         """
+        warnings.warn(
+            _FACTORY_DEPRECATED_MSG.format("allAuthenticatedUsers"), DeprecationWarning,
+        )
         return "allAuthenticatedUsers"
 
     @classmethod
@@ -242,10 +417,11 @@
         version = resource.get("version")
         etag = resource.get("etag")
         policy = cls(etag, version)
-        for binding in resource.get("bindings", ()):
-            role = binding["role"]
-            members = sorted(binding["members"])
-            policy[role] = members
+        policy.bindings = resource.get("bindings", [])
+
+        for binding in policy.bindings:
+            binding["members"] = set(binding.get("members", ()))
+
         return policy
 
     def to_api_repr(self):
@@ -262,13 +438,23 @@
         if self.version is not None:
             resource["version"] = self.version
 
-        if self._bindings:
-            bindings = resource["bindings"] = []
-            for role, members in sorted(self._bindings.items()):
+        if self._bindings and len(self._bindings) > 0:
+            bindings = []
+            for binding in self._bindings:
+                members = binding.get("members")
                 if members:
-                    bindings.append({"role": role, "members": sorted(set(members))})
+                    new_binding = {
+                        "role": binding["role"],
+                        "members": sorted(members)
+                    }
+                    condition = binding.get("condition")
+                    if condition:
+                        new_binding["condition"] = condition
+                    bindings.append(new_binding)
 
-            if not bindings:
-                del resource["bindings"]
+            if bindings:
+                # Sort bindings by role
+                key = operator.itemgetter("role")
+                resource["bindings"] = sorted(bindings, key=key)
 
         return resource
diff --git a/tests/unit/test_iam.py b/tests/unit/test_iam.py
index 199c389..896e10d 100644
--- a/tests/unit/test_iam.py
+++ b/tests/unit/test_iam.py
@@ -14,6 +14,8 @@
 
 import pytest
 
+from google.api_core.iam import _DICT_ACCESS_MSG, InvalidOperationException
+
 
 class TestPolicy:
     @staticmethod
@@ -37,7 +39,7 @@
         assert dict(policy) == {}
 
     def test_ctor_explicit(self):
-        VERSION = 17
+        VERSION = 1
         ETAG = "ETAG"
         empty = frozenset()
         policy = self._make_one(ETAG, VERSION)
@@ -53,6 +55,21 @@
         policy = self._make_one()
         assert policy["nonesuch"] == set()
 
+    def test___getitem___version3(self):
+        policy = self._make_one("DEADBEEF", 3)
+        with pytest.raises(InvalidOperationException, match=_DICT_ACCESS_MSG):
+            policy["role"]
+
+    def test___getitem___with_conditions(self):
+        USER = "user:phred@example.com"
+        CONDITION = {"expression": "2 > 1"}
+        policy = self._make_one("DEADBEEF", 1)
+        policy.bindings = [
+            {"role": "role/reader", "members": [USER], "condition": CONDITION}
+        ]
+        with pytest.raises(InvalidOperationException, match=_DICT_ACCESS_MSG):
+            policy["role/reader"]
+
     def test___setitem__(self):
         USER = "user:phred@example.com"
         PRINCIPALS = set([USER])
@@ -62,18 +79,73 @@
         assert len(policy) == 1
         assert dict(policy) == {"rolename": PRINCIPALS}
 
+    def test__set_item__overwrite(self):
+        GROUP = "group:test@group.com"
+        USER = "user:phred@example.com"
+        ALL_USERS = "allUsers"
+        MEMBERS = set([ALL_USERS])
+        GROUPS = set([GROUP])
+        policy = self._make_one()
+        policy["first"] = [GROUP]
+        policy["second"] = [USER]
+        policy["second"] = [ALL_USERS]
+        assert policy["second"] == MEMBERS
+        assert len(policy) == 2
+        assert dict(policy) == {"first": GROUPS, "second": MEMBERS}
+
+    def test___setitem___version3(self):
+        policy = self._make_one("DEADBEEF", 3)
+        with pytest.raises(InvalidOperationException, match=_DICT_ACCESS_MSG):
+            policy["role/reader"] = ["user:phred@example.com"]
+
+    def test___setitem___with_conditions(self):
+        USER = "user:phred@example.com"
+        CONDITION = {"expression": "2 > 1"}
+        policy = self._make_one("DEADBEEF", 1)
+        policy.bindings = [
+            {"role": "role/reader", "members": set([USER]), "condition": CONDITION}
+        ]
+        with pytest.raises(InvalidOperationException, match=_DICT_ACCESS_MSG):
+            policy["role/reader"] = ["user:phred@example.com"]
+
     def test___delitem___hit(self):
         policy = self._make_one()
-        policy._bindings["rolename"] = ["phred@example.com"]
-        del policy["rolename"]
-        assert len(policy) == 0
-        assert dict(policy) == {}
+        policy.bindings = [
+            {"role": "to/keep", "members": set(["phred@example.com"])},
+            {"role": "to/remove", "members": set(["phred@example.com"])}
+        ]
+        del policy["to/remove"]
+        assert len(policy) == 1
+        assert dict(policy) == {"to/keep": set(["phred@example.com"])}
 
     def test___delitem___miss(self):
         policy = self._make_one()
         with pytest.raises(KeyError):
             del policy["nonesuch"]
 
+    def test___delitem___version3(self):
+        policy = self._make_one("DEADBEEF", 3)
+        with pytest.raises(InvalidOperationException, match=_DICT_ACCESS_MSG):
+            del policy["role/reader"]
+
+    def test___delitem___with_conditions(self):
+        USER = "user:phred@example.com"
+        CONDITION = {"expression": "2 > 1"}
+        policy = self._make_one("DEADBEEF", 1)
+        policy.bindings = [
+            {"role": "role/reader", "members": set([USER]), "condition": CONDITION}
+        ]
+        with pytest.raises(InvalidOperationException, match=_DICT_ACCESS_MSG):
+            del policy["role/reader"]
+
+    def test_bindings_property(self):
+        USER = "user:phred@example.com"
+        CONDITION = {"expression": "2 > 1"}
+        policy = self._make_one()
+        BINDINGS = [{"role": "role/reader", "members": set([USER]), "condition": CONDITION}]
+        policy.bindings = BINDINGS
+        assert policy.bindings == BINDINGS
+
     def test_owners_getter(self):
         from google.api_core.iam import OWNER_ROLE
 
@@ -94,7 +166,7 @@
         with warnings.catch_warnings(record=True) as warned:
             policy.owners = [MEMBER]
 
-        warning, = warned
+        (warning,) = warned
         assert warning.category is DeprecationWarning
         assert policy[OWNER_ROLE] == expected
 
@@ -118,7 +190,7 @@
         with warnings.catch_warnings(record=True) as warned:
             policy.editors = [MEMBER]
 
-        warning, = warned
+        (warning,) = warned
         assert warning.category is DeprecationWarning
         assert policy[EDITOR_ROLE] == expected
 
@@ -142,41 +214,77 @@
         with warnings.catch_warnings(record=True) as warned:
             policy.viewers = [MEMBER]
 
-        warning, = warned
+        (warning,) = warned
         assert warning.category is DeprecationWarning
         assert policy[VIEWER_ROLE] == expected
 
     def test_user(self):
+        import warnings
+
         EMAIL = "phred@example.com"
         MEMBER = "user:%s" % (EMAIL,)
         policy = self._make_one()
-        assert policy.user(EMAIL) == MEMBER
+        with warnings.catch_warnings(record=True) as warned:
+            assert policy.user(EMAIL) == MEMBER
+
+        (warning,) = warned
+        assert warning.category is DeprecationWarning
 
     def test_service_account(self):
+        import warnings
+
         EMAIL = "phred@example.com"
         MEMBER = "serviceAccount:%s" % (EMAIL,)
         policy = self._make_one()
-        assert policy.service_account(EMAIL) == MEMBER
+        with warnings.catch_warnings(record=True) as warned:
+            assert policy.service_account(EMAIL) == MEMBER
+
+        (warning,) = warned
+        assert warning.category is DeprecationWarning
 
     def test_group(self):
+        import warnings
+
         EMAIL = "phred@example.com"
         MEMBER = "group:%s" % (EMAIL,)
         policy = self._make_one()
-        assert policy.group(EMAIL) == MEMBER
+        with warnings.catch_warnings(record=True) as warned:
+            assert policy.group(EMAIL) == MEMBER
+
+        (warning,) = warned
+        assert warning.category is DeprecationWarning
 
     def test_domain(self):
+        import warnings
+
         DOMAIN = "example.com"
         MEMBER = "domain:%s" % (DOMAIN,)
         policy = self._make_one()
-        assert policy.domain(DOMAIN) == MEMBER
+        with warnings.catch_warnings(record=True) as warned:
+            assert policy.domain(DOMAIN) == MEMBER
+
+        (warning,) = warned
+        assert warning.category is DeprecationWarning
 
     def test_all_users(self):
+        import warnings
+
         policy = self._make_one()
-        assert policy.all_users() == "allUsers"
+        with warnings.catch_warnings(record=True) as warned:
+            assert policy.all_users() == "allUsers"
+
+        (warning,) = warned
+        assert warning.category is DeprecationWarning
 
     def test_authenticated_users(self):
+        import warnings
+
         policy = self._make_one()
-        assert policy.authenticated_users() == "allAuthenticatedUsers"
+        with warnings.catch_warnings(record=True) as warned:
+            assert policy.authenticated_users() == "allAuthenticatedUsers"
+
+        (warning,) = warned
+        assert warning.category is DeprecationWarning
 
     def test_from_api_repr_only_etag(self):
         empty = frozenset()
@@ -201,7 +309,7 @@
         VIEWER2 = "user:phred@example.com"
         RESOURCE = {
             "etag": "DEADBEEF",
-            "version": 17,
+            "version": 1,
             "bindings": [
                 {"role": OWNER_ROLE, "members": [OWNER1, OWNER2]},
                 {"role": EDITOR_ROLE, "members": [EDITOR1, EDITOR2]},
@@ -211,7 +319,7 @@
         klass = self._get_target_class()
         policy = klass.from_api_repr(RESOURCE)
         assert policy.etag == "DEADBEEF"
-        assert policy.version == 17
+        assert policy.version == 1
         assert policy.owners, frozenset([OWNER1 == OWNER2])
         assert policy.editors, frozenset([EDITOR1 == EDITOR2])
         assert policy.viewers, frozenset([VIEWER1 == VIEWER2])
@@ -220,19 +328,24 @@
             EDITOR_ROLE: set([EDITOR1, EDITOR2]),
             VIEWER_ROLE: set([VIEWER1, VIEWER2]),
         }
+        assert policy.bindings == [
+            {"role": OWNER_ROLE, "members": set([OWNER1, OWNER2])},
+            {"role": EDITOR_ROLE, "members": set([EDITOR1, EDITOR2])},
+            {"role": VIEWER_ROLE, "members": set([VIEWER1, VIEWER2])},
+        ]
 
     def test_from_api_repr_unknown_role(self):
         USER = "user:phred@example.com"
         GROUP = "group:cloud-logs@google.com"
         RESOURCE = {
             "etag": "DEADBEEF",
-            "version": 17,
+            "version": 1,
             "bindings": [{"role": "unknown", "members": [USER, GROUP]}],
         }
         klass = self._get_target_class()
         policy = klass.from_api_repr(RESOURCE)
         assert policy.etag == "DEADBEEF"
-        assert policy.version == 17
+        assert policy.version == 1
         assert dict(policy), {"unknown": set([GROUP == USER])}
 
     def test_to_api_repr_defaults(self):
@@ -262,7 +375,6 @@
 
     def test_to_api_repr_full(self):
         import operator
-        import warnings
         from google.api_core.iam import OWNER_ROLE, EDITOR_ROLE, VIEWER_ROLE
 
         OWNER1 = "group:cloud-logs@google.com"
@@ -271,18 +383,21 @@
         EDITOR2 = "user:phred@example.com"
         VIEWER1 = "serviceAccount:1234-abcdef@service.example.com"
         VIEWER2 = "user:phred@example.com"
+        CONDITION = {
+            "title": "title",
+            "description": "description",
+            "expression": "true"
+        }
         BINDINGS = [
             {"role": OWNER_ROLE, "members": [OWNER1, OWNER2]},
             {"role": EDITOR_ROLE, "members": [EDITOR1, EDITOR2]},
             {"role": VIEWER_ROLE, "members": [VIEWER1, VIEWER2]},
+            {"role": VIEWER_ROLE, "members": [VIEWER1, VIEWER2], "condition": CONDITION},
         ]
-        policy = self._make_one("DEADBEEF", 17)
-        with warnings.catch_warnings(record=True):
-            policy.owners = [OWNER1, OWNER2]
-            policy.editors = [EDITOR1, EDITOR2]
-            policy.viewers = [VIEWER1, VIEWER2]
+        policy = self._make_one("DEADBEEF", 1)
+        policy.bindings = BINDINGS
         resource = policy.to_api_repr()
         assert resource["etag"] == "DEADBEEF"
-        assert resource["version"] == 17
+        assert resource["version"] == 1
         key = operator.itemgetter("role")
         assert sorted(resource["bindings"], key=key) == sorted(BINDINGS, key=key)