blob: 8f8d98009c1f4632a5bdba2d4cf2cb51bf17a31c [file] [log] [blame]
# Copyright 2020 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
14
15import datetime
16import json
17
18import mock
19import pytest
20from six.moves import http_client
21from six.moves import urllib
22
23from google.auth import _helpers
24from google.auth import exceptions
25from google.auth import external_account
26from google.auth import transport
27
28
# Client ID / secret pair passed to the credentials by the tests that
# exercise client authentication.
CLIENT_ID = "username"
CLIENT_SECRET = "password"
# Base64 encoding of "username:password"
BASIC_AUTH_ENCODING = "dXNlcm5hbWU6cGFzc3dvcmQ="
# Service account email embedded in the impersonation URL used by the tests.
SERVICE_ACCOUNT_EMAIL = "service-1234@service-name.iam.gserviceaccount.com"
34
35
class CredentialsImpl(external_account.Credentials):
    """Concrete ``external_account.Credentials`` subclass used by the tests.

    All constructor arguments are forwarded unchanged to the base class.
    ``retrieve_subject_token`` returns ``subject_token_0``,
    ``subject_token_1``, ... on successive calls so that tests can tell
    individual retrievals apart.
    """

    def __init__(
        self,
        audience,
        subject_token_type,
        token_url,
        credential_source,
        service_account_impersonation_url=None,
        client_id=None,
        client_secret=None,
        quota_project_id=None,
        scopes=None,
        default_scopes=None,
    ):
        # Forward everything by keyword, exactly as received.
        base_kwargs = {
            "audience": audience,
            "subject_token_type": subject_token_type,
            "token_url": token_url,
            "credential_source": credential_source,
            "service_account_impersonation_url": service_account_impersonation_url,
            "client_id": client_id,
            "client_secret": client_secret,
            "quota_project_id": quota_project_id,
            "scopes": scopes,
            "default_scopes": default_scopes,
        }
        super(CredentialsImpl, self).__init__(**base_kwargs)
        # Number of subject tokens handed out so far.
        self._counter = 0

    def retrieve_subject_token(self, request):
        """Return the next numbered subject token; ``request`` is not used."""
        subject_token = "subject_token_{}".format(self._counter)
        self._counter += 1
        return subject_token
