blob: 87e343be49818341598ce46a428ff847edf56abe [file] [log] [blame]
bojeil-googled4d7f382021-02-16 12:33:20 -08001# Copyright 2020 Google LLC
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7# http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14
import datetime
import json
import os
import re

import mock
import pytest
from six.moves import http_client
from six.moves import urllib

from google.auth import _helpers
from google.auth import exceptions
from google.auth import identity_pool
from google.auth import transport
28
29
# Client credentials used when a test exercises basic client authentication.
CLIENT_ID = "username"
CLIENT_SECRET = "password"
# base64("username:password")
BASIC_AUTH_ENCODING = "dXNlcm5hbWU6cGFzc3dvcmQ="

# Service account impersonation fixtures.
SERVICE_ACCOUNT_EMAIL = "service-1234@service-name.iam.gserviceaccount.com"
SERVICE_ACCOUNT_IMPERSONATION_URL = (
    "https://us-east1-iamcredentials.googleapis.com/v1/projects/-"
    "/serviceAccounts/{}:generateAccessToken"
).format(SERVICE_ACCOUNT_EMAIL)

QUOTA_PROJECT_ID = "QUOTA_PROJECT_ID"
SCOPES = ["scope1", "scope2"]

# Subject token fixture files live in the "data" directory next to this module.
DATA_DIR = os.path.join(os.path.dirname(__file__), "data")
SUBJECT_TOKEN_TEXT_FILE = os.path.join(DATA_DIR, "external_subject_token.txt")
SUBJECT_TOKEN_JSON_FILE = os.path.join(DATA_DIR, "external_subject_token.json")
SUBJECT_TOKEN_FIELD_NAME = "access_token"

# Load the expected subject tokens once, at import time.
with open(SUBJECT_TOKEN_TEXT_FILE) as fh:
    TEXT_FILE_SUBJECT_TOKEN = fh.read()

with open(SUBJECT_TOKEN_JSON_FILE) as fh:
    JSON_FILE_CONTENT = json.loads(fh.read())
    JSON_FILE_SUBJECT_TOKEN = JSON_FILE_CONTENT.get(SUBJECT_TOKEN_FIELD_NAME)

# Workload identity pool fixtures.
TOKEN_URL = "https://sts.googleapis.com/v1/token"
SUBJECT_TOKEN_TYPE = "urn:ietf:params:oauth:token-type:jwt"
AUDIENCE = "//iam.googleapis.com/projects/123456/locations/global/workloadIdentityPools/POOL_ID/providers/PROVIDER_ID"

# Workforce pool fixtures.
WORKFORCE_AUDIENCE = (
    "//iam.googleapis.com/locations/global/workforcePools/POOL_ID/providers/PROVIDER_ID"
)
WORKFORCE_SUBJECT_TOKEN_TYPE = "urn:ietf:params:oauth:token-type:id_token"
WORKFORCE_POOL_USER_PROJECT = "WORKFORCE_POOL_USER_PROJECT_NUMBER"
bojeil-googled4d7f382021-02-16 12:33:20 -080061
62
class TestCredentials(object):
    """Tests for ``identity_pool.Credentials`` plus shared fixtures/helpers."""

    # File-sourced credential: the file content is the subject token itself.
    CREDENTIAL_SOURCE_TEXT = {"file": SUBJECT_TOKEN_TEXT_FILE}
    # File-sourced credential: the subject token is read from the
    # "access_token" field of a JSON file.
    CREDENTIAL_SOURCE_JSON = {
        "file": SUBJECT_TOKEN_JSON_FILE,
        "format": {"type": "json", "subject_token_field_name": "access_token"},
    }
    # URL used by the URL-sourced credential fixtures below.
    CREDENTIAL_URL = "http://fakeurl.com"
    # URL-sourced credential with no explicit format.
    CREDENTIAL_SOURCE_TEXT_URL = {"url": CREDENTIAL_URL}
    # URL-sourced credential: the subject token is read from the
    # "access_token" field of a JSON response.
    CREDENTIAL_SOURCE_JSON_URL = {
        "url": CREDENTIAL_URL,
        "format": {"type": "json", "subject_token_field_name": "access_token"},
    }
    # Canned successful token exchange response used by the refresh helpers.
    SUCCESS_RESPONSE = {
        "access_token": "ACCESS_TOKEN",
        "issued_token_type": "urn:ietf:params:oauth:token-type:access_token",
        "token_type": "Bearer",
        "expires_in": 3600,
        "scope": " ".join(SCOPES),
    }
82
@classmethod
def make_mock_response(cls, status, data):
    """Build an autospec'd ``transport.Response`` with the given status/body.

    Args:
        status: Value assigned to the mock's ``status`` attribute.
        data: Response body. A ``dict`` is serialized to UTF-8 encoded JSON;
            any other value is assigned to ``data`` unchanged.

    Returns:
        The configured mock response.
    """
    mocked = mock.create_autospec(transport.Response, instance=True)
    mocked.status = status
    mocked.data = (
        json.dumps(data).encode("utf-8") if isinstance(data, dict) else data
    )
    return mocked
92
@classmethod
def make_mock_request(
    cls, token_status=http_client.OK, token_data=None, *extra_requests
):
    """Build an autospec'd ``transport.Request`` that replays canned responses.

    Args:
        token_status: Status of the first mocked response.
        token_data: Body of the first mocked response.
        *extra_requests: Flat sequence of additional ``status, data`` pairs,
            one pair per subsequent mocked response (for example a service
            account impersonation response).

    Returns:
        A mock request whose ``side_effect`` yields the responses in order.
    """
    queued = [cls.make_mock_response(token_status, token_data)]

    # Extra responses arrive flattened, so consume them two items at a time.
    for offset in range(0, len(extra_requests), 2):
        queued.append(
            cls.make_mock_response(
                extra_requests[offset], extra_requests[offset + 1]
            )
        )

    mocked_request = mock.create_autospec(transport.Request)
    mocked_request.side_effect = queued
    return mocked_request
113
@classmethod
def assert_credential_request_kwargs(
    cls, request_kwargs, headers, url=CREDENTIAL_URL
):
    """Assert that ``request_kwargs`` describe a body-less GET to ``url``.

    Args:
        request_kwargs: Keyword arguments recorded for the mocked request.
        headers: Expected value of the ``headers`` keyword argument.
        url: Expected request URL.
    """
    expectations = (("url", url), ("method", "GET"), ("headers", headers))
    for field, wanted in expectations:
        assert request_kwargs[field] == wanted
    # The body may be absent or explicitly None.
    assert request_kwargs.get("body", None) is None
