Revert "feat: workload identity federation support (#686)" (#691)
This reverts commit 5dcd2b1bdd9d21522636d959cffc49ee29dda88f.
diff --git a/tests/data/external_subject_token.json b/tests/data/external_subject_token.json
deleted file mode 100644
index a47ec34..0000000
--- a/tests/data/external_subject_token.json
+++ /dev/null
@@ -1,3 +0,0 @@
-{
- "access_token": "HEADER.SIMULATED_JWT_PAYLOAD.SIGNATURE"
-}
\ No newline at end of file
diff --git a/tests/data/external_subject_token.txt b/tests/data/external_subject_token.txt
deleted file mode 100644
index c668d8f..0000000
--- a/tests/data/external_subject_token.txt
+++ /dev/null
@@ -1 +0,0 @@
-HEADER.SIMULATED_JWT_PAYLOAD.SIGNATURE
\ No newline at end of file
diff --git a/tests/oauth2/test_sts.py b/tests/oauth2/test_sts.py
deleted file mode 100644
index 8792bd6..0000000
--- a/tests/oauth2/test_sts.py
+++ /dev/null
@@ -1,395 +0,0 @@
-# Copyright 2020 Google LLC
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-import json
-
-import mock
-import pytest
-from six.moves import http_client
-from six.moves import urllib
-
-from google.auth import exceptions
-from google.auth import transport
-from google.oauth2 import sts
-from google.oauth2 import utils
-
-CLIENT_ID = "username"
-CLIENT_SECRET = "password"
-# Base64 encoding of "username:password"
-BASIC_AUTH_ENCODING = "dXNlcm5hbWU6cGFzc3dvcmQ="
-
-
-class TestStsClient(object):
- GRANT_TYPE = "urn:ietf:params:oauth:grant-type:token-exchange"
- RESOURCE = "https://api.example.com/"
- AUDIENCE = "urn:example:cooperation-context"
- SCOPES = ["scope1", "scope2"]
- REQUESTED_TOKEN_TYPE = "urn:ietf:params:oauth:token-type:access_token"
- SUBJECT_TOKEN = "HEADER.SUBJECT_TOKEN_PAYLOAD.SIGNATURE"
- SUBJECT_TOKEN_TYPE = "urn:ietf:params:oauth:token-type:jwt"
- ACTOR_TOKEN = "HEADER.ACTOR_TOKEN_PAYLOAD.SIGNATURE"
- ACTOR_TOKEN_TYPE = "urn:ietf:params:oauth:token-type:jwt"
- TOKEN_EXCHANGE_ENDPOINT = "https://example.com/token.oauth2"
- ADDON_HEADERS = {"x-client-version": "0.1.2"}
- ADDON_OPTIONS = {"additional": {"non-standard": ["options"], "other": "some-value"}}
- SUCCESS_RESPONSE = {
- "access_token": "ACCESS_TOKEN",
- "issued_token_type": "urn:ietf:params:oauth:token-type:access_token",
- "token_type": "Bearer",
- "expires_in": 3600,
- "scope": "scope1 scope2",
- }
- ERROR_RESPONSE = {
- "error": "invalid_request",
- "error_description": "Invalid subject token",
- "error_uri": "https://tools.ietf.org/html/rfc6749",
- }
- CLIENT_AUTH_BASIC = utils.ClientAuthentication(
- utils.ClientAuthType.basic, CLIENT_ID, CLIENT_SECRET
- )
- CLIENT_AUTH_REQUEST_BODY = utils.ClientAuthentication(
- utils.ClientAuthType.request_body, CLIENT_ID, CLIENT_SECRET
- )
-
- @classmethod
- def make_client(cls, client_auth=None):
- return sts.Client(cls.TOKEN_EXCHANGE_ENDPOINT, client_auth)
-
- @classmethod
- def make_mock_request(cls, data, status=http_client.OK):
- response = mock.create_autospec(transport.Response, instance=True)
- response.status = status
- response.data = json.dumps(data).encode("utf-8")
-
- request = mock.create_autospec(transport.Request)
- request.return_value = response
-
- return request
-
- @classmethod
- def assert_request_kwargs(cls, request_kwargs, headers, request_data):
- """Asserts the request was called with the expected parameters.
- """
- assert request_kwargs["url"] == cls.TOKEN_EXCHANGE_ENDPOINT
- assert request_kwargs["method"] == "POST"
- assert request_kwargs["headers"] == headers
- assert request_kwargs["body"] is not None
- body_tuples = urllib.parse.parse_qsl(request_kwargs["body"])
- for (k, v) in body_tuples:
- assert v.decode("utf-8") == request_data[k.decode("utf-8")]
- assert len(body_tuples) == len(request_data.keys())
-
- def test_exchange_token_full_success_without_auth(self):
- """Test token exchange success without client authentication using full
- parameters.
- """
- client = self.make_client()
- headers = self.ADDON_HEADERS.copy()
- headers["Content-Type"] = "application/x-www-form-urlencoded"
- request_data = {
- "grant_type": self.GRANT_TYPE,
- "resource": self.RESOURCE,
- "audience": self.AUDIENCE,
- "scope": " ".join(self.SCOPES),
- "requested_token_type": self.REQUESTED_TOKEN_TYPE,
- "subject_token": self.SUBJECT_TOKEN,
- "subject_token_type": self.SUBJECT_TOKEN_TYPE,
- "actor_token": self.ACTOR_TOKEN,
- "actor_token_type": self.ACTOR_TOKEN_TYPE,
- "options": urllib.parse.quote(json.dumps(self.ADDON_OPTIONS)),
- }
- request = self.make_mock_request(
- status=http_client.OK, data=self.SUCCESS_RESPONSE
- )
-
- response = client.exchange_token(
- request,
- self.GRANT_TYPE,
- self.SUBJECT_TOKEN,
- self.SUBJECT_TOKEN_TYPE,
- self.RESOURCE,
- self.AUDIENCE,
- self.SCOPES,
- self.REQUESTED_TOKEN_TYPE,
- self.ACTOR_TOKEN,
- self.ACTOR_TOKEN_TYPE,
- self.ADDON_OPTIONS,
- self.ADDON_HEADERS,
- )
-
- self.assert_request_kwargs(request.call_args.kwargs, headers, request_data)
- assert response == self.SUCCESS_RESPONSE
-
- def test_exchange_token_partial_success_without_auth(self):
- """Test token exchange success without client authentication using
- partial (required only) parameters.
- """
- client = self.make_client()
- headers = {"Content-Type": "application/x-www-form-urlencoded"}
- request_data = {
- "grant_type": self.GRANT_TYPE,
- "audience": self.AUDIENCE,
- "requested_token_type": self.REQUESTED_TOKEN_TYPE,
- "subject_token": self.SUBJECT_TOKEN,
- "subject_token_type": self.SUBJECT_TOKEN_TYPE,
- }
- request = self.make_mock_request(
- status=http_client.OK, data=self.SUCCESS_RESPONSE
- )
-
- response = client.exchange_token(
- request,
- grant_type=self.GRANT_TYPE,
- subject_token=self.SUBJECT_TOKEN,
- subject_token_type=self.SUBJECT_TOKEN_TYPE,
- audience=self.AUDIENCE,
- requested_token_type=self.REQUESTED_TOKEN_TYPE,
- )
-
- self.assert_request_kwargs(request.call_args.kwargs, headers, request_data)
- assert response == self.SUCCESS_RESPONSE
-
- def test_exchange_token_non200_without_auth(self):
- """Test token exchange without client auth responding with non-200 status.
- """
- client = self.make_client()
- request = self.make_mock_request(
- status=http_client.BAD_REQUEST, data=self.ERROR_RESPONSE
- )
-
- with pytest.raises(exceptions.OAuthError) as excinfo:
- client.exchange_token(
- request,
- self.GRANT_TYPE,
- self.SUBJECT_TOKEN,
- self.SUBJECT_TOKEN_TYPE,
- self.RESOURCE,
- self.AUDIENCE,
- self.SCOPES,
- self.REQUESTED_TOKEN_TYPE,
- self.ACTOR_TOKEN,
- self.ACTOR_TOKEN_TYPE,
- self.ADDON_OPTIONS,
- self.ADDON_HEADERS,
- )
-
- assert excinfo.match(
- r"Error code invalid_request: Invalid subject token - https://tools.ietf.org/html/rfc6749"
- )
-
- def test_exchange_token_full_success_with_basic_auth(self):
- """Test token exchange success with basic client authentication using full
- parameters.
- """
- client = self.make_client(self.CLIENT_AUTH_BASIC)
- headers = self.ADDON_HEADERS.copy()
- headers["Content-Type"] = "application/x-www-form-urlencoded"
- headers["Authorization"] = "Basic {}".format(BASIC_AUTH_ENCODING)
- request_data = {
- "grant_type": self.GRANT_TYPE,
- "resource": self.RESOURCE,
- "audience": self.AUDIENCE,
- "scope": " ".join(self.SCOPES),
- "requested_token_type": self.REQUESTED_TOKEN_TYPE,
- "subject_token": self.SUBJECT_TOKEN,
- "subject_token_type": self.SUBJECT_TOKEN_TYPE,
- "actor_token": self.ACTOR_TOKEN,
- "actor_token_type": self.ACTOR_TOKEN_TYPE,
- "options": urllib.parse.quote(json.dumps(self.ADDON_OPTIONS)),
- }
- request = self.make_mock_request(
- status=http_client.OK, data=self.SUCCESS_RESPONSE
- )
-
- response = client.exchange_token(
- request,
- self.GRANT_TYPE,
- self.SUBJECT_TOKEN,
- self.SUBJECT_TOKEN_TYPE,
- self.RESOURCE,
- self.AUDIENCE,
- self.SCOPES,
- self.REQUESTED_TOKEN_TYPE,
- self.ACTOR_TOKEN,
- self.ACTOR_TOKEN_TYPE,
- self.ADDON_OPTIONS,
- self.ADDON_HEADERS,
- )
-
- self.assert_request_kwargs(request.call_args.kwargs, headers, request_data)
- assert response == self.SUCCESS_RESPONSE
-
- def test_exchange_token_partial_success_with_basic_auth(self):
- """Test token exchange success with basic client authentication using
- partial (required only) parameters.
- """
- client = self.make_client(self.CLIENT_AUTH_BASIC)
- headers = {
- "Content-Type": "application/x-www-form-urlencoded",
- "Authorization": "Basic {}".format(BASIC_AUTH_ENCODING),
- }
- request_data = {
- "grant_type": self.GRANT_TYPE,
- "audience": self.AUDIENCE,
- "requested_token_type": self.REQUESTED_TOKEN_TYPE,
- "subject_token": self.SUBJECT_TOKEN,
- "subject_token_type": self.SUBJECT_TOKEN_TYPE,
- }
- request = self.make_mock_request(
- status=http_client.OK, data=self.SUCCESS_RESPONSE
- )
-
- response = client.exchange_token(
- request,
- grant_type=self.GRANT_TYPE,
- subject_token=self.SUBJECT_TOKEN,
- subject_token_type=self.SUBJECT_TOKEN_TYPE,
- audience=self.AUDIENCE,
- requested_token_type=self.REQUESTED_TOKEN_TYPE,
- )
-
- self.assert_request_kwargs(request.call_args.kwargs, headers, request_data)
- assert response == self.SUCCESS_RESPONSE
-
- def test_exchange_token_non200_with_basic_auth(self):
- """Test token exchange with basic client auth responding with non-200
- status.
- """
- client = self.make_client(self.CLIENT_AUTH_BASIC)
- request = self.make_mock_request(
- status=http_client.BAD_REQUEST, data=self.ERROR_RESPONSE
- )
-
- with pytest.raises(exceptions.OAuthError) as excinfo:
- client.exchange_token(
- request,
- self.GRANT_TYPE,
- self.SUBJECT_TOKEN,
- self.SUBJECT_TOKEN_TYPE,
- self.RESOURCE,
- self.AUDIENCE,
- self.SCOPES,
- self.REQUESTED_TOKEN_TYPE,
- self.ACTOR_TOKEN,
- self.ACTOR_TOKEN_TYPE,
- self.ADDON_OPTIONS,
- self.ADDON_HEADERS,
- )
-
- assert excinfo.match(
- r"Error code invalid_request: Invalid subject token - https://tools.ietf.org/html/rfc6749"
- )
-
- def test_exchange_token_full_success_with_reqbody_auth(self):
- """Test token exchange success with request body client authenticaiton
- using full parameters.
- """
- client = self.make_client(self.CLIENT_AUTH_REQUEST_BODY)
- headers = self.ADDON_HEADERS.copy()
- headers["Content-Type"] = "application/x-www-form-urlencoded"
- request_data = {
- "grant_type": self.GRANT_TYPE,
- "resource": self.RESOURCE,
- "audience": self.AUDIENCE,
- "scope": " ".join(self.SCOPES),
- "requested_token_type": self.REQUESTED_TOKEN_TYPE,
- "subject_token": self.SUBJECT_TOKEN,
- "subject_token_type": self.SUBJECT_TOKEN_TYPE,
- "actor_token": self.ACTOR_TOKEN,
- "actor_token_type": self.ACTOR_TOKEN_TYPE,
- "options": urllib.parse.quote(json.dumps(self.ADDON_OPTIONS)),
- "client_id": CLIENT_ID,
- "client_secret": CLIENT_SECRET,
- }
- request = self.make_mock_request(
- status=http_client.OK, data=self.SUCCESS_RESPONSE
- )
-
- response = client.exchange_token(
- request,
- self.GRANT_TYPE,
- self.SUBJECT_TOKEN,
- self.SUBJECT_TOKEN_TYPE,
- self.RESOURCE,
- self.AUDIENCE,
- self.SCOPES,
- self.REQUESTED_TOKEN_TYPE,
- self.ACTOR_TOKEN,
- self.ACTOR_TOKEN_TYPE,
- self.ADDON_OPTIONS,
- self.ADDON_HEADERS,
- )
-
- self.assert_request_kwargs(request.call_args.kwargs, headers, request_data)
- assert response == self.SUCCESS_RESPONSE
-
- def test_exchange_token_partial_success_with_reqbody_auth(self):
- """Test token exchange success with request body client authentication
- using partial (required only) parameters.
- """
- client = self.make_client(self.CLIENT_AUTH_REQUEST_BODY)
- headers = {"Content-Type": "application/x-www-form-urlencoded"}
- request_data = {
- "grant_type": self.GRANT_TYPE,
- "audience": self.AUDIENCE,
- "requested_token_type": self.REQUESTED_TOKEN_TYPE,
- "subject_token": self.SUBJECT_TOKEN,
- "subject_token_type": self.SUBJECT_TOKEN_TYPE,
- "client_id": CLIENT_ID,
- "client_secret": CLIENT_SECRET,
- }
- request = self.make_mock_request(
- status=http_client.OK, data=self.SUCCESS_RESPONSE
- )
-
- response = client.exchange_token(
- request,
- grant_type=self.GRANT_TYPE,
- subject_token=self.SUBJECT_TOKEN,
- subject_token_type=self.SUBJECT_TOKEN_TYPE,
- audience=self.AUDIENCE,
- requested_token_type=self.REQUESTED_TOKEN_TYPE,
- )
-
- self.assert_request_kwargs(request.call_args.kwargs, headers, request_data)
- assert response == self.SUCCESS_RESPONSE
-
- def test_exchange_token_non200_with_reqbody_auth(self):
- """Test token exchange with POST request body client auth responding
- with non-200 status.
- """
- client = self.make_client(self.CLIENT_AUTH_REQUEST_BODY)
- request = self.make_mock_request(
- status=http_client.BAD_REQUEST, data=self.ERROR_RESPONSE
- )
-
- with pytest.raises(exceptions.OAuthError) as excinfo:
- client.exchange_token(
- request,
- self.GRANT_TYPE,
- self.SUBJECT_TOKEN,
- self.SUBJECT_TOKEN_TYPE,
- self.RESOURCE,
- self.AUDIENCE,
- self.SCOPES,
- self.REQUESTED_TOKEN_TYPE,
- self.ACTOR_TOKEN,
- self.ACTOR_TOKEN_TYPE,
- self.ADDON_OPTIONS,
- self.ADDON_HEADERS,
- )
-
- assert excinfo.match(
- r"Error code invalid_request: Invalid subject token - https://tools.ietf.org/html/rfc6749"
- )
diff --git a/tests/oauth2/test_utils.py b/tests/oauth2/test_utils.py
deleted file mode 100644
index 6de9ff5..0000000
--- a/tests/oauth2/test_utils.py
+++ /dev/null
@@ -1,264 +0,0 @@
-# Copyright 2020 Google LLC
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-import json
-
-import pytest
-
-from google.auth import exceptions
-from google.oauth2 import utils
-
-
-CLIENT_ID = "username"
-CLIENT_SECRET = "password"
-# Base64 encoding of "username:password"
-BASIC_AUTH_ENCODING = "dXNlcm5hbWU6cGFzc3dvcmQ="
-# Base64 encoding of "username:"
-BASIC_AUTH_ENCODING_SECRETLESS = "dXNlcm5hbWU6"
-
-
-class AuthHandler(utils.OAuthClientAuthHandler):
- def __init__(self, client_auth=None):
- super(AuthHandler, self).__init__(client_auth)
-
- def apply_client_authentication_options(
- self, headers, request_body=None, bearer_token=None
- ):
- return super(AuthHandler, self).apply_client_authentication_options(
- headers, request_body, bearer_token
- )
-
-
-class TestClientAuthentication(object):
- @classmethod
- def make_client_auth(cls, client_secret=None):
- return utils.ClientAuthentication(
- utils.ClientAuthType.basic, CLIENT_ID, client_secret
- )
-
- def test_initialization_with_client_secret(self):
- client_auth = self.make_client_auth(CLIENT_SECRET)
-
- assert client_auth.client_auth_type == utils.ClientAuthType.basic
- assert client_auth.client_id == CLIENT_ID
- assert client_auth.client_secret == CLIENT_SECRET
-
- def test_initialization_no_client_secret(self):
- client_auth = self.make_client_auth()
-
- assert client_auth.client_auth_type == utils.ClientAuthType.basic
- assert client_auth.client_id == CLIENT_ID
- assert client_auth.client_secret is None
-
-
-class TestOAuthClientAuthHandler(object):
- CLIENT_AUTH_BASIC = utils.ClientAuthentication(
- utils.ClientAuthType.basic, CLIENT_ID, CLIENT_SECRET
- )
- CLIENT_AUTH_BASIC_SECRETLESS = utils.ClientAuthentication(
- utils.ClientAuthType.basic, CLIENT_ID
- )
- CLIENT_AUTH_REQUEST_BODY = utils.ClientAuthentication(
- utils.ClientAuthType.request_body, CLIENT_ID, CLIENT_SECRET
- )
- CLIENT_AUTH_REQUEST_BODY_SECRETLESS = utils.ClientAuthentication(
- utils.ClientAuthType.request_body, CLIENT_ID
- )
-
- @classmethod
- def make_oauth_client_auth_handler(cls, client_auth=None):
- return AuthHandler(client_auth)
-
- def test_apply_client_authentication_options_none(self):
- headers = {"Content-Type": "application/json"}
- request_body = {"foo": "bar"}
- auth_handler = self.make_oauth_client_auth_handler()
-
- auth_handler.apply_client_authentication_options(headers, request_body)
-
- assert headers == {"Content-Type": "application/json"}
- assert request_body == {"foo": "bar"}
-
- def test_apply_client_authentication_options_basic(self):
- headers = {"Content-Type": "application/json"}
- request_body = {"foo": "bar"}
- auth_handler = self.make_oauth_client_auth_handler(self.CLIENT_AUTH_BASIC)
-
- auth_handler.apply_client_authentication_options(headers, request_body)
-
- assert headers == {
- "Content-Type": "application/json",
- "Authorization": "Basic {}".format(BASIC_AUTH_ENCODING),
- }
- assert request_body == {"foo": "bar"}
-
- def test_apply_client_authentication_options_basic_nosecret(self):
- headers = {"Content-Type": "application/json"}
- request_body = {"foo": "bar"}
- auth_handler = self.make_oauth_client_auth_handler(
- self.CLIENT_AUTH_BASIC_SECRETLESS
- )
-
- auth_handler.apply_client_authentication_options(headers, request_body)
-
- assert headers == {
- "Content-Type": "application/json",
- "Authorization": "Basic {}".format(BASIC_AUTH_ENCODING_SECRETLESS),
- }
- assert request_body == {"foo": "bar"}
-
- def test_apply_client_authentication_options_request_body(self):
- headers = {"Content-Type": "application/json"}
- request_body = {"foo": "bar"}
- auth_handler = self.make_oauth_client_auth_handler(
- self.CLIENT_AUTH_REQUEST_BODY
- )
-
- auth_handler.apply_client_authentication_options(headers, request_body)
-
- assert headers == {"Content-Type": "application/json"}
- assert request_body == {
- "foo": "bar",
- "client_id": CLIENT_ID,
- "client_secret": CLIENT_SECRET,
- }
-
- def test_apply_client_authentication_options_request_body_nosecret(self):
- headers = {"Content-Type": "application/json"}
- request_body = {"foo": "bar"}
- auth_handler = self.make_oauth_client_auth_handler(
- self.CLIENT_AUTH_REQUEST_BODY_SECRETLESS
- )
-
- auth_handler.apply_client_authentication_options(headers, request_body)
-
- assert headers == {"Content-Type": "application/json"}
- assert request_body == {
- "foo": "bar",
- "client_id": CLIENT_ID,
- "client_secret": "",
- }
-
- def test_apply_client_authentication_options_request_body_no_body(self):
- headers = {"Content-Type": "application/json"}
- auth_handler = self.make_oauth_client_auth_handler(
- self.CLIENT_AUTH_REQUEST_BODY
- )
-
- with pytest.raises(exceptions.OAuthError) as excinfo:
- auth_handler.apply_client_authentication_options(headers)
-
- assert excinfo.match(r"HTTP request does not support request-body")
-
- def test_apply_client_authentication_options_bearer_token(self):
- bearer_token = "ACCESS_TOKEN"
- headers = {"Content-Type": "application/json"}
- request_body = {"foo": "bar"}
- auth_handler = self.make_oauth_client_auth_handler()
-
- auth_handler.apply_client_authentication_options(
- headers, request_body, bearer_token
- )
-
- assert headers == {
- "Content-Type": "application/json",
- "Authorization": "Bearer {}".format(bearer_token),
- }
- assert request_body == {"foo": "bar"}
-
- def test_apply_client_authentication_options_bearer_and_basic(self):
- bearer_token = "ACCESS_TOKEN"
- headers = {"Content-Type": "application/json"}
- request_body = {"foo": "bar"}
- auth_handler = self.make_oauth_client_auth_handler(self.CLIENT_AUTH_BASIC)
-
- auth_handler.apply_client_authentication_options(
- headers, request_body, bearer_token
- )
-
- # Bearer token should have higher priority.
- assert headers == {
- "Content-Type": "application/json",
- "Authorization": "Bearer {}".format(bearer_token),
- }
- assert request_body == {"foo": "bar"}
-
- def test_apply_client_authentication_options_bearer_and_request_body(self):
- bearer_token = "ACCESS_TOKEN"
- headers = {"Content-Type": "application/json"}
- request_body = {"foo": "bar"}
- auth_handler = self.make_oauth_client_auth_handler(
- self.CLIENT_AUTH_REQUEST_BODY
- )
-
- auth_handler.apply_client_authentication_options(
- headers, request_body, bearer_token
- )
-
- # Bearer token should have higher priority.
- assert headers == {
- "Content-Type": "application/json",
- "Authorization": "Bearer {}".format(bearer_token),
- }
- assert request_body == {"foo": "bar"}
-
-
-def test__handle_error_response_code_only():
- error_resp = {"error": "unsupported_grant_type"}
- response_data = json.dumps(error_resp)
-
- with pytest.raises(exceptions.OAuthError) as excinfo:
- utils.handle_error_response(response_data)
-
- assert excinfo.match(r"Error code unsupported_grant_type")
-
-
-def test__handle_error_response_code_description():
- error_resp = {
- "error": "unsupported_grant_type",
- "error_description": "The provided grant_type is unsupported",
- }
- response_data = json.dumps(error_resp)
-
- with pytest.raises(exceptions.OAuthError) as excinfo:
- utils.handle_error_response(response_data)
-
- assert excinfo.match(
- r"Error code unsupported_grant_type: The provided grant_type is unsupported"
- )
-
-
-def test__handle_error_response_code_description_uri():
- error_resp = {
- "error": "unsupported_grant_type",
- "error_description": "The provided grant_type is unsupported",
- "error_uri": "https://tools.ietf.org/html/rfc6749",
- }
- response_data = json.dumps(error_resp)
-
- with pytest.raises(exceptions.OAuthError) as excinfo:
- utils.handle_error_response(response_data)
-
- assert excinfo.match(
- r"Error code unsupported_grant_type: The provided grant_type is unsupported - https://tools.ietf.org/html/rfc6749"
- )
-
-
-def test__handle_error_response_non_json():
- response_data = "Oops, something wrong happened"
-
- with pytest.raises(exceptions.OAuthError) as excinfo:
- utils.handle_error_response(response_data)
-
- assert excinfo.match(r"Oops, something wrong happened")
diff --git a/tests/test__default.py b/tests/test__default.py
index ef6cb78..74511f9 100644
--- a/tests/test__default.py
+++ b/tests/test__default.py
@@ -20,13 +20,10 @@
from google.auth import _default
from google.auth import app_engine
-from google.auth import aws
from google.auth import compute_engine
from google.auth import credentials
from google.auth import environment_vars
from google.auth import exceptions
-from google.auth import external_account
-from google.auth import identity_pool
from google.oauth2 import service_account
import google.oauth2.credentials
@@ -52,34 +49,6 @@
with open(SERVICE_ACCOUNT_FILE) as fh:
SERVICE_ACCOUNT_FILE_DATA = json.load(fh)
-SUBJECT_TOKEN_TEXT_FILE = os.path.join(DATA_DIR, "external_subject_token.txt")
-TOKEN_URL = "https://sts.googleapis.com/v1/token"
-AUDIENCE = "//iam.googleapis.com/projects/123456/locations/global/workloadIdentityPools/POOL_ID/providers/PROVIDER_ID"
-REGION_URL = "http://169.254.169.254/latest/meta-data/placement/availability-zone"
-SECURITY_CREDS_URL = "http://169.254.169.254/latest/meta-data/iam/security-credentials"
-CRED_VERIFICATION_URL = (
- "https://sts.{region}.amazonaws.com?Action=GetCallerIdentity&Version=2011-06-15"
-)
-IDENTITY_POOL_DATA = {
- "type": "external_account",
- "audience": AUDIENCE,
- "subject_token_type": "urn:ietf:params:oauth:token-type:jwt",
- "token_url": TOKEN_URL,
- "credential_source": {"file": SUBJECT_TOKEN_TEXT_FILE},
-}
-AWS_DATA = {
- "type": "external_account",
- "audience": AUDIENCE,
- "subject_token_type": "urn:ietf:params:aws:token-type:aws4_request",
- "token_url": TOKEN_URL,
- "credential_source": {
- "environment_id": "aws1",
- "region_url": REGION_URL,
- "url": SECURITY_CREDS_URL,
- "regional_cred_verification_url": CRED_VERIFICATION_URL,
- },
-}
-
MOCK_CREDENTIALS = mock.Mock(spec=credentials.CredentialsWithQuotaProject)
MOCK_CREDENTIALS.with_quota_project.return_value = MOCK_CREDENTIALS
@@ -88,12 +57,6 @@
return_value=(MOCK_CREDENTIALS, mock.sentinel.project_id),
autospec=True,
)
-EXTERNAL_ACCOUNT_GET_PROJECT_ID_PATCH = mock.patch.object(
- external_account.Credentials,
- "get_project_id",
- return_value=mock.sentinel.project_id,
- autospec=True,
-)
def test_load_credentials_from_missing_file():
@@ -222,92 +185,6 @@
assert excinfo.match(r"missing fields")
-@EXTERNAL_ACCOUNT_GET_PROJECT_ID_PATCH
-def test_load_credentials_from_file_external_account_identity_pool(
- get_project_id, tmpdir
-):
- config_file = tmpdir.join("config.json")
- config_file.write(json.dumps(IDENTITY_POOL_DATA))
- credentials, project_id = _default.load_credentials_from_file(str(config_file))
-
- assert isinstance(credentials, identity_pool.Credentials)
- assert project_id is mock.sentinel.project_id
- assert get_project_id.called
-
-
-@EXTERNAL_ACCOUNT_GET_PROJECT_ID_PATCH
-def test_load_credentials_from_file_external_account_aws(get_project_id, tmpdir):
- config_file = tmpdir.join("config.json")
- config_file.write(json.dumps(AWS_DATA))
- credentials, project_id = _default.load_credentials_from_file(str(config_file))
-
- assert isinstance(credentials, aws.Credentials)
- assert project_id is mock.sentinel.project_id
- assert get_project_id.called
-
-
-@EXTERNAL_ACCOUNT_GET_PROJECT_ID_PATCH
-def test_load_credentials_from_file_external_account_with_user_and_default_scopes(
- get_project_id, tmpdir
-):
- config_file = tmpdir.join("config.json")
- config_file.write(json.dumps(IDENTITY_POOL_DATA))
- credentials, project_id = _default.load_credentials_from_file(
- str(config_file),
- scopes=["https://www.google.com/calendar/feeds"],
- default_scopes=["https://www.googleapis.com/auth/cloud-platform"],
- )
-
- assert isinstance(credentials, identity_pool.Credentials)
- assert project_id is mock.sentinel.project_id
- assert credentials.scopes == ["https://www.google.com/calendar/feeds"]
- assert credentials.default_scopes == [
- "https://www.googleapis.com/auth/cloud-platform"
- ]
-
-
-@EXTERNAL_ACCOUNT_GET_PROJECT_ID_PATCH
-def test_load_credentials_from_file_external_account_with_quota_project(
- get_project_id, tmpdir
-):
- config_file = tmpdir.join("config.json")
- config_file.write(json.dumps(IDENTITY_POOL_DATA))
- credentials, project_id = _default.load_credentials_from_file(
- str(config_file), quota_project_id="project-foo"
- )
-
- assert isinstance(credentials, identity_pool.Credentials)
- assert project_id is mock.sentinel.project_id
- assert credentials.quota_project_id == "project-foo"
-
-
-def test_load_credentials_from_file_external_account_bad_format(tmpdir):
- filename = tmpdir.join("external_account_bad.json")
- filename.write(json.dumps({"type": "external_account"}))
-
- with pytest.raises(exceptions.DefaultCredentialsError) as excinfo:
- _default.load_credentials_from_file(str(filename))
-
- assert excinfo.match(
- "Failed to load external account credentials from {}".format(str(filename))
- )
-
-
-@EXTERNAL_ACCOUNT_GET_PROJECT_ID_PATCH
-def test_load_credentials_from_file_external_account_explicit_request(
- get_project_id, tmpdir
-):
- config_file = tmpdir.join("config.json")
- config_file.write(json.dumps(IDENTITY_POOL_DATA))
- credentials, project_id = _default.load_credentials_from_file(
- str(config_file), request=mock.sentinel.request
- )
-
- assert isinstance(credentials, identity_pool.Credentials)
- assert project_id is mock.sentinel.project_id
- get_project_id.assert_called_with(credentials, request=mock.sentinel.request)
-
-
@mock.patch.dict(os.environ, {}, clear=True)
def test__get_explicit_environ_credentials_no_env():
assert _default._get_explicit_environ_credentials() == (None, None)
@@ -321,34 +198,7 @@
assert credentials is MOCK_CREDENTIALS
assert project_id is mock.sentinel.project_id
- load.assert_called_with(
- "filename",
- scopes=None,
- default_scopes=None,
- quota_project_id=None,
- request=None,
- )
-
-
-@LOAD_FILE_PATCH
-def test__get_explicit_environ_credentials_with_scopes_and_request(load, monkeypatch):
- scopes = ["one", "two"]
- monkeypatch.setenv(environment_vars.CREDENTIALS, "filename")
-
- credentials, project_id = _default._get_explicit_environ_credentials(
- request=mock.sentinel.request, scopes=scopes
- )
-
- assert credentials is MOCK_CREDENTIALS
- assert project_id is mock.sentinel.project_id
- # Request and scopes should be propagated.
- load.assert_called_with(
- "filename",
- scopes=scopes,
- default_scopes=None,
- quota_project_id=None,
- request=mock.sentinel.request,
- )
+ load.assert_called_with("filename")
@LOAD_FILE_PATCH
@@ -653,70 +503,3 @@
sys.modules["google.auth.compute_engine"] = None
sys.modules["google.auth.app_engine"] = None
assert _default.default() == (MOCK_CREDENTIALS, mock.sentinel.project_id)
-
-
-@EXTERNAL_ACCOUNT_GET_PROJECT_ID_PATCH
-def test_default_environ_external_credentials(get_project_id, monkeypatch, tmpdir):
- config_file = tmpdir.join("config.json")
- config_file.write(json.dumps(IDENTITY_POOL_DATA))
- monkeypatch.setenv(environment_vars.CREDENTIALS, str(config_file))
-
- credentials, project_id = _default.default()
-
- assert isinstance(credentials, identity_pool.Credentials)
- assert project_id is mock.sentinel.project_id
-
-
-@EXTERNAL_ACCOUNT_GET_PROJECT_ID_PATCH
-def test_default_environ_external_credentials_with_user_and_default_scopes_and_quota_project_id(
- get_project_id, monkeypatch, tmpdir
-):
- config_file = tmpdir.join("config.json")
- config_file.write(json.dumps(IDENTITY_POOL_DATA))
- monkeypatch.setenv(environment_vars.CREDENTIALS, str(config_file))
-
- credentials, project_id = _default.default(
- scopes=["https://www.google.com/calendar/feeds"],
- default_scopes=["https://www.googleapis.com/auth/cloud-platform"],
- quota_project_id="project-foo",
- )
-
- assert isinstance(credentials, identity_pool.Credentials)
- assert project_id is mock.sentinel.project_id
- assert credentials.quota_project_id == "project-foo"
- assert credentials.scopes == ["https://www.google.com/calendar/feeds"]
- assert credentials.default_scopes == [
- "https://www.googleapis.com/auth/cloud-platform"
- ]
-
-
-@EXTERNAL_ACCOUNT_GET_PROJECT_ID_PATCH
-def test_default_environ_external_credentials_explicit_request(
- get_project_id, monkeypatch, tmpdir
-):
- config_file = tmpdir.join("config.json")
- config_file.write(json.dumps(IDENTITY_POOL_DATA))
- monkeypatch.setenv(environment_vars.CREDENTIALS, str(config_file))
-
- credentials, project_id = _default.default(request=mock.sentinel.request)
-
- assert isinstance(credentials, identity_pool.Credentials)
- assert project_id is mock.sentinel.project_id
- # default() will initialize new credentials via with_scopes_if_required
- # and potentially with_quota_project.
- # As a result the caller of get_project_id() will not match the returned
- # credentials.
- get_project_id.assert_called_with(mock.ANY, request=mock.sentinel.request)
-
-
-def test_default_environ_external_credentials_bad_format(monkeypatch, tmpdir):
- filename = tmpdir.join("external_account_bad.json")
- filename.write(json.dumps({"type": "external_account"}))
- monkeypatch.setenv(environment_vars.CREDENTIALS, str(filename))
-
- with pytest.raises(exceptions.DefaultCredentialsError) as excinfo:
- _default.default()
-
- assert excinfo.match(
- "Failed to load external account credentials from {}".format(str(filename))
- )
diff --git a/tests/test_aws.py b/tests/test_aws.py
deleted file mode 100644
index 9a8f98e..0000000
--- a/tests/test_aws.py
+++ /dev/null
@@ -1,1434 +0,0 @@
-# Copyright 2020 Google LLC
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-import datetime
-import json
-
-import mock
-import pytest
-from six.moves import http_client
-from six.moves import urllib
-
-from google.auth import _helpers
-from google.auth import aws
-from google.auth import environment_vars
-from google.auth import exceptions
-from google.auth import transport
-
-
-CLIENT_ID = "username"
-CLIENT_SECRET = "password"
-# Base64 encoding of "username:password".
-BASIC_AUTH_ENCODING = "dXNlcm5hbWU6cGFzc3dvcmQ="
-SERVICE_ACCOUNT_EMAIL = "service-1234@service-name.iam.gserviceaccount.com"
-SERVICE_ACCOUNT_IMPERSONATION_URL = (
- "https://us-east1-iamcredentials.googleapis.com/v1/projects/-"
- + "/serviceAccounts/{}:generateAccessToken".format(SERVICE_ACCOUNT_EMAIL)
-)
-QUOTA_PROJECT_ID = "QUOTA_PROJECT_ID"
-SCOPES = ["scope1", "scope2"]
-TOKEN_URL = "https://sts.googleapis.com/v1/token"
-SUBJECT_TOKEN_TYPE = "urn:ietf:params:aws:token-type:aws4_request"
-AUDIENCE = "//iam.googleapis.com/projects/123456/locations/global/workloadIdentityPools/POOL_ID/providers/PROVIDER_ID"
-REGION_URL = "http://169.254.169.254/latest/meta-data/placement/availability-zone"
-SECURITY_CREDS_URL = "http://169.254.169.254/latest/meta-data/iam/security-credentials"
-CRED_VERIFICATION_URL = (
- "https://sts.{region}.amazonaws.com?Action=GetCallerIdentity&Version=2011-06-15"
-)
-# Sample AWS security credentials to be used with tests that require a session token.
-ACCESS_KEY_ID = "ASIARD4OQDT6A77FR3CL"
-SECRET_ACCESS_KEY = "Y8AfSaucF37G4PpvfguKZ3/l7Id4uocLXxX0+VTx"
-TOKEN = "IQoJb3JpZ2luX2VjEIz//////////wEaCXVzLWVhc3QtMiJGMEQCIH7MHX/Oy/OB8OlLQa9GrqU1B914+iMikqWQW7vPCKlgAiA/Lsv8Jcafn14owfxXn95FURZNKaaphj0ykpmS+Ki+CSq0AwhlEAAaDDA3NzA3MTM5MTk5NiIMx9sAeP1ovlMTMKLjKpEDwuJQg41/QUKx0laTZYjPlQvjwSqS3OB9P1KAXPWSLkliVMMqaHqelvMF/WO/glv3KwuTfQsavRNs3v5pcSEm4SPO3l7mCs7KrQUHwGP0neZhIKxEXy+Ls//1C/Bqt53NL+LSbaGv6RPHaX82laz2qElphg95aVLdYgIFY6JWV5fzyjgnhz0DQmy62/Vi8pNcM2/VnxeCQ8CC8dRDSt52ry2v+nc77vstuI9xV5k8mPtnaPoJDRANh0bjwY5Sdwkbp+mGRUJBAQRlNgHUJusefXQgVKBCiyJY4w3Csd8Bgj9IyDV+Azuy1jQqfFZWgP68LSz5bURyIjlWDQunO82stZ0BgplKKAa/KJHBPCp8Qi6i99uy7qh76FQAqgVTsnDuU6fGpHDcsDSGoCls2HgZjZFPeOj8mmRhFk1Xqvkbjuz8V1cJk54d3gIJvQt8gD2D6yJQZecnuGWd5K2e2HohvCc8Fc9kBl1300nUJPV+k4tr/A5R/0QfEKOZL1/k5lf1g9CREnrM8LVkGxCgdYMxLQow1uTL+QU67AHRRSp5PhhGX4Rek+01vdYSnJCMaPhSEgcLqDlQkhk6MPsyT91QMXcWmyO+cAZwUPwnRamFepuP4K8k2KVXs/LIJHLELwAZ0ekyaS7CptgOqS7uaSTFG3U+vzFZLEnGvWQ7y9IPNQZ+Dffgh4p3vF4J68y9049sI6Sr5d5wbKkcbm8hdCDHZcv4lnqohquPirLiFQ3q7B17V9krMPu3mz1cg4Ekgcrn/E09NTsxAqD8NcZ7C7ECom9r+X3zkDOxaajW6hu3Az8hGlyylDaMiFfRbBJpTIlxp7jfa7CxikNgNtEKLH9iCzvuSg2vhA=="
-# To avoid json.dumps() differing behavior from one version to other,
-# the JSON payload is hardcoded.
-REQUEST_PARAMS = '{"KeySchema":[{"KeyType":"HASH","AttributeName":"Id"}],"TableName":"TestTable","AttributeDefinitions":[{"AttributeName":"Id","AttributeType":"S"}],"ProvisionedThroughput":{"WriteCapacityUnits":5,"ReadCapacityUnits":5}}'
-# Each tuple contains the following entries:
-# region, time, credentials, original_request, signed_request
-TEST_FIXTURES = [
- # GET request (AWS botocore tests).
- # https://github.com/boto/botocore/blob/879f8440a4e9ace5d3cf145ce8b3d5e5ffb892ef/tests/unit/auth/aws4_testsuite/get-vanilla.req
- # https://github.com/boto/botocore/blob/879f8440a4e9ace5d3cf145ce8b3d5e5ffb892ef/tests/unit/auth/aws4_testsuite/get-vanilla.sreq
- (
- "us-east-1",
- "2011-09-09T23:36:00Z",
- {
- "access_key_id": "AKIDEXAMPLE",
- "secret_access_key": "wJalrXUtnFEMI/K7MDENG+bPxRfiCYEXAMPLEKEY",
- },
- {
- "method": "GET",
- "url": "https://host.foo.com",
- "headers": {"date": "Mon, 09 Sep 2011 23:36:00 GMT"},
- },
- {
- "url": "https://host.foo.com",
- "method": "GET",
- "headers": {
- "Authorization": "AWS4-HMAC-SHA256 Credential=AKIDEXAMPLE/20110909/us-east-1/host/aws4_request, SignedHeaders=date;host, Signature=b27ccfbfa7df52a200ff74193ca6e32d4b48b8856fab7ebf1c595d0670a7e470",
- "host": "host.foo.com",
- "date": "Mon, 09 Sep 2011 23:36:00 GMT",
- },
- },
- ),
- # GET request with relative path (AWS botocore tests).
- # https://github.com/boto/botocore/blob/879f8440a4e9ace5d3cf145ce8b3d5e5ffb892ef/tests/unit/auth/aws4_testsuite/get-relative-relative.req
- # https://github.com/boto/botocore/blob/879f8440a4e9ace5d3cf145ce8b3d5e5ffb892ef/tests/unit/auth/aws4_testsuite/get-relative-relative.sreq
- (
- "us-east-1",
- "2011-09-09T23:36:00Z",
- {
- "access_key_id": "AKIDEXAMPLE",
- "secret_access_key": "wJalrXUtnFEMI/K7MDENG+bPxRfiCYEXAMPLEKEY",
- },
- {
- "method": "GET",
- "url": "https://host.foo.com/foo/bar/../..",
- "headers": {"date": "Mon, 09 Sep 2011 23:36:00 GMT"},
- },
- {
- "url": "https://host.foo.com/foo/bar/../..",
- "method": "GET",
- "headers": {
- "Authorization": "AWS4-HMAC-SHA256 Credential=AKIDEXAMPLE/20110909/us-east-1/host/aws4_request, SignedHeaders=date;host, Signature=b27ccfbfa7df52a200ff74193ca6e32d4b48b8856fab7ebf1c595d0670a7e470",
- "host": "host.foo.com",
- "date": "Mon, 09 Sep 2011 23:36:00 GMT",
- },
- },
- ),
- # GET request with /./ path (AWS botocore tests).
- # https://github.com/boto/botocore/blob/879f8440a4e9ace5d3cf145ce8b3d5e5ffb892ef/tests/unit/auth/aws4_testsuite/get-slash-dot-slash.req
- # https://github.com/boto/botocore/blob/879f8440a4e9ace5d3cf145ce8b3d5e5ffb892ef/tests/unit/auth/aws4_testsuite/get-slash-dot-slash.sreq
- (
- "us-east-1",
- "2011-09-09T23:36:00Z",
- {
- "access_key_id": "AKIDEXAMPLE",
- "secret_access_key": "wJalrXUtnFEMI/K7MDENG+bPxRfiCYEXAMPLEKEY",
- },
- {
- "method": "GET",
- "url": "https://host.foo.com/./",
- "headers": {"date": "Mon, 09 Sep 2011 23:36:00 GMT"},
- },
- {
- "url": "https://host.foo.com/./",
- "method": "GET",
- "headers": {
- "Authorization": "AWS4-HMAC-SHA256 Credential=AKIDEXAMPLE/20110909/us-east-1/host/aws4_request, SignedHeaders=date;host, Signature=b27ccfbfa7df52a200ff74193ca6e32d4b48b8856fab7ebf1c595d0670a7e470",
- "host": "host.foo.com",
- "date": "Mon, 09 Sep 2011 23:36:00 GMT",
- },
- },
- ),
- # GET request with pointless dot path (AWS botocore tests).
- # https://github.com/boto/botocore/blob/879f8440a4e9ace5d3cf145ce8b3d5e5ffb892ef/tests/unit/auth/aws4_testsuite/get-slash-pointless-dot.req
- # https://github.com/boto/botocore/blob/879f8440a4e9ace5d3cf145ce8b3d5e5ffb892ef/tests/unit/auth/aws4_testsuite/get-slash-pointless-dot.sreq
- (
- "us-east-1",
- "2011-09-09T23:36:00Z",
- {
- "access_key_id": "AKIDEXAMPLE",
- "secret_access_key": "wJalrXUtnFEMI/K7MDENG+bPxRfiCYEXAMPLEKEY",
- },
- {
- "method": "GET",
- "url": "https://host.foo.com/./foo",
- "headers": {"date": "Mon, 09 Sep 2011 23:36:00 GMT"},
- },
- {
- "url": "https://host.foo.com/./foo",
- "method": "GET",
- "headers": {
- "Authorization": "AWS4-HMAC-SHA256 Credential=AKIDEXAMPLE/20110909/us-east-1/host/aws4_request, SignedHeaders=date;host, Signature=910e4d6c9abafaf87898e1eb4c929135782ea25bb0279703146455745391e63a",
- "host": "host.foo.com",
- "date": "Mon, 09 Sep 2011 23:36:00 GMT",
- },
- },
- ),
- # GET request with utf8 path (AWS botocore tests).
- # https://github.com/boto/botocore/blob/879f8440a4e9ace5d3cf145ce8b3d5e5ffb892ef/tests/unit/auth/aws4_testsuite/get-utf8.req
- # https://github.com/boto/botocore/blob/879f8440a4e9ace5d3cf145ce8b3d5e5ffb892ef/tests/unit/auth/aws4_testsuite/get-utf8.sreq
- (
- "us-east-1",
- "2011-09-09T23:36:00Z",
- {
- "access_key_id": "AKIDEXAMPLE",
- "secret_access_key": "wJalrXUtnFEMI/K7MDENG+bPxRfiCYEXAMPLEKEY",
- },
- {
- "method": "GET",
- "url": "https://host.foo.com/%E1%88%B4",
- "headers": {"date": "Mon, 09 Sep 2011 23:36:00 GMT"},
- },
- {
- "url": "https://host.foo.com/%E1%88%B4",
- "method": "GET",
- "headers": {
- "Authorization": "AWS4-HMAC-SHA256 Credential=AKIDEXAMPLE/20110909/us-east-1/host/aws4_request, SignedHeaders=date;host, Signature=8d6634c189aa8c75c2e51e106b6b5121bed103fdb351f7d7d4381c738823af74",
- "host": "host.foo.com",
- "date": "Mon, 09 Sep 2011 23:36:00 GMT",
- },
- },
- ),
- # GET request with duplicate query key (AWS botocore tests).
- # https://github.com/boto/botocore/blob/879f8440a4e9ace5d3cf145ce8b3d5e5ffb892ef/tests/unit/auth/aws4_testsuite/get-vanilla-query-order-key-case.req
- # https://github.com/boto/botocore/blob/879f8440a4e9ace5d3cf145ce8b3d5e5ffb892ef/tests/unit/auth/aws4_testsuite/get-vanilla-query-order-key-case.sreq
- (
- "us-east-1",
- "2011-09-09T23:36:00Z",
- {
- "access_key_id": "AKIDEXAMPLE",
- "secret_access_key": "wJalrXUtnFEMI/K7MDENG+bPxRfiCYEXAMPLEKEY",
- },
- {
- "method": "GET",
- "url": "https://host.foo.com/?foo=Zoo&foo=aha",
- "headers": {"date": "Mon, 09 Sep 2011 23:36:00 GMT"},
- },
- {
- "url": "https://host.foo.com/?foo=Zoo&foo=aha",
- "method": "GET",
- "headers": {
- "Authorization": "AWS4-HMAC-SHA256 Credential=AKIDEXAMPLE/20110909/us-east-1/host/aws4_request, SignedHeaders=date;host, Signature=be7148d34ebccdc6423b19085378aa0bee970bdc61d144bd1a8c48c33079ab09",
- "host": "host.foo.com",
- "date": "Mon, 09 Sep 2011 23:36:00 GMT",
- },
- },
- ),
- # GET request with duplicate out of order query key (AWS botocore tests).
- # https://github.com/boto/botocore/blob/879f8440a4e9ace5d3cf145ce8b3d5e5ffb892ef/tests/unit/auth/aws4_testsuite/get-vanilla-query-order-value.req
- # https://github.com/boto/botocore/blob/879f8440a4e9ace5d3cf145ce8b3d5e5ffb892ef/tests/unit/auth/aws4_testsuite/get-vanilla-query-order-value.sreq
- (
- "us-east-1",
- "2011-09-09T23:36:00Z",
- {
- "access_key_id": "AKIDEXAMPLE",
- "secret_access_key": "wJalrXUtnFEMI/K7MDENG+bPxRfiCYEXAMPLEKEY",
- },
- {
- "method": "GET",
- "url": "https://host.foo.com/?foo=b&foo=a",
- "headers": {"date": "Mon, 09 Sep 2011 23:36:00 GMT"},
- },
- {
- "url": "https://host.foo.com/?foo=b&foo=a",
- "method": "GET",
- "headers": {
- "Authorization": "AWS4-HMAC-SHA256 Credential=AKIDEXAMPLE/20110909/us-east-1/host/aws4_request, SignedHeaders=date;host, Signature=feb926e49e382bec75c9d7dcb2a1b6dc8aa50ca43c25d2bc51143768c0875acc",
- "host": "host.foo.com",
- "date": "Mon, 09 Sep 2011 23:36:00 GMT",
- },
- },
- ),
- # GET request with utf8 query (AWS botocore tests).
- # https://github.com/boto/botocore/blob/879f8440a4e9ace5d3cf145ce8b3d5e5ffb892ef/tests/unit/auth/aws4_testsuite/get-vanilla-ut8-query.req
- # https://github.com/boto/botocore/blob/879f8440a4e9ace5d3cf145ce8b3d5e5ffb892ef/tests/unit/auth/aws4_testsuite/get-vanilla-ut8-query.sreq
- (
- "us-east-1",
- "2011-09-09T23:36:00Z",
- {
- "access_key_id": "AKIDEXAMPLE",
- "secret_access_key": "wJalrXUtnFEMI/K7MDENG+bPxRfiCYEXAMPLEKEY",
- },
- {
- "method": "GET",
- "url": "https://host.foo.com/?{}=bar".format(
- urllib.parse.unquote("%E1%88%B4")
- ),
- "headers": {"date": "Mon, 09 Sep 2011 23:36:00 GMT"},
- },
- {
- "url": "https://host.foo.com/?{}=bar".format(
- urllib.parse.unquote("%E1%88%B4")
- ),
- "method": "GET",
- "headers": {
- "Authorization": "AWS4-HMAC-SHA256 Credential=AKIDEXAMPLE/20110909/us-east-1/host/aws4_request, SignedHeaders=date;host, Signature=6fb359e9a05394cc7074e0feb42573a2601abc0c869a953e8c5c12e4e01f1a8c",
- "host": "host.foo.com",
- "date": "Mon, 09 Sep 2011 23:36:00 GMT",
- },
- },
- ),
- # POST request with sorted headers (AWS botocore tests).
- # https://github.com/boto/botocore/blob/879f8440a4e9ace5d3cf145ce8b3d5e5ffb892ef/tests/unit/auth/aws4_testsuite/post-header-key-sort.req
- # https://github.com/boto/botocore/blob/879f8440a4e9ace5d3cf145ce8b3d5e5ffb892ef/tests/unit/auth/aws4_testsuite/post-header-key-sort.sreq
- (
- "us-east-1",
- "2011-09-09T23:36:00Z",
- {
- "access_key_id": "AKIDEXAMPLE",
- "secret_access_key": "wJalrXUtnFEMI/K7MDENG+bPxRfiCYEXAMPLEKEY",
- },
- {
- "method": "POST",
- "url": "https://host.foo.com/",
- "headers": {"date": "Mon, 09 Sep 2011 23:36:00 GMT", "ZOO": "zoobar"},
- },
- {
- "url": "https://host.foo.com/",
- "method": "POST",
- "headers": {
- "Authorization": "AWS4-HMAC-SHA256 Credential=AKIDEXAMPLE/20110909/us-east-1/host/aws4_request, SignedHeaders=date;host;zoo, Signature=b7a95a52518abbca0964a999a880429ab734f35ebbf1235bd79a5de87756dc4a",
- "host": "host.foo.com",
- "date": "Mon, 09 Sep 2011 23:36:00 GMT",
- "ZOO": "zoobar",
- },
- },
- ),
- # POST request with upper case header value from AWS Python test harness.
- # https://github.com/boto/botocore/blob/879f8440a4e9ace5d3cf145ce8b3d5e5ffb892ef/tests/unit/auth/aws4_testsuite/post-header-value-case.req
- # https://github.com/boto/botocore/blob/879f8440a4e9ace5d3cf145ce8b3d5e5ffb892ef/tests/unit/auth/aws4_testsuite/post-header-value-case.sreq
- (
- "us-east-1",
- "2011-09-09T23:36:00Z",
- {
- "access_key_id": "AKIDEXAMPLE",
- "secret_access_key": "wJalrXUtnFEMI/K7MDENG+bPxRfiCYEXAMPLEKEY",
- },
- {
- "method": "POST",
- "url": "https://host.foo.com/",
- "headers": {"date": "Mon, 09 Sep 2011 23:36:00 GMT", "zoo": "ZOOBAR"},
- },
- {
- "url": "https://host.foo.com/",
- "method": "POST",
- "headers": {
- "Authorization": "AWS4-HMAC-SHA256 Credential=AKIDEXAMPLE/20110909/us-east-1/host/aws4_request, SignedHeaders=date;host;zoo, Signature=273313af9d0c265c531e11db70bbd653f3ba074c1009239e8559d3987039cad7",
- "host": "host.foo.com",
- "date": "Mon, 09 Sep 2011 23:36:00 GMT",
- "zoo": "ZOOBAR",
- },
- },
- ),
- # POST request with header and no body (AWS botocore tests).
- # https://github.com/boto/botocore/blob/879f8440a4e9ace5d3cf145ce8b3d5e5ffb892ef/tests/unit/auth/aws4_testsuite/get-header-value-trim.req
- # https://github.com/boto/botocore/blob/879f8440a4e9ace5d3cf145ce8b3d5e5ffb892ef/tests/unit/auth/aws4_testsuite/get-header-value-trim.sreq
- (
- "us-east-1",
- "2011-09-09T23:36:00Z",
- {
- "access_key_id": "AKIDEXAMPLE",
- "secret_access_key": "wJalrXUtnFEMI/K7MDENG+bPxRfiCYEXAMPLEKEY",
- },
- {
- "method": "POST",
- "url": "https://host.foo.com/",
- "headers": {"date": "Mon, 09 Sep 2011 23:36:00 GMT", "p": "phfft"},
- },
- {
- "url": "https://host.foo.com/",
- "method": "POST",
- "headers": {
- "Authorization": "AWS4-HMAC-SHA256 Credential=AKIDEXAMPLE/20110909/us-east-1/host/aws4_request, SignedHeaders=date;host;p, Signature=debf546796015d6f6ded8626f5ce98597c33b47b9164cf6b17b4642036fcb592",
- "host": "host.foo.com",
- "date": "Mon, 09 Sep 2011 23:36:00 GMT",
- "p": "phfft",
- },
- },
- ),
- # POST request with body and no header (AWS botocore tests).
- # https://github.com/boto/botocore/blob/879f8440a4e9ace5d3cf145ce8b3d5e5ffb892ef/tests/unit/auth/aws4_testsuite/post-x-www-form-urlencoded.req
- # https://github.com/boto/botocore/blob/879f8440a4e9ace5d3cf145ce8b3d5e5ffb892ef/tests/unit/auth/aws4_testsuite/post-x-www-form-urlencoded.sreq
- (
- "us-east-1",
- "2011-09-09T23:36:00Z",
- {
- "access_key_id": "AKIDEXAMPLE",
- "secret_access_key": "wJalrXUtnFEMI/K7MDENG+bPxRfiCYEXAMPLEKEY",
- },
- {
- "method": "POST",
- "url": "https://host.foo.com/",
- "headers": {
- "Content-Type": "application/x-www-form-urlencoded",
- "date": "Mon, 09 Sep 2011 23:36:00 GMT",
- },
- "data": "foo=bar",
- },
- {
- "url": "https://host.foo.com/",
- "method": "POST",
- "headers": {
- "Authorization": "AWS4-HMAC-SHA256 Credential=AKIDEXAMPLE/20110909/us-east-1/host/aws4_request, SignedHeaders=content-type;date;host, Signature=5a15b22cf462f047318703b92e6f4f38884e4a7ab7b1d6426ca46a8bd1c26cbc",
- "host": "host.foo.com",
- "Content-Type": "application/x-www-form-urlencoded",
- "date": "Mon, 09 Sep 2011 23:36:00 GMT",
- },
- "data": "foo=bar",
- },
- ),
- # POST request with querystring (AWS botocore tests).
- # https://github.com/boto/botocore/blob/879f8440a4e9ace5d3cf145ce8b3d5e5ffb892ef/tests/unit/auth/aws4_testsuite/post-vanilla-query.req
- # https://github.com/boto/botocore/blob/879f8440a4e9ace5d3cf145ce8b3d5e5ffb892ef/tests/unit/auth/aws4_testsuite/post-vanilla-query.sreq
- (
- "us-east-1",
- "2011-09-09T23:36:00Z",
- {
- "access_key_id": "AKIDEXAMPLE",
- "secret_access_key": "wJalrXUtnFEMI/K7MDENG+bPxRfiCYEXAMPLEKEY",
- },
- {
- "method": "POST",
- "url": "https://host.foo.com/?foo=bar",
- "headers": {"date": "Mon, 09 Sep 2011 23:36:00 GMT"},
- },
- {
- "url": "https://host.foo.com/?foo=bar",
- "method": "POST",
- "headers": {
- "Authorization": "AWS4-HMAC-SHA256 Credential=AKIDEXAMPLE/20110909/us-east-1/host/aws4_request, SignedHeaders=date;host, Signature=b6e3b79003ce0743a491606ba1035a804593b0efb1e20a11cba83f8c25a57a92",
- "host": "host.foo.com",
- "date": "Mon, 09 Sep 2011 23:36:00 GMT",
- },
- },
- ),
- # GET request with session token credentials.
- (
- "us-east-2",
- "2020-08-11T06:55:22Z",
- {
- "access_key_id": ACCESS_KEY_ID,
- "secret_access_key": SECRET_ACCESS_KEY,
- "security_token": TOKEN,
- },
- {
- "method": "GET",
- "url": "https://ec2.us-east-2.amazonaws.com?Action=DescribeRegions&Version=2013-10-15",
- },
- {
- "url": "https://ec2.us-east-2.amazonaws.com?Action=DescribeRegions&Version=2013-10-15",
- "method": "GET",
- "headers": {
- "Authorization": "AWS4-HMAC-SHA256 Credential="
- + ACCESS_KEY_ID
- + "/20200811/us-east-2/ec2/aws4_request, SignedHeaders=host;x-amz-date;x-amz-security-token, Signature=631ea80cddfaa545fdadb120dc92c9f18166e38a5c47b50fab9fce476e022855",
- "host": "ec2.us-east-2.amazonaws.com",
- "x-amz-date": "20200811T065522Z",
- "x-amz-security-token": TOKEN,
- },
- },
- ),
- # POST request with session token credentials.
- (
- "us-east-2",
- "2020-08-11T06:55:22Z",
- {
- "access_key_id": ACCESS_KEY_ID,
- "secret_access_key": SECRET_ACCESS_KEY,
- "security_token": TOKEN,
- },
- {
- "method": "POST",
- "url": "https://sts.us-east-2.amazonaws.com?Action=GetCallerIdentity&Version=2011-06-15",
- },
- {
- "url": "https://sts.us-east-2.amazonaws.com?Action=GetCallerIdentity&Version=2011-06-15",
- "method": "POST",
- "headers": {
- "Authorization": "AWS4-HMAC-SHA256 Credential="
- + ACCESS_KEY_ID
- + "/20200811/us-east-2/sts/aws4_request, SignedHeaders=host;x-amz-date;x-amz-security-token, Signature=73452984e4a880ffdc5c392355733ec3f5ba310d5e0609a89244440cadfe7a7a",
- "host": "sts.us-east-2.amazonaws.com",
- "x-amz-date": "20200811T065522Z",
- "x-amz-security-token": TOKEN,
- },
- },
- ),
- # POST request with computed x-amz-date and no data.
- (
- "us-east-2",
- "2020-08-11T06:55:22Z",
- {"access_key_id": ACCESS_KEY_ID, "secret_access_key": SECRET_ACCESS_KEY},
- {
- "method": "POST",
- "url": "https://sts.us-east-2.amazonaws.com?Action=GetCallerIdentity&Version=2011-06-15",
- },
- {
- "url": "https://sts.us-east-2.amazonaws.com?Action=GetCallerIdentity&Version=2011-06-15",
- "method": "POST",
- "headers": {
- "Authorization": "AWS4-HMAC-SHA256 Credential="
- + ACCESS_KEY_ID
- + "/20200811/us-east-2/sts/aws4_request, SignedHeaders=host;x-amz-date, Signature=d095ba304919cd0d5570ba8a3787884ee78b860f268ed040ba23831d55536d56",
- "host": "sts.us-east-2.amazonaws.com",
- "x-amz-date": "20200811T065522Z",
- },
- },
- ),
- # POST request with session token and additional headers/data.
- (
- "us-east-2",
- "2020-08-11T06:55:22Z",
- {
- "access_key_id": ACCESS_KEY_ID,
- "secret_access_key": SECRET_ACCESS_KEY,
- "security_token": TOKEN,
- },
- {
- "method": "POST",
- "url": "https://dynamodb.us-east-2.amazonaws.com/",
- "headers": {
- "Content-Type": "application/x-amz-json-1.0",
- "x-amz-target": "DynamoDB_20120810.CreateTable",
- },
- "data": REQUEST_PARAMS,
- },
- {
- "url": "https://dynamodb.us-east-2.amazonaws.com/",
- "method": "POST",
- "headers": {
- "Authorization": "AWS4-HMAC-SHA256 Credential="
- + ACCESS_KEY_ID
- + "/20200811/us-east-2/dynamodb/aws4_request, SignedHeaders=content-type;host;x-amz-date;x-amz-security-token;x-amz-target, Signature=fdaa5b9cc9c86b80fe61eaf504141c0b3523780349120f2bd8145448456e0385",
- "host": "dynamodb.us-east-2.amazonaws.com",
- "x-amz-date": "20200811T065522Z",
- "Content-Type": "application/x-amz-json-1.0",
- "x-amz-target": "DynamoDB_20120810.CreateTable",
- "x-amz-security-token": TOKEN,
- },
- "data": REQUEST_PARAMS,
- },
- ),
-]
-
-
-class TestRequestSigner(object):
- @pytest.mark.parametrize(
- "region, time, credentials, original_request, signed_request", TEST_FIXTURES
- )
- @mock.patch("google.auth._helpers.utcnow")
- def test_get_request_options(
- self, utcnow, region, time, credentials, original_request, signed_request
- ):
- utcnow.return_value = datetime.datetime.strptime(time, "%Y-%m-%dT%H:%M:%SZ")
- request_signer = aws.RequestSigner(region)
- actual_signed_request = request_signer.get_request_options(
- credentials,
- original_request.get("url"),
- original_request.get("method"),
- original_request.get("data"),
- original_request.get("headers"),
- )
-
- assert actual_signed_request == signed_request
-
- def test_get_request_options_with_missing_scheme_url(self):
- request_signer = aws.RequestSigner("us-east-2")
-
- with pytest.raises(ValueError) as excinfo:
- request_signer.get_request_options(
- {
- "access_key_id": ACCESS_KEY_ID,
- "secret_access_key": SECRET_ACCESS_KEY,
- },
- "invalid",
- "POST",
- )
-
- assert excinfo.match(r"Invalid AWS service URL")
-
- def test_get_request_options_with_invalid_scheme_url(self):
- request_signer = aws.RequestSigner("us-east-2")
-
- with pytest.raises(ValueError) as excinfo:
- request_signer.get_request_options(
- {
- "access_key_id": ACCESS_KEY_ID,
- "secret_access_key": SECRET_ACCESS_KEY,
- },
- "http://invalid",
- "POST",
- )
-
- assert excinfo.match(r"Invalid AWS service URL")
-
- def test_get_request_options_with_missing_hostname_url(self):
- request_signer = aws.RequestSigner("us-east-2")
-
- with pytest.raises(ValueError) as excinfo:
- request_signer.get_request_options(
- {
- "access_key_id": ACCESS_KEY_ID,
- "secret_access_key": SECRET_ACCESS_KEY,
- },
- "https://",
- "POST",
- )
-
- assert excinfo.match(r"Invalid AWS service URL")
-
-
-class TestCredentials(object):
- AWS_REGION = "us-east-2"
- AWS_ROLE = "gcp-aws-role"
- AWS_SECURITY_CREDENTIALS_RESPONSE = {
- "AccessKeyId": ACCESS_KEY_ID,
- "SecretAccessKey": SECRET_ACCESS_KEY,
- "Token": TOKEN,
- }
- AWS_SIGNATURE_TIME = "2020-08-11T06:55:22Z"
- CREDENTIAL_SOURCE = {
- "environment_id": "aws1",
- "region_url": REGION_URL,
- "url": SECURITY_CREDS_URL,
- "regional_cred_verification_url": CRED_VERIFICATION_URL,
- }
- SUCCESS_RESPONSE = {
- "access_token": "ACCESS_TOKEN",
- "issued_token_type": "urn:ietf:params:oauth:token-type:access_token",
- "token_type": "Bearer",
- "expires_in": 3600,
- "scope": " ".join(SCOPES),
- }
-
- @classmethod
- def make_serialized_aws_signed_request(
- cls,
- aws_security_credentials,
- region_name="us-east-2",
- url="https://sts.us-east-2.amazonaws.com?Action=GetCallerIdentity&Version=2011-06-15",
- ):
- """Utility to generate serialize AWS signed requests.
- This makes it easy to assert generated subject tokens based on the
- provided AWS security credentials, regions and AWS STS endpoint.
- """
- request_signer = aws.RequestSigner(region_name)
- signed_request = request_signer.get_request_options(
- aws_security_credentials, url, "POST"
- )
- reformatted_signed_request = {
- "url": signed_request.get("url"),
- "method": signed_request.get("method"),
- "headers": [
- {
- "key": "Authorization",
- "value": signed_request.get("headers").get("Authorization"),
- },
- {"key": "host", "value": signed_request.get("headers").get("host")},
- {
- "key": "x-amz-date",
- "value": signed_request.get("headers").get("x-amz-date"),
- },
- ],
- }
- # Include security token if available.
- if "security_token" in aws_security_credentials:
- reformatted_signed_request.get("headers").append(
- {
- "key": "x-amz-security-token",
- "value": signed_request.get("headers").get("x-amz-security-token"),
- }
- )
- # Append x-goog-cloud-target-resource header.
- reformatted_signed_request.get("headers").append(
- {"key": "x-goog-cloud-target-resource", "value": AUDIENCE}
- ),
- return urllib.parse.quote(
- json.dumps(
- reformatted_signed_request, separators=(",", ":"), sort_keys=True
- )
- )
-
- @classmethod
- def make_mock_request(
- cls,
- region_status=None,
- region_name=None,
- role_status=None,
- role_name=None,
- security_credentials_status=None,
- security_credentials_data=None,
- token_status=None,
- token_data=None,
- impersonation_status=None,
- impersonation_data=None,
- ):
- """Utility function to generate a mock HTTP request object.
- This will facilitate testing various edge cases by specify how the
- various endpoints will respond while generating a Google Access token
- in an AWS environment.
- """
- responses = []
- if region_status:
- # AWS region request.
- region_response = mock.create_autospec(transport.Response, instance=True)
- region_response.status = region_status
- if region_name:
- region_response.data = "{}b".format(region_name).encode("utf-8")
- responses.append(region_response)
-
- if role_status:
- # AWS role name request.
- role_response = mock.create_autospec(transport.Response, instance=True)
- role_response.status = role_status
- if role_name:
- role_response.data = role_name.encode("utf-8")
- responses.append(role_response)
-
- if security_credentials_status:
- # AWS security credentials request.
- security_credentials_response = mock.create_autospec(
- transport.Response, instance=True
- )
- security_credentials_response.status = security_credentials_status
- if security_credentials_data:
- security_credentials_response.data = json.dumps(
- security_credentials_data
- ).encode("utf-8")
- responses.append(security_credentials_response)
-
- if token_status:
- # GCP token exchange request.
- token_response = mock.create_autospec(transport.Response, instance=True)
- token_response.status = token_status
- token_response.data = json.dumps(token_data).encode("utf-8")
- responses.append(token_response)
-
- if impersonation_status:
- # Service account impersonation request.
- impersonation_response = mock.create_autospec(
- transport.Response, instance=True
- )
- impersonation_response.status = impersonation_status
- impersonation_response.data = json.dumps(impersonation_data).encode("utf-8")
- responses.append(impersonation_response)
-
- request = mock.create_autospec(transport.Request)
- request.side_effect = responses
-
- return request
-
- @classmethod
- def make_credentials(
- cls,
- credential_source,
- client_id=None,
- client_secret=None,
- quota_project_id=None,
- scopes=None,
- default_scopes=None,
- service_account_impersonation_url=None,
- ):
- return aws.Credentials(
- audience=AUDIENCE,
- subject_token_type=SUBJECT_TOKEN_TYPE,
- token_url=TOKEN_URL,
- service_account_impersonation_url=service_account_impersonation_url,
- credential_source=credential_source,
- client_id=client_id,
- client_secret=client_secret,
- quota_project_id=quota_project_id,
- scopes=scopes,
- default_scopes=default_scopes,
- )
-
- @classmethod
- def assert_aws_metadata_request_kwargs(cls, request_kwargs, url, headers=None):
- assert request_kwargs["url"] == url
- # All used AWS metadata server endpoints use GET HTTP method.
- assert request_kwargs["method"] == "GET"
- if headers:
- assert request_kwargs["headers"] == headers
- else:
- assert "headers" not in request_kwargs
- # None of the endpoints used require any data in request.
- assert "body" not in request_kwargs
-
- @classmethod
- def assert_token_request_kwargs(
- cls, request_kwargs, headers, request_data, token_url=TOKEN_URL
- ):
- assert request_kwargs["url"] == token_url
- assert request_kwargs["method"] == "POST"
- assert request_kwargs["headers"] == headers
- assert request_kwargs["body"] is not None
- body_tuples = urllib.parse.parse_qsl(request_kwargs["body"])
- assert len(body_tuples) == len(request_data.keys())
- for (k, v) in body_tuples:
- assert v.decode("utf-8") == request_data[k.decode("utf-8")]
-
- @classmethod
- def assert_impersonation_request_kwargs(
- cls,
- request_kwargs,
- headers,
- request_data,
- service_account_impersonation_url=SERVICE_ACCOUNT_IMPERSONATION_URL,
- ):
- assert request_kwargs["url"] == service_account_impersonation_url
- assert request_kwargs["method"] == "POST"
- assert request_kwargs["headers"] == headers
- assert request_kwargs["body"] is not None
- body_json = json.loads(request_kwargs["body"].decode("utf-8"))
- assert body_json == request_data
-
- @mock.patch.object(aws.Credentials, "__init__", return_value=None)
- def test_from_info_full_options(self, mock_init):
- credentials = aws.Credentials.from_info(
- {
- "audience": AUDIENCE,
- "subject_token_type": SUBJECT_TOKEN_TYPE,
- "token_url": TOKEN_URL,
- "service_account_impersonation_url": SERVICE_ACCOUNT_IMPERSONATION_URL,
- "client_id": CLIENT_ID,
- "client_secret": CLIENT_SECRET,
- "quota_project_id": QUOTA_PROJECT_ID,
- "credential_source": self.CREDENTIAL_SOURCE,
- }
- )
-
- # Confirm aws.Credentials instance initialized with the expected parameters.
- assert isinstance(credentials, aws.Credentials)
- mock_init.assert_called_once_with(
- audience=AUDIENCE,
- subject_token_type=SUBJECT_TOKEN_TYPE,
- token_url=TOKEN_URL,
- service_account_impersonation_url=SERVICE_ACCOUNT_IMPERSONATION_URL,
- client_id=CLIENT_ID,
- client_secret=CLIENT_SECRET,
- credential_source=self.CREDENTIAL_SOURCE,
- quota_project_id=QUOTA_PROJECT_ID,
- )
-
- @mock.patch.object(aws.Credentials, "__init__", return_value=None)
- def test_from_info_required_options_only(self, mock_init):
- credentials = aws.Credentials.from_info(
- {
- "audience": AUDIENCE,
- "subject_token_type": SUBJECT_TOKEN_TYPE,
- "token_url": TOKEN_URL,
- "credential_source": self.CREDENTIAL_SOURCE,
- }
- )
-
- # Confirm aws.Credentials instance initialized with the expected parameters.
- assert isinstance(credentials, aws.Credentials)
- mock_init.assert_called_once_with(
- audience=AUDIENCE,
- subject_token_type=SUBJECT_TOKEN_TYPE,
- token_url=TOKEN_URL,
- service_account_impersonation_url=None,
- client_id=None,
- client_secret=None,
- credential_source=self.CREDENTIAL_SOURCE,
- quota_project_id=None,
- )
-
- @mock.patch.object(aws.Credentials, "__init__", return_value=None)
- def test_from_file_full_options(self, mock_init, tmpdir):
- info = {
- "audience": AUDIENCE,
- "subject_token_type": SUBJECT_TOKEN_TYPE,
- "token_url": TOKEN_URL,
- "service_account_impersonation_url": SERVICE_ACCOUNT_IMPERSONATION_URL,
- "client_id": CLIENT_ID,
- "client_secret": CLIENT_SECRET,
- "quota_project_id": QUOTA_PROJECT_ID,
- "credential_source": self.CREDENTIAL_SOURCE,
- }
- config_file = tmpdir.join("config.json")
- config_file.write(json.dumps(info))
- credentials = aws.Credentials.from_file(str(config_file))
-
- # Confirm aws.Credentials instance initialized with the expected parameters.
- assert isinstance(credentials, aws.Credentials)
- mock_init.assert_called_once_with(
- audience=AUDIENCE,
- subject_token_type=SUBJECT_TOKEN_TYPE,
- token_url=TOKEN_URL,
- service_account_impersonation_url=SERVICE_ACCOUNT_IMPERSONATION_URL,
- client_id=CLIENT_ID,
- client_secret=CLIENT_SECRET,
- credential_source=self.CREDENTIAL_SOURCE,
- quota_project_id=QUOTA_PROJECT_ID,
- )
-
- @mock.patch.object(aws.Credentials, "__init__", return_value=None)
- def test_from_file_required_options_only(self, mock_init, tmpdir):
- info = {
- "audience": AUDIENCE,
- "subject_token_type": SUBJECT_TOKEN_TYPE,
- "token_url": TOKEN_URL,
- "credential_source": self.CREDENTIAL_SOURCE,
- }
- config_file = tmpdir.join("config.json")
- config_file.write(json.dumps(info))
- credentials = aws.Credentials.from_file(str(config_file))
-
- # Confirm aws.Credentials instance initialized with the expected parameters.
- assert isinstance(credentials, aws.Credentials)
- mock_init.assert_called_once_with(
- audience=AUDIENCE,
- subject_token_type=SUBJECT_TOKEN_TYPE,
- token_url=TOKEN_URL,
- service_account_impersonation_url=None,
- client_id=None,
- client_secret=None,
- credential_source=self.CREDENTIAL_SOURCE,
- quota_project_id=None,
- )
-
- def test_constructor_invalid_credential_source(self):
- # Provide invalid credential source.
- credential_source = {"unsupported": "value"}
-
- with pytest.raises(ValueError) as excinfo:
- self.make_credentials(credential_source=credential_source)
-
- assert excinfo.match(r"No valid AWS 'credential_source' provided")
-
- def test_constructor_invalid_environment_id(self):
- # Provide invalid environment_id.
- credential_source = self.CREDENTIAL_SOURCE.copy()
- credential_source["environment_id"] = "azure1"
-
- with pytest.raises(ValueError) as excinfo:
- self.make_credentials(credential_source=credential_source)
-
- assert excinfo.match(r"No valid AWS 'credential_source' provided")
-
- def test_constructor_missing_cred_verification_url(self):
- # regional_cred_verification_url is a required field.
- credential_source = self.CREDENTIAL_SOURCE.copy()
- credential_source.pop("regional_cred_verification_url")
-
- with pytest.raises(ValueError) as excinfo:
- self.make_credentials(credential_source=credential_source)
-
- assert excinfo.match(r"No valid AWS 'credential_source' provided")
-
- def test_constructor_invalid_environment_id_version(self):
- # Provide an unsupported version.
- credential_source = self.CREDENTIAL_SOURCE.copy()
- credential_source["environment_id"] = "aws3"
-
- with pytest.raises(ValueError) as excinfo:
- self.make_credentials(credential_source=credential_source)
-
- assert excinfo.match(r"aws version '3' is not supported in the current build.")
-
- def test_retrieve_subject_token_missing_region_url(self):
- # When AWS_REGION envvar is not available, region_url is required for
- # determining the current AWS region.
- credential_source = self.CREDENTIAL_SOURCE.copy()
- credential_source.pop("region_url")
- credentials = self.make_credentials(credential_source=credential_source)
-
- with pytest.raises(exceptions.RefreshError) as excinfo:
- credentials.retrieve_subject_token(None)
-
- assert excinfo.match(r"Unable to determine AWS region")
-
- @mock.patch("google.auth._helpers.utcnow")
- def test_retrieve_subject_token_success_temp_creds_no_environment_vars(
- self, utcnow
- ):
- utcnow.return_value = datetime.datetime.strptime(
- self.AWS_SIGNATURE_TIME, "%Y-%m-%dT%H:%M:%SZ"
- )
- request = self.make_mock_request(
- region_status=http_client.OK,
- region_name=self.AWS_REGION,
- role_status=http_client.OK,
- role_name=self.AWS_ROLE,
- security_credentials_status=http_client.OK,
- security_credentials_data=self.AWS_SECURITY_CREDENTIALS_RESPONSE,
- )
- credentials = self.make_credentials(credential_source=self.CREDENTIAL_SOURCE)
-
- subject_token = credentials.retrieve_subject_token(request)
-
- assert subject_token == self.make_serialized_aws_signed_request(
- {
- "access_key_id": ACCESS_KEY_ID,
- "secret_access_key": SECRET_ACCESS_KEY,
- "security_token": TOKEN,
- }
- )
- # Assert region request.
- self.assert_aws_metadata_request_kwargs(
- request.call_args_list[0].kwargs, REGION_URL
- )
- # Assert role request.
- self.assert_aws_metadata_request_kwargs(
- request.call_args_list[1].kwargs, SECURITY_CREDS_URL
- )
- # Assert security credentials request.
- self.assert_aws_metadata_request_kwargs(
- request.call_args_list[2].kwargs,
- "{}/{}".format(SECURITY_CREDS_URL, self.AWS_ROLE),
- {"Content-Type": "application/json"},
- )
-
- # Retrieve subject_token again. Region should not be queried again.
- new_request = self.make_mock_request(
- role_status=http_client.OK,
- role_name=self.AWS_ROLE,
- security_credentials_status=http_client.OK,
- security_credentials_data=self.AWS_SECURITY_CREDENTIALS_RESPONSE,
- )
-
- credentials.retrieve_subject_token(new_request)
-
- # Only 2 requests should be sent as the region is cached.
- assert len(new_request.call_args_list) == 2
- # Assert role request.
- self.assert_aws_metadata_request_kwargs(
- new_request.call_args_list[0].kwargs, SECURITY_CREDS_URL
- )
- # Assert security credentials request.
- self.assert_aws_metadata_request_kwargs(
- new_request.call_args_list[1].kwargs,
- "{}/{}".format(SECURITY_CREDS_URL, self.AWS_ROLE),
- {"Content-Type": "application/json"},
- )
-
- @mock.patch("google.auth._helpers.utcnow")
- def test_retrieve_subject_token_success_permanent_creds_no_environment_vars(
- self, utcnow
- ):
- # Simualte a permanent credential without a session token is
- # returned by the security-credentials endpoint.
- security_creds_response = self.AWS_SECURITY_CREDENTIALS_RESPONSE.copy()
- security_creds_response.pop("Token")
- utcnow.return_value = datetime.datetime.strptime(
- self.AWS_SIGNATURE_TIME, "%Y-%m-%dT%H:%M:%SZ"
- )
- request = self.make_mock_request(
- region_status=http_client.OK,
- region_name=self.AWS_REGION,
- role_status=http_client.OK,
- role_name=self.AWS_ROLE,
- security_credentials_status=http_client.OK,
- security_credentials_data=security_creds_response,
- )
- credentials = self.make_credentials(credential_source=self.CREDENTIAL_SOURCE)
-
- subject_token = credentials.retrieve_subject_token(request)
-
- assert subject_token == self.make_serialized_aws_signed_request(
- {"access_key_id": ACCESS_KEY_ID, "secret_access_key": SECRET_ACCESS_KEY}
- )
-
- @mock.patch("google.auth._helpers.utcnow")
- def test_retrieve_subject_token_success_environment_vars(self, utcnow, monkeypatch):
- monkeypatch.setenv(environment_vars.AWS_ACCESS_KEY_ID, ACCESS_KEY_ID)
- monkeypatch.setenv(environment_vars.AWS_SECRET_ACCESS_KEY, SECRET_ACCESS_KEY)
- monkeypatch.setenv(environment_vars.AWS_SESSION_TOKEN, TOKEN)
- monkeypatch.setenv(environment_vars.AWS_REGION, self.AWS_REGION)
- utcnow.return_value = datetime.datetime.strptime(
- self.AWS_SIGNATURE_TIME, "%Y-%m-%dT%H:%M:%SZ"
- )
- credentials = self.make_credentials(credential_source=self.CREDENTIAL_SOURCE)
-
- subject_token = credentials.retrieve_subject_token(None)
-
- assert subject_token == self.make_serialized_aws_signed_request(
- {
- "access_key_id": ACCESS_KEY_ID,
- "secret_access_key": SECRET_ACCESS_KEY,
- "security_token": TOKEN,
- }
- )
-
- @mock.patch("google.auth._helpers.utcnow")
- def test_retrieve_subject_token_success_environment_vars_no_session_token(
- self, utcnow, monkeypatch
- ):
- monkeypatch.setenv(environment_vars.AWS_ACCESS_KEY_ID, ACCESS_KEY_ID)
- monkeypatch.setenv(environment_vars.AWS_SECRET_ACCESS_KEY, SECRET_ACCESS_KEY)
- monkeypatch.setenv(environment_vars.AWS_REGION, self.AWS_REGION)
- utcnow.return_value = datetime.datetime.strptime(
- self.AWS_SIGNATURE_TIME, "%Y-%m-%dT%H:%M:%SZ"
- )
- credentials = self.make_credentials(credential_source=self.CREDENTIAL_SOURCE)
-
- subject_token = credentials.retrieve_subject_token(None)
-
- assert subject_token == self.make_serialized_aws_signed_request(
- {"access_key_id": ACCESS_KEY_ID, "secret_access_key": SECRET_ACCESS_KEY}
- )
-
- @mock.patch("google.auth._helpers.utcnow")
- def test_retrieve_subject_token_success_environment_vars_except_region(
- self, utcnow, monkeypatch
- ):
- monkeypatch.setenv(environment_vars.AWS_ACCESS_KEY_ID, ACCESS_KEY_ID)
- monkeypatch.setenv(environment_vars.AWS_SECRET_ACCESS_KEY, SECRET_ACCESS_KEY)
- monkeypatch.setenv(environment_vars.AWS_SESSION_TOKEN, TOKEN)
- utcnow.return_value = datetime.datetime.strptime(
- self.AWS_SIGNATURE_TIME, "%Y-%m-%dT%H:%M:%SZ"
- )
- # Region will be queried since it is not found in envvars.
- request = self.make_mock_request(
- region_status=http_client.OK, region_name=self.AWS_REGION
- )
- credentials = self.make_credentials(credential_source=self.CREDENTIAL_SOURCE)
-
- subject_token = credentials.retrieve_subject_token(request)
-
- assert subject_token == self.make_serialized_aws_signed_request(
- {
- "access_key_id": ACCESS_KEY_ID,
- "secret_access_key": SECRET_ACCESS_KEY,
- "security_token": TOKEN,
- }
- )
-
- def test_retrieve_subject_token_error_determining_aws_region(self):
- # Simulate error in retrieving the AWS region.
- request = self.make_mock_request(region_status=http_client.BAD_REQUEST)
- credentials = self.make_credentials(credential_source=self.CREDENTIAL_SOURCE)
-
- with pytest.raises(exceptions.RefreshError) as excinfo:
- credentials.retrieve_subject_token(request)
-
- assert excinfo.match(r"Unable to retrieve AWS region")
-
- def test_retrieve_subject_token_error_determining_aws_role(self):
- # Simulate error in retrieving the AWS role name.
- request = self.make_mock_request(
- region_status=http_client.OK,
- region_name=self.AWS_REGION,
- role_status=http_client.BAD_REQUEST,
- )
- credentials = self.make_credentials(credential_source=self.CREDENTIAL_SOURCE)
-
- with pytest.raises(exceptions.RefreshError) as excinfo:
- credentials.retrieve_subject_token(request)
-
- assert excinfo.match(r"Unable to retrieve AWS role name")
-
- def test_retrieve_subject_token_error_determining_security_creds_url(self):
- # Simulate the security-credentials url is missing. This is needed for
- # determining the AWS security credentials when not found in envvars.
- credential_source = self.CREDENTIAL_SOURCE.copy()
- credential_source.pop("url")
- request = self.make_mock_request(
- region_status=http_client.OK, region_name=self.AWS_REGION
- )
- credentials = self.make_credentials(credential_source=credential_source)
-
- with pytest.raises(exceptions.RefreshError) as excinfo:
- credentials.retrieve_subject_token(request)
-
- assert excinfo.match(
- r"Unable to determine the AWS metadata server security credentials endpoint"
- )
-
- def test_retrieve_subject_token_error_determining_aws_security_creds(self):
- # Simulate error in retrieving the AWS security credentials.
- request = self.make_mock_request(
- region_status=http_client.OK,
- region_name=self.AWS_REGION,
- role_status=http_client.OK,
- role_name=self.AWS_ROLE,
- security_credentials_status=http_client.BAD_REQUEST,
- )
- credentials = self.make_credentials(credential_source=self.CREDENTIAL_SOURCE)
-
- with pytest.raises(exceptions.RefreshError) as excinfo:
- credentials.retrieve_subject_token(request)
-
- assert excinfo.match(r"Unable to retrieve AWS security credentials")
-
- @mock.patch("google.auth._helpers.utcnow")
- def test_refresh_success_without_impersonation_ignore_default_scopes(self, utcnow):
- utcnow.return_value = datetime.datetime.strptime(
- self.AWS_SIGNATURE_TIME, "%Y-%m-%dT%H:%M:%SZ"
- )
- expected_subject_token = self.make_serialized_aws_signed_request(
- {
- "access_key_id": ACCESS_KEY_ID,
- "secret_access_key": SECRET_ACCESS_KEY,
- "security_token": TOKEN,
- }
- )
- token_headers = {
- "Content-Type": "application/x-www-form-urlencoded",
- "Authorization": "Basic " + BASIC_AUTH_ENCODING,
- }
- token_request_data = {
- "grant_type": "urn:ietf:params:oauth:grant-type:token-exchange",
- "audience": AUDIENCE,
- "requested_token_type": "urn:ietf:params:oauth:token-type:access_token",
- "scope": " ".join(SCOPES),
- "subject_token": expected_subject_token,
- "subject_token_type": SUBJECT_TOKEN_TYPE,
- }
- request = self.make_mock_request(
- region_status=http_client.OK,
- region_name=self.AWS_REGION,
- role_status=http_client.OK,
- role_name=self.AWS_ROLE,
- security_credentials_status=http_client.OK,
- security_credentials_data=self.AWS_SECURITY_CREDENTIALS_RESPONSE,
- token_status=http_client.OK,
- token_data=self.SUCCESS_RESPONSE,
- )
- credentials = self.make_credentials(
- client_id=CLIENT_ID,
- client_secret=CLIENT_SECRET,
- credential_source=self.CREDENTIAL_SOURCE,
- quota_project_id=QUOTA_PROJECT_ID,
- scopes=SCOPES,
- # Default scopes should be ignored.
- default_scopes=["ignored"],
- )
-
- credentials.refresh(request)
-
- assert len(request.call_args_list) == 4
- # Fourth request should be sent to GCP STS endpoint.
- self.assert_token_request_kwargs(
- request.call_args_list[3].kwargs, token_headers, token_request_data
- )
- assert credentials.token == self.SUCCESS_RESPONSE["access_token"]
- assert credentials.quota_project_id == QUOTA_PROJECT_ID
- assert credentials.scopes == SCOPES
- assert credentials.default_scopes == ["ignored"]
-
- @mock.patch("google.auth._helpers.utcnow")
- def test_refresh_success_without_impersonation_use_default_scopes(self, utcnow):
- utcnow.return_value = datetime.datetime.strptime(
- self.AWS_SIGNATURE_TIME, "%Y-%m-%dT%H:%M:%SZ"
- )
- expected_subject_token = self.make_serialized_aws_signed_request(
- {
- "access_key_id": ACCESS_KEY_ID,
- "secret_access_key": SECRET_ACCESS_KEY,
- "security_token": TOKEN,
- }
- )
- token_headers = {
- "Content-Type": "application/x-www-form-urlencoded",
- "Authorization": "Basic " + BASIC_AUTH_ENCODING,
- }
- token_request_data = {
- "grant_type": "urn:ietf:params:oauth:grant-type:token-exchange",
- "audience": AUDIENCE,
- "requested_token_type": "urn:ietf:params:oauth:token-type:access_token",
- "scope": " ".join(SCOPES),
- "subject_token": expected_subject_token,
- "subject_token_type": SUBJECT_TOKEN_TYPE,
- }
- request = self.make_mock_request(
- region_status=http_client.OK,
- region_name=self.AWS_REGION,
- role_status=http_client.OK,
- role_name=self.AWS_ROLE,
- security_credentials_status=http_client.OK,
- security_credentials_data=self.AWS_SECURITY_CREDENTIALS_RESPONSE,
- token_status=http_client.OK,
- token_data=self.SUCCESS_RESPONSE,
- )
- credentials = self.make_credentials(
- client_id=CLIENT_ID,
- client_secret=CLIENT_SECRET,
- credential_source=self.CREDENTIAL_SOURCE,
- quota_project_id=QUOTA_PROJECT_ID,
- scopes=None,
- # Default scopes should be used since user specified scopes are none.
- default_scopes=SCOPES,
- )
-
- credentials.refresh(request)
-
- assert len(request.call_args_list) == 4
- # Fourth request should be sent to GCP STS endpoint.
- self.assert_token_request_kwargs(
- request.call_args_list[3].kwargs, token_headers, token_request_data
- )
- assert credentials.token == self.SUCCESS_RESPONSE["access_token"]
- assert credentials.quota_project_id == QUOTA_PROJECT_ID
- assert credentials.scopes is None
- assert credentials.default_scopes == SCOPES
-
- @mock.patch("google.auth._helpers.utcnow")
- def test_refresh_success_with_impersonation_ignore_default_scopes(self, utcnow):
- utcnow.return_value = datetime.datetime.strptime(
- self.AWS_SIGNATURE_TIME, "%Y-%m-%dT%H:%M:%SZ"
- )
- expire_time = (
- _helpers.utcnow().replace(microsecond=0) + datetime.timedelta(seconds=3600)
- ).isoformat("T") + "Z"
- expected_subject_token = self.make_serialized_aws_signed_request(
- {
- "access_key_id": ACCESS_KEY_ID,
- "secret_access_key": SECRET_ACCESS_KEY,
- "security_token": TOKEN,
- }
- )
- token_headers = {
- "Content-Type": "application/x-www-form-urlencoded",
- "Authorization": "Basic " + BASIC_AUTH_ENCODING,
- }
- token_request_data = {
- "grant_type": "urn:ietf:params:oauth:grant-type:token-exchange",
- "audience": AUDIENCE,
- "requested_token_type": "urn:ietf:params:oauth:token-type:access_token",
- "scope": "https://www.googleapis.com/auth/iam",
- "subject_token": expected_subject_token,
- "subject_token_type": SUBJECT_TOKEN_TYPE,
- }
- # Service account impersonation request/response.
- impersonation_response = {
- "accessToken": "SA_ACCESS_TOKEN",
- "expireTime": expire_time,
- }
- impersonation_headers = {
- "Content-Type": "application/json",
- "authorization": "Bearer {}".format(self.SUCCESS_RESPONSE["access_token"]),
- "x-goog-user-project": QUOTA_PROJECT_ID,
- }
- impersonation_request_data = {
- "delegates": None,
- "scope": SCOPES,
- "lifetime": "3600s",
- }
- request = self.make_mock_request(
- region_status=http_client.OK,
- region_name=self.AWS_REGION,
- role_status=http_client.OK,
- role_name=self.AWS_ROLE,
- security_credentials_status=http_client.OK,
- security_credentials_data=self.AWS_SECURITY_CREDENTIALS_RESPONSE,
- token_status=http_client.OK,
- token_data=self.SUCCESS_RESPONSE,
- impersonation_status=http_client.OK,
- impersonation_data=impersonation_response,
- )
- credentials = self.make_credentials(
- client_id=CLIENT_ID,
- client_secret=CLIENT_SECRET,
- credential_source=self.CREDENTIAL_SOURCE,
- service_account_impersonation_url=SERVICE_ACCOUNT_IMPERSONATION_URL,
- quota_project_id=QUOTA_PROJECT_ID,
- scopes=SCOPES,
- # Default scopes should be ignored.
- default_scopes=["ignored"],
- )
-
- credentials.refresh(request)
-
- assert len(request.call_args_list) == 5
- # Fourth request should be sent to GCP STS endpoint.
- self.assert_token_request_kwargs(
- request.call_args_list[3].kwargs, token_headers, token_request_data
- )
- # Fifth request should be sent to iamcredentials endpoint for service
- # account impersonation.
- self.assert_impersonation_request_kwargs(
- request.call_args_list[4].kwargs,
- impersonation_headers,
- impersonation_request_data,
- )
- assert credentials.token == impersonation_response["accessToken"]
- assert credentials.quota_project_id == QUOTA_PROJECT_ID
- assert credentials.scopes == SCOPES
- assert credentials.default_scopes == ["ignored"]
-
- @mock.patch("google.auth._helpers.utcnow")
- def test_refresh_success_with_impersonation_use_default_scopes(self, utcnow):
- utcnow.return_value = datetime.datetime.strptime(
- self.AWS_SIGNATURE_TIME, "%Y-%m-%dT%H:%M:%SZ"
- )
- expire_time = (
- _helpers.utcnow().replace(microsecond=0) + datetime.timedelta(seconds=3600)
- ).isoformat("T") + "Z"
- expected_subject_token = self.make_serialized_aws_signed_request(
- {
- "access_key_id": ACCESS_KEY_ID,
- "secret_access_key": SECRET_ACCESS_KEY,
- "security_token": TOKEN,
- }
- )
- token_headers = {
- "Content-Type": "application/x-www-form-urlencoded",
- "Authorization": "Basic " + BASIC_AUTH_ENCODING,
- }
- token_request_data = {
- "grant_type": "urn:ietf:params:oauth:grant-type:token-exchange",
- "audience": AUDIENCE,
- "requested_token_type": "urn:ietf:params:oauth:token-type:access_token",
- "scope": "https://www.googleapis.com/auth/iam",
- "subject_token": expected_subject_token,
- "subject_token_type": SUBJECT_TOKEN_TYPE,
- }
- # Service account impersonation request/response.
- impersonation_response = {
- "accessToken": "SA_ACCESS_TOKEN",
- "expireTime": expire_time,
- }
- impersonation_headers = {
- "Content-Type": "application/json",
- "authorization": "Bearer {}".format(self.SUCCESS_RESPONSE["access_token"]),
- "x-goog-user-project": QUOTA_PROJECT_ID,
- }
- impersonation_request_data = {
- "delegates": None,
- "scope": SCOPES,
- "lifetime": "3600s",
- }
- request = self.make_mock_request(
- region_status=http_client.OK,
- region_name=self.AWS_REGION,
- role_status=http_client.OK,
- role_name=self.AWS_ROLE,
- security_credentials_status=http_client.OK,
- security_credentials_data=self.AWS_SECURITY_CREDENTIALS_RESPONSE,
- token_status=http_client.OK,
- token_data=self.SUCCESS_RESPONSE,
- impersonation_status=http_client.OK,
- impersonation_data=impersonation_response,
- )
- credentials = self.make_credentials(
- client_id=CLIENT_ID,
- client_secret=CLIENT_SECRET,
- credential_source=self.CREDENTIAL_SOURCE,
- service_account_impersonation_url=SERVICE_ACCOUNT_IMPERSONATION_URL,
- quota_project_id=QUOTA_PROJECT_ID,
- scopes=None,
- # Default scopes should be used since user specified scopes are none.
- default_scopes=SCOPES,
- )
-
- credentials.refresh(request)
-
- assert len(request.call_args_list) == 5
- # Fourth request should be sent to GCP STS endpoint.
- self.assert_token_request_kwargs(
- request.call_args_list[3].kwargs, token_headers, token_request_data
- )
- # Fifth request should be sent to iamcredentials endpoint for service
- # account impersonation.
- self.assert_impersonation_request_kwargs(
- request.call_args_list[4].kwargs,
- impersonation_headers,
- impersonation_request_data,
- )
- assert credentials.token == impersonation_response["accessToken"]
- assert credentials.quota_project_id == QUOTA_PROJECT_ID
- assert credentials.scopes is None
- assert credentials.default_scopes == SCOPES
-
- def test_refresh_with_retrieve_subject_token_error(self):
- request = self.make_mock_request(region_status=http_client.BAD_REQUEST)
- credentials = self.make_credentials(credential_source=self.CREDENTIAL_SOURCE)
-
- with pytest.raises(exceptions.RefreshError) as excinfo:
- credentials.refresh(request)
-
- assert excinfo.match(r"Unable to retrieve AWS region")
diff --git a/tests/test_external_account.py b/tests/test_external_account.py
deleted file mode 100644
index 42e53ec..0000000
--- a/tests/test_external_account.py
+++ /dev/null
@@ -1,1095 +0,0 @@
-# Copyright 2020 Google LLC
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-import datetime
-import json
-
-import mock
-import pytest
-from six.moves import http_client
-from six.moves import urllib
-
-from google.auth import _helpers
-from google.auth import exceptions
-from google.auth import external_account
-from google.auth import transport
-
-
-CLIENT_ID = "username"
-CLIENT_SECRET = "password"
-# Base64 encoding of "username:password"
-BASIC_AUTH_ENCODING = "dXNlcm5hbWU6cGFzc3dvcmQ="
-SERVICE_ACCOUNT_EMAIL = "service-1234@service-name.iam.gserviceaccount.com"
-
-
-class CredentialsImpl(external_account.Credentials):
- def __init__(
- self,
- audience,
- subject_token_type,
- token_url,
- credential_source,
- service_account_impersonation_url=None,
- client_id=None,
- client_secret=None,
- quota_project_id=None,
- scopes=None,
- default_scopes=None,
- ):
- super(CredentialsImpl, self).__init__(
- audience=audience,
- subject_token_type=subject_token_type,
- token_url=token_url,
- credential_source=credential_source,
- service_account_impersonation_url=service_account_impersonation_url,
- client_id=client_id,
- client_secret=client_secret,
- quota_project_id=quota_project_id,
- scopes=scopes,
- default_scopes=default_scopes,
- )
- self._counter = 0
-
- def retrieve_subject_token(self, request):
- counter = self._counter
- self._counter += 1
- return "subject_token_{}".format(counter)
-
-
-class TestCredentials(object):
- TOKEN_URL = "https://sts.googleapis.com/v1/token"
- PROJECT_NUMBER = "123456"
- POOL_ID = "POOL_ID"
- PROVIDER_ID = "PROVIDER_ID"
- AUDIENCE = (
- "//iam.googleapis.com/projects/{}"
- "/locations/global/workloadIdentityPools/{}"
- "/providers/{}"
- ).format(PROJECT_NUMBER, POOL_ID, PROVIDER_ID)
- SUBJECT_TOKEN_TYPE = "urn:ietf:params:oauth:token-type:jwt"
- CREDENTIAL_SOURCE = {"file": "/var/run/secrets/goog.id/token"}
- SUCCESS_RESPONSE = {
- "access_token": "ACCESS_TOKEN",
- "issued_token_type": "urn:ietf:params:oauth:token-type:access_token",
- "token_type": "Bearer",
- "expires_in": 3600,
- "scope": "scope1 scope2",
- }
- ERROR_RESPONSE = {
- "error": "invalid_request",
- "error_description": "Invalid subject token",
- "error_uri": "https://tools.ietf.org/html/rfc6749",
- }
- QUOTA_PROJECT_ID = "QUOTA_PROJECT_ID"
- SERVICE_ACCOUNT_IMPERSONATION_URL = (
- "https://us-east1-iamcredentials.googleapis.com/v1/projects/-"
- + "/serviceAccounts/{}:generateAccessToken".format(SERVICE_ACCOUNT_EMAIL)
- )
- SCOPES = ["scope1", "scope2"]
- IMPERSONATION_ERROR_RESPONSE = {
- "error": {
- "code": 400,
- "message": "Request contains an invalid argument",
- "status": "INVALID_ARGUMENT",
- }
- }
- PROJECT_ID = "my-proj-id"
- CLOUD_RESOURCE_MANAGER_URL = (
- "https://cloudresourcemanager.googleapis.com/v1/projects/"
- )
- CLOUD_RESOURCE_MANAGER_SUCCESS_RESPONSE = {
- "projectNumber": PROJECT_NUMBER,
- "projectId": PROJECT_ID,
- "lifecycleState": "ACTIVE",
- "name": "project-name",
- "createTime": "2018-11-06T04:42:54.109Z",
- "parent": {"type": "folder", "id": "12345678901"},
- }
-
- @classmethod
- def make_credentials(
- cls,
- client_id=None,
- client_secret=None,
- quota_project_id=None,
- scopes=None,
- default_scopes=None,
- service_account_impersonation_url=None,
- ):
- return CredentialsImpl(
- audience=cls.AUDIENCE,
- subject_token_type=cls.SUBJECT_TOKEN_TYPE,
- token_url=cls.TOKEN_URL,
- service_account_impersonation_url=service_account_impersonation_url,
- credential_source=cls.CREDENTIAL_SOURCE,
- client_id=client_id,
- client_secret=client_secret,
- quota_project_id=quota_project_id,
- scopes=scopes,
- default_scopes=default_scopes,
- )
-
- @classmethod
- def make_mock_request(
- cls,
- status=http_client.OK,
- data=None,
- impersonation_status=None,
- impersonation_data=None,
- cloud_resource_manager_status=None,
- cloud_resource_manager_data=None,
- ):
- # STS token exchange request.
- token_response = mock.create_autospec(transport.Response, instance=True)
- token_response.status = status
- token_response.data = json.dumps(data).encode("utf-8")
- responses = [token_response]
-
- # If service account impersonation is requested, mock the expected response.
- if impersonation_status:
- impersonation_response = mock.create_autospec(
- transport.Response, instance=True
- )
- impersonation_response.status = impersonation_status
- impersonation_response.data = json.dumps(impersonation_data).encode("utf-8")
- responses.append(impersonation_response)
-
- # If cloud resource manager is requested, mock the expected response.
- if cloud_resource_manager_status:
- cloud_resource_manager_response = mock.create_autospec(
- transport.Response, instance=True
- )
- cloud_resource_manager_response.status = cloud_resource_manager_status
- cloud_resource_manager_response.data = json.dumps(
- cloud_resource_manager_data
- ).encode("utf-8")
- responses.append(cloud_resource_manager_response)
-
- request = mock.create_autospec(transport.Request)
- request.side_effect = responses
-
- return request
-
- @classmethod
- def assert_token_request_kwargs(cls, request_kwargs, headers, request_data):
- assert request_kwargs["url"] == cls.TOKEN_URL
- assert request_kwargs["method"] == "POST"
- assert request_kwargs["headers"] == headers
- assert request_kwargs["body"] is not None
- body_tuples = urllib.parse.parse_qsl(request_kwargs["body"])
- for (k, v) in body_tuples:
- assert v.decode("utf-8") == request_data[k.decode("utf-8")]
- assert len(body_tuples) == len(request_data.keys())
-
- @classmethod
- def assert_impersonation_request_kwargs(cls, request_kwargs, headers, request_data):
- assert request_kwargs["url"] == cls.SERVICE_ACCOUNT_IMPERSONATION_URL
- assert request_kwargs["method"] == "POST"
- assert request_kwargs["headers"] == headers
- assert request_kwargs["body"] is not None
- body_json = json.loads(request_kwargs["body"].decode("utf-8"))
- assert body_json == request_data
-
- @classmethod
- def assert_resource_manager_request_kwargs(
- cls, request_kwargs, project_number, headers
- ):
- assert request_kwargs["url"] == cls.CLOUD_RESOURCE_MANAGER_URL + project_number
- assert request_kwargs["method"] == "GET"
- assert request_kwargs["headers"] == headers
- assert "body" not in request_kwargs
-
- def test_default_state(self):
- credentials = self.make_credentials()
-
- # Not token acquired yet
- assert not credentials.token
- assert not credentials.valid
- # Expiration hasn't been set yet
- assert not credentials.expiry
- assert not credentials.expired
- # Scopes are required
- assert not credentials.scopes
- assert credentials.requires_scopes
- assert not credentials.quota_project_id
-
- def test_with_scopes(self):
- credentials = self.make_credentials()
-
- assert not credentials.scopes
- assert credentials.requires_scopes
-
- scoped_credentials = credentials.with_scopes(["email"])
-
- assert scoped_credentials.has_scopes(["email"])
- assert not scoped_credentials.requires_scopes
-
- def test_with_scopes_using_user_and_default_scopes(self):
- credentials = self.make_credentials()
-
- assert not credentials.scopes
- assert credentials.requires_scopes
-
- scoped_credentials = credentials.with_scopes(
- ["email"], default_scopes=["profile"]
- )
-
- assert scoped_credentials.has_scopes(["email"])
- assert not scoped_credentials.has_scopes(["profile"])
- assert not scoped_credentials.requires_scopes
- assert scoped_credentials.scopes == ["email"]
- assert scoped_credentials.default_scopes == ["profile"]
-
- def test_with_scopes_using_default_scopes_only(self):
- credentials = self.make_credentials()
-
- assert not credentials.scopes
- assert credentials.requires_scopes
-
- scoped_credentials = credentials.with_scopes(None, default_scopes=["profile"])
-
- assert scoped_credentials.has_scopes(["profile"])
- assert not scoped_credentials.requires_scopes
-
- def test_with_scopes_full_options_propagated(self):
- credentials = self.make_credentials(
- client_id=CLIENT_ID,
- client_secret=CLIENT_SECRET,
- quota_project_id=self.QUOTA_PROJECT_ID,
- scopes=self.SCOPES,
- default_scopes=["default1"],
- service_account_impersonation_url=self.SERVICE_ACCOUNT_IMPERSONATION_URL,
- )
-
- with mock.patch.object(
- external_account.Credentials, "__init__", return_value=None
- ) as mock_init:
- credentials.with_scopes(["email"], ["default2"])
-
- # Confirm with_scopes initialized the credential with the expected
- # parameters and scopes.
- mock_init.assert_called_once_with(
- audience=self.AUDIENCE,
- subject_token_type=self.SUBJECT_TOKEN_TYPE,
- token_url=self.TOKEN_URL,
- credential_source=self.CREDENTIAL_SOURCE,
- service_account_impersonation_url=self.SERVICE_ACCOUNT_IMPERSONATION_URL,
- client_id=CLIENT_ID,
- client_secret=CLIENT_SECRET,
- quota_project_id=self.QUOTA_PROJECT_ID,
- scopes=["email"],
- default_scopes=["default2"],
- )
-
- def test_with_quota_project(self):
- credentials = self.make_credentials()
-
- assert not credentials.scopes
- assert not credentials.quota_project_id
-
- quota_project_creds = credentials.with_quota_project("project-foo")
-
- assert quota_project_creds.quota_project_id == "project-foo"
-
- def test_with_quota_project_full_options_propagated(self):
- credentials = self.make_credentials(
- client_id=CLIENT_ID,
- client_secret=CLIENT_SECRET,
- quota_project_id=self.QUOTA_PROJECT_ID,
- scopes=self.SCOPES,
- default_scopes=["default1"],
- service_account_impersonation_url=self.SERVICE_ACCOUNT_IMPERSONATION_URL,
- )
-
- with mock.patch.object(
- external_account.Credentials, "__init__", return_value=None
- ) as mock_init:
- credentials.with_quota_project("project-foo")
-
- # Confirm with_quota_project initialized the credential with the
- # expected parameters and quota project ID.
- mock_init.assert_called_once_with(
- audience=self.AUDIENCE,
- subject_token_type=self.SUBJECT_TOKEN_TYPE,
- token_url=self.TOKEN_URL,
- credential_source=self.CREDENTIAL_SOURCE,
- service_account_impersonation_url=self.SERVICE_ACCOUNT_IMPERSONATION_URL,
- client_id=CLIENT_ID,
- client_secret=CLIENT_SECRET,
- quota_project_id="project-foo",
- scopes=self.SCOPES,
- default_scopes=["default1"],
- )
-
- def test_with_invalid_impersonation_target_principal(self):
- invalid_url = "https://iamcredentials.googleapis.com/v1/invalid"
-
- with pytest.raises(exceptions.RefreshError) as excinfo:
- self.make_credentials(service_account_impersonation_url=invalid_url)
-
- assert excinfo.match(
- r"Unable to determine target principal from service account impersonation URL."
- )
-
- @mock.patch("google.auth._helpers.utcnow", return_value=datetime.datetime.min)
- def test_refresh_without_client_auth_success(self, unused_utcnow):
- response = self.SUCCESS_RESPONSE.copy()
- # Test custom expiration to confirm expiry is set correctly.
- response["expires_in"] = 2800
- expected_expiry = datetime.datetime.min + datetime.timedelta(
- seconds=response["expires_in"]
- )
- headers = {"Content-Type": "application/x-www-form-urlencoded"}
- request_data = {
- "grant_type": "urn:ietf:params:oauth:grant-type:token-exchange",
- "audience": self.AUDIENCE,
- "requested_token_type": "urn:ietf:params:oauth:token-type:access_token",
- "subject_token": "subject_token_0",
- "subject_token_type": self.SUBJECT_TOKEN_TYPE,
- }
- request = self.make_mock_request(status=http_client.OK, data=response)
- credentials = self.make_credentials()
-
- credentials.refresh(request)
-
- self.assert_token_request_kwargs(
- request.call_args.kwargs, headers, request_data
- )
- assert credentials.valid
- assert credentials.expiry == expected_expiry
- assert not credentials.expired
- assert credentials.token == response["access_token"]
-
- def test_refresh_impersonation_without_client_auth_success(self):
- # Simulate service account access token expires in 2800 seconds.
- expire_time = (
- _helpers.utcnow().replace(microsecond=0) + datetime.timedelta(seconds=2800)
- ).isoformat("T") + "Z"
- expected_expiry = datetime.datetime.strptime(expire_time, "%Y-%m-%dT%H:%M:%SZ")
- # STS token exchange request/response.
- token_response = self.SUCCESS_RESPONSE.copy()
- token_headers = {"Content-Type": "application/x-www-form-urlencoded"}
- token_request_data = {
- "grant_type": "urn:ietf:params:oauth:grant-type:token-exchange",
- "audience": self.AUDIENCE,
- "requested_token_type": "urn:ietf:params:oauth:token-type:access_token",
- "subject_token": "subject_token_0",
- "subject_token_type": self.SUBJECT_TOKEN_TYPE,
- "scope": "https://www.googleapis.com/auth/iam",
- }
- # Service account impersonation request/response.
- impersonation_response = {
- "accessToken": "SA_ACCESS_TOKEN",
- "expireTime": expire_time,
- }
- impersonation_headers = {
- "Content-Type": "application/json",
- "authorization": "Bearer {}".format(token_response["access_token"]),
- }
- impersonation_request_data = {
- "delegates": None,
- "scope": self.SCOPES,
- "lifetime": "3600s",
- }
- # Initialize mock request to handle token exchange and service account
- # impersonation request.
- request = self.make_mock_request(
- status=http_client.OK,
- data=token_response,
- impersonation_status=http_client.OK,
- impersonation_data=impersonation_response,
- )
- # Initialize credentials with service account impersonation.
- credentials = self.make_credentials(
- service_account_impersonation_url=self.SERVICE_ACCOUNT_IMPERSONATION_URL,
- scopes=self.SCOPES,
- )
-
- credentials.refresh(request)
-
- # Only 2 requests should be processed.
- assert len(request.call_args_list) == 2
- # Verify token exchange request parameters.
- self.assert_token_request_kwargs(
- request.call_args_list[0].kwargs, token_headers, token_request_data
- )
- # Verify service account impersonation request parameters.
- self.assert_impersonation_request_kwargs(
- request.call_args_list[1].kwargs,
- impersonation_headers,
- impersonation_request_data,
- )
- assert credentials.valid
- assert credentials.expiry == expected_expiry
- assert not credentials.expired
- assert credentials.token == impersonation_response["accessToken"]
-
- def test_refresh_without_client_auth_success_explicit_user_scopes_ignore_default_scopes(
- self
- ):
- headers = {"Content-Type": "application/x-www-form-urlencoded"}
- request_data = {
- "grant_type": "urn:ietf:params:oauth:grant-type:token-exchange",
- "audience": self.AUDIENCE,
- "requested_token_type": "urn:ietf:params:oauth:token-type:access_token",
- "scope": "scope1 scope2",
- "subject_token": "subject_token_0",
- "subject_token_type": self.SUBJECT_TOKEN_TYPE,
- }
- request = self.make_mock_request(
- status=http_client.OK, data=self.SUCCESS_RESPONSE
- )
- credentials = self.make_credentials(
- scopes=["scope1", "scope2"],
- # Default scopes will be ignored in favor of user scopes.
- default_scopes=["ignored"],
- )
-
- credentials.refresh(request)
-
- self.assert_token_request_kwargs(
- request.call_args.kwargs, headers, request_data
- )
- assert credentials.valid
- assert not credentials.expired
- assert credentials.token == self.SUCCESS_RESPONSE["access_token"]
- assert credentials.has_scopes(["scope1", "scope2"])
- assert not credentials.has_scopes(["ignored"])
-
- def test_refresh_without_client_auth_success_explicit_default_scopes_only(self):
- headers = {"Content-Type": "application/x-www-form-urlencoded"}
- request_data = {
- "grant_type": "urn:ietf:params:oauth:grant-type:token-exchange",
- "audience": self.AUDIENCE,
- "requested_token_type": "urn:ietf:params:oauth:token-type:access_token",
- "scope": "scope1 scope2",
- "subject_token": "subject_token_0",
- "subject_token_type": self.SUBJECT_TOKEN_TYPE,
- }
- request = self.make_mock_request(
- status=http_client.OK, data=self.SUCCESS_RESPONSE
- )
- credentials = self.make_credentials(
- scopes=None,
- # Default scopes will be used since user scopes are none.
- default_scopes=["scope1", "scope2"],
- )
-
- credentials.refresh(request)
-
- self.assert_token_request_kwargs(
- request.call_args.kwargs, headers, request_data
- )
- assert credentials.valid
- assert not credentials.expired
- assert credentials.token == self.SUCCESS_RESPONSE["access_token"]
- assert credentials.has_scopes(["scope1", "scope2"])
-
- def test_refresh_without_client_auth_error(self):
- request = self.make_mock_request(
- status=http_client.BAD_REQUEST, data=self.ERROR_RESPONSE
- )
- credentials = self.make_credentials()
-
- with pytest.raises(exceptions.OAuthError) as excinfo:
- credentials.refresh(request)
-
- assert excinfo.match(
- r"Error code invalid_request: Invalid subject token - https://tools.ietf.org/html/rfc6749"
- )
- assert not credentials.expired
- assert credentials.token is None
-
- def test_refresh_impersonation_without_client_auth_error(self):
- request = self.make_mock_request(
- status=http_client.OK,
- data=self.SUCCESS_RESPONSE,
- impersonation_status=http_client.BAD_REQUEST,
- impersonation_data=self.IMPERSONATION_ERROR_RESPONSE,
- )
- credentials = self.make_credentials(
- service_account_impersonation_url=self.SERVICE_ACCOUNT_IMPERSONATION_URL,
- scopes=self.SCOPES,
- )
-
- with pytest.raises(exceptions.RefreshError) as excinfo:
- credentials.refresh(request)
-
- assert excinfo.match(r"Unable to acquire impersonated credentials")
- assert not credentials.expired
- assert credentials.token is None
-
- def test_refresh_with_client_auth_success(self):
- headers = {
- "Content-Type": "application/x-www-form-urlencoded",
- "Authorization": "Basic {}".format(BASIC_AUTH_ENCODING),
- }
- request_data = {
- "grant_type": "urn:ietf:params:oauth:grant-type:token-exchange",
- "audience": self.AUDIENCE,
- "requested_token_type": "urn:ietf:params:oauth:token-type:access_token",
- "subject_token": "subject_token_0",
- "subject_token_type": self.SUBJECT_TOKEN_TYPE,
- }
- request = self.make_mock_request(
- status=http_client.OK, data=self.SUCCESS_RESPONSE
- )
- credentials = self.make_credentials(
- client_id=CLIENT_ID, client_secret=CLIENT_SECRET
- )
-
- credentials.refresh(request)
-
- self.assert_token_request_kwargs(
- request.call_args.kwargs, headers, request_data
- )
- assert credentials.valid
- assert not credentials.expired
- assert credentials.token == self.SUCCESS_RESPONSE["access_token"]
-
- def test_refresh_impersonation_with_client_auth_success_ignore_default_scopes(self):
- # Simulate service account access token expires in 2800 seconds.
- expire_time = (
- _helpers.utcnow().replace(microsecond=0) + datetime.timedelta(seconds=2800)
- ).isoformat("T") + "Z"
- expected_expiry = datetime.datetime.strptime(expire_time, "%Y-%m-%dT%H:%M:%SZ")
- # STS token exchange request/response.
- token_response = self.SUCCESS_RESPONSE.copy()
- token_headers = {
- "Content-Type": "application/x-www-form-urlencoded",
- "Authorization": "Basic {}".format(BASIC_AUTH_ENCODING),
- }
- token_request_data = {
- "grant_type": "urn:ietf:params:oauth:grant-type:token-exchange",
- "audience": self.AUDIENCE,
- "requested_token_type": "urn:ietf:params:oauth:token-type:access_token",
- "subject_token": "subject_token_0",
- "subject_token_type": self.SUBJECT_TOKEN_TYPE,
- "scope": "https://www.googleapis.com/auth/iam",
- }
- # Service account impersonation request/response.
- impersonation_response = {
- "accessToken": "SA_ACCESS_TOKEN",
- "expireTime": expire_time,
- }
- impersonation_headers = {
- "Content-Type": "application/json",
- "authorization": "Bearer {}".format(token_response["access_token"]),
- }
- impersonation_request_data = {
- "delegates": None,
- "scope": self.SCOPES,
- "lifetime": "3600s",
- }
- # Initialize mock request to handle token exchange and service account
- # impersonation request.
- request = self.make_mock_request(
- status=http_client.OK,
- data=token_response,
- impersonation_status=http_client.OK,
- impersonation_data=impersonation_response,
- )
- # Initialize credentials with service account impersonation and basic auth.
- credentials = self.make_credentials(
- client_id=CLIENT_ID,
- client_secret=CLIENT_SECRET,
- service_account_impersonation_url=self.SERVICE_ACCOUNT_IMPERSONATION_URL,
- scopes=self.SCOPES,
- # Default scopes will be ignored since user scopes are specified.
- default_scopes=["ignored"],
- )
-
- credentials.refresh(request)
-
- # Only 2 requests should be processed.
- assert len(request.call_args_list) == 2
- # Verify token exchange request parameters.
- self.assert_token_request_kwargs(
- request.call_args_list[0].kwargs, token_headers, token_request_data
- )
- # Verify service account impersonation request parameters.
- self.assert_impersonation_request_kwargs(
- request.call_args_list[1].kwargs,
- impersonation_headers,
- impersonation_request_data,
- )
- assert credentials.valid
- assert credentials.expiry == expected_expiry
- assert not credentials.expired
- assert credentials.token == impersonation_response["accessToken"]
-
- def test_refresh_impersonation_with_client_auth_success_use_default_scopes(self):
- # Simulate service account access token expires in 2800 seconds.
- expire_time = (
- _helpers.utcnow().replace(microsecond=0) + datetime.timedelta(seconds=2800)
- ).isoformat("T") + "Z"
- expected_expiry = datetime.datetime.strptime(expire_time, "%Y-%m-%dT%H:%M:%SZ")
- # STS token exchange request/response.
- token_response = self.SUCCESS_RESPONSE.copy()
- token_headers = {
- "Content-Type": "application/x-www-form-urlencoded",
- "Authorization": "Basic {}".format(BASIC_AUTH_ENCODING),
- }
- token_request_data = {
- "grant_type": "urn:ietf:params:oauth:grant-type:token-exchange",
- "audience": self.AUDIENCE,
- "requested_token_type": "urn:ietf:params:oauth:token-type:access_token",
- "subject_token": "subject_token_0",
- "subject_token_type": self.SUBJECT_TOKEN_TYPE,
- "scope": "https://www.googleapis.com/auth/iam",
- }
- # Service account impersonation request/response.
- impersonation_response = {
- "accessToken": "SA_ACCESS_TOKEN",
- "expireTime": expire_time,
- }
- impersonation_headers = {
- "Content-Type": "application/json",
- "authorization": "Bearer {}".format(token_response["access_token"]),
- }
- impersonation_request_data = {
- "delegates": None,
- "scope": self.SCOPES,
- "lifetime": "3600s",
- }
- # Initialize mock request to handle token exchange and service account
- # impersonation request.
- request = self.make_mock_request(
- status=http_client.OK,
- data=token_response,
- impersonation_status=http_client.OK,
- impersonation_data=impersonation_response,
- )
- # Initialize credentials with service account impersonation and basic auth.
- credentials = self.make_credentials(
- client_id=CLIENT_ID,
- client_secret=CLIENT_SECRET,
- service_account_impersonation_url=self.SERVICE_ACCOUNT_IMPERSONATION_URL,
- scopes=None,
- # Default scopes will be used since user specified scopes are none.
- default_scopes=self.SCOPES,
- )
-
- credentials.refresh(request)
-
- # Only 2 requests should be processed.
- assert len(request.call_args_list) == 2
- # Verify token exchange request parameters.
- self.assert_token_request_kwargs(
- request.call_args_list[0].kwargs, token_headers, token_request_data
- )
- # Verify service account impersonation request parameters.
- self.assert_impersonation_request_kwargs(
- request.call_args_list[1].kwargs,
- impersonation_headers,
- impersonation_request_data,
- )
- assert credentials.valid
- assert credentials.expiry == expected_expiry
- assert not credentials.expired
- assert credentials.token == impersonation_response["accessToken"]
-
- def test_apply_without_quota_project_id(self):
- headers = {}
- request = self.make_mock_request(
- status=http_client.OK, data=self.SUCCESS_RESPONSE
- )
- credentials = self.make_credentials()
-
- credentials.refresh(request)
- credentials.apply(headers)
-
- assert headers == {
- "authorization": "Bearer {}".format(self.SUCCESS_RESPONSE["access_token"])
- }
-
- def test_apply_impersonation_without_quota_project_id(self):
- expire_time = (
- _helpers.utcnow().replace(microsecond=0) + datetime.timedelta(seconds=3600)
- ).isoformat("T") + "Z"
- # Service account impersonation response.
- impersonation_response = {
- "accessToken": "SA_ACCESS_TOKEN",
- "expireTime": expire_time,
- }
- # Initialize mock request to handle token exchange and service account
- # impersonation request.
- request = self.make_mock_request(
- status=http_client.OK,
- data=self.SUCCESS_RESPONSE.copy(),
- impersonation_status=http_client.OK,
- impersonation_data=impersonation_response,
- )
- # Initialize credentials with service account impersonation.
- credentials = self.make_credentials(
- service_account_impersonation_url=self.SERVICE_ACCOUNT_IMPERSONATION_URL,
- scopes=self.SCOPES,
- )
- headers = {}
-
- credentials.refresh(request)
- credentials.apply(headers)
-
- assert headers == {
- "authorization": "Bearer {}".format(impersonation_response["accessToken"])
- }
-
- def test_apply_with_quota_project_id(self):
- headers = {"other": "header-value"}
- request = self.make_mock_request(
- status=http_client.OK, data=self.SUCCESS_RESPONSE
- )
- credentials = self.make_credentials(quota_project_id=self.QUOTA_PROJECT_ID)
-
- credentials.refresh(request)
- credentials.apply(headers)
-
- assert headers == {
- "other": "header-value",
- "authorization": "Bearer {}".format(self.SUCCESS_RESPONSE["access_token"]),
- "x-goog-user-project": self.QUOTA_PROJECT_ID,
- }
-
- def test_apply_impersonation_with_quota_project_id(self):
- expire_time = (
- _helpers.utcnow().replace(microsecond=0) + datetime.timedelta(seconds=3600)
- ).isoformat("T") + "Z"
- # Service account impersonation response.
- impersonation_response = {
- "accessToken": "SA_ACCESS_TOKEN",
- "expireTime": expire_time,
- }
- # Initialize mock request to handle token exchange and service account
- # impersonation request.
- request = self.make_mock_request(
- status=http_client.OK,
- data=self.SUCCESS_RESPONSE.copy(),
- impersonation_status=http_client.OK,
- impersonation_data=impersonation_response,
- )
- # Initialize credentials with service account impersonation.
- credentials = self.make_credentials(
- service_account_impersonation_url=self.SERVICE_ACCOUNT_IMPERSONATION_URL,
- scopes=self.SCOPES,
- quota_project_id=self.QUOTA_PROJECT_ID,
- )
- headers = {"other": "header-value"}
-
- credentials.refresh(request)
- credentials.apply(headers)
-
- assert headers == {
- "other": "header-value",
- "authorization": "Bearer {}".format(impersonation_response["accessToken"]),
- "x-goog-user-project": self.QUOTA_PROJECT_ID,
- }
-
- def test_before_request(self):
- headers = {"other": "header-value"}
- request = self.make_mock_request(
- status=http_client.OK, data=self.SUCCESS_RESPONSE
- )
- credentials = self.make_credentials()
-
- # First call should call refresh, setting the token.
- credentials.before_request(request, "POST", "https://example.com/api", headers)
-
- assert headers == {
- "other": "header-value",
- "authorization": "Bearer {}".format(self.SUCCESS_RESPONSE["access_token"]),
- }
-
- # Second call shouldn't call refresh.
- credentials.before_request(request, "POST", "https://example.com/api", headers)
-
- assert headers == {
- "other": "header-value",
- "authorization": "Bearer {}".format(self.SUCCESS_RESPONSE["access_token"]),
- }
-
- def test_before_request_impersonation(self):
- expire_time = (
- _helpers.utcnow().replace(microsecond=0) + datetime.timedelta(seconds=3600)
- ).isoformat("T") + "Z"
- # Service account impersonation response.
- impersonation_response = {
- "accessToken": "SA_ACCESS_TOKEN",
- "expireTime": expire_time,
- }
- # Initialize mock request to handle token exchange and service account
- # impersonation request.
- request = self.make_mock_request(
- status=http_client.OK,
- data=self.SUCCESS_RESPONSE.copy(),
- impersonation_status=http_client.OK,
- impersonation_data=impersonation_response,
- )
- headers = {"other": "header-value"}
- credentials = self.make_credentials(
- service_account_impersonation_url=self.SERVICE_ACCOUNT_IMPERSONATION_URL
- )
-
- # First call should call refresh, setting the token.
- credentials.before_request(request, "POST", "https://example.com/api", headers)
-
- assert headers == {
- "other": "header-value",
- "authorization": "Bearer {}".format(impersonation_response["accessToken"]),
- }
-
- # Second call shouldn't call refresh.
- credentials.before_request(request, "POST", "https://example.com/api", headers)
-
- assert headers == {
- "other": "header-value",
- "authorization": "Bearer {}".format(impersonation_response["accessToken"]),
- }
-
- @mock.patch("google.auth._helpers.utcnow")
- def test_before_request_expired(self, utcnow):
- headers = {}
- request = self.make_mock_request(
- status=http_client.OK, data=self.SUCCESS_RESPONSE
- )
- credentials = self.make_credentials()
- credentials.token = "token"
- utcnow.return_value = datetime.datetime.min
- # Set the expiration to one second more than now plus the clock skew
- # accomodation. These credentials should be valid.
- credentials.expiry = (
- datetime.datetime.min + _helpers.CLOCK_SKEW + datetime.timedelta(seconds=1)
- )
-
- assert credentials.valid
- assert not credentials.expired
-
- credentials.before_request(request, "POST", "https://example.com/api", headers)
-
- # Cached token should be used.
- assert headers == {"authorization": "Bearer token"}
-
- # Next call should simulate 1 second passed.
- utcnow.return_value = datetime.datetime.min + datetime.timedelta(seconds=1)
-
- assert not credentials.valid
- assert credentials.expired
-
- credentials.before_request(request, "POST", "https://example.com/api", headers)
-
- # New token should be retrieved.
- assert headers == {
- "authorization": "Bearer {}".format(self.SUCCESS_RESPONSE["access_token"])
- }
-
- @mock.patch("google.auth._helpers.utcnow")
- def test_before_request_impersonation_expired(self, utcnow):
- headers = {}
- expire_time = (
- datetime.datetime.min + datetime.timedelta(seconds=3601)
- ).isoformat("T") + "Z"
- # Service account impersonation response.
- impersonation_response = {
- "accessToken": "SA_ACCESS_TOKEN",
- "expireTime": expire_time,
- }
- # Initialize mock request to handle token exchange and service account
- # impersonation request.
- request = self.make_mock_request(
- status=http_client.OK,
- data=self.SUCCESS_RESPONSE.copy(),
- impersonation_status=http_client.OK,
- impersonation_data=impersonation_response,
- )
- credentials = self.make_credentials(
- service_account_impersonation_url=self.SERVICE_ACCOUNT_IMPERSONATION_URL
- )
- credentials.token = "token"
- utcnow.return_value = datetime.datetime.min
- # Set the expiration to one second more than now plus the clock skew
- # accomodation. These credentials should be valid.
- credentials.expiry = (
- datetime.datetime.min + _helpers.CLOCK_SKEW + datetime.timedelta(seconds=1)
- )
-
- assert credentials.valid
- assert not credentials.expired
-
- credentials.before_request(request, "POST", "https://example.com/api", headers)
-
- # Cached token should be used.
- assert headers == {"authorization": "Bearer token"}
-
- # Next call should simulate 1 second passed. This will trigger the expiration
- # threshold.
- utcnow.return_value = datetime.datetime.min + datetime.timedelta(seconds=1)
-
- assert not credentials.valid
- assert credentials.expired
-
- credentials.before_request(request, "POST", "https://example.com/api", headers)
-
- # New token should be retrieved.
- assert headers == {
- "authorization": "Bearer {}".format(impersonation_response["accessToken"])
- }
-
- @pytest.mark.parametrize(
- "audience",
- [
- # Legacy K8s audience format.
- "identitynamespace:1f12345:my_provider",
- # Unrealistic audiences.
- "//iam.googleapis.com/projects",
- "//iam.googleapis.com/projects/",
- "//iam.googleapis.com/project/123456",
- "//iam.googleapis.com/projects//123456",
- "//iam.googleapis.com/prefix_projects/123456",
- "//iam.googleapis.com/projects_suffix/123456",
- ],
- )
- def test_project_number_indeterminable(self, audience):
- credentials = CredentialsImpl(
- audience=audience,
- subject_token_type=self.SUBJECT_TOKEN_TYPE,
- token_url=self.TOKEN_URL,
- credential_source=self.CREDENTIAL_SOURCE,
- )
-
- assert credentials.project_number is None
- assert credentials.get_project_id(None) is None
-
- def test_project_number_determinable(self):
- credentials = CredentialsImpl(
- audience=self.AUDIENCE,
- subject_token_type=self.SUBJECT_TOKEN_TYPE,
- token_url=self.TOKEN_URL,
- credential_source=self.CREDENTIAL_SOURCE,
- )
-
- assert credentials.project_number == self.PROJECT_NUMBER
-
- def test_project_id_without_scopes(self):
- # Initialize credentials with no scopes.
- credentials = CredentialsImpl(
- audience=self.AUDIENCE,
- subject_token_type=self.SUBJECT_TOKEN_TYPE,
- token_url=self.TOKEN_URL,
- credential_source=self.CREDENTIAL_SOURCE,
- )
-
- assert credentials.get_project_id(None) is None
-
- def test_get_project_id_cloud_resource_manager_success(self):
- # STS token exchange request/response.
- token_response = self.SUCCESS_RESPONSE.copy()
- token_headers = {"Content-Type": "application/x-www-form-urlencoded"}
- token_request_data = {
- "grant_type": "urn:ietf:params:oauth:grant-type:token-exchange",
- "audience": self.AUDIENCE,
- "requested_token_type": "urn:ietf:params:oauth:token-type:access_token",
- "subject_token": "subject_token_0",
- "subject_token_type": self.SUBJECT_TOKEN_TYPE,
- "scope": "https://www.googleapis.com/auth/iam",
- }
- # Service account impersonation request/response.
- expire_time = (
- _helpers.utcnow().replace(microsecond=0) + datetime.timedelta(seconds=3600)
- ).isoformat("T") + "Z"
- expected_expiry = datetime.datetime.strptime(expire_time, "%Y-%m-%dT%H:%M:%SZ")
- impersonation_response = {
- "accessToken": "SA_ACCESS_TOKEN",
- "expireTime": expire_time,
- }
- impersonation_headers = {
- "Content-Type": "application/json",
- "x-goog-user-project": self.QUOTA_PROJECT_ID,
- "authorization": "Bearer {}".format(token_response["access_token"]),
- }
- impersonation_request_data = {
- "delegates": None,
- "scope": self.SCOPES,
- "lifetime": "3600s",
- }
- # Initialize mock request to handle token exchange, service account
- # impersonation and cloud resource manager request.
- request = self.make_mock_request(
- status=http_client.OK,
- data=self.SUCCESS_RESPONSE.copy(),
- impersonation_status=http_client.OK,
- impersonation_data=impersonation_response,
- cloud_resource_manager_status=http_client.OK,
- cloud_resource_manager_data=self.CLOUD_RESOURCE_MANAGER_SUCCESS_RESPONSE,
- )
- credentials = self.make_credentials(
- service_account_impersonation_url=self.SERVICE_ACCOUNT_IMPERSONATION_URL,
- scopes=self.SCOPES,
- quota_project_id=self.QUOTA_PROJECT_ID,
- )
-
- # Expected project ID from cloud resource manager response should be returned.
- project_id = credentials.get_project_id(request)
-
- assert project_id == self.PROJECT_ID
- # 3 requests should be processed.
- assert len(request.call_args_list) == 3
- # Verify token exchange request parameters.
- self.assert_token_request_kwargs(
- request.call_args_list[0].kwargs, token_headers, token_request_data
- )
- # Verify service account impersonation request parameters.
- self.assert_impersonation_request_kwargs(
- request.call_args_list[1].kwargs,
- impersonation_headers,
- impersonation_request_data,
- )
- # In the process of getting project ID, an access token should be
- # retrieved.
- assert credentials.valid
- assert credentials.expiry == expected_expiry
- assert not credentials.expired
- assert credentials.token == impersonation_response["accessToken"]
- # Verify cloud resource manager request parameters.
- self.assert_resource_manager_request_kwargs(
- request.call_args_list[2].kwargs,
- self.PROJECT_NUMBER,
- {
- "x-goog-user-project": self.QUOTA_PROJECT_ID,
- "authorization": "Bearer {}".format(
- impersonation_response["accessToken"]
- ),
- },
- )
-
- # Calling get_project_id again should return the cached project_id.
- project_id = credentials.get_project_id(request)
-
- assert project_id == self.PROJECT_ID
- # No additional requests.
- assert len(request.call_args_list) == 3
-
- def test_get_project_id_cloud_resource_manager_error(self):
- # Simulate resource doesn't have sufficient permissions to access
- # cloud resource manager.
- request = self.make_mock_request(
- status=http_client.OK,
- data=self.SUCCESS_RESPONSE.copy(),
- cloud_resource_manager_status=http_client.UNAUTHORIZED,
- )
- credentials = self.make_credentials(scopes=self.SCOPES)
-
- project_id = credentials.get_project_id(request)
-
- assert project_id is None
- # Only 2 requests to STS and cloud resource manager should be sent.
- assert len(request.call_args_list) == 2
diff --git a/tests/test_identity_pool.py b/tests/test_identity_pool.py
deleted file mode 100644
index c017ab5..0000000
--- a/tests/test_identity_pool.py
+++ /dev/null
@@ -1,873 +0,0 @@
-# Copyright 2020 Google LLC
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-import datetime
-import json
-import os
-
-import mock
-import pytest
-from six.moves import http_client
-from six.moves import urllib
-
-from google.auth import _helpers
-from google.auth import exceptions
-from google.auth import identity_pool
-from google.auth import transport
-
-
-CLIENT_ID = "username"
-CLIENT_SECRET = "password"
-# Base64 encoding of "username:password".
-BASIC_AUTH_ENCODING = "dXNlcm5hbWU6cGFzc3dvcmQ="
-SERVICE_ACCOUNT_EMAIL = "service-1234@service-name.iam.gserviceaccount.com"
-SERVICE_ACCOUNT_IMPERSONATION_URL = (
- "https://us-east1-iamcredentials.googleapis.com/v1/projects/-"
- + "/serviceAccounts/{}:generateAccessToken".format(SERVICE_ACCOUNT_EMAIL)
-)
-QUOTA_PROJECT_ID = "QUOTA_PROJECT_ID"
-SCOPES = ["scope1", "scope2"]
-DATA_DIR = os.path.join(os.path.dirname(__file__), "data")
-SUBJECT_TOKEN_TEXT_FILE = os.path.join(DATA_DIR, "external_subject_token.txt")
-SUBJECT_TOKEN_JSON_FILE = os.path.join(DATA_DIR, "external_subject_token.json")
-SUBJECT_TOKEN_FIELD_NAME = "access_token"
-
-with open(SUBJECT_TOKEN_TEXT_FILE) as fh:
- TEXT_FILE_SUBJECT_TOKEN = fh.read()
-
-with open(SUBJECT_TOKEN_JSON_FILE) as fh:
- JSON_FILE_CONTENT = json.load(fh)
- JSON_FILE_SUBJECT_TOKEN = JSON_FILE_CONTENT.get(SUBJECT_TOKEN_FIELD_NAME)
-
-TOKEN_URL = "https://sts.googleapis.com/v1/token"
-SUBJECT_TOKEN_TYPE = "urn:ietf:params:oauth:token-type:jwt"
-AUDIENCE = "//iam.googleapis.com/projects/123456/locations/global/workloadIdentityPools/POOL_ID/providers/PROVIDER_ID"
-
-
-class TestCredentials(object):
- CREDENTIAL_SOURCE_TEXT = {"file": SUBJECT_TOKEN_TEXT_FILE}
- CREDENTIAL_SOURCE_JSON = {
- "file": SUBJECT_TOKEN_JSON_FILE,
- "format": {"type": "json", "subject_token_field_name": "access_token"},
- }
- CREDENTIAL_URL = "http://fakeurl.com"
- CREDENTIAL_SOURCE_TEXT_URL = {"url": CREDENTIAL_URL}
- CREDENTIAL_SOURCE_JSON_URL = {
- "url": CREDENTIAL_URL,
- "format": {"type": "json", "subject_token_field_name": "access_token"},
- }
- SUCCESS_RESPONSE = {
- "access_token": "ACCESS_TOKEN",
- "issued_token_type": "urn:ietf:params:oauth:token-type:access_token",
- "token_type": "Bearer",
- "expires_in": 3600,
- "scope": " ".join(SCOPES),
- }
-
- @classmethod
- def make_mock_response(cls, status, data):
- response = mock.create_autospec(transport.Response, instance=True)
- response.status = status
- if isinstance(data, dict):
- response.data = json.dumps(data).encode("utf-8")
- else:
- response.data = data
- return response
-
- @classmethod
- def make_mock_request(
- cls, token_status=http_client.OK, token_data=None, *extra_requests
- ):
- responses = []
- responses.append(cls.make_mock_response(token_status, token_data))
-
- while len(extra_requests) > 0:
- # If service account impersonation is requested, mock the expected response.
- status, data, extra_requests = (
- extra_requests[0],
- extra_requests[1],
- extra_requests[2:],
- )
- responses.append(cls.make_mock_response(status, data))
-
- request = mock.create_autospec(transport.Request)
- request.side_effect = responses
-
- return request
-
- @classmethod
- def assert_credential_request_kwargs(
- cls, request_kwargs, headers, url=CREDENTIAL_URL
- ):
- assert request_kwargs["url"] == url
- assert request_kwargs["method"] == "GET"
- assert request_kwargs["headers"] == headers
- assert request_kwargs.get("body", None) is None
-
- @classmethod
- def assert_token_request_kwargs(
- cls, request_kwargs, headers, request_data, token_url=TOKEN_URL
- ):
- assert request_kwargs["url"] == token_url
- assert request_kwargs["method"] == "POST"
- assert request_kwargs["headers"] == headers
- assert request_kwargs["body"] is not None
- body_tuples = urllib.parse.parse_qsl(request_kwargs["body"])
- assert len(body_tuples) == len(request_data.keys())
- for (k, v) in body_tuples:
- assert v.decode("utf-8") == request_data[k.decode("utf-8")]
-
- @classmethod
- def assert_impersonation_request_kwargs(
- cls,
- request_kwargs,
- headers,
- request_data,
- service_account_impersonation_url=SERVICE_ACCOUNT_IMPERSONATION_URL,
- ):
- assert request_kwargs["url"] == service_account_impersonation_url
- assert request_kwargs["method"] == "POST"
- assert request_kwargs["headers"] == headers
- assert request_kwargs["body"] is not None
- body_json = json.loads(request_kwargs["body"].decode("utf-8"))
- assert body_json == request_data
-
- @classmethod
- def assert_underlying_credentials_refresh(
- cls,
- credentials,
- audience,
- subject_token,
- subject_token_type,
- token_url,
- service_account_impersonation_url=None,
- basic_auth_encoding=None,
- quota_project_id=None,
- used_scopes=None,
- credential_data=None,
- scopes=None,
- default_scopes=None,
- ):
- """Utility to assert that a credentials are initialized with the expected
- attributes by calling refresh functionality and confirming response matches
- expected one and that the underlying requests were populated with the
- expected parameters.
- """
- # STS token exchange request/response.
- token_response = cls.SUCCESS_RESPONSE.copy()
- token_headers = {"Content-Type": "application/x-www-form-urlencoded"}
- if basic_auth_encoding:
- token_headers["Authorization"] = "Basic " + basic_auth_encoding
-
- if service_account_impersonation_url:
- token_scopes = "https://www.googleapis.com/auth/iam"
- else:
- token_scopes = " ".join(used_scopes or [])
-
- token_request_data = {
- "grant_type": "urn:ietf:params:oauth:grant-type:token-exchange",
- "audience": audience,
- "requested_token_type": "urn:ietf:params:oauth:token-type:access_token",
- "scope": token_scopes,
- "subject_token": subject_token,
- "subject_token_type": subject_token_type,
- }
-
- if service_account_impersonation_url:
- # Service account impersonation request/response.
- expire_time = (
- _helpers.utcnow().replace(microsecond=0)
- + datetime.timedelta(seconds=3600)
- ).isoformat("T") + "Z"
- impersonation_response = {
- "accessToken": "SA_ACCESS_TOKEN",
- "expireTime": expire_time,
- }
- impersonation_headers = {
- "Content-Type": "application/json",
- "authorization": "Bearer {}".format(token_response["access_token"]),
- }
- impersonation_request_data = {
- "delegates": None,
- "scope": used_scopes,
- "lifetime": "3600s",
- }
-
- # Initialize mock request to handle token retrieval, token exchange and
- # service account impersonation request.
- requests = []
- if credential_data:
- requests.append((http_client.OK, credential_data))
-
- token_request_index = len(requests)
- requests.append((http_client.OK, token_response))
-
- if service_account_impersonation_url:
- impersonation_request_index = len(requests)
- requests.append((http_client.OK, impersonation_response))
-
- request = cls.make_mock_request(*[el for req in requests for el in req])
-
- credentials.refresh(request)
-
- assert len(request.call_args_list) == len(requests)
- if credential_data:
- cls.assert_credential_request_kwargs(request.call_args_list[0].kwargs, None)
- # Verify token exchange request parameters.
- cls.assert_token_request_kwargs(
- request.call_args_list[token_request_index].kwargs,
- token_headers,
- token_request_data,
- token_url,
- )
- # Verify service account impersonation request parameters if the request
- # is processed.
- if service_account_impersonation_url:
- cls.assert_impersonation_request_kwargs(
- request.call_args_list[impersonation_request_index].kwargs,
- impersonation_headers,
- impersonation_request_data,
- service_account_impersonation_url,
- )
- assert credentials.token == impersonation_response["accessToken"]
- else:
- assert credentials.token == token_response["access_token"]
- assert credentials.quota_project_id == quota_project_id
- assert credentials.scopes == scopes
- assert credentials.default_scopes == default_scopes
-
- @classmethod
- def make_credentials(
- cls,
- client_id=None,
- client_secret=None,
- quota_project_id=None,
- scopes=None,
- default_scopes=None,
- service_account_impersonation_url=None,
- credential_source=None,
- ):
- return identity_pool.Credentials(
- audience=AUDIENCE,
- subject_token_type=SUBJECT_TOKEN_TYPE,
- token_url=TOKEN_URL,
- service_account_impersonation_url=service_account_impersonation_url,
- credential_source=credential_source,
- client_id=client_id,
- client_secret=client_secret,
- quota_project_id=quota_project_id,
- scopes=scopes,
- default_scopes=default_scopes,
- )
-
- @mock.patch.object(identity_pool.Credentials, "__init__", return_value=None)
- def test_from_info_full_options(self, mock_init):
- credentials = identity_pool.Credentials.from_info(
- {
- "audience": AUDIENCE,
- "subject_token_type": SUBJECT_TOKEN_TYPE,
- "token_url": TOKEN_URL,
- "service_account_impersonation_url": SERVICE_ACCOUNT_IMPERSONATION_URL,
- "client_id": CLIENT_ID,
- "client_secret": CLIENT_SECRET,
- "quota_project_id": QUOTA_PROJECT_ID,
- "credential_source": self.CREDENTIAL_SOURCE_TEXT,
- }
- )
-
- # Confirm identity_pool.Credentials instantiated with expected attributes.
- assert isinstance(credentials, identity_pool.Credentials)
- mock_init.assert_called_once_with(
- audience=AUDIENCE,
- subject_token_type=SUBJECT_TOKEN_TYPE,
- token_url=TOKEN_URL,
- service_account_impersonation_url=SERVICE_ACCOUNT_IMPERSONATION_URL,
- client_id=CLIENT_ID,
- client_secret=CLIENT_SECRET,
- credential_source=self.CREDENTIAL_SOURCE_TEXT,
- quota_project_id=QUOTA_PROJECT_ID,
- )
-
- @mock.patch.object(identity_pool.Credentials, "__init__", return_value=None)
- def test_from_info_required_options_only(self, mock_init):
- credentials = identity_pool.Credentials.from_info(
- {
- "audience": AUDIENCE,
- "subject_token_type": SUBJECT_TOKEN_TYPE,
- "token_url": TOKEN_URL,
- "credential_source": self.CREDENTIAL_SOURCE_TEXT,
- }
- )
-
- # Confirm identity_pool.Credentials instantiated with expected attributes.
- assert isinstance(credentials, identity_pool.Credentials)
- mock_init.assert_called_once_with(
- audience=AUDIENCE,
- subject_token_type=SUBJECT_TOKEN_TYPE,
- token_url=TOKEN_URL,
- service_account_impersonation_url=None,
- client_id=None,
- client_secret=None,
- credential_source=self.CREDENTIAL_SOURCE_TEXT,
- quota_project_id=None,
- )
-
- @mock.patch.object(identity_pool.Credentials, "__init__", return_value=None)
- def test_from_file_full_options(self, mock_init, tmpdir):
- info = {
- "audience": AUDIENCE,
- "subject_token_type": SUBJECT_TOKEN_TYPE,
- "token_url": TOKEN_URL,
- "service_account_impersonation_url": SERVICE_ACCOUNT_IMPERSONATION_URL,
- "client_id": CLIENT_ID,
- "client_secret": CLIENT_SECRET,
- "quota_project_id": QUOTA_PROJECT_ID,
- "credential_source": self.CREDENTIAL_SOURCE_TEXT,
- }
- config_file = tmpdir.join("config.json")
- config_file.write(json.dumps(info))
- credentials = identity_pool.Credentials.from_file(str(config_file))
-
- # Confirm identity_pool.Credentials instantiated with expected attributes.
- assert isinstance(credentials, identity_pool.Credentials)
- mock_init.assert_called_once_with(
- audience=AUDIENCE,
- subject_token_type=SUBJECT_TOKEN_TYPE,
- token_url=TOKEN_URL,
- service_account_impersonation_url=SERVICE_ACCOUNT_IMPERSONATION_URL,
- client_id=CLIENT_ID,
- client_secret=CLIENT_SECRET,
- credential_source=self.CREDENTIAL_SOURCE_TEXT,
- quota_project_id=QUOTA_PROJECT_ID,
- )
-
- @mock.patch.object(identity_pool.Credentials, "__init__", return_value=None)
- def test_from_file_required_options_only(self, mock_init, tmpdir):
- info = {
- "audience": AUDIENCE,
- "subject_token_type": SUBJECT_TOKEN_TYPE,
- "token_url": TOKEN_URL,
- "credential_source": self.CREDENTIAL_SOURCE_TEXT,
- }
- config_file = tmpdir.join("config.json")
- config_file.write(json.dumps(info))
- credentials = identity_pool.Credentials.from_file(str(config_file))
-
- # Confirm identity_pool.Credentials instantiated with expected attributes.
- assert isinstance(credentials, identity_pool.Credentials)
- mock_init.assert_called_once_with(
- audience=AUDIENCE,
- subject_token_type=SUBJECT_TOKEN_TYPE,
- token_url=TOKEN_URL,
- service_account_impersonation_url=None,
- client_id=None,
- client_secret=None,
- credential_source=self.CREDENTIAL_SOURCE_TEXT,
- quota_project_id=None,
- )
-
- def test_constructor_invalid_options(self):
- credential_source = {"unsupported": "value"}
-
- with pytest.raises(ValueError) as excinfo:
- self.make_credentials(credential_source=credential_source)
-
- assert excinfo.match(r"Missing credential_source")
-
- def test_constructor_invalid_options_url_and_file(self):
- credential_source = {
- "url": self.CREDENTIAL_URL,
- "file": SUBJECT_TOKEN_TEXT_FILE,
- }
-
- with pytest.raises(ValueError) as excinfo:
- self.make_credentials(credential_source=credential_source)
-
- assert excinfo.match(r"Ambiguous credential_source")
-
- def test_constructor_invalid_options_environment_id(self):
- credential_source = {"url": self.CREDENTIAL_URL, "environment_id": "aws1"}
-
- with pytest.raises(ValueError) as excinfo:
- self.make_credentials(credential_source=credential_source)
-
- assert excinfo.match(
- r"Invalid Identity Pool credential_source field 'environment_id'"
- )
-
- def test_constructor_invalid_credential_source(self):
- with pytest.raises(ValueError) as excinfo:
- self.make_credentials(credential_source="non-dict")
-
- assert excinfo.match(r"Missing credential_source")
-
- def test_constructor_invalid_credential_source_format_type(self):
- credential_source = {"format": {"type": "xml"}}
-
- with pytest.raises(ValueError) as excinfo:
- self.make_credentials(credential_source=credential_source)
-
- assert excinfo.match(r"Invalid credential_source format 'xml'")
-
- def test_constructor_missing_subject_token_field_name(self):
- credential_source = {"format": {"type": "json"}}
-
- with pytest.raises(ValueError) as excinfo:
- self.make_credentials(credential_source=credential_source)
-
- assert excinfo.match(
- r"Missing subject_token_field_name for JSON credential_source format"
- )
-
- def test_retrieve_subject_token_missing_subject_token(self, tmpdir):
- # Provide empty text file.
- empty_file = tmpdir.join("empty.txt")
- empty_file.write("")
- credential_source = {"file": str(empty_file)}
- credentials = self.make_credentials(credential_source=credential_source)
-
- with pytest.raises(exceptions.RefreshError) as excinfo:
- credentials.retrieve_subject_token(None)
-
- assert excinfo.match(r"Missing subject_token in the credential_source file")
-
- def test_retrieve_subject_token_text_file(self):
- credentials = self.make_credentials(
- credential_source=self.CREDENTIAL_SOURCE_TEXT
- )
-
- subject_token = credentials.retrieve_subject_token(None)
-
- assert subject_token == TEXT_FILE_SUBJECT_TOKEN
-
- def test_retrieve_subject_token_json_file(self):
- credentials = self.make_credentials(
- credential_source=self.CREDENTIAL_SOURCE_JSON
- )
-
- subject_token = credentials.retrieve_subject_token(None)
-
- assert subject_token == JSON_FILE_SUBJECT_TOKEN
-
- def test_retrieve_subject_token_json_file_invalid_field_name(self):
- credential_source = {
- "file": SUBJECT_TOKEN_JSON_FILE,
- "format": {"type": "json", "subject_token_field_name": "not_found"},
- }
- credentials = self.make_credentials(credential_source=credential_source)
-
- with pytest.raises(exceptions.RefreshError) as excinfo:
- credentials.retrieve_subject_token(None)
-
- assert excinfo.match(
- "Unable to parse subject_token from JSON file '{}' using key '{}'".format(
- SUBJECT_TOKEN_JSON_FILE, "not_found"
- )
- )
-
- def test_retrieve_subject_token_invalid_json(self, tmpdir):
- # Provide JSON file. This should result in JSON parsing error.
- invalid_json_file = tmpdir.join("invalid.json")
- invalid_json_file.write("{")
- credential_source = {
- "file": str(invalid_json_file),
- "format": {"type": "json", "subject_token_field_name": "access_token"},
- }
- credentials = self.make_credentials(credential_source=credential_source)
-
- with pytest.raises(exceptions.RefreshError) as excinfo:
- credentials.retrieve_subject_token(None)
-
- assert excinfo.match(
- "Unable to parse subject_token from JSON file '{}' using key '{}'".format(
- str(invalid_json_file), "access_token"
- )
- )
-
- def test_retrieve_subject_token_file_not_found(self):
- credential_source = {"file": "./not_found.txt"}
- credentials = self.make_credentials(credential_source=credential_source)
-
- with pytest.raises(exceptions.RefreshError) as excinfo:
- credentials.retrieve_subject_token(None)
-
- assert excinfo.match(r"File './not_found.txt' was not found")
-
- def test_refresh_text_file_success_without_impersonation_ignore_default_scopes(
- self
- ):
- credentials = self.make_credentials(
- client_id=CLIENT_ID,
- client_secret=CLIENT_SECRET,
- # Test with text format type.
- credential_source=self.CREDENTIAL_SOURCE_TEXT,
- scopes=SCOPES,
- # Default scopes should be ignored.
- default_scopes=["ignored"],
- )
-
- self.assert_underlying_credentials_refresh(
- credentials=credentials,
- audience=AUDIENCE,
- subject_token=TEXT_FILE_SUBJECT_TOKEN,
- subject_token_type=SUBJECT_TOKEN_TYPE,
- token_url=TOKEN_URL,
- service_account_impersonation_url=None,
- basic_auth_encoding=BASIC_AUTH_ENCODING,
- quota_project_id=None,
- used_scopes=SCOPES,
- scopes=SCOPES,
- default_scopes=["ignored"],
- )
-
- def test_refresh_text_file_success_without_impersonation_use_default_scopes(self):
- credentials = self.make_credentials(
- client_id=CLIENT_ID,
- client_secret=CLIENT_SECRET,
- # Test with text format type.
- credential_source=self.CREDENTIAL_SOURCE_TEXT,
- scopes=None,
- # Default scopes should be used since user specified scopes are none.
- default_scopes=SCOPES,
- )
-
- self.assert_underlying_credentials_refresh(
- credentials=credentials,
- audience=AUDIENCE,
- subject_token=TEXT_FILE_SUBJECT_TOKEN,
- subject_token_type=SUBJECT_TOKEN_TYPE,
- token_url=TOKEN_URL,
- service_account_impersonation_url=None,
- basic_auth_encoding=BASIC_AUTH_ENCODING,
- quota_project_id=None,
- used_scopes=SCOPES,
- scopes=None,
- default_scopes=SCOPES,
- )
-
- def test_refresh_text_file_success_with_impersonation_ignore_default_scopes(self):
- # Initialize credentials with service account impersonation and basic auth.
- credentials = self.make_credentials(
- # Test with text format type.
- credential_source=self.CREDENTIAL_SOURCE_TEXT,
- service_account_impersonation_url=SERVICE_ACCOUNT_IMPERSONATION_URL,
- scopes=SCOPES,
- # Default scopes should be ignored.
- default_scopes=["ignored"],
- )
-
- self.assert_underlying_credentials_refresh(
- credentials=credentials,
- audience=AUDIENCE,
- subject_token=TEXT_FILE_SUBJECT_TOKEN,
- subject_token_type=SUBJECT_TOKEN_TYPE,
- token_url=TOKEN_URL,
- service_account_impersonation_url=SERVICE_ACCOUNT_IMPERSONATION_URL,
- basic_auth_encoding=None,
- quota_project_id=None,
- used_scopes=SCOPES,
- scopes=SCOPES,
- default_scopes=["ignored"],
- )
-
- def test_refresh_text_file_success_with_impersonation_use_default_scopes(self):
- # Initialize credentials with service account impersonation, basic auth
- # and default scopes (no user scopes).
- credentials = self.make_credentials(
- # Test with text format type.
- credential_source=self.CREDENTIAL_SOURCE_TEXT,
- service_account_impersonation_url=SERVICE_ACCOUNT_IMPERSONATION_URL,
- scopes=None,
- # Default scopes should be used since user specified scopes are none.
- default_scopes=SCOPES,
- )
-
- self.assert_underlying_credentials_refresh(
- credentials=credentials,
- audience=AUDIENCE,
- subject_token=TEXT_FILE_SUBJECT_TOKEN,
- subject_token_type=SUBJECT_TOKEN_TYPE,
- token_url=TOKEN_URL,
- service_account_impersonation_url=SERVICE_ACCOUNT_IMPERSONATION_URL,
- basic_auth_encoding=None,
- quota_project_id=None,
- used_scopes=SCOPES,
- scopes=None,
- default_scopes=SCOPES,
- )
-
- def test_refresh_json_file_success_without_impersonation(self):
- credentials = self.make_credentials(
- client_id=CLIENT_ID,
- client_secret=CLIENT_SECRET,
- # Test with JSON format type.
- credential_source=self.CREDENTIAL_SOURCE_JSON,
- scopes=SCOPES,
- )
-
- self.assert_underlying_credentials_refresh(
- credentials=credentials,
- audience=AUDIENCE,
- subject_token=JSON_FILE_SUBJECT_TOKEN,
- subject_token_type=SUBJECT_TOKEN_TYPE,
- token_url=TOKEN_URL,
- service_account_impersonation_url=None,
- basic_auth_encoding=BASIC_AUTH_ENCODING,
- quota_project_id=None,
- used_scopes=SCOPES,
- scopes=SCOPES,
- default_scopes=None,
- )
-
- def test_refresh_json_file_success_with_impersonation(self):
- # Initialize credentials with service account impersonation and basic auth.
- credentials = self.make_credentials(
- # Test with JSON format type.
- credential_source=self.CREDENTIAL_SOURCE_JSON,
- service_account_impersonation_url=SERVICE_ACCOUNT_IMPERSONATION_URL,
- scopes=SCOPES,
- )
-
- self.assert_underlying_credentials_refresh(
- credentials=credentials,
- audience=AUDIENCE,
- subject_token=JSON_FILE_SUBJECT_TOKEN,
- subject_token_type=SUBJECT_TOKEN_TYPE,
- token_url=TOKEN_URL,
- service_account_impersonation_url=SERVICE_ACCOUNT_IMPERSONATION_URL,
- basic_auth_encoding=None,
- quota_project_id=None,
- used_scopes=SCOPES,
- scopes=SCOPES,
- default_scopes=None,
- )
-
- def test_refresh_with_retrieve_subject_token_error(self):
- credential_source = {
- "file": SUBJECT_TOKEN_JSON_FILE,
- "format": {"type": "json", "subject_token_field_name": "not_found"},
- }
- credentials = self.make_credentials(credential_source=credential_source)
-
- with pytest.raises(exceptions.RefreshError) as excinfo:
- credentials.refresh(None)
-
- assert excinfo.match(
- "Unable to parse subject_token from JSON file '{}' using key '{}'".format(
- SUBJECT_TOKEN_JSON_FILE, "not_found"
- )
- )
-
- def test_retrieve_subject_token_from_url(self):
- credentials = self.make_credentials(
- credential_source=self.CREDENTIAL_SOURCE_TEXT_URL
- )
- request = self.make_mock_request(token_data=TEXT_FILE_SUBJECT_TOKEN)
- subject_token = credentials.retrieve_subject_token(request)
-
- assert subject_token == TEXT_FILE_SUBJECT_TOKEN
- self.assert_credential_request_kwargs(request.call_args_list[0].kwargs, None)
-
- def test_retrieve_subject_token_from_url_with_headers(self):
- credentials = self.make_credentials(
- credential_source={"url": self.CREDENTIAL_URL, "headers": {"foo": "bar"}}
- )
- request = self.make_mock_request(token_data=TEXT_FILE_SUBJECT_TOKEN)
- subject_token = credentials.retrieve_subject_token(request)
-
- assert subject_token == TEXT_FILE_SUBJECT_TOKEN
- self.assert_credential_request_kwargs(
- request.call_args_list[0].kwargs, {"foo": "bar"}
- )
-
- def test_retrieve_subject_token_from_url_json(self):
- credentials = self.make_credentials(
- credential_source=self.CREDENTIAL_SOURCE_JSON_URL
- )
- request = self.make_mock_request(token_data=JSON_FILE_CONTENT)
- subject_token = credentials.retrieve_subject_token(request)
-
- assert subject_token == JSON_FILE_SUBJECT_TOKEN
- self.assert_credential_request_kwargs(request.call_args_list[0].kwargs, None)
-
- def test_retrieve_subject_token_from_url_json_with_headers(self):
- credentials = self.make_credentials(
- credential_source={
- "url": self.CREDENTIAL_URL,
- "format": {"type": "json", "subject_token_field_name": "access_token"},
- "headers": {"foo": "bar"},
- }
- )
- request = self.make_mock_request(token_data=JSON_FILE_CONTENT)
- subject_token = credentials.retrieve_subject_token(request)
-
- assert subject_token == JSON_FILE_SUBJECT_TOKEN
- self.assert_credential_request_kwargs(
- request.call_args_list[0].kwargs, {"foo": "bar"}
- )
-
- def test_retrieve_subject_token_from_url_not_found(self):
- credentials = self.make_credentials(
- credential_source=self.CREDENTIAL_SOURCE_TEXT_URL
- )
- with pytest.raises(exceptions.RefreshError) as excinfo:
- credentials.retrieve_subject_token(
- self.make_mock_request(token_status=404, token_data=JSON_FILE_CONTENT)
- )
-
- assert excinfo.match("Unable to retrieve Identity Pool subject token")
-
- def test_retrieve_subject_token_from_url_json_invalid_field(self):
- credential_source = {
- "url": self.CREDENTIAL_URL,
- "format": {"type": "json", "subject_token_field_name": "not_found"},
- }
- credentials = self.make_credentials(credential_source=credential_source)
-
- with pytest.raises(exceptions.RefreshError) as excinfo:
- credentials.retrieve_subject_token(
- self.make_mock_request(token_data=JSON_FILE_CONTENT)
- )
-
- assert excinfo.match(
- "Unable to parse subject_token from JSON file '{}' using key '{}'".format(
- self.CREDENTIAL_URL, "not_found"
- )
- )
-
- def test_retrieve_subject_token_from_url_json_invalid_format(self):
- credentials = self.make_credentials(
- credential_source=self.CREDENTIAL_SOURCE_JSON_URL
- )
-
- with pytest.raises(exceptions.RefreshError) as excinfo:
- credentials.retrieve_subject_token(self.make_mock_request(token_data="{"))
-
- assert excinfo.match(
- "Unable to parse subject_token from JSON file '{}' using key '{}'".format(
- self.CREDENTIAL_URL, "access_token"
- )
- )
-
- def test_refresh_text_file_success_without_impersonation_url(self):
- credentials = self.make_credentials(
- client_id=CLIENT_ID,
- client_secret=CLIENT_SECRET,
- # Test with text format type.
- credential_source=self.CREDENTIAL_SOURCE_TEXT_URL,
- scopes=SCOPES,
- )
-
- self.assert_underlying_credentials_refresh(
- credentials=credentials,
- audience=AUDIENCE,
- subject_token=TEXT_FILE_SUBJECT_TOKEN,
- subject_token_type=SUBJECT_TOKEN_TYPE,
- token_url=TOKEN_URL,
- service_account_impersonation_url=None,
- basic_auth_encoding=BASIC_AUTH_ENCODING,
- quota_project_id=None,
- used_scopes=SCOPES,
- scopes=SCOPES,
- default_scopes=None,
- credential_data=TEXT_FILE_SUBJECT_TOKEN,
- )
-
- def test_refresh_text_file_success_with_impersonation_url(self):
- # Initialize credentials with service account impersonation and basic auth.
- credentials = self.make_credentials(
- # Test with text format type.
- credential_source=self.CREDENTIAL_SOURCE_TEXT_URL,
- service_account_impersonation_url=SERVICE_ACCOUNT_IMPERSONATION_URL,
- scopes=SCOPES,
- )
-
- self.assert_underlying_credentials_refresh(
- credentials=credentials,
- audience=AUDIENCE,
- subject_token=TEXT_FILE_SUBJECT_TOKEN,
- subject_token_type=SUBJECT_TOKEN_TYPE,
- token_url=TOKEN_URL,
- service_account_impersonation_url=SERVICE_ACCOUNT_IMPERSONATION_URL,
- basic_auth_encoding=None,
- quota_project_id=None,
- used_scopes=SCOPES,
- scopes=SCOPES,
- default_scopes=None,
- credential_data=TEXT_FILE_SUBJECT_TOKEN,
- )
-
- def test_refresh_json_file_success_without_impersonation_url(self):
- credentials = self.make_credentials(
- client_id=CLIENT_ID,
- client_secret=CLIENT_SECRET,
- # Test with JSON format type.
- credential_source=self.CREDENTIAL_SOURCE_JSON_URL,
- scopes=SCOPES,
- )
-
- self.assert_underlying_credentials_refresh(
- credentials=credentials,
- audience=AUDIENCE,
- subject_token=JSON_FILE_SUBJECT_TOKEN,
- subject_token_type=SUBJECT_TOKEN_TYPE,
- token_url=TOKEN_URL,
- service_account_impersonation_url=None,
- basic_auth_encoding=BASIC_AUTH_ENCODING,
- quota_project_id=None,
- used_scopes=SCOPES,
- scopes=SCOPES,
- default_scopes=None,
- credential_data=JSON_FILE_CONTENT,
- )
-
- def test_refresh_json_file_success_with_impersonation_url(self):
- # Initialize credentials with service account impersonation and basic auth.
- credentials = self.make_credentials(
- # Test with JSON format type.
- credential_source=self.CREDENTIAL_SOURCE_JSON_URL,
- service_account_impersonation_url=SERVICE_ACCOUNT_IMPERSONATION_URL,
- scopes=SCOPES,
- )
-
- self.assert_underlying_credentials_refresh(
- credentials=credentials,
- audience=AUDIENCE,
- subject_token=JSON_FILE_SUBJECT_TOKEN,
- subject_token_type=SUBJECT_TOKEN_TYPE,
- token_url=TOKEN_URL,
- service_account_impersonation_url=SERVICE_ACCOUNT_IMPERSONATION_URL,
- basic_auth_encoding=None,
- quota_project_id=None,
- used_scopes=SCOPES,
- scopes=SCOPES,
- default_scopes=None,
- credential_data=JSON_FILE_CONTENT,
- )
-
- def test_refresh_with_retrieve_subject_token_error_url(self):
- credential_source = {
- "url": self.CREDENTIAL_URL,
- "format": {"type": "json", "subject_token_field_name": "not_found"},
- }
- credentials = self.make_credentials(credential_source=credential_source)
-
- with pytest.raises(exceptions.RefreshError) as excinfo:
- credentials.refresh(self.make_mock_request(token_data=JSON_FILE_CONTENT))
-
- assert excinfo.match(
- "Unable to parse subject_token from JSON file '{}' using key '{}'".format(
- self.CREDENTIAL_URL, "not_found"
- )
- )
diff --git a/tests/test_impersonated_credentials.py b/tests/test_impersonated_credentials.py
index 430c770..305f939 100644
--- a/tests/test_impersonated_credentials.py
+++ b/tests/test_impersonated_credentials.py
@@ -104,17 +104,12 @@
SIGNER, SERVICE_ACCOUNT_EMAIL, TOKEN_URI
)
USER_SOURCE_CREDENTIALS = credentials.Credentials(token="ABCDE")
- IAM_ENDPOINT_OVERRIDE = (
- "https://us-east1-iamcredentials.googleapis.com/v1/projects/-"
- + "/serviceAccounts/{}:generateAccessToken".format(SERVICE_ACCOUNT_EMAIL)
- )
def make_credentials(
self,
source_credentials=SOURCE_CREDENTIALS,
lifetime=LIFETIME,
target_principal=TARGET_PRINCIPAL,
- iam_endpoint_override=None,
):
return Credentials(
@@ -123,7 +118,6 @@
target_scopes=self.TARGET_SCOPES,
delegates=self.DELEGATES,
lifetime=lifetime,
- iam_endpoint_override=iam_endpoint_override,
)
def test_make_from_user_credentials(self):
@@ -178,34 +172,6 @@
assert credentials.valid
assert not credentials.expired
- @pytest.mark.parametrize("use_data_bytes", [True, False])
- def test_refresh_success_iam_endpoint_override(
- self, use_data_bytes, mock_donor_credentials
- ):
- credentials = self.make_credentials(
- lifetime=None, iam_endpoint_override=self.IAM_ENDPOINT_OVERRIDE
- )
- token = "token"
-
- expire_time = (
- _helpers.utcnow().replace(microsecond=0) + datetime.timedelta(seconds=500)
- ).isoformat("T") + "Z"
- response_body = {"accessToken": token, "expireTime": expire_time}
-
- request = self.make_request(
- data=json.dumps(response_body),
- status=http_client.OK,
- use_data_bytes=use_data_bytes,
- )
-
- credentials.refresh(request)
-
- assert credentials.valid
- assert not credentials.expired
- # Confirm override endpoint used.
- request_kwargs = request.call_args.kwargs
- assert request_kwargs["url"] == self.IAM_ENDPOINT_OVERRIDE
-
@pytest.mark.parametrize("time_skew", [100, -100])
def test_refresh_source_credentials(self, time_skew):
credentials = self.make_credentials(lifetime=None)
@@ -351,36 +317,6 @@
quota_project_creds = credentials.with_quota_project("project-foo")
assert quota_project_creds._quota_project_id == "project-foo"
- @pytest.mark.parametrize("use_data_bytes", [True, False])
- def test_with_quota_project_iam_endpoint_override(
- self, use_data_bytes, mock_donor_credentials
- ):
- credentials = self.make_credentials(
- lifetime=None, iam_endpoint_override=self.IAM_ENDPOINT_OVERRIDE
- )
- token = "token"
- # iam_endpoint_override should be copied to created credentials.
- quota_project_creds = credentials.with_quota_project("project-foo")
-
- expire_time = (
- _helpers.utcnow().replace(microsecond=0) + datetime.timedelta(seconds=500)
- ).isoformat("T") + "Z"
- response_body = {"accessToken": token, "expireTime": expire_time}
-
- request = self.make_request(
- data=json.dumps(response_body),
- status=http_client.OK,
- use_data_bytes=use_data_bytes,
- )
-
- quota_project_creds.refresh(request)
-
- assert quota_project_creds.valid
- assert not quota_project_creds.expired
- # Confirm override endpoint used.
- request_kwargs = request.call_args.kwargs
- assert request_kwargs["url"] == self.IAM_ENDPOINT_OVERRIDE
-
def test_id_token_success(
self, mock_donor_credentials, mock_authorizedsession_idtoken
):