blob: b529268fb0ec20aa7c3eda2f39c377b77fa47813 [file] [log] [blame]
bojeil-googled4d7f382021-02-16 12:33:20 -08001# Copyright 2020 Google LLC
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7# http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14
15import datetime
16import json
17import os
18
19import mock
20import pytest
21from six.moves import http_client
22from six.moves import urllib
23
24from google.auth import _helpers
25from google.auth import exceptions
26from google.auth import identity_pool
27from google.auth import transport
28
29
# Client credentials used by tests that exercise basic authentication.
CLIENT_ID = "username"
CLIENT_SECRET = "password"
# Base64 encoding of "username:password".
BASIC_AUTH_ENCODING = "dXNlcm5hbWU6cGFzc3dvcmQ="

# Service account impersonation fixtures.
SERVICE_ACCOUNT_EMAIL = "service-1234@service-name.iam.gserviceaccount.com"
SERVICE_ACCOUNT_IMPERSONATION_URL = (
    "https://us-east1-iamcredentials.googleapis.com/v1/projects/-"
    "/serviceAccounts/{}:generateAccessToken"
).format(SERVICE_ACCOUNT_EMAIL)
QUOTA_PROJECT_ID = "QUOTA_PROJECT_ID"
SCOPES = ["scope1", "scope2"]

# On-disk subject token fixtures, located in the "data" directory next to
# this module.
DATA_DIR = os.path.join(os.path.dirname(__file__), "data")
SUBJECT_TOKEN_TEXT_FILE = os.path.join(DATA_DIR, "external_subject_token.txt")
SUBJECT_TOKEN_JSON_FILE = os.path.join(DATA_DIR, "external_subject_token.json")
SUBJECT_TOKEN_FIELD_NAME = "access_token"

# Load the expected subject tokens once at import time.
with open(SUBJECT_TOKEN_TEXT_FILE) as fh:
    TEXT_FILE_SUBJECT_TOKEN = fh.read()

with open(SUBJECT_TOKEN_JSON_FILE) as fh:
    JSON_FILE_CONTENT = json.load(fh)
JSON_FILE_SUBJECT_TOKEN = JSON_FILE_CONTENT.get(SUBJECT_TOKEN_FIELD_NAME)

# Token exchange parameters shared by every credentials instance under test.
TOKEN_URL = "https://sts.googleapis.com/v1/token"
SUBJECT_TOKEN_TYPE = "urn:ietf:params:oauth:token-type:jwt"
AUDIENCE = "//iam.googleapis.com/projects/123456/locations/global/workloadIdentityPools/POOL_ID/providers/PROVIDER_ID"
56
57
class TestCredentials(object):
    """Tests for ``identity_pool.Credentials`` using file and URL sources."""

    # File-backed credential sources: plain text and JSON formats.
    CREDENTIAL_SOURCE_TEXT = {"file": SUBJECT_TOKEN_TEXT_FILE}
    CREDENTIAL_SOURCE_JSON = {
        "file": SUBJECT_TOKEN_JSON_FILE,
        "format": {
            "type": "json",
            "subject_token_field_name": SUBJECT_TOKEN_FIELD_NAME,
        },
    }

    # URL-backed credential sources: plain text and JSON formats.
    CREDENTIAL_URL = "http://fakeurl.com"
    CREDENTIAL_SOURCE_TEXT_URL = {"url": CREDENTIAL_URL}
    CREDENTIAL_SOURCE_JSON_URL = {
        "url": CREDENTIAL_URL,
        "format": {
            "type": "json",
            "subject_token_field_name": SUBJECT_TOKEN_FIELD_NAME,
        },
    }

    # Canned successful token exchange response body.
    SUCCESS_RESPONSE = {
        "access_token": "ACCESS_TOKEN",
        "issued_token_type": "urn:ietf:params:oauth:token-type:access_token",
        "token_type": "Bearer",
        "expires_in": 3600,
        "scope": " ".join(SCOPES),
    }
77
@classmethod
def make_mock_response(cls, status, data):
    """Build an autospec'd ``transport.Response`` with the given status/body.

    A dict payload is serialized to UTF-8 encoded JSON; any other payload is
    attached to the response unchanged.
    """
    mocked = mock.create_autospec(transport.Response, instance=True)
    mocked.status = status
    mocked.data = (
        json.dumps(data).encode("utf-8") if isinstance(data, dict) else data
    )
    return mocked
87
@classmethod
def make_mock_request(
    cls, token_status=http_client.OK, token_data=None, *extra_requests
):
    """Build an autospec'd ``transport.Request`` that replays canned responses.

    The first response uses ``token_status``/``token_data``. ``extra_requests``
    is a flat ``status, data, status, data, ...`` sequence; each pair adds one
    more response, returned in order on subsequent calls.
    """
    responses = [cls.make_mock_response(token_status, token_data)]

    # Walk the flat sequence two items at a time: (status, data).
    for offset in range(0, len(extra_requests), 2):
        status = extra_requests[offset]
        data = extra_requests[offset + 1]
        responses.append(cls.make_mock_response(status, data))

    request = mock.create_autospec(transport.Request)
    request.side_effect = responses
    return request