122
@classmethod
def assert_token_request_kwargs(
    cls, request_kwargs, headers, request_data, token_url=TOKEN_URL
):
    """Assert that ``request_kwargs`` describe the expected token POST.

    Args:
        request_kwargs: Keyword arguments recorded for the mocked request.
        headers: Expected value of the ``headers`` keyword argument.
        request_data: Mapping of expected form field names to values.
        token_url: Expected request URL.
    """
    expectations = (("url", token_url), ("method", "POST"), ("headers", headers))
    for field, wanted in expectations:
        assert request_kwargs[field] == wanted

    raw_body = request_kwargs["body"]
    assert raw_body is not None

    # The body is URL-encoded form data; parsed names/values are decoded as
    # UTF-8 before being compared against the expected mapping.
    parsed_pairs = urllib.parse.parse_qsl(raw_body)
    assert len(parsed_pairs) == len(request_data.keys())
    for raw_name, raw_value in parsed_pairs:
        assert raw_value.decode("utf-8") == request_data[raw_name.decode("utf-8")]
135
@classmethod
def assert_impersonation_request_kwargs(
    cls,
    request_kwargs,
    headers,
    request_data,
    service_account_impersonation_url=SERVICE_ACCOUNT_IMPERSONATION_URL,
):
    """Assert that ``request_kwargs`` describe the expected impersonation POST.

    Args:
        request_kwargs: Keyword arguments recorded for the mocked request.
        headers: Expected value of the ``headers`` keyword argument.
        request_data: Expected JSON payload, as a Python object.
        service_account_impersonation_url: Expected request URL.
    """
    expectations = (
        ("url", service_account_impersonation_url),
        ("method", "POST"),
        ("headers", headers),
    )
    for field, wanted in expectations:
        assert request_kwargs[field] == wanted

    raw_body = request_kwargs["body"]
    assert raw_body is not None
    # The body is UTF-8 encoded JSON.
    assert json.loads(raw_body.decode("utf-8")) == request_data
150
@classmethod
def assert_underlying_credentials_refresh(
    cls,
    credentials,
    audience,
    subject_token,
    subject_token_type,
    token_url,
    service_account_impersonation_url=None,
    basic_auth_encoding=None,
    quota_project_id=None,
    used_scopes=None,
    credential_data=None,
    scopes=None,
    default_scopes=None,
    workforce_pool_user_project=None,
):
    """Refresh ``credentials`` against mocked responses and verify the calls.

    The helper queues the mocked responses, calls ``credentials.refresh``,
    then checks every recorded request and the resulting credential
    attributes.

    Args:
        credentials: The credentials under test.
        audience: Expected ``audience`` field of the token request.
        subject_token: Expected ``subject_token`` field of the token request.
        subject_token_type: Expected ``subject_token_type`` field.
        token_url: Expected URL of the token request.
        service_account_impersonation_url: When truthy, an impersonation
            response is queued and the impersonation request is verified.
        basic_auth_encoding: When truthy, the token request is expected to
            carry ``Authorization: Basic <encoding>``.
        quota_project_id: Expected ``credentials.quota_project_id``.
        used_scopes: Scopes expected to be requested.
        credential_data: When truthy, queued as the first response and the
            first recorded request is verified as the credential request.
        scopes: Expected ``credentials.scopes``.
        default_scopes: Expected ``credentials.default_scopes``.
        workforce_pool_user_project: When truthy, the token request is
            expected to carry a URL-quoted JSON ``options`` field holding
            ``userProject``.
    """
    # Expected token exchange request and its mocked response.
    sts_response = cls.SUCCESS_RESPONSE.copy()

    sts_headers = {"Content-Type": "application/x-www-form-urlencoded"}
    if basic_auth_encoding:
        sts_headers["Authorization"] = "Basic " + basic_auth_encoding

    # With impersonation the exchanged token is requested with the IAM scope;
    # the used scopes are sent in the impersonation request instead.
    if service_account_impersonation_url:
        sts_scope = "https://www.googleapis.com/auth/iam"
    else:
        sts_scope = " ".join(used_scopes or [])

    sts_request_data = {
        "grant_type": "urn:ietf:params:oauth:grant-type:token-exchange",
        "audience": audience,
        "requested_token_type": "urn:ietf:params:oauth:token-type:access_token",
        "scope": sts_scope,
        "subject_token": subject_token,
        "subject_token_type": subject_token_type,
    }
    if workforce_pool_user_project:
        options_json = json.dumps({"userProject": workforce_pool_user_project})
        sts_request_data["options"] = urllib.parse.quote(options_json)

    # Queue the mocked responses in call order: optional credential
    # retrieval, token exchange, optional impersonation.
    expected_calls = []
    if credential_data:
        expected_calls.append((http_client.OK, credential_data))

    sts_call_index = len(expected_calls)
    expected_calls.append((http_client.OK, sts_response))

    if service_account_impersonation_url:
        expiry = _helpers.utcnow().replace(microsecond=0) + datetime.timedelta(
            seconds=3600
        )
        impersonation_response = {
            "accessToken": "SA_ACCESS_TOKEN",
            "expireTime": expiry.isoformat("T") + "Z",
        }
        impersonation_headers = {
            "Content-Type": "application/json",
            "authorization": "Bearer {}".format(sts_response["access_token"]),
        }
        impersonation_request_data = {
            "delegates": None,
            "scope": used_scopes,
            "lifetime": "3600s",
        }
        impersonation_call_index = len(expected_calls)
        expected_calls.append((http_client.OK, impersonation_response))

    # make_mock_request takes the (status, data) pairs flattened.
    flattened = []
    for status, payload in expected_calls:
        flattened.extend((status, payload))
    request = cls.make_mock_request(*flattened)

    credentials.refresh(request)

    recorded = request.call_args_list
    assert len(recorded) == len(expected_calls)

    if credential_data:
        cls.assert_credential_request_kwargs(recorded[0][1], None)

    cls.assert_token_request_kwargs(
        recorded[sts_call_index][1], sts_headers, sts_request_data, token_url
    )

    if service_account_impersonation_url:
        cls.assert_impersonation_request_kwargs(
            recorded[impersonation_call_index][1],
            impersonation_headers,
            impersonation_request_data,
            service_account_impersonation_url,
        )
        assert credentials.token == impersonation_response["accessToken"]
    else:
        assert credentials.token == sts_response["access_token"]

    assert credentials.quota_project_id == quota_project_id
    assert credentials.scopes == scopes
    assert credentials.default_scopes == default_scopes
