# Copyright 2020 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
14
15import datetime
16import json
17
18import mock
19import pytest
20from six.moves import http_client
21from six.moves import urllib
22
23from google.auth import _helpers
24from google.auth import exceptions
25from google.auth import external_account
26from google.auth import transport
27
28
# OAuth client credentials used by the tests that exercise basic auth.
CLIENT_ID, CLIENT_SECRET = "username", "password"
# Base64 encoding of "username:password" (i.e. CLIENT_ID:CLIENT_SECRET).
BASIC_AUTH_ENCODING = "dXNlcm5hbWU6cGFzc3dvcmQ="
# Service account email embedded in the impersonation URL used by the tests.
SERVICE_ACCOUNT_EMAIL = "service-1234@service-name.iam.gserviceaccount.com"
34
35
class CredentialsImpl(external_account.Credentials):
    """Concrete ``external_account.Credentials`` used as a test double.

    Each call to ``retrieve_subject_token`` returns a new, predictable
    subject token ("subject_token_0", "subject_token_1", ...).
    """

    def __init__(
        self,
        audience,
        subject_token_type,
        token_url,
        credential_source,
        service_account_impersonation_url=None,
        client_id=None,
        client_secret=None,
        quota_project_id=None,
        scopes=None,
        default_scopes=None,
    ):
        # Everything is forwarded by keyword so the base initializer sees
        # exactly the same named arguments this class received.
        base_options = {
            "audience": audience,
            "subject_token_type": subject_token_type,
            "token_url": token_url,
            "credential_source": credential_source,
            "service_account_impersonation_url": service_account_impersonation_url,
            "client_id": client_id,
            "client_secret": client_secret,
            "quota_project_id": quota_project_id,
            "scopes": scopes,
            "default_scopes": default_scopes,
        }
        super(CredentialsImpl, self).__init__(**base_options)
        # Number of subject tokens handed out so far.
        self._counter = 0

    def retrieve_subject_token(self, request):
        """Return the next sequential fake subject token; ``request`` is unused."""
        subject_token = "subject_token_{}".format(self._counter)
        self._counter += 1
        return subject_token
