blob: c017ab59f64fbb854355b14e42f15b11870c5681 [file] [log] [blame]
bojeil-googled4d7f382021-02-16 12:33:20 -08001# Copyright 2020 Google LLC
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7# http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14
15import datetime
16import json
17import os
18
19import mock
20import pytest
21from six.moves import http_client
22from six.moves import urllib
23
24from google.auth import _helpers
25from google.auth import exceptions
26from google.auth import identity_pool
27from google.auth import transport
28
29
30CLIENT_ID = "username"
31CLIENT_SECRET = "password"
32# Base64 encoding of "username:password".
33BASIC_AUTH_ENCODING = "dXNlcm5hbWU6cGFzc3dvcmQ="
34SERVICE_ACCOUNT_EMAIL = "service-1234@service-name.iam.gserviceaccount.com"
35SERVICE_ACCOUNT_IMPERSONATION_URL = (
36 "https://us-east1-iamcredentials.googleapis.com/v1/projects/-"
37 + "/serviceAccounts/{}:generateAccessToken".format(SERVICE_ACCOUNT_EMAIL)
38)
39QUOTA_PROJECT_ID = "QUOTA_PROJECT_ID"
40SCOPES = ["scope1", "scope2"]
41DATA_DIR = os.path.join(os.path.dirname(__file__), "data")
42SUBJECT_TOKEN_TEXT_FILE = os.path.join(DATA_DIR, "external_subject_token.txt")
43SUBJECT_TOKEN_JSON_FILE = os.path.join(DATA_DIR, "external_subject_token.json")
44SUBJECT_TOKEN_FIELD_NAME = "access_token"
45
46with open(SUBJECT_TOKEN_TEXT_FILE) as fh:
47 TEXT_FILE_SUBJECT_TOKEN = fh.read()
48
49with open(SUBJECT_TOKEN_JSON_FILE) as fh:
50 JSON_FILE_CONTENT = json.load(fh)
51 JSON_FILE_SUBJECT_TOKEN = JSON_FILE_CONTENT.get(SUBJECT_TOKEN_FIELD_NAME)
52
53TOKEN_URL = "https://sts.googleapis.com/v1/token"
54SUBJECT_TOKEN_TYPE = "urn:ietf:params:oauth:token-type:jwt"
55AUDIENCE = "//iam.googleapis.com/projects/123456/locations/global/workloadIdentityPools/POOL_ID/providers/PROVIDER_ID"
56
57
58class TestCredentials(object):
59 CREDENTIAL_SOURCE_TEXT = {"file": SUBJECT_TOKEN_TEXT_FILE}
60 CREDENTIAL_SOURCE_JSON = {
61 "file": SUBJECT_TOKEN_JSON_FILE,
62 "format": {"type": "json", "subject_token_field_name": "access_token"},
63 }
64 CREDENTIAL_URL = "http://fakeurl.com"
65 CREDENTIAL_SOURCE_TEXT_URL = {"url": CREDENTIAL_URL}
66 CREDENTIAL_SOURCE_JSON_URL = {
67 "url": CREDENTIAL_URL,
68 "format": {"type": "json", "subject_token_field_name": "access_token"},
69 }
70 SUCCESS_RESPONSE = {
71 "access_token": "ACCESS_TOKEN",
72 "issued_token_type": "urn:ietf:params:oauth:token-type:access_token",
73 "token_type": "Bearer",
74 "expires_in": 3600,
75 "scope": " ".join(SCOPES),
76 }
77
78 @classmethod
79 def make_mock_response(cls, status, data):
80 response = mock.create_autospec(transport.Response, instance=True)
81 response.status = status
82 if isinstance(data, dict):
83 response.data = json.dumps(data).encode("utf-8")
84 else:
85 response.data = data
86 return response
87
88 @classmethod
89 def make_mock_request(
90 cls, token_status=http_client.OK, token_data=None, *extra_requests
91 ):
92 responses = []
93 responses.append(cls.make_mock_response(token_status, token_data))
94
95 while len(extra_requests) > 0:
96 # If service account impersonation is requested, mock the expected response.
97 status, data, extra_requests = (
98 extra_requests[0],
99 extra_requests[1],
100 extra_requests[2:],
101 )
102 responses.append(cls.make_mock_response(status, data))
103
104 request = mock.create_autospec(transport.Request)
105 request.side_effect = responses
106
107 return request
108
109 @classmethod
110 def assert_credential_request_kwargs(
111 cls, request_kwargs, headers, url=CREDENTIAL_URL
112 ):
113 assert request_kwargs["url"] == url
114 assert request_kwargs["method"] == "GET"
115 assert request_kwargs["headers"] == headers
116 assert request_kwargs.get("body", None) is None
117
118 @classmethod
119 def assert_token_request_kwargs(
120 cls, request_kwargs, headers, request_data, token_url=TOKEN_URL
121 ):
122 assert request_kwargs["url"] == token_url
123 assert request_kwargs["method"] == "POST"
124 assert request_kwargs["headers"] == headers
125 assert request_kwargs["body"] is not None
126 body_tuples = urllib.parse.parse_qsl(request_kwargs["body"])
127 assert len(body_tuples) == len(request_data.keys())
128 for (k, v) in body_tuples:
129 assert v.decode("utf-8") == request_data[k.decode("utf-8")]
130
131 @classmethod
132 def assert_impersonation_request_kwargs(
133 cls,
134 request_kwargs,
135 headers,
136 request_data,
137 service_account_impersonation_url=SERVICE_ACCOUNT_IMPERSONATION_URL,
138 ):
139 assert request_kwargs["url"] == service_account_impersonation_url
140 assert request_kwargs["method"] == "POST"
141 assert request_kwargs["headers"] == headers
142 assert request_kwargs["body"] is not None
143 body_json = json.loads(request_kwargs["body"].decode("utf-8"))
144 assert body_json == request_data
145
146 @classmethod
147 def assert_underlying_credentials_refresh(
148 cls,
149 credentials,
150 audience,
151 subject_token,
152 subject_token_type,
153 token_url,
154 service_account_impersonation_url=None,
155 basic_auth_encoding=None,
156 quota_project_id=None,
157 used_scopes=None,
158 credential_data=None,
159 scopes=None,
160 default_scopes=None,
161 ):
162 """Utility to assert that a credentials are initialized with the expected
163 attributes by calling refresh functionality and confirming response matches
164 expected one and that the underlying requests were populated with the
165 expected parameters.
166 """
167 # STS token exchange request/response.
168 token_response = cls.SUCCESS_RESPONSE.copy()
169 token_headers = {"Content-Type": "application/x-www-form-urlencoded"}
170 if basic_auth_encoding:
171 token_headers["Authorization"] = "Basic " + basic_auth_encoding
172
173 if service_account_impersonation_url:
174 token_scopes = "https://www.googleapis.com/auth/iam"
175 else:
176 token_scopes = " ".join(used_scopes or [])
177
178 token_request_data = {
179 "grant_type": "urn:ietf:params:oauth:grant-type:token-exchange",
180 "audience": audience,
181 "requested_token_type": "urn:ietf:params:oauth:token-type:access_token",
182 "scope": token_scopes,
183 "subject_token": subject_token,
184 "subject_token_type": subject_token_type,
185 }
186
187 if service_account_impersonation_url:
188 # Service account impersonation request/response.
189 expire_time = (
190 _helpers.utcnow().replace(microsecond=0)
191 + datetime.timedelta(seconds=3600)
192 ).isoformat("T") + "Z"
193 impersonation_response = {
194 "accessToken": "SA_ACCESS_TOKEN",
195 "expireTime": expire_time,
196 }
197 impersonation_headers = {
198 "Content-Type": "application/json",
199 "authorization": "Bearer {}".format(token_response["access_token"]),
200 }
201 impersonation_request_data = {
202 "delegates": None,
203 "scope": used_scopes,
204 "lifetime": "3600s",
205 }
206
207 # Initialize mock request to handle token retrieval, token exchange and
208 # service account impersonation request.
209 requests = []
210 if credential_data:
211 requests.append((http_client.OK, credential_data))
212
213 token_request_index = len(requests)
214 requests.append((http_client.OK, token_response))
215
216 if service_account_impersonation_url:
217 impersonation_request_index = len(requests)
218 requests.append((http_client.OK, impersonation_response))
219
220 request = cls.make_mock_request(*[el for req in requests for el in req])
221
222 credentials.refresh(request)
223
224 assert len(request.call_args_list) == len(requests)
225 if credential_data:
226 cls.assert_credential_request_kwargs(request.call_args_list[0].kwargs, None)
227 # Verify token exchange request parameters.
228 cls.assert_token_request_kwargs(
229 request.call_args_list[token_request_index].kwargs,
230 token_headers,
231 token_request_data,
232 token_url,
233 )
234 # Verify service account impersonation request parameters if the request
235 # is processed.
236 if service_account_impersonation_url:
237 cls.assert_impersonation_request_kwargs(
238 request.call_args_list[impersonation_request_index].kwargs,
239 impersonation_headers,
240 impersonation_request_data,
241 service_account_impersonation_url,
242 )
243 assert credentials.token == impersonation_response["accessToken"]
244 else:
245 assert credentials.token == token_response["access_token"]
246 assert credentials.quota_project_id == quota_project_id
247 assert credentials.scopes == scopes
248 assert credentials.default_scopes == default_scopes
249
250 @classmethod
251 def make_credentials(
252 cls,
253 client_id=None,
254 client_secret=None,
255 quota_project_id=None,
256 scopes=None,
257 default_scopes=None,
258 service_account_impersonation_url=None,
259 credential_source=None,
260 ):
261 return identity_pool.Credentials(
262 audience=AUDIENCE,
263 subject_token_type=SUBJECT_TOKEN_TYPE,
264 token_url=TOKEN_URL,
265 service_account_impersonation_url=service_account_impersonation_url,
266 credential_source=credential_source,
267 client_id=client_id,
268 client_secret=client_secret,
269 quota_project_id=quota_project_id,
270 scopes=scopes,
271 default_scopes=default_scopes,
272 )
273
274 @mock.patch.object(identity_pool.Credentials, "__init__", return_value=None)
275 def test_from_info_full_options(self, mock_init):
276 credentials = identity_pool.Credentials.from_info(
277 {
278 "audience": AUDIENCE,
279 "subject_token_type": SUBJECT_TOKEN_TYPE,
280 "token_url": TOKEN_URL,
281 "service_account_impersonation_url": SERVICE_ACCOUNT_IMPERSONATION_URL,
282 "client_id": CLIENT_ID,
283 "client_secret": CLIENT_SECRET,
284 "quota_project_id": QUOTA_PROJECT_ID,
285 "credential_source": self.CREDENTIAL_SOURCE_TEXT,
286 }
287 )
288
289 # Confirm identity_pool.Credentials instantiated with expected attributes.
290 assert isinstance(credentials, identity_pool.Credentials)
291 mock_init.assert_called_once_with(
292 audience=AUDIENCE,
293 subject_token_type=SUBJECT_TOKEN_TYPE,
294 token_url=TOKEN_URL,
295 service_account_impersonation_url=SERVICE_ACCOUNT_IMPERSONATION_URL,
296 client_id=CLIENT_ID,
297 client_secret=CLIENT_SECRET,
298 credential_source=self.CREDENTIAL_SOURCE_TEXT,
299 quota_project_id=QUOTA_PROJECT_ID,
300 )
301
302 @mock.patch.object(identity_pool.Credentials, "__init__", return_value=None)
303 def test_from_info_required_options_only(self, mock_init):
304 credentials = identity_pool.Credentials.from_info(
305 {
306 "audience": AUDIENCE,
307 "subject_token_type": SUBJECT_TOKEN_TYPE,
308 "token_url": TOKEN_URL,
309 "credential_source": self.CREDENTIAL_SOURCE_TEXT,
310 }
311 )
312
313 # Confirm identity_pool.Credentials instantiated with expected attributes.
314 assert isinstance(credentials, identity_pool.Credentials)
315 mock_init.assert_called_once_with(
316 audience=AUDIENCE,
317 subject_token_type=SUBJECT_TOKEN_TYPE,
318 token_url=TOKEN_URL,
319 service_account_impersonation_url=None,
320 client_id=None,
321 client_secret=None,
322 credential_source=self.CREDENTIAL_SOURCE_TEXT,
323 quota_project_id=None,
324 )
325
326 @mock.patch.object(identity_pool.Credentials, "__init__", return_value=None)
327 def test_from_file_full_options(self, mock_init, tmpdir):
328 info = {
329 "audience": AUDIENCE,
330 "subject_token_type": SUBJECT_TOKEN_TYPE,
331 "token_url": TOKEN_URL,
332 "service_account_impersonation_url": SERVICE_ACCOUNT_IMPERSONATION_URL,
333 "client_id": CLIENT_ID,
334 "client_secret": CLIENT_SECRET,
335 "quota_project_id": QUOTA_PROJECT_ID,
336 "credential_source": self.CREDENTIAL_SOURCE_TEXT,
337 }
338 config_file = tmpdir.join("config.json")
339 config_file.write(json.dumps(info))
340 credentials = identity_pool.Credentials.from_file(str(config_file))
341
342 # Confirm identity_pool.Credentials instantiated with expected attributes.
343 assert isinstance(credentials, identity_pool.Credentials)
344 mock_init.assert_called_once_with(
345 audience=AUDIENCE,
346 subject_token_type=SUBJECT_TOKEN_TYPE,
347 token_url=TOKEN_URL,
348 service_account_impersonation_url=SERVICE_ACCOUNT_IMPERSONATION_URL,
349 client_id=CLIENT_ID,
350 client_secret=CLIENT_SECRET,
351 credential_source=self.CREDENTIAL_SOURCE_TEXT,
352 quota_project_id=QUOTA_PROJECT_ID,
353 )
354
355 @mock.patch.object(identity_pool.Credentials, "__init__", return_value=None)
356 def test_from_file_required_options_only(self, mock_init, tmpdir):
357 info = {
358 "audience": AUDIENCE,
359 "subject_token_type": SUBJECT_TOKEN_TYPE,
360 "token_url": TOKEN_URL,
361 "credential_source": self.CREDENTIAL_SOURCE_TEXT,
362 }
363 config_file = tmpdir.join("config.json")
364 config_file.write(json.dumps(info))
365 credentials = identity_pool.Credentials.from_file(str(config_file))
366
367 # Confirm identity_pool.Credentials instantiated with expected attributes.
368 assert isinstance(credentials, identity_pool.Credentials)
369 mock_init.assert_called_once_with(
370 audience=AUDIENCE,
371 subject_token_type=SUBJECT_TOKEN_TYPE,
372 token_url=TOKEN_URL,
373 service_account_impersonation_url=None,
374 client_id=None,
375 client_secret=None,
376 credential_source=self.CREDENTIAL_SOURCE_TEXT,
377 quota_project_id=None,
378 )
379
380 def test_constructor_invalid_options(self):
381 credential_source = {"unsupported": "value"}
382
383 with pytest.raises(ValueError) as excinfo:
384 self.make_credentials(credential_source=credential_source)
385
386 assert excinfo.match(r"Missing credential_source")
387
388 def test_constructor_invalid_options_url_and_file(self):
389 credential_source = {
390 "url": self.CREDENTIAL_URL,
391 "file": SUBJECT_TOKEN_TEXT_FILE,
392 }
393
394 with pytest.raises(ValueError) as excinfo:
395 self.make_credentials(credential_source=credential_source)
396
397 assert excinfo.match(r"Ambiguous credential_source")
398
399 def test_constructor_invalid_options_environment_id(self):
400 credential_source = {"url": self.CREDENTIAL_URL, "environment_id": "aws1"}
401
402 with pytest.raises(ValueError) as excinfo:
403 self.make_credentials(credential_source=credential_source)
404
405 assert excinfo.match(
406 r"Invalid Identity Pool credential_source field 'environment_id'"
407 )
408
409 def test_constructor_invalid_credential_source(self):
410 with pytest.raises(ValueError) as excinfo:
411 self.make_credentials(credential_source="non-dict")
412
413 assert excinfo.match(r"Missing credential_source")
414
415 def test_constructor_invalid_credential_source_format_type(self):
416 credential_source = {"format": {"type": "xml"}}
417
418 with pytest.raises(ValueError) as excinfo:
419 self.make_credentials(credential_source=credential_source)
420
421 assert excinfo.match(r"Invalid credential_source format 'xml'")
422
423 def test_constructor_missing_subject_token_field_name(self):
424 credential_source = {"format": {"type": "json"}}
425
426 with pytest.raises(ValueError) as excinfo:
427 self.make_credentials(credential_source=credential_source)
428
429 assert excinfo.match(
430 r"Missing subject_token_field_name for JSON credential_source format"
431 )
432
433 def test_retrieve_subject_token_missing_subject_token(self, tmpdir):
434 # Provide empty text file.
435 empty_file = tmpdir.join("empty.txt")
436 empty_file.write("")
437 credential_source = {"file": str(empty_file)}
438 credentials = self.make_credentials(credential_source=credential_source)
439
440 with pytest.raises(exceptions.RefreshError) as excinfo:
441 credentials.retrieve_subject_token(None)
442
443 assert excinfo.match(r"Missing subject_token in the credential_source file")
444
445 def test_retrieve_subject_token_text_file(self):
446 credentials = self.make_credentials(
447 credential_source=self.CREDENTIAL_SOURCE_TEXT
448 )
449
450 subject_token = credentials.retrieve_subject_token(None)
451
452 assert subject_token == TEXT_FILE_SUBJECT_TOKEN
453
454 def test_retrieve_subject_token_json_file(self):
455 credentials = self.make_credentials(
456 credential_source=self.CREDENTIAL_SOURCE_JSON
457 )
458
459 subject_token = credentials.retrieve_subject_token(None)
460
461 assert subject_token == JSON_FILE_SUBJECT_TOKEN
462
463 def test_retrieve_subject_token_json_file_invalid_field_name(self):
464 credential_source = {
465 "file": SUBJECT_TOKEN_JSON_FILE,
466 "format": {"type": "json", "subject_token_field_name": "not_found"},
467 }
468 credentials = self.make_credentials(credential_source=credential_source)
469
470 with pytest.raises(exceptions.RefreshError) as excinfo:
471 credentials.retrieve_subject_token(None)
472
473 assert excinfo.match(
474 "Unable to parse subject_token from JSON file '{}' using key '{}'".format(
475 SUBJECT_TOKEN_JSON_FILE, "not_found"
476 )
477 )
478
479 def test_retrieve_subject_token_invalid_json(self, tmpdir):
480 # Provide JSON file. This should result in JSON parsing error.
481 invalid_json_file = tmpdir.join("invalid.json")
482 invalid_json_file.write("{")
483 credential_source = {
484 "file": str(invalid_json_file),
485 "format": {"type": "json", "subject_token_field_name": "access_token"},
486 }
487 credentials = self.make_credentials(credential_source=credential_source)
488
489 with pytest.raises(exceptions.RefreshError) as excinfo:
490 credentials.retrieve_subject_token(None)
491
492 assert excinfo.match(
493 "Unable to parse subject_token from JSON file '{}' using key '{}'".format(
494 str(invalid_json_file), "access_token"
495 )
496 )
497
498 def test_retrieve_subject_token_file_not_found(self):
499 credential_source = {"file": "./not_found.txt"}
500 credentials = self.make_credentials(credential_source=credential_source)
501
502 with pytest.raises(exceptions.RefreshError) as excinfo:
503 credentials.retrieve_subject_token(None)
504
505 assert excinfo.match(r"File './not_found.txt' was not found")
506
507 def test_refresh_text_file_success_without_impersonation_ignore_default_scopes(
508 self
509 ):
510 credentials = self.make_credentials(
511 client_id=CLIENT_ID,
512 client_secret=CLIENT_SECRET,
513 # Test with text format type.
514 credential_source=self.CREDENTIAL_SOURCE_TEXT,
515 scopes=SCOPES,
516 # Default scopes should be ignored.
517 default_scopes=["ignored"],
518 )
519
520 self.assert_underlying_credentials_refresh(
521 credentials=credentials,
522 audience=AUDIENCE,
523 subject_token=TEXT_FILE_SUBJECT_TOKEN,
524 subject_token_type=SUBJECT_TOKEN_TYPE,
525 token_url=TOKEN_URL,
526 service_account_impersonation_url=None,
527 basic_auth_encoding=BASIC_AUTH_ENCODING,
528 quota_project_id=None,
529 used_scopes=SCOPES,
530 scopes=SCOPES,
531 default_scopes=["ignored"],
532 )
533
534 def test_refresh_text_file_success_without_impersonation_use_default_scopes(self):
535 credentials = self.make_credentials(
536 client_id=CLIENT_ID,
537 client_secret=CLIENT_SECRET,
538 # Test with text format type.
539 credential_source=self.CREDENTIAL_SOURCE_TEXT,
540 scopes=None,
541 # Default scopes should be used since user specified scopes are none.
542 default_scopes=SCOPES,
543 )
544
545 self.assert_underlying_credentials_refresh(
546 credentials=credentials,
547 audience=AUDIENCE,
548 subject_token=TEXT_FILE_SUBJECT_TOKEN,
549 subject_token_type=SUBJECT_TOKEN_TYPE,
550 token_url=TOKEN_URL,
551 service_account_impersonation_url=None,
552 basic_auth_encoding=BASIC_AUTH_ENCODING,
553 quota_project_id=None,
554 used_scopes=SCOPES,
555 scopes=None,
556 default_scopes=SCOPES,
557 )
558
559 def test_refresh_text_file_success_with_impersonation_ignore_default_scopes(self):
560 # Initialize credentials with service account impersonation and basic auth.
561 credentials = self.make_credentials(
562 # Test with text format type.
563 credential_source=self.CREDENTIAL_SOURCE_TEXT,
564 service_account_impersonation_url=SERVICE_ACCOUNT_IMPERSONATION_URL,
565 scopes=SCOPES,
566 # Default scopes should be ignored.
567 default_scopes=["ignored"],
568 )
569
570 self.assert_underlying_credentials_refresh(
571 credentials=credentials,
572 audience=AUDIENCE,
573 subject_token=TEXT_FILE_SUBJECT_TOKEN,
574 subject_token_type=SUBJECT_TOKEN_TYPE,
575 token_url=TOKEN_URL,
576 service_account_impersonation_url=SERVICE_ACCOUNT_IMPERSONATION_URL,
577 basic_auth_encoding=None,
578 quota_project_id=None,
579 used_scopes=SCOPES,
580 scopes=SCOPES,
581 default_scopes=["ignored"],
582 )
583
584 def test_refresh_text_file_success_with_impersonation_use_default_scopes(self):
585 # Initialize credentials with service account impersonation, basic auth
586 # and default scopes (no user scopes).
587 credentials = self.make_credentials(
588 # Test with text format type.
589 credential_source=self.CREDENTIAL_SOURCE_TEXT,
590 service_account_impersonation_url=SERVICE_ACCOUNT_IMPERSONATION_URL,
591 scopes=None,
592 # Default scopes should be used since user specified scopes are none.
593 default_scopes=SCOPES,
594 )
595
596 self.assert_underlying_credentials_refresh(
597 credentials=credentials,
598 audience=AUDIENCE,
599 subject_token=TEXT_FILE_SUBJECT_TOKEN,
600 subject_token_type=SUBJECT_TOKEN_TYPE,
601 token_url=TOKEN_URL,
602 service_account_impersonation_url=SERVICE_ACCOUNT_IMPERSONATION_URL,
603 basic_auth_encoding=None,
604 quota_project_id=None,
605 used_scopes=SCOPES,
606 scopes=None,
607 default_scopes=SCOPES,
608 )
609
610 def test_refresh_json_file_success_without_impersonation(self):
611 credentials = self.make_credentials(
612 client_id=CLIENT_ID,
613 client_secret=CLIENT_SECRET,
614 # Test with JSON format type.
615 credential_source=self.CREDENTIAL_SOURCE_JSON,
616 scopes=SCOPES,
617 )
618
619 self.assert_underlying_credentials_refresh(
620 credentials=credentials,
621 audience=AUDIENCE,
622 subject_token=JSON_FILE_SUBJECT_TOKEN,
623 subject_token_type=SUBJECT_TOKEN_TYPE,
624 token_url=TOKEN_URL,
625 service_account_impersonation_url=None,
626 basic_auth_encoding=BASIC_AUTH_ENCODING,
627 quota_project_id=None,
628 used_scopes=SCOPES,
629 scopes=SCOPES,
630 default_scopes=None,
631 )
632
633 def test_refresh_json_file_success_with_impersonation(self):
634 # Initialize credentials with service account impersonation and basic auth.
635 credentials = self.make_credentials(
636 # Test with JSON format type.
637 credential_source=self.CREDENTIAL_SOURCE_JSON,
638 service_account_impersonation_url=SERVICE_ACCOUNT_IMPERSONATION_URL,
639 scopes=SCOPES,
640 )
641
642 self.assert_underlying_credentials_refresh(
643 credentials=credentials,
644 audience=AUDIENCE,
645 subject_token=JSON_FILE_SUBJECT_TOKEN,
646 subject_token_type=SUBJECT_TOKEN_TYPE,
647 token_url=TOKEN_URL,
648 service_account_impersonation_url=SERVICE_ACCOUNT_IMPERSONATION_URL,
649 basic_auth_encoding=None,
650 quota_project_id=None,
651 used_scopes=SCOPES,
652 scopes=SCOPES,
653 default_scopes=None,
654 )
655
656 def test_refresh_with_retrieve_subject_token_error(self):
657 credential_source = {
658 "file": SUBJECT_TOKEN_JSON_FILE,
659 "format": {"type": "json", "subject_token_field_name": "not_found"},
660 }
661 credentials = self.make_credentials(credential_source=credential_source)
662
663 with pytest.raises(exceptions.RefreshError) as excinfo:
664 credentials.refresh(None)
665
666 assert excinfo.match(
667 "Unable to parse subject_token from JSON file '{}' using key '{}'".format(
668 SUBJECT_TOKEN_JSON_FILE, "not_found"
669 )
670 )
671
672 def test_retrieve_subject_token_from_url(self):
673 credentials = self.make_credentials(
674 credential_source=self.CREDENTIAL_SOURCE_TEXT_URL
675 )
676 request = self.make_mock_request(token_data=TEXT_FILE_SUBJECT_TOKEN)
677 subject_token = credentials.retrieve_subject_token(request)
678
679 assert subject_token == TEXT_FILE_SUBJECT_TOKEN
680 self.assert_credential_request_kwargs(request.call_args_list[0].kwargs, None)
681
682 def test_retrieve_subject_token_from_url_with_headers(self):
683 credentials = self.make_credentials(
684 credential_source={"url": self.CREDENTIAL_URL, "headers": {"foo": "bar"}}
685 )
686 request = self.make_mock_request(token_data=TEXT_FILE_SUBJECT_TOKEN)
687 subject_token = credentials.retrieve_subject_token(request)
688
689 assert subject_token == TEXT_FILE_SUBJECT_TOKEN
690 self.assert_credential_request_kwargs(
691 request.call_args_list[0].kwargs, {"foo": "bar"}
692 )
693
694 def test_retrieve_subject_token_from_url_json(self):
695 credentials = self.make_credentials(
696 credential_source=self.CREDENTIAL_SOURCE_JSON_URL
697 )
698 request = self.make_mock_request(token_data=JSON_FILE_CONTENT)
699 subject_token = credentials.retrieve_subject_token(request)
700
701 assert subject_token == JSON_FILE_SUBJECT_TOKEN
702 self.assert_credential_request_kwargs(request.call_args_list[0].kwargs, None)
703
704 def test_retrieve_subject_token_from_url_json_with_headers(self):
705 credentials = self.make_credentials(
706 credential_source={
707 "url": self.CREDENTIAL_URL,
708 "format": {"type": "json", "subject_token_field_name": "access_token"},
709 "headers": {"foo": "bar"},
710 }
711 )
712 request = self.make_mock_request(token_data=JSON_FILE_CONTENT)
713 subject_token = credentials.retrieve_subject_token(request)
714
715 assert subject_token == JSON_FILE_SUBJECT_TOKEN
716 self.assert_credential_request_kwargs(
717 request.call_args_list[0].kwargs, {"foo": "bar"}
718 )
719
720 def test_retrieve_subject_token_from_url_not_found(self):
721 credentials = self.make_credentials(
722 credential_source=self.CREDENTIAL_SOURCE_TEXT_URL
723 )
724 with pytest.raises(exceptions.RefreshError) as excinfo:
725 credentials.retrieve_subject_token(
726 self.make_mock_request(token_status=404, token_data=JSON_FILE_CONTENT)
727 )
728
729 assert excinfo.match("Unable to retrieve Identity Pool subject token")
730
731 def test_retrieve_subject_token_from_url_json_invalid_field(self):
732 credential_source = {
733 "url": self.CREDENTIAL_URL,
734 "format": {"type": "json", "subject_token_field_name": "not_found"},
735 }
736 credentials = self.make_credentials(credential_source=credential_source)
737
738 with pytest.raises(exceptions.RefreshError) as excinfo:
739 credentials.retrieve_subject_token(
740 self.make_mock_request(token_data=JSON_FILE_CONTENT)
741 )
742
743 assert excinfo.match(
744 "Unable to parse subject_token from JSON file '{}' using key '{}'".format(
745 self.CREDENTIAL_URL, "not_found"
746 )
747 )
748
749 def test_retrieve_subject_token_from_url_json_invalid_format(self):
750 credentials = self.make_credentials(
751 credential_source=self.CREDENTIAL_SOURCE_JSON_URL
752 )
753
754 with pytest.raises(exceptions.RefreshError) as excinfo:
755 credentials.retrieve_subject_token(self.make_mock_request(token_data="{"))
756
757 assert excinfo.match(
758 "Unable to parse subject_token from JSON file '{}' using key '{}'".format(
759 self.CREDENTIAL_URL, "access_token"
760 )
761 )
762
763 def test_refresh_text_file_success_without_impersonation_url(self):
764 credentials = self.make_credentials(
765 client_id=CLIENT_ID,
766 client_secret=CLIENT_SECRET,
767 # Test with text format type.
768 credential_source=self.CREDENTIAL_SOURCE_TEXT_URL,
769 scopes=SCOPES,
770 )
771
772 self.assert_underlying_credentials_refresh(
773 credentials=credentials,
774 audience=AUDIENCE,
775 subject_token=TEXT_FILE_SUBJECT_TOKEN,
776 subject_token_type=SUBJECT_TOKEN_TYPE,
777 token_url=TOKEN_URL,
778 service_account_impersonation_url=None,
779 basic_auth_encoding=BASIC_AUTH_ENCODING,
780 quota_project_id=None,
781 used_scopes=SCOPES,
782 scopes=SCOPES,
783 default_scopes=None,
784 credential_data=TEXT_FILE_SUBJECT_TOKEN,
785 )
786
787 def test_refresh_text_file_success_with_impersonation_url(self):
788 # Initialize credentials with service account impersonation and basic auth.
789 credentials = self.make_credentials(
790 # Test with text format type.
791 credential_source=self.CREDENTIAL_SOURCE_TEXT_URL,
792 service_account_impersonation_url=SERVICE_ACCOUNT_IMPERSONATION_URL,
793 scopes=SCOPES,
794 )
795
796 self.assert_underlying_credentials_refresh(
797 credentials=credentials,
798 audience=AUDIENCE,
799 subject_token=TEXT_FILE_SUBJECT_TOKEN,
800 subject_token_type=SUBJECT_TOKEN_TYPE,
801 token_url=TOKEN_URL,
802 service_account_impersonation_url=SERVICE_ACCOUNT_IMPERSONATION_URL,
803 basic_auth_encoding=None,
804 quota_project_id=None,
805 used_scopes=SCOPES,
806 scopes=SCOPES,
807 default_scopes=None,
808 credential_data=TEXT_FILE_SUBJECT_TOKEN,
809 )
810
811 def test_refresh_json_file_success_without_impersonation_url(self):
812 credentials = self.make_credentials(
813 client_id=CLIENT_ID,
814 client_secret=CLIENT_SECRET,
815 # Test with JSON format type.
816 credential_source=self.CREDENTIAL_SOURCE_JSON_URL,
817 scopes=SCOPES,
818 )
819
820 self.assert_underlying_credentials_refresh(
821 credentials=credentials,
822 audience=AUDIENCE,
823 subject_token=JSON_FILE_SUBJECT_TOKEN,
824 subject_token_type=SUBJECT_TOKEN_TYPE,
825 token_url=TOKEN_URL,
826 service_account_impersonation_url=None,
827 basic_auth_encoding=BASIC_AUTH_ENCODING,
828 quota_project_id=None,
829 used_scopes=SCOPES,
830 scopes=SCOPES,
831 default_scopes=None,
832 credential_data=JSON_FILE_CONTENT,
833 )
834
835 def test_refresh_json_file_success_with_impersonation_url(self):
836 # Initialize credentials with service account impersonation and basic auth.
837 credentials = self.make_credentials(
838 # Test with JSON format type.
839 credential_source=self.CREDENTIAL_SOURCE_JSON_URL,
840 service_account_impersonation_url=SERVICE_ACCOUNT_IMPERSONATION_URL,
841 scopes=SCOPES,
842 )
843
844 self.assert_underlying_credentials_refresh(
845 credentials=credentials,
846 audience=AUDIENCE,
847 subject_token=JSON_FILE_SUBJECT_TOKEN,
848 subject_token_type=SUBJECT_TOKEN_TYPE,
849 token_url=TOKEN_URL,
850 service_account_impersonation_url=SERVICE_ACCOUNT_IMPERSONATION_URL,
851 basic_auth_encoding=None,
852 quota_project_id=None,
853 used_scopes=SCOPES,
854 scopes=SCOPES,
855 default_scopes=None,
856 credential_data=JSON_FILE_CONTENT,
857 )
858
859 def test_refresh_with_retrieve_subject_token_error_url(self):
860 credential_source = {
861 "url": self.CREDENTIAL_URL,
862 "format": {"type": "json", "subject_token_field_name": "not_found"},
863 }
864 credentials = self.make_credentials(credential_source=credential_source)
865
866 with pytest.raises(exceptions.RefreshError) as excinfo:
867 credentials.refresh(self.make_mock_request(token_data=JSON_FILE_CONTENT))
868
869 assert excinfo.match(
870 "Unable to parse subject_token from JSON file '{}' using key '{}'".format(
871 self.CREDENTIAL_URL, "not_found"
872 )
873 )