259
@classmethod
def make_credentials(
    cls,
    audience=AUDIENCE,
    subject_token_type=SUBJECT_TOKEN_TYPE,
    client_id=None,
    client_secret=None,
    quota_project_id=None,
    scopes=None,
    default_scopes=None,
    service_account_impersonation_url=None,
    credential_source=None,
    workforce_pool_user_project=None,
):
    """Construct ``identity_pool.Credentials`` with ``TOKEN_URL`` as token URL.

    All arguments are forwarded unchanged, by keyword, to the constructor.

    Returns:
        The newly constructed ``identity_pool.Credentials``.
    """
    constructor_kwargs = {
        "audience": audience,
        "subject_token_type": subject_token_type,
        "token_url": TOKEN_URL,
        "service_account_impersonation_url": service_account_impersonation_url,
        "credential_source": credential_source,
        "client_id": client_id,
        "client_secret": client_secret,
        "quota_project_id": quota_project_id,
        "scopes": scopes,
        "default_scopes": default_scopes,
        "workforce_pool_user_project": workforce_pool_user_project,
    }
    return identity_pool.Credentials(**constructor_kwargs)
287
@mock.patch.object(identity_pool.Credentials, "__init__", return_value=None)
def test_from_info_full_options(self, mock_init):
    """from_info forwards every supplied option to the constructor."""
    info = {
        "credential_source": self.CREDENTIAL_SOURCE_TEXT,
        "quota_project_id": QUOTA_PROJECT_ID,
        "client_secret": CLIENT_SECRET,
        "client_id": CLIENT_ID,
        "service_account_impersonation_url": SERVICE_ACCOUNT_IMPERSONATION_URL,
        "token_url": TOKEN_URL,
        "subject_token_type": SUBJECT_TOKEN_TYPE,
        "audience": AUDIENCE,
    }

    created = identity_pool.Credentials.from_info(info)

    # The constructor is patched, so only the forwarded arguments are checked.
    assert isinstance(created, identity_pool.Credentials)
    mock_init.assert_called_once_with(
        workforce_pool_user_project=None,
        quota_project_id=QUOTA_PROJECT_ID,
        credential_source=self.CREDENTIAL_SOURCE_TEXT,
        client_secret=CLIENT_SECRET,
        client_id=CLIENT_ID,
        service_account_impersonation_url=SERVICE_ACCOUNT_IMPERSONATION_URL,
        token_url=TOKEN_URL,
        subject_token_type=SUBJECT_TOKEN_TYPE,
        audience=AUDIENCE,
    )
316
@mock.patch.object(identity_pool.Credentials, "__init__", return_value=None)
def test_from_info_required_options_only(self, mock_init):
    """from_info passes None for every option missing from the info dict."""
    info = {
        "credential_source": self.CREDENTIAL_SOURCE_TEXT,
        "token_url": TOKEN_URL,
        "subject_token_type": SUBJECT_TOKEN_TYPE,
        "audience": AUDIENCE,
    }

    created = identity_pool.Credentials.from_info(info)

    # The constructor is patched, so only the forwarded arguments are checked.
    assert isinstance(created, identity_pool.Credentials)
    mock_init.assert_called_once_with(
        workforce_pool_user_project=None,
        quota_project_id=None,
        credential_source=self.CREDENTIAL_SOURCE_TEXT,
        client_secret=None,
        client_id=None,
        service_account_impersonation_url=None,
        token_url=TOKEN_URL,
        subject_token_type=SUBJECT_TOKEN_TYPE,
        audience=AUDIENCE,
    )
341
@mock.patch.object(identity_pool.Credentials, "__init__", return_value=None)
def test_from_info_workforce_pool(self, mock_init):
    """from_info forwards workforce_pool_user_project to the constructor."""
    info = {
        "workforce_pool_user_project": WORKFORCE_POOL_USER_PROJECT,
        "credential_source": self.CREDENTIAL_SOURCE_TEXT,
        "token_url": TOKEN_URL,
        "subject_token_type": WORKFORCE_SUBJECT_TOKEN_TYPE,
        "audience": WORKFORCE_AUDIENCE,
    }

    created = identity_pool.Credentials.from_info(info)

    # The constructor is patched, so only the forwarded arguments are checked.
    assert isinstance(created, identity_pool.Credentials)
    mock_init.assert_called_once_with(
        workforce_pool_user_project=WORKFORCE_POOL_USER_PROJECT,
        quota_project_id=None,
        credential_source=self.CREDENTIAL_SOURCE_TEXT,
        client_secret=None,
        client_id=None,
        service_account_impersonation_url=None,
        token_url=TOKEN_URL,
        subject_token_type=WORKFORCE_SUBJECT_TOKEN_TYPE,
        audience=WORKFORCE_AUDIENCE,
    )
367
@mock.patch.object(identity_pool.Credentials, "__init__", return_value=None)
def test_from_file_full_options(self, mock_init, tmpdir):
    """from_file forwards every option found in the JSON config file."""
    config = {
        "credential_source": self.CREDENTIAL_SOURCE_TEXT,
        "quota_project_id": QUOTA_PROJECT_ID,
        "client_secret": CLIENT_SECRET,
        "client_id": CLIENT_ID,
        "service_account_impersonation_url": SERVICE_ACCOUNT_IMPERSONATION_URL,
        "token_url": TOKEN_URL,
        "subject_token_type": SUBJECT_TOKEN_TYPE,
        "audience": AUDIENCE,
    }
    config_path = tmpdir.join("config.json")
    config_path.write(json.dumps(config))

    created = identity_pool.Credentials.from_file(str(config_path))

    # The constructor is patched, so only the forwarded arguments are checked.
    assert isinstance(created, identity_pool.Credentials)
    mock_init.assert_called_once_with(
        workforce_pool_user_project=None,
        quota_project_id=QUOTA_PROJECT_ID,
        credential_source=self.CREDENTIAL_SOURCE_TEXT,
        client_secret=CLIENT_SECRET,
        client_id=CLIENT_ID,
        service_account_impersonation_url=SERVICE_ACCOUNT_IMPERSONATION_URL,
        token_url=TOKEN_URL,
        subject_token_type=SUBJECT_TOKEN_TYPE,
        audience=AUDIENCE,
    )
397
@mock.patch.object(identity_pool.Credentials, "__init__", return_value=None)
def test_from_file_required_options_only(self, mock_init, tmpdir):
    """from_file passes None for every option missing from the config file."""
    config = {
        "credential_source": self.CREDENTIAL_SOURCE_TEXT,
        "token_url": TOKEN_URL,
        "subject_token_type": SUBJECT_TOKEN_TYPE,
        "audience": AUDIENCE,
    }
    config_path = tmpdir.join("config.json")
    config_path.write(json.dumps(config))

    created = identity_pool.Credentials.from_file(str(config_path))

    # The constructor is patched, so only the forwarded arguments are checked.
    assert isinstance(created, identity_pool.Credentials)
    mock_init.assert_called_once_with(
        workforce_pool_user_project=None,
        quota_project_id=None,
        credential_source=self.CREDENTIAL_SOURCE_TEXT,
        client_secret=None,
        client_id=None,
        service_account_impersonation_url=None,
        token_url=TOKEN_URL,
        subject_token_type=SUBJECT_TOKEN_TYPE,
        audience=AUDIENCE,
    )