68
69
70class TestCredentials(object):
71 TOKEN_URL = "https://sts.googleapis.com/v1/token"
72 PROJECT_NUMBER = "123456"
73 POOL_ID = "POOL_ID"
74 PROVIDER_ID = "PROVIDER_ID"
75 AUDIENCE = (
76 "//iam.googleapis.com/projects/{}"
77 "/locations/global/workloadIdentityPools/{}"
78 "/providers/{}"
79 ).format(PROJECT_NUMBER, POOL_ID, PROVIDER_ID)
80 SUBJECT_TOKEN_TYPE = "urn:ietf:params:oauth:token-type:jwt"
81 CREDENTIAL_SOURCE = {"file": "/var/run/secrets/goog.id/token"}
82 SUCCESS_RESPONSE = {
83 "access_token": "ACCESS_TOKEN",
84 "issued_token_type": "urn:ietf:params:oauth:token-type:access_token",
85 "token_type": "Bearer",
86 "expires_in": 3600,
87 "scope": "scope1 scope2",
88 }
89 ERROR_RESPONSE = {
90 "error": "invalid_request",
91 "error_description": "Invalid subject token",
92 "error_uri": "https://tools.ietf.org/html/rfc6749",
93 }
94 QUOTA_PROJECT_ID = "QUOTA_PROJECT_ID"
95 SERVICE_ACCOUNT_IMPERSONATION_URL = (
96 "https://us-east1-iamcredentials.googleapis.com/v1/projects/-"
97 + "/serviceAccounts/{}:generateAccessToken".format(SERVICE_ACCOUNT_EMAIL)
98 )
99 SCOPES = ["scope1", "scope2"]
100 IMPERSONATION_ERROR_RESPONSE = {
101 "error": {
102 "code": 400,
103 "message": "Request contains an invalid argument",
104 "status": "INVALID_ARGUMENT",
105 }
106 }
107 PROJECT_ID = "my-proj-id"
108 CLOUD_RESOURCE_MANAGER_URL = (
109 "https://cloudresourcemanager.googleapis.com/v1/projects/"
110 )
111 CLOUD_RESOURCE_MANAGER_SUCCESS_RESPONSE = {
112 "projectNumber": PROJECT_NUMBER,
113 "projectId": PROJECT_ID,
114 "lifecycleState": "ACTIVE",
115 "name": "project-name",
116 "createTime": "2018-11-06T04:42:54.109Z",
117 "parent": {"type": "folder", "id": "12345678901"},
118 }
119
120 @classmethod
121 def make_credentials(
122 cls,
123 client_id=None,
124 client_secret=None,
125 quota_project_id=None,
126 scopes=None,
127 default_scopes=None,
128 service_account_impersonation_url=None,
129 ):
130 return CredentialsImpl(
131 audience=cls.AUDIENCE,
132 subject_token_type=cls.SUBJECT_TOKEN_TYPE,
133 token_url=cls.TOKEN_URL,
134 service_account_impersonation_url=service_account_impersonation_url,
135 credential_source=cls.CREDENTIAL_SOURCE,
136 client_id=client_id,
137 client_secret=client_secret,
138 quota_project_id=quota_project_id,
139 scopes=scopes,
140 default_scopes=default_scopes,
141 )
142
143 @classmethod
144 def make_mock_request(
145 cls,
146 status=http_client.OK,
147 data=None,
148 impersonation_status=None,
149 impersonation_data=None,
150 cloud_resource_manager_status=None,
151 cloud_resource_manager_data=None,
152 ):
153 # STS token exchange request.
154 token_response = mock.create_autospec(transport.Response, instance=True)
155 token_response.status = status
156 token_response.data = json.dumps(data).encode("utf-8")
157 responses = [token_response]
158
159 # If service account impersonation is requested, mock the expected response.
160 if impersonation_status:
161 impersonation_response = mock.create_autospec(
162 transport.Response, instance=True
163 )
164 impersonation_response.status = impersonation_status
165 impersonation_response.data = json.dumps(impersonation_data).encode("utf-8")
166 responses.append(impersonation_response)
167
168 # If cloud resource manager is requested, mock the expected response.
169 if cloud_resource_manager_status:
170 cloud_resource_manager_response = mock.create_autospec(
171 transport.Response, instance=True
172 )
173 cloud_resource_manager_response.status = cloud_resource_manager_status
174 cloud_resource_manager_response.data = json.dumps(
175 cloud_resource_manager_data
176 ).encode("utf-8")
177 responses.append(cloud_resource_manager_response)
178
179 request = mock.create_autospec(transport.Request)
180 request.side_effect = responses
181
182 return request
183
184 @classmethod
185 def assert_token_request_kwargs(cls, request_kwargs, headers, request_data):
186 assert request_kwargs["url"] == cls.TOKEN_URL
187 assert request_kwargs["method"] == "POST"
188 assert request_kwargs["headers"] == headers
189 assert request_kwargs["body"] is not None
190 body_tuples = urllib.parse.parse_qsl(request_kwargs["body"])
191 for (k, v) in body_tuples:
192 assert v.decode("utf-8") == request_data[k.decode("utf-8")]
193 assert len(body_tuples) == len(request_data.keys())
194
195 @classmethod
196 def assert_impersonation_request_kwargs(cls, request_kwargs, headers, request_data):
197 assert request_kwargs["url"] == cls.SERVICE_ACCOUNT_IMPERSONATION_URL
198 assert request_kwargs["method"] == "POST"
199 assert request_kwargs["headers"] == headers
200 assert request_kwargs["body"] is not None
201 body_json = json.loads(request_kwargs["body"].decode("utf-8"))
202 assert body_json == request_data
203
204 @classmethod
205 def assert_resource_manager_request_kwargs(
206 cls, request_kwargs, project_number, headers
207 ):
208 assert request_kwargs["url"] == cls.CLOUD_RESOURCE_MANAGER_URL + project_number
209 assert request_kwargs["method"] == "GET"
210 assert request_kwargs["headers"] == headers
211 assert "body" not in request_kwargs
212
213 def test_default_state(self):
214 credentials = self.make_credentials()
215
216 # Not token acquired yet
217 assert not credentials.token
218 assert not credentials.valid
219 # Expiration hasn't been set yet
220 assert not credentials.expiry
221 assert not credentials.expired
222 # Scopes are required
223 assert not credentials.scopes
224 assert credentials.requires_scopes
225 assert not credentials.quota_project_id
226
227 def test_with_scopes(self):
228 credentials = self.make_credentials()
229
230 assert not credentials.scopes
231 assert credentials.requires_scopes
232
233 scoped_credentials = credentials.with_scopes(["email"])
234
235 assert scoped_credentials.has_scopes(["email"])
236 assert not scoped_credentials.requires_scopes
237
238 def test_with_scopes_using_user_and_default_scopes(self):
239 credentials = self.make_credentials()
240
241 assert not credentials.scopes
242 assert credentials.requires_scopes
243
244 scoped_credentials = credentials.with_scopes(
245 ["email"], default_scopes=["profile"]
246 )
247
248 assert scoped_credentials.has_scopes(["email"])
249 assert not scoped_credentials.has_scopes(["profile"])
250 assert not scoped_credentials.requires_scopes
251 assert scoped_credentials.scopes == ["email"]
252 assert scoped_credentials.default_scopes == ["profile"]
253
254 def test_with_scopes_using_default_scopes_only(self):
255 credentials = self.make_credentials()
256
257 assert not credentials.scopes
258 assert credentials.requires_scopes
259
260 scoped_credentials = credentials.with_scopes(None, default_scopes=["profile"])
261
262 assert scoped_credentials.has_scopes(["profile"])
263 assert not scoped_credentials.requires_scopes
264
265 def test_with_scopes_full_options_propagated(self):
266 credentials = self.make_credentials(
267 client_id=CLIENT_ID,
268 client_secret=CLIENT_SECRET,
269 quota_project_id=self.QUOTA_PROJECT_ID,
270 scopes=self.SCOPES,
271 default_scopes=["default1"],
272 service_account_impersonation_url=self.SERVICE_ACCOUNT_IMPERSONATION_URL,
273 )
274
275 with mock.patch.object(
276 external_account.Credentials, "__init__", return_value=None
277 ) as mock_init:
278 credentials.with_scopes(["email"], ["default2"])
279
280 # Confirm with_scopes initialized the credential with the expected
281 # parameters and scopes.
282 mock_init.assert_called_once_with(
283 audience=self.AUDIENCE,
284 subject_token_type=self.SUBJECT_TOKEN_TYPE,
285 token_url=self.TOKEN_URL,
286 credential_source=self.CREDENTIAL_SOURCE,
287 service_account_impersonation_url=self.SERVICE_ACCOUNT_IMPERSONATION_URL,
288 client_id=CLIENT_ID,
289 client_secret=CLIENT_SECRET,
290 quota_project_id=self.QUOTA_PROJECT_ID,
291 scopes=["email"],
292 default_scopes=["default2"],
293 )
294
295 def test_with_quota_project(self):
296 credentials = self.make_credentials()
297
298 assert not credentials.scopes
299 assert not credentials.quota_project_id
300
301 quota_project_creds = credentials.with_quota_project("project-foo")
302
303 assert quota_project_creds.quota_project_id == "project-foo"
304
305 def test_with_quota_project_full_options_propagated(self):
306 credentials = self.make_credentials(
307 client_id=CLIENT_ID,
308 client_secret=CLIENT_SECRET,
309 quota_project_id=self.QUOTA_PROJECT_ID,
310 scopes=self.SCOPES,
311 default_scopes=["default1"],
312 service_account_impersonation_url=self.SERVICE_ACCOUNT_IMPERSONATION_URL,
313 )
314
315 with mock.patch.object(
316 external_account.Credentials, "__init__", return_value=None
317 ) as mock_init:
318 credentials.with_quota_project("project-foo")
319
320 # Confirm with_quota_project initialized the credential with the
321 # expected parameters and quota project ID.
322 mock_init.assert_called_once_with(
323 audience=self.AUDIENCE,
324 subject_token_type=self.SUBJECT_TOKEN_TYPE,
325 token_url=self.TOKEN_URL,
326 credential_source=self.CREDENTIAL_SOURCE,
327 service_account_impersonation_url=self.SERVICE_ACCOUNT_IMPERSONATION_URL,
328 client_id=CLIENT_ID,
329 client_secret=CLIENT_SECRET,
330 quota_project_id="project-foo",
331 scopes=self.SCOPES,
332 default_scopes=["default1"],
333 )
334
335 def test_with_invalid_impersonation_target_principal(self):
336 invalid_url = "https://iamcredentials.googleapis.com/v1/invalid"
337
338 with pytest.raises(exceptions.RefreshError) as excinfo:
339 self.make_credentials(service_account_impersonation_url=invalid_url)
340
341 assert excinfo.match(
342 r"Unable to determine target principal from service account impersonation URL."
343 )
344
345 @mock.patch("google.auth._helpers.utcnow", return_value=datetime.datetime.min)
346 def test_refresh_without_client_auth_success(self, unused_utcnow):
347 response = self.SUCCESS_RESPONSE.copy()
348 # Test custom expiration to confirm expiry is set correctly.
349 response["expires_in"] = 2800
350 expected_expiry = datetime.datetime.min + datetime.timedelta(
351 seconds=response["expires_in"]
352 )
353 headers = {"Content-Type": "application/x-www-form-urlencoded"}
354 request_data = {
355 "grant_type": "urn:ietf:params:oauth:grant-type:token-exchange",
356 "audience": self.AUDIENCE,
357 "requested_token_type": "urn:ietf:params:oauth:token-type:access_token",
358 "subject_token": "subject_token_0",
359 "subject_token_type": self.SUBJECT_TOKEN_TYPE,
360 }
361 request = self.make_mock_request(status=http_client.OK, data=response)
362 credentials = self.make_credentials()
363
364 credentials.refresh(request)
365
366 self.assert_token_request_kwargs(
367 request.call_args.kwargs, headers, request_data
368 )
369 assert credentials.valid
370 assert credentials.expiry == expected_expiry
371 assert not credentials.expired
372 assert credentials.token == response["access_token"]
373
374 def test_refresh_impersonation_without_client_auth_success(self):
375 # Simulate service account access token expires in 2800 seconds.
376 expire_time = (
377 _helpers.utcnow().replace(microsecond=0) + datetime.timedelta(seconds=2800)
378 ).isoformat("T") + "Z"
379 expected_expiry = datetime.datetime.strptime(expire_time, "%Y-%m-%dT%H:%M:%SZ")
380 # STS token exchange request/response.
381 token_response = self.SUCCESS_RESPONSE.copy()
382 token_headers = {"Content-Type": "application/x-www-form-urlencoded"}
383 token_request_data = {
384 "grant_type": "urn:ietf:params:oauth:grant-type:token-exchange",
385 "audience": self.AUDIENCE,
386 "requested_token_type": "urn:ietf:params:oauth:token-type:access_token",
387 "subject_token": "subject_token_0",
388 "subject_token_type": self.SUBJECT_TOKEN_TYPE,
389 "scope": "https://www.googleapis.com/auth/iam",
390 }
391 # Service account impersonation request/response.
392 impersonation_response = {
393 "accessToken": "SA_ACCESS_TOKEN",
394 "expireTime": expire_time,
395 }
396 impersonation_headers = {
397 "Content-Type": "application/json",
398 "authorization": "Bearer {}".format(token_response["access_token"]),
399 }
400 impersonation_request_data = {
401 "delegates": None,
402 "scope": self.SCOPES,
403 "lifetime": "3600s",
404 }
405 # Initialize mock request to handle token exchange and service account
406 # impersonation request.
407 request = self.make_mock_request(
408 status=http_client.OK,
409 data=token_response,
410 impersonation_status=http_client.OK,
411 impersonation_data=impersonation_response,
412 )
413 # Initialize credentials with service account impersonation.
414 credentials = self.make_credentials(
415 service_account_impersonation_url=self.SERVICE_ACCOUNT_IMPERSONATION_URL,
416 scopes=self.SCOPES,
417 )
418
419 credentials.refresh(request)
420
421 # Only 2 requests should be processed.
422 assert len(request.call_args_list) == 2
423 # Verify token exchange request parameters.
424 self.assert_token_request_kwargs(
425 request.call_args_list[0].kwargs, token_headers, token_request_data
426 )
427 # Verify service account impersonation request parameters.
428 self.assert_impersonation_request_kwargs(
429 request.call_args_list[1].kwargs,
430 impersonation_headers,
431 impersonation_request_data,
432 )
433 assert credentials.valid
434 assert credentials.expiry == expected_expiry
435 assert not credentials.expired
436 assert credentials.token == impersonation_response["accessToken"]
437
438 def test_refresh_without_client_auth_success_explicit_user_scopes_ignore_default_scopes(
439 self
440 ):
441 headers = {"Content-Type": "application/x-www-form-urlencoded"}
442 request_data = {
443 "grant_type": "urn:ietf:params:oauth:grant-type:token-exchange",
444 "audience": self.AUDIENCE,
445 "requested_token_type": "urn:ietf:params:oauth:token-type:access_token",
446 "scope": "scope1 scope2",
447 "subject_token": "subject_token_0",
448 "subject_token_type": self.SUBJECT_TOKEN_TYPE,
449 }
450 request = self.make_mock_request(
451 status=http_client.OK, data=self.SUCCESS_RESPONSE
452 )
453 credentials = self.make_credentials(
454 scopes=["scope1", "scope2"],
455 # Default scopes will be ignored in favor of user scopes.
456 default_scopes=["ignored"],
457 )
458
459 credentials.refresh(request)
460
461 self.assert_token_request_kwargs(
462 request.call_args.kwargs, headers, request_data
463 )
464 assert credentials.valid
465 assert not credentials.expired
466 assert credentials.token == self.SUCCESS_RESPONSE["access_token"]
467 assert credentials.has_scopes(["scope1", "scope2"])
468 assert not credentials.has_scopes(["ignored"])
469
470 def test_refresh_without_client_auth_success_explicit_default_scopes_only(self):
471 headers = {"Content-Type": "application/x-www-form-urlencoded"}
472 request_data = {
473 "grant_type": "urn:ietf:params:oauth:grant-type:token-exchange",
474 "audience": self.AUDIENCE,
475 "requested_token_type": "urn:ietf:params:oauth:token-type:access_token",
476 "scope": "scope1 scope2",
477 "subject_token": "subject_token_0",
478 "subject_token_type": self.SUBJECT_TOKEN_TYPE,
479 }
480 request = self.make_mock_request(
481 status=http_client.OK, data=self.SUCCESS_RESPONSE
482 )
483 credentials = self.make_credentials(
484 scopes=None,
485 # Default scopes will be used since user scopes are none.
486 default_scopes=["scope1", "scope2"],
487 )
488
489 credentials.refresh(request)
490
491 self.assert_token_request_kwargs(
492 request.call_args.kwargs, headers, request_data
493 )
494 assert credentials.valid
495 assert not credentials.expired
496 assert credentials.token == self.SUCCESS_RESPONSE["access_token"]
497 assert credentials.has_scopes(["scope1", "scope2"])
498
499 def test_refresh_without_client_auth_error(self):
500 request = self.make_mock_request(
501 status=http_client.BAD_REQUEST, data=self.ERROR_RESPONSE
502 )
503 credentials = self.make_credentials()
504
505 with pytest.raises(exceptions.OAuthError) as excinfo:
506 credentials.refresh(request)
507
508 assert excinfo.match(
509 r"Error code invalid_request: Invalid subject token - https://tools.ietf.org/html/rfc6749"
510 )
511 assert not credentials.expired
512 assert credentials.token is None
513
514 def test_refresh_impersonation_without_client_auth_error(self):
515 request = self.make_mock_request(
516 status=http_client.OK,
517 data=self.SUCCESS_RESPONSE,
518 impersonation_status=http_client.BAD_REQUEST,
519 impersonation_data=self.IMPERSONATION_ERROR_RESPONSE,
520 )
521 credentials = self.make_credentials(
522 service_account_impersonation_url=self.SERVICE_ACCOUNT_IMPERSONATION_URL,
523 scopes=self.SCOPES,
524 )
525
526 with pytest.raises(exceptions.RefreshError) as excinfo:
527 credentials.refresh(request)
528
529 assert excinfo.match(r"Unable to acquire impersonated credentials")
530 assert not credentials.expired
531 assert credentials.token is None
532
533 def test_refresh_with_client_auth_success(self):
534 headers = {
535 "Content-Type": "application/x-www-form-urlencoded",
536 "Authorization": "Basic {}".format(BASIC_AUTH_ENCODING),
537 }
538 request_data = {
539 "grant_type": "urn:ietf:params:oauth:grant-type:token-exchange",
540 "audience": self.AUDIENCE,
541 "requested_token_type": "urn:ietf:params:oauth:token-type:access_token",
542 "subject_token": "subject_token_0",
543 "subject_token_type": self.SUBJECT_TOKEN_TYPE,
544 }
545 request = self.make_mock_request(
546 status=http_client.OK, data=self.SUCCESS_RESPONSE
547 )
548 credentials = self.make_credentials(
549 client_id=CLIENT_ID, client_secret=CLIENT_SECRET
550 )
551
552 credentials.refresh(request)
553
554 self.assert_token_request_kwargs(
555 request.call_args.kwargs, headers, request_data
556 )
557 assert credentials.valid
558 assert not credentials.expired
559 assert credentials.token == self.SUCCESS_RESPONSE["access_token"]
560
561 def test_refresh_impersonation_with_client_auth_success_ignore_default_scopes(self):
562 # Simulate service account access token expires in 2800 seconds.
563 expire_time = (
564 _helpers.utcnow().replace(microsecond=0) + datetime.timedelta(seconds=2800)
565 ).isoformat("T") + "Z"
566 expected_expiry = datetime.datetime.strptime(expire_time, "%Y-%m-%dT%H:%M:%SZ")
567 # STS token exchange request/response.
568 token_response = self.SUCCESS_RESPONSE.copy()
569 token_headers = {
570 "Content-Type": "application/x-www-form-urlencoded",
571 "Authorization": "Basic {}".format(BASIC_AUTH_ENCODING),
572 }
573 token_request_data = {
574 "grant_type": "urn:ietf:params:oauth:grant-type:token-exchange",
575 "audience": self.AUDIENCE,
576 "requested_token_type": "urn:ietf:params:oauth:token-type:access_token",
577 "subject_token": "subject_token_0",
578 "subject_token_type": self.SUBJECT_TOKEN_TYPE,
579 "scope": "https://www.googleapis.com/auth/iam",
580 }
581 # Service account impersonation request/response.
582 impersonation_response = {
583 "accessToken": "SA_ACCESS_TOKEN",
584 "expireTime": expire_time,
585 }
586 impersonation_headers = {
587 "Content-Type": "application/json",
588 "authorization": "Bearer {}".format(token_response["access_token"]),
589 }
590 impersonation_request_data = {
591 "delegates": None,
592 "scope": self.SCOPES,
593 "lifetime": "3600s",
594 }
595 # Initialize mock request to handle token exchange and service account
596 # impersonation request.
597 request = self.make_mock_request(
598 status=http_client.OK,
599 data=token_response,
600 impersonation_status=http_client.OK,
601 impersonation_data=impersonation_response,
602 )
603 # Initialize credentials with service account impersonation and basic auth.
604 credentials = self.make_credentials(
605 client_id=CLIENT_ID,
606 client_secret=CLIENT_SECRET,
607 service_account_impersonation_url=self.SERVICE_ACCOUNT_IMPERSONATION_URL,
608 scopes=self.SCOPES,
609 # Default scopes will be ignored since user scopes are specified.
610 default_scopes=["ignored"],
611 )
612
613 credentials.refresh(request)
614
615 # Only 2 requests should be processed.
616 assert len(request.call_args_list) == 2
617 # Verify token exchange request parameters.
618 self.assert_token_request_kwargs(
619 request.call_args_list[0].kwargs, token_headers, token_request_data
620 )
621 # Verify service account impersonation request parameters.
622 self.assert_impersonation_request_kwargs(
623 request.call_args_list[1].kwargs,
624 impersonation_headers,
625 impersonation_request_data,
626 )
627 assert credentials.valid
628 assert credentials.expiry == expected_expiry
629 assert not credentials.expired
630 assert credentials.token == impersonation_response["accessToken"]
631
632 def test_refresh_impersonation_with_client_auth_success_use_default_scopes(self):
633 # Simulate service account access token expires in 2800 seconds.
634 expire_time = (
635 _helpers.utcnow().replace(microsecond=0) + datetime.timedelta(seconds=2800)
636 ).isoformat("T") + "Z"
637 expected_expiry = datetime.datetime.strptime(expire_time, "%Y-%m-%dT%H:%M:%SZ")
638 # STS token exchange request/response.
639 token_response = self.SUCCESS_RESPONSE.copy()
640 token_headers = {
641 "Content-Type": "application/x-www-form-urlencoded",
642 "Authorization": "Basic {}".format(BASIC_AUTH_ENCODING),
643 }
644 token_request_data = {
645 "grant_type": "urn:ietf:params:oauth:grant-type:token-exchange",
646 "audience": self.AUDIENCE,
647 "requested_token_type": "urn:ietf:params:oauth:token-type:access_token",
648 "subject_token": "subject_token_0",
649 "subject_token_type": self.SUBJECT_TOKEN_TYPE,
650 "scope": "https://www.googleapis.com/auth/iam",
651 }
652 # Service account impersonation request/response.
653 impersonation_response = {
654 "accessToken": "SA_ACCESS_TOKEN",
655 "expireTime": expire_time,
656 }
657 impersonation_headers = {
658 "Content-Type": "application/json",
659 "authorization": "Bearer {}".format(token_response["access_token"]),
660 }
661 impersonation_request_data = {
662 "delegates": None,
663 "scope": self.SCOPES,
664 "lifetime": "3600s",
665 }
666 # Initialize mock request to handle token exchange and service account
667 # impersonation request.
668 request = self.make_mock_request(
669 status=http_client.OK,
670 data=token_response,
671 impersonation_status=http_client.OK,
672 impersonation_data=impersonation_response,
673 )
674 # Initialize credentials with service account impersonation and basic auth.
675 credentials = self.make_credentials(
676 client_id=CLIENT_ID,
677 client_secret=CLIENT_SECRET,
678 service_account_impersonation_url=self.SERVICE_ACCOUNT_IMPERSONATION_URL,
679 scopes=None,
680 # Default scopes will be used since user specified scopes are none.
681 default_scopes=self.SCOPES,
682 )
683
684 credentials.refresh(request)
685
686 # Only 2 requests should be processed.
687 assert len(request.call_args_list) == 2
688 # Verify token exchange request parameters.
689 self.assert_token_request_kwargs(
690 request.call_args_list[0].kwargs, token_headers, token_request_data
691 )
692 # Verify service account impersonation request parameters.
693 self.assert_impersonation_request_kwargs(
694 request.call_args_list[1].kwargs,
695 impersonation_headers,
696 impersonation_request_data,
697 )
698 assert credentials.valid
699 assert credentials.expiry == expected_expiry
700 assert not credentials.expired
701 assert credentials.token == impersonation_response["accessToken"]
702
703 def test_apply_without_quota_project_id(self):
704 headers = {}
705 request = self.make_mock_request(
706 status=http_client.OK, data=self.SUCCESS_RESPONSE
707 )
708 credentials = self.make_credentials()
709
710 credentials.refresh(request)
711 credentials.apply(headers)
712
713 assert headers == {
714 "authorization": "Bearer {}".format(self.SUCCESS_RESPONSE["access_token"])
715 }
716
717 def test_apply_impersonation_without_quota_project_id(self):
718 expire_time = (
719 _helpers.utcnow().replace(microsecond=0) + datetime.timedelta(seconds=3600)
720 ).isoformat("T") + "Z"
721 # Service account impersonation response.
722 impersonation_response = {
723 "accessToken": "SA_ACCESS_TOKEN",
724 "expireTime": expire_time,
725 }
726 # Initialize mock request to handle token exchange and service account
727 # impersonation request.
728 request = self.make_mock_request(
729 status=http_client.OK,
730 data=self.SUCCESS_RESPONSE.copy(),
731 impersonation_status=http_client.OK,
732 impersonation_data=impersonation_response,
733 )
734 # Initialize credentials with service account impersonation.
735 credentials = self.make_credentials(
736 service_account_impersonation_url=self.SERVICE_ACCOUNT_IMPERSONATION_URL,
737 scopes=self.SCOPES,
738 )
739 headers = {}
740
741 credentials.refresh(request)
742 credentials.apply(headers)
743
744 assert headers == {
745 "authorization": "Bearer {}".format(impersonation_response["accessToken"])
746 }
747
748 def test_apply_with_quota_project_id(self):
749 headers = {"other": "header-value"}
750 request = self.make_mock_request(
751 status=http_client.OK, data=self.SUCCESS_RESPONSE
752 )
753 credentials = self.make_credentials(quota_project_id=self.QUOTA_PROJECT_ID)
754
755 credentials.refresh(request)
756 credentials.apply(headers)
757
758 assert headers == {
759 "other": "header-value",
760 "authorization": "Bearer {}".format(self.SUCCESS_RESPONSE["access_token"]),
761 "x-goog-user-project": self.QUOTA_PROJECT_ID,
762 }
763
764 def test_apply_impersonation_with_quota_project_id(self):
765 expire_time = (
766 _helpers.utcnow().replace(microsecond=0) + datetime.timedelta(seconds=3600)
767 ).isoformat("T") + "Z"
768 # Service account impersonation response.
769 impersonation_response = {
770 "accessToken": "SA_ACCESS_TOKEN",
771 "expireTime": expire_time,
772 }
773 # Initialize mock request to handle token exchange and service account
774 # impersonation request.
775 request = self.make_mock_request(
776 status=http_client.OK,
777 data=self.SUCCESS_RESPONSE.copy(),
778 impersonation_status=http_client.OK,
779 impersonation_data=impersonation_response,
780 )
781 # Initialize credentials with service account impersonation.
782 credentials = self.make_credentials(
783 service_account_impersonation_url=self.SERVICE_ACCOUNT_IMPERSONATION_URL,
784 scopes=self.SCOPES,
785 quota_project_id=self.QUOTA_PROJECT_ID,
786 )
787 headers = {"other": "header-value"}
788
789 credentials.refresh(request)
790 credentials.apply(headers)
791
792 assert headers == {
793 "other": "header-value",
794 "authorization": "Bearer {}".format(impersonation_response["accessToken"]),
795 "x-goog-user-project": self.QUOTA_PROJECT_ID,
796 }
797
798 def test_before_request(self):
799 headers = {"other": "header-value"}
800 request = self.make_mock_request(
801 status=http_client.OK, data=self.SUCCESS_RESPONSE
802 )
803 credentials = self.make_credentials()
804
805 # First call should call refresh, setting the token.
806 credentials.before_request(request, "POST", "https://example.com/api", headers)
807
808 assert headers == {
809 "other": "header-value",
810 "authorization": "Bearer {}".format(self.SUCCESS_RESPONSE["access_token"]),
811 }
812
813 # Second call shouldn't call refresh.
814 credentials.before_request(request, "POST", "https://example.com/api", headers)
815
816 assert headers == {
817 "other": "header-value",
818 "authorization": "Bearer {}".format(self.SUCCESS_RESPONSE["access_token"]),
819 }
820
821 def test_before_request_impersonation(self):
822 expire_time = (
823 _helpers.utcnow().replace(microsecond=0) + datetime.timedelta(seconds=3600)
824 ).isoformat("T") + "Z"
825 # Service account impersonation response.
826 impersonation_response = {
827 "accessToken": "SA_ACCESS_TOKEN",
828 "expireTime": expire_time,
829 }
830 # Initialize mock request to handle token exchange and service account
831 # impersonation request.
832 request = self.make_mock_request(
833 status=http_client.OK,
834 data=self.SUCCESS_RESPONSE.copy(),
835 impersonation_status=http_client.OK,
836 impersonation_data=impersonation_response,
837 )
838 headers = {"other": "header-value"}
839 credentials = self.make_credentials(
840 service_account_impersonation_url=self.SERVICE_ACCOUNT_IMPERSONATION_URL
841 )
842
843 # First call should call refresh, setting the token.
844 credentials.before_request(request, "POST", "https://example.com/api", headers)
845
846 assert headers == {
847 "other": "header-value",
848 "authorization": "Bearer {}".format(impersonation_response["accessToken"]),
849 }
850
851 # Second call shouldn't call refresh.
852 credentials.before_request(request, "POST", "https://example.com/api", headers)
853
854 assert headers == {
855 "other": "header-value",
856 "authorization": "Bearer {}".format(impersonation_response["accessToken"]),
857 }
858
859 @mock.patch("google.auth._helpers.utcnow")
860 def test_before_request_expired(self, utcnow):
861 headers = {}
862 request = self.make_mock_request(
863 status=http_client.OK, data=self.SUCCESS_RESPONSE
864 )
865 credentials = self.make_credentials()
866 credentials.token = "token"
867 utcnow.return_value = datetime.datetime.min
868 # Set the expiration to one second more than now plus the clock skew
869 # accomodation. These credentials should be valid.
870 credentials.expiry = (
871 datetime.datetime.min + _helpers.CLOCK_SKEW + datetime.timedelta(seconds=1)
872 )
873
874 assert credentials.valid
875 assert not credentials.expired
876
877 credentials.before_request(request, "POST", "https://example.com/api", headers)
878
879 # Cached token should be used.
880 assert headers == {"authorization": "Bearer token"}
881
882 # Next call should simulate 1 second passed.
883 utcnow.return_value = datetime.datetime.min + datetime.timedelta(seconds=1)
884
885 assert not credentials.valid
886 assert credentials.expired
887
888 credentials.before_request(request, "POST", "https://example.com/api", headers)
889
890 # New token should be retrieved.
891 assert headers == {
892 "authorization": "Bearer {}".format(self.SUCCESS_RESPONSE["access_token"])
893 }
894
@mock.patch("google.auth._helpers.utcnow")
def test_before_request_impersonation_expired(self, utcnow):
    """Check cached-token reuse and refresh when impersonation is configured.

    ``_helpers.utcnow`` is patched so the expiry boundary can be crossed by
    moving the fake clock forward one second.
    """
    api_url = "https://example.com/api"
    frozen_now = datetime.datetime.min
    one_second = datetime.timedelta(seconds=1)

    headers = {}
    # Service account impersonation response; its expiry is 3601 seconds
    # after the frozen clock value.
    impersonated_expiry = frozen_now + datetime.timedelta(seconds=3601)
    impersonation_response = {
        "accessToken": "SA_ACCESS_TOKEN",
        "expireTime": "{}Z".format(impersonated_expiry.isoformat("T")),
    }
    # Mock request serving both the token exchange and the service account
    # impersonation call.
    request = self.make_mock_request(
        status=http_client.OK,
        data=self.SUCCESS_RESPONSE.copy(),
        impersonation_status=http_client.OK,
        impersonation_data=impersonation_response,
    )
    credentials = self.make_credentials(
        service_account_impersonation_url=self.SERVICE_ACCOUNT_IMPERSONATION_URL
    )
    credentials.token = "token"
    utcnow.return_value = frozen_now
    # Expire one second past "now + clock skew accommodation", so the
    # credentials start out valid.
    credentials.expiry = frozen_now + _helpers.CLOCK_SKEW + one_second

    assert credentials.valid
    assert not credentials.expired

    credentials.before_request(request, "POST", api_url, headers)

    # The pre-seeded token is applied as-is.
    assert headers == {"authorization": "Bearer token"}

    # Advance the fake clock by one second, which crosses the expiration
    # threshold.
    utcnow.return_value = frozen_now + one_second

    assert not credentials.valid
    assert credentials.expired

    credentials.before_request(request, "POST", api_url, headers)

    # The header now carries the impersonated service account token.
    impersonated_token = impersonation_response["accessToken"]
    assert headers == {"authorization": "Bearer {}".format(impersonated_token)}
