# blob: 61b1d187669136b65b52228186afa4366f15f83e [file] [log] [blame]
# Copyright 2021 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import pytest
from google.auth import downscoped
# Primary fixture values: a condition limited to the "customer-a" object
# prefix, applied to "example-bucket" with the object viewer role.
EXPRESSION = (
    "resource.name.startsWith('projects/_/buckets/example-bucket/objects/customer-a')"
)
TITLE = "customer-a-objects"
DESCRIPTION = (
    "Condition to make permissions available for objects starting with customer-a"
)
AVAILABLE_RESOURCE = "//storage.googleapis.com/projects/_/buckets/example-bucket"
AVAILABLE_PERMISSIONS = ["inRole:roles/storage.objectViewer"]

# Alternate fixture values, used where a test needs a second, distinct set
# (for example when exercising setters).
OTHER_EXPRESSION = (
    "resource.name.startsWith('projects/_/buckets/example-bucket/objects/customer-b')"
)
OTHER_TITLE = "customer-b-objects"
OTHER_DESCRIPTION = (
    "Condition to make permissions available for objects starting with customer-b"
)
OTHER_AVAILABLE_RESOURCE = "//storage.googleapis.com/projects/_/buckets/other-bucket"
OTHER_AVAILABLE_PERMISSIONS = ["inRole:roles/storage.objectCreator"]
def make_availability_condition(expression, title=None, description=None):
    """Create a ``downscoped.AvailabilityCondition`` for use in tests.

    Args:
        expression: Forwarded as the first positional constructor argument.
        title: Forwarded as the second positional argument; defaults to None.
        description: Forwarded as the third positional argument; defaults
            to None.

    Returns:
        The object returned by ``downscoped.AvailabilityCondition``.
    """
    constructor_args = (expression, title, description)
    return downscoped.AvailabilityCondition(*constructor_args)
def make_access_boundary_rule(
    available_resource, available_permissions, availability_condition=None
):
    """Create a ``downscoped.AccessBoundaryRule`` for use in tests.

    Args:
        available_resource: Forwarded as the first positional constructor
            argument.
        available_permissions: Forwarded as the second positional argument.
        availability_condition: Forwarded as the third positional argument;
            defaults to None.

    Returns:
        The object returned by ``downscoped.AccessBoundaryRule``.
    """
    rule = downscoped.AccessBoundaryRule(
        available_resource, available_permissions, availability_condition
    )
    return rule
def make_credential_access_boundary(rules):
    """Create a ``downscoped.CredentialAccessBoundary`` for use in tests.

    Args:
        rules: Forwarded unchanged as the only constructor argument.

    Returns:
        The object returned by ``downscoped.CredentialAccessBoundary``.
    """
    boundary = downscoped.CredentialAccessBoundary(rules)
    return boundary
class TestAvailabilityCondition(object):
    """Tests for availability conditions built by make_availability_condition."""

    def test_constructor(self):
        # Every constructor argument should be readable back unchanged.
        condition = make_availability_condition(EXPRESSION, TITLE, DESCRIPTION)

        assert condition.expression == EXPRESSION
        assert condition.title == TITLE
        assert condition.description == DESCRIPTION

    def test_constructor_required_params_only(self):
        # Omitted optional arguments are expected to read back as None.
        condition = make_availability_condition(EXPRESSION)

        assert condition.expression == EXPRESSION
        assert condition.title is None
        assert condition.description is None

    def test_setters(self):
        condition = make_availability_condition(EXPRESSION, TITLE, DESCRIPTION)

        # Overwrite every attribute with the alternate fixture values.
        condition.expression = OTHER_EXPRESSION
        condition.title = OTHER_TITLE
        condition.description = OTHER_DESCRIPTION

        assert condition.expression == OTHER_EXPRESSION
        assert condition.title == OTHER_TITLE
        assert condition.description == OTHER_DESCRIPTION

    def test_invalid_expression_type(self):
        # A list is passed where the expression is expected.
        with pytest.raises(
            TypeError, match="The provided expression is not a string."
        ):
            make_availability_condition([EXPRESSION], TITLE, DESCRIPTION)

    def test_invalid_title_type(self):
        # A boolean is passed where the title is expected.
        with pytest.raises(
            TypeError, match="The provided title is not a string or None."
        ):
            make_availability_condition(EXPRESSION, False, DESCRIPTION)

    def test_invalid_description_type(self):
        # A boolean is passed where the description is expected.
        with pytest.raises(
            TypeError, match="The provided description is not a string or None."
        ):
            make_availability_condition(EXPRESSION, TITLE, False)

    def test_to_json_required_params_only(self):
        # Only the expression key is expected when nothing else was supplied.
        condition = make_availability_condition(EXPRESSION)
        expected = {"expression": EXPRESSION}

        assert condition.to_json() == expected

    def test_to_json_(self):
        condition = make_availability_condition(EXPRESSION, TITLE, DESCRIPTION)
        expected = {
            "expression": EXPRESSION,
            "title": TITLE,
            "description": DESCRIPTION,
        }

        assert condition.to_json() == expected