423
@mock.patch.object(identity_pool.Credentials, "__init__", return_value=None)
def test_from_file_workforce_pool(self, mock_init, tmpdir):
    """from_file forwards workforce_pool_user_project from the config file."""
    config = {
        "workforce_pool_user_project": WORKFORCE_POOL_USER_PROJECT,
        "credential_source": self.CREDENTIAL_SOURCE_TEXT,
        "token_url": TOKEN_URL,
        "subject_token_type": WORKFORCE_SUBJECT_TOKEN_TYPE,
        "audience": WORKFORCE_AUDIENCE,
    }
    config_path = tmpdir.join("config.json")
    config_path.write(json.dumps(config))

    created = identity_pool.Credentials.from_file(str(config_path))

    # The constructor is patched, so only the forwarded arguments are checked.
    assert isinstance(created, identity_pool.Credentials)
    mock_init.assert_called_once_with(
        workforce_pool_user_project=WORKFORCE_POOL_USER_PROJECT,
        quota_project_id=None,
        credential_source=self.CREDENTIAL_SOURCE_TEXT,
        client_secret=None,
        client_id=None,
        service_account_impersonation_url=None,
        token_url=TOKEN_URL,
        subject_token_type=WORKFORCE_SUBJECT_TOKEN_TYPE,
        audience=WORKFORCE_AUDIENCE,
    )
450
def test_constructor_nonworkforce_with_workforce_pool_user_project(self):
    """A user project combined with a non-workforce audience is rejected."""
    expected_message = (
        "workforce_pool_user_project should not be set for non-workforce "
        "pool credentials"
    )

    with pytest.raises(ValueError) as excinfo:
        self.make_credentials(
            workforce_pool_user_project=WORKFORCE_POOL_USER_PROJECT,
            audience=AUDIENCE,
        )

    assert excinfo.match(expected_message)
462
def test_constructor_invalid_options(self):
    """A credential_source with only an unsupported key raises ValueError."""
    unsupported_source = {"unsupported": "value"}

    with pytest.raises(ValueError) as raised:
        self.make_credentials(credential_source=unsupported_source)

    assert raised.match(r"Missing credential_source")
470
def test_constructor_invalid_options_url_and_file(self):
    """Supplying both "url" and "file" in credential_source raises ValueError."""
    ambiguous_source = {
        "file": SUBJECT_TOKEN_TEXT_FILE,
        "url": self.CREDENTIAL_URL,
    }

    with pytest.raises(ValueError) as raised:
        self.make_credentials(credential_source=ambiguous_source)

    assert raised.match(r"Ambiguous credential_source")
481
def test_constructor_invalid_options_environment_id(self):
    """An "environment_id" field in credential_source raises ValueError."""
    source_with_environment = {
        "environment_id": "aws1",
        "url": self.CREDENTIAL_URL,
    }

    with pytest.raises(ValueError) as raised:
        self.make_credentials(credential_source=source_with_environment)

    assert raised.match(
        r"Invalid Identity Pool credential_source field 'environment_id'"
    )
491
def test_constructor_invalid_credential_source(self):
    """A non-dict credential_source raises ValueError."""
    not_a_mapping = "non-dict"

    with pytest.raises(ValueError) as raised:
        self.make_credentials(credential_source=not_a_mapping)

    assert raised.match(r"Missing credential_source")
497
def test_constructor_invalid_credential_source_format_type(self):
    """A credential_source format type of "xml" raises ValueError."""
    xml_format_source = {"format": {"type": "xml"}}

    with pytest.raises(ValueError) as raised:
        self.make_credentials(credential_source=xml_format_source)

    assert raised.match(r"Invalid credential_source format 'xml'")
505
def test_constructor_missing_subject_token_field_name(self):
    """A JSON format without subject_token_field_name raises ValueError."""
    json_format_without_field = {"format": {"type": "json"}}

    with pytest.raises(ValueError) as raised:
        self.make_credentials(credential_source=json_format_without_field)

    assert raised.match(
        r"Missing subject_token_field_name for JSON credential_source format"
    )
515
def test_info_with_workforce_pool_user_project(self):
    """info includes workforce_pool_user_project for workforce credentials."""
    workforce_credentials = self.make_credentials(
        workforce_pool_user_project=WORKFORCE_POOL_USER_PROJECT,
        credential_source=self.CREDENTIAL_SOURCE_TEXT_URL.copy(),
        subject_token_type=WORKFORCE_SUBJECT_TOKEN_TYPE,
        audience=WORKFORCE_AUDIENCE,
    )

    expected_info = {
        "type": "external_account",
        "audience": WORKFORCE_AUDIENCE,
        "subject_token_type": WORKFORCE_SUBJECT_TOKEN_TYPE,
        "token_url": TOKEN_URL,
        "credential_source": self.CREDENTIAL_SOURCE_TEXT_URL,
        "workforce_pool_user_project": WORKFORCE_POOL_USER_PROJECT,
    }
    assert workforce_credentials.info == expected_info
532
def test_info_with_file_credential_source(self):
    """info echoes a file-based credential_source.

    The credentials are built from the file-based fixture so that this test
    covers the "file" variant, complementing the URL-based info tests.
    """
    credentials = self.make_credentials(
        # Pass a copy so the comparison below is against the pristine fixture.
        credential_source=self.CREDENTIAL_SOURCE_JSON.copy()
    )

    assert credentials.info == {
        "type": "external_account",
        "audience": AUDIENCE,
        "subject_token_type": SUBJECT_TOKEN_TYPE,
        "token_url": TOKEN_URL,
        "credential_source": self.CREDENTIAL_SOURCE_JSON,
    }
545
def test_info_with_url_credential_source(self):
    """info echoes a URL-based credential_source with a JSON format."""
    url_credentials = self.make_credentials(
        credential_source=self.CREDENTIAL_SOURCE_JSON_URL.copy()
    )

    expected_info = {
        "type": "external_account",
        "audience": AUDIENCE,
        "subject_token_type": SUBJECT_TOKEN_TYPE,
        "token_url": TOKEN_URL,
        "credential_source": self.CREDENTIAL_SOURCE_JSON_URL,
    }
    assert url_credentials.info == expected_info
558
def test_retrieve_subject_token_missing_subject_token(self, tmpdir):
    """An empty credential file makes retrieve_subject_token raise RefreshError."""
    # An empty text file contains no subject token.
    blank_token_file = tmpdir.join("empty.txt")
    blank_token_file.write("")
    credentials = self.make_credentials(
        credential_source={"file": str(blank_token_file)}
    )

    with pytest.raises(exceptions.RefreshError) as raised:
        credentials.retrieve_subject_token(None)

    assert raised.match(r"Missing subject_token in the credential_source file")