946
@pytest.mark.parametrize(
    "audience",
    [
        # Legacy K8s audience format.
        "identitynamespace:1f12345:my_provider",
        # Unrealistic audiences.
        "//iam.googleapis.com/projects",
        "//iam.googleapis.com/projects/",
        "//iam.googleapis.com/project/123456",
        "//iam.googleapis.com/projects//123456",
        "//iam.googleapis.com/prefix_projects/123456",
        "//iam.googleapis.com/projects_suffix/123456",
    ],
)
def test_project_number_indeterminable(self, audience):
    """Audiences listed above must yield neither a project number nor an ID."""
    # Positional order follows CredentialsImpl.__init__: audience,
    # subject_token_type, token_url, credential_source.
    credentials = CredentialsImpl(
        audience, self.SUBJECT_TOKEN_TYPE, self.TOKEN_URL, self.CREDENTIAL_SOURCE
    )

    assert credentials.project_number is None
    # No request object is supplied for the project ID lookup.
    assert credentials.get_project_id(None) is None
971
def test_project_number_determinable(self):
    """The class-level AUDIENCE fixture must expose PROJECT_NUMBER."""
    # Positional order follows CredentialsImpl.__init__: audience,
    # subject_token_type, token_url, credential_source.
    credentials = CredentialsImpl(
        self.AUDIENCE,
        self.SUBJECT_TOKEN_TYPE,
        self.TOKEN_URL,
        self.CREDENTIAL_SOURCE,
    )

    assert credentials.project_number == self.PROJECT_NUMBER