class TestAccessBoundaryRule(object):
    """Tests for access boundary rules built by make_access_boundary_rule."""

    def test_constructor(self):
        condition = make_availability_condition(EXPRESSION, TITLE, DESCRIPTION)
        rule = make_access_boundary_rule(
            AVAILABLE_RESOURCE, AVAILABLE_PERMISSIONS, condition
        )

        assert rule.available_resource == AVAILABLE_RESOURCE
        # Permissions are supplied as a list but compared against a tuple.
        assert rule.available_permissions == tuple(AVAILABLE_PERMISSIONS)
        assert rule.availability_condition == condition

    def test_constructor_required_params_only(self):
        rule = make_access_boundary_rule(AVAILABLE_RESOURCE, AVAILABLE_PERMISSIONS)

        assert rule.available_resource == AVAILABLE_RESOURCE
        assert rule.available_permissions == tuple(AVAILABLE_PERMISSIONS)
        assert rule.availability_condition is None

    def test_setters(self):
        condition = make_availability_condition(EXPRESSION, TITLE, DESCRIPTION)
        other_condition = make_availability_condition(
            OTHER_EXPRESSION, OTHER_TITLE, OTHER_DESCRIPTION
        )
        rule = make_access_boundary_rule(
            AVAILABLE_RESOURCE, AVAILABLE_PERMISSIONS, condition
        )

        # Overwrite every attribute with the alternate fixture values.
        rule.available_resource = OTHER_AVAILABLE_RESOURCE
        rule.available_permissions = OTHER_AVAILABLE_PERMISSIONS
        rule.availability_condition = other_condition

        assert rule.available_resource == OTHER_AVAILABLE_RESOURCE
        assert rule.available_permissions == tuple(OTHER_AVAILABLE_PERMISSIONS)
        assert rule.availability_condition == other_condition

    def test_invalid_available_resource_type(self):
        condition = make_availability_condition(EXPRESSION, TITLE, DESCRIPTION)

        # None is passed where the resource is expected.
        with pytest.raises(
            TypeError, match="The provided available_resource is not a string."
        ):
            make_access_boundary_rule(None, AVAILABLE_PERMISSIONS, condition)

    def test_invalid_available_permissions_type(self):
        condition = make_availability_condition(EXPRESSION, TITLE, DESCRIPTION)

        # Integers are passed where permission strings are expected.
        with pytest.raises(
            TypeError,
            match="Provided available_permissions are not a list of strings.",
        ):
            make_access_boundary_rule(AVAILABLE_RESOURCE, [0, 1, 2], condition)

    def test_invalid_available_permissions_value(self):
        condition = make_availability_condition(EXPRESSION, TITLE, DESCRIPTION)
        # A string permission that lacks the "inRole:" prefix.
        unprefixed_permissions = ["roles/storage.objectViewer"]

        with pytest.raises(
            ValueError,
            match="available_permissions must be prefixed with 'inRole:'.",
        ):
            make_access_boundary_rule(
                AVAILABLE_RESOURCE, unprefixed_permissions, condition
            )

    def test_invalid_availability_condition_type(self):
        # A plain dict is passed where the availability condition is expected.
        with pytest.raises(
            TypeError,
            match="The provided availability_condition is not a 'google.auth.downscoped.AvailabilityCondition' or None.",
        ):
            make_access_boundary_rule(
                AVAILABLE_RESOURCE, AVAILABLE_PERMISSIONS, {"foo": "bar"}
            )

    def test_to_json(self):
        condition = make_availability_condition(EXPRESSION, TITLE, DESCRIPTION)
        rule = make_access_boundary_rule(
            AVAILABLE_RESOURCE, AVAILABLE_PERMISSIONS, condition
        )
        expected = {
            "availablePermissions": AVAILABLE_PERMISSIONS,
            "availableResource": AVAILABLE_RESOURCE,
            "availabilityCondition": {
                "expression": EXPRESSION,
                "title": TITLE,
                "description": DESCRIPTION,
            },
        }

        assert rule.to_json() == expected

    def test_to_json_required_params_only(self):
        rule = make_access_boundary_rule(AVAILABLE_RESOURCE, AVAILABLE_PERMISSIONS)
        # No availabilityCondition key is expected when none was supplied.
        expected = {
            "availablePermissions": AVAILABLE_PERMISSIONS,
            "availableResource": AVAILABLE_RESOURCE,
        }

        assert rule.to_json() == expected