570
def test_retrieve_subject_token_text_file(self):
    """The subject token equals the content of the text fixture file."""
    text_credentials = self.make_credentials(
        credential_source=self.CREDENTIAL_SOURCE_TEXT
    )

    retrieved = text_credentials.retrieve_subject_token(None)

    assert retrieved == TEXT_FILE_SUBJECT_TOKEN
579
def test_retrieve_subject_token_json_file(self):
    """The subject token is read from the configured field of the JSON file."""
    json_credentials = self.make_credentials(
        credential_source=self.CREDENTIAL_SOURCE_JSON
    )

    retrieved = json_credentials.retrieve_subject_token(None)

    assert retrieved == JSON_FILE_SUBJECT_TOKEN
588
def test_retrieve_subject_token_json_file_invalid_field_name(self):
    """A field name missing from the JSON file raises RefreshError."""
    credential_source = {
        "file": SUBJECT_TOKEN_JSON_FILE,
        "format": {"type": "json", "subject_token_field_name": "not_found"},
    }
    credentials = self.make_credentials(credential_source=credential_source)

    with pytest.raises(exceptions.RefreshError) as excinfo:
        credentials.retrieve_subject_token(None)

    # excinfo.match() treats its argument as a regular expression, so the
    # literal message (which embeds a filesystem path) must be escaped.
    assert excinfo.match(
        re.escape(
            "Unable to parse subject_token from JSON file '{}' using key '{}'".format(
                SUBJECT_TOKEN_JSON_FILE, "not_found"
            )
        )
    )
604
def test_retrieve_subject_token_invalid_json(self, tmpdir):
    """A credential file holding malformed JSON raises RefreshError."""
    # Provide an invalid JSON file. This should result in a JSON parsing error.
    invalid_json_file = tmpdir.join("invalid.json")
    invalid_json_file.write("{")
    credential_source = {
        "file": str(invalid_json_file),
        "format": {"type": "json", "subject_token_field_name": "access_token"},
    }
    credentials = self.make_credentials(credential_source=credential_source)

    with pytest.raises(exceptions.RefreshError) as excinfo:
        credentials.retrieve_subject_token(None)

    # excinfo.match() treats its argument as a regular expression, so the
    # literal message (which embeds a temporary file path) must be escaped.
    assert excinfo.match(
        re.escape(
            "Unable to parse subject_token from JSON file '{}' using key '{}'".format(
                str(invalid_json_file), "access_token"
            )
        )
    )
623
def test_retrieve_subject_token_file_not_found(self):
    """A credential file path that does not exist raises RefreshError."""
    credentials = self.make_credentials(
        credential_source={"file": "./not_found.txt"}
    )

    with pytest.raises(exceptions.RefreshError) as raised:
        credentials.retrieve_subject_token(None)

    assert raised.match(r"File './not_found.txt' was not found")
632
def test_refresh_text_file_success_without_impersonation_ignore_default_scopes(
    self,
):
    """Text-file refresh with client auth: user scopes win over defaults."""
    text_credentials = self.make_credentials(
        # Text format credential source.
        credential_source=self.CREDENTIAL_SOURCE_TEXT,
        client_secret=CLIENT_SECRET,
        client_id=CLIENT_ID,
        # Explicit scopes are set, so the default scopes must not be used.
        default_scopes=["ignored"],
        scopes=SCOPES,
    )

    expectations = dict(
        audience=AUDIENCE,
        subject_token=TEXT_FILE_SUBJECT_TOKEN,
        subject_token_type=SUBJECT_TOKEN_TYPE,
        token_url=TOKEN_URL,
        service_account_impersonation_url=None,
        basic_auth_encoding=BASIC_AUTH_ENCODING,
        quota_project_id=None,
        used_scopes=SCOPES,
        scopes=SCOPES,
        default_scopes=["ignored"],
    )
    self.assert_underlying_credentials_refresh(
        credentials=text_credentials, **expectations
    )
659
def test_refresh_workforce_success_with_client_auth_without_impersonation(self):
    """Workforce refresh with client auth sends no user project option."""
    workforce_credentials = self.make_credentials(
        # Text format credential source.
        credential_source=self.CREDENTIAL_SOURCE_TEXT,
        subject_token_type=WORKFORCE_SUBJECT_TOKEN_TYPE,
        audience=WORKFORCE_AUDIENCE,
        client_secret=CLIENT_SECRET,
        client_id=CLIENT_ID,
        scopes=SCOPES,
        # Client auth takes precedence, so this value is expected to be ignored.
        workforce_pool_user_project=WORKFORCE_POOL_USER_PROJECT,
    )

    expectations = dict(
        audience=WORKFORCE_AUDIENCE,
        subject_token=TEXT_FILE_SUBJECT_TOKEN,
        subject_token_type=WORKFORCE_SUBJECT_TOKEN_TYPE,
        token_url=TOKEN_URL,
        service_account_impersonation_url=None,
        basic_auth_encoding=BASIC_AUTH_ENCODING,
        quota_project_id=None,
        used_scopes=SCOPES,
        scopes=SCOPES,
        workforce_pool_user_project=None,
    )
    self.assert_underlying_credentials_refresh(
        credentials=workforce_credentials, **expectations
    )
686
def test_refresh_workforce_success_with_client_auth_and_no_workforce_project(self):
    """Workforce refresh with client auth works without a user project."""
    workforce_credentials = self.make_credentials(
        # Text format credential source.
        credential_source=self.CREDENTIAL_SOURCE_TEXT,
        subject_token_type=WORKFORCE_SUBJECT_TOKEN_TYPE,
        audience=WORKFORCE_AUDIENCE,
        client_secret=CLIENT_SECRET,
        client_id=CLIENT_ID,
        scopes=SCOPES,
        # Not needed when client auth is used.
        workforce_pool_user_project=None,
    )

    expectations = dict(
        audience=WORKFORCE_AUDIENCE,
        subject_token=TEXT_FILE_SUBJECT_TOKEN,
        subject_token_type=WORKFORCE_SUBJECT_TOKEN_TYPE,
        token_url=TOKEN_URL,
        service_account_impersonation_url=None,
        basic_auth_encoding=BASIC_AUTH_ENCODING,
        quota_project_id=None,
        used_scopes=SCOPES,
        scopes=SCOPES,
        workforce_pool_user_project=None,
    )
    self.assert_underlying_credentials_refresh(
        credentials=workforce_credentials, **expectations
    )