108
@classmethod
def assert_credential_request_kwargs(
    cls, request_kwargs, headers, url=CREDENTIAL_URL
):
    """Assert ``request_kwargs`` describe a body-less GET to ``url``.

    ``headers`` is compared for equality against the recorded headers.
    """
    expected_pairs = (("url", url), ("method", "GET"), ("headers", headers))
    for key, expected in expected_pairs:
        assert request_kwargs[key] == expected
    # A missing "body" entry is treated the same as an explicit ``None``.
    assert request_kwargs.get("body") is None
117
@classmethod
def assert_token_request_kwargs(
    cls, request_kwargs, headers, request_data, token_url=TOKEN_URL
):
    """Assert ``request_kwargs`` describe the expected token exchange POST.

    The URL-encoded body is parsed and every key/value pair must match
    ``request_data``, with no extra or missing fields.
    """
    expected_pairs = (
        ("url", token_url),
        ("method", "POST"),
        ("headers", headers),
    )
    for key, expected in expected_pairs:
        assert request_kwargs[key] == expected
    assert request_kwargs["body"] is not None

    parsed_body = urllib.parse.parse_qsl(request_kwargs["body"])
    assert len(parsed_body) == len(request_data)
    # Parsed keys and values are decoded from UTF-8 before comparison.
    for raw_key, raw_value in parsed_body:
        assert raw_value.decode("utf-8") == request_data[raw_key.decode("utf-8")]
130
@classmethod
def assert_impersonation_request_kwargs(
    cls,
    request_kwargs,
    headers,
    request_data,
    service_account_impersonation_url=SERVICE_ACCOUNT_IMPERSONATION_URL,
):
    """Assert ``request_kwargs`` describe the expected impersonation POST.

    The body is decoded from UTF-8, parsed as JSON and compared for equality
    against ``request_data``.
    """
    expected_pairs = (
        ("url", service_account_impersonation_url),
        ("method", "POST"),
        ("headers", headers),
    )
    for key, expected in expected_pairs:
        assert request_kwargs[key] == expected
    assert request_kwargs["body"] is not None

    decoded_body = request_kwargs["body"].decode("utf-8")
    assert json.loads(decoded_body) == request_data
145
@classmethod
def assert_underlying_credentials_refresh(
    cls,
    credentials,
    audience,
    subject_token,
    subject_token_type,
    token_url,
    service_account_impersonation_url=None,
    basic_auth_encoding=None,
    quota_project_id=None,
    used_scopes=None,
    credential_data=None,
    scopes=None,
    default_scopes=None,
):
    """Refresh ``credentials`` against a mocked transport and verify the traffic.

    The mocked request is primed with, in order: the subject token response
    (only when ``credential_data`` is truthy), the token exchange response,
    and the service account impersonation response (only when
    ``service_account_impersonation_url`` is truthy). After ``refresh`` the
    recorded requests, the resulting token and the credentials'
    ``quota_project_id``, ``scopes`` and ``default_scopes`` are asserted.
    """
    impersonating = bool(service_account_impersonation_url)

    # Expected token exchange request and the response the mock will return.
    token_response = cls.SUCCESS_RESPONSE.copy()
    token_headers = {"Content-Type": "application/x-www-form-urlencoded"}
    if basic_auth_encoding:
        token_headers["Authorization"] = "Basic " + basic_auth_encoding

    # With impersonation the exchange is expected to request the IAM scope;
    # the caller's scopes are expected on the impersonation request instead.
    token_scopes = (
        "https://www.googleapis.com/auth/iam"
        if impersonating
        else " ".join(used_scopes or [])
    )
    token_request_data = {
        "grant_type": "urn:ietf:params:oauth:grant-type:token-exchange",
        "audience": audience,
        "requested_token_type": "urn:ietf:params:oauth:token-type:access_token",
        "scope": token_scopes,
        "subject_token": subject_token,
        "subject_token_type": subject_token_type,
    }

    if impersonating:
        # Expected impersonation request and the response the mock will return.
        expiry = _helpers.utcnow().replace(microsecond=0) + datetime.timedelta(
            seconds=3600
        )
        impersonation_response = {
            "accessToken": "SA_ACCESS_TOKEN",
            "expireTime": expiry.isoformat("T") + "Z",
        }
        impersonation_headers = {
            "Content-Type": "application/json",
            "authorization": "Bearer {}".format(token_response["access_token"]),
        }
        impersonation_request_data = {
            "delegates": None,
            "scope": used_scopes,
            "lifetime": "3600s",
        }

    # Queue the (status, data) pairs in the order the requests are expected,
    # remembering the position of each request for the assertions below.
    queued = []
    if credential_data:
        queued.append((http_client.OK, credential_data))

    token_request_index = len(queued)
    queued.append((http_client.OK, token_response))

    if impersonating:
        impersonation_request_index = len(queued)
        queued.append((http_client.OK, impersonation_response))

    flat_args = []
    for status_and_data in queued:
        flat_args.extend(status_and_data)
    request = cls.make_mock_request(*flat_args)

    credentials.refresh(request)

    recorded_calls = request.call_args_list
    assert len(recorded_calls) == len(queued)
    if credential_data:
        # The subject token fetch is always the first request when present.
        cls.assert_credential_request_kwargs(recorded_calls[0][1], None)
    # Verify token exchange request parameters.
    cls.assert_token_request_kwargs(
        recorded_calls[token_request_index][1],
        token_headers,
        token_request_data,
        token_url,
    )
    # Verify service account impersonation request parameters if the request
    # is processed.
    if impersonating:
        cls.assert_impersonation_request_kwargs(
            recorded_calls[impersonation_request_index][1],
            impersonation_headers,
            impersonation_request_data,
            service_account_impersonation_url,
        )
        assert credentials.token == impersonation_response["accessToken"]
    else:
        assert credentials.token == token_response["access_token"]
    assert credentials.quota_project_id == quota_project_id
    assert credentials.scopes == scopes
    assert credentials.default_scopes == default_scopes
