blob: 3c34f998c9c85999bef80f895c0fedc53f2cb175 [file] [log] [blame]
# Copyright 2020 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import datetime
import json
import mock
import pytest
from six.moves import http_client
from six.moves import urllib
from google.auth import _helpers
from google.auth import exceptions
from google.auth import external_account
from google.auth import transport
CLIENT_ID = "username"
CLIENT_SECRET = "password"
# Base64 encoding of "username:password"
BASIC_AUTH_ENCODING = "dXNlcm5hbWU6cGFzc3dvcmQ="
SERVICE_ACCOUNT_EMAIL = "service-1234@service-name.iam.gserviceaccount.com"
# List of valid workforce pool audiences.
TEST_USER_AUDIENCES = [
"//iam.googleapis.com/locations/global/workforcePools/pool-id/providers/provider-id",
"//iam.googleapis.com/locations/eu/workforcePools/pool-id/providers/provider-id",
"//iam.googleapis.com/locations/eu/workforcePools/workloadIdentityPools/providers/provider-id",
]
# Workload identity pool audiences or invalid workforce pool audiences.
TEST_NON_USER_AUDIENCES = [
# Legacy K8s audience format.
"identitynamespace:1f12345:my_provider",
(
"//iam.googleapis.com/projects/123456/locations/"
"global/workloadIdentityPools/pool-id/providers/"
"provider-id"
),
(
"//iam.googleapis.com/projects/123456/locations/"
"eu/workloadIdentityPools/pool-id/providers/"
"provider-id"
),
# Pool ID with workforcePools string.
(
"//iam.googleapis.com/projects/123456/locations/"
"global/workloadIdentityPools/workforcePools/providers/"
"provider-id"
),
# Unrealistic / incorrect workforce pool audiences.
"//iamgoogleapis.com/locations/eu/workforcePools/pool-id/providers/provider-id",
"//iam.googleapiscom/locations/eu/workforcePools/pool-id/providers/provider-id",
"//iam.googleapis.com/locations/workforcePools/pool-id/providers/provider-id",
"//iam.googleapis.com/locations/eu/workforcePool/pool-id/providers/provider-id",
"//iam.googleapis.com/locations//workforcePool/pool-id/providers/provider-id",
]
class CredentialsImpl(external_account.Credentials):
def __init__(
self,
audience,
subject_token_type,
token_url,
credential_source,
service_account_impersonation_url=None,
client_id=None,
client_secret=None,
quota_project_id=None,
scopes=None,
default_scopes=None,
workforce_pool_user_project=None,
):
super(CredentialsImpl, self).__init__(
audience=audience,
subject_token_type=subject_token_type,
token_url=token_url,
credential_source=credential_source,
service_account_impersonation_url=service_account_impersonation_url,
client_id=client_id,
client_secret=client_secret,
quota_project_id=quota_project_id,
scopes=scopes,
default_scopes=default_scopes,
workforce_pool_user_project=workforce_pool_user_project,
)
self._counter = 0
def retrieve_subject_token(self, request):
counter = self._counter
self._counter += 1
return "subject_token_{}".format(counter)
class TestCredentials(object):
TOKEN_URL = "https://sts.googleapis.com/v1/token"
PROJECT_NUMBER = "123456"
POOL_ID = "POOL_ID"
PROVIDER_ID = "PROVIDER_ID"
AUDIENCE = (
"//iam.googleapis.com/projects/{}"
"/locations/global/workloadIdentityPools/{}"
"/providers/{}"
).format(PROJECT_NUMBER, POOL_ID, PROVIDER_ID)
WORKFORCE_AUDIENCE = (
"//iam.googleapis.com/locations/global/workforcePools/{}/providers/{}"
).format(POOL_ID, PROVIDER_ID)
WORKFORCE_POOL_USER_PROJECT = "WORKFORCE_POOL_USER_PROJECT_NUMBER"
SUBJECT_TOKEN_TYPE = "urn:ietf:params:oauth:token-type:jwt"
WORKFORCE_SUBJECT_TOKEN_TYPE = "urn:ietf:params:oauth:token-type:id_token"
CREDENTIAL_SOURCE = {"file": "/var/run/secrets/goog.id/token"}
SUCCESS_RESPONSE = {
"access_token": "ACCESS_TOKEN",
"issued_token_type": "urn:ietf:params:oauth:token-type:access_token",
"token_type": "Bearer",
"expires_in": 3600,
"scope": "scope1 scope2",
}
ERROR_RESPONSE = {
"error": "invalid_request",
"error_description": "Invalid subject token",
"error_uri": "https://tools.ietf.org/html/rfc6749",
}
QUOTA_PROJECT_ID = "QUOTA_PROJECT_ID"
SERVICE_ACCOUNT_IMPERSONATION_URL = (
"https://us-east1-iamcredentials.googleapis.com/v1/projects/-"
+ "/serviceAccounts/{}:generateAccessToken".format(SERVICE_ACCOUNT_EMAIL)
)
SCOPES = ["scope1", "scope2"]
IMPERSONATION_ERROR_RESPONSE = {
"error": {
"code": 400,
"message": "Request contains an invalid argument",
"status": "INVALID_ARGUMENT",
}
}
PROJECT_ID = "my-proj-id"
CLOUD_RESOURCE_MANAGER_URL = (
"https://cloudresourcemanager.googleapis.com/v1/projects/"
)
CLOUD_RESOURCE_MANAGER_SUCCESS_RESPONSE = {
"projectNumber": PROJECT_NUMBER,
"projectId": PROJECT_ID,
"lifecycleState": "ACTIVE",
"name": "project-name",
"createTime": "2018-11-06T04:42:54.109Z",
"parent": {"type": "folder", "id": "12345678901"},
}
@classmethod
def make_credentials(
cls,
client_id=None,
client_secret=None,
quota_project_id=None,
scopes=None,
default_scopes=None,
service_account_impersonation_url=None,
):
return CredentialsImpl(
audience=cls.AUDIENCE,
subject_token_type=cls.SUBJECT_TOKEN_TYPE,
token_url=cls.TOKEN_URL,
service_account_impersonation_url=service_account_impersonation_url,
credential_source=cls.CREDENTIAL_SOURCE,
client_id=client_id,
client_secret=client_secret,
quota_project_id=quota_project_id,
scopes=scopes,
default_scopes=default_scopes,
)
@classmethod
def make_workforce_pool_credentials(
cls,
client_id=None,
client_secret=None,
quota_project_id=None,
scopes=None,
default_scopes=None,
service_account_impersonation_url=None,
workforce_pool_user_project=None,
):
return CredentialsImpl(
audience=cls.WORKFORCE_AUDIENCE,
subject_token_type=cls.WORKFORCE_SUBJECT_TOKEN_TYPE,
token_url=cls.TOKEN_URL,
service_account_impersonation_url=service_account_impersonation_url,
credential_source=cls.CREDENTIAL_SOURCE,
client_id=client_id,
client_secret=client_secret,
quota_project_id=quota_project_id,
scopes=scopes,
default_scopes=default_scopes,
workforce_pool_user_project=workforce_pool_user_project,
)
@classmethod
def make_mock_request(
cls,
status=http_client.OK,
data=None,
impersonation_status=None,
impersonation_data=None,
cloud_resource_manager_status=None,
cloud_resource_manager_data=None,
):
# STS token exchange request.
token_response = mock.create_autospec(transport.Response, instance=True)
token_response.status = status
token_response.data = json.dumps(data).encode("utf-8")
responses = [token_response]
# If service account impersonation is requested, mock the expected response.
if impersonation_status:
impersonation_response = mock.create_autospec(
transport.Response, instance=True
)
impersonation_response.status = impersonation_status
impersonation_response.data = json.dumps(impersonation_data).encode("utf-8")
responses.append(impersonation_response)
# If cloud resource manager is requested, mock the expected response.
if cloud_resource_manager_status:
cloud_resource_manager_response = mock.create_autospec(
transport.Response, instance=True
)
cloud_resource_manager_response.status = cloud_resource_manager_status
cloud_resource_manager_response.data = json.dumps(
cloud_resource_manager_data
).encode("utf-8")
responses.append(cloud_resource_manager_response)
request = mock.create_autospec(transport.Request)
request.side_effect = responses
return request
@classmethod
def assert_token_request_kwargs(cls, request_kwargs, headers, request_data):
assert request_kwargs["url"] == cls.TOKEN_URL
assert request_kwargs["method"] == "POST"
assert request_kwargs["headers"] == headers
assert request_kwargs["body"] is not None
body_tuples = urllib.parse.parse_qsl(request_kwargs["body"])
for (k, v) in body_tuples:
assert v.decode("utf-8") == request_data[k.decode("utf-8")]
assert len(body_tuples) == len(request_data.keys())
@classmethod
def assert_impersonation_request_kwargs(cls, request_kwargs, headers, request_data):
assert request_kwargs["url"] == cls.SERVICE_ACCOUNT_IMPERSONATION_URL
assert request_kwargs["method"] == "POST"
assert request_kwargs["headers"] == headers
assert request_kwargs["body"] is not None
body_json = json.loads(request_kwargs["body"].decode("utf-8"))
assert body_json == request_data
@classmethod
def assert_resource_manager_request_kwargs(
cls, request_kwargs, project_number, headers
):
assert request_kwargs["url"] == cls.CLOUD_RESOURCE_MANAGER_URL + project_number
assert request_kwargs["method"] == "GET"
assert request_kwargs["headers"] == headers
assert "body" not in request_kwargs
def test_default_state(self):
credentials = self.make_credentials()
# Not token acquired yet
assert not credentials.token
assert not credentials.valid
# Expiration hasn't been set yet
assert not credentials.expiry
assert not credentials.expired
# Scopes are required
assert not credentials.scopes
assert credentials.requires_scopes
assert not credentials.quota_project_id
def test_nonworkforce_with_workforce_pool_user_project(self):
with pytest.raises(ValueError) as excinfo:
CredentialsImpl(
audience=self.AUDIENCE,
subject_token_type=self.SUBJECT_TOKEN_TYPE,
token_url=self.TOKEN_URL,
credential_source=self.CREDENTIAL_SOURCE,
workforce_pool_user_project=self.WORKFORCE_POOL_USER_PROJECT,
)
assert excinfo.match(
"workforce_pool_user_project should not be set for non-workforce "
"pool credentials"
)
def test_with_scopes(self):
credentials = self.make_credentials()
assert not credentials.scopes
assert credentials.requires_scopes
scoped_credentials = credentials.with_scopes(["email"])
assert scoped_credentials.has_scopes(["email"])
assert not scoped_credentials.requires_scopes
def test_with_scopes_workforce_pool(self):
credentials = self.make_workforce_pool_credentials(
workforce_pool_user_project=self.WORKFORCE_POOL_USER_PROJECT
)
assert not credentials.scopes
assert credentials.requires_scopes
scoped_credentials = credentials.with_scopes(["email"])
assert scoped_credentials.has_scopes(["email"])
assert not scoped_credentials.requires_scopes
assert (
scoped_credentials.info.get("workforce_pool_user_project")
== self.WORKFORCE_POOL_USER_PROJECT
)
def test_with_scopes_using_user_and_default_scopes(self):
credentials = self.make_credentials()
assert not credentials.scopes
assert credentials.requires_scopes
scoped_credentials = credentials.with_scopes(
["email"], default_scopes=["profile"]
)
assert scoped_credentials.has_scopes(["email"])
assert not scoped_credentials.has_scopes(["profile"])
assert not scoped_credentials.requires_scopes
assert scoped_credentials.scopes == ["email"]
assert scoped_credentials.default_scopes == ["profile"]
def test_with_scopes_using_default_scopes_only(self):
credentials = self.make_credentials()
assert not credentials.scopes
assert credentials.requires_scopes
scoped_credentials = credentials.with_scopes(None, default_scopes=["profile"])
assert scoped_credentials.has_scopes(["profile"])
assert not scoped_credentials.requires_scopes
def test_with_scopes_full_options_propagated(self):
credentials = self.make_credentials(
client_id=CLIENT_ID,
client_secret=CLIENT_SECRET,
quota_project_id=self.QUOTA_PROJECT_ID,
scopes=self.SCOPES,
default_scopes=["default1"],
service_account_impersonation_url=self.SERVICE_ACCOUNT_IMPERSONATION_URL,
)
with mock.patch.object(
external_account.Credentials, "__init__", return_value=None
) as mock_init:
credentials.with_scopes(["email"], ["default2"])
# Confirm with_scopes initialized the credential with the expected
# parameters and scopes.
mock_init.assert_called_once_with(
audience=self.AUDIENCE,
subject_token_type=self.SUBJECT_TOKEN_TYPE,
token_url=self.TOKEN_URL,
credential_source=self.CREDENTIAL_SOURCE,
service_account_impersonation_url=self.SERVICE_ACCOUNT_IMPERSONATION_URL,
client_id=CLIENT_ID,
client_secret=CLIENT_SECRET,
quota_project_id=self.QUOTA_PROJECT_ID,
scopes=["email"],
default_scopes=["default2"],
workforce_pool_user_project=None,
)
def test_with_quota_project(self):
credentials = self.make_credentials()
assert not credentials.scopes
assert not credentials.quota_project_id
quota_project_creds = credentials.with_quota_project("project-foo")
assert quota_project_creds.quota_project_id == "project-foo"
def test_with_quota_project_workforce_pool(self):
credentials = self.make_workforce_pool_credentials(
workforce_pool_user_project=self.WORKFORCE_POOL_USER_PROJECT
)
assert not credentials.scopes
assert not credentials.quota_project_id
quota_project_creds = credentials.with_quota_project("project-foo")
assert quota_project_creds.quota_project_id == "project-foo"
assert (
quota_project_creds.info.get("workforce_pool_user_project")
== self.WORKFORCE_POOL_USER_PROJECT
)
def test_with_quota_project_full_options_propagated(self):
credentials = self.make_credentials(
client_id=CLIENT_ID,
client_secret=CLIENT_SECRET,
quota_project_id=self.QUOTA_PROJECT_ID,
scopes=self.SCOPES,
default_scopes=["default1"],
service_account_impersonation_url=self.SERVICE_ACCOUNT_IMPERSONATION_URL,
)
with mock.patch.object(
external_account.Credentials, "__init__", return_value=None
) as mock_init:
credentials.with_quota_project("project-foo")
# Confirm with_quota_project initialized the credential with the
# expected parameters and quota project ID.
mock_init.assert_called_once_with(
audience=self.AUDIENCE,
subject_token_type=self.SUBJECT_TOKEN_TYPE,
token_url=self.TOKEN_URL,
credential_source=self.CREDENTIAL_SOURCE,
service_account_impersonation_url=self.SERVICE_ACCOUNT_IMPERSONATION_URL,
client_id=CLIENT_ID,
client_secret=CLIENT_SECRET,
quota_project_id="project-foo",
scopes=self.SCOPES,
default_scopes=["default1"],
workforce_pool_user_project=None,
)
def test_with_invalid_impersonation_target_principal(self):
invalid_url = "https://iamcredentials.googleapis.com/v1/invalid"
with pytest.raises(exceptions.RefreshError) as excinfo:
self.make_credentials(service_account_impersonation_url=invalid_url)
assert excinfo.match(
r"Unable to determine target principal from service account impersonation URL."
)
def test_info(self):
credentials = self.make_credentials()
assert credentials.info == {
"type": "external_account",
"audience": self.AUDIENCE,
"subject_token_type": self.SUBJECT_TOKEN_TYPE,
"token_url": self.TOKEN_URL,
"credential_source": self.CREDENTIAL_SOURCE.copy(),
}
def test_info_workforce_pool(self):
credentials = self.make_workforce_pool_credentials(
workforce_pool_user_project=self.WORKFORCE_POOL_USER_PROJECT
)
assert credentials.info == {
"type": "external_account",
"audience": self.WORKFORCE_AUDIENCE,
"subject_token_type": self.WORKFORCE_SUBJECT_TOKEN_TYPE,
"token_url": self.TOKEN_URL,
"credential_source": self.CREDENTIAL_SOURCE.copy(),
"workforce_pool_user_project": self.WORKFORCE_POOL_USER_PROJECT,
}
def test_info_with_full_options(self):
credentials = self.make_credentials(
client_id=CLIENT_ID,
client_secret=CLIENT_SECRET,
quota_project_id=self.QUOTA_PROJECT_ID,
service_account_impersonation_url=self.SERVICE_ACCOUNT_IMPERSONATION_URL,
)
assert credentials.info == {
"type": "external_account",
"audience": self.AUDIENCE,
"subject_token_type": self.SUBJECT_TOKEN_TYPE,
"token_url": self.TOKEN_URL,
"service_account_impersonation_url": self.SERVICE_ACCOUNT_IMPERSONATION_URL,
"credential_source": self.CREDENTIAL_SOURCE.copy(),
"quota_project_id": self.QUOTA_PROJECT_ID,
"client_id": CLIENT_ID,
"client_secret": CLIENT_SECRET,
}
def test_service_account_email_without_impersonation(self):
credentials = self.make_credentials()
assert credentials.service_account_email is None
def test_service_account_email_with_impersonation(self):
credentials = self.make_credentials(
service_account_impersonation_url=self.SERVICE_ACCOUNT_IMPERSONATION_URL
)
assert credentials.service_account_email == SERVICE_ACCOUNT_EMAIL
@pytest.mark.parametrize("audience", TEST_NON_USER_AUDIENCES)
def test_is_user_with_non_users(self, audience):
credentials = CredentialsImpl(
audience=audience,
subject_token_type=self.SUBJECT_TOKEN_TYPE,
token_url=self.TOKEN_URL,
credential_source=self.CREDENTIAL_SOURCE,
)
assert credentials.is_user is False
@pytest.mark.parametrize("audience", TEST_USER_AUDIENCES)
def test_is_user_with_users(self, audience):
credentials = CredentialsImpl(
audience=audience,
subject_token_type=self.SUBJECT_TOKEN_TYPE,
token_url=self.TOKEN_URL,
credential_source=self.CREDENTIAL_SOURCE,
)
assert credentials.is_user is True
@pytest.mark.parametrize("audience", TEST_USER_AUDIENCES)
def test_is_user_with_users_and_impersonation(self, audience):
# Initialize the credentials with service account impersonation.
credentials = CredentialsImpl(
audience=audience,
subject_token_type=self.SUBJECT_TOKEN_TYPE,
token_url=self.TOKEN_URL,
credential_source=self.CREDENTIAL_SOURCE,
service_account_impersonation_url=self.SERVICE_ACCOUNT_IMPERSONATION_URL,
)
# Even though the audience is for a workforce pool, since service account
# impersonation is used, the credentials will represent a service account and
# not a user.
assert credentials.is_user is False
@pytest.mark.parametrize("audience", TEST_NON_USER_AUDIENCES)
def test_is_workforce_pool_with_non_users(self, audience):
credentials = CredentialsImpl(
audience=audience,
subject_token_type=self.SUBJECT_TOKEN_TYPE,
token_url=self.TOKEN_URL,
credential_source=self.CREDENTIAL_SOURCE,
)
assert credentials.is_workforce_pool is False
@pytest.mark.parametrize("audience", TEST_USER_AUDIENCES)
def test_is_workforce_pool_with_users(self, audience):
credentials = CredentialsImpl(
audience=audience,
subject_token_type=self.SUBJECT_TOKEN_TYPE,
token_url=self.TOKEN_URL,
credential_source=self.CREDENTIAL_SOURCE,
)
assert credentials.is_workforce_pool is True
@pytest.mark.parametrize("audience", TEST_USER_AUDIENCES)
def test_is_workforce_pool_with_users_and_impersonation(self, audience):
# Initialize the credentials with workforce audience and service account
# impersonation.
credentials = CredentialsImpl(
audience=audience,
subject_token_type=self.SUBJECT_TOKEN_TYPE,
token_url=self.TOKEN_URL,
credential_source=self.CREDENTIAL_SOURCE,
service_account_impersonation_url=self.SERVICE_ACCOUNT_IMPERSONATION_URL,
)
# Even though impersonation is used, is_workforce_pool should still return True.
assert credentials.is_workforce_pool is True
@mock.patch("google.auth._helpers.utcnow", return_value=datetime.datetime.min)
def test_refresh_without_client_auth_success(self, unused_utcnow):
response = self.SUCCESS_RESPONSE.copy()
# Test custom expiration to confirm expiry is set correctly.
response["expires_in"] = 2800
expected_expiry = datetime.datetime.min + datetime.timedelta(
seconds=response["expires_in"]
)
headers = {"Content-Type": "application/x-www-form-urlencoded"}
request_data = {
"grant_type": "urn:ietf:params:oauth:grant-type:token-exchange",
"audience": self.AUDIENCE,
"requested_token_type": "urn:ietf:params:oauth:token-type:access_token",
"subject_token": "subject_token_0",
"subject_token_type": self.SUBJECT_TOKEN_TYPE,
}
request = self.make_mock_request(status=http_client.OK, data=response)
credentials = self.make_credentials()
credentials.refresh(request)
self.assert_token_request_kwargs(request.call_args[1], headers, request_data)
assert credentials.valid
assert credentials.expiry == expected_expiry
assert not credentials.expired
assert credentials.token == response["access_token"]
@mock.patch("google.auth._helpers.utcnow", return_value=datetime.datetime.min)
def test_refresh_workforce_without_client_auth_success(self, unused_utcnow):
response = self.SUCCESS_RESPONSE.copy()
# Test custom expiration to confirm expiry is set correctly.
response["expires_in"] = 2800
expected_expiry = datetime.datetime.min + datetime.timedelta(
seconds=response["expires_in"]
)
headers = {"Content-Type": "application/x-www-form-urlencoded"}
request_data = {
"grant_type": "urn:ietf:params:oauth:grant-type:token-exchange",
"audience": self.WORKFORCE_AUDIENCE,
"requested_token_type": "urn:ietf:params:oauth:token-type:access_token",
"subject_token": "subject_token_0",
"subject_token_type": self.WORKFORCE_SUBJECT_TOKEN_TYPE,
"options": urllib.parse.quote(
json.dumps({"userProject": self.WORKFORCE_POOL_USER_PROJECT})
),
}
request = self.make_mock_request(status=http_client.OK, data=response)
credentials = self.make_workforce_pool_credentials(
workforce_pool_user_project=self.WORKFORCE_POOL_USER_PROJECT
)
credentials.refresh(request)
self.assert_token_request_kwargs(request.call_args[1], headers, request_data)
assert credentials.valid
assert credentials.expiry == expected_expiry
assert not credentials.expired
assert credentials.token == response["access_token"]
@mock.patch("google.auth._helpers.utcnow", return_value=datetime.datetime.min)
def test_refresh_workforce_with_client_auth_success(self, unused_utcnow):
response = self.SUCCESS_RESPONSE.copy()
# Test custom expiration to confirm expiry is set correctly.
response["expires_in"] = 2800
expected_expiry = datetime.datetime.min + datetime.timedelta(
seconds=response["expires_in"]
)
headers = {
"Content-Type": "application/x-www-form-urlencoded",
"Authorization": "Basic {}".format(BASIC_AUTH_ENCODING),
}
request_data = {
"grant_type": "urn:ietf:params:oauth:grant-type:token-exchange",
"audience": self.WORKFORCE_AUDIENCE,
"requested_token_type": "urn:ietf:params:oauth:token-type:access_token",
"subject_token": "subject_token_0",
"subject_token_type": self.WORKFORCE_SUBJECT_TOKEN_TYPE,
}
request = self.make_mock_request(status=http_client.OK, data=response)
# Client Auth will have higher priority over workforce_pool_user_project.
credentials = self.make_workforce_pool_credentials(
client_id=CLIENT_ID,
client_secret=CLIENT_SECRET,
workforce_pool_user_project=self.WORKFORCE_POOL_USER_PROJECT,
)
credentials.refresh(request)
self.assert_token_request_kwargs(request.call_args[1], headers, request_data)
assert credentials.valid
assert credentials.expiry == expected_expiry
assert not credentials.expired
assert credentials.token == response["access_token"]
@mock.patch("google.auth._helpers.utcnow", return_value=datetime.datetime.min)
def test_refresh_workforce_with_client_auth_and_no_workforce_project_success(
self, unused_utcnow
):
response = self.SUCCESS_RESPONSE.copy()
# Test custom expiration to confirm expiry is set correctly.
response["expires_in"] = 2800
expected_expiry = datetime.datetime.min + datetime.timedelta(
seconds=response["expires_in"]
)
headers = {
"Content-Type": "application/x-www-form-urlencoded",
"Authorization": "Basic {}".format(BASIC_AUTH_ENCODING),
}
request_data = {
"grant_type": "urn:ietf:params:oauth:grant-type:token-exchange",
"audience": self.WORKFORCE_AUDIENCE,
"requested_token_type": "urn:ietf:params:oauth:token-type:access_token",
"subject_token": "subject_token_0",
"subject_token_type": self.WORKFORCE_SUBJECT_TOKEN_TYPE,
}
request = self.make_mock_request(status=http_client.OK, data=response)
# Client Auth will be sufficient for user project determination.
credentials = self.make_workforce_pool_credentials(
client_id=CLIENT_ID,
client_secret=CLIENT_SECRET,
workforce_pool_user_project=None,
)
credentials.refresh(request)
self.assert_token_request_kwargs(request.call_args[1], headers, request_data)
assert credentials.valid
assert credentials.expiry == expected_expiry
assert not credentials.expired
assert credentials.token == response["access_token"]
def test_refresh_impersonation_without_client_auth_success(self):
# Simulate service account access token expires in 2800 seconds.
expire_time = (
_helpers.utcnow().replace(microsecond=0) + datetime.timedelta(seconds=2800)
).isoformat("T") + "Z"
expected_expiry = datetime.datetime.strptime(expire_time, "%Y-%m-%dT%H:%M:%SZ")
# STS token exchange request/response.
token_response = self.SUCCESS_RESPONSE.copy()
token_headers = {"Content-Type": "application/x-www-form-urlencoded"}
token_request_data = {
"grant_type": "urn:ietf:params:oauth:grant-type:token-exchange",
"audience": self.AUDIENCE,
"requested_token_type": "urn:ietf:params:oauth:token-type:access_token",
"subject_token": "subject_token_0",
"subject_token_type": self.SUBJECT_TOKEN_TYPE,
"scope": "https://www.googleapis.com/auth/iam",
}
# Service account impersonation request/response.
impersonation_response = {
"accessToken": "SA_ACCESS_TOKEN",
"expireTime": expire_time,
}
impersonation_headers = {
"Content-Type": "application/json",
"authorization": "Bearer {}".format(token_response["access_token"]),
}
impersonation_request_data = {
"delegates": None,
"scope": self.SCOPES,
"lifetime": "3600s",
}
# Initialize mock request to handle token exchange and service account
# impersonation request.
request = self.make_mock_request(
status=http_client.OK,
data=token_response,
impersonation_status=http_client.OK,
impersonation_data=impersonation_response,
)
# Initialize credentials with service account impersonation.
credentials = self.make_credentials(
service_account_impersonation_url=self.SERVICE_ACCOUNT_IMPERSONATION_URL,
scopes=self.SCOPES,
)
credentials.refresh(request)
# Only 2 requests should be processed.
assert len(request.call_args_list) == 2
# Verify token exchange request parameters.
self.assert_token_request_kwargs(
request.call_args_list[0][1], token_headers, token_request_data
)
# Verify service account impersonation request parameters.
self.assert_impersonation_request_kwargs(
request.call_args_list[1][1],
impersonation_headers,
impersonation_request_data,
)
assert credentials.valid
assert credentials.expiry == expected_expiry
assert not credentials.expired
assert credentials.token == impersonation_response["accessToken"]
def test_refresh_workforce_impersonation_without_client_auth_success(self):
# Simulate service account access token expires in 2800 seconds.
expire_time = (
_helpers.utcnow().replace(microsecond=0) + datetime.timedelta(seconds=2800)
).isoformat("T") + "Z"
expected_expiry = datetime.datetime.strptime(expire_time, "%Y-%m-%dT%H:%M:%SZ")
# STS token exchange request/response.
token_response = self.SUCCESS_RESPONSE.copy()
token_headers = {"Content-Type": "application/x-www-form-urlencoded"}
token_request_data = {
"grant_type": "urn:ietf:params:oauth:grant-type:token-exchange",
"audience": self.WORKFORCE_AUDIENCE,
"requested_token_type": "urn:ietf:params:oauth:token-type:access_token",
"subject_token": "subject_token_0",
"subject_token_type": self.WORKFORCE_SUBJECT_TOKEN_TYPE,
"scope": "https://www.googleapis.com/auth/iam",
"options": urllib.parse.quote(
json.dumps({"userProject": self.WORKFORCE_POOL_USER_PROJECT})
),
}
# Service account impersonation request/response.
impersonation_response = {
"accessToken": "SA_ACCESS_TOKEN",
"expireTime": expire_time,
}
impersonation_headers = {
"Content-Type": "application/json",
"authorization": "Bearer {}".format(token_response["access_token"]),
}
impersonation_request_data = {
"delegates": None,
"scope": self.SCOPES,
"lifetime": "3600s",
}
# Initialize mock request to handle token exchange and service account
# impersonation request.
request = self.make_mock_request(
status=http_client.OK,
data=token_response,
impersonation_status=http_client.OK,
impersonation_data=impersonation_response,
)
# Initialize credentials with service account impersonation.
credentials = self.make_workforce_pool_credentials(
service_account_impersonation_url=self.SERVICE_ACCOUNT_IMPERSONATION_URL,
scopes=self.SCOPES,
workforce_pool_user_project=self.WORKFORCE_POOL_USER_PROJECT,
)
credentials.refresh(request)
# Only 2 requests should be processed.
assert len(request.call_args_list) == 2
# Verify token exchange request parameters.
self.assert_token_request_kwargs(
request.call_args_list[0][1], token_headers, token_request_data
)
# Verify service account impersonation request parameters.
self.assert_impersonation_request_kwargs(
request.call_args_list[1][1],
impersonation_headers,
impersonation_request_data,
)
assert credentials.valid
assert credentials.expiry == expected_expiry
assert not credentials.expired
assert credentials.token == impersonation_response["accessToken"]
def test_refresh_without_client_auth_success_explicit_user_scopes_ignore_default_scopes(
self,
):
headers = {"Content-Type": "application/x-www-form-urlencoded"}
request_data = {
"grant_type": "urn:ietf:params:oauth:grant-type:token-exchange",
"audience": self.AUDIENCE,
"requested_token_type": "urn:ietf:params:oauth:token-type:access_token",
"scope": "scope1 scope2",
"subject_token": "subject_token_0",
"subject_token_type": self.SUBJECT_TOKEN_TYPE,
}
request = self.make_mock_request(
status=http_client.OK, data=self.SUCCESS_RESPONSE
)
credentials = self.make_credentials(
scopes=["scope1", "scope2"],
# Default scopes will be ignored in favor of user scopes.
default_scopes=["ignored"],
)
credentials.refresh(request)
self.assert_token_request_kwargs(request.call_args[1], headers, request_data)
assert credentials.valid
assert not credentials.expired
assert credentials.token == self.SUCCESS_RESPONSE["access_token"]
assert credentials.has_scopes(["scope1", "scope2"])
assert not credentials.has_scopes(["ignored"])
def test_refresh_without_client_auth_success_explicit_default_scopes_only(self):
headers = {"Content-Type": "application/x-www-form-urlencoded"}
request_data = {
"grant_type": "urn:ietf:params:oauth:grant-type:token-exchange",
"audience": self.AUDIENCE,
"requested_token_type": "urn:ietf:params:oauth:token-type:access_token",
"scope": "scope1 scope2",
"subject_token": "subject_token_0",
"subject_token_type": self.SUBJECT_TOKEN_TYPE,
}
request = self.make_mock_request(
status=http_client.OK, data=self.SUCCESS_RESPONSE
)
credentials = self.make_credentials(
scopes=None,
# Default scopes will be used since user scopes are none.
default_scopes=["scope1", "scope2"],
)
credentials.refresh(request)
self.assert_token_request_kwargs(request.call_args[1], headers, request_data)
assert credentials.valid
assert not credentials.expired
assert credentials.token == self.SUCCESS_RESPONSE["access_token"]
assert credentials.has_scopes(["scope1", "scope2"])
def test_refresh_without_client_auth_error(self):
request = self.make_mock_request(
status=http_client.BAD_REQUEST, data=self.ERROR_RESPONSE
)
credentials = self.make_credentials()
with pytest.raises(exceptions.OAuthError) as excinfo:
credentials.refresh(request)
assert excinfo.match(
r"Error code invalid_request: Invalid subject token - https://tools.ietf.org/html/rfc6749"
)
assert not credentials.expired
assert credentials.token is None
def test_refresh_impersonation_without_client_auth_error(self):
request = self.make_mock_request(
status=http_client.OK,
data=self.SUCCESS_RESPONSE,
impersonation_status=http_client.BAD_REQUEST,
impersonation_data=self.IMPERSONATION_ERROR_RESPONSE,
)
credentials = self.make_credentials(
service_account_impersonation_url=self.SERVICE_ACCOUNT_IMPERSONATION_URL,
scopes=self.SCOPES,
)
with pytest.raises(exceptions.RefreshError) as excinfo:
credentials.refresh(request)
assert excinfo.match(r"Unable to acquire impersonated credentials")
assert not credentials.expired
assert credentials.token is None
def test_refresh_with_client_auth_success(self):
headers = {
"Content-Type": "application/x-www-form-urlencoded",
"Authorization": "Basic {}".format(BASIC_AUTH_ENCODING),
}
request_data = {
"grant_type": "urn:ietf:params:oauth:grant-type:token-exchange",
"audience": self.AUDIENCE,
"requested_token_type": "urn:ietf:params:oauth:token-type:access_token",
"subject_token": "subject_token_0",
"subject_token_type": self.SUBJECT_TOKEN_TYPE,
}
request = self.make_mock_request(
status=http_client.OK, data=self.SUCCESS_RESPONSE
)
credentials = self.make_credentials(
client_id=CLIENT_ID, client_secret=CLIENT_SECRET
)
credentials.refresh(request)
self.assert_token_request_kwargs(request.call_args[1], headers, request_data)
assert credentials.valid
assert not credentials.expired
assert credentials.token == self.SUCCESS_RESPONSE["access_token"]
def test_refresh_impersonation_with_client_auth_success_ignore_default_scopes(self):
# Simulate service account access token expires in 2800 seconds.
expire_time = (
_helpers.utcnow().replace(microsecond=0) + datetime.timedelta(seconds=2800)
).isoformat("T") + "Z"
expected_expiry = datetime.datetime.strptime(expire_time, "%Y-%m-%dT%H:%M:%SZ")
# STS token exchange request/response.
token_response = self.SUCCESS_RESPONSE.copy()
token_headers = {
"Content-Type": "application/x-www-form-urlencoded",
"Authorization": "Basic {}".format(BASIC_AUTH_ENCODING),
}
token_request_data = {
"grant_type": "urn:ietf:params:oauth:grant-type:token-exchange",
"audience": self.AUDIENCE,
"requested_token_type": "urn:ietf:params:oauth:token-type:access_token",
"subject_token": "subject_token_0",
"subject_token_type": self.SUBJECT_TOKEN_TYPE,
"scope": "https://www.googleapis.com/auth/iam",
}
# Service account impersonation request/response.
impersonation_response = {
"accessToken": "SA_ACCESS_TOKEN",
"expireTime": expire_time,
}
impersonation_headers = {
"Content-Type": "application/json",
"authorization": "Bearer {}".format(token_response["access_token"]),
}
impersonation_request_data = {
"delegates": None,
"scope": self.SCOPES,
"lifetime": "3600s",
}
# Initialize mock request to handle token exchange and service account
# impersonation request.
request = self.make_mock_request(
status=http_client.OK,
data=token_response,
impersonation_status=http_client.OK,
impersonation_data=impersonation_response,
)
# Initialize credentials with service account impersonation and basic auth.
credentials = self.make_credentials(
client_id=CLIENT_ID,
client_secret=CLIENT_SECRET,
service_account_impersonation_url=self.SERVICE_ACCOUNT_IMPERSONATION_URL,
scopes=self.SCOPES,
# Default scopes will be ignored since user scopes are specified.
default_scopes=["ignored"],
)
credentials.refresh(request)
# Only 2 requests should be processed.
assert len(request.call_args_list) == 2
# Verify token exchange request parameters.
self.assert_token_request_kwargs(
request.call_args_list[0][1], token_headers, token_request_data
)
# Verify service account impersonation request parameters.
self.assert_impersonation_request_kwargs(
request.call_args_list[1][1],
impersonation_headers,
impersonation_request_data,
)
assert credentials.valid
assert credentials.expiry == expected_expiry
assert not credentials.expired
assert credentials.token == impersonation_response["accessToken"]
def test_refresh_impersonation_with_client_auth_success_use_default_scopes(self):
# Simulate service account access token expires in 2800 seconds.
expire_time = (
_helpers.utcnow().replace(microsecond=0) + datetime.timedelta(seconds=2800)
).isoformat("T") + "Z"
expected_expiry = datetime.datetime.strptime(expire_time, "%Y-%m-%dT%H:%M:%SZ")
# STS token exchange request/response.
token_response = self.SUCCESS_RESPONSE.copy()
token_headers = {
"Content-Type": "application/x-www-form-urlencoded",
"Authorization": "Basic {}".format(BASIC_AUTH_ENCODING),
}
token_request_data = {
"grant_type": "urn:ietf:params:oauth:grant-type:token-exchange",
"audience": self.AUDIENCE,
"requested_token_type": "urn:ietf:params:oauth:token-type:access_token",
"subject_token": "subject_token_0",
"subject_token_type": self.SUBJECT_TOKEN_TYPE,
"scope": "https://www.googleapis.com/auth/iam",
}
# Service account impersonation request/response.
impersonation_response = {
"accessToken": "SA_ACCESS_TOKEN",
"expireTime": expire_time,
}
impersonation_headers = {
"Content-Type": "application/json",
"authorization": "Bearer {}".format(token_response["access_token"]),
}
impersonation_request_data = {
"delegates": None,
"scope": self.SCOPES,
"lifetime": "3600s",
}
# Initialize mock request to handle token exchange and service account
# impersonation request.
request = self.make_mock_request(
status=http_client.OK,
data=token_response,
impersonation_status=http_client.OK,
impersonation_data=impersonation_response,
)
# Initialize credentials with service account impersonation and basic auth.
credentials = self.make_credentials(
client_id=CLIENT_ID,
client_secret=CLIENT_SECRET,
service_account_impersonation_url=self.SERVICE_ACCOUNT_IMPERSONATION_URL,
scopes=None,
# Default scopes will be used since user specified scopes are none.
default_scopes=self.SCOPES,
)
credentials.refresh(request)
# Only 2 requests should be processed.
assert len(request.call_args_list) == 2
# Verify token exchange request parameters.
self.assert_token_request_kwargs(
request.call_args_list[0][1], token_headers, token_request_data
)
# Verify service account impersonation request parameters.
self.assert_impersonation_request_kwargs(
request.call_args_list[1][1],
impersonation_headers,
impersonation_request_data,
)
assert credentials.valid
assert credentials.expiry == expected_expiry
assert not credentials.expired
assert credentials.token == impersonation_response["accessToken"]
def test_apply_without_quota_project_id(self):
headers = {}
request = self.make_mock_request(
status=http_client.OK, data=self.SUCCESS_RESPONSE
)
credentials = self.make_credentials()
credentials.refresh(request)
credentials.apply(headers)
assert headers == {
"authorization": "Bearer {}".format(self.SUCCESS_RESPONSE["access_token"])
}
def test_apply_workforce_without_quota_project_id(self):
headers = {}
request = self.make_mock_request(
status=http_client.OK, data=self.SUCCESS_RESPONSE
)
credentials = self.make_workforce_pool_credentials(
workforce_pool_user_project=self.WORKFORCE_POOL_USER_PROJECT
)
credentials.refresh(request)
credentials.apply(headers)
assert headers == {
"authorization": "Bearer {}".format(self.SUCCESS_RESPONSE["access_token"])
}
def test_apply_impersonation_without_quota_project_id(self):
expire_time = (
_helpers.utcnow().replace(microsecond=0) + datetime.timedelta(seconds=3600)
).isoformat("T") + "Z"
# Service account impersonation response.
impersonation_response = {
"accessToken": "SA_ACCESS_TOKEN",
"expireTime": expire_time,
}
# Initialize mock request to handle token exchange and service account
# impersonation request.
request = self.make_mock_request(
status=http_client.OK,
data=self.SUCCESS_RESPONSE.copy(),
impersonation_status=http_client.OK,
impersonation_data=impersonation_response,
)
# Initialize credentials with service account impersonation.
credentials = self.make_credentials(
service_account_impersonation_url=self.SERVICE_ACCOUNT_IMPERSONATION_URL,
scopes=self.SCOPES,
)
headers = {}
credentials.refresh(request)
credentials.apply(headers)
assert headers == {
"authorization": "Bearer {}".format(impersonation_response["accessToken"])
}
def test_apply_with_quota_project_id(self):
headers = {"other": "header-value"}
request = self.make_mock_request(
status=http_client.OK, data=self.SUCCESS_RESPONSE
)
credentials = self.make_credentials(quota_project_id=self.QUOTA_PROJECT_ID)
credentials.refresh(request)
credentials.apply(headers)
assert headers == {
"other": "header-value",
"authorization": "Bearer {}".format(self.SUCCESS_RESPONSE["access_token"]),
"x-goog-user-project": self.QUOTA_PROJECT_ID,
}
def test_apply_impersonation_with_quota_project_id(self):
expire_time = (
_helpers.utcnow().replace(microsecond=0) + datetime.timedelta(seconds=3600)
).isoformat("T") + "Z"
# Service account impersonation response.
impersonation_response = {
"accessToken": "SA_ACCESS_TOKEN",
"expireTime": expire_time,
}
# Initialize mock request to handle token exchange and service account
# impersonation request.
request = self.make_mock_request(
status=http_client.OK,
data=self.SUCCESS_RESPONSE.copy(),
impersonation_status=http_client.OK,
impersonation_data=impersonation_response,
)
# Initialize credentials with service account impersonation.
credentials = self.make_credentials(
service_account_impersonation_url=self.SERVICE_ACCOUNT_IMPERSONATION_URL,
scopes=self.SCOPES,
quota_project_id=self.QUOTA_PROJECT_ID,
)
headers = {"other": "header-value"}
credentials.refresh(request)
credentials.apply(headers)
assert headers == {
"other": "header-value",
"authorization": "Bearer {}".format(impersonation_response["accessToken"]),
"x-goog-user-project": self.QUOTA_PROJECT_ID,
}
def test_before_request(self):
headers = {"other": "header-value"}
request = self.make_mock_request(
status=http_client.OK, data=self.SUCCESS_RESPONSE
)
credentials = self.make_credentials()
# First call should call refresh, setting the token.
credentials.before_request(request, "POST", "https://example.com/api", headers)
assert headers == {
"other": "header-value",
"authorization": "Bearer {}".format(self.SUCCESS_RESPONSE["access_token"]),
}
# Second call shouldn't call refresh.
credentials.before_request(request, "POST", "https://example.com/api", headers)
assert headers == {
"other": "header-value",
"authorization": "Bearer {}".format(self.SUCCESS_RESPONSE["access_token"]),
}
def test_before_request_workforce(self):
headers = {"other": "header-value"}
request = self.make_mock_request(
status=http_client.OK, data=self.SUCCESS_RESPONSE
)
credentials = self.make_workforce_pool_credentials(
workforce_pool_user_project=self.WORKFORCE_POOL_USER_PROJECT
)
# First call should call refresh, setting the token.
credentials.before_request(request, "POST", "https://example.com/api", headers)
assert headers == {
"other": "header-value",
"authorization": "Bearer {}".format(self.SUCCESS_RESPONSE["access_token"]),
}
# Second call shouldn't call refresh.
credentials.before_request(request, "POST", "https://example.com/api", headers)
assert headers == {
"other": "header-value",
"authorization": "Bearer {}".format(self.SUCCESS_RESPONSE["access_token"]),
}
def test_before_request_impersonation(self):
expire_time = (
_helpers.utcnow().replace(microsecond=0) + datetime.timedelta(seconds=3600)
).isoformat("T") + "Z"
# Service account impersonation response.
impersonation_response = {
"accessToken": "SA_ACCESS_TOKEN",
"expireTime": expire_time,
}
# Initialize mock request to handle token exchange and service account
# impersonation request.
request = self.make_mock_request(
status=http_client.OK,
data=self.SUCCESS_RESPONSE.copy(),
impersonation_status=http_client.OK,
impersonation_data=impersonation_response,
)
headers = {"other": "header-value"}
credentials = self.make_credentials(
service_account_impersonation_url=self.SERVICE_ACCOUNT_IMPERSONATION_URL
)
# First call should call refresh, setting the token.
credentials.before_request(request, "POST", "https://example.com/api", headers)
assert headers == {
"other": "header-value",
"authorization": "Bearer {}".format(impersonation_response["accessToken"]),
}
# Second call shouldn't call refresh.
credentials.before_request(request, "POST", "https://example.com/api", headers)
assert headers == {
"other": "header-value",
"authorization": "Bearer {}".format(impersonation_response["accessToken"]),
}
@mock.patch("google.auth._helpers.utcnow")
def test_before_request_expired(self, utcnow):
headers = {}
request = self.make_mock_request(
status=http_client.OK, data=self.SUCCESS_RESPONSE
)
credentials = self.make_credentials()
credentials.token = "token"
utcnow.return_value = datetime.datetime.min
# Set the expiration to one second more than now plus the clock skew
# accomodation. These credentials should be valid.
credentials.expiry = (
datetime.datetime.min
+ _helpers.REFRESH_THRESHOLD
+ datetime.timedelta(seconds=1)
)
assert credentials.valid
assert not credentials.expired
credentials.before_request(request, "POST", "https://example.com/api", headers)
# Cached token should be used.
assert headers == {"authorization": "Bearer token"}
# Next call should simulate 1 second passed.
utcnow.return_value = datetime.datetime.min + datetime.timedelta(seconds=1)
assert not credentials.valid
assert credentials.expired
credentials.before_request(request, "POST", "https://example.com/api", headers)
# New token should be retrieved.
assert headers == {
"authorization": "Bearer {}".format(self.SUCCESS_RESPONSE["access_token"])
}
@mock.patch("google.auth._helpers.utcnow")
def test_before_request_impersonation_expired(self, utcnow):
headers = {}
expire_time = (
datetime.datetime.min + datetime.timedelta(seconds=3601)
).isoformat("T") + "Z"
# Service account impersonation response.
impersonation_response = {
"accessToken": "SA_ACCESS_TOKEN",
"expireTime": expire_time,
}
# Initialize mock request to handle token exchange and service account
# impersonation request.
request = self.make_mock_request(
status=http_client.OK,
data=self.SUCCESS_RESPONSE.copy(),
impersonation_status=http_client.OK,
impersonation_data=impersonation_response,
)
credentials = self.make_credentials(
service_account_impersonation_url=self.SERVICE_ACCOUNT_IMPERSONATION_URL
)
credentials.token = "token"
utcnow.return_value = datetime.datetime.min
# Set the expiration to one second more than now plus the clock skew
# accomodation. These credentials should be valid.
credentials.expiry = (
datetime.datetime.min
+ _helpers.REFRESH_THRESHOLD
+ datetime.timedelta(seconds=1)
)
assert credentials.valid
assert not credentials.expired
credentials.before_request(request, "POST", "https://example.com/api", headers)
# Cached token should be used.
assert headers == {"authorization": "Bearer token"}
# Next call should simulate 1 second passed. This will trigger the expiration
# threshold.
utcnow.return_value = datetime.datetime.min + datetime.timedelta(seconds=1)
assert not credentials.valid
assert credentials.expired
credentials.before_request(request, "POST", "https://example.com/api", headers)
# New token should be retrieved.
assert headers == {
"authorization": "Bearer {}".format(impersonation_response["accessToken"])
}
@pytest.mark.parametrize(
"audience",
[
# Legacy K8s audience format.
"identitynamespace:1f12345:my_provider",
# Unrealistic audiences.
"//iam.googleapis.com/projects",
"//iam.googleapis.com/projects/",
"//iam.googleapis.com/project/123456",
"//iam.googleapis.com/projects//123456",
"//iam.googleapis.com/prefix_projects/123456",
"//iam.googleapis.com/projects_suffix/123456",
],
)
def test_project_number_indeterminable(self, audience):
credentials = CredentialsImpl(
audience=audience,
subject_token_type=self.SUBJECT_TOKEN_TYPE,
token_url=self.TOKEN_URL,
credential_source=self.CREDENTIAL_SOURCE,
)
assert credentials.project_number is None
assert credentials.get_project_id(None) is None
def test_project_number_determinable(self):
credentials = CredentialsImpl(
audience=self.AUDIENCE,
subject_token_type=self.SUBJECT_TOKEN_TYPE,
token_url=self.TOKEN_URL,
credential_source=self.CREDENTIAL_SOURCE,
)
assert credentials.project_number == self.PROJECT_NUMBER
def test_project_number_workforce(self):
credentials = CredentialsImpl(
audience=self.WORKFORCE_AUDIENCE,
subject_token_type=self.WORKFORCE_SUBJECT_TOKEN_TYPE,
token_url=self.TOKEN_URL,
credential_source=self.CREDENTIAL_SOURCE,
workforce_pool_user_project=self.WORKFORCE_POOL_USER_PROJECT,
)
assert credentials.project_number is None
def test_project_id_without_scopes(self):
# Initialize credentials with no scopes.
credentials = CredentialsImpl(
audience=self.AUDIENCE,
subject_token_type=self.SUBJECT_TOKEN_TYPE,
token_url=self.TOKEN_URL,
credential_source=self.CREDENTIAL_SOURCE,
)
assert credentials.get_project_id(None) is None
def test_get_project_id_cloud_resource_manager_success(self):
# STS token exchange request/response.
token_response = self.SUCCESS_RESPONSE.copy()
token_headers = {"Content-Type": "application/x-www-form-urlencoded"}
token_request_data = {
"grant_type": "urn:ietf:params:oauth:grant-type:token-exchange",
"audience": self.AUDIENCE,
"requested_token_type": "urn:ietf:params:oauth:token-type:access_token",
"subject_token": "subject_token_0",
"subject_token_type": self.SUBJECT_TOKEN_TYPE,
"scope": "https://www.googleapis.com/auth/iam",
}
# Service account impersonation request/response.
expire_time = (
_helpers.utcnow().replace(microsecond=0) + datetime.timedelta(seconds=3600)
).isoformat("T") + "Z"
expected_expiry = datetime.datetime.strptime(expire_time, "%Y-%m-%dT%H:%M:%SZ")
impersonation_response = {
"accessToken": "SA_ACCESS_TOKEN",
"expireTime": expire_time,
}
impersonation_headers = {
"Content-Type": "application/json",
"x-goog-user-project": self.QUOTA_PROJECT_ID,
"authorization": "Bearer {}".format(token_response["access_token"]),
}
impersonation_request_data = {
"delegates": None,
"scope": self.SCOPES,
"lifetime": "3600s",
}
# Initialize mock request to handle token exchange, service account
# impersonation and cloud resource manager request.
request = self.make_mock_request(
status=http_client.OK,
data=self.SUCCESS_RESPONSE.copy(),
impersonation_status=http_client.OK,
impersonation_data=impersonation_response,
cloud_resource_manager_status=http_client.OK,
cloud_resource_manager_data=self.CLOUD_RESOURCE_MANAGER_SUCCESS_RESPONSE,
)
credentials = self.make_credentials(
service_account_impersonation_url=self.SERVICE_ACCOUNT_IMPERSONATION_URL,
scopes=self.SCOPES,
quota_project_id=self.QUOTA_PROJECT_ID,
)
# Expected project ID from cloud resource manager response should be returned.
project_id = credentials.get_project_id(request)
assert project_id == self.PROJECT_ID
# 3 requests should be processed.
assert len(request.call_args_list) == 3
# Verify token exchange request parameters.
self.assert_token_request_kwargs(
request.call_args_list[0][1], token_headers, token_request_data
)
# Verify service account impersonation request parameters.
self.assert_impersonation_request_kwargs(
request.call_args_list[1][1],
impersonation_headers,
impersonation_request_data,
)
# In the process of getting project ID, an access token should be
# retrieved.
assert credentials.valid
assert credentials.expiry == expected_expiry
assert not credentials.expired
assert credentials.token == impersonation_response["accessToken"]
# Verify cloud resource manager request parameters.
self.assert_resource_manager_request_kwargs(
request.call_args_list[2][1],
self.PROJECT_NUMBER,
{
"x-goog-user-project": self.QUOTA_PROJECT_ID,
"authorization": "Bearer {}".format(
impersonation_response["accessToken"]
),
},
)
# Calling get_project_id again should return the cached project_id.
project_id = credentials.get_project_id(request)
assert project_id == self.PROJECT_ID
# No additional requests.
assert len(request.call_args_list) == 3
def test_workforce_pool_get_project_id_cloud_resource_manager_success(self):
# STS token exchange request/response.
token_headers = {"Content-Type": "application/x-www-form-urlencoded"}
token_request_data = {
"grant_type": "urn:ietf:params:oauth:grant-type:token-exchange",
"audience": self.WORKFORCE_AUDIENCE,
"requested_token_type": "urn:ietf:params:oauth:token-type:access_token",
"subject_token": "subject_token_0",
"subject_token_type": self.WORKFORCE_SUBJECT_TOKEN_TYPE,
"scope": "scope1 scope2",
"options": urllib.parse.quote(
json.dumps({"userProject": self.WORKFORCE_POOL_USER_PROJECT})
),
}
# Initialize mock request to handle token exchange and cloud resource
# manager request.
request = self.make_mock_request(
status=http_client.OK,
data=self.SUCCESS_RESPONSE.copy(),
cloud_resource_manager_status=http_client.OK,
cloud_resource_manager_data=self.CLOUD_RESOURCE_MANAGER_SUCCESS_RESPONSE,
)
credentials = self.make_workforce_pool_credentials(
scopes=self.SCOPES,
quota_project_id=self.QUOTA_PROJECT_ID,
workforce_pool_user_project=self.WORKFORCE_POOL_USER_PROJECT,
)
# Expected project ID from cloud resource manager response should be returned.
project_id = credentials.get_project_id(request)
assert project_id == self.PROJECT_ID
# 2 requests should be processed.
assert len(request.call_args_list) == 2
# Verify token exchange request parameters.
self.assert_token_request_kwargs(
request.call_args_list[0][1], token_headers, token_request_data
)
# In the process of getting project ID, an access token should be
# retrieved.
assert credentials.valid
assert not credentials.expired
assert credentials.token == self.SUCCESS_RESPONSE["access_token"]
# Verify cloud resource manager request parameters.
self.assert_resource_manager_request_kwargs(
request.call_args_list[1][1],
self.WORKFORCE_POOL_USER_PROJECT,
{
"x-goog-user-project": self.QUOTA_PROJECT_ID,
"authorization": "Bearer {}".format(
self.SUCCESS_RESPONSE["access_token"]
),
},
)
# Calling get_project_id again should return the cached project_id.
project_id = credentials.get_project_id(request)
assert project_id == self.PROJECT_ID
# No additional requests.
assert len(request.call_args_list) == 2
def test_get_project_id_cloud_resource_manager_error(self):
# Simulate resource doesn't have sufficient permissions to access
# cloud resource manager.
request = self.make_mock_request(
status=http_client.OK,
data=self.SUCCESS_RESPONSE.copy(),
cloud_resource_manager_status=http_client.UNAUTHORIZED,
)
credentials = self.make_credentials(scopes=self.SCOPES)
project_id = credentials.get_project_id(request)
assert project_id is None
# Only 2 requests to STS and cloud resource manager should be sent.
assert len(request.call_args_list) == 2