713
def test_refresh_workforce_success_without_client_auth_without_impersonation(self):
    """Workforce refresh without client auth sends the user project option."""
    workforce_credentials = self.make_credentials(
        # Text format credential source.
        credential_source=self.CREDENTIAL_SOURCE_TEXT,
        subject_token_type=WORKFORCE_SUBJECT_TOKEN_TYPE,
        audience=WORKFORCE_AUDIENCE,
        client_secret=None,
        client_id=None,
        scopes=SCOPES,
        # No client auth, so the user project is expected in the token request.
        workforce_pool_user_project=WORKFORCE_POOL_USER_PROJECT,
    )

    expectations = dict(
        audience=WORKFORCE_AUDIENCE,
        subject_token=TEXT_FILE_SUBJECT_TOKEN,
        subject_token_type=WORKFORCE_SUBJECT_TOKEN_TYPE,
        token_url=TOKEN_URL,
        service_account_impersonation_url=None,
        basic_auth_encoding=None,
        quota_project_id=None,
        used_scopes=SCOPES,
        scopes=SCOPES,
        workforce_pool_user_project=WORKFORCE_POOL_USER_PROJECT,
    )
    self.assert_underlying_credentials_refresh(
        credentials=workforce_credentials, **expectations
    )
740
def test_refresh_workforce_success_without_client_auth_with_impersonation(self):
    """Workforce refresh with impersonation and no client auth."""
    workforce_credentials = self.make_credentials(
        # Text format credential source.
        credential_source=self.CREDENTIAL_SOURCE_TEXT,
        service_account_impersonation_url=SERVICE_ACCOUNT_IMPERSONATION_URL,
        subject_token_type=WORKFORCE_SUBJECT_TOKEN_TYPE,
        audience=WORKFORCE_AUDIENCE,
        client_secret=None,
        client_id=None,
        scopes=SCOPES,
        # No client auth, so the user project is expected in the token request.
        workforce_pool_user_project=WORKFORCE_POOL_USER_PROJECT,
    )

    expectations = dict(
        audience=WORKFORCE_AUDIENCE,
        subject_token=TEXT_FILE_SUBJECT_TOKEN,
        subject_token_type=WORKFORCE_SUBJECT_TOKEN_TYPE,
        token_url=TOKEN_URL,
        service_account_impersonation_url=SERVICE_ACCOUNT_IMPERSONATION_URL,
        basic_auth_encoding=None,
        quota_project_id=None,
        used_scopes=SCOPES,
        scopes=SCOPES,
        workforce_pool_user_project=WORKFORCE_POOL_USER_PROJECT,
    )
    self.assert_underlying_credentials_refresh(
        credentials=workforce_credentials, **expectations
    )
768
def test_refresh_text_file_success_without_impersonation_use_default_scopes(self):
    """Text-file refresh with client auth falls back to the default scopes."""
    text_credentials = self.make_credentials(
        # Text format credential source.
        credential_source=self.CREDENTIAL_SOURCE_TEXT,
        client_secret=CLIENT_SECRET,
        client_id=CLIENT_ID,
        # No user scopes, so the default scopes are expected to be used.
        default_scopes=SCOPES,
        scopes=None,
    )

    expectations = dict(
        audience=AUDIENCE,
        subject_token=TEXT_FILE_SUBJECT_TOKEN,
        subject_token_type=SUBJECT_TOKEN_TYPE,
        token_url=TOKEN_URL,
        service_account_impersonation_url=None,
        basic_auth_encoding=BASIC_AUTH_ENCODING,
        quota_project_id=None,
        used_scopes=SCOPES,
        scopes=None,
        default_scopes=SCOPES,
    )
    self.assert_underlying_credentials_refresh(
        credentials=text_credentials, **expectations
    )
793
def test_refresh_text_file_success_with_impersonation_ignore_default_scopes(self):
    """Text-file refresh with impersonation: user scopes win over defaults."""
    # Impersonation is configured; no client ID/secret is supplied.
    text_credentials = self.make_credentials(
        service_account_impersonation_url=SERVICE_ACCOUNT_IMPERSONATION_URL,
        # Text format credential source.
        credential_source=self.CREDENTIAL_SOURCE_TEXT,
        # Explicit scopes are set, so the default scopes must not be used.
        default_scopes=["ignored"],
        scopes=SCOPES,
    )

    expectations = dict(
        audience=AUDIENCE,
        subject_token=TEXT_FILE_SUBJECT_TOKEN,
        subject_token_type=SUBJECT_TOKEN_TYPE,
        token_url=TOKEN_URL,
        service_account_impersonation_url=SERVICE_ACCOUNT_IMPERSONATION_URL,
        basic_auth_encoding=None,
        quota_project_id=None,
        used_scopes=SCOPES,
        scopes=SCOPES,
        default_scopes=["ignored"],
    )
    self.assert_underlying_credentials_refresh(
        credentials=text_credentials, **expectations
    )
818
def test_refresh_text_file_success_with_impersonation_use_default_scopes(self):
    """Text-file refresh with impersonation falls back to the default scopes."""
    # Impersonation is configured with default scopes only; no client
    # ID/secret and no user scopes are supplied.
    text_credentials = self.make_credentials(
        service_account_impersonation_url=SERVICE_ACCOUNT_IMPERSONATION_URL,
        # Text format credential source.
        credential_source=self.CREDENTIAL_SOURCE_TEXT,
        # No user scopes, so the default scopes are expected to be used.
        default_scopes=SCOPES,
        scopes=None,
    )

    expectations = dict(
        audience=AUDIENCE,
        subject_token=TEXT_FILE_SUBJECT_TOKEN,
        subject_token_type=SUBJECT_TOKEN_TYPE,
        token_url=TOKEN_URL,
        service_account_impersonation_url=SERVICE_ACCOUNT_IMPERSONATION_URL,
        basic_auth_encoding=None,
        quota_project_id=None,
        used_scopes=SCOPES,
        scopes=None,
        default_scopes=SCOPES,
    )
    self.assert_underlying_credentials_refresh(
        credentials=text_credentials, **expectations
    )
844
def test_refresh_json_file_success_without_impersonation(self):
    """JSON-file refresh with client auth and no impersonation."""
    json_credentials = self.make_credentials(
        # JSON format credential source.
        credential_source=self.CREDENTIAL_SOURCE_JSON,
        client_secret=CLIENT_SECRET,
        client_id=CLIENT_ID,
        scopes=SCOPES,
    )

    expectations = dict(
        audience=AUDIENCE,
        subject_token=JSON_FILE_SUBJECT_TOKEN,
        subject_token_type=SUBJECT_TOKEN_TYPE,
        token_url=TOKEN_URL,
        service_account_impersonation_url=None,
        basic_auth_encoding=BASIC_AUTH_ENCODING,
        quota_project_id=None,
        used_scopes=SCOPES,
        scopes=SCOPES,
        default_scopes=None,
    )
    self.assert_underlying_credentials_refresh(
        credentials=json_credentials, **expectations
    )