249
@classmethod
def make_credentials(
    cls,
    client_id=None,
    client_secret=None,
    quota_project_id=None,
    scopes=None,
    default_scopes=None,
    service_account_impersonation_url=None,
    credential_source=None,
):
    """Create ``identity_pool.Credentials`` with the module's fixed settings.

    ``AUDIENCE``, ``SUBJECT_TOKEN_TYPE`` and ``TOKEN_URL`` are always used;
    every other option is forwarded from the arguments as given.
    """
    options = {
        "audience": AUDIENCE,
        "subject_token_type": SUBJECT_TOKEN_TYPE,
        "token_url": TOKEN_URL,
        "service_account_impersonation_url": service_account_impersonation_url,
        "credential_source": credential_source,
        "client_id": client_id,
        "client_secret": client_secret,
        "quota_project_id": quota_project_id,
        "scopes": scopes,
        "default_scopes": default_scopes,
    }
    return identity_pool.Credentials(**options)
273
@mock.patch.object(identity_pool.Credentials, "__init__", return_value=None)
def test_from_info_full_options(self, mock_init):
    """``from_info`` forwards every supplied option to the constructor."""
    info = {
        "audience": AUDIENCE,
        "subject_token_type": SUBJECT_TOKEN_TYPE,
        "token_url": TOKEN_URL,
        "service_account_impersonation_url": SERVICE_ACCOUNT_IMPERSONATION_URL,
        "client_id": CLIENT_ID,
        "client_secret": CLIENT_SECRET,
        "quota_project_id": QUOTA_PROJECT_ID,
        "credential_source": self.CREDENTIAL_SOURCE_TEXT,
    }

    credentials = identity_pool.Credentials.from_info(info)

    # The patched constructor must have received each option exactly once.
    assert isinstance(credentials, identity_pool.Credentials)
    mock_init.assert_called_once_with(
        audience=AUDIENCE,
        subject_token_type=SUBJECT_TOKEN_TYPE,
        token_url=TOKEN_URL,
        credential_source=self.CREDENTIAL_SOURCE_TEXT,
        service_account_impersonation_url=SERVICE_ACCOUNT_IMPERSONATION_URL,
        client_id=CLIENT_ID,
        client_secret=CLIENT_SECRET,
        quota_project_id=QUOTA_PROJECT_ID,
    )
301
@mock.patch.object(identity_pool.Credentials, "__init__", return_value=None)
def test_from_info_required_options_only(self, mock_init):
    """``from_info`` passes ``None`` for every option missing from the info."""
    info = {
        "audience": AUDIENCE,
        "subject_token_type": SUBJECT_TOKEN_TYPE,
        "token_url": TOKEN_URL,
        "credential_source": self.CREDENTIAL_SOURCE_TEXT,
    }

    credentials = identity_pool.Credentials.from_info(info)

    # Options absent from ``info`` must reach the constructor as ``None``.
    assert isinstance(credentials, identity_pool.Credentials)
    mock_init.assert_called_once_with(
        audience=AUDIENCE,
        subject_token_type=SUBJECT_TOKEN_TYPE,
        token_url=TOKEN_URL,
        credential_source=self.CREDENTIAL_SOURCE_TEXT,
        service_account_impersonation_url=None,
        client_id=None,
        client_secret=None,
        quota_project_id=None,
    )
325
@mock.patch.object(identity_pool.Credentials, "__init__", return_value=None)
def test_from_file_full_options(self, mock_init, tmpdir):
    """``from_file`` forwards every option found in the JSON config file."""
    config_file = tmpdir.join("config.json")
    config_file.write(
        json.dumps(
            {
                "audience": AUDIENCE,
                "subject_token_type": SUBJECT_TOKEN_TYPE,
                "token_url": TOKEN_URL,
                "service_account_impersonation_url": SERVICE_ACCOUNT_IMPERSONATION_URL,
                "client_id": CLIENT_ID,
                "client_secret": CLIENT_SECRET,
                "quota_project_id": QUOTA_PROJECT_ID,
                "credential_source": self.CREDENTIAL_SOURCE_TEXT,
            }
        )
    )

    credentials = identity_pool.Credentials.from_file(str(config_file))

    # The patched constructor must have received each option exactly once.
    assert isinstance(credentials, identity_pool.Credentials)
    mock_init.assert_called_once_with(
        audience=AUDIENCE,
        subject_token_type=SUBJECT_TOKEN_TYPE,
        token_url=TOKEN_URL,
        credential_source=self.CREDENTIAL_SOURCE_TEXT,
        service_account_impersonation_url=SERVICE_ACCOUNT_IMPERSONATION_URL,
        client_id=CLIENT_ID,
        client_secret=CLIENT_SECRET,
        quota_project_id=QUOTA_PROJECT_ID,
    )
