blob: e8297dab62af42cced01f3986c14c7d0c653726e [file] [log] [blame]
bojeil-googled4d7f382021-02-16 12:33:20 -08001# Copyright 2020 Google LLC
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7# http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14
15import datetime
Tres Seaver560cf1e2021-08-03 16:35:54 -040016import http.client
bojeil-googled4d7f382021-02-16 12:33:20 -080017import json
Tres Seaver560cf1e2021-08-03 16:35:54 -040018import urllib
bojeil-googled4d7f382021-02-16 12:33:20 -080019
20import mock
21import pytest
bojeil-googled4d7f382021-02-16 12:33:20 -080022
23from google.auth import _helpers
24from google.auth import exceptions
25from google.auth import external_account
26from google.auth import transport
27
28
29CLIENT_ID = "username"
30CLIENT_SECRET = "password"
31# Base64 encoding of "username:password"
32BASIC_AUTH_ENCODING = "dXNlcm5hbWU6cGFzc3dvcmQ="
33SERVICE_ACCOUNT_EMAIL = "service-1234@service-name.iam.gserviceaccount.com"
bojeil-googlef97499c2021-06-09 07:58:25 -070034# List of valid workforce pool audiences.
35TEST_USER_AUDIENCES = [
36 "//iam.googleapis.com/locations/global/workforcePools/pool-id/providers/provider-id",
37 "//iam.googleapis.com/locations/eu/workforcePools/pool-id/providers/provider-id",
38 "//iam.googleapis.com/locations/eu/workforcePools/workloadIdentityPools/providers/provider-id",
39]
bojeil-googled4d7f382021-02-16 12:33:20 -080040
41
42class CredentialsImpl(external_account.Credentials):
43 def __init__(
44 self,
45 audience,
46 subject_token_type,
47 token_url,
48 credential_source,
49 service_account_impersonation_url=None,
50 client_id=None,
51 client_secret=None,
52 quota_project_id=None,
53 scopes=None,
54 default_scopes=None,
55 ):
56 super(CredentialsImpl, self).__init__(
57 audience=audience,
58 subject_token_type=subject_token_type,
59 token_url=token_url,
60 credential_source=credential_source,
61 service_account_impersonation_url=service_account_impersonation_url,
62 client_id=client_id,
63 client_secret=client_secret,
64 quota_project_id=quota_project_id,
65 scopes=scopes,
66 default_scopes=default_scopes,
67 )
68 self._counter = 0
69
70 def retrieve_subject_token(self, request):
71 counter = self._counter
72 self._counter += 1
73 return "subject_token_{}".format(counter)
74
75
76class TestCredentials(object):
77 TOKEN_URL = "https://sts.googleapis.com/v1/token"
78 PROJECT_NUMBER = "123456"
79 POOL_ID = "POOL_ID"
80 PROVIDER_ID = "PROVIDER_ID"
81 AUDIENCE = (
82 "//iam.googleapis.com/projects/{}"
83 "/locations/global/workloadIdentityPools/{}"
84 "/providers/{}"
85 ).format(PROJECT_NUMBER, POOL_ID, PROVIDER_ID)
86 SUBJECT_TOKEN_TYPE = "urn:ietf:params:oauth:token-type:jwt"
87 CREDENTIAL_SOURCE = {"file": "/var/run/secrets/goog.id/token"}
88 SUCCESS_RESPONSE = {
89 "access_token": "ACCESS_TOKEN",
90 "issued_token_type": "urn:ietf:params:oauth:token-type:access_token",
91 "token_type": "Bearer",
92 "expires_in": 3600,
93 "scope": "scope1 scope2",
94 }
95 ERROR_RESPONSE = {
96 "error": "invalid_request",
97 "error_description": "Invalid subject token",
98 "error_uri": "https://tools.ietf.org/html/rfc6749",
99 }
100 QUOTA_PROJECT_ID = "QUOTA_PROJECT_ID"
101 SERVICE_ACCOUNT_IMPERSONATION_URL = (
102 "https://us-east1-iamcredentials.googleapis.com/v1/projects/-"
103 + "/serviceAccounts/{}:generateAccessToken".format(SERVICE_ACCOUNT_EMAIL)
104 )
105 SCOPES = ["scope1", "scope2"]
106 IMPERSONATION_ERROR_RESPONSE = {
107 "error": {
108 "code": 400,
109 "message": "Request contains an invalid argument",
110 "status": "INVALID_ARGUMENT",
111 }
112 }
113 PROJECT_ID = "my-proj-id"
114 CLOUD_RESOURCE_MANAGER_URL = (
115 "https://cloudresourcemanager.googleapis.com/v1/projects/"
116 )
117 CLOUD_RESOURCE_MANAGER_SUCCESS_RESPONSE = {
118 "projectNumber": PROJECT_NUMBER,
119 "projectId": PROJECT_ID,
120 "lifecycleState": "ACTIVE",
121 "name": "project-name",
122 "createTime": "2018-11-06T04:42:54.109Z",
123 "parent": {"type": "folder", "id": "12345678901"},
124 }
125
126 @classmethod
127 def make_credentials(
128 cls,
129 client_id=None,
130 client_secret=None,
131 quota_project_id=None,
132 scopes=None,
133 default_scopes=None,
134 service_account_impersonation_url=None,
135 ):
136 return CredentialsImpl(
137 audience=cls.AUDIENCE,
138 subject_token_type=cls.SUBJECT_TOKEN_TYPE,
139 token_url=cls.TOKEN_URL,
140 service_account_impersonation_url=service_account_impersonation_url,
141 credential_source=cls.CREDENTIAL_SOURCE,
142 client_id=client_id,
143 client_secret=client_secret,
144 quota_project_id=quota_project_id,
145 scopes=scopes,
146 default_scopes=default_scopes,
147 )
148
149 @classmethod
150 def make_mock_request(
151 cls,
Tres Seaver560cf1e2021-08-03 16:35:54 -0400152 status=http.client.OK,
bojeil-googled4d7f382021-02-16 12:33:20 -0800153 data=None,
154 impersonation_status=None,
155 impersonation_data=None,
156 cloud_resource_manager_status=None,
157 cloud_resource_manager_data=None,
158 ):
159 # STS token exchange request.
160 token_response = mock.create_autospec(transport.Response, instance=True)
161 token_response.status = status
162 token_response.data = json.dumps(data).encode("utf-8")
163 responses = [token_response]
164
165 # If service account impersonation is requested, mock the expected response.
166 if impersonation_status:
167 impersonation_response = mock.create_autospec(
168 transport.Response, instance=True
169 )
170 impersonation_response.status = impersonation_status
171 impersonation_response.data = json.dumps(impersonation_data).encode("utf-8")
172 responses.append(impersonation_response)
173
174 # If cloud resource manager is requested, mock the expected response.
175 if cloud_resource_manager_status:
176 cloud_resource_manager_response = mock.create_autospec(
177 transport.Response, instance=True
178 )
179 cloud_resource_manager_response.status = cloud_resource_manager_status
180 cloud_resource_manager_response.data = json.dumps(
181 cloud_resource_manager_data
182 ).encode("utf-8")
183 responses.append(cloud_resource_manager_response)
184
185 request = mock.create_autospec(transport.Request)
186 request.side_effect = responses
187
188 return request
189
190 @classmethod
191 def assert_token_request_kwargs(cls, request_kwargs, headers, request_data):
192 assert request_kwargs["url"] == cls.TOKEN_URL
193 assert request_kwargs["method"] == "POST"
194 assert request_kwargs["headers"] == headers
195 assert request_kwargs["body"] is not None
196 body_tuples = urllib.parse.parse_qsl(request_kwargs["body"])
197 for (k, v) in body_tuples:
198 assert v.decode("utf-8") == request_data[k.decode("utf-8")]
199 assert len(body_tuples) == len(request_data.keys())
200
201 @classmethod
202 def assert_impersonation_request_kwargs(cls, request_kwargs, headers, request_data):
203 assert request_kwargs["url"] == cls.SERVICE_ACCOUNT_IMPERSONATION_URL
204 assert request_kwargs["method"] == "POST"
205 assert request_kwargs["headers"] == headers
206 assert request_kwargs["body"] is not None
207 body_json = json.loads(request_kwargs["body"].decode("utf-8"))
208 assert body_json == request_data
209
210 @classmethod
211 def assert_resource_manager_request_kwargs(
212 cls, request_kwargs, project_number, headers
213 ):
214 assert request_kwargs["url"] == cls.CLOUD_RESOURCE_MANAGER_URL + project_number
215 assert request_kwargs["method"] == "GET"
216 assert request_kwargs["headers"] == headers
217 assert "body" not in request_kwargs
218
219 def test_default_state(self):
220 credentials = self.make_credentials()
221
222 # Not token acquired yet
223 assert not credentials.token
224 assert not credentials.valid
225 # Expiration hasn't been set yet
226 assert not credentials.expiry
227 assert not credentials.expired
228 # Scopes are required
229 assert not credentials.scopes
230 assert credentials.requires_scopes
231 assert not credentials.quota_project_id
232
233 def test_with_scopes(self):
234 credentials = self.make_credentials()
235
236 assert not credentials.scopes
237 assert credentials.requires_scopes
238
239 scoped_credentials = credentials.with_scopes(["email"])
240
241 assert scoped_credentials.has_scopes(["email"])
242 assert not scoped_credentials.requires_scopes
243
244 def test_with_scopes_using_user_and_default_scopes(self):
245 credentials = self.make_credentials()
246
247 assert not credentials.scopes
248 assert credentials.requires_scopes
249
250 scoped_credentials = credentials.with_scopes(
251 ["email"], default_scopes=["profile"]
252 )
253
254 assert scoped_credentials.has_scopes(["email"])
255 assert not scoped_credentials.has_scopes(["profile"])
256 assert not scoped_credentials.requires_scopes
257 assert scoped_credentials.scopes == ["email"]
258 assert scoped_credentials.default_scopes == ["profile"]
259
260 def test_with_scopes_using_default_scopes_only(self):
261 credentials = self.make_credentials()
262
263 assert not credentials.scopes
264 assert credentials.requires_scopes
265
266 scoped_credentials = credentials.with_scopes(None, default_scopes=["profile"])
267
268 assert scoped_credentials.has_scopes(["profile"])
269 assert not scoped_credentials.requires_scopes
270
271 def test_with_scopes_full_options_propagated(self):
272 credentials = self.make_credentials(
273 client_id=CLIENT_ID,
274 client_secret=CLIENT_SECRET,
275 quota_project_id=self.QUOTA_PROJECT_ID,
276 scopes=self.SCOPES,
277 default_scopes=["default1"],
278 service_account_impersonation_url=self.SERVICE_ACCOUNT_IMPERSONATION_URL,
279 )
280
281 with mock.patch.object(
282 external_account.Credentials, "__init__", return_value=None
283 ) as mock_init:
284 credentials.with_scopes(["email"], ["default2"])
285
286 # Confirm with_scopes initialized the credential with the expected
287 # parameters and scopes.
288 mock_init.assert_called_once_with(
289 audience=self.AUDIENCE,
290 subject_token_type=self.SUBJECT_TOKEN_TYPE,
291 token_url=self.TOKEN_URL,
292 credential_source=self.CREDENTIAL_SOURCE,
293 service_account_impersonation_url=self.SERVICE_ACCOUNT_IMPERSONATION_URL,
294 client_id=CLIENT_ID,
295 client_secret=CLIENT_SECRET,
296 quota_project_id=self.QUOTA_PROJECT_ID,
297 scopes=["email"],
298 default_scopes=["default2"],
299 )
300
301 def test_with_quota_project(self):
302 credentials = self.make_credentials()
303
304 assert not credentials.scopes
305 assert not credentials.quota_project_id
306
307 quota_project_creds = credentials.with_quota_project("project-foo")
308
309 assert quota_project_creds.quota_project_id == "project-foo"
310
311 def test_with_quota_project_full_options_propagated(self):
312 credentials = self.make_credentials(
313 client_id=CLIENT_ID,
314 client_secret=CLIENT_SECRET,
315 quota_project_id=self.QUOTA_PROJECT_ID,
316 scopes=self.SCOPES,
317 default_scopes=["default1"],
318 service_account_impersonation_url=self.SERVICE_ACCOUNT_IMPERSONATION_URL,
319 )
320
321 with mock.patch.object(
322 external_account.Credentials, "__init__", return_value=None
323 ) as mock_init:
324 credentials.with_quota_project("project-foo")
325
326 # Confirm with_quota_project initialized the credential with the
327 # expected parameters and quota project ID.
328 mock_init.assert_called_once_with(
329 audience=self.AUDIENCE,
330 subject_token_type=self.SUBJECT_TOKEN_TYPE,
331 token_url=self.TOKEN_URL,
332 credential_source=self.CREDENTIAL_SOURCE,
333 service_account_impersonation_url=self.SERVICE_ACCOUNT_IMPERSONATION_URL,
334 client_id=CLIENT_ID,
335 client_secret=CLIENT_SECRET,
336 quota_project_id="project-foo",
337 scopes=self.SCOPES,
338 default_scopes=["default1"],
339 )
340
341 def test_with_invalid_impersonation_target_principal(self):
342 invalid_url = "https://iamcredentials.googleapis.com/v1/invalid"
343
344 with pytest.raises(exceptions.RefreshError) as excinfo:
345 self.make_credentials(service_account_impersonation_url=invalid_url)
346
347 assert excinfo.match(
348 r"Unable to determine target principal from service account impersonation URL."
349 )
350
bojeil-googlef97499c2021-06-09 07:58:25 -0700351 def test_info(self):
352 credentials = self.make_credentials()
353
354 assert credentials.info == {
355 "type": "external_account",
356 "audience": self.AUDIENCE,
357 "subject_token_type": self.SUBJECT_TOKEN_TYPE,
358 "token_url": self.TOKEN_URL,
359 "credential_source": self.CREDENTIAL_SOURCE.copy(),
360 }
361
362 def test_info_with_full_options(self):
363 credentials = self.make_credentials(
364 client_id=CLIENT_ID,
365 client_secret=CLIENT_SECRET,
366 quota_project_id=self.QUOTA_PROJECT_ID,
367 service_account_impersonation_url=self.SERVICE_ACCOUNT_IMPERSONATION_URL,
368 )
369
370 assert credentials.info == {
371 "type": "external_account",
372 "audience": self.AUDIENCE,
373 "subject_token_type": self.SUBJECT_TOKEN_TYPE,
374 "token_url": self.TOKEN_URL,
375 "service_account_impersonation_url": self.SERVICE_ACCOUNT_IMPERSONATION_URL,
376 "credential_source": self.CREDENTIAL_SOURCE.copy(),
377 "quota_project_id": self.QUOTA_PROJECT_ID,
378 "client_id": CLIENT_ID,
379 "client_secret": CLIENT_SECRET,
380 }
381
382 def test_service_account_email_without_impersonation(self):
383 credentials = self.make_credentials()
384
385 assert credentials.service_account_email is None
386
387 def test_service_account_email_with_impersonation(self):
388 credentials = self.make_credentials(
389 service_account_impersonation_url=self.SERVICE_ACCOUNT_IMPERSONATION_URL
390 )
391
392 assert credentials.service_account_email == SERVICE_ACCOUNT_EMAIL
393
394 @pytest.mark.parametrize(
395 "audience",
396 # Workload identity pool audiences or invalid workforce pool audiences.
397 [
398 # Legacy K8s audience format.
399 "identitynamespace:1f12345:my_provider",
400 (
401 "//iam.googleapis.com/projects/123456/locations/"
402 "global/workloadIdentityPools/pool-id/providers/"
403 "provider-id"
404 ),
405 (
406 "//iam.googleapis.com/projects/123456/locations/"
407 "eu/workloadIdentityPools/pool-id/providers/"
408 "provider-id"
409 ),
410 # Pool ID with workforcePools string.
411 (
412 "//iam.googleapis.com/projects/123456/locations/"
413 "global/workloadIdentityPools/workforcePools/providers/"
414 "provider-id"
415 ),
416 # Unrealistic / incorrect workforce pool audiences.
417 "//iamgoogleapis.com/locations/eu/workforcePools/pool-id/providers/provider-id",
418 "//iam.googleapiscom/locations/eu/workforcePools/pool-id/providers/provider-id",
419 "//iam.googleapis.com/locations/workforcePools/pool-id/providers/provider-id",
420 "//iam.googleapis.com/locations/eu/workforcePool/pool-id/providers/provider-id",
421 "//iam.googleapis.com/locations//workforcePool/pool-id/providers/provider-id",
422 ],
423 )
424 def test_is_user_with_non_users(self, audience):
425 credentials = CredentialsImpl(
426 audience=audience,
427 subject_token_type=self.SUBJECT_TOKEN_TYPE,
428 token_url=self.TOKEN_URL,
429 credential_source=self.CREDENTIAL_SOURCE,
430 )
431
432 assert credentials.is_user is False
433
434 @pytest.mark.parametrize("audience", TEST_USER_AUDIENCES)
435 def test_is_user_with_users(self, audience):
436 credentials = CredentialsImpl(
437 audience=audience,
438 subject_token_type=self.SUBJECT_TOKEN_TYPE,
439 token_url=self.TOKEN_URL,
440 credential_source=self.CREDENTIAL_SOURCE,
441 )
442
443 assert credentials.is_user is True
444
445 @pytest.mark.parametrize("audience", TEST_USER_AUDIENCES)
446 def test_is_user_with_users_and_impersonation(self, audience):
447 # Initialize the credentials with service account impersonation.
448 credentials = CredentialsImpl(
449 audience=audience,
450 subject_token_type=self.SUBJECT_TOKEN_TYPE,
451 token_url=self.TOKEN_URL,
452 credential_source=self.CREDENTIAL_SOURCE,
453 service_account_impersonation_url=self.SERVICE_ACCOUNT_IMPERSONATION_URL,
454 )
455
456 # Even though the audience is for a workforce pool, since service account
457 # impersonation is used, the credentials will represent a service account and
458 # not a user.
459 assert credentials.is_user is False
460
bojeil-googled4d7f382021-02-16 12:33:20 -0800461 @mock.patch("google.auth._helpers.utcnow", return_value=datetime.datetime.min)
462 def test_refresh_without_client_auth_success(self, unused_utcnow):
463 response = self.SUCCESS_RESPONSE.copy()
464 # Test custom expiration to confirm expiry is set correctly.
465 response["expires_in"] = 2800
466 expected_expiry = datetime.datetime.min + datetime.timedelta(
467 seconds=response["expires_in"]
468 )
469 headers = {"Content-Type": "application/x-www-form-urlencoded"}
470 request_data = {
471 "grant_type": "urn:ietf:params:oauth:grant-type:token-exchange",
472 "audience": self.AUDIENCE,
473 "requested_token_type": "urn:ietf:params:oauth:token-type:access_token",
474 "subject_token": "subject_token_0",
475 "subject_token_type": self.SUBJECT_TOKEN_TYPE,
476 }
Tres Seaver560cf1e2021-08-03 16:35:54 -0400477 request = self.make_mock_request(status=http.client.OK, data=response)
bojeil-googled4d7f382021-02-16 12:33:20 -0800478 credentials = self.make_credentials()
479
480 credentials.refresh(request)
481
arithmetic1728d80c85f2021-03-08 13:35:44 -0800482 self.assert_token_request_kwargs(request.call_args[1], headers, request_data)
bojeil-googled4d7f382021-02-16 12:33:20 -0800483 assert credentials.valid
484 assert credentials.expiry == expected_expiry
485 assert not credentials.expired
486 assert credentials.token == response["access_token"]
487
488 def test_refresh_impersonation_without_client_auth_success(self):
489 # Simulate service account access token expires in 2800 seconds.
490 expire_time = (
491 _helpers.utcnow().replace(microsecond=0) + datetime.timedelta(seconds=2800)
492 ).isoformat("T") + "Z"
493 expected_expiry = datetime.datetime.strptime(expire_time, "%Y-%m-%dT%H:%M:%SZ")
494 # STS token exchange request/response.
495 token_response = self.SUCCESS_RESPONSE.copy()
496 token_headers = {"Content-Type": "application/x-www-form-urlencoded"}
497 token_request_data = {
498 "grant_type": "urn:ietf:params:oauth:grant-type:token-exchange",
499 "audience": self.AUDIENCE,
500 "requested_token_type": "urn:ietf:params:oauth:token-type:access_token",
501 "subject_token": "subject_token_0",
502 "subject_token_type": self.SUBJECT_TOKEN_TYPE,
503 "scope": "https://www.googleapis.com/auth/iam",
504 }
505 # Service account impersonation request/response.
506 impersonation_response = {
507 "accessToken": "SA_ACCESS_TOKEN",
508 "expireTime": expire_time,
509 }
510 impersonation_headers = {
511 "Content-Type": "application/json",
512 "authorization": "Bearer {}".format(token_response["access_token"]),
513 }
514 impersonation_request_data = {
515 "delegates": None,
516 "scope": self.SCOPES,
517 "lifetime": "3600s",
518 }
519 # Initialize mock request to handle token exchange and service account
520 # impersonation request.
521 request = self.make_mock_request(
Tres Seaver560cf1e2021-08-03 16:35:54 -0400522 status=http.client.OK,
bojeil-googled4d7f382021-02-16 12:33:20 -0800523 data=token_response,
Tres Seaver560cf1e2021-08-03 16:35:54 -0400524 impersonation_status=http.client.OK,
bojeil-googled4d7f382021-02-16 12:33:20 -0800525 impersonation_data=impersonation_response,
526 )
527 # Initialize credentials with service account impersonation.
528 credentials = self.make_credentials(
529 service_account_impersonation_url=self.SERVICE_ACCOUNT_IMPERSONATION_URL,
530 scopes=self.SCOPES,
531 )
532
533 credentials.refresh(request)
534
535 # Only 2 requests should be processed.
536 assert len(request.call_args_list) == 2
537 # Verify token exchange request parameters.
538 self.assert_token_request_kwargs(
arithmetic1728d80c85f2021-03-08 13:35:44 -0800539 request.call_args_list[0][1], token_headers, token_request_data
bojeil-googled4d7f382021-02-16 12:33:20 -0800540 )
541 # Verify service account impersonation request parameters.
542 self.assert_impersonation_request_kwargs(
arithmetic1728d80c85f2021-03-08 13:35:44 -0800543 request.call_args_list[1][1],
bojeil-googled4d7f382021-02-16 12:33:20 -0800544 impersonation_headers,
545 impersonation_request_data,
546 )
547 assert credentials.valid
548 assert credentials.expiry == expected_expiry
549 assert not credentials.expired
550 assert credentials.token == impersonation_response["accessToken"]
551
552 def test_refresh_without_client_auth_success_explicit_user_scopes_ignore_default_scopes(
arithmetic1728d80c85f2021-03-08 13:35:44 -0800553 self,
bojeil-googled4d7f382021-02-16 12:33:20 -0800554 ):
555 headers = {"Content-Type": "application/x-www-form-urlencoded"}
556 request_data = {
557 "grant_type": "urn:ietf:params:oauth:grant-type:token-exchange",
558 "audience": self.AUDIENCE,
559 "requested_token_type": "urn:ietf:params:oauth:token-type:access_token",
560 "scope": "scope1 scope2",
561 "subject_token": "subject_token_0",
562 "subject_token_type": self.SUBJECT_TOKEN_TYPE,
563 }
564 request = self.make_mock_request(
Tres Seaver560cf1e2021-08-03 16:35:54 -0400565 status=http.client.OK, data=self.SUCCESS_RESPONSE
bojeil-googled4d7f382021-02-16 12:33:20 -0800566 )
567 credentials = self.make_credentials(
568 scopes=["scope1", "scope2"],
569 # Default scopes will be ignored in favor of user scopes.
570 default_scopes=["ignored"],
571 )
572
573 credentials.refresh(request)
574
arithmetic1728d80c85f2021-03-08 13:35:44 -0800575 self.assert_token_request_kwargs(request.call_args[1], headers, request_data)
bojeil-googled4d7f382021-02-16 12:33:20 -0800576 assert credentials.valid
577 assert not credentials.expired
578 assert credentials.token == self.SUCCESS_RESPONSE["access_token"]
579 assert credentials.has_scopes(["scope1", "scope2"])
580 assert not credentials.has_scopes(["ignored"])
581
582 def test_refresh_without_client_auth_success_explicit_default_scopes_only(self):
583 headers = {"Content-Type": "application/x-www-form-urlencoded"}
584 request_data = {
585 "grant_type": "urn:ietf:params:oauth:grant-type:token-exchange",
586 "audience": self.AUDIENCE,
587 "requested_token_type": "urn:ietf:params:oauth:token-type:access_token",
588 "scope": "scope1 scope2",
589 "subject_token": "subject_token_0",
590 "subject_token_type": self.SUBJECT_TOKEN_TYPE,
591 }
592 request = self.make_mock_request(
Tres Seaver560cf1e2021-08-03 16:35:54 -0400593 status=http.client.OK, data=self.SUCCESS_RESPONSE
bojeil-googled4d7f382021-02-16 12:33:20 -0800594 )
595 credentials = self.make_credentials(
596 scopes=None,
597 # Default scopes will be used since user scopes are none.
598 default_scopes=["scope1", "scope2"],
599 )
600
601 credentials.refresh(request)
602
arithmetic1728d80c85f2021-03-08 13:35:44 -0800603 self.assert_token_request_kwargs(request.call_args[1], headers, request_data)
bojeil-googled4d7f382021-02-16 12:33:20 -0800604 assert credentials.valid
605 assert not credentials.expired
606 assert credentials.token == self.SUCCESS_RESPONSE["access_token"]
607 assert credentials.has_scopes(["scope1", "scope2"])
608
609 def test_refresh_without_client_auth_error(self):
610 request = self.make_mock_request(
Tres Seaver560cf1e2021-08-03 16:35:54 -0400611 status=http.client.BAD_REQUEST, data=self.ERROR_RESPONSE
bojeil-googled4d7f382021-02-16 12:33:20 -0800612 )
613 credentials = self.make_credentials()
614
615 with pytest.raises(exceptions.OAuthError) as excinfo:
616 credentials.refresh(request)
617
618 assert excinfo.match(
619 r"Error code invalid_request: Invalid subject token - https://tools.ietf.org/html/rfc6749"
620 )
621 assert not credentials.expired
622 assert credentials.token is None
623
624 def test_refresh_impersonation_without_client_auth_error(self):
625 request = self.make_mock_request(
Tres Seaver560cf1e2021-08-03 16:35:54 -0400626 status=http.client.OK,
bojeil-googled4d7f382021-02-16 12:33:20 -0800627 data=self.SUCCESS_RESPONSE,
Tres Seaver560cf1e2021-08-03 16:35:54 -0400628 impersonation_status=http.client.BAD_REQUEST,
bojeil-googled4d7f382021-02-16 12:33:20 -0800629 impersonation_data=self.IMPERSONATION_ERROR_RESPONSE,
630 )
631 credentials = self.make_credentials(
632 service_account_impersonation_url=self.SERVICE_ACCOUNT_IMPERSONATION_URL,
633 scopes=self.SCOPES,
634 )
635
636 with pytest.raises(exceptions.RefreshError) as excinfo:
637 credentials.refresh(request)
638
639 assert excinfo.match(r"Unable to acquire impersonated credentials")
640 assert not credentials.expired
641 assert credentials.token is None
642
643 def test_refresh_with_client_auth_success(self):
644 headers = {
645 "Content-Type": "application/x-www-form-urlencoded",
646 "Authorization": "Basic {}".format(BASIC_AUTH_ENCODING),
647 }
648 request_data = {
649 "grant_type": "urn:ietf:params:oauth:grant-type:token-exchange",
650 "audience": self.AUDIENCE,
651 "requested_token_type": "urn:ietf:params:oauth:token-type:access_token",
652 "subject_token": "subject_token_0",
653 "subject_token_type": self.SUBJECT_TOKEN_TYPE,
654 }
655 request = self.make_mock_request(
Tres Seaver560cf1e2021-08-03 16:35:54 -0400656 status=http.client.OK, data=self.SUCCESS_RESPONSE
bojeil-googled4d7f382021-02-16 12:33:20 -0800657 )
658 credentials = self.make_credentials(
659 client_id=CLIENT_ID, client_secret=CLIENT_SECRET
660 )
661
662 credentials.refresh(request)
663
arithmetic1728d80c85f2021-03-08 13:35:44 -0800664 self.assert_token_request_kwargs(request.call_args[1], headers, request_data)
bojeil-googled4d7f382021-02-16 12:33:20 -0800665 assert credentials.valid
666 assert not credentials.expired
667 assert credentials.token == self.SUCCESS_RESPONSE["access_token"]
668
669 def test_refresh_impersonation_with_client_auth_success_ignore_default_scopes(self):
670 # Simulate service account access token expires in 2800 seconds.
671 expire_time = (
672 _helpers.utcnow().replace(microsecond=0) + datetime.timedelta(seconds=2800)
673 ).isoformat("T") + "Z"
674 expected_expiry = datetime.datetime.strptime(expire_time, "%Y-%m-%dT%H:%M:%SZ")
675 # STS token exchange request/response.
676 token_response = self.SUCCESS_RESPONSE.copy()
677 token_headers = {
678 "Content-Type": "application/x-www-form-urlencoded",
679 "Authorization": "Basic {}".format(BASIC_AUTH_ENCODING),
680 }
681 token_request_data = {
682 "grant_type": "urn:ietf:params:oauth:grant-type:token-exchange",
683 "audience": self.AUDIENCE,
684 "requested_token_type": "urn:ietf:params:oauth:token-type:access_token",
685 "subject_token": "subject_token_0",
686 "subject_token_type": self.SUBJECT_TOKEN_TYPE,
687 "scope": "https://www.googleapis.com/auth/iam",
688 }
689 # Service account impersonation request/response.
690 impersonation_response = {
691 "accessToken": "SA_ACCESS_TOKEN",
692 "expireTime": expire_time,
693 }
694 impersonation_headers = {
695 "Content-Type": "application/json",
696 "authorization": "Bearer {}".format(token_response["access_token"]),
697 }
698 impersonation_request_data = {
699 "delegates": None,
700 "scope": self.SCOPES,
701 "lifetime": "3600s",
702 }
703 # Initialize mock request to handle token exchange and service account
704 # impersonation request.
705 request = self.make_mock_request(
Tres Seaver560cf1e2021-08-03 16:35:54 -0400706 status=http.client.OK,
bojeil-googled4d7f382021-02-16 12:33:20 -0800707 data=token_response,
Tres Seaver560cf1e2021-08-03 16:35:54 -0400708 impersonation_status=http.client.OK,
bojeil-googled4d7f382021-02-16 12:33:20 -0800709 impersonation_data=impersonation_response,
710 )
711 # Initialize credentials with service account impersonation and basic auth.
712 credentials = self.make_credentials(
713 client_id=CLIENT_ID,
714 client_secret=CLIENT_SECRET,
715 service_account_impersonation_url=self.SERVICE_ACCOUNT_IMPERSONATION_URL,
716 scopes=self.SCOPES,
717 # Default scopes will be ignored since user scopes are specified.
718 default_scopes=["ignored"],
719 )
720
721 credentials.refresh(request)
722
723 # Only 2 requests should be processed.
724 assert len(request.call_args_list) == 2
725 # Verify token exchange request parameters.
726 self.assert_token_request_kwargs(
arithmetic1728d80c85f2021-03-08 13:35:44 -0800727 request.call_args_list[0][1], token_headers, token_request_data
bojeil-googled4d7f382021-02-16 12:33:20 -0800728 )
729 # Verify service account impersonation request parameters.
730 self.assert_impersonation_request_kwargs(
arithmetic1728d80c85f2021-03-08 13:35:44 -0800731 request.call_args_list[1][1],
bojeil-googled4d7f382021-02-16 12:33:20 -0800732 impersonation_headers,
733 impersonation_request_data,
734 )
735 assert credentials.valid
736 assert credentials.expiry == expected_expiry
737 assert not credentials.expired
738 assert credentials.token == impersonation_response["accessToken"]
739
740 def test_refresh_impersonation_with_client_auth_success_use_default_scopes(self):
741 # Simulate service account access token expires in 2800 seconds.
742 expire_time = (
743 _helpers.utcnow().replace(microsecond=0) + datetime.timedelta(seconds=2800)
744 ).isoformat("T") + "Z"
745 expected_expiry = datetime.datetime.strptime(expire_time, "%Y-%m-%dT%H:%M:%SZ")
746 # STS token exchange request/response.
747 token_response = self.SUCCESS_RESPONSE.copy()
748 token_headers = {
749 "Content-Type": "application/x-www-form-urlencoded",
750 "Authorization": "Basic {}".format(BASIC_AUTH_ENCODING),
751 }
752 token_request_data = {
753 "grant_type": "urn:ietf:params:oauth:grant-type:token-exchange",
754 "audience": self.AUDIENCE,
755 "requested_token_type": "urn:ietf:params:oauth:token-type:access_token",
756 "subject_token": "subject_token_0",
757 "subject_token_type": self.SUBJECT_TOKEN_TYPE,
758 "scope": "https://www.googleapis.com/auth/iam",
759 }
760 # Service account impersonation request/response.
761 impersonation_response = {
762 "accessToken": "SA_ACCESS_TOKEN",
763 "expireTime": expire_time,
764 }
765 impersonation_headers = {
766 "Content-Type": "application/json",
767 "authorization": "Bearer {}".format(token_response["access_token"]),
768 }
769 impersonation_request_data = {
770 "delegates": None,
771 "scope": self.SCOPES,
772 "lifetime": "3600s",
773 }
774 # Initialize mock request to handle token exchange and service account
775 # impersonation request.
776 request = self.make_mock_request(
Tres Seaver560cf1e2021-08-03 16:35:54 -0400777 status=http.client.OK,
bojeil-googled4d7f382021-02-16 12:33:20 -0800778 data=token_response,
Tres Seaver560cf1e2021-08-03 16:35:54 -0400779 impersonation_status=http.client.OK,
bojeil-googled4d7f382021-02-16 12:33:20 -0800780 impersonation_data=impersonation_response,
781 )
782 # Initialize credentials with service account impersonation and basic auth.
783 credentials = self.make_credentials(
784 client_id=CLIENT_ID,
785 client_secret=CLIENT_SECRET,
786 service_account_impersonation_url=self.SERVICE_ACCOUNT_IMPERSONATION_URL,
787 scopes=None,
788 # Default scopes will be used since user specified scopes are none.
789 default_scopes=self.SCOPES,
790 )
791
792 credentials.refresh(request)
793
794 # Only 2 requests should be processed.
795 assert len(request.call_args_list) == 2
796 # Verify token exchange request parameters.
797 self.assert_token_request_kwargs(
arithmetic1728d80c85f2021-03-08 13:35:44 -0800798 request.call_args_list[0][1], token_headers, token_request_data
bojeil-googled4d7f382021-02-16 12:33:20 -0800799 )
800 # Verify service account impersonation request parameters.
801 self.assert_impersonation_request_kwargs(
arithmetic1728d80c85f2021-03-08 13:35:44 -0800802 request.call_args_list[1][1],
bojeil-googled4d7f382021-02-16 12:33:20 -0800803 impersonation_headers,
804 impersonation_request_data,
805 )
806 assert credentials.valid
807 assert credentials.expiry == expected_expiry
808 assert not credentials.expired
809 assert credentials.token == impersonation_response["accessToken"]
810
811 def test_apply_without_quota_project_id(self):
812 headers = {}
813 request = self.make_mock_request(
Tres Seaver560cf1e2021-08-03 16:35:54 -0400814 status=http.client.OK, data=self.SUCCESS_RESPONSE
bojeil-googled4d7f382021-02-16 12:33:20 -0800815 )
816 credentials = self.make_credentials()
817
818 credentials.refresh(request)
819 credentials.apply(headers)
820
821 assert headers == {
822 "authorization": "Bearer {}".format(self.SUCCESS_RESPONSE["access_token"])
823 }
824
825 def test_apply_impersonation_without_quota_project_id(self):
826 expire_time = (
827 _helpers.utcnow().replace(microsecond=0) + datetime.timedelta(seconds=3600)
828 ).isoformat("T") + "Z"
829 # Service account impersonation response.
830 impersonation_response = {
831 "accessToken": "SA_ACCESS_TOKEN",
832 "expireTime": expire_time,
833 }
834 # Initialize mock request to handle token exchange and service account
835 # impersonation request.
836 request = self.make_mock_request(
Tres Seaver560cf1e2021-08-03 16:35:54 -0400837 status=http.client.OK,
bojeil-googled4d7f382021-02-16 12:33:20 -0800838 data=self.SUCCESS_RESPONSE.copy(),
Tres Seaver560cf1e2021-08-03 16:35:54 -0400839 impersonation_status=http.client.OK,
bojeil-googled4d7f382021-02-16 12:33:20 -0800840 impersonation_data=impersonation_response,
841 )
842 # Initialize credentials with service account impersonation.
843 credentials = self.make_credentials(
844 service_account_impersonation_url=self.SERVICE_ACCOUNT_IMPERSONATION_URL,
845 scopes=self.SCOPES,
846 )
847 headers = {}
848
849 credentials.refresh(request)
850 credentials.apply(headers)
851
852 assert headers == {
853 "authorization": "Bearer {}".format(impersonation_response["accessToken"])
854 }
855
856 def test_apply_with_quota_project_id(self):
857 headers = {"other": "header-value"}
858 request = self.make_mock_request(
Tres Seaver560cf1e2021-08-03 16:35:54 -0400859 status=http.client.OK, data=self.SUCCESS_RESPONSE
bojeil-googled4d7f382021-02-16 12:33:20 -0800860 )
861 credentials = self.make_credentials(quota_project_id=self.QUOTA_PROJECT_ID)
862
863 credentials.refresh(request)
864 credentials.apply(headers)
865
866 assert headers == {
867 "other": "header-value",
868 "authorization": "Bearer {}".format(self.SUCCESS_RESPONSE["access_token"]),
869 "x-goog-user-project": self.QUOTA_PROJECT_ID,
870 }
871
872 def test_apply_impersonation_with_quota_project_id(self):
873 expire_time = (
874 _helpers.utcnow().replace(microsecond=0) + datetime.timedelta(seconds=3600)
875 ).isoformat("T") + "Z"
876 # Service account impersonation response.
877 impersonation_response = {
878 "accessToken": "SA_ACCESS_TOKEN",
879 "expireTime": expire_time,
880 }
881 # Initialize mock request to handle token exchange and service account
882 # impersonation request.
883 request = self.make_mock_request(
Tres Seaver560cf1e2021-08-03 16:35:54 -0400884 status=http.client.OK,
bojeil-googled4d7f382021-02-16 12:33:20 -0800885 data=self.SUCCESS_RESPONSE.copy(),
Tres Seaver560cf1e2021-08-03 16:35:54 -0400886 impersonation_status=http.client.OK,
bojeil-googled4d7f382021-02-16 12:33:20 -0800887 impersonation_data=impersonation_response,
888 )
889 # Initialize credentials with service account impersonation.
890 credentials = self.make_credentials(
891 service_account_impersonation_url=self.SERVICE_ACCOUNT_IMPERSONATION_URL,
892 scopes=self.SCOPES,
893 quota_project_id=self.QUOTA_PROJECT_ID,
894 )
895 headers = {"other": "header-value"}
896
897 credentials.refresh(request)
898 credentials.apply(headers)
899
900 assert headers == {
901 "other": "header-value",
902 "authorization": "Bearer {}".format(impersonation_response["accessToken"]),
903 "x-goog-user-project": self.QUOTA_PROJECT_ID,
904 }
905
906 def test_before_request(self):
907 headers = {"other": "header-value"}
908 request = self.make_mock_request(
Tres Seaver560cf1e2021-08-03 16:35:54 -0400909 status=http.client.OK, data=self.SUCCESS_RESPONSE
bojeil-googled4d7f382021-02-16 12:33:20 -0800910 )
911 credentials = self.make_credentials()
912
913 # First call should call refresh, setting the token.
914 credentials.before_request(request, "POST", "https://example.com/api", headers)
915
916 assert headers == {
917 "other": "header-value",
918 "authorization": "Bearer {}".format(self.SUCCESS_RESPONSE["access_token"]),
919 }
920
921 # Second call shouldn't call refresh.
922 credentials.before_request(request, "POST", "https://example.com/api", headers)
923
924 assert headers == {
925 "other": "header-value",
926 "authorization": "Bearer {}".format(self.SUCCESS_RESPONSE["access_token"]),
927 }
928
929 def test_before_request_impersonation(self):
930 expire_time = (
931 _helpers.utcnow().replace(microsecond=0) + datetime.timedelta(seconds=3600)
932 ).isoformat("T") + "Z"
933 # Service account impersonation response.
934 impersonation_response = {
935 "accessToken": "SA_ACCESS_TOKEN",
936 "expireTime": expire_time,
937 }
938 # Initialize mock request to handle token exchange and service account
939 # impersonation request.
940 request = self.make_mock_request(
Tres Seaver560cf1e2021-08-03 16:35:54 -0400941 status=http.client.OK,
bojeil-googled4d7f382021-02-16 12:33:20 -0800942 data=self.SUCCESS_RESPONSE.copy(),
Tres Seaver560cf1e2021-08-03 16:35:54 -0400943 impersonation_status=http.client.OK,
bojeil-googled4d7f382021-02-16 12:33:20 -0800944 impersonation_data=impersonation_response,
945 )
946 headers = {"other": "header-value"}
947 credentials = self.make_credentials(
948 service_account_impersonation_url=self.SERVICE_ACCOUNT_IMPERSONATION_URL
949 )
950
951 # First call should call refresh, setting the token.
952 credentials.before_request(request, "POST", "https://example.com/api", headers)
953
954 assert headers == {
955 "other": "header-value",
956 "authorization": "Bearer {}".format(impersonation_response["accessToken"]),
957 }
958
959 # Second call shouldn't call refresh.
960 credentials.before_request(request, "POST", "https://example.com/api", headers)
961
962 assert headers == {
963 "other": "header-value",
964 "authorization": "Bearer {}".format(impersonation_response["accessToken"]),
965 }
966
967 @mock.patch("google.auth._helpers.utcnow")
968 def test_before_request_expired(self, utcnow):
969 headers = {}
970 request = self.make_mock_request(
Tres Seaver560cf1e2021-08-03 16:35:54 -0400971 status=http.client.OK, data=self.SUCCESS_RESPONSE
bojeil-googled4d7f382021-02-16 12:33:20 -0800972 )
973 credentials = self.make_credentials()
974 credentials.token = "token"
975 utcnow.return_value = datetime.datetime.min
976 # Set the expiration to one second more than now plus the clock skew
977 # accomodation. These credentials should be valid.
978 credentials.expiry = (
979 datetime.datetime.min + _helpers.CLOCK_SKEW + datetime.timedelta(seconds=1)
980 )
981
982 assert credentials.valid
983 assert not credentials.expired
984
985 credentials.before_request(request, "POST", "https://example.com/api", headers)
986
987 # Cached token should be used.
988 assert headers == {"authorization": "Bearer token"}
989
990 # Next call should simulate 1 second passed.
991 utcnow.return_value = datetime.datetime.min + datetime.timedelta(seconds=1)
992
993 assert not credentials.valid
994 assert credentials.expired
995
996 credentials.before_request(request, "POST", "https://example.com/api", headers)
997
998 # New token should be retrieved.
999 assert headers == {
1000 "authorization": "Bearer {}".format(self.SUCCESS_RESPONSE["access_token"])
1001 }
1002
1003 @mock.patch("google.auth._helpers.utcnow")
1004 def test_before_request_impersonation_expired(self, utcnow):
1005 headers = {}
1006 expire_time = (
1007 datetime.datetime.min + datetime.timedelta(seconds=3601)
1008 ).isoformat("T") + "Z"
1009 # Service account impersonation response.
1010 impersonation_response = {
1011 "accessToken": "SA_ACCESS_TOKEN",
1012 "expireTime": expire_time,
1013 }
1014 # Initialize mock request to handle token exchange and service account
1015 # impersonation request.
1016 request = self.make_mock_request(
Tres Seaver560cf1e2021-08-03 16:35:54 -04001017 status=http.client.OK,
bojeil-googled4d7f382021-02-16 12:33:20 -08001018 data=self.SUCCESS_RESPONSE.copy(),
Tres Seaver560cf1e2021-08-03 16:35:54 -04001019 impersonation_status=http.client.OK,
bojeil-googled4d7f382021-02-16 12:33:20 -08001020 impersonation_data=impersonation_response,
1021 )
1022 credentials = self.make_credentials(
1023 service_account_impersonation_url=self.SERVICE_ACCOUNT_IMPERSONATION_URL
1024 )
1025 credentials.token = "token"
1026 utcnow.return_value = datetime.datetime.min
1027 # Set the expiration to one second more than now plus the clock skew
1028 # accomodation. These credentials should be valid.
1029 credentials.expiry = (
1030 datetime.datetime.min + _helpers.CLOCK_SKEW + datetime.timedelta(seconds=1)
1031 )
1032
1033 assert credentials.valid
1034 assert not credentials.expired
1035
1036 credentials.before_request(request, "POST", "https://example.com/api", headers)
1037
1038 # Cached token should be used.
1039 assert headers == {"authorization": "Bearer token"}
1040
1041 # Next call should simulate 1 second passed. This will trigger the expiration
1042 # threshold.
1043 utcnow.return_value = datetime.datetime.min + datetime.timedelta(seconds=1)
1044
1045 assert not credentials.valid
1046 assert credentials.expired
1047
1048 credentials.before_request(request, "POST", "https://example.com/api", headers)
1049
1050 # New token should be retrieved.
1051 assert headers == {
1052 "authorization": "Bearer {}".format(impersonation_response["accessToken"])
1053 }
1054
1055 @pytest.mark.parametrize(
1056 "audience",
1057 [
1058 # Legacy K8s audience format.
1059 "identitynamespace:1f12345:my_provider",
1060 # Unrealistic audiences.
1061 "//iam.googleapis.com/projects",
1062 "//iam.googleapis.com/projects/",
1063 "//iam.googleapis.com/project/123456",
1064 "//iam.googleapis.com/projects//123456",
1065 "//iam.googleapis.com/prefix_projects/123456",
1066 "//iam.googleapis.com/projects_suffix/123456",
1067 ],
1068 )
1069 def test_project_number_indeterminable(self, audience):
1070 credentials = CredentialsImpl(
1071 audience=audience,
1072 subject_token_type=self.SUBJECT_TOKEN_TYPE,
1073 token_url=self.TOKEN_URL,
1074 credential_source=self.CREDENTIAL_SOURCE,
1075 )
1076
1077 assert credentials.project_number is None
1078 assert credentials.get_project_id(None) is None
1079
1080 def test_project_number_determinable(self):
1081 credentials = CredentialsImpl(
1082 audience=self.AUDIENCE,
1083 subject_token_type=self.SUBJECT_TOKEN_TYPE,
1084 token_url=self.TOKEN_URL,
1085 credential_source=self.CREDENTIAL_SOURCE,
1086 )
1087
1088 assert credentials.project_number == self.PROJECT_NUMBER
1089
1090 def test_project_id_without_scopes(self):
1091 # Initialize credentials with no scopes.
1092 credentials = CredentialsImpl(
1093 audience=self.AUDIENCE,
1094 subject_token_type=self.SUBJECT_TOKEN_TYPE,
1095 token_url=self.TOKEN_URL,
1096 credential_source=self.CREDENTIAL_SOURCE,
1097 )
1098
1099 assert credentials.get_project_id(None) is None
1100
1101 def test_get_project_id_cloud_resource_manager_success(self):
1102 # STS token exchange request/response.
1103 token_response = self.SUCCESS_RESPONSE.copy()
1104 token_headers = {"Content-Type": "application/x-www-form-urlencoded"}
1105 token_request_data = {
1106 "grant_type": "urn:ietf:params:oauth:grant-type:token-exchange",
1107 "audience": self.AUDIENCE,
1108 "requested_token_type": "urn:ietf:params:oauth:token-type:access_token",
1109 "subject_token": "subject_token_0",
1110 "subject_token_type": self.SUBJECT_TOKEN_TYPE,
1111 "scope": "https://www.googleapis.com/auth/iam",
1112 }
1113 # Service account impersonation request/response.
1114 expire_time = (
1115 _helpers.utcnow().replace(microsecond=0) + datetime.timedelta(seconds=3600)
1116 ).isoformat("T") + "Z"
1117 expected_expiry = datetime.datetime.strptime(expire_time, "%Y-%m-%dT%H:%M:%SZ")
1118 impersonation_response = {
1119 "accessToken": "SA_ACCESS_TOKEN",
1120 "expireTime": expire_time,
1121 }
1122 impersonation_headers = {
1123 "Content-Type": "application/json",
1124 "x-goog-user-project": self.QUOTA_PROJECT_ID,
1125 "authorization": "Bearer {}".format(token_response["access_token"]),
1126 }
1127 impersonation_request_data = {
1128 "delegates": None,
1129 "scope": self.SCOPES,
1130 "lifetime": "3600s",
1131 }
1132 # Initialize mock request to handle token exchange, service account
1133 # impersonation and cloud resource manager request.
1134 request = self.make_mock_request(
Tres Seaver560cf1e2021-08-03 16:35:54 -04001135 status=http.client.OK,
bojeil-googled4d7f382021-02-16 12:33:20 -08001136 data=self.SUCCESS_RESPONSE.copy(),
Tres Seaver560cf1e2021-08-03 16:35:54 -04001137 impersonation_status=http.client.OK,
bojeil-googled4d7f382021-02-16 12:33:20 -08001138 impersonation_data=impersonation_response,
Tres Seaver560cf1e2021-08-03 16:35:54 -04001139 cloud_resource_manager_status=http.client.OK,
bojeil-googled4d7f382021-02-16 12:33:20 -08001140 cloud_resource_manager_data=self.CLOUD_RESOURCE_MANAGER_SUCCESS_RESPONSE,
1141 )
1142 credentials = self.make_credentials(
1143 service_account_impersonation_url=self.SERVICE_ACCOUNT_IMPERSONATION_URL,
1144 scopes=self.SCOPES,
1145 quota_project_id=self.QUOTA_PROJECT_ID,
1146 )
1147
1148 # Expected project ID from cloud resource manager response should be returned.
1149 project_id = credentials.get_project_id(request)
1150
1151 assert project_id == self.PROJECT_ID
1152 # 3 requests should be processed.
1153 assert len(request.call_args_list) == 3
1154 # Verify token exchange request parameters.
1155 self.assert_token_request_kwargs(
arithmetic1728d80c85f2021-03-08 13:35:44 -08001156 request.call_args_list[0][1], token_headers, token_request_data
bojeil-googled4d7f382021-02-16 12:33:20 -08001157 )
1158 # Verify service account impersonation request parameters.
1159 self.assert_impersonation_request_kwargs(
arithmetic1728d80c85f2021-03-08 13:35:44 -08001160 request.call_args_list[1][1],
bojeil-googled4d7f382021-02-16 12:33:20 -08001161 impersonation_headers,
1162 impersonation_request_data,
1163 )
1164 # In the process of getting project ID, an access token should be
1165 # retrieved.
1166 assert credentials.valid
1167 assert credentials.expiry == expected_expiry
1168 assert not credentials.expired
1169 assert credentials.token == impersonation_response["accessToken"]
1170 # Verify cloud resource manager request parameters.
1171 self.assert_resource_manager_request_kwargs(
arithmetic1728d80c85f2021-03-08 13:35:44 -08001172 request.call_args_list[2][1],
bojeil-googled4d7f382021-02-16 12:33:20 -08001173 self.PROJECT_NUMBER,
1174 {
1175 "x-goog-user-project": self.QUOTA_PROJECT_ID,
1176 "authorization": "Bearer {}".format(
1177 impersonation_response["accessToken"]
1178 ),
1179 },
1180 )
1181
1182 # Calling get_project_id again should return the cached project_id.
1183 project_id = credentials.get_project_id(request)
1184
1185 assert project_id == self.PROJECT_ID
1186 # No additional requests.
1187 assert len(request.call_args_list) == 3
1188
1189 def test_get_project_id_cloud_resource_manager_error(self):
1190 # Simulate resource doesn't have sufficient permissions to access
1191 # cloud resource manager.
1192 request = self.make_mock_request(
Tres Seaver560cf1e2021-08-03 16:35:54 -04001193 status=http.client.OK,
bojeil-googled4d7f382021-02-16 12:33:20 -08001194 data=self.SUCCESS_RESPONSE.copy(),
Tres Seaver560cf1e2021-08-03 16:35:54 -04001195 cloud_resource_manager_status=http.client.UNAUTHORIZED,
bojeil-googled4d7f382021-02-16 12:33:20 -08001196 )
1197 credentials = self.make_credentials(scopes=self.SCOPES)
1198
1199 project_id = credentials.get_project_id(request)
1200
1201 assert project_id is None
1202 # Only 2 requests to STS and cloud resource manager should be sent.
1203 assert len(request.call_args_list) == 2