68
69
70class TestCredentials(object):
71 TOKEN_URL = "https://sts.googleapis.com/v1/token"
72 PROJECT_NUMBER = "123456"
73 POOL_ID = "POOL_ID"
74 PROVIDER_ID = "PROVIDER_ID"
75 AUDIENCE = (
76 "//iam.googleapis.com/projects/{}"
77 "/locations/global/workloadIdentityPools/{}"
78 "/providers/{}"
79 ).format(PROJECT_NUMBER, POOL_ID, PROVIDER_ID)
80 SUBJECT_TOKEN_TYPE = "urn:ietf:params:oauth:token-type:jwt"
81 CREDENTIAL_SOURCE = {"file": "/var/run/secrets/goog.id/token"}
82 SUCCESS_RESPONSE = {
83 "access_token": "ACCESS_TOKEN",
84 "issued_token_type": "urn:ietf:params:oauth:token-type:access_token",
85 "token_type": "Bearer",
86 "expires_in": 3600,
87 "scope": "scope1 scope2",
88 }
89 ERROR_RESPONSE = {
90 "error": "invalid_request",
91 "error_description": "Invalid subject token",
92 "error_uri": "https://tools.ietf.org/html/rfc6749",
93 }
94 QUOTA_PROJECT_ID = "QUOTA_PROJECT_ID"
95 SERVICE_ACCOUNT_IMPERSONATION_URL = (
96 "https://us-east1-iamcredentials.googleapis.com/v1/projects/-"
97 + "/serviceAccounts/{}:generateAccessToken".format(SERVICE_ACCOUNT_EMAIL)
98 )
99 SCOPES = ["scope1", "scope2"]
100 IMPERSONATION_ERROR_RESPONSE = {
101 "error": {
102 "code": 400,
103 "message": "Request contains an invalid argument",
104 "status": "INVALID_ARGUMENT",
105 }
106 }
107 PROJECT_ID = "my-proj-id"
108 CLOUD_RESOURCE_MANAGER_URL = (
109 "https://cloudresourcemanager.googleapis.com/v1/projects/"
110 )
111 CLOUD_RESOURCE_MANAGER_SUCCESS_RESPONSE = {
112 "projectNumber": PROJECT_NUMBER,
113 "projectId": PROJECT_ID,
114 "lifecycleState": "ACTIVE",
115 "name": "project-name",
116 "createTime": "2018-11-06T04:42:54.109Z",
117 "parent": {"type": "folder", "id": "12345678901"},
118 }
119
120 @classmethod
121 def make_credentials(
122 cls,
123 client_id=None,
124 client_secret=None,
125 quota_project_id=None,
126 scopes=None,
127 default_scopes=None,
128 service_account_impersonation_url=None,
129 ):
130 return CredentialsImpl(
131 audience=cls.AUDIENCE,
132 subject_token_type=cls.SUBJECT_TOKEN_TYPE,
133 token_url=cls.TOKEN_URL,
134 service_account_impersonation_url=service_account_impersonation_url,
135 credential_source=cls.CREDENTIAL_SOURCE,
136 client_id=client_id,
137 client_secret=client_secret,
138 quota_project_id=quota_project_id,
139 scopes=scopes,
140 default_scopes=default_scopes,
141 )
142
143 @classmethod
144 def make_mock_request(
145 cls,
146 status=http_client.OK,
147 data=None,
148 impersonation_status=None,
149 impersonation_data=None,
150 cloud_resource_manager_status=None,
151 cloud_resource_manager_data=None,
152 ):
153 # STS token exchange request.
154 token_response = mock.create_autospec(transport.Response, instance=True)
155 token_response.status = status
156 token_response.data = json.dumps(data).encode("utf-8")
157 responses = [token_response]
158
159 # If service account impersonation is requested, mock the expected response.
160 if impersonation_status:
161 impersonation_response = mock.create_autospec(
162 transport.Response, instance=True
163 )
164 impersonation_response.status = impersonation_status
165 impersonation_response.data = json.dumps(impersonation_data).encode("utf-8")
166 responses.append(impersonation_response)
167
168 # If cloud resource manager is requested, mock the expected response.
169 if cloud_resource_manager_status:
170 cloud_resource_manager_response = mock.create_autospec(
171 transport.Response, instance=True
172 )
173 cloud_resource_manager_response.status = cloud_resource_manager_status
174 cloud_resource_manager_response.data = json.dumps(
175 cloud_resource_manager_data
176 ).encode("utf-8")
177 responses.append(cloud_resource_manager_response)
178
179 request = mock.create_autospec(transport.Request)
180 request.side_effect = responses
181
182 return request
183
184 @classmethod
185 def assert_token_request_kwargs(cls, request_kwargs, headers, request_data):
186 assert request_kwargs["url"] == cls.TOKEN_URL
187 assert request_kwargs["method"] == "POST"
188 assert request_kwargs["headers"] == headers
189 assert request_kwargs["body"] is not None
190 body_tuples = urllib.parse.parse_qsl(request_kwargs["body"])
191 for (k, v) in body_tuples:
192 assert v.decode("utf-8") == request_data[k.decode("utf-8")]
193 assert len(body_tuples) == len(request_data.keys())
194
195 @classmethod
196 def assert_impersonation_request_kwargs(cls, request_kwargs, headers, request_data):
197 assert request_kwargs["url"] == cls.SERVICE_ACCOUNT_IMPERSONATION_URL
198 assert request_kwargs["method"] == "POST"
199 assert request_kwargs["headers"] == headers
200 assert request_kwargs["body"] is not None
201 body_json = json.loads(request_kwargs["body"].decode("utf-8"))
202 assert body_json == request_data
203
204 @classmethod
205 def assert_resource_manager_request_kwargs(
206 cls, request_kwargs, project_number, headers
207 ):
208 assert request_kwargs["url"] == cls.CLOUD_RESOURCE_MANAGER_URL + project_number
209 assert request_kwargs["method"] == "GET"
210 assert request_kwargs["headers"] == headers
211 assert "body" not in request_kwargs
212
213 def test_default_state(self):
214 credentials = self.make_credentials()
215
216 # Not token acquired yet
217 assert not credentials.token
218 assert not credentials.valid
219 # Expiration hasn't been set yet
220 assert not credentials.expiry
221 assert not credentials.expired
222 # Scopes are required
223 assert not credentials.scopes
224 assert credentials.requires_scopes
225 assert not credentials.quota_project_id
226
227 def test_with_scopes(self):
228 credentials = self.make_credentials()
229
230 assert not credentials.scopes
231 assert credentials.requires_scopes
232
233 scoped_credentials = credentials.with_scopes(["email"])
234
235 assert scoped_credentials.has_scopes(["email"])
236 assert not scoped_credentials.requires_scopes
237
238 def test_with_scopes_using_user_and_default_scopes(self):
239 credentials = self.make_credentials()
240
241 assert not credentials.scopes
242 assert credentials.requires_scopes
243
244 scoped_credentials = credentials.with_scopes(
245 ["email"], default_scopes=["profile"]
246 )
247
248 assert scoped_credentials.has_scopes(["email"])
249 assert not scoped_credentials.has_scopes(["profile"])
250 assert not scoped_credentials.requires_scopes
251 assert scoped_credentials.scopes == ["email"]
252 assert scoped_credentials.default_scopes == ["profile"]
253
254 def test_with_scopes_using_default_scopes_only(self):
255 credentials = self.make_credentials()
256
257 assert not credentials.scopes
258 assert credentials.requires_scopes
259
260 scoped_credentials = credentials.with_scopes(None, default_scopes=["profile"])
261
262 assert scoped_credentials.has_scopes(["profile"])
263 assert not scoped_credentials.requires_scopes
264
265 def test_with_scopes_full_options_propagated(self):
266 credentials = self.make_credentials(
267 client_id=CLIENT_ID,
268 client_secret=CLIENT_SECRET,
269 quota_project_id=self.QUOTA_PROJECT_ID,
270 scopes=self.SCOPES,
271 default_scopes=["default1"],
272 service_account_impersonation_url=self.SERVICE_ACCOUNT_IMPERSONATION_URL,
273 )
274
275 with mock.patch.object(
276 external_account.Credentials, "__init__", return_value=None
277 ) as mock_init:
278 credentials.with_scopes(["email"], ["default2"])
279
280 # Confirm with_scopes initialized the credential with the expected
281 # parameters and scopes.
282 mock_init.assert_called_once_with(
283 audience=self.AUDIENCE,
284 subject_token_type=self.SUBJECT_TOKEN_TYPE,
285 token_url=self.TOKEN_URL,
286 credential_source=self.CREDENTIAL_SOURCE,
287 service_account_impersonation_url=self.SERVICE_ACCOUNT_IMPERSONATION_URL,
288 client_id=CLIENT_ID,
289 client_secret=CLIENT_SECRET,
290 quota_project_id=self.QUOTA_PROJECT_ID,
291 scopes=["email"],
292 default_scopes=["default2"],
293 )
294
295 def test_with_quota_project(self):
296 credentials = self.make_credentials()
297
298 assert not credentials.scopes
299 assert not credentials.quota_project_id
300
301 quota_project_creds = credentials.with_quota_project("project-foo")
302
303 assert quota_project_creds.quota_project_id == "project-foo"
304
305 def test_with_quota_project_full_options_propagated(self):
306 credentials = self.make_credentials(
307 client_id=CLIENT_ID,
308 client_secret=CLIENT_SECRET,
309 quota_project_id=self.QUOTA_PROJECT_ID,
310 scopes=self.SCOPES,
311 default_scopes=["default1"],
312 service_account_impersonation_url=self.SERVICE_ACCOUNT_IMPERSONATION_URL,
313 )
314
315 with mock.patch.object(
316 external_account.Credentials, "__init__", return_value=None
317 ) as mock_init:
318 credentials.with_quota_project("project-foo")
319
320 # Confirm with_quota_project initialized the credential with the
321 # expected parameters and quota project ID.
322 mock_init.assert_called_once_with(
323 audience=self.AUDIENCE,
324 subject_token_type=self.SUBJECT_TOKEN_TYPE,
325 token_url=self.TOKEN_URL,
326 credential_source=self.CREDENTIAL_SOURCE,
327 service_account_impersonation_url=self.SERVICE_ACCOUNT_IMPERSONATION_URL,
328 client_id=CLIENT_ID,
329 client_secret=CLIENT_SECRET,
330 quota_project_id="project-foo",
331 scopes=self.SCOPES,
332 default_scopes=["default1"],
333 )
334
335 def test_with_invalid_impersonation_target_principal(self):
336 invalid_url = "https://iamcredentials.googleapis.com/v1/invalid"
337
338 with pytest.raises(exceptions.RefreshError) as excinfo:
339 self.make_credentials(service_account_impersonation_url=invalid_url)
340
341 assert excinfo.match(
342 r"Unable to determine target principal from service account impersonation URL."
343 )
344
345 @mock.patch("google.auth._helpers.utcnow", return_value=datetime.datetime.min)
346 def test_refresh_without_client_auth_success(self, unused_utcnow):
347 response = self.SUCCESS_RESPONSE.copy()
348 # Test custom expiration to confirm expiry is set correctly.
349 response["expires_in"] = 2800
350 expected_expiry = datetime.datetime.min + datetime.timedelta(
351 seconds=response["expires_in"]
352 )
353 headers = {"Content-Type": "application/x-www-form-urlencoded"}
354 request_data = {
355 "grant_type": "urn:ietf:params:oauth:grant-type:token-exchange",
356 "audience": self.AUDIENCE,
357 "requested_token_type": "urn:ietf:params:oauth:token-type:access_token",
358 "subject_token": "subject_token_0",
359 "subject_token_type": self.SUBJECT_TOKEN_TYPE,
360 }
361 request = self.make_mock_request(status=http_client.OK, data=response)
362 credentials = self.make_credentials()
363
364 credentials.refresh(request)
365
arithmetic1728d80c85f2021-03-08 13:35:44 -0800366 self.assert_token_request_kwargs(request.call_args[1], headers, request_data)
bojeil-googled4d7f382021-02-16 12:33:20 -0800367 assert credentials.valid
368 assert credentials.expiry == expected_expiry
369 assert not credentials.expired
370 assert credentials.token == response["access_token"]
371
372 def test_refresh_impersonation_without_client_auth_success(self):
373 # Simulate service account access token expires in 2800 seconds.
374 expire_time = (
375 _helpers.utcnow().replace(microsecond=0) + datetime.timedelta(seconds=2800)
376 ).isoformat("T") + "Z"
377 expected_expiry = datetime.datetime.strptime(expire_time, "%Y-%m-%dT%H:%M:%SZ")
378 # STS token exchange request/response.
379 token_response = self.SUCCESS_RESPONSE.copy()
380 token_headers = {"Content-Type": "application/x-www-form-urlencoded"}
381 token_request_data = {
382 "grant_type": "urn:ietf:params:oauth:grant-type:token-exchange",
383 "audience": self.AUDIENCE,
384 "requested_token_type": "urn:ietf:params:oauth:token-type:access_token",
385 "subject_token": "subject_token_0",
386 "subject_token_type": self.SUBJECT_TOKEN_TYPE,
387 "scope": "https://www.googleapis.com/auth/iam",
388 }
389 # Service account impersonation request/response.
390 impersonation_response = {
391 "accessToken": "SA_ACCESS_TOKEN",
392 "expireTime": expire_time,
393 }
394 impersonation_headers = {
395 "Content-Type": "application/json",
396 "authorization": "Bearer {}".format(token_response["access_token"]),
397 }
398 impersonation_request_data = {
399 "delegates": None,
400 "scope": self.SCOPES,
401 "lifetime": "3600s",
402 }
403 # Initialize mock request to handle token exchange and service account
404 # impersonation request.
405 request = self.make_mock_request(
406 status=http_client.OK,
407 data=token_response,
408 impersonation_status=http_client.OK,
409 impersonation_data=impersonation_response,
410 )
411 # Initialize credentials with service account impersonation.
412 credentials = self.make_credentials(
413 service_account_impersonation_url=self.SERVICE_ACCOUNT_IMPERSONATION_URL,
414 scopes=self.SCOPES,
415 )
416
417 credentials.refresh(request)
418
419 # Only 2 requests should be processed.
420 assert len(request.call_args_list) == 2
421 # Verify token exchange request parameters.
422 self.assert_token_request_kwargs(
arithmetic1728d80c85f2021-03-08 13:35:44 -0800423 request.call_args_list[0][1], token_headers, token_request_data
bojeil-googled4d7f382021-02-16 12:33:20 -0800424 )
425 # Verify service account impersonation request parameters.
426 self.assert_impersonation_request_kwargs(
arithmetic1728d80c85f2021-03-08 13:35:44 -0800427 request.call_args_list[1][1],
bojeil-googled4d7f382021-02-16 12:33:20 -0800428 impersonation_headers,
429 impersonation_request_data,
430 )
431 assert credentials.valid
432 assert credentials.expiry == expected_expiry
433 assert not credentials.expired
434 assert credentials.token == impersonation_response["accessToken"]
435
436 def test_refresh_without_client_auth_success_explicit_user_scopes_ignore_default_scopes(
arithmetic1728d80c85f2021-03-08 13:35:44 -0800437 self,
bojeil-googled4d7f382021-02-16 12:33:20 -0800438 ):
439 headers = {"Content-Type": "application/x-www-form-urlencoded"}
440 request_data = {
441 "grant_type": "urn:ietf:params:oauth:grant-type:token-exchange",
442 "audience": self.AUDIENCE,
443 "requested_token_type": "urn:ietf:params:oauth:token-type:access_token",
444 "scope": "scope1 scope2",
445 "subject_token": "subject_token_0",
446 "subject_token_type": self.SUBJECT_TOKEN_TYPE,
447 }
448 request = self.make_mock_request(
449 status=http_client.OK, data=self.SUCCESS_RESPONSE
450 )
451 credentials = self.make_credentials(
452 scopes=["scope1", "scope2"],
453 # Default scopes will be ignored in favor of user scopes.
454 default_scopes=["ignored"],
455 )
456
457 credentials.refresh(request)
458
arithmetic1728d80c85f2021-03-08 13:35:44 -0800459 self.assert_token_request_kwargs(request.call_args[1], headers, request_data)
bojeil-googled4d7f382021-02-16 12:33:20 -0800460 assert credentials.valid
461 assert not credentials.expired
462 assert credentials.token == self.SUCCESS_RESPONSE["access_token"]
463 assert credentials.has_scopes(["scope1", "scope2"])
464 assert not credentials.has_scopes(["ignored"])
465
466 def test_refresh_without_client_auth_success_explicit_default_scopes_only(self):
467 headers = {"Content-Type": "application/x-www-form-urlencoded"}
468 request_data = {
469 "grant_type": "urn:ietf:params:oauth:grant-type:token-exchange",
470 "audience": self.AUDIENCE,
471 "requested_token_type": "urn:ietf:params:oauth:token-type:access_token",
472 "scope": "scope1 scope2",
473 "subject_token": "subject_token_0",
474 "subject_token_type": self.SUBJECT_TOKEN_TYPE,
475 }
476 request = self.make_mock_request(
477 status=http_client.OK, data=self.SUCCESS_RESPONSE
478 )
479 credentials = self.make_credentials(
480 scopes=None,
481 # Default scopes will be used since user scopes are none.
482 default_scopes=["scope1", "scope2"],
483 )
484
485 credentials.refresh(request)
486
arithmetic1728d80c85f2021-03-08 13:35:44 -0800487 self.assert_token_request_kwargs(request.call_args[1], headers, request_data)
bojeil-googled4d7f382021-02-16 12:33:20 -0800488 assert credentials.valid
489 assert not credentials.expired
490 assert credentials.token == self.SUCCESS_RESPONSE["access_token"]
491 assert credentials.has_scopes(["scope1", "scope2"])
492
493 def test_refresh_without_client_auth_error(self):
494 request = self.make_mock_request(
495 status=http_client.BAD_REQUEST, data=self.ERROR_RESPONSE
496 )
497 credentials = self.make_credentials()
498
499 with pytest.raises(exceptions.OAuthError) as excinfo:
500 credentials.refresh(request)
501
502 assert excinfo.match(
503 r"Error code invalid_request: Invalid subject token - https://tools.ietf.org/html/rfc6749"
504 )
505 assert not credentials.expired
506 assert credentials.token is None
507
508 def test_refresh_impersonation_without_client_auth_error(self):
509 request = self.make_mock_request(
510 status=http_client.OK,
511 data=self.SUCCESS_RESPONSE,
512 impersonation_status=http_client.BAD_REQUEST,
513 impersonation_data=self.IMPERSONATION_ERROR_RESPONSE,
514 )
515 credentials = self.make_credentials(
516 service_account_impersonation_url=self.SERVICE_ACCOUNT_IMPERSONATION_URL,
517 scopes=self.SCOPES,
518 )
519
520 with pytest.raises(exceptions.RefreshError) as excinfo:
521 credentials.refresh(request)
522
523 assert excinfo.match(r"Unable to acquire impersonated credentials")
524 assert not credentials.expired
525 assert credentials.token is None
526
527 def test_refresh_with_client_auth_success(self):
528 headers = {
529 "Content-Type": "application/x-www-form-urlencoded",
530 "Authorization": "Basic {}".format(BASIC_AUTH_ENCODING),
531 }
532 request_data = {
533 "grant_type": "urn:ietf:params:oauth:grant-type:token-exchange",
534 "audience": self.AUDIENCE,
535 "requested_token_type": "urn:ietf:params:oauth:token-type:access_token",
536 "subject_token": "subject_token_0",
537 "subject_token_type": self.SUBJECT_TOKEN_TYPE,
538 }
539 request = self.make_mock_request(
540 status=http_client.OK, data=self.SUCCESS_RESPONSE
541 )
542 credentials = self.make_credentials(
543 client_id=CLIENT_ID, client_secret=CLIENT_SECRET
544 )
545
546 credentials.refresh(request)
547
arithmetic1728d80c85f2021-03-08 13:35:44 -0800548 self.assert_token_request_kwargs(request.call_args[1], headers, request_data)
bojeil-googled4d7f382021-02-16 12:33:20 -0800549 assert credentials.valid
550 assert not credentials.expired
551 assert credentials.token == self.SUCCESS_RESPONSE["access_token"]
552
553 def test_refresh_impersonation_with_client_auth_success_ignore_default_scopes(self):
554 # Simulate service account access token expires in 2800 seconds.
555 expire_time = (
556 _helpers.utcnow().replace(microsecond=0) + datetime.timedelta(seconds=2800)
557 ).isoformat("T") + "Z"
558 expected_expiry = datetime.datetime.strptime(expire_time, "%Y-%m-%dT%H:%M:%SZ")
559 # STS token exchange request/response.
560 token_response = self.SUCCESS_RESPONSE.copy()
561 token_headers = {
562 "Content-Type": "application/x-www-form-urlencoded",
563 "Authorization": "Basic {}".format(BASIC_AUTH_ENCODING),
564 }
565 token_request_data = {
566 "grant_type": "urn:ietf:params:oauth:grant-type:token-exchange",
567 "audience": self.AUDIENCE,
568 "requested_token_type": "urn:ietf:params:oauth:token-type:access_token",
569 "subject_token": "subject_token_0",
570 "subject_token_type": self.SUBJECT_TOKEN_TYPE,
571 "scope": "https://www.googleapis.com/auth/iam",
572 }
573 # Service account impersonation request/response.
574 impersonation_response = {
575 "accessToken": "SA_ACCESS_TOKEN",
576 "expireTime": expire_time,
577 }
578 impersonation_headers = {
579 "Content-Type": "application/json",
580 "authorization": "Bearer {}".format(token_response["access_token"]),
581 }
582 impersonation_request_data = {
583 "delegates": None,
584 "scope": self.SCOPES,
585 "lifetime": "3600s",
586 }
587 # Initialize mock request to handle token exchange and service account
588 # impersonation request.
589 request = self.make_mock_request(
590 status=http_client.OK,
591 data=token_response,
592 impersonation_status=http_client.OK,
593 impersonation_data=impersonation_response,
594 )
595 # Initialize credentials with service account impersonation and basic auth.
596 credentials = self.make_credentials(
597 client_id=CLIENT_ID,
598 client_secret=CLIENT_SECRET,
599 service_account_impersonation_url=self.SERVICE_ACCOUNT_IMPERSONATION_URL,
600 scopes=self.SCOPES,
601 # Default scopes will be ignored since user scopes are specified.
602 default_scopes=["ignored"],
603 )
604
605 credentials.refresh(request)
606
607 # Only 2 requests should be processed.
608 assert len(request.call_args_list) == 2
609 # Verify token exchange request parameters.
610 self.assert_token_request_kwargs(
arithmetic1728d80c85f2021-03-08 13:35:44 -0800611 request.call_args_list[0][1], token_headers, token_request_data
bojeil-googled4d7f382021-02-16 12:33:20 -0800612 )
613 # Verify service account impersonation request parameters.
614 self.assert_impersonation_request_kwargs(
arithmetic1728d80c85f2021-03-08 13:35:44 -0800615 request.call_args_list[1][1],
bojeil-googled4d7f382021-02-16 12:33:20 -0800616 impersonation_headers,
617 impersonation_request_data,
618 )
619 assert credentials.valid
620 assert credentials.expiry == expected_expiry
621 assert not credentials.expired
622 assert credentials.token == impersonation_response["accessToken"]
623
624 def test_refresh_impersonation_with_client_auth_success_use_default_scopes(self):
625 # Simulate service account access token expires in 2800 seconds.
626 expire_time = (
627 _helpers.utcnow().replace(microsecond=0) + datetime.timedelta(seconds=2800)
628 ).isoformat("T") + "Z"
629 expected_expiry = datetime.datetime.strptime(expire_time, "%Y-%m-%dT%H:%M:%SZ")
630 # STS token exchange request/response.
631 token_response = self.SUCCESS_RESPONSE.copy()
632 token_headers = {
633 "Content-Type": "application/x-www-form-urlencoded",
634 "Authorization": "Basic {}".format(BASIC_AUTH_ENCODING),
635 }
636 token_request_data = {
637 "grant_type": "urn:ietf:params:oauth:grant-type:token-exchange",
638 "audience": self.AUDIENCE,
639 "requested_token_type": "urn:ietf:params:oauth:token-type:access_token",
640 "subject_token": "subject_token_0",
641 "subject_token_type": self.SUBJECT_TOKEN_TYPE,
642 "scope": "https://www.googleapis.com/auth/iam",
643 }
644 # Service account impersonation request/response.
645 impersonation_response = {
646 "accessToken": "SA_ACCESS_TOKEN",
647 "expireTime": expire_time,
648 }
649 impersonation_headers = {
650 "Content-Type": "application/json",
651 "authorization": "Bearer {}".format(token_response["access_token"]),
652 }
653 impersonation_request_data = {
654 "delegates": None,
655 "scope": self.SCOPES,
656 "lifetime": "3600s",
657 }
658 # Initialize mock request to handle token exchange and service account
659 # impersonation request.
660 request = self.make_mock_request(
661 status=http_client.OK,
662 data=token_response,
663 impersonation_status=http_client.OK,
664 impersonation_data=impersonation_response,
665 )
666 # Initialize credentials with service account impersonation and basic auth.
667 credentials = self.make_credentials(
668 client_id=CLIENT_ID,
669 client_secret=CLIENT_SECRET,
670 service_account_impersonation_url=self.SERVICE_ACCOUNT_IMPERSONATION_URL,
671 scopes=None,
672 # Default scopes will be used since user specified scopes are none.
673 default_scopes=self.SCOPES,
674 )
675
676 credentials.refresh(request)
677
678 # Only 2 requests should be processed.
679 assert len(request.call_args_list) == 2
680 # Verify token exchange request parameters.
681 self.assert_token_request_kwargs(
arithmetic1728d80c85f2021-03-08 13:35:44 -0800682 request.call_args_list[0][1], token_headers, token_request_data
bojeil-googled4d7f382021-02-16 12:33:20 -0800683 )
684 # Verify service account impersonation request parameters.
685 self.assert_impersonation_request_kwargs(
arithmetic1728d80c85f2021-03-08 13:35:44 -0800686 request.call_args_list[1][1],
bojeil-googled4d7f382021-02-16 12:33:20 -0800687 impersonation_headers,
688 impersonation_request_data,
689 )
690 assert credentials.valid
691 assert credentials.expiry == expected_expiry
692 assert not credentials.expired
693 assert credentials.token == impersonation_response["accessToken"]
694
695 def test_apply_without_quota_project_id(self):
696 headers = {}
697 request = self.make_mock_request(
698 status=http_client.OK, data=self.SUCCESS_RESPONSE
699 )
700 credentials = self.make_credentials()
701
702 credentials.refresh(request)
703 credentials.apply(headers)
704
705 assert headers == {
706 "authorization": "Bearer {}".format(self.SUCCESS_RESPONSE["access_token"])
707 }
708
709 def test_apply_impersonation_without_quota_project_id(self):
710 expire_time = (
711 _helpers.utcnow().replace(microsecond=0) + datetime.timedelta(seconds=3600)
712 ).isoformat("T") + "Z"
713 # Service account impersonation response.
714 impersonation_response = {
715 "accessToken": "SA_ACCESS_TOKEN",
716 "expireTime": expire_time,
717 }
718 # Initialize mock request to handle token exchange and service account
719 # impersonation request.
720 request = self.make_mock_request(
721 status=http_client.OK,
722 data=self.SUCCESS_RESPONSE.copy(),
723 impersonation_status=http_client.OK,
724 impersonation_data=impersonation_response,
725 )
726 # Initialize credentials with service account impersonation.
727 credentials = self.make_credentials(
728 service_account_impersonation_url=self.SERVICE_ACCOUNT_IMPERSONATION_URL,
729 scopes=self.SCOPES,
730 )
731 headers = {}
732
733 credentials.refresh(request)
734 credentials.apply(headers)
735
736 assert headers == {
737 "authorization": "Bearer {}".format(impersonation_response["accessToken"])
738 }
739
740 def test_apply_with_quota_project_id(self):
741 headers = {"other": "header-value"}
742 request = self.make_mock_request(
743 status=http_client.OK, data=self.SUCCESS_RESPONSE
744 )
745 credentials = self.make_credentials(quota_project_id=self.QUOTA_PROJECT_ID)
746
747 credentials.refresh(request)
748 credentials.apply(headers)
749
750 assert headers == {
751 "other": "header-value",
752 "authorization": "Bearer {}".format(self.SUCCESS_RESPONSE["access_token"]),
753 "x-goog-user-project": self.QUOTA_PROJECT_ID,
754 }
755
756 def test_apply_impersonation_with_quota_project_id(self):
757 expire_time = (
758 _helpers.utcnow().replace(microsecond=0) + datetime.timedelta(seconds=3600)
759 ).isoformat("T") + "Z"
760 # Service account impersonation response.
761 impersonation_response = {
762 "accessToken": "SA_ACCESS_TOKEN",
763 "expireTime": expire_time,
764 }
765 # Initialize mock request to handle token exchange and service account
766 # impersonation request.
767 request = self.make_mock_request(
768 status=http_client.OK,
769 data=self.SUCCESS_RESPONSE.copy(),
770 impersonation_status=http_client.OK,
771 impersonation_data=impersonation_response,
772 )
773 # Initialize credentials with service account impersonation.
774 credentials = self.make_credentials(
775 service_account_impersonation_url=self.SERVICE_ACCOUNT_IMPERSONATION_URL,
776 scopes=self.SCOPES,
777 quota_project_id=self.QUOTA_PROJECT_ID,
778 )
779 headers = {"other": "header-value"}
780
781 credentials.refresh(request)
782 credentials.apply(headers)
783
784 assert headers == {
785 "other": "header-value",
786 "authorization": "Bearer {}".format(impersonation_response["accessToken"]),
787 "x-goog-user-project": self.QUOTA_PROJECT_ID,
788 }
789
790 def test_before_request(self):
791 headers = {"other": "header-value"}
792 request = self.make_mock_request(
793 status=http_client.OK, data=self.SUCCESS_RESPONSE
794 )
795 credentials = self.make_credentials()
796
797 # First call should call refresh, setting the token.
798 credentials.before_request(request, "POST", "https://example.com/api", headers)
799
800 assert headers == {
801 "other": "header-value",
802 "authorization": "Bearer {}".format(self.SUCCESS_RESPONSE["access_token"]),
803 }
804
805 # Second call shouldn't call refresh.
806 credentials.before_request(request, "POST", "https://example.com/api", headers)
807
808 assert headers == {
809 "other": "header-value",
810 "authorization": "Bearer {}".format(self.SUCCESS_RESPONSE["access_token"]),
811 }
812
813 def test_before_request_impersonation(self):
814 expire_time = (
815 _helpers.utcnow().replace(microsecond=0) + datetime.timedelta(seconds=3600)
816 ).isoformat("T") + "Z"
817 # Service account impersonation response.
818 impersonation_response = {
819 "accessToken": "SA_ACCESS_TOKEN",
820 "expireTime": expire_time,
821 }
822 # Initialize mock request to handle token exchange and service account
823 # impersonation request.
824 request = self.make_mock_request(
825 status=http_client.OK,
826 data=self.SUCCESS_RESPONSE.copy(),
827 impersonation_status=http_client.OK,
828 impersonation_data=impersonation_response,
829 )
830 headers = {"other": "header-value"}
831 credentials = self.make_credentials(
832 service_account_impersonation_url=self.SERVICE_ACCOUNT_IMPERSONATION_URL
833 )
834
835 # First call should call refresh, setting the token.
836 credentials.before_request(request, "POST", "https://example.com/api", headers)
837
838 assert headers == {
839 "other": "header-value",
840 "authorization": "Bearer {}".format(impersonation_response["accessToken"]),
841 }
842
843 # Second call shouldn't call refresh.
844 credentials.before_request(request, "POST", "https://example.com/api", headers)
845
846 assert headers == {
847 "other": "header-value",
848 "authorization": "Bearer {}".format(impersonation_response["accessToken"]),
849 }
850
@mock.patch("google.auth._helpers.utcnow")
def test_before_request_expired(self, utcnow):
    """Verify a cached token is used while valid and replaced once expired.

    ``utcnow`` is patched so the test controls the clock. With the expiry
    one second beyond "now" plus the clock skew, the credentials are valid
    and the cached token is applied. After the clock advances by one
    second the credentials are expired and a new token is retrieved.
    """
    start = datetime.datetime.min
    one_second = datetime.timedelta(seconds=1)
    api_url = "https://example.com/api"

    headers = {}
    request = self.make_mock_request(
        status=http_client.OK, data=self.SUCCESS_RESPONSE
    )
    credentials = self.make_credentials()
    credentials.token = "token"
    utcnow.return_value = start
    # Expiry is one second past now + the clock skew accommodation, so
    # these credentials should be valid.
    credentials.expiry = start + _helpers.CLOCK_SKEW + one_second

    assert credentials.valid
    assert not credentials.expired

    credentials.before_request(request, "POST", api_url, headers)

    # The cached token is the one applied to the headers.
    assert headers == {"authorization": "Bearer token"}

    # Advance the patched clock by one second.
    utcnow.return_value = start + one_second

    assert not credentials.valid
    assert credentials.expired

    credentials.before_request(request, "POST", api_url, headers)

    # A freshly exchanged token replaces the cached one.
    refreshed_token = self.SUCCESS_RESPONSE["access_token"]
    assert headers == {"authorization": "Bearer {}".format(refreshed_token)}