354
@mock.patch.object(identity_pool.Credentials, "__init__", return_value=None)
def test_from_file_required_options_only(self, mock_init, tmpdir):
    """``from_file`` passes ``None`` for options missing from the config file."""
    config_file = tmpdir.join("config.json")
    config_file.write(
        json.dumps(
            {
                "audience": AUDIENCE,
                "subject_token_type": SUBJECT_TOKEN_TYPE,
                "token_url": TOKEN_URL,
                "credential_source": self.CREDENTIAL_SOURCE_TEXT,
            }
        )
    )

    credentials = identity_pool.Credentials.from_file(str(config_file))

    # Options absent from the file must reach the constructor as ``None``.
    assert isinstance(credentials, identity_pool.Credentials)
    mock_init.assert_called_once_with(
        audience=AUDIENCE,
        subject_token_type=SUBJECT_TOKEN_TYPE,
        token_url=TOKEN_URL,
        credential_source=self.CREDENTIAL_SOURCE_TEXT,
        service_account_impersonation_url=None,
        client_id=None,
        client_secret=None,
        quota_project_id=None,
    )
379
def test_constructor_invalid_options(self):
    """A credential source with only an unsupported key raises ValueError."""
    with pytest.raises(ValueError) as excinfo:
        self.make_credentials(credential_source={"unsupported": "value"})

    assert excinfo.match(r"Missing credential_source")
387
def test_constructor_invalid_options_url_and_file(self):
    """Supplying both "url" and "file" raises an ambiguity ValueError."""
    ambiguous_source = {}
    ambiguous_source["url"] = self.CREDENTIAL_URL
    ambiguous_source["file"] = SUBJECT_TOKEN_TEXT_FILE

    with pytest.raises(ValueError) as excinfo:
        self.make_credentials(credential_source=ambiguous_source)

    assert excinfo.match(r"Ambiguous credential_source")
398
def test_constructor_invalid_options_environment_id(self):
    """A credential source carrying "environment_id" raises ValueError."""
    source_with_environment = {
        "url": self.CREDENTIAL_URL,
        "environment_id": "aws1",
    }

    with pytest.raises(ValueError) as excinfo:
        self.make_credentials(credential_source=source_with_environment)

    expected_pattern = (
        r"Invalid Identity Pool credential_source field 'environment_id'"
    )
    assert excinfo.match(expected_pattern)
408
def test_constructor_invalid_credential_source(self):
    """A non-dict credential source raises ValueError."""
    not_a_dict = "non-dict"

    with pytest.raises(ValueError) as excinfo:
        self.make_credentials(credential_source=not_a_dict)

    assert excinfo.match(r"Missing credential_source")
414
def test_constructor_invalid_credential_source_format_type(self):
    """A credential source format type of "xml" raises ValueError."""
    with pytest.raises(ValueError) as excinfo:
        self.make_credentials(credential_source={"format": {"type": "xml"}})

    assert excinfo.match(r"Invalid credential_source format 'xml'")
422
def test_constructor_missing_subject_token_field_name(self):
    """A JSON format without "subject_token_field_name" raises ValueError."""
    with pytest.raises(ValueError) as excinfo:
        self.make_credentials(credential_source={"format": {"type": "json"}})

    expected_pattern = (
        r"Missing subject_token_field_name for JSON credential_source format"
    )
    assert excinfo.match(expected_pattern)
432
def test_info_with_file_credential_source(self):
    """``info`` reports the configuration of file-sourced credentials.

    Uses the file-backed ``CREDENTIAL_SOURCE_TEXT``. A copy is handed to the
    constructor so the comparison runs against the untouched class-level
    fixture.
    """
    credentials = self.make_credentials(
        credential_source=self.CREDENTIAL_SOURCE_TEXT.copy()
    )

    assert credentials.info == {
        "type": "external_account",
        "audience": AUDIENCE,
        "subject_token_type": SUBJECT_TOKEN_TYPE,
        "token_url": TOKEN_URL,
        "credential_source": self.CREDENTIAL_SOURCE_TEXT,
    }
445
def test_info_with_url_credential_source(self):
    """``info`` reports the configuration of URL-sourced JSON credentials."""
    source_copy = self.CREDENTIAL_SOURCE_JSON_URL.copy()
    credentials = self.make_credentials(credential_source=source_copy)

    expected_info = {
        "type": "external_account",
        "audience": AUDIENCE,
        "subject_token_type": SUBJECT_TOKEN_TYPE,
        "token_url": TOKEN_URL,
        "credential_source": self.CREDENTIAL_SOURCE_JSON_URL,
    }
    assert credentials.info == expected_info
