blob: 9ca95f5aa8564f49d022b44a4a7c9717426eca88 [file] [log] [blame]
# Copyright 2021 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import datetime
import json
import mock
import pytest
from six.moves import http_client
from six.moves import urllib
from google.auth import _helpers
from google.auth import credentials
from google.auth import downscoped
from google.auth import exceptions
from google.auth import transport
EXPRESSION = (
"resource.name.startsWith('projects/_/buckets/example-bucket/objects/customer-a')"
)
TITLE = "customer-a-objects"
DESCRIPTION = (
"Condition to make permissions available for objects starting with customer-a"
)
AVAILABLE_RESOURCE = "//storage.googleapis.com/projects/_/buckets/example-bucket"
AVAILABLE_PERMISSIONS = ["inRole:roles/storage.objectViewer"]
OTHER_EXPRESSION = (
"resource.name.startsWith('projects/_/buckets/example-bucket/objects/customer-b')"
)
OTHER_TITLE = "customer-b-objects"
OTHER_DESCRIPTION = (
"Condition to make permissions available for objects starting with customer-b"
)
OTHER_AVAILABLE_RESOURCE = "//storage.googleapis.com/projects/_/buckets/other-bucket"
OTHER_AVAILABLE_PERMISSIONS = ["inRole:roles/storage.objectCreator"]
QUOTA_PROJECT_ID = "QUOTA_PROJECT_ID"
GRANT_TYPE = "urn:ietf:params:oauth:grant-type:token-exchange"
REQUESTED_TOKEN_TYPE = "urn:ietf:params:oauth:token-type:access_token"
TOKEN_EXCHANGE_ENDPOINT = "https://sts.googleapis.com/v1/token"
SUBJECT_TOKEN_TYPE = "urn:ietf:params:oauth:token-type:access_token"
SUCCESS_RESPONSE = {
"access_token": "ACCESS_TOKEN",
"issued_token_type": "urn:ietf:params:oauth:token-type:access_token",
"token_type": "Bearer",
"expires_in": 3600,
}
ERROR_RESPONSE = {
"error": "invalid_grant",
"error_description": "Subject token is invalid.",
"error_uri": "https://tools.ietf.org/html/rfc6749",
}
CREDENTIAL_ACCESS_BOUNDARY_JSON = {
"accessBoundary": {
"accessBoundaryRules": [
{
"availablePermissions": AVAILABLE_PERMISSIONS,
"availableResource": AVAILABLE_RESOURCE,
"availabilityCondition": {
"expression": EXPRESSION,
"title": TITLE,
"description": DESCRIPTION,
},
}
]
}
}
class SourceCredentials(credentials.Credentials):
def __init__(self, raise_error=False, expires_in=3600):
super(SourceCredentials, self).__init__()
self._counter = 0
self._raise_error = raise_error
self._expires_in = expires_in
def refresh(self, request):
if self._raise_error:
raise exceptions.RefreshError(
"Failed to refresh access token in source credentials."
)
now = _helpers.utcnow()
self._counter += 1
self.token = "ACCESS_TOKEN_{}".format(self._counter)
self.expiry = now + datetime.timedelta(seconds=self._expires_in)
def make_availability_condition(expression, title=None, description=None):
return downscoped.AvailabilityCondition(expression, title, description)
def make_access_boundary_rule(
available_resource, available_permissions, availability_condition=None
):
return downscoped.AccessBoundaryRule(
available_resource, available_permissions, availability_condition
)
def make_credential_access_boundary(rules):
return downscoped.CredentialAccessBoundary(rules)
class TestAvailabilityCondition(object):
def test_constructor(self):
availability_condition = make_availability_condition(
EXPRESSION, TITLE, DESCRIPTION
)
assert availability_condition.expression == EXPRESSION
assert availability_condition.title == TITLE
assert availability_condition.description == DESCRIPTION
def test_constructor_required_params_only(self):
availability_condition = make_availability_condition(EXPRESSION)
assert availability_condition.expression == EXPRESSION
assert availability_condition.title is None
assert availability_condition.description is None
def test_setters(self):
availability_condition = make_availability_condition(
EXPRESSION, TITLE, DESCRIPTION
)
availability_condition.expression = OTHER_EXPRESSION
availability_condition.title = OTHER_TITLE
availability_condition.description = OTHER_DESCRIPTION
assert availability_condition.expression == OTHER_EXPRESSION
assert availability_condition.title == OTHER_TITLE
assert availability_condition.description == OTHER_DESCRIPTION
def test_invalid_expression_type(self):
with pytest.raises(TypeError) as excinfo:
make_availability_condition([EXPRESSION], TITLE, DESCRIPTION)
assert excinfo.match("The provided expression is not a string.")
def test_invalid_title_type(self):
with pytest.raises(TypeError) as excinfo:
make_availability_condition(EXPRESSION, False, DESCRIPTION)
assert excinfo.match("The provided title is not a string or None.")
def test_invalid_description_type(self):
with pytest.raises(TypeError) as excinfo:
make_availability_condition(EXPRESSION, TITLE, False)
assert excinfo.match("The provided description is not a string or None.")
def test_to_json_required_params_only(self):
availability_condition = make_availability_condition(EXPRESSION)
assert availability_condition.to_json() == {"expression": EXPRESSION}
def test_to_json_(self):
availability_condition = make_availability_condition(
EXPRESSION, TITLE, DESCRIPTION
)
assert availability_condition.to_json() == {
"expression": EXPRESSION,
"title": TITLE,
"description": DESCRIPTION,
}
class TestAccessBoundaryRule(object):
def test_constructor(self):
availability_condition = make_availability_condition(
EXPRESSION, TITLE, DESCRIPTION
)
access_boundary_rule = make_access_boundary_rule(
AVAILABLE_RESOURCE, AVAILABLE_PERMISSIONS, availability_condition
)
assert access_boundary_rule.available_resource == AVAILABLE_RESOURCE
assert access_boundary_rule.available_permissions == tuple(
AVAILABLE_PERMISSIONS
)
assert access_boundary_rule.availability_condition == availability_condition
def test_constructor_required_params_only(self):
access_boundary_rule = make_access_boundary_rule(
AVAILABLE_RESOURCE, AVAILABLE_PERMISSIONS
)
assert access_boundary_rule.available_resource == AVAILABLE_RESOURCE
assert access_boundary_rule.available_permissions == tuple(
AVAILABLE_PERMISSIONS
)
assert access_boundary_rule.availability_condition is None
def test_setters(self):
availability_condition = make_availability_condition(
EXPRESSION, TITLE, DESCRIPTION
)
other_availability_condition = make_availability_condition(
OTHER_EXPRESSION, OTHER_TITLE, OTHER_DESCRIPTION
)
access_boundary_rule = make_access_boundary_rule(
AVAILABLE_RESOURCE, AVAILABLE_PERMISSIONS, availability_condition
)
access_boundary_rule.available_resource = OTHER_AVAILABLE_RESOURCE
access_boundary_rule.available_permissions = OTHER_AVAILABLE_PERMISSIONS
access_boundary_rule.availability_condition = other_availability_condition
assert access_boundary_rule.available_resource == OTHER_AVAILABLE_RESOURCE
assert access_boundary_rule.available_permissions == tuple(
OTHER_AVAILABLE_PERMISSIONS
)
assert (
access_boundary_rule.availability_condition == other_availability_condition
)
def test_invalid_available_resource_type(self):
availability_condition = make_availability_condition(
EXPRESSION, TITLE, DESCRIPTION
)
with pytest.raises(TypeError) as excinfo:
make_access_boundary_rule(
None, AVAILABLE_PERMISSIONS, availability_condition
)
assert excinfo.match("The provided available_resource is not a string.")
def test_invalid_available_permissions_type(self):
availability_condition = make_availability_condition(
EXPRESSION, TITLE, DESCRIPTION
)
with pytest.raises(TypeError) as excinfo:
make_access_boundary_rule(
AVAILABLE_RESOURCE, [0, 1, 2], availability_condition
)
assert excinfo.match(
"Provided available_permissions are not a list of strings."
)
def test_invalid_available_permissions_value(self):
availability_condition = make_availability_condition(
EXPRESSION, TITLE, DESCRIPTION
)
with pytest.raises(ValueError) as excinfo:
make_access_boundary_rule(
AVAILABLE_RESOURCE,
["roles/storage.objectViewer"],
availability_condition,
)
assert excinfo.match("available_permissions must be prefixed with 'inRole:'.")
def test_invalid_availability_condition_type(self):
with pytest.raises(TypeError) as excinfo:
make_access_boundary_rule(
AVAILABLE_RESOURCE, AVAILABLE_PERMISSIONS, {"foo": "bar"}
)
assert excinfo.match(
"The provided availability_condition is not a 'google.auth.downscoped.AvailabilityCondition' or None."
)
def test_to_json(self):
availability_condition = make_availability_condition(
EXPRESSION, TITLE, DESCRIPTION
)
access_boundary_rule = make_access_boundary_rule(
AVAILABLE_RESOURCE, AVAILABLE_PERMISSIONS, availability_condition
)
assert access_boundary_rule.to_json() == {
"availablePermissions": AVAILABLE_PERMISSIONS,
"availableResource": AVAILABLE_RESOURCE,
"availabilityCondition": {
"expression": EXPRESSION,
"title": TITLE,
"description": DESCRIPTION,
},
}
def test_to_json_required_params_only(self):
access_boundary_rule = make_access_boundary_rule(
AVAILABLE_RESOURCE, AVAILABLE_PERMISSIONS
)
assert access_boundary_rule.to_json() == {
"availablePermissions": AVAILABLE_PERMISSIONS,
"availableResource": AVAILABLE_RESOURCE,
}
class TestCredentialAccessBoundary(object):
def test_constructor(self):
availability_condition = make_availability_condition(
EXPRESSION, TITLE, DESCRIPTION
)
access_boundary_rule = make_access_boundary_rule(
AVAILABLE_RESOURCE, AVAILABLE_PERMISSIONS, availability_condition
)
rules = [access_boundary_rule]
credential_access_boundary = make_credential_access_boundary(rules)
assert credential_access_boundary.rules == tuple(rules)
def test_setters(self):
availability_condition = make_availability_condition(
EXPRESSION, TITLE, DESCRIPTION
)
access_boundary_rule = make_access_boundary_rule(
AVAILABLE_RESOURCE, AVAILABLE_PERMISSIONS, availability_condition
)
rules = [access_boundary_rule]
other_availability_condition = make_availability_condition(
OTHER_EXPRESSION, OTHER_TITLE, OTHER_DESCRIPTION
)
other_access_boundary_rule = make_access_boundary_rule(
OTHER_AVAILABLE_RESOURCE,
OTHER_AVAILABLE_PERMISSIONS,
other_availability_condition,
)
other_rules = [other_access_boundary_rule]
credential_access_boundary = make_credential_access_boundary(rules)
credential_access_boundary.rules = other_rules
assert credential_access_boundary.rules == tuple(other_rules)
def test_add_rule(self):
availability_condition = make_availability_condition(
EXPRESSION, TITLE, DESCRIPTION
)
access_boundary_rule = make_access_boundary_rule(
AVAILABLE_RESOURCE, AVAILABLE_PERMISSIONS, availability_condition
)
rules = [access_boundary_rule] * 9
credential_access_boundary = make_credential_access_boundary(rules)
# Add one more rule. This should not raise an error.
additional_access_boundary_rule = make_access_boundary_rule(
OTHER_AVAILABLE_RESOURCE, OTHER_AVAILABLE_PERMISSIONS
)
credential_access_boundary.add_rule(additional_access_boundary_rule)
assert len(credential_access_boundary.rules) == 10
assert credential_access_boundary.rules[9] == additional_access_boundary_rule
def test_add_rule_invalid_value(self):
availability_condition = make_availability_condition(
EXPRESSION, TITLE, DESCRIPTION
)
access_boundary_rule = make_access_boundary_rule(
AVAILABLE_RESOURCE, AVAILABLE_PERMISSIONS, availability_condition
)
rules = [access_boundary_rule] * 10
credential_access_boundary = make_credential_access_boundary(rules)
# Add one more rule to exceed maximum allowed rules.
with pytest.raises(ValueError) as excinfo:
credential_access_boundary.add_rule(access_boundary_rule)
assert excinfo.match(
"Credential access boundary rules can have a maximum of 10 rules."
)
assert len(credential_access_boundary.rules) == 10
def test_add_rule_invalid_type(self):
availability_condition = make_availability_condition(
EXPRESSION, TITLE, DESCRIPTION
)
access_boundary_rule = make_access_boundary_rule(
AVAILABLE_RESOURCE, AVAILABLE_PERMISSIONS, availability_condition
)
rules = [access_boundary_rule]
credential_access_boundary = make_credential_access_boundary(rules)
# Add an invalid rule to exceed maximum allowed rules.
with pytest.raises(TypeError) as excinfo:
credential_access_boundary.add_rule("invalid")
assert excinfo.match(
"The provided rule does not contain a valid 'google.auth.downscoped.AccessBoundaryRule'."
)
assert len(credential_access_boundary.rules) == 1
assert credential_access_boundary.rules[0] == access_boundary_rule
def test_invalid_rules_type(self):
with pytest.raises(TypeError) as excinfo:
make_credential_access_boundary(["invalid"])
assert excinfo.match(
"List of rules provided do not contain a valid 'google.auth.downscoped.AccessBoundaryRule'."
)
def test_invalid_rules_value(self):
availability_condition = make_availability_condition(
EXPRESSION, TITLE, DESCRIPTION
)
access_boundary_rule = make_access_boundary_rule(
AVAILABLE_RESOURCE, AVAILABLE_PERMISSIONS, availability_condition
)
too_many_rules = [access_boundary_rule] * 11
with pytest.raises(ValueError) as excinfo:
make_credential_access_boundary(too_many_rules)
assert excinfo.match(
"Credential access boundary rules can have a maximum of 10 rules."
)
def test_to_json(self):
availability_condition = make_availability_condition(
EXPRESSION, TITLE, DESCRIPTION
)
access_boundary_rule = make_access_boundary_rule(
AVAILABLE_RESOURCE, AVAILABLE_PERMISSIONS, availability_condition
)
rules = [access_boundary_rule]
credential_access_boundary = make_credential_access_boundary(rules)
assert credential_access_boundary.to_json() == {
"accessBoundary": {
"accessBoundaryRules": [
{
"availablePermissions": AVAILABLE_PERMISSIONS,
"availableResource": AVAILABLE_RESOURCE,
"availabilityCondition": {
"expression": EXPRESSION,
"title": TITLE,
"description": DESCRIPTION,
},
}
]
}
}
class TestCredentials(object):
@staticmethod
def make_credentials(source_credentials=SourceCredentials(), quota_project_id=None):
availability_condition = make_availability_condition(
EXPRESSION, TITLE, DESCRIPTION
)
access_boundary_rule = make_access_boundary_rule(
AVAILABLE_RESOURCE, AVAILABLE_PERMISSIONS, availability_condition
)
rules = [access_boundary_rule]
credential_access_boundary = make_credential_access_boundary(rules)
return downscoped.Credentials(
source_credentials, credential_access_boundary, quota_project_id
)
@staticmethod
def make_mock_request(data, status=http_client.OK):
response = mock.create_autospec(transport.Response, instance=True)
response.status = status
response.data = json.dumps(data).encode("utf-8")
request = mock.create_autospec(transport.Request)
request.return_value = response
return request
@staticmethod
def assert_request_kwargs(request_kwargs, headers, request_data):
"""Asserts the request was called with the expected parameters.
"""
assert request_kwargs["url"] == TOKEN_EXCHANGE_ENDPOINT
assert request_kwargs["method"] == "POST"
assert request_kwargs["headers"] == headers
assert request_kwargs["body"] is not None
body_tuples = urllib.parse.parse_qsl(request_kwargs["body"])
for (k, v) in body_tuples:
assert v.decode("utf-8") == request_data[k.decode("utf-8")]
assert len(body_tuples) == len(request_data.keys())
def test_default_state(self):
credentials = self.make_credentials()
# No token acquired yet.
assert not credentials.token
assert not credentials.valid
# Expiration hasn't been set yet.
assert not credentials.expiry
assert not credentials.expired
# No quota project ID set.
assert not credentials.quota_project_id
def test_with_quota_project(self):
credentials = self.make_credentials()
assert not credentials.quota_project_id
quota_project_creds = credentials.with_quota_project("project-foo")
assert quota_project_creds.quota_project_id == "project-foo"
@mock.patch("google.auth._helpers.utcnow", return_value=datetime.datetime.min)
def test_refresh(self, unused_utcnow):
response = SUCCESS_RESPONSE.copy()
# Test custom expiration to confirm expiry is set correctly.
response["expires_in"] = 2800
expected_expiry = datetime.datetime.min + datetime.timedelta(
seconds=response["expires_in"]
)
headers = {"Content-Type": "application/x-www-form-urlencoded"}
request_data = {
"grant_type": GRANT_TYPE,
"subject_token": "ACCESS_TOKEN_1",
"subject_token_type": SUBJECT_TOKEN_TYPE,
"requested_token_type": REQUESTED_TOKEN_TYPE,
"options": urllib.parse.quote(json.dumps(CREDENTIAL_ACCESS_BOUNDARY_JSON)),
}
request = self.make_mock_request(status=http_client.OK, data=response)
source_credentials = SourceCredentials()
credentials = self.make_credentials(source_credentials=source_credentials)
# Spy on calls to source credentials refresh to confirm the expected request
# instance is used.
with mock.patch.object(
source_credentials, "refresh", wraps=source_credentials.refresh
) as wrapped_souce_cred_refresh:
credentials.refresh(request)
self.assert_request_kwargs(request.call_args[1], headers, request_data)
assert credentials.valid
assert credentials.expiry == expected_expiry
assert not credentials.expired
assert credentials.token == response["access_token"]
# Confirm source credentials called with the same request instance.
wrapped_souce_cred_refresh.assert_called_with(request)
@mock.patch("google.auth._helpers.utcnow", return_value=datetime.datetime.min)
def test_refresh_without_response_expires_in(self, unused_utcnow):
response = SUCCESS_RESPONSE.copy()
# Simulate the response is missing the expires_in field.
# The downscoped token expiration should match the source credentials
# expiration.
del response["expires_in"]
expected_expires_in = 1800
# Simulate the source credentials generates a token with 1800 second
# expiration time. The generated downscoped token should have the same
# expiration time.
source_credentials = SourceCredentials(expires_in=expected_expires_in)
expected_expiry = datetime.datetime.min + datetime.timedelta(
seconds=expected_expires_in
)
headers = {"Content-Type": "application/x-www-form-urlencoded"}
request_data = {
"grant_type": GRANT_TYPE,
"subject_token": "ACCESS_TOKEN_1",
"subject_token_type": SUBJECT_TOKEN_TYPE,
"requested_token_type": REQUESTED_TOKEN_TYPE,
"options": urllib.parse.quote(json.dumps(CREDENTIAL_ACCESS_BOUNDARY_JSON)),
}
request = self.make_mock_request(status=http_client.OK, data=response)
credentials = self.make_credentials(source_credentials=source_credentials)
# Spy on calls to source credentials refresh to confirm the expected request
# instance is used.
with mock.patch.object(
source_credentials, "refresh", wraps=source_credentials.refresh
) as wrapped_souce_cred_refresh:
credentials.refresh(request)
self.assert_request_kwargs(request.call_args[1], headers, request_data)
assert credentials.valid
assert credentials.expiry == expected_expiry
assert not credentials.expired
assert credentials.token == response["access_token"]
# Confirm source credentials called with the same request instance.
wrapped_souce_cred_refresh.assert_called_with(request)
def test_refresh_token_exchange_error(self):
request = self.make_mock_request(
status=http_client.BAD_REQUEST, data=ERROR_RESPONSE
)
credentials = self.make_credentials()
with pytest.raises(exceptions.OAuthError) as excinfo:
credentials.refresh(request)
assert excinfo.match(
r"Error code invalid_grant: Subject token is invalid. - https://tools.ietf.org/html/rfc6749"
)
assert not credentials.expired
assert credentials.token is None
def test_refresh_source_credentials_refresh_error(self):
# Initialize downscoped credentials with source credentials that raise
# an error on refresh.
credentials = self.make_credentials(
source_credentials=SourceCredentials(raise_error=True)
)
with pytest.raises(exceptions.RefreshError) as excinfo:
credentials.refresh(mock.sentinel.request)
assert excinfo.match(r"Failed to refresh access token in source credentials.")
assert not credentials.expired
assert credentials.token is None
def test_apply_without_quota_project_id(self):
headers = {}
request = self.make_mock_request(status=http_client.OK, data=SUCCESS_RESPONSE)
credentials = self.make_credentials()
credentials.refresh(request)
credentials.apply(headers)
assert headers == {
"authorization": "Bearer {}".format(SUCCESS_RESPONSE["access_token"])
}
def test_apply_with_quota_project_id(self):
headers = {"other": "header-value"}
request = self.make_mock_request(status=http_client.OK, data=SUCCESS_RESPONSE)
credentials = self.make_credentials(quota_project_id=QUOTA_PROJECT_ID)
credentials.refresh(request)
credentials.apply(headers)
assert headers == {
"other": "header-value",
"authorization": "Bearer {}".format(SUCCESS_RESPONSE["access_token"]),
"x-goog-user-project": QUOTA_PROJECT_ID,
}
def test_before_request(self):
headers = {"other": "header-value"}
request = self.make_mock_request(status=http_client.OK, data=SUCCESS_RESPONSE)
credentials = self.make_credentials()
# First call should call refresh, setting the token.
credentials.before_request(request, "POST", "https://example.com/api", headers)
assert headers == {
"other": "header-value",
"authorization": "Bearer {}".format(SUCCESS_RESPONSE["access_token"]),
}
# Second call shouldn't call refresh (request should be untouched).
credentials.before_request(
mock.sentinel.request, "POST", "https://example.com/api", headers
)
assert headers == {
"other": "header-value",
"authorization": "Bearer {}".format(SUCCESS_RESPONSE["access_token"]),
}
@mock.patch("google.auth._helpers.utcnow")
def test_before_request_expired(self, utcnow):
headers = {}
request = self.make_mock_request(status=http_client.OK, data=SUCCESS_RESPONSE)
credentials = self.make_credentials()
credentials.token = "token"
utcnow.return_value = datetime.datetime.min
# Set the expiration to one second more than now plus the clock skew
# accommodation. These credentials should be valid.
credentials.expiry = (
datetime.datetime.min
+ _helpers.REFRESH_THRESHOLD
+ datetime.timedelta(seconds=1)
)
assert credentials.valid
assert not credentials.expired
credentials.before_request(request, "POST", "https://example.com/api", headers)
# Cached token should be used.
assert headers == {"authorization": "Bearer token"}
# Next call should simulate 1 second passed.
utcnow.return_value = datetime.datetime.min + datetime.timedelta(seconds=1)
assert not credentials.valid
assert credentials.expired
credentials.before_request(request, "POST", "https://example.com/api", headers)
# New token should be retrieved.
assert headers == {
"authorization": "Bearer {}".format(SUCCESS_RESPONSE["access_token"])
}