867
def test_refresh_json_file_success_with_impersonation(self):
    """JSON-file source with service account impersonation."""
    # No client ID/secret is passed; basic_auth_encoding is None below.
    creds = self.make_credentials(
        credential_source=self.CREDENTIAL_SOURCE_JSON,
        service_account_impersonation_url=SERVICE_ACCOUNT_IMPERSONATION_URL,
        scopes=SCOPES,
    )

    expected = dict(
        audience=AUDIENCE,
        subject_token=JSON_FILE_SUBJECT_TOKEN,
        subject_token_type=SUBJECT_TOKEN_TYPE,
        token_url=TOKEN_URL,
        service_account_impersonation_url=SERVICE_ACCOUNT_IMPERSONATION_URL,
        basic_auth_encoding=None,
        quota_project_id=None,
        used_scopes=SCOPES,
        scopes=SCOPES,
        default_scopes=None,
    )
    self.assert_underlying_credentials_refresh(credentials=creds, **expected)
890
def test_refresh_with_retrieve_subject_token_error(self):
    """refresh() raises RefreshError when the JSON file lacks the token field.

    The credential source points at the JSON subject-token file but names a
    field ("not_found") that the test expects to be missing, so refresh is
    expected to fail with a message naming both the file and the key.
    """
    # Local import keeps this test self-contained; `re` is standard library.
    import re

    missing_field = "not_found"
    credential_source = {
        "file": SUBJECT_TOKEN_JSON_FILE,
        "format": {"type": "json", "subject_token_field_name": missing_field},
    }
    credentials = self.make_credentials(credential_source=credential_source)

    with pytest.raises(exceptions.RefreshError) as excinfo:
        credentials.refresh(None)

    expected_message = (
        "Unable to parse subject_token from JSON file '{}' using key '{}'".format(
            SUBJECT_TOKEN_JSON_FILE, missing_field
        )
    )
    # excinfo.match() interprets its argument as a regular expression. The
    # file path may contain regex metacharacters (".", "+", or backslashes on
    # Windows), so escape it to compare the message literally.
    assert excinfo.match(re.escape(expected_message))
906
def test_retrieve_subject_token_from_url(self):
    """Subject token is read from a text-format URL credential source."""
    creds = self.make_credentials(credential_source=self.CREDENTIAL_SOURCE_TEXT_URL)
    fake_request = self.make_mock_request(token_data=TEXT_FILE_SUBJECT_TOKEN)

    token = creds.retrieve_subject_token(fake_request)

    assert token == TEXT_FILE_SUBJECT_TOKEN
    # [0][1]: second element of the first recorded call (presumably its
    # kwargs, assuming make_mock_request returns a mock object).
    first_call_kwargs = fake_request.call_args_list[0][1]
    self.assert_credential_request_kwargs(first_call_kwargs, None)
bojeil-googled4d7f382021-02-16 12:33:20 -0800916
def test_retrieve_subject_token_from_url_with_headers(self):
    """Subject token is read from a URL source that specifies headers."""
    source_with_headers = {"url": self.CREDENTIAL_URL, "headers": {"foo": "bar"}}
    creds = self.make_credentials(credential_source=source_with_headers)
    fake_request = self.make_mock_request(token_data=TEXT_FILE_SUBJECT_TOKEN)

    token = creds.retrieve_subject_token(fake_request)

    assert token == TEXT_FILE_SUBJECT_TOKEN
    # Check the first recorded request call against the configured headers.
    first_call_kwargs = fake_request.call_args_list[0][1]
    self.assert_credential_request_kwargs(first_call_kwargs, {"foo": "bar"})
928
def test_retrieve_subject_token_from_url_json(self):
    """Subject token is extracted from a JSON-format URL credential source."""
    creds = self.make_credentials(credential_source=self.CREDENTIAL_SOURCE_JSON_URL)
    fake_request = self.make_mock_request(token_data=JSON_FILE_CONTENT)

    token = creds.retrieve_subject_token(fake_request)

    assert token == JSON_FILE_SUBJECT_TOKEN
    # No custom headers are configured, so None is the expected headers value.
    first_call_kwargs = fake_request.call_args_list[0][1]
    self.assert_credential_request_kwargs(first_call_kwargs, None)
bojeil-googled4d7f382021-02-16 12:33:20 -0800938
def test_retrieve_subject_token_from_url_json_with_headers(self):
    """Subject token is extracted from a JSON URL source that sets headers."""
    json_format = {"type": "json", "subject_token_field_name": "access_token"}
    source_with_headers = {
        "url": self.CREDENTIAL_URL,
        "format": json_format,
        "headers": {"foo": "bar"},
    }
    creds = self.make_credentials(credential_source=source_with_headers)
    fake_request = self.make_mock_request(token_data=JSON_FILE_CONTENT)

    token = creds.retrieve_subject_token(fake_request)

    assert token == JSON_FILE_SUBJECT_TOKEN
    # Check the first recorded request call against the configured headers.
    first_call_kwargs = fake_request.call_args_list[0][1]
    self.assert_credential_request_kwargs(first_call_kwargs, {"foo": "bar"})
954
def test_retrieve_subject_token_from_url_not_found(self):
    """A 404 from the credential URL surfaces as a RefreshError."""
    creds = self.make_credentials(credential_source=self.CREDENTIAL_SOURCE_TEXT_URL)

    with pytest.raises(exceptions.RefreshError) as excinfo:
        # The mock request is configured to answer with HTTP status 404.
        failing_request = self.make_mock_request(
            token_status=404, token_data=JSON_FILE_CONTENT
        )
        creds.retrieve_subject_token(failing_request)

    assert excinfo.match("Unable to retrieve Identity Pool subject token")
965
def test_retrieve_subject_token_from_url_json_invalid_field(self):
    """RefreshError is raised when the URL's JSON lacks the configured field.

    The credential source names the field "not_found", which the test expects
    to be absent from the mocked JSON response; the error message is expected
    to name both the URL and the key.
    """
    # Local import keeps this test self-contained; `re` is standard library.
    import re

    missing_field = "not_found"
    credential_source = {
        "url": self.CREDENTIAL_URL,
        "format": {"type": "json", "subject_token_field_name": missing_field},
    }
    credentials = self.make_credentials(credential_source=credential_source)
    # Build the mock outside the raises-block so only the call under test
    # can satisfy the expected exception.
    request = self.make_mock_request(token_data=JSON_FILE_CONTENT)

    with pytest.raises(exceptions.RefreshError) as excinfo:
        credentials.retrieve_subject_token(request)

    expected_message = (
        "Unable to parse subject_token from JSON file '{}' using key '{}'".format(
            self.CREDENTIAL_URL, missing_field
        )
    )
    # excinfo.match() treats its argument as a regex; escape the message so
    # URL characters such as "." or "?" are compared literally.
    assert excinfo.match(re.escape(expected_message))