458
def test_retrieve_subject_token_missing_subject_token(self, tmpdir):
    """An empty text file yields a RefreshError about the missing token."""
    # Create a text file with no content.
    empty_file = tmpdir.join("empty.txt")
    empty_file.write("")
    credentials = self.make_credentials(
        credential_source={"file": str(empty_file)}
    )

    with pytest.raises(exceptions.RefreshError) as excinfo:
        credentials.retrieve_subject_token(None)

    assert excinfo.match(r"Missing subject_token in the credential_source file")
470
def test_retrieve_subject_token_text_file(self):
    """The subject token is read back from the text fixture file."""
    source = self.CREDENTIAL_SOURCE_TEXT
    credentials = self.make_credentials(credential_source=source)

    assert credentials.retrieve_subject_token(None) == TEXT_FILE_SUBJECT_TOKEN
479
def test_retrieve_subject_token_json_file(self):
    """The subject token is extracted from the JSON fixture file."""
    source = self.CREDENTIAL_SOURCE_JSON
    credentials = self.make_credentials(credential_source=source)

    assert credentials.retrieve_subject_token(None) == JSON_FILE_SUBJECT_TOKEN
488
def test_retrieve_subject_token_json_file_invalid_field_name(self):
    """A field name absent from the JSON file yields a RefreshError."""
    # Imported locally: ``re`` is not among this module's visible imports.
    import re

    credential_source = {
        "file": SUBJECT_TOKEN_JSON_FILE,
        "format": {"type": "json", "subject_token_field_name": "not_found"},
    }
    credentials = self.make_credentials(credential_source=credential_source)

    with pytest.raises(exceptions.RefreshError) as excinfo:
        credentials.retrieve_subject_token(None)

    expected_message = (
        "Unable to parse subject_token from JSON file '{}' using key '{}'".format(
            SUBJECT_TOKEN_JSON_FILE, "not_found"
        )
    )
    # ``excinfo.match`` treats its argument as a regular expression. The file
    # path may contain metacharacters (e.g. backslashes on Windows), so the
    # expected message is escaped to be matched literally.
    assert excinfo.match(re.escape(expected_message))
504
def test_retrieve_subject_token_invalid_json(self, tmpdir):
    """A file holding malformed JSON yields a RefreshError."""
    # Imported locally: ``re`` is not among this module's visible imports.
    import re

    # Provide an invalid JSON file. This should result in JSON parsing error.
    invalid_json_file = tmpdir.join("invalid.json")
    invalid_json_file.write("{")
    credential_source = {
        "file": str(invalid_json_file),
        "format": {"type": "json", "subject_token_field_name": "access_token"},
    }
    credentials = self.make_credentials(credential_source=credential_source)

    with pytest.raises(exceptions.RefreshError) as excinfo:
        credentials.retrieve_subject_token(None)

    expected_message = (
        "Unable to parse subject_token from JSON file '{}' using key '{}'".format(
            str(invalid_json_file), "access_token"
        )
    )
    # ``excinfo.match`` treats its argument as a regular expression. The
    # temporary path may contain metacharacters (e.g. backslashes on
    # Windows), so the expected message is escaped to be matched literally.
    assert excinfo.match(re.escape(expected_message))
523
def test_retrieve_subject_token_file_not_found(self):
    """A credential source pointing at a missing file yields a RefreshError."""
    credentials = self.make_credentials(
        credential_source={"file": "./not_found.txt"}
    )

    with pytest.raises(exceptions.RefreshError) as excinfo:
        credentials.retrieve_subject_token(None)

    assert excinfo.match(r"File './not_found.txt' was not found")
532
def test_refresh_text_file_success_without_impersonation_ignore_default_scopes(
    self,
):
    """Text file source, client auth, no impersonation; user scopes are used."""
    credentials = self.make_credentials(
        # Text format credential source.
        credential_source=self.CREDENTIAL_SOURCE_TEXT,
        client_id=CLIENT_ID,
        client_secret=CLIENT_SECRET,
        scopes=SCOPES,
        # These must not be requested because explicit scopes are given.
        default_scopes=["ignored"],
    )

    self.assert_underlying_credentials_refresh(
        credentials=credentials,
        audience=AUDIENCE,
        token_url=TOKEN_URL,
        subject_token=TEXT_FILE_SUBJECT_TOKEN,
        subject_token_type=SUBJECT_TOKEN_TYPE,
        basic_auth_encoding=BASIC_AUTH_ENCODING,
        service_account_impersonation_url=None,
        quota_project_id=None,
        used_scopes=SCOPES,
        scopes=SCOPES,
        default_scopes=["ignored"],
    )