886
@mock.patch("google.auth._helpers.utcnow")
def test_before_request_impersonation_expired(self, utcnow):
    """Verify impersonated credentials refresh once the cached token expires.

    ``utcnow`` is patched so the test controls the clock. The cached token
    is applied while the credentials are valid. After the clock advances
    by one second the expiration threshold is hit and the impersonated
    access token from the mocked response is retrieved.
    """
    start = datetime.datetime.min
    one_second = datetime.timedelta(seconds=1)
    api_url = "https://example.com/api"

    headers = {}
    # Service account impersonation response; the token expires 3601
    # seconds after the patched starting time.
    impersonated_expiry = start + datetime.timedelta(seconds=3601)
    impersonation_response = {
        "accessToken": "SA_ACCESS_TOKEN",
        "expireTime": impersonated_expiry.isoformat("T") + "Z",
    }
    # Mock transport handling the token exchange and the service account
    # impersonation requests.
    request = self.make_mock_request(
        status=http_client.OK,
        data=self.SUCCESS_RESPONSE.copy(),
        impersonation_status=http_client.OK,
        impersonation_data=impersonation_response,
    )
    credentials = self.make_credentials(
        service_account_impersonation_url=self.SERVICE_ACCOUNT_IMPERSONATION_URL
    )
    credentials.token = "token"
    utcnow.return_value = start
    # Expiry is one second past now + the clock skew accommodation, so
    # these credentials should be valid.
    credentials.expiry = start + _helpers.CLOCK_SKEW + one_second

    assert credentials.valid
    assert not credentials.expired

    credentials.before_request(request, "POST", api_url, headers)

    # The cached token is the one applied to the headers.
    assert headers == {"authorization": "Bearer token"}

    # Advance the patched clock by one second, which trips the expiration
    # threshold.
    utcnow.return_value = start + one_second

    assert not credentials.valid
    assert credentials.expired

    credentials.before_request(request, "POST", api_url, headers)

    # The impersonated token replaces the cached one.
    impersonated_token = impersonation_response["accessToken"]
    assert headers == {"authorization": "Bearer {}".format(impersonated_token)}
