blob: 305f9392667a8c0ce94eb5f469b68337f904afa7 [file] [log] [blame]
salrashid1231fbc6792018-11-09 11:05:34 -08001# Copyright 2018 Google Inc.
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7# http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14
15import datetime
16import json
17import os
18
19import mock
20import pytest
21from six.moves import http_client
22
23from google.auth import _helpers
24from google.auth import crypt
25from google.auth import exceptions
26from google.auth import impersonated_credentials
27from google.auth import transport
28from google.auth.impersonated_credentials import Credentials
Bu Sun Kim82e224b2020-03-13 13:21:18 -070029from google.oauth2 import credentials
salrashid1231fbc6792018-11-09 11:05:34 -080030from google.oauth2 import service_account
31
Bu Sun Kim9eec0912019-10-21 17:04:21 -070032DATA_DIR = os.path.join(os.path.dirname(__file__), "", "data")
salrashid1231fbc6792018-11-09 11:05:34 -080033
Bu Sun Kim9eec0912019-10-21 17:04:21 -070034with open(os.path.join(DATA_DIR, "privatekey.pem"), "rb") as fh:
salrashid1231fbc6792018-11-09 11:05:34 -080035 PRIVATE_KEY_BYTES = fh.read()
36
Bu Sun Kim9eec0912019-10-21 17:04:21 -070037SERVICE_ACCOUNT_JSON_FILE = os.path.join(DATA_DIR, "service_account.json")
salrashid1231fbc6792018-11-09 11:05:34 -080038
Bu Sun Kim9eec0912019-10-21 17:04:21 -070039ID_TOKEN_DATA = (
40 "eyJhbGciOiJSUzI1NiIsImtpZCI6ImRmMzc1ODkwOGI3OTIyOTNhZDk3N2Ew"
41 "Yjk5MWQ5OGE3N2Y0ZWVlY2QiLCJ0eXAiOiJKV1QifQ.eyJhdWQiOiJodHRwc"
42 "zovL2Zvby5iYXIiLCJhenAiOiIxMDIxMDE1NTA4MzQyMDA3MDg1NjgiLCJle"
43 "HAiOjE1NjQ0NzUwNTEsImlhdCI6MTU2NDQ3MTQ1MSwiaXNzIjoiaHR0cHM6L"
44 "y9hY2NvdW50cy5nb29nbGUuY29tIiwic3ViIjoiMTAyMTAxNTUwODM0MjAwN"
45 "zA4NTY4In0.redacted"
46)
salrashid1237a8641a2019-08-07 14:31:33 -070047ID_TOKEN_EXPIRY = 1564475051
48
Bu Sun Kim9eec0912019-10-21 17:04:21 -070049with open(SERVICE_ACCOUNT_JSON_FILE, "r") as fh:
salrashid1231fbc6792018-11-09 11:05:34 -080050 SERVICE_ACCOUNT_INFO = json.load(fh)
51
Bu Sun Kim9eec0912019-10-21 17:04:21 -070052SIGNER = crypt.RSASigner.from_string(PRIVATE_KEY_BYTES, "1")
53TOKEN_URI = "https://example.com/oauth2/token"
salrashid1231fbc6792018-11-09 11:05:34 -080054
55
56@pytest.fixture
57def mock_donor_credentials():
Bu Sun Kim9eec0912019-10-21 17:04:21 -070058 with mock.patch("google.oauth2._client.jwt_grant", autospec=True) as grant:
salrashid1231fbc6792018-11-09 11:05:34 -080059 grant.return_value = (
60 "source token",
61 _helpers.utcnow() + datetime.timedelta(seconds=500),
Bu Sun Kim9eec0912019-10-21 17:04:21 -070062 {},
63 )
salrashid1231fbc6792018-11-09 11:05:34 -080064 yield grant
65
66
salrashid1237a8641a2019-08-07 14:31:33 -070067class MockResponse:
68 def __init__(self, json_data, status_code):
69 self.json_data = json_data
70 self.status_code = status_code
71
72 def json(self):
73 return self.json_data
74
75
76@pytest.fixture
77def mock_authorizedsession_sign():
Bu Sun Kim9eec0912019-10-21 17:04:21 -070078 with mock.patch(
79 "google.auth.transport.requests.AuthorizedSession.request", autospec=True
80 ) as auth_session:
81 data = {"keyId": "1", "signedBlob": "c2lnbmF0dXJl"}
salrashid1237a8641a2019-08-07 14:31:33 -070082 auth_session.return_value = MockResponse(data, http_client.OK)
83 yield auth_session
84
85
86@pytest.fixture
87def mock_authorizedsession_idtoken():
Bu Sun Kim9eec0912019-10-21 17:04:21 -070088 with mock.patch(
89 "google.auth.transport.requests.AuthorizedSession.request", autospec=True
90 ) as auth_session:
91 data = {"token": ID_TOKEN_DATA}
salrashid1237a8641a2019-08-07 14:31:33 -070092 auth_session.return_value = MockResponse(data, http_client.OK)
93 yield auth_session
94
95
salrashid1231fbc6792018-11-09 11:05:34 -080096class TestImpersonatedCredentials(object):
97
Bu Sun Kim9eec0912019-10-21 17:04:21 -070098 SERVICE_ACCOUNT_EMAIL = "service-account@example.com"
99 TARGET_PRINCIPAL = "impersonated@project.iam.gserviceaccount.com"
100 TARGET_SCOPES = ["https://www.googleapis.com/auth/devstorage.read_only"]
salrashid1231fbc6792018-11-09 11:05:34 -0800101 DELEGATES = []
102 LIFETIME = 3600
103 SOURCE_CREDENTIALS = service_account.Credentials(
Bu Sun Kim9eec0912019-10-21 17:04:21 -0700104 SIGNER, SERVICE_ACCOUNT_EMAIL, TOKEN_URI
105 )
Bu Sun Kim82e224b2020-03-13 13:21:18 -0700106 USER_SOURCE_CREDENTIALS = credentials.Credentials(token="ABCDE")
salrashid1231fbc6792018-11-09 11:05:34 -0800107
Bu Sun Kim82e224b2020-03-13 13:21:18 -0700108 def make_credentials(
109 self,
110 source_credentials=SOURCE_CREDENTIALS,
111 lifetime=LIFETIME,
112 target_principal=TARGET_PRINCIPAL,
113 ):
salrashid1237a8641a2019-08-07 14:31:33 -0700114
salrashid1231fbc6792018-11-09 11:05:34 -0800115 return Credentials(
Bu Sun Kim82e224b2020-03-13 13:21:18 -0700116 source_credentials=source_credentials,
salrashid1237a8641a2019-08-07 14:31:33 -0700117 target_principal=target_principal,
salrashid1231fbc6792018-11-09 11:05:34 -0800118 target_scopes=self.TARGET_SCOPES,
119 delegates=self.DELEGATES,
Bu Sun Kim9eec0912019-10-21 17:04:21 -0700120 lifetime=lifetime,
121 )
salrashid1231fbc6792018-11-09 11:05:34 -0800122
Bu Sun Kim82e224b2020-03-13 13:21:18 -0700123 def test_make_from_user_credentials(self):
124 credentials = self.make_credentials(
125 source_credentials=self.USER_SOURCE_CREDENTIALS
126 )
127 assert not credentials.valid
128 assert credentials.expired
129
salrashid1231fbc6792018-11-09 11:05:34 -0800130 def test_default_state(self):
131 credentials = self.make_credentials()
132 assert not credentials.valid
133 assert credentials.expired
134
arithmetic1728e115bae2020-05-06 16:00:17 -0700135 def make_request(
136 self,
137 data,
138 status=http_client.OK,
139 headers=None,
140 side_effect=None,
141 use_data_bytes=True,
142 ):
salrashid1231fbc6792018-11-09 11:05:34 -0800143 response = mock.create_autospec(transport.Response, instance=False)
144 response.status = status
arithmetic1728e115bae2020-05-06 16:00:17 -0700145 response.data = _helpers.to_bytes(data) if use_data_bytes else data
salrashid1231fbc6792018-11-09 11:05:34 -0800146 response.headers = headers or {}
147
148 request = mock.create_autospec(transport.Request, instance=False)
149 request.side_effect = side_effect
150 request.return_value = response
151
152 return request
153
arithmetic1728e115bae2020-05-06 16:00:17 -0700154 @pytest.mark.parametrize("use_data_bytes", [True, False])
155 def test_refresh_success(self, use_data_bytes, mock_donor_credentials):
salrashid1231fbc6792018-11-09 11:05:34 -0800156 credentials = self.make_credentials(lifetime=None)
Bu Sun Kim9eec0912019-10-21 17:04:21 -0700157 token = "token"
salrashid1231fbc6792018-11-09 11:05:34 -0800158
159 expire_time = (
Bu Sun Kim9eec0912019-10-21 17:04:21 -0700160 _helpers.utcnow().replace(microsecond=0) + datetime.timedelta(seconds=500)
161 ).isoformat("T") + "Z"
162 response_body = {"accessToken": token, "expireTime": expire_time}
salrashid1231fbc6792018-11-09 11:05:34 -0800163
164 request = self.make_request(
arithmetic1728e115bae2020-05-06 16:00:17 -0700165 data=json.dumps(response_body),
166 status=http_client.OK,
167 use_data_bytes=use_data_bytes,
Bu Sun Kim9eec0912019-10-21 17:04:21 -0700168 )
salrashid1231fbc6792018-11-09 11:05:34 -0800169
170 credentials.refresh(request)
171
172 assert credentials.valid
173 assert not credentials.expired
174
arithmetic1728eb7be3f2020-05-28 11:01:24 -0700175 @pytest.mark.parametrize("time_skew", [100, -100])
176 def test_refresh_source_credentials(self, time_skew):
177 credentials = self.make_credentials(lifetime=None)
178
179 # Source credentials is refreshed only if it is expired within
180 # _helpers.CLOCK_SKEW from now. We add a time_skew to the expiry, so
181 # source credentials is refreshed only if time_skew <= 0.
182 credentials._source_credentials.expiry = (
183 _helpers.utcnow()
184 + _helpers.CLOCK_SKEW
185 + datetime.timedelta(seconds=time_skew)
186 )
187 credentials._source_credentials.token = "Token"
188
189 with mock.patch(
190 "google.oauth2.service_account.Credentials.refresh", autospec=True
191 ) as source_cred_refresh:
192 expire_time = (
193 _helpers.utcnow().replace(microsecond=0)
194 + datetime.timedelta(seconds=500)
195 ).isoformat("T") + "Z"
196 response_body = {"accessToken": "token", "expireTime": expire_time}
197 request = self.make_request(
198 data=json.dumps(response_body), status=http_client.OK
199 )
200
201 credentials.refresh(request)
202
203 assert credentials.valid
204 assert not credentials.expired
205
206 # Source credentials is refreshed only if it is expired within
207 # _helpers.CLOCK_SKEW
208 if time_skew > 0:
209 source_cred_refresh.assert_not_called()
210 else:
211 source_cred_refresh.assert_called_once()
212
Bu Sun Kim9eec0912019-10-21 17:04:21 -0700213 def test_refresh_failure_malformed_expire_time(self, mock_donor_credentials):
salrashid1231fbc6792018-11-09 11:05:34 -0800214 credentials = self.make_credentials(lifetime=None)
Bu Sun Kim9eec0912019-10-21 17:04:21 -0700215 token = "token"
salrashid1231fbc6792018-11-09 11:05:34 -0800216
Bu Sun Kim9eec0912019-10-21 17:04:21 -0700217 expire_time = (_helpers.utcnow() + datetime.timedelta(seconds=500)).isoformat(
218 "T"
219 )
220 response_body = {"accessToken": token, "expireTime": expire_time}
salrashid1231fbc6792018-11-09 11:05:34 -0800221
222 request = self.make_request(
Bu Sun Kim9eec0912019-10-21 17:04:21 -0700223 data=json.dumps(response_body), status=http_client.OK
224 )
salrashid1231fbc6792018-11-09 11:05:34 -0800225
226 with pytest.raises(exceptions.RefreshError) as excinfo:
227 credentials.refresh(request)
228
229 assert excinfo.match(impersonated_credentials._REFRESH_ERROR)
230
231 assert not credentials.valid
232 assert credentials.expired
233
salrashid1231fbc6792018-11-09 11:05:34 -0800234 def test_refresh_failure_unauthorzed(self, mock_donor_credentials):
235 credentials = self.make_credentials(lifetime=None)
236
237 response_body = {
238 "error": {
Bu Sun Kim9eec0912019-10-21 17:04:21 -0700239 "code": 403,
240 "message": "The caller does not have permission",
241 "status": "PERMISSION_DENIED",
salrashid1231fbc6792018-11-09 11:05:34 -0800242 }
243 }
244
245 request = self.make_request(
Bu Sun Kim9eec0912019-10-21 17:04:21 -0700246 data=json.dumps(response_body), status=http_client.UNAUTHORIZED
247 )
salrashid1231fbc6792018-11-09 11:05:34 -0800248
249 with pytest.raises(exceptions.RefreshError) as excinfo:
250 credentials.refresh(request)
251
252 assert excinfo.match(impersonated_credentials._REFRESH_ERROR)
253
254 assert not credentials.valid
255 assert credentials.expired
256
257 def test_refresh_failure_http_error(self, mock_donor_credentials):
258 credentials = self.make_credentials(lifetime=None)
259
260 response_body = {}
261
262 request = self.make_request(
Bu Sun Kim9eec0912019-10-21 17:04:21 -0700263 data=json.dumps(response_body), status=http_client.HTTPException
264 )
salrashid1231fbc6792018-11-09 11:05:34 -0800265
266 with pytest.raises(exceptions.RefreshError) as excinfo:
267 credentials.refresh(request)
268
269 assert excinfo.match(impersonated_credentials._REFRESH_ERROR)
270
271 assert not credentials.valid
272 assert credentials.expired
273
274 def test_expired(self):
275 credentials = self.make_credentials(lifetime=None)
276 assert credentials.expired
salrashid1237a8641a2019-08-07 14:31:33 -0700277
278 def test_signer(self):
279 credentials = self.make_credentials()
Bu Sun Kim9eec0912019-10-21 17:04:21 -0700280 assert isinstance(credentials.signer, impersonated_credentials.Credentials)
salrashid1237a8641a2019-08-07 14:31:33 -0700281
282 def test_signer_email(self):
Bu Sun Kim9eec0912019-10-21 17:04:21 -0700283 credentials = self.make_credentials(target_principal=self.TARGET_PRINCIPAL)
salrashid1237a8641a2019-08-07 14:31:33 -0700284 assert credentials.signer_email == self.TARGET_PRINCIPAL
285
286 def test_service_account_email(self):
Bu Sun Kim9eec0912019-10-21 17:04:21 -0700287 credentials = self.make_credentials(target_principal=self.TARGET_PRINCIPAL)
salrashid1237a8641a2019-08-07 14:31:33 -0700288 assert credentials.service_account_email == self.TARGET_PRINCIPAL
289
Bu Sun Kim9eec0912019-10-21 17:04:21 -0700290 def test_sign_bytes(self, mock_donor_credentials, mock_authorizedsession_sign):
salrashid1237a8641a2019-08-07 14:31:33 -0700291 credentials = self.make_credentials(lifetime=None)
Bu Sun Kim9eec0912019-10-21 17:04:21 -0700292 token = "token"
salrashid1237a8641a2019-08-07 14:31:33 -0700293
294 expire_time = (
Bu Sun Kim9eec0912019-10-21 17:04:21 -0700295 _helpers.utcnow().replace(microsecond=0) + datetime.timedelta(seconds=500)
296 ).isoformat("T") + "Z"
297 token_response_body = {"accessToken": token, "expireTime": expire_time}
salrashid1237a8641a2019-08-07 14:31:33 -0700298
299 response = mock.create_autospec(transport.Response, instance=False)
300 response.status = http_client.OK
301 response.data = _helpers.to_bytes(json.dumps(token_response_body))
302
303 request = mock.create_autospec(transport.Request, instance=False)
304 request.return_value = response
305
306 credentials.refresh(request)
307
308 assert credentials.valid
309 assert not credentials.expired
310
Bu Sun Kim9eec0912019-10-21 17:04:21 -0700311 signature = credentials.sign_bytes(b"signed bytes")
312 assert signature == b"signature"
salrashid1237a8641a2019-08-07 14:31:33 -0700313
Bu Sun Kim3dda7b22020-07-09 10:39:39 -0700314 def test_with_quota_project(self):
315 credentials = self.make_credentials()
316
317 quota_project_creds = credentials.with_quota_project("project-foo")
318 assert quota_project_creds._quota_project_id == "project-foo"
319
Bu Sun Kim9eec0912019-10-21 17:04:21 -0700320 def test_id_token_success(
321 self, mock_donor_credentials, mock_authorizedsession_idtoken
322 ):
salrashid1237a8641a2019-08-07 14:31:33 -0700323 credentials = self.make_credentials(lifetime=None)
Bu Sun Kim9eec0912019-10-21 17:04:21 -0700324 token = "token"
325 target_audience = "https://foo.bar"
salrashid1237a8641a2019-08-07 14:31:33 -0700326
327 expire_time = (
Bu Sun Kim9eec0912019-10-21 17:04:21 -0700328 _helpers.utcnow().replace(microsecond=0) + datetime.timedelta(seconds=500)
329 ).isoformat("T") + "Z"
330 response_body = {"accessToken": token, "expireTime": expire_time}
salrashid1237a8641a2019-08-07 14:31:33 -0700331
332 request = self.make_request(
Bu Sun Kim9eec0912019-10-21 17:04:21 -0700333 data=json.dumps(response_body), status=http_client.OK
334 )
salrashid1237a8641a2019-08-07 14:31:33 -0700335
336 credentials.refresh(request)
337
338 assert credentials.valid
339 assert not credentials.expired
340
341 id_creds = impersonated_credentials.IDTokenCredentials(
Bu Sun Kim9eec0912019-10-21 17:04:21 -0700342 credentials, target_audience=target_audience
343 )
salrashid1237a8641a2019-08-07 14:31:33 -0700344 id_creds.refresh(request)
345
346 assert id_creds.token == ID_TOKEN_DATA
Bu Sun Kim9eec0912019-10-21 17:04:21 -0700347 assert id_creds.expiry == datetime.datetime.fromtimestamp(ID_TOKEN_EXPIRY)
salrashid1237a8641a2019-08-07 14:31:33 -0700348
Bu Sun Kim9eec0912019-10-21 17:04:21 -0700349 def test_id_token_from_credential(
350 self, mock_donor_credentials, mock_authorizedsession_idtoken
351 ):
salrashid1237a8641a2019-08-07 14:31:33 -0700352 credentials = self.make_credentials(lifetime=None)
Bu Sun Kim9eec0912019-10-21 17:04:21 -0700353 token = "token"
354 target_audience = "https://foo.bar"
salrashid1237a8641a2019-08-07 14:31:33 -0700355
356 expire_time = (
Bu Sun Kim9eec0912019-10-21 17:04:21 -0700357 _helpers.utcnow().replace(microsecond=0) + datetime.timedelta(seconds=500)
358 ).isoformat("T") + "Z"
359 response_body = {"accessToken": token, "expireTime": expire_time}
salrashid1237a8641a2019-08-07 14:31:33 -0700360
361 request = self.make_request(
Bu Sun Kim9eec0912019-10-21 17:04:21 -0700362 data=json.dumps(response_body), status=http_client.OK
363 )
salrashid1237a8641a2019-08-07 14:31:33 -0700364
365 credentials.refresh(request)
366
367 assert credentials.valid
368 assert not credentials.expired
369
370 id_creds = impersonated_credentials.IDTokenCredentials(
Pietro De Nicolaofd9b5b12020-12-11 20:00:30 +0100371 credentials, target_audience=target_audience, include_email=True
Bu Sun Kim9eec0912019-10-21 17:04:21 -0700372 )
salrashid1237a8641a2019-08-07 14:31:33 -0700373 id_creds = id_creds.from_credentials(target_credentials=credentials)
374 id_creds.refresh(request)
375
376 assert id_creds.token == ID_TOKEN_DATA
Pietro De Nicolaofd9b5b12020-12-11 20:00:30 +0100377 assert id_creds._include_email is True
salrashid1237a8641a2019-08-07 14:31:33 -0700378
Bu Sun Kim9eec0912019-10-21 17:04:21 -0700379 def test_id_token_with_target_audience(
380 self, mock_donor_credentials, mock_authorizedsession_idtoken
381 ):
salrashid1237a8641a2019-08-07 14:31:33 -0700382 credentials = self.make_credentials(lifetime=None)
Bu Sun Kim9eec0912019-10-21 17:04:21 -0700383 token = "token"
384 target_audience = "https://foo.bar"
salrashid1237a8641a2019-08-07 14:31:33 -0700385
386 expire_time = (
Bu Sun Kim9eec0912019-10-21 17:04:21 -0700387 _helpers.utcnow().replace(microsecond=0) + datetime.timedelta(seconds=500)
388 ).isoformat("T") + "Z"
389 response_body = {"accessToken": token, "expireTime": expire_time}
salrashid1237a8641a2019-08-07 14:31:33 -0700390
391 request = self.make_request(
Bu Sun Kim9eec0912019-10-21 17:04:21 -0700392 data=json.dumps(response_body), status=http_client.OK
393 )
salrashid1237a8641a2019-08-07 14:31:33 -0700394
395 credentials.refresh(request)
396
397 assert credentials.valid
398 assert not credentials.expired
399
Pietro De Nicolaofd9b5b12020-12-11 20:00:30 +0100400 id_creds = impersonated_credentials.IDTokenCredentials(
401 credentials, include_email=True
402 )
Bu Sun Kim9eec0912019-10-21 17:04:21 -0700403 id_creds = id_creds.with_target_audience(target_audience=target_audience)
salrashid1237a8641a2019-08-07 14:31:33 -0700404 id_creds.refresh(request)
405
406 assert id_creds.token == ID_TOKEN_DATA
Bu Sun Kim9eec0912019-10-21 17:04:21 -0700407 assert id_creds.expiry == datetime.datetime.fromtimestamp(ID_TOKEN_EXPIRY)
Pietro De Nicolaofd9b5b12020-12-11 20:00:30 +0100408 assert id_creds._include_email is True
salrashid1237a8641a2019-08-07 14:31:33 -0700409
Bu Sun Kim9eec0912019-10-21 17:04:21 -0700410 def test_id_token_invalid_cred(
411 self, mock_donor_credentials, mock_authorizedsession_idtoken
412 ):
salrashid1237a8641a2019-08-07 14:31:33 -0700413 credentials = None
414
415 with pytest.raises(exceptions.GoogleAuthError) as excinfo:
416 impersonated_credentials.IDTokenCredentials(credentials)
417
Bu Sun Kim9eec0912019-10-21 17:04:21 -0700418 assert excinfo.match("Provided Credential must be" " impersonated_credentials")
salrashid1237a8641a2019-08-07 14:31:33 -0700419
Bu Sun Kim9eec0912019-10-21 17:04:21 -0700420 def test_id_token_with_include_email(
421 self, mock_donor_credentials, mock_authorizedsession_idtoken
422 ):
salrashid1237a8641a2019-08-07 14:31:33 -0700423 credentials = self.make_credentials(lifetime=None)
Bu Sun Kim9eec0912019-10-21 17:04:21 -0700424 token = "token"
425 target_audience = "https://foo.bar"
salrashid1237a8641a2019-08-07 14:31:33 -0700426
427 expire_time = (
Bu Sun Kim9eec0912019-10-21 17:04:21 -0700428 _helpers.utcnow().replace(microsecond=0) + datetime.timedelta(seconds=500)
429 ).isoformat("T") + "Z"
430 response_body = {"accessToken": token, "expireTime": expire_time}
salrashid1237a8641a2019-08-07 14:31:33 -0700431
432 request = self.make_request(
Bu Sun Kim9eec0912019-10-21 17:04:21 -0700433 data=json.dumps(response_body), status=http_client.OK
434 )
salrashid1237a8641a2019-08-07 14:31:33 -0700435
436 credentials.refresh(request)
437
438 assert credentials.valid
439 assert not credentials.expired
440
441 id_creds = impersonated_credentials.IDTokenCredentials(
Bu Sun Kim9eec0912019-10-21 17:04:21 -0700442 credentials, target_audience=target_audience
443 )
salrashid1237a8641a2019-08-07 14:31:33 -0700444 id_creds = id_creds.with_include_email(True)
445 id_creds.refresh(request)
446
447 assert id_creds.token == ID_TOKEN_DATA
Bu Sun Kim3dda7b22020-07-09 10:39:39 -0700448
449 def test_id_token_with_quota_project(
450 self, mock_donor_credentials, mock_authorizedsession_idtoken
451 ):
452 credentials = self.make_credentials(lifetime=None)
453 token = "token"
454 target_audience = "https://foo.bar"
455
456 expire_time = (
457 _helpers.utcnow().replace(microsecond=0) + datetime.timedelta(seconds=500)
458 ).isoformat("T") + "Z"
459 response_body = {"accessToken": token, "expireTime": expire_time}
460
461 request = self.make_request(
462 data=json.dumps(response_body), status=http_client.OK
463 )
464
465 credentials.refresh(request)
466
467 assert credentials.valid
468 assert not credentials.expired
469
470 id_creds = impersonated_credentials.IDTokenCredentials(
471 credentials, target_audience=target_audience
472 )
473 id_creds = id_creds.with_quota_project("project-foo")
474 id_creds.refresh(request)
475
476 assert id_creds.quota_project_id == "project-foo"