559
def test_refresh_text_file_success_without_impersonation_use_default_scopes(self):
    """Text file source, client auth, no impersonation; default scopes are used."""
    credentials = self.make_credentials(
        # Text format credential source.
        credential_source=self.CREDENTIAL_SOURCE_TEXT,
        client_id=CLIENT_ID,
        client_secret=CLIENT_SECRET,
        # No user scopes, so the default scopes are expected to be requested.
        scopes=None,
        default_scopes=SCOPES,
    )

    self.assert_underlying_credentials_refresh(
        credentials=credentials,
        audience=AUDIENCE,
        token_url=TOKEN_URL,
        subject_token=TEXT_FILE_SUBJECT_TOKEN,
        subject_token_type=SUBJECT_TOKEN_TYPE,
        basic_auth_encoding=BASIC_AUTH_ENCODING,
        service_account_impersonation_url=None,
        quota_project_id=None,
        used_scopes=SCOPES,
        scopes=None,
        default_scopes=SCOPES,
    )
584
def test_refresh_text_file_success_with_impersonation_ignore_default_scopes(self):
    """Text file source with impersonation; user scopes are used."""
    # Service account impersonation is enabled; no client id/secret is set,
    # so no basic auth header is expected.
    credentials = self.make_credentials(
        # Text format credential source.
        credential_source=self.CREDENTIAL_SOURCE_TEXT,
        service_account_impersonation_url=SERVICE_ACCOUNT_IMPERSONATION_URL,
        scopes=SCOPES,
        # These must not be requested because explicit scopes are given.
        default_scopes=["ignored"],
    )

    self.assert_underlying_credentials_refresh(
        credentials=credentials,
        audience=AUDIENCE,
        token_url=TOKEN_URL,
        subject_token=TEXT_FILE_SUBJECT_TOKEN,
        subject_token_type=SUBJECT_TOKEN_TYPE,
        basic_auth_encoding=None,
        service_account_impersonation_url=SERVICE_ACCOUNT_IMPERSONATION_URL,
        quota_project_id=None,
        used_scopes=SCOPES,
        scopes=SCOPES,
        default_scopes=["ignored"],
    )
609
def test_refresh_text_file_success_with_impersonation_use_default_scopes(self):
    """Text file source with impersonation; default scopes are used."""
    # Service account impersonation is enabled with default scopes only (no
    # user scopes); no client id/secret is set, so no basic auth header is
    # expected.
    credentials = self.make_credentials(
        # Text format credential source.
        credential_source=self.CREDENTIAL_SOURCE_TEXT,
        service_account_impersonation_url=SERVICE_ACCOUNT_IMPERSONATION_URL,
        # No user scopes, so the default scopes are expected to be requested.
        scopes=None,
        default_scopes=SCOPES,
    )

    self.assert_underlying_credentials_refresh(
        credentials=credentials,
        audience=AUDIENCE,
        token_url=TOKEN_URL,
        subject_token=TEXT_FILE_SUBJECT_TOKEN,
        subject_token_type=SUBJECT_TOKEN_TYPE,
        basic_auth_encoding=None,
        service_account_impersonation_url=SERVICE_ACCOUNT_IMPERSONATION_URL,
        quota_project_id=None,
        used_scopes=SCOPES,
        scopes=None,
        default_scopes=SCOPES,
    )
635
def test_refresh_json_file_success_without_impersonation(self):
    """JSON file source, client auth, no impersonation."""
    credentials = self.make_credentials(
        # JSON format credential source.
        credential_source=self.CREDENTIAL_SOURCE_JSON,
        client_id=CLIENT_ID,
        client_secret=CLIENT_SECRET,
        scopes=SCOPES,
    )

    self.assert_underlying_credentials_refresh(
        credentials=credentials,
        audience=AUDIENCE,
        token_url=TOKEN_URL,
        subject_token=JSON_FILE_SUBJECT_TOKEN,
        subject_token_type=SUBJECT_TOKEN_TYPE,
        basic_auth_encoding=BASIC_AUTH_ENCODING,
        service_account_impersonation_url=None,
        quota_project_id=None,
        used_scopes=SCOPES,
        scopes=SCOPES,
        default_scopes=None,
    )
658
def test_refresh_json_file_success_with_impersonation(self):
    """JSON file source with service account impersonation."""
    # No client id/secret is set, so no basic auth header is expected.
    credentials = self.make_credentials(
        # JSON format credential source.
        credential_source=self.CREDENTIAL_SOURCE_JSON,
        service_account_impersonation_url=SERVICE_ACCOUNT_IMPERSONATION_URL,
        scopes=SCOPES,
    )

    self.assert_underlying_credentials_refresh(
        credentials=credentials,
        audience=AUDIENCE,
        token_url=TOKEN_URL,
        subject_token=JSON_FILE_SUBJECT_TOKEN,
        subject_token_type=SUBJECT_TOKEN_TYPE,
        basic_auth_encoding=None,
        service_account_impersonation_url=SERVICE_ACCOUNT_IMPERSONATION_URL,
        quota_project_id=None,
        used_scopes=SCOPES,
        scopes=SCOPES,
        default_scopes=None,
    )
