fix: fix impersonated cred for gcloud (#516)
* fix: fix impersonated cred for gcloud
(1) For IDTokenCredentials.refresh(self.request), use the provided
request instead of creating a new one
(2) For Credentials, only refresh the source credentials if it is not
valid
* update expired func
diff --git a/google/auth/impersonated_credentials.py b/google/auth/impersonated_credentials.py
index 4299802..c682277 100644
--- a/google/auth/impersonated_credentials.py
+++ b/google/auth/impersonated_credentials.py
@@ -226,10 +226,6 @@
def refresh(self, request):
self._update_token(request)
- @property
- def expired(self):
- return _helpers.utcnow() >= self.expiry
-
def _update_token(self, request):
"""Updates credentials with a new access_token representing
the impersonated account.
@@ -239,8 +235,9 @@
to use for refreshing credentials.
"""
- # Refresh our source credentials.
- self._source_credentials.refresh(request)
+ # Refresh our source credentials if it is not valid.
+ if not self._source_credentials.valid:
+ self._source_credentials.refresh(request)
body = {
"delegates": self._delegates,
@@ -347,7 +344,9 @@
headers = {"Content-Type": "application/json"}
- authed_session = AuthorizedSession(self._target_credentials._source_credentials)
+ authed_session = AuthorizedSession(
+ self._target_credentials._source_credentials, auth_request=request
+ )
response = authed_session.post(
url=iam_sign_endpoint,
diff --git a/tests/test_impersonated_credentials.py b/tests/test_impersonated_credentials.py
index 19e2f34..e0b5b11 100644
--- a/tests/test_impersonated_credentials.py
+++ b/tests/test_impersonated_credentials.py
@@ -172,6 +172,44 @@
assert credentials.valid
assert not credentials.expired
+ @pytest.mark.parametrize("time_skew", [100, -100])
+ def test_refresh_source_credentials(self, time_skew):
+ credentials = self.make_credentials(lifetime=None)
+
+ # Source credentials is refreshed only if it is expired within
+ # _helpers.CLOCK_SKEW from now. We add a time_skew to the expiry, so
+ # source credentials is refreshed only if time_skew <= 0.
+ credentials._source_credentials.expiry = (
+ _helpers.utcnow()
+ + _helpers.CLOCK_SKEW
+ + datetime.timedelta(seconds=time_skew)
+ )
+ credentials._source_credentials.token = "Token"
+
+ with mock.patch(
+ "google.oauth2.service_account.Credentials.refresh", autospec=True
+ ) as source_cred_refresh:
+ expire_time = (
+ _helpers.utcnow().replace(microsecond=0)
+ + datetime.timedelta(seconds=500)
+ ).isoformat("T") + "Z"
+ response_body = {"accessToken": "token", "expireTime": expire_time}
+ request = self.make_request(
+ data=json.dumps(response_body), status=http_client.OK
+ )
+
+ credentials.refresh(request)
+
+ assert credentials.valid
+ assert not credentials.expired
+
+ # Source credentials is refreshed only if it is expired within
+ # _helpers.CLOCK_SKEW
+ if time_skew > 0:
+ source_cred_refresh.assert_not_called()
+ else:
+ source_cred_refresh.assert_called_once()
+
def test_refresh_failure_malformed_expire_time(self, mock_donor_credentials):
credentials = self.make_credentials(lifetime=None)
token = "token"