fix: fix unit tests so they can work in g3 (#714)
* fix: mock not working well with g3 test
* update
* update
diff --git a/tests/oauth2/test_service_account.py b/tests/oauth2/test_service_account.py
index 40a4ca2..648541e 100644
--- a/tests/oauth2/test_service_account.py
+++ b/tests/oauth2/test_service_account.py
@@ -203,18 +203,18 @@
assert "x-goog-user-project" not in headers
assert "token" in headers["authorization"]
- @mock.patch("google.auth.jwt.Credentials.from_signing_credentials", autospec=True)
- def test__create_self_signed_jwt(self, from_signing_credentials):
+ @mock.patch("google.auth.jwt.Credentials", instance=True, autospec=True)
+ def test__create_self_signed_jwt(self, jwt):
credentials = service_account.Credentials(
SIGNER, self.SERVICE_ACCOUNT_EMAIL, self.TOKEN_URI
)
audience = "https://pubsub.googleapis.com"
credentials._create_self_signed_jwt(audience)
- from_signing_credentials.assert_called_once_with(credentials, audience)
+ jwt.from_signing_credentials.assert_called_once_with(credentials, audience)
- @mock.patch("google.auth.jwt.Credentials.from_signing_credentials", autospec=True)
- def test__create_self_signed_jwt_with_user_scopes(self, from_signing_credentials):
+ @mock.patch("google.auth.jwt.Credentials", instance=True, autospec=True)
+ def test__create_self_signed_jwt_with_user_scopes(self, jwt):
credentials = service_account.Credentials(
SIGNER, self.SERVICE_ACCOUNT_EMAIL, self.TOKEN_URI, scopes=["foo"]
)
@@ -223,7 +223,7 @@
credentials._create_self_signed_jwt(audience)
# JWT should not be created if there are user-defined scopes
- from_signing_credentials.assert_not_called()
+ jwt.from_signing_credentials.assert_not_called()
@mock.patch("google.oauth2._client.jwt_grant", autospec=True)
def test_refresh_success(self, jwt_grant):
diff --git a/tests/oauth2/test_sts.py b/tests/oauth2/test_sts.py
index 8792bd6..e8e008d 100644
--- a/tests/oauth2/test_sts.py
+++ b/tests/oauth2/test_sts.py
@@ -128,7 +128,7 @@
self.ADDON_HEADERS,
)
- self.assert_request_kwargs(request.call_args.kwargs, headers, request_data)
+ self.assert_request_kwargs(request.call_args[1], headers, request_data)
assert response == self.SUCCESS_RESPONSE
def test_exchange_token_partial_success_without_auth(self):
@@ -157,7 +157,7 @@
requested_token_type=self.REQUESTED_TOKEN_TYPE,
)
- self.assert_request_kwargs(request.call_args.kwargs, headers, request_data)
+ self.assert_request_kwargs(request.call_args[1], headers, request_data)
assert response == self.SUCCESS_RESPONSE
def test_exchange_token_non200_without_auth(self):
@@ -227,7 +227,7 @@
self.ADDON_HEADERS,
)
- self.assert_request_kwargs(request.call_args.kwargs, headers, request_data)
+ self.assert_request_kwargs(request.call_args[1], headers, request_data)
assert response == self.SUCCESS_RESPONSE
def test_exchange_token_partial_success_with_basic_auth(self):
@@ -259,7 +259,7 @@
requested_token_type=self.REQUESTED_TOKEN_TYPE,
)
- self.assert_request_kwargs(request.call_args.kwargs, headers, request_data)
+ self.assert_request_kwargs(request.call_args[1], headers, request_data)
assert response == self.SUCCESS_RESPONSE
def test_exchange_token_non200_with_basic_auth(self):
@@ -331,7 +331,7 @@
self.ADDON_HEADERS,
)
- self.assert_request_kwargs(request.call_args.kwargs, headers, request_data)
+ self.assert_request_kwargs(request.call_args[1], headers, request_data)
assert response == self.SUCCESS_RESPONSE
def test_exchange_token_partial_success_with_reqbody_auth(self):
@@ -362,7 +362,7 @@
requested_token_type=self.REQUESTED_TOKEN_TYPE,
)
- self.assert_request_kwargs(request.call_args.kwargs, headers, request_data)
+ self.assert_request_kwargs(request.call_args[1], headers, request_data)
assert response == self.SUCCESS_RESPONSE
def test_exchange_token_non200_with_reqbody_auth(self):
diff --git a/tests/test_aws.py b/tests/test_aws.py
index 9a8f98e..7a55841 100644
--- a/tests/test_aws.py
+++ b/tests/test_aws.py
@@ -959,15 +959,15 @@
)
# Assert region request.
self.assert_aws_metadata_request_kwargs(
- request.call_args_list[0].kwargs, REGION_URL
+ request.call_args_list[0][1], REGION_URL
)
# Assert role request.
self.assert_aws_metadata_request_kwargs(
- request.call_args_list[1].kwargs, SECURITY_CREDS_URL
+ request.call_args_list[1][1], SECURITY_CREDS_URL
)
# Assert security credentials request.
self.assert_aws_metadata_request_kwargs(
- request.call_args_list[2].kwargs,
+ request.call_args_list[2][1],
"{}/{}".format(SECURITY_CREDS_URL, self.AWS_ROLE),
{"Content-Type": "application/json"},
)
@@ -986,11 +986,11 @@
assert len(new_request.call_args_list) == 2
# Assert role request.
self.assert_aws_metadata_request_kwargs(
- new_request.call_args_list[0].kwargs, SECURITY_CREDS_URL
+ new_request.call_args_list[0][1], SECURITY_CREDS_URL
)
# Assert security credentials request.
self.assert_aws_metadata_request_kwargs(
- new_request.call_args_list[1].kwargs,
+ new_request.call_args_list[1][1],
"{}/{}".format(SECURITY_CREDS_URL, self.AWS_ROLE),
{"Content-Type": "application/json"},
)
@@ -1193,7 +1193,7 @@
assert len(request.call_args_list) == 4
# Fourth request should be sent to GCP STS endpoint.
self.assert_token_request_kwargs(
- request.call_args_list[3].kwargs, token_headers, token_request_data
+ request.call_args_list[3][1], token_headers, token_request_data
)
assert credentials.token == self.SUCCESS_RESPONSE["access_token"]
assert credentials.quota_project_id == QUOTA_PROJECT_ID
@@ -1249,7 +1249,7 @@
assert len(request.call_args_list) == 4
# Fourth request should be sent to GCP STS endpoint.
self.assert_token_request_kwargs(
- request.call_args_list[3].kwargs, token_headers, token_request_data
+ request.call_args_list[3][1], token_headers, token_request_data
)
assert credentials.token == self.SUCCESS_RESPONSE["access_token"]
assert credentials.quota_project_id == QUOTA_PROJECT_ID
@@ -1326,12 +1326,12 @@
assert len(request.call_args_list) == 5
# Fourth request should be sent to GCP STS endpoint.
self.assert_token_request_kwargs(
- request.call_args_list[3].kwargs, token_headers, token_request_data
+ request.call_args_list[3][1], token_headers, token_request_data
)
# Fifth request should be sent to iamcredentials endpoint for service
# account impersonation.
self.assert_impersonation_request_kwargs(
- request.call_args_list[4].kwargs,
+ request.call_args_list[4][1],
impersonation_headers,
impersonation_request_data,
)
@@ -1410,12 +1410,12 @@
assert len(request.call_args_list) == 5
# Fourth request should be sent to GCP STS endpoint.
self.assert_token_request_kwargs(
- request.call_args_list[3].kwargs, token_headers, token_request_data
+ request.call_args_list[3][1], token_headers, token_request_data
)
# Fifth request should be sent to iamcredentials endpoint for service
# account impersonation.
self.assert_impersonation_request_kwargs(
- request.call_args_list[4].kwargs,
+ request.call_args_list[4][1],
impersonation_headers,
impersonation_request_data,
)
diff --git a/tests/test_external_account.py b/tests/test_external_account.py
index 42e53ec..8f8d980 100644
--- a/tests/test_external_account.py
+++ b/tests/test_external_account.py
@@ -363,9 +363,7 @@
credentials.refresh(request)
- self.assert_token_request_kwargs(
- request.call_args.kwargs, headers, request_data
- )
+ self.assert_token_request_kwargs(request.call_args[1], headers, request_data)
assert credentials.valid
assert credentials.expiry == expected_expiry
assert not credentials.expired
@@ -422,11 +420,11 @@
assert len(request.call_args_list) == 2
# Verify token exchange request parameters.
self.assert_token_request_kwargs(
- request.call_args_list[0].kwargs, token_headers, token_request_data
+ request.call_args_list[0][1], token_headers, token_request_data
)
# Verify service account impersonation request parameters.
self.assert_impersonation_request_kwargs(
- request.call_args_list[1].kwargs,
+ request.call_args_list[1][1],
impersonation_headers,
impersonation_request_data,
)
@@ -436,7 +434,7 @@
assert credentials.token == impersonation_response["accessToken"]
def test_refresh_without_client_auth_success_explicit_user_scopes_ignore_default_scopes(
- self
+ self,
):
headers = {"Content-Type": "application/x-www-form-urlencoded"}
request_data = {
@@ -458,9 +456,7 @@
credentials.refresh(request)
- self.assert_token_request_kwargs(
- request.call_args.kwargs, headers, request_data
- )
+ self.assert_token_request_kwargs(request.call_args[1], headers, request_data)
assert credentials.valid
assert not credentials.expired
assert credentials.token == self.SUCCESS_RESPONSE["access_token"]
@@ -488,9 +484,7 @@
credentials.refresh(request)
- self.assert_token_request_kwargs(
- request.call_args.kwargs, headers, request_data
- )
+ self.assert_token_request_kwargs(request.call_args[1], headers, request_data)
assert credentials.valid
assert not credentials.expired
assert credentials.token == self.SUCCESS_RESPONSE["access_token"]
@@ -551,9 +545,7 @@
credentials.refresh(request)
- self.assert_token_request_kwargs(
- request.call_args.kwargs, headers, request_data
- )
+ self.assert_token_request_kwargs(request.call_args[1], headers, request_data)
assert credentials.valid
assert not credentials.expired
assert credentials.token == self.SUCCESS_RESPONSE["access_token"]
@@ -616,11 +608,11 @@
assert len(request.call_args_list) == 2
# Verify token exchange request parameters.
self.assert_token_request_kwargs(
- request.call_args_list[0].kwargs, token_headers, token_request_data
+ request.call_args_list[0][1], token_headers, token_request_data
)
# Verify service account impersonation request parameters.
self.assert_impersonation_request_kwargs(
- request.call_args_list[1].kwargs,
+ request.call_args_list[1][1],
impersonation_headers,
impersonation_request_data,
)
@@ -687,11 +679,11 @@
assert len(request.call_args_list) == 2
# Verify token exchange request parameters.
self.assert_token_request_kwargs(
- request.call_args_list[0].kwargs, token_headers, token_request_data
+ request.call_args_list[0][1], token_headers, token_request_data
)
# Verify service account impersonation request parameters.
self.assert_impersonation_request_kwargs(
- request.call_args_list[1].kwargs,
+ request.call_args_list[1][1],
impersonation_headers,
impersonation_request_data,
)
@@ -1045,11 +1037,11 @@
assert len(request.call_args_list) == 3
# Verify token exchange request parameters.
self.assert_token_request_kwargs(
- request.call_args_list[0].kwargs, token_headers, token_request_data
+ request.call_args_list[0][1], token_headers, token_request_data
)
# Verify service account impersonation request parameters.
self.assert_impersonation_request_kwargs(
- request.call_args_list[1].kwargs,
+ request.call_args_list[1][1],
impersonation_headers,
impersonation_request_data,
)
@@ -1061,7 +1053,7 @@
assert credentials.token == impersonation_response["accessToken"]
# Verify cloud resource manager request parameters.
self.assert_resource_manager_request_kwargs(
- request.call_args_list[2].kwargs,
+ request.call_args_list[2][1],
self.PROJECT_NUMBER,
{
"x-goog-user-project": self.QUOTA_PROJECT_ID,
diff --git a/tests/test_iam.py b/tests/test_iam.py
index fbd3e41..382713b 100644
--- a/tests/test_iam.py
+++ b/tests/test_iam.py
@@ -89,7 +89,7 @@
returned_signature = signer.sign("123")
assert returned_signature == signature
- kwargs = request.call_args.kwargs
+ kwargs = request.call_args[1]
assert kwargs["headers"]["Content-Type"] == "application/json"
def test_sign_bytes_failure(self):
diff --git a/tests/test_identity_pool.py b/tests/test_identity_pool.py
index c017ab5..90a0e25 100644
--- a/tests/test_identity_pool.py
+++ b/tests/test_identity_pool.py
@@ -223,10 +223,10 @@
assert len(request.call_args_list) == len(requests)
if credential_data:
- cls.assert_credential_request_kwargs(request.call_args_list[0].kwargs, None)
+ cls.assert_credential_request_kwargs(request.call_args_list[0][1], None)
# Verify token exchange request parameters.
cls.assert_token_request_kwargs(
- request.call_args_list[token_request_index].kwargs,
+ request.call_args_list[token_request_index][1],
token_headers,
token_request_data,
token_url,
@@ -235,7 +235,7 @@
# is processed.
if service_account_impersonation_url:
cls.assert_impersonation_request_kwargs(
- request.call_args_list[impersonation_request_index].kwargs,
+ request.call_args_list[impersonation_request_index][1],
impersonation_headers,
impersonation_request_data,
service_account_impersonation_url,
@@ -505,7 +505,7 @@
assert excinfo.match(r"File './not_found.txt' was not found")
def test_refresh_text_file_success_without_impersonation_ignore_default_scopes(
- self
+ self,
):
credentials = self.make_credentials(
client_id=CLIENT_ID,
@@ -677,7 +677,7 @@
subject_token = credentials.retrieve_subject_token(request)
assert subject_token == TEXT_FILE_SUBJECT_TOKEN
- self.assert_credential_request_kwargs(request.call_args_list[0].kwargs, None)
+ self.assert_credential_request_kwargs(request.call_args_list[0][1], None)
def test_retrieve_subject_token_from_url_with_headers(self):
credentials = self.make_credentials(
@@ -688,7 +688,7 @@
assert subject_token == TEXT_FILE_SUBJECT_TOKEN
self.assert_credential_request_kwargs(
- request.call_args_list[0].kwargs, {"foo": "bar"}
+ request.call_args_list[0][1], {"foo": "bar"}
)
def test_retrieve_subject_token_from_url_json(self):
@@ -699,7 +699,7 @@
subject_token = credentials.retrieve_subject_token(request)
assert subject_token == JSON_FILE_SUBJECT_TOKEN
- self.assert_credential_request_kwargs(request.call_args_list[0].kwargs, None)
+ self.assert_credential_request_kwargs(request.call_args_list[0][1], None)
def test_retrieve_subject_token_from_url_json_with_headers(self):
credentials = self.make_credentials(
@@ -714,7 +714,7 @@
assert subject_token == JSON_FILE_SUBJECT_TOKEN
self.assert_credential_request_kwargs(
- request.call_args_list[0].kwargs, {"foo": "bar"}
+ request.call_args_list[0][1], {"foo": "bar"}
)
def test_retrieve_subject_token_from_url_not_found(self):
diff --git a/tests/test_impersonated_credentials.py b/tests/test_impersonated_credentials.py
index 430c770..90de704 100644
--- a/tests/test_impersonated_credentials.py
+++ b/tests/test_impersonated_credentials.py
@@ -203,7 +203,7 @@
assert credentials.valid
assert not credentials.expired
# Confirm override endpoint used.
- request_kwargs = request.call_args.kwargs
+ request_kwargs = request.call_args[1]
assert request_kwargs["url"] == self.IAM_ENDPOINT_OVERRIDE
@pytest.mark.parametrize("time_skew", [100, -100])
@@ -378,7 +378,7 @@
assert quota_project_creds.valid
assert not quota_project_creds.expired
# Confirm override endpoint used.
- request_kwargs = request.call_args.kwargs
+ request_kwargs = request.call_args[1]
assert request_kwargs["url"] == self.IAM_ENDPOINT_OVERRIDE
def test_id_token_success(