681
def test_refresh_with_retrieve_subject_token_error(self):
    """``refresh`` surfaces the RefreshError raised while reading the token."""
    # Imported locally: ``re`` is not among this module's visible imports.
    import re

    credential_source = {
        "file": SUBJECT_TOKEN_JSON_FILE,
        "format": {"type": "json", "subject_token_field_name": "not_found"},
    }
    credentials = self.make_credentials(credential_source=credential_source)

    with pytest.raises(exceptions.RefreshError) as excinfo:
        credentials.refresh(None)

    expected_message = (
        "Unable to parse subject_token from JSON file '{}' using key '{}'".format(
            SUBJECT_TOKEN_JSON_FILE, "not_found"
        )
    )
    # ``excinfo.match`` treats its argument as a regular expression. The file
    # path may contain metacharacters (e.g. backslashes on Windows), so the
    # expected message is escaped to be matched literally.
    assert excinfo.match(re.escape(expected_message))
697
def test_retrieve_subject_token_from_url(self):
    """A text subject token is fetched from the URL with no headers."""
    source = self.CREDENTIAL_SOURCE_TEXT_URL
    credentials = self.make_credentials(credential_source=source)
    request = self.make_mock_request(token_data=TEXT_FILE_SUBJECT_TOKEN)

    subject_token = credentials.retrieve_subject_token(request)

    assert subject_token == TEXT_FILE_SUBJECT_TOKEN
    first_call_kwargs = request.call_args_list[0][1]
    self.assert_credential_request_kwargs(first_call_kwargs, None)
bojeil-googled4d7f382021-02-16 12:33:20 -0800707
def test_retrieve_subject_token_from_url_with_headers(self):
    """Configured headers are sent when fetching a text token from the URL."""
    source = {"url": self.CREDENTIAL_URL, "headers": {"foo": "bar"}}
    credentials = self.make_credentials(credential_source=source)
    request = self.make_mock_request(token_data=TEXT_FILE_SUBJECT_TOKEN)

    subject_token = credentials.retrieve_subject_token(request)

    assert subject_token == TEXT_FILE_SUBJECT_TOKEN
    first_call_kwargs = request.call_args_list[0][1]
    self.assert_credential_request_kwargs(first_call_kwargs, {"foo": "bar"})
719
def test_retrieve_subject_token_from_url_json(self):
    """A JSON subject token is fetched from the URL with no headers."""
    source = self.CREDENTIAL_SOURCE_JSON_URL
    credentials = self.make_credentials(credential_source=source)
    request = self.make_mock_request(token_data=JSON_FILE_CONTENT)

    subject_token = credentials.retrieve_subject_token(request)

    assert subject_token == JSON_FILE_SUBJECT_TOKEN
    first_call_kwargs = request.call_args_list[0][1]
    self.assert_credential_request_kwargs(first_call_kwargs, None)
bojeil-googled4d7f382021-02-16 12:33:20 -0800729
def test_retrieve_subject_token_from_url_json_with_headers(self):
    """Configured headers are sent when fetching a JSON token from the URL."""
    source = {
        "url": self.CREDENTIAL_URL,
        "format": {"type": "json", "subject_token_field_name": "access_token"},
        "headers": {"foo": "bar"},
    }
    credentials = self.make_credentials(credential_source=source)
    request = self.make_mock_request(token_data=JSON_FILE_CONTENT)

    subject_token = credentials.retrieve_subject_token(request)

    assert subject_token == JSON_FILE_SUBJECT_TOKEN
    first_call_kwargs = request.call_args_list[0][1]
    self.assert_credential_request_kwargs(first_call_kwargs, {"foo": "bar"})
745
def test_retrieve_subject_token_from_url_not_found(self):
    """A 404 response from the credential URL yields a RefreshError."""
    source = self.CREDENTIAL_SOURCE_TEXT_URL
    credentials = self.make_credentials(credential_source=source)
    not_found_request = self.make_mock_request(
        token_status=404, token_data=JSON_FILE_CONTENT
    )

    with pytest.raises(exceptions.RefreshError) as excinfo:
        credentials.retrieve_subject_token(not_found_request)

    assert excinfo.match("Unable to retrieve Identity Pool subject token")
756
def test_retrieve_subject_token_from_url_json_invalid_field(self):
    """A field name absent from the URL's JSON response yields a RefreshError."""
    source = {
        "url": self.CREDENTIAL_URL,
        "format": {"type": "json", "subject_token_field_name": "not_found"},
    }
    credentials = self.make_credentials(credential_source=source)
    request = self.make_mock_request(token_data=JSON_FILE_CONTENT)

    with pytest.raises(exceptions.RefreshError) as excinfo:
        credentials.retrieve_subject_token(request)

    expected_pattern = (
        "Unable to parse subject_token from JSON file '{}' using key '{}'".format(
            self.CREDENTIAL_URL, "not_found"
        )
    )
    assert excinfo.match(expected_pattern)
774
def test_retrieve_subject_token_from_url_json_invalid_format(self):
    """A malformed JSON response from the URL yields a RefreshError."""
    source = self.CREDENTIAL_SOURCE_JSON_URL
    credentials = self.make_credentials(credential_source=source)
    malformed_request = self.make_mock_request(token_data="{")

    with pytest.raises(exceptions.RefreshError) as excinfo:
        credentials.retrieve_subject_token(malformed_request)

    expected_pattern = (
        "Unable to parse subject_token from JSON file '{}' using key '{}'".format(
            self.CREDENTIAL_URL, "access_token"
        )
    )
    assert excinfo.match(expected_pattern)