938
@pytest.mark.parametrize(
    "audience",
    [
        # Legacy K8s audience format.
        "identitynamespace:1f12345:my_provider",
        # Unrealistic audiences.
        "//iam.googleapis.com/projects",
        "//iam.googleapis.com/projects/",
        "//iam.googleapis.com/project/123456",
        "//iam.googleapis.com/projects//123456",
        "//iam.googleapis.com/prefix_projects/123456",
        "//iam.googleapis.com/projects_suffix/123456",
    ],
)
def test_project_number_indeterminable(self, audience):
    """Verify these audiences yield no project number and no project ID.

    For each parametrized audience, ``project_number`` is expected to be
    ``None`` and ``get_project_id(None)`` is expected to return ``None``.
    """
    init_kwargs = {
        "audience": audience,
        "subject_token_type": self.SUBJECT_TOKEN_TYPE,
        "token_url": self.TOKEN_URL,
        "credential_source": self.CREDENTIAL_SOURCE,
    }
    credentials = CredentialsImpl(**init_kwargs)

    assert credentials.project_number is None
    assert credentials.get_project_id(None) is None
963
def test_project_number_determinable(self):
    """Verify ``project_number`` equals ``PROJECT_NUMBER`` for ``AUDIENCE``."""
    init_kwargs = {
        "audience": self.AUDIENCE,
        "subject_token_type": self.SUBJECT_TOKEN_TYPE,
        "token_url": self.TOKEN_URL,
        "credential_source": self.CREDENTIAL_SOURCE,
    }
    credentials = CredentialsImpl(**init_kwargs)

    assert credentials.project_number == self.PROJECT_NUMBER