981
def test_project_id_without_scopes(self):
    """Credentials built without scopes must report no project ID."""
    # Neither scopes nor default_scopes is supplied; positional order follows
    # CredentialsImpl.__init__.
    scopeless_credentials = CredentialsImpl(
        self.AUDIENCE,
        self.SUBJECT_TOKEN_TYPE,
        self.TOKEN_URL,
        self.CREDENTIAL_SOURCE,
    )

    assert scopeless_credentials.get_project_id(None) is None
992
def test_get_project_id_cloud_resource_manager_success(self):
    """Verify get_project_id resolves the project ID and then caches it.

    Credentials are configured with service account impersonation, scopes
    and a quota project. The first lookup is expected to issue three
    requests in order (STS token exchange, service account impersonation,
    cloud resource manager); a second lookup must issue none.
    """
    # STS token exchange request/response.
    token_response = self.SUCCESS_RESPONSE.copy()
    token_headers = {"Content-Type": "application/x-www-form-urlencoded"}
    token_request_data = {
        "grant_type": "urn:ietf:params:oauth:grant-type:token-exchange",
        "audience": self.AUDIENCE,
        "requested_token_type": "urn:ietf:params:oauth:token-type:access_token",
        "subject_token": "subject_token_0",
        "subject_token_type": self.SUBJECT_TOKEN_TYPE,
        "scope": "https://www.googleapis.com/auth/iam",
    }
    # Service account impersonation request/response.
    # Microseconds are dropped so the value round-trips through the
    # second-resolution timestamp format parsed below.
    expire_time = (
        _helpers.utcnow().replace(microsecond=0) + datetime.timedelta(seconds=3600)
    ).isoformat("T") + "Z"
    expected_expiry = datetime.datetime.strptime(expire_time, "%Y-%m-%dT%H:%M:%SZ")
    impersonation_response = {
        "accessToken": "SA_ACCESS_TOKEN",
        "expireTime": expire_time,
    }
    impersonation_headers = {
        "Content-Type": "application/json",
        "x-goog-user-project": self.QUOTA_PROJECT_ID,
        "authorization": "Bearer {}".format(token_response["access_token"]),
    }
    impersonation_request_data = {
        "delegates": None,
        "scope": self.SCOPES,
        "lifetime": "3600s",
    }
    # Initialize mock request to handle token exchange, service account
    # impersonation and cloud resource manager request.
    request = self.make_mock_request(
        status=http_client.OK,
        data=self.SUCCESS_RESPONSE.copy(),
        impersonation_status=http_client.OK,
        impersonation_data=impersonation_response,
        cloud_resource_manager_status=http_client.OK,
        cloud_resource_manager_data=self.CLOUD_RESOURCE_MANAGER_SUCCESS_RESPONSE,
    )
    credentials = self.make_credentials(
        service_account_impersonation_url=self.SERVICE_ACCOUNT_IMPERSONATION_URL,
        scopes=self.SCOPES,
        quota_project_id=self.QUOTA_PROJECT_ID,
    )

    # Expected project ID from cloud resource manager response should be returned.
    project_id = credentials.get_project_id(request)

    assert project_id == self.PROJECT_ID
    # 3 requests should be processed.
    assert len(request.call_args_list) == 3
    # Each recorded call is an (args, kwargs) pair. Index 1 is used instead
    # of the ``.kwargs`` property because that property only exists in
    # unittest.mock from Python 3.8 onwards, while tuple indexing works on
    # every version.
    # Verify token exchange request parameters.
    self.assert_token_request_kwargs(
        request.call_args_list[0][1], token_headers, token_request_data
    )
    # Verify service account impersonation request parameters.
    self.assert_impersonation_request_kwargs(
        request.call_args_list[1][1],
        impersonation_headers,
        impersonation_request_data,
    )
    # In the process of getting project ID, an access token should be
    # retrieved.
    assert credentials.valid
    assert credentials.expiry == expected_expiry
    assert not credentials.expired
    assert credentials.token == impersonation_response["accessToken"]
    # Verify cloud resource manager request parameters.
    self.assert_resource_manager_request_kwargs(
        request.call_args_list[2][1],
        self.PROJECT_NUMBER,
        {
            "x-goog-user-project": self.QUOTA_PROJECT_ID,
            "authorization": "Bearer {}".format(
                impersonation_response["accessToken"]
            ),
        },
    )

    # Calling get_project_id again should return the cached project_id.
    project_id = credentials.get_project_id(request)

    assert project_id == self.PROJECT_ID
    # No additional requests.
    assert len(request.call_args_list) == 3
1080
def test_get_project_id_cloud_resource_manager_error(self):
    """get_project_id must return None when the resource manager call fails."""
    # The token exchange succeeds, but the cloud resource manager lookup is
    # answered with UNAUTHORIZED, simulating insufficient permissions.
    mock_request_options = {
        "status": http_client.OK,
        "data": self.SUCCESS_RESPONSE.copy(),
        "cloud_resource_manager_status": http_client.UNAUTHORIZED,
    }
    request = self.make_mock_request(**mock_request_options)
    credentials = self.make_credentials(scopes=self.SCOPES)

    assert credentials.get_project_id(request) is None
    # Exactly two requests are expected: STS and cloud resource manager.
    recorded_calls = request.call_args_list
    assert len(recorded_calls) == 2