class TestCredentialAccessBoundary(object):
    """Tests for boundaries built by make_credential_access_boundary."""

    @staticmethod
    def _make_default_rule():
        # Build the rule most tests start from: the primary resource and
        # permissions, restricted by the primary availability condition.
        condition = make_availability_condition(EXPRESSION, TITLE, DESCRIPTION)
        return make_access_boundary_rule(
            AVAILABLE_RESOURCE, AVAILABLE_PERMISSIONS, condition
        )

    def test_constructor(self):
        rules = [self._make_default_rule()]
        boundary = make_credential_access_boundary(rules)

        # Rules are supplied as a list but compared against a tuple.
        assert boundary.rules == tuple(rules)

    def test_setters(self):
        rules = [self._make_default_rule()]
        other_condition = make_availability_condition(
            OTHER_EXPRESSION, OTHER_TITLE, OTHER_DESCRIPTION
        )
        other_rule = make_access_boundary_rule(
            OTHER_AVAILABLE_RESOURCE, OTHER_AVAILABLE_PERMISSIONS, other_condition
        )
        other_rules = [other_rule]
        boundary = make_credential_access_boundary(rules)

        boundary.rules = other_rules

        assert boundary.rules == tuple(other_rules)

    def test_add_rule(self):
        # Start with nine references to the same rule.
        rules = [self._make_default_rule()] * 9
        boundary = make_credential_access_boundary(rules)
        extra_rule = make_access_boundary_rule(
            OTHER_AVAILABLE_RESOURCE, OTHER_AVAILABLE_PERMISSIONS
        )

        # Adding a tenth rule should not raise an error.
        boundary.add_rule(extra_rule)

        assert len(boundary.rules) == 10
        assert boundary.rules[9] == extra_rule

    def test_add_rule_invalid_value(self):
        default_rule = self._make_default_rule()
        rules = [default_rule] * 10
        boundary = make_credential_access_boundary(rules)

        # An eleventh rule should be rejected.
        with pytest.raises(
            ValueError,
            match="Credential access boundary rules can have a maximum of 10 rules.",
        ):
            boundary.add_rule(default_rule)

        # The rejected rule must not have been added.
        assert len(boundary.rules) == 10

    def test_add_rule_invalid_type(self):
        default_rule = self._make_default_rule()
        boundary = make_credential_access_boundary([default_rule])

        # A string is passed where a rule object is expected.
        with pytest.raises(
            TypeError,
            match="The provided rule does not contain a valid 'google.auth.downscoped.AccessBoundaryRule'.",
        ):
            boundary.add_rule("invalid")

        # The existing rule must be left as it was.
        assert len(boundary.rules) == 1
        assert boundary.rules[0] == default_rule

    def test_invalid_rules_type(self):
        # A list holding a string instead of rule objects.
        with pytest.raises(
            TypeError,
            match="List of rules provided do not contain a valid 'google.auth.downscoped.AccessBoundaryRule'.",
        ):
            make_credential_access_boundary(["invalid"])

    def test_invalid_rules_value(self):
        # Eleven rules at construction time should be rejected.
        too_many_rules = [self._make_default_rule()] * 11

        with pytest.raises(
            ValueError,
            match="Credential access boundary rules can have a maximum of 10 rules.",
        ):
            make_credential_access_boundary(too_many_rules)

    def test_to_json(self):
        boundary = make_credential_access_boundary([self._make_default_rule()])
        expected_rule = {
            "availablePermissions": AVAILABLE_PERMISSIONS,
            "availableResource": AVAILABLE_RESOURCE,
            "availabilityCondition": {
                "expression": EXPRESSION,
                "title": TITLE,
                "description": DESCRIPTION,
            },
        }
        expected = {"accessBoundary": {"accessBoundaryRules": [expected_rule]}}

        assert boundary.to_json() == expected