788
def test_refresh_text_file_success_without_impersonation_url(self):
    """Text URL source, client auth, no impersonation."""
    credentials = self.make_credentials(
        # Text format credential source served from a URL.
        credential_source=self.CREDENTIAL_SOURCE_TEXT_URL,
        client_id=CLIENT_ID,
        client_secret=CLIENT_SECRET,
        scopes=SCOPES,
    )

    self.assert_underlying_credentials_refresh(
        credentials=credentials,
        audience=AUDIENCE,
        token_url=TOKEN_URL,
        subject_token=TEXT_FILE_SUBJECT_TOKEN,
        subject_token_type=SUBJECT_TOKEN_TYPE,
        # Body the mocked credential URL returns.
        credential_data=TEXT_FILE_SUBJECT_TOKEN,
        basic_auth_encoding=BASIC_AUTH_ENCODING,
        service_account_impersonation_url=None,
        quota_project_id=None,
        used_scopes=SCOPES,
        scopes=SCOPES,
        default_scopes=None,
    )
812
def test_refresh_text_file_success_with_impersonation_url(self):
    """Text URL source with service account impersonation."""
    # No client id/secret is set, so no basic auth header is expected.
    credentials = self.make_credentials(
        # Text format credential source served from a URL.
        credential_source=self.CREDENTIAL_SOURCE_TEXT_URL,
        service_account_impersonation_url=SERVICE_ACCOUNT_IMPERSONATION_URL,
        scopes=SCOPES,
    )

    self.assert_underlying_credentials_refresh(
        credentials=credentials,
        audience=AUDIENCE,
        token_url=TOKEN_URL,
        subject_token=TEXT_FILE_SUBJECT_TOKEN,
        subject_token_type=SUBJECT_TOKEN_TYPE,
        # Body the mocked credential URL returns.
        credential_data=TEXT_FILE_SUBJECT_TOKEN,
        basic_auth_encoding=None,
        service_account_impersonation_url=SERVICE_ACCOUNT_IMPERSONATION_URL,
        quota_project_id=None,
        used_scopes=SCOPES,
        scopes=SCOPES,
        default_scopes=None,
    )
836
def test_refresh_json_file_success_without_impersonation_url(self):
    """JSON URL source, client auth, no impersonation."""
    credentials = self.make_credentials(
        # JSON format credential source served from a URL.
        credential_source=self.CREDENTIAL_SOURCE_JSON_URL,
        client_id=CLIENT_ID,
        client_secret=CLIENT_SECRET,
        scopes=SCOPES,
    )

    self.assert_underlying_credentials_refresh(
        credentials=credentials,
        audience=AUDIENCE,
        token_url=TOKEN_URL,
        subject_token=JSON_FILE_SUBJECT_TOKEN,
        subject_token_type=SUBJECT_TOKEN_TYPE,
        # Body the mocked credential URL returns.
        credential_data=JSON_FILE_CONTENT,
        basic_auth_encoding=BASIC_AUTH_ENCODING,
        service_account_impersonation_url=None,
        quota_project_id=None,
        used_scopes=SCOPES,
        scopes=SCOPES,
        default_scopes=None,
    )
860
def test_refresh_json_file_success_with_impersonation_url(self):
    """JSON URL source with service account impersonation."""
    # No client id/secret is set, so no basic auth header is expected.
    credentials = self.make_credentials(
        # JSON format credential source served from a URL.
        credential_source=self.CREDENTIAL_SOURCE_JSON_URL,
        service_account_impersonation_url=SERVICE_ACCOUNT_IMPERSONATION_URL,
        scopes=SCOPES,
    )

    self.assert_underlying_credentials_refresh(
        credentials=credentials,
        audience=AUDIENCE,
        token_url=TOKEN_URL,
        subject_token=JSON_FILE_SUBJECT_TOKEN,
        subject_token_type=SUBJECT_TOKEN_TYPE,
        # Body the mocked credential URL returns.
        credential_data=JSON_FILE_CONTENT,
        basic_auth_encoding=None,
        service_account_impersonation_url=SERVICE_ACCOUNT_IMPERSONATION_URL,
        quota_project_id=None,
        used_scopes=SCOPES,
        scopes=SCOPES,
        default_scopes=None,
    )
884
def test_refresh_with_retrieve_subject_token_error_url(self):
    """``refresh`` surfaces the RefreshError raised while parsing the URL token."""
    source = {
        "url": self.CREDENTIAL_URL,
        "format": {"type": "json", "subject_token_field_name": "not_found"},
    }
    credentials = self.make_credentials(credential_source=source)
    request = self.make_mock_request(token_data=JSON_FILE_CONTENT)

    with pytest.raises(exceptions.RefreshError) as excinfo:
        credentials.refresh(request)

    expected_pattern = (
        "Unable to parse subject_token from JSON file '{}' using key '{}'".format(
            self.CREDENTIAL_URL, "not_found"
        )
    )
    assert excinfo.match(expected_pattern)