973
def test_project_id_without_scopes(self):
    """Verify ``get_project_id`` returns ``None`` when no scopes are set."""
    # Credentials are built without passing any scopes.
    init_kwargs = {
        "audience": self.AUDIENCE,
        "subject_token_type": self.SUBJECT_TOKEN_TYPE,
        "token_url": self.TOKEN_URL,
        "credential_source": self.CREDENTIAL_SOURCE,
    }
    credentials = CredentialsImpl(**init_kwargs)

    project_id = credentials.get_project_id(None)

    assert project_id is None
984
def test_get_project_id_cloud_resource_manager_success(self):
    """Verify ``get_project_id`` resolves the project ID and caches it.

    The mocked transport serves three responses: the STS token exchange,
    the service account impersonation and the cloud resource manager
    lookup. The test checks the parameters of each request, checks that
    the credentials end up holding the impersonated token, and checks
    that a second ``get_project_id`` call sends no further requests.
    """
    # Expected STS token exchange request/response.
    token_response = self.SUCCESS_RESPONSE.copy()
    token_headers = {"Content-Type": "application/x-www-form-urlencoded"}
    token_request_data = {
        "grant_type": "urn:ietf:params:oauth:grant-type:token-exchange",
        "audience": self.AUDIENCE,
        "requested_token_type": "urn:ietf:params:oauth:token-type:access_token",
        "subject_token": "subject_token_0",
        "subject_token_type": self.SUBJECT_TOKEN_TYPE,
        "scope": "https://www.googleapis.com/auth/iam",
    }

    # Expected service account impersonation request/response. The token
    # expires an hour from now (second precision).
    one_hour_from_now = _helpers.utcnow().replace(
        microsecond=0
    ) + datetime.timedelta(seconds=3600)
    expire_time = one_hour_from_now.isoformat("T") + "Z"
    expected_expiry = datetime.datetime.strptime(expire_time, "%Y-%m-%dT%H:%M:%SZ")
    impersonation_response = {
        "accessToken": "SA_ACCESS_TOKEN",
        "expireTime": expire_time,
    }
    impersonation_headers = {
        "Content-Type": "application/json",
        "x-goog-user-project": self.QUOTA_PROJECT_ID,
        "authorization": "Bearer {}".format(token_response["access_token"]),
    }
    impersonation_request_data = {
        "delegates": None,
        "scope": self.SCOPES,
        "lifetime": "3600s",
    }

    # Mock transport handling the token exchange, the service account
    # impersonation and the cloud resource manager requests.
    request = self.make_mock_request(
        status=http_client.OK,
        data=self.SUCCESS_RESPONSE.copy(),
        impersonation_status=http_client.OK,
        impersonation_data=impersonation_response,
        cloud_resource_manager_status=http_client.OK,
        cloud_resource_manager_data=self.CLOUD_RESOURCE_MANAGER_SUCCESS_RESPONSE,
    )
    credentials = self.make_credentials(
        service_account_impersonation_url=self.SERVICE_ACCOUNT_IMPERSONATION_URL,
        scopes=self.SCOPES,
        quota_project_id=self.QUOTA_PROJECT_ID,
    )

    # The project ID from the cloud resource manager response is returned.
    project_id = credentials.get_project_id(request)

    assert project_id == self.PROJECT_ID
    # Exactly 3 requests should have been processed.
    assert len(request.call_args_list) == 3
    sts_call, impersonation_call, resource_manager_call = request.call_args_list

    # Index 1 of each recorded call holds its keyword arguments.
    # Check the token exchange request parameters.
    self.assert_token_request_kwargs(
        sts_call[1], token_headers, token_request_data
    )
    # Check the service account impersonation request parameters.
    self.assert_impersonation_request_kwargs(
        impersonation_call[1], impersonation_headers, impersonation_request_data
    )
    # Getting the project ID should also have retrieved an access token.
    assert credentials.valid
    assert credentials.expiry == expected_expiry
    assert not credentials.expired
    assert credentials.token == impersonation_response["accessToken"]
    # Check the cloud resource manager request parameters.
    resource_manager_headers = {
        "x-goog-user-project": self.QUOTA_PROJECT_ID,
        "authorization": "Bearer {}".format(impersonation_response["accessToken"]),
    }
    self.assert_resource_manager_request_kwargs(
        resource_manager_call[1], self.PROJECT_NUMBER, resource_manager_headers
    )

    # A repeated call should return the cached project ID.
    project_id = credentials.get_project_id(request)

    assert project_id == self.PROJECT_ID
    # No additional requests were sent.
    assert len(request.call_args_list) == 3
1072
def test_get_project_id_cloud_resource_manager_error(self):
    """Verify ``get_project_id`` returns ``None`` on a resource manager error.

    The cloud resource manager response is mocked as UNAUTHORIZED,
    simulating a resource without sufficient permissions to access it.
    """
    mock_request_kwargs = {
        "status": http_client.OK,
        "data": self.SUCCESS_RESPONSE.copy(),
        "cloud_resource_manager_status": http_client.UNAUTHORIZED,
    }
    request = self.make_mock_request(**mock_request_kwargs)
    credentials = self.make_credentials(scopes=self.SCOPES)

    project_id = credentials.get_project_id(request)

    assert project_id is None
    # Only 2 requests should be sent: STS and cloud resource manager.
    sent_requests = request.call_args_list
    assert len(sent_requests) == 2