983
def test_retrieve_subject_token_from_url_json_invalid_format(self):
    """RefreshError is raised when the URL returns malformed JSON.

    The mocked response body is the truncated document "{", and the test
    expects the parse-failure message naming the URL and the "access_token"
    key.
    """
    # Local import keeps this test self-contained; `re` is standard library.
    import re

    credentials = self.make_credentials(
        credential_source=self.CREDENTIAL_SOURCE_JSON_URL
    )
    # Build the mock outside the raises-block so only the call under test
    # can satisfy the expected exception.
    request = self.make_mock_request(token_data="{")

    with pytest.raises(exceptions.RefreshError) as excinfo:
        credentials.retrieve_subject_token(request)

    expected_message = (
        "Unable to parse subject_token from JSON file '{}' using key '{}'".format(
            self.CREDENTIAL_URL, "access_token"
        )
    )
    # excinfo.match() treats its argument as a regex; escape the message so
    # URL characters such as "." or "?" are compared literally.
    assert excinfo.match(re.escape(expected_message))
997
def test_refresh_text_file_success_without_impersonation_url(self):
    """Text-format URL source with client credentials, no impersonation."""
    # Client ID/secret are supplied here; the refresh check is given the
    # matching BASIC_AUTH_ENCODING value.
    creds = self.make_credentials(
        client_id=CLIENT_ID,
        client_secret=CLIENT_SECRET,
        credential_source=self.CREDENTIAL_SOURCE_TEXT_URL,
        scopes=SCOPES,
    )

    expected = {
        "audience": AUDIENCE,
        "subject_token": TEXT_FILE_SUBJECT_TOKEN,
        "subject_token_type": SUBJECT_TOKEN_TYPE,
        "token_url": TOKEN_URL,
        "service_account_impersonation_url": None,
        "basic_auth_encoding": BASIC_AUTH_ENCODING,
        "quota_project_id": None,
        "used_scopes": SCOPES,
        "scopes": SCOPES,
        "default_scopes": None,
        # Passed as the credential data for the URL-sourced credential.
        "credential_data": TEXT_FILE_SUBJECT_TOKEN,
    }
    self.assert_underlying_credentials_refresh(credentials=creds, **expected)
1021
def test_refresh_text_file_success_with_impersonation_url(self):
    """Text-format URL source with service account impersonation."""
    # No client ID/secret is passed; basic_auth_encoding is None below.
    creds = self.make_credentials(
        credential_source=self.CREDENTIAL_SOURCE_TEXT_URL,
        service_account_impersonation_url=SERVICE_ACCOUNT_IMPERSONATION_URL,
        scopes=SCOPES,
    )

    expected = {
        "audience": AUDIENCE,
        "subject_token": TEXT_FILE_SUBJECT_TOKEN,
        "subject_token_type": SUBJECT_TOKEN_TYPE,
        "token_url": TOKEN_URL,
        "service_account_impersonation_url": SERVICE_ACCOUNT_IMPERSONATION_URL,
        "basic_auth_encoding": None,
        "quota_project_id": None,
        "used_scopes": SCOPES,
        "scopes": SCOPES,
        "default_scopes": None,
        # Passed as the credential data for the URL-sourced credential.
        "credential_data": TEXT_FILE_SUBJECT_TOKEN,
    }
    self.assert_underlying_credentials_refresh(credentials=creds, **expected)
1045
def test_refresh_json_file_success_without_impersonation_url(self):
    """JSON-format URL source with client credentials, no impersonation."""
    # Client ID/secret are supplied here; the refresh check is given the
    # matching BASIC_AUTH_ENCODING value.
    creds = self.make_credentials(
        client_id=CLIENT_ID,
        client_secret=CLIENT_SECRET,
        credential_source=self.CREDENTIAL_SOURCE_JSON_URL,
        scopes=SCOPES,
    )

    expected = dict(
        audience=AUDIENCE,
        subject_token=JSON_FILE_SUBJECT_TOKEN,
        subject_token_type=SUBJECT_TOKEN_TYPE,
        token_url=TOKEN_URL,
        service_account_impersonation_url=None,
        basic_auth_encoding=BASIC_AUTH_ENCODING,
        quota_project_id=None,
        used_scopes=SCOPES,
        scopes=SCOPES,
        default_scopes=None,
        # Passed as the credential data for the URL-sourced credential.
        credential_data=JSON_FILE_CONTENT,
    )
    self.assert_underlying_credentials_refresh(credentials=creds, **expected)
1069
def test_refresh_json_file_success_with_impersonation_url(self):
    """JSON-format URL source with service account impersonation."""
    # No client ID/secret is passed; basic_auth_encoding is None below.
    creds = self.make_credentials(
        credential_source=self.CREDENTIAL_SOURCE_JSON_URL,
        service_account_impersonation_url=SERVICE_ACCOUNT_IMPERSONATION_URL,
        scopes=SCOPES,
    )

    expected = dict(
        audience=AUDIENCE,
        subject_token=JSON_FILE_SUBJECT_TOKEN,
        subject_token_type=SUBJECT_TOKEN_TYPE,
        token_url=TOKEN_URL,
        service_account_impersonation_url=SERVICE_ACCOUNT_IMPERSONATION_URL,
        basic_auth_encoding=None,
        quota_project_id=None,
        used_scopes=SCOPES,
        scopes=SCOPES,
        default_scopes=None,
        # Passed as the credential data for the URL-sourced credential.
        credential_data=JSON_FILE_CONTENT,
    )
    self.assert_underlying_credentials_refresh(credentials=creds, **expected)
1093
def test_refresh_with_retrieve_subject_token_error_url(self):
    """refresh() raises RefreshError when the URL's JSON lacks the token field.

    The credential source names the field "not_found", which the test expects
    to be absent from the mocked JSON response; the error message is expected
    to name both the URL and the key.
    """
    # Local import keeps this test self-contained; `re` is standard library.
    import re

    missing_field = "not_found"
    credential_source = {
        "url": self.CREDENTIAL_URL,
        "format": {"type": "json", "subject_token_field_name": missing_field},
    }
    credentials = self.make_credentials(credential_source=credential_source)
    # Build the mock outside the raises-block so only the call under test
    # can satisfy the expected exception.
    request = self.make_mock_request(token_data=JSON_FILE_CONTENT)

    with pytest.raises(exceptions.RefreshError) as excinfo:
        credentials.refresh(request)

    expected_message = (
        "Unable to parse subject_token from JSON file '{}' using key '{}'".format(
            self.CREDENTIAL_URL, missing_field
        )
    )
    # excinfo.match() treats its argument as a regex; escape the message so
    # URL characters such as "." or "?" are compared literally.
    assert excinfo.match(re.escape(expected_message))