fix: fix impersonated cred for gcloud (#516)
* fix: fix impersonated cred for gcloud
(1) For IDTokenCredentials.refresh(self.request), use the provided
request instead of creating a new one
(2) For Credentials, only refresh the source credentials if it is not
valid
* update expired func
diff --git a/tests/test_impersonated_credentials.py b/tests/test_impersonated_credentials.py
index 19e2f34..e0b5b11 100644
--- a/tests/test_impersonated_credentials.py
+++ b/tests/test_impersonated_credentials.py
@@ -172,6 +172,44 @@
assert credentials.valid
assert not credentials.expired
+ @pytest.mark.parametrize("time_skew", [100, -100])
+ def test_refresh_source_credentials(self, time_skew):
+ credentials = self.make_credentials(lifetime=None)
+
+ # Source credentials is refreshed only if it is expired within
+ # _helpers.CLOCK_SKEW from now. We add a time_skew to the expiry, so
+ # source credentials is refreshed only if time_skew <= 0.
+ credentials._source_credentials.expiry = (
+ _helpers.utcnow()
+ + _helpers.CLOCK_SKEW
+ + datetime.timedelta(seconds=time_skew)
+ )
+ credentials._source_credentials.token = "Token"
+
+ with mock.patch(
+ "google.oauth2.service_account.Credentials.refresh", autospec=True
+ ) as source_cred_refresh:
+ expire_time = (
+ _helpers.utcnow().replace(microsecond=0)
+ + datetime.timedelta(seconds=500)
+ ).isoformat("T") + "Z"
+ response_body = {"accessToken": "token", "expireTime": expire_time}
+ request = self.make_request(
+ data=json.dumps(response_body), status=http_client.OK
+ )
+
+ credentials.refresh(request)
+
+ assert credentials.valid
+ assert not credentials.expired
+
+ # Source credentials is refreshed only if it is expired within
+ # _helpers.CLOCK_SKEW
+ if time_skew > 0:
+ source_cred_refresh.assert_not_called()
+ else:
+ source_cred_refresh.assert_called_once()
+
def test_refresh_failure_malformed_expire_time(self, mock_donor_credentials):
credentials = self.make_credentials(lifetime=None)
token = "token"