blob: 9ca95f5aa8564f49d022b44a4a7c9717426eca88 [file] [log] [blame]
bojeil-googled8839212021-07-08 10:56:22 -07001# Copyright 2021 Google LLC
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7# http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14
bojeil-google2f5c3a62021-07-09 13:27:02 -070015import datetime
16import json
bojeil-googled8839212021-07-08 10:56:22 -070017
bojeil-google2f5c3a62021-07-09 13:27:02 -070018import mock
19import pytest
20from six.moves import http_client
21from six.moves import urllib
22
23from google.auth import _helpers
24from google.auth import credentials
bojeil-googled8839212021-07-08 10:56:22 -070025from google.auth import downscoped
bojeil-google2f5c3a62021-07-09 13:27:02 -070026from google.auth import exceptions
27from google.auth import transport
bojeil-googled8839212021-07-08 10:56:22 -070028
29
30EXPRESSION = (
31 "resource.name.startsWith('projects/_/buckets/example-bucket/objects/customer-a')"
32)
33TITLE = "customer-a-objects"
34DESCRIPTION = (
35 "Condition to make permissions available for objects starting with customer-a"
36)
37AVAILABLE_RESOURCE = "//storage.googleapis.com/projects/_/buckets/example-bucket"
38AVAILABLE_PERMISSIONS = ["inRole:roles/storage.objectViewer"]
39
40OTHER_EXPRESSION = (
41 "resource.name.startsWith('projects/_/buckets/example-bucket/objects/customer-b')"
42)
43OTHER_TITLE = "customer-b-objects"
44OTHER_DESCRIPTION = (
45 "Condition to make permissions available for objects starting with customer-b"
46)
47OTHER_AVAILABLE_RESOURCE = "//storage.googleapis.com/projects/_/buckets/other-bucket"
48OTHER_AVAILABLE_PERMISSIONS = ["inRole:roles/storage.objectCreator"]
bojeil-google2f5c3a62021-07-09 13:27:02 -070049QUOTA_PROJECT_ID = "QUOTA_PROJECT_ID"
50GRANT_TYPE = "urn:ietf:params:oauth:grant-type:token-exchange"
51REQUESTED_TOKEN_TYPE = "urn:ietf:params:oauth:token-type:access_token"
52TOKEN_EXCHANGE_ENDPOINT = "https://sts.googleapis.com/v1/token"
53SUBJECT_TOKEN_TYPE = "urn:ietf:params:oauth:token-type:access_token"
54SUCCESS_RESPONSE = {
55 "access_token": "ACCESS_TOKEN",
56 "issued_token_type": "urn:ietf:params:oauth:token-type:access_token",
57 "token_type": "Bearer",
58 "expires_in": 3600,
59}
60ERROR_RESPONSE = {
61 "error": "invalid_grant",
62 "error_description": "Subject token is invalid.",
63 "error_uri": "https://tools.ietf.org/html/rfc6749",
64}
65CREDENTIAL_ACCESS_BOUNDARY_JSON = {
66 "accessBoundary": {
67 "accessBoundaryRules": [
68 {
69 "availablePermissions": AVAILABLE_PERMISSIONS,
70 "availableResource": AVAILABLE_RESOURCE,
71 "availabilityCondition": {
72 "expression": EXPRESSION,
73 "title": TITLE,
74 "description": DESCRIPTION,
75 },
76 }
77 ]
78 }
79}
80
81
82class SourceCredentials(credentials.Credentials):
bojeil-googledfad6612021-07-20 10:43:13 -070083 def __init__(self, raise_error=False, expires_in=3600):
bojeil-google2f5c3a62021-07-09 13:27:02 -070084 super(SourceCredentials, self).__init__()
85 self._counter = 0
86 self._raise_error = raise_error
bojeil-googledfad6612021-07-20 10:43:13 -070087 self._expires_in = expires_in
bojeil-google2f5c3a62021-07-09 13:27:02 -070088
89 def refresh(self, request):
90 if self._raise_error:
91 raise exceptions.RefreshError(
92 "Failed to refresh access token in source credentials."
93 )
94 now = _helpers.utcnow()
95 self._counter += 1
96 self.token = "ACCESS_TOKEN_{}".format(self._counter)
bojeil-googledfad6612021-07-20 10:43:13 -070097 self.expiry = now + datetime.timedelta(seconds=self._expires_in)
bojeil-googled8839212021-07-08 10:56:22 -070098
99
100def make_availability_condition(expression, title=None, description=None):
101 return downscoped.AvailabilityCondition(expression, title, description)
102
103
104def make_access_boundary_rule(
105 available_resource, available_permissions, availability_condition=None
106):
107 return downscoped.AccessBoundaryRule(
108 available_resource, available_permissions, availability_condition
109 )
110
111
112def make_credential_access_boundary(rules):
113 return downscoped.CredentialAccessBoundary(rules)
114
115
116class TestAvailabilityCondition(object):
117 def test_constructor(self):
118 availability_condition = make_availability_condition(
119 EXPRESSION, TITLE, DESCRIPTION
120 )
121
122 assert availability_condition.expression == EXPRESSION
123 assert availability_condition.title == TITLE
124 assert availability_condition.description == DESCRIPTION
125
126 def test_constructor_required_params_only(self):
127 availability_condition = make_availability_condition(EXPRESSION)
128
129 assert availability_condition.expression == EXPRESSION
130 assert availability_condition.title is None
131 assert availability_condition.description is None
132
133 def test_setters(self):
134 availability_condition = make_availability_condition(
135 EXPRESSION, TITLE, DESCRIPTION
136 )
137 availability_condition.expression = OTHER_EXPRESSION
138 availability_condition.title = OTHER_TITLE
139 availability_condition.description = OTHER_DESCRIPTION
140
141 assert availability_condition.expression == OTHER_EXPRESSION
142 assert availability_condition.title == OTHER_TITLE
143 assert availability_condition.description == OTHER_DESCRIPTION
144
145 def test_invalid_expression_type(self):
146 with pytest.raises(TypeError) as excinfo:
147 make_availability_condition([EXPRESSION], TITLE, DESCRIPTION)
148
149 assert excinfo.match("The provided expression is not a string.")
150
151 def test_invalid_title_type(self):
152 with pytest.raises(TypeError) as excinfo:
153 make_availability_condition(EXPRESSION, False, DESCRIPTION)
154
155 assert excinfo.match("The provided title is not a string or None.")
156
157 def test_invalid_description_type(self):
158 with pytest.raises(TypeError) as excinfo:
159 make_availability_condition(EXPRESSION, TITLE, False)
160
161 assert excinfo.match("The provided description is not a string or None.")
162
163 def test_to_json_required_params_only(self):
164 availability_condition = make_availability_condition(EXPRESSION)
165
166 assert availability_condition.to_json() == {"expression": EXPRESSION}
167
168 def test_to_json_(self):
169 availability_condition = make_availability_condition(
170 EXPRESSION, TITLE, DESCRIPTION
171 )
172
173 assert availability_condition.to_json() == {
174 "expression": EXPRESSION,
175 "title": TITLE,
176 "description": DESCRIPTION,
177 }
178
179
180class TestAccessBoundaryRule(object):
181 def test_constructor(self):
182 availability_condition = make_availability_condition(
183 EXPRESSION, TITLE, DESCRIPTION
184 )
185 access_boundary_rule = make_access_boundary_rule(
186 AVAILABLE_RESOURCE, AVAILABLE_PERMISSIONS, availability_condition
187 )
188
189 assert access_boundary_rule.available_resource == AVAILABLE_RESOURCE
190 assert access_boundary_rule.available_permissions == tuple(
191 AVAILABLE_PERMISSIONS
192 )
193 assert access_boundary_rule.availability_condition == availability_condition
194
195 def test_constructor_required_params_only(self):
196 access_boundary_rule = make_access_boundary_rule(
197 AVAILABLE_RESOURCE, AVAILABLE_PERMISSIONS
198 )
199
200 assert access_boundary_rule.available_resource == AVAILABLE_RESOURCE
201 assert access_boundary_rule.available_permissions == tuple(
202 AVAILABLE_PERMISSIONS
203 )
204 assert access_boundary_rule.availability_condition is None
205
206 def test_setters(self):
207 availability_condition = make_availability_condition(
208 EXPRESSION, TITLE, DESCRIPTION
209 )
210 other_availability_condition = make_availability_condition(
211 OTHER_EXPRESSION, OTHER_TITLE, OTHER_DESCRIPTION
212 )
213 access_boundary_rule = make_access_boundary_rule(
214 AVAILABLE_RESOURCE, AVAILABLE_PERMISSIONS, availability_condition
215 )
216 access_boundary_rule.available_resource = OTHER_AVAILABLE_RESOURCE
217 access_boundary_rule.available_permissions = OTHER_AVAILABLE_PERMISSIONS
218 access_boundary_rule.availability_condition = other_availability_condition
219
220 assert access_boundary_rule.available_resource == OTHER_AVAILABLE_RESOURCE
221 assert access_boundary_rule.available_permissions == tuple(
222 OTHER_AVAILABLE_PERMISSIONS
223 )
224 assert (
225 access_boundary_rule.availability_condition == other_availability_condition
226 )
227
228 def test_invalid_available_resource_type(self):
229 availability_condition = make_availability_condition(
230 EXPRESSION, TITLE, DESCRIPTION
231 )
232 with pytest.raises(TypeError) as excinfo:
233 make_access_boundary_rule(
234 None, AVAILABLE_PERMISSIONS, availability_condition
235 )
236
237 assert excinfo.match("The provided available_resource is not a string.")
238
239 def test_invalid_available_permissions_type(self):
240 availability_condition = make_availability_condition(
241 EXPRESSION, TITLE, DESCRIPTION
242 )
243 with pytest.raises(TypeError) as excinfo:
244 make_access_boundary_rule(
245 AVAILABLE_RESOURCE, [0, 1, 2], availability_condition
246 )
247
248 assert excinfo.match(
249 "Provided available_permissions are not a list of strings."
250 )
251
252 def test_invalid_available_permissions_value(self):
253 availability_condition = make_availability_condition(
254 EXPRESSION, TITLE, DESCRIPTION
255 )
256 with pytest.raises(ValueError) as excinfo:
257 make_access_boundary_rule(
258 AVAILABLE_RESOURCE,
259 ["roles/storage.objectViewer"],
260 availability_condition,
261 )
262
263 assert excinfo.match("available_permissions must be prefixed with 'inRole:'.")
264
265 def test_invalid_availability_condition_type(self):
266 with pytest.raises(TypeError) as excinfo:
267 make_access_boundary_rule(
268 AVAILABLE_RESOURCE, AVAILABLE_PERMISSIONS, {"foo": "bar"}
269 )
270
271 assert excinfo.match(
272 "The provided availability_condition is not a 'google.auth.downscoped.AvailabilityCondition' or None."
273 )
274
275 def test_to_json(self):
276 availability_condition = make_availability_condition(
277 EXPRESSION, TITLE, DESCRIPTION
278 )
279 access_boundary_rule = make_access_boundary_rule(
280 AVAILABLE_RESOURCE, AVAILABLE_PERMISSIONS, availability_condition
281 )
282
283 assert access_boundary_rule.to_json() == {
284 "availablePermissions": AVAILABLE_PERMISSIONS,
285 "availableResource": AVAILABLE_RESOURCE,
286 "availabilityCondition": {
287 "expression": EXPRESSION,
288 "title": TITLE,
289 "description": DESCRIPTION,
290 },
291 }
292
293 def test_to_json_required_params_only(self):
294 access_boundary_rule = make_access_boundary_rule(
295 AVAILABLE_RESOURCE, AVAILABLE_PERMISSIONS
296 )
297
298 assert access_boundary_rule.to_json() == {
299 "availablePermissions": AVAILABLE_PERMISSIONS,
300 "availableResource": AVAILABLE_RESOURCE,
301 }
302
303
304class TestCredentialAccessBoundary(object):
305 def test_constructor(self):
306 availability_condition = make_availability_condition(
307 EXPRESSION, TITLE, DESCRIPTION
308 )
309 access_boundary_rule = make_access_boundary_rule(
310 AVAILABLE_RESOURCE, AVAILABLE_PERMISSIONS, availability_condition
311 )
312 rules = [access_boundary_rule]
313 credential_access_boundary = make_credential_access_boundary(rules)
314
315 assert credential_access_boundary.rules == tuple(rules)
316
317 def test_setters(self):
318 availability_condition = make_availability_condition(
319 EXPRESSION, TITLE, DESCRIPTION
320 )
321 access_boundary_rule = make_access_boundary_rule(
322 AVAILABLE_RESOURCE, AVAILABLE_PERMISSIONS, availability_condition
323 )
324 rules = [access_boundary_rule]
325 other_availability_condition = make_availability_condition(
326 OTHER_EXPRESSION, OTHER_TITLE, OTHER_DESCRIPTION
327 )
328 other_access_boundary_rule = make_access_boundary_rule(
329 OTHER_AVAILABLE_RESOURCE,
330 OTHER_AVAILABLE_PERMISSIONS,
331 other_availability_condition,
332 )
333 other_rules = [other_access_boundary_rule]
334 credential_access_boundary = make_credential_access_boundary(rules)
335 credential_access_boundary.rules = other_rules
336
337 assert credential_access_boundary.rules == tuple(other_rules)
338
339 def test_add_rule(self):
340 availability_condition = make_availability_condition(
341 EXPRESSION, TITLE, DESCRIPTION
342 )
343 access_boundary_rule = make_access_boundary_rule(
344 AVAILABLE_RESOURCE, AVAILABLE_PERMISSIONS, availability_condition
345 )
346 rules = [access_boundary_rule] * 9
347 credential_access_boundary = make_credential_access_boundary(rules)
348
349 # Add one more rule. This should not raise an error.
350 additional_access_boundary_rule = make_access_boundary_rule(
351 OTHER_AVAILABLE_RESOURCE, OTHER_AVAILABLE_PERMISSIONS
352 )
353 credential_access_boundary.add_rule(additional_access_boundary_rule)
354
355 assert len(credential_access_boundary.rules) == 10
356 assert credential_access_boundary.rules[9] == additional_access_boundary_rule
357
358 def test_add_rule_invalid_value(self):
359 availability_condition = make_availability_condition(
360 EXPRESSION, TITLE, DESCRIPTION
361 )
362 access_boundary_rule = make_access_boundary_rule(
363 AVAILABLE_RESOURCE, AVAILABLE_PERMISSIONS, availability_condition
364 )
365 rules = [access_boundary_rule] * 10
366 credential_access_boundary = make_credential_access_boundary(rules)
367
368 # Add one more rule to exceed maximum allowed rules.
369 with pytest.raises(ValueError) as excinfo:
370 credential_access_boundary.add_rule(access_boundary_rule)
371
372 assert excinfo.match(
373 "Credential access boundary rules can have a maximum of 10 rules."
374 )
375 assert len(credential_access_boundary.rules) == 10
376
377 def test_add_rule_invalid_type(self):
378 availability_condition = make_availability_condition(
379 EXPRESSION, TITLE, DESCRIPTION
380 )
381 access_boundary_rule = make_access_boundary_rule(
382 AVAILABLE_RESOURCE, AVAILABLE_PERMISSIONS, availability_condition
383 )
384 rules = [access_boundary_rule]
385 credential_access_boundary = make_credential_access_boundary(rules)
386
387 # Add an invalid rule to exceed maximum allowed rules.
388 with pytest.raises(TypeError) as excinfo:
389 credential_access_boundary.add_rule("invalid")
390
391 assert excinfo.match(
392 "The provided rule does not contain a valid 'google.auth.downscoped.AccessBoundaryRule'."
393 )
394 assert len(credential_access_boundary.rules) == 1
395 assert credential_access_boundary.rules[0] == access_boundary_rule
396
397 def test_invalid_rules_type(self):
398 with pytest.raises(TypeError) as excinfo:
399 make_credential_access_boundary(["invalid"])
400
401 assert excinfo.match(
402 "List of rules provided do not contain a valid 'google.auth.downscoped.AccessBoundaryRule'."
403 )
404
405 def test_invalid_rules_value(self):
406 availability_condition = make_availability_condition(
407 EXPRESSION, TITLE, DESCRIPTION
408 )
409 access_boundary_rule = make_access_boundary_rule(
410 AVAILABLE_RESOURCE, AVAILABLE_PERMISSIONS, availability_condition
411 )
412 too_many_rules = [access_boundary_rule] * 11
413 with pytest.raises(ValueError) as excinfo:
414 make_credential_access_boundary(too_many_rules)
415
416 assert excinfo.match(
417 "Credential access boundary rules can have a maximum of 10 rules."
418 )
419
420 def test_to_json(self):
421 availability_condition = make_availability_condition(
422 EXPRESSION, TITLE, DESCRIPTION
423 )
424 access_boundary_rule = make_access_boundary_rule(
425 AVAILABLE_RESOURCE, AVAILABLE_PERMISSIONS, availability_condition
426 )
427 rules = [access_boundary_rule]
428 credential_access_boundary = make_credential_access_boundary(rules)
429
430 assert credential_access_boundary.to_json() == {
431 "accessBoundary": {
432 "accessBoundaryRules": [
433 {
434 "availablePermissions": AVAILABLE_PERMISSIONS,
435 "availableResource": AVAILABLE_RESOURCE,
436 "availabilityCondition": {
437 "expression": EXPRESSION,
438 "title": TITLE,
439 "description": DESCRIPTION,
440 },
441 }
442 ]
443 }
444 }
bojeil-google2f5c3a62021-07-09 13:27:02 -0700445
446
447class TestCredentials(object):
448 @staticmethod
449 def make_credentials(source_credentials=SourceCredentials(), quota_project_id=None):
450 availability_condition = make_availability_condition(
451 EXPRESSION, TITLE, DESCRIPTION
452 )
453 access_boundary_rule = make_access_boundary_rule(
454 AVAILABLE_RESOURCE, AVAILABLE_PERMISSIONS, availability_condition
455 )
456 rules = [access_boundary_rule]
457 credential_access_boundary = make_credential_access_boundary(rules)
458
459 return downscoped.Credentials(
460 source_credentials, credential_access_boundary, quota_project_id
461 )
462
463 @staticmethod
464 def make_mock_request(data, status=http_client.OK):
465 response = mock.create_autospec(transport.Response, instance=True)
466 response.status = status
467 response.data = json.dumps(data).encode("utf-8")
468
469 request = mock.create_autospec(transport.Request)
470 request.return_value = response
471
472 return request
473
474 @staticmethod
475 def assert_request_kwargs(request_kwargs, headers, request_data):
476 """Asserts the request was called with the expected parameters.
477 """
478 assert request_kwargs["url"] == TOKEN_EXCHANGE_ENDPOINT
479 assert request_kwargs["method"] == "POST"
480 assert request_kwargs["headers"] == headers
481 assert request_kwargs["body"] is not None
482 body_tuples = urllib.parse.parse_qsl(request_kwargs["body"])
483 for (k, v) in body_tuples:
484 assert v.decode("utf-8") == request_data[k.decode("utf-8")]
485 assert len(body_tuples) == len(request_data.keys())
486
487 def test_default_state(self):
488 credentials = self.make_credentials()
489
490 # No token acquired yet.
491 assert not credentials.token
492 assert not credentials.valid
493 # Expiration hasn't been set yet.
494 assert not credentials.expiry
495 assert not credentials.expired
496 # No quota project ID set.
497 assert not credentials.quota_project_id
498
499 def test_with_quota_project(self):
500 credentials = self.make_credentials()
501
502 assert not credentials.quota_project_id
503
504 quota_project_creds = credentials.with_quota_project("project-foo")
505
506 assert quota_project_creds.quota_project_id == "project-foo"
507
508 @mock.patch("google.auth._helpers.utcnow", return_value=datetime.datetime.min)
509 def test_refresh(self, unused_utcnow):
510 response = SUCCESS_RESPONSE.copy()
511 # Test custom expiration to confirm expiry is set correctly.
512 response["expires_in"] = 2800
513 expected_expiry = datetime.datetime.min + datetime.timedelta(
514 seconds=response["expires_in"]
515 )
516 headers = {"Content-Type": "application/x-www-form-urlencoded"}
517 request_data = {
518 "grant_type": GRANT_TYPE,
519 "subject_token": "ACCESS_TOKEN_1",
520 "subject_token_type": SUBJECT_TOKEN_TYPE,
521 "requested_token_type": REQUESTED_TOKEN_TYPE,
522 "options": urllib.parse.quote(json.dumps(CREDENTIAL_ACCESS_BOUNDARY_JSON)),
523 }
524 request = self.make_mock_request(status=http_client.OK, data=response)
525 source_credentials = SourceCredentials()
526 credentials = self.make_credentials(source_credentials=source_credentials)
527
528 # Spy on calls to source credentials refresh to confirm the expected request
529 # instance is used.
530 with mock.patch.object(
531 source_credentials, "refresh", wraps=source_credentials.refresh
532 ) as wrapped_souce_cred_refresh:
533 credentials.refresh(request)
534
535 self.assert_request_kwargs(request.call_args[1], headers, request_data)
536 assert credentials.valid
537 assert credentials.expiry == expected_expiry
538 assert not credentials.expired
539 assert credentials.token == response["access_token"]
540 # Confirm source credentials called with the same request instance.
541 wrapped_souce_cred_refresh.assert_called_with(request)
542
bojeil-googledfad6612021-07-20 10:43:13 -0700543 @mock.patch("google.auth._helpers.utcnow", return_value=datetime.datetime.min)
544 def test_refresh_without_response_expires_in(self, unused_utcnow):
545 response = SUCCESS_RESPONSE.copy()
546 # Simulate the response is missing the expires_in field.
547 # The downscoped token expiration should match the source credentials
548 # expiration.
549 del response["expires_in"]
550 expected_expires_in = 1800
551 # Simulate the source credentials generates a token with 1800 second
552 # expiration time. The generated downscoped token should have the same
553 # expiration time.
554 source_credentials = SourceCredentials(expires_in=expected_expires_in)
555 expected_expiry = datetime.datetime.min + datetime.timedelta(
556 seconds=expected_expires_in
557 )
558 headers = {"Content-Type": "application/x-www-form-urlencoded"}
559 request_data = {
560 "grant_type": GRANT_TYPE,
561 "subject_token": "ACCESS_TOKEN_1",
562 "subject_token_type": SUBJECT_TOKEN_TYPE,
563 "requested_token_type": REQUESTED_TOKEN_TYPE,
564 "options": urllib.parse.quote(json.dumps(CREDENTIAL_ACCESS_BOUNDARY_JSON)),
565 }
566 request = self.make_mock_request(status=http_client.OK, data=response)
567 credentials = self.make_credentials(source_credentials=source_credentials)
568
569 # Spy on calls to source credentials refresh to confirm the expected request
570 # instance is used.
571 with mock.patch.object(
572 source_credentials, "refresh", wraps=source_credentials.refresh
573 ) as wrapped_souce_cred_refresh:
574 credentials.refresh(request)
575
576 self.assert_request_kwargs(request.call_args[1], headers, request_data)
577 assert credentials.valid
578 assert credentials.expiry == expected_expiry
579 assert not credentials.expired
580 assert credentials.token == response["access_token"]
581 # Confirm source credentials called with the same request instance.
582 wrapped_souce_cred_refresh.assert_called_with(request)
583
bojeil-google2f5c3a62021-07-09 13:27:02 -0700584 def test_refresh_token_exchange_error(self):
585 request = self.make_mock_request(
586 status=http_client.BAD_REQUEST, data=ERROR_RESPONSE
587 )
588 credentials = self.make_credentials()
589
590 with pytest.raises(exceptions.OAuthError) as excinfo:
591 credentials.refresh(request)
592
593 assert excinfo.match(
594 r"Error code invalid_grant: Subject token is invalid. - https://tools.ietf.org/html/rfc6749"
595 )
596 assert not credentials.expired
597 assert credentials.token is None
598
599 def test_refresh_source_credentials_refresh_error(self):
600 # Initialize downscoped credentials with source credentials that raise
601 # an error on refresh.
602 credentials = self.make_credentials(
603 source_credentials=SourceCredentials(raise_error=True)
604 )
605
606 with pytest.raises(exceptions.RefreshError) as excinfo:
607 credentials.refresh(mock.sentinel.request)
608
609 assert excinfo.match(r"Failed to refresh access token in source credentials.")
610 assert not credentials.expired
611 assert credentials.token is None
612
613 def test_apply_without_quota_project_id(self):
614 headers = {}
615 request = self.make_mock_request(status=http_client.OK, data=SUCCESS_RESPONSE)
616 credentials = self.make_credentials()
617
618 credentials.refresh(request)
619 credentials.apply(headers)
620
621 assert headers == {
622 "authorization": "Bearer {}".format(SUCCESS_RESPONSE["access_token"])
623 }
624
625 def test_apply_with_quota_project_id(self):
626 headers = {"other": "header-value"}
627 request = self.make_mock_request(status=http_client.OK, data=SUCCESS_RESPONSE)
628 credentials = self.make_credentials(quota_project_id=QUOTA_PROJECT_ID)
629
630 credentials.refresh(request)
631 credentials.apply(headers)
632
633 assert headers == {
634 "other": "header-value",
635 "authorization": "Bearer {}".format(SUCCESS_RESPONSE["access_token"]),
636 "x-goog-user-project": QUOTA_PROJECT_ID,
637 }
638
639 def test_before_request(self):
640 headers = {"other": "header-value"}
641 request = self.make_mock_request(status=http_client.OK, data=SUCCESS_RESPONSE)
642 credentials = self.make_credentials()
643
644 # First call should call refresh, setting the token.
645 credentials.before_request(request, "POST", "https://example.com/api", headers)
646
647 assert headers == {
648 "other": "header-value",
649 "authorization": "Bearer {}".format(SUCCESS_RESPONSE["access_token"]),
650 }
651
652 # Second call shouldn't call refresh (request should be untouched).
653 credentials.before_request(
654 mock.sentinel.request, "POST", "https://example.com/api", headers
655 )
656
657 assert headers == {
658 "other": "header-value",
659 "authorization": "Bearer {}".format(SUCCESS_RESPONSE["access_token"]),
660 }
661
662 @mock.patch("google.auth._helpers.utcnow")
663 def test_before_request_expired(self, utcnow):
664 headers = {}
665 request = self.make_mock_request(status=http_client.OK, data=SUCCESS_RESPONSE)
666 credentials = self.make_credentials()
667 credentials.token = "token"
668 utcnow.return_value = datetime.datetime.min
669 # Set the expiration to one second more than now plus the clock skew
670 # accommodation. These credentials should be valid.
671 credentials.expiry = (
arithmetic1728738611b2021-09-09 17:09:54 -0700672 datetime.datetime.min
673 + _helpers.REFRESH_THRESHOLD
674 + datetime.timedelta(seconds=1)
bojeil-google2f5c3a62021-07-09 13:27:02 -0700675 )
676
677 assert credentials.valid
678 assert not credentials.expired
679
680 credentials.before_request(request, "POST", "https://example.com/api", headers)
681
682 # Cached token should be used.
683 assert headers == {"authorization": "Bearer token"}
684
685 # Next call should simulate 1 second passed.
686 utcnow.return_value = datetime.datetime.min + datetime.timedelta(seconds=1)
687
688 assert not credentials.valid
689 assert credentials.expired
690
691 credentials.before_request(request, "POST", "https://example.com/api", headers)
692
693 # New token should be retrieved.
694 assert headers == {
695 "authorization": "Bearer {}".format(SUCCESS_RESPONSE["access_token"])
696 }