blob: ac60e5b00d56185530a8986b1900aa01d9381af3 [file] [log] [blame]
bojeil-googled8839212021-07-08 10:56:22 -07001# Copyright 2021 Google LLC
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7# http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14
bojeil-google2f5c3a62021-07-09 13:27:02 -070015import datetime
16import json
bojeil-googled8839212021-07-08 10:56:22 -070017
bojeil-google2f5c3a62021-07-09 13:27:02 -070018import mock
19import pytest
20from six.moves import http_client
21from six.moves import urllib
22
23from google.auth import _helpers
24from google.auth import credentials
bojeil-googled8839212021-07-08 10:56:22 -070025from google.auth import downscoped
bojeil-google2f5c3a62021-07-09 13:27:02 -070026from google.auth import exceptions
27from google.auth import transport
bojeil-googled8839212021-07-08 10:56:22 -070028
29
# Fixture values for the primary access boundary rule ("customer-a").
EXPRESSION = (
    "resource.name.startsWith("
    "'projects/_/buckets/example-bucket/objects/customer-a'"
    ")"
)
TITLE = "customer-a-objects"
DESCRIPTION = (
    "Condition to make permissions available "
    "for objects starting with customer-a"
)
AVAILABLE_RESOURCE = "//storage.googleapis.com/projects/_/buckets/example-bucket"
AVAILABLE_PERMISSIONS = ["inRole:roles/storage.objectViewer"]

# A second, distinct set of values used by the setter tests ("customer-b").
OTHER_EXPRESSION = (
    "resource.name.startsWith("
    "'projects/_/buckets/example-bucket/objects/customer-b'"
    ")"
)
OTHER_TITLE = "customer-b-objects"
OTHER_DESCRIPTION = (
    "Condition to make permissions available "
    "for objects starting with customer-b"
)
OTHER_AVAILABLE_RESOURCE = "//storage.googleapis.com/projects/_/buckets/other-bucket"
OTHER_AVAILABLE_PERMISSIONS = ["inRole:roles/storage.objectCreator"]

# Values the tests expect to see in the token exchange request.
QUOTA_PROJECT_ID = "QUOTA_PROJECT_ID"
GRANT_TYPE = "urn:ietf:params:oauth:grant-type:token-exchange"
REQUESTED_TOKEN_TYPE = "urn:ietf:params:oauth:token-type:access_token"
TOKEN_EXCHANGE_ENDPOINT = "https://sts.googleapis.com/v1/token"
SUBJECT_TOKEN_TYPE = "urn:ietf:params:oauth:token-type:access_token"

# Canned token endpoint payloads returned by the mocked transport.
SUCCESS_RESPONSE = {
    "access_token": "ACCESS_TOKEN",
    "issued_token_type": "urn:ietf:params:oauth:token-type:access_token",
    "token_type": "Bearer",
    "expires_in": 3600,
}
ERROR_RESPONSE = {
    "error": "invalid_grant",
    "error_description": "Subject token is invalid.",
    "error_uri": "https://tools.ietf.org/html/rfc6749",
}

# JSON form of a credential access boundary holding the single primary rule.
CREDENTIAL_ACCESS_BOUNDARY_JSON = {
    "accessBoundary": {
        "accessBoundaryRules": [
            {
                "availablePermissions": AVAILABLE_PERMISSIONS,
                "availableResource": AVAILABLE_RESOURCE,
                "availabilityCondition": {
                    "expression": EXPRESSION,
                    "title": TITLE,
                    "description": DESCRIPTION,
                },
            }
        ]
    }
}
80
81
class SourceCredentials(credentials.Credentials):
    """Fake source credentials used to drive the downscoped credentials tests.

    Each successful ``refresh`` issues a new token named ``ACCESS_TOKEN_<n>``,
    where ``n`` counts the successful refreshes on this instance, and sets an
    expiry 3600 seconds after the current time.

    Args:
        raise_error: When true, every ``refresh`` call raises
            ``exceptions.RefreshError`` instead of issuing a token.
    """

    def __init__(self, raise_error=False):
        super(SourceCredentials, self).__init__()
        self._raise_error = raise_error
        # Number of successful refreshes so far; used as the token suffix.
        self._counter = 0

    def refresh(self, request):
        """Issue the next numbered token, or raise if configured to fail.

        Args:
            request: Unused by this fake.

        Raises:
            exceptions.RefreshError: If the instance was built with
                ``raise_error=True``.
        """
        if not self._raise_error:
            refreshed_at = _helpers.utcnow()
            self._counter = self._counter + 1
            self.token = "ACCESS_TOKEN_{}".format(self._counter)
            token_lifetime = datetime.timedelta(seconds=3600)
            self.expiry = refreshed_at + token_lifetime
            return

        raise exceptions.RefreshError(
            "Failed to refresh access token in source credentials."
        )
bojeil-googled8839212021-07-08 10:56:22 -070097
98
def make_availability_condition(expression, title=None, description=None):
    """Build a ``downscoped.AvailabilityCondition`` from the given values.

    The arguments are forwarded positionally and unchanged.
    """
    condition = downscoped.AvailabilityCondition(expression, title, description)
    return condition
101
102
def make_access_boundary_rule(
    available_resource, available_permissions, availability_condition=None
):
    """Build a ``downscoped.AccessBoundaryRule`` from the given values.

    The arguments are forwarded positionally and unchanged.
    """
    rule = downscoped.AccessBoundaryRule(
        available_resource, available_permissions, availability_condition
    )
    return rule
109
110
def make_credential_access_boundary(rules):
    """Build a ``downscoped.CredentialAccessBoundary`` wrapping ``rules``."""
    boundary = downscoped.CredentialAccessBoundary(rules)
    return boundary
113
114
class TestAvailabilityCondition(object):
    """Tests for ``downscoped.AvailabilityCondition``."""

    def test_constructor(self):
        """All constructor arguments are readable back from the instance."""
        condition = make_availability_condition(EXPRESSION, TITLE, DESCRIPTION)

        assert condition.expression == EXPRESSION
        assert condition.title == TITLE
        assert condition.description == DESCRIPTION

    def test_constructor_required_params_only(self):
        """With only the expression given, title and description are None."""
        condition = make_availability_condition(EXPRESSION)

        assert condition.expression == EXPRESSION
        assert condition.title is None
        assert condition.description is None

    def test_setters(self):
        """Assigning each attribute replaces the constructor value."""
        condition = make_availability_condition(EXPRESSION, TITLE, DESCRIPTION)

        condition.expression = OTHER_EXPRESSION
        condition.title = OTHER_TITLE
        condition.description = OTHER_DESCRIPTION

        assert condition.expression == OTHER_EXPRESSION
        assert condition.title == OTHER_TITLE
        assert condition.description == OTHER_DESCRIPTION

    def test_invalid_expression_type(self):
        """A list passed as the expression is rejected with TypeError."""
        expected_message = "The provided expression is not a string."

        with pytest.raises(TypeError, match=expected_message):
            make_availability_condition([EXPRESSION], TITLE, DESCRIPTION)

    def test_invalid_title_type(self):
        """A boolean passed as the title is rejected with TypeError."""
        expected_message = "The provided title is not a string or None."

        with pytest.raises(TypeError, match=expected_message):
            make_availability_condition(EXPRESSION, False, DESCRIPTION)

    def test_invalid_description_type(self):
        """A boolean passed as the description is rejected with TypeError."""
        expected_message = "The provided description is not a string or None."

        with pytest.raises(TypeError, match=expected_message):
            make_availability_condition(EXPRESSION, TITLE, False)

    def test_to_json_required_params_only(self):
        """Only the expression key is expected when nothing else was set."""
        condition = make_availability_condition(EXPRESSION)
        expected = {"expression": EXPRESSION}

        assert condition.to_json() == expected

    def test_to_json_(self):
        """All three keys are expected when every field was set."""
        condition = make_availability_condition(EXPRESSION, TITLE, DESCRIPTION)
        expected = {
            "expression": EXPRESSION,
            "title": TITLE,
            "description": DESCRIPTION,
        }

        assert condition.to_json() == expected
177
178
class TestAccessBoundaryRule(object):
    """Tests for ``downscoped.AccessBoundaryRule``."""

    @staticmethod
    def _primary_condition():
        """Return a new availability condition built from the primary values."""
        return make_availability_condition(EXPRESSION, TITLE, DESCRIPTION)

    def test_constructor(self):
        """All constructor arguments are readable back from the instance."""
        condition = self._primary_condition()
        rule = make_access_boundary_rule(
            AVAILABLE_RESOURCE, AVAILABLE_PERMISSIONS, condition
        )

        assert rule.available_resource == AVAILABLE_RESOURCE
        # The test expects permissions to compare equal to a tuple.
        assert rule.available_permissions == tuple(AVAILABLE_PERMISSIONS)
        assert rule.availability_condition == condition

    def test_constructor_required_params_only(self):
        """Without a condition argument, availability_condition is None."""
        rule = make_access_boundary_rule(AVAILABLE_RESOURCE, AVAILABLE_PERMISSIONS)

        assert rule.available_resource == AVAILABLE_RESOURCE
        assert rule.available_permissions == tuple(AVAILABLE_PERMISSIONS)
        assert rule.availability_condition is None

    def test_setters(self):
        """Assigning each attribute replaces the constructor value."""
        condition = self._primary_condition()
        replacement_condition = make_availability_condition(
            OTHER_EXPRESSION, OTHER_TITLE, OTHER_DESCRIPTION
        )
        rule = make_access_boundary_rule(
            AVAILABLE_RESOURCE, AVAILABLE_PERMISSIONS, condition
        )

        rule.available_resource = OTHER_AVAILABLE_RESOURCE
        rule.available_permissions = OTHER_AVAILABLE_PERMISSIONS
        rule.availability_condition = replacement_condition

        assert rule.available_resource == OTHER_AVAILABLE_RESOURCE
        assert rule.available_permissions == tuple(OTHER_AVAILABLE_PERMISSIONS)
        assert rule.availability_condition == replacement_condition

    def test_invalid_available_resource_type(self):
        """None passed as the resource is rejected with TypeError."""
        condition = self._primary_condition()
        expected_message = "The provided available_resource is not a string."

        with pytest.raises(TypeError, match=expected_message):
            make_access_boundary_rule(None, AVAILABLE_PERMISSIONS, condition)

    def test_invalid_available_permissions_type(self):
        """A list of integers as permissions is rejected with TypeError."""
        condition = self._primary_condition()
        expected_message = "Provided available_permissions are not a list of strings."

        with pytest.raises(TypeError, match=expected_message):
            make_access_boundary_rule(AVAILABLE_RESOURCE, [0, 1, 2], condition)

    def test_invalid_available_permissions_value(self):
        """A permission lacking the 'inRole:' prefix is rejected with ValueError."""
        condition = self._primary_condition()
        unprefixed_permissions = ["roles/storage.objectViewer"]
        expected_message = "available_permissions must be prefixed with 'inRole:'."

        with pytest.raises(ValueError, match=expected_message):
            make_access_boundary_rule(
                AVAILABLE_RESOURCE, unprefixed_permissions, condition
            )

    def test_invalid_availability_condition_type(self):
        """A plain dict passed as the condition is rejected with TypeError."""
        expected_message = "The provided availability_condition is not a 'google.auth.downscoped.AvailabilityCondition' or None."

        with pytest.raises(TypeError, match=expected_message):
            make_access_boundary_rule(
                AVAILABLE_RESOURCE, AVAILABLE_PERMISSIONS, {"foo": "bar"}
            )

    def test_to_json(self):
        """The JSON form nests the condition under 'availabilityCondition'."""
        rule = make_access_boundary_rule(
            AVAILABLE_RESOURCE, AVAILABLE_PERMISSIONS, self._primary_condition()
        )
        expected = {
            "availablePermissions": AVAILABLE_PERMISSIONS,
            "availableResource": AVAILABLE_RESOURCE,
            "availabilityCondition": {
                "expression": EXPRESSION,
                "title": TITLE,
                "description": DESCRIPTION,
            },
        }

        assert rule.to_json() == expected

    def test_to_json_required_params_only(self):
        """No 'availabilityCondition' key is expected without a condition."""
        rule = make_access_boundary_rule(AVAILABLE_RESOURCE, AVAILABLE_PERMISSIONS)
        expected = {
            "availablePermissions": AVAILABLE_PERMISSIONS,
            "availableResource": AVAILABLE_RESOURCE,
        }

        assert rule.to_json() == expected
301
302
class TestCredentialAccessBoundary(object):
    """Tests for ``downscoped.CredentialAccessBoundary``."""

    @staticmethod
    def _primary_rule():
        """Return a new access boundary rule built from the primary values."""
        condition = make_availability_condition(EXPRESSION, TITLE, DESCRIPTION)
        return make_access_boundary_rule(
            AVAILABLE_RESOURCE, AVAILABLE_PERMISSIONS, condition
        )

    def test_constructor(self):
        """The rules passed in are readable back, compared as a tuple."""
        rule_list = [self._primary_rule()]
        boundary = make_credential_access_boundary(rule_list)

        assert boundary.rules == tuple(rule_list)

    def test_setters(self):
        """Assigning ``rules`` replaces the rules given to the constructor."""
        rule_list = [self._primary_rule()]
        replacement_condition = make_availability_condition(
            OTHER_EXPRESSION, OTHER_TITLE, OTHER_DESCRIPTION
        )
        replacement_rule = make_access_boundary_rule(
            OTHER_AVAILABLE_RESOURCE,
            OTHER_AVAILABLE_PERMISSIONS,
            replacement_condition,
        )
        replacement_rules = [replacement_rule]
        boundary = make_credential_access_boundary(rule_list)

        boundary.rules = replacement_rules

        assert boundary.rules == tuple(replacement_rules)

    def test_add_rule(self):
        """A tenth rule can be appended to a boundary holding nine."""
        rule = self._primary_rule()
        boundary = make_credential_access_boundary([rule for _ in range(9)])

        # Adding one more rule reaches, but does not exceed, ten rules.
        tenth_rule = make_access_boundary_rule(
            OTHER_AVAILABLE_RESOURCE, OTHER_AVAILABLE_PERMISSIONS
        )
        boundary.add_rule(tenth_rule)

        assert len(boundary.rules) == 10
        assert boundary.rules[9] == tenth_rule

    def test_add_rule_invalid_value(self):
        """Appending an eleventh rule raises ValueError and adds nothing."""
        rule = self._primary_rule()
        boundary = make_credential_access_boundary([rule for _ in range(10)])
        expected_message = (
            "Credential access boundary rules can have a maximum of 10 rules."
        )

        # The boundary is already full, so one more rule must be rejected.
        with pytest.raises(ValueError, match=expected_message):
            boundary.add_rule(rule)

        assert len(boundary.rules) == 10

    def test_add_rule_invalid_type(self):
        """Appending a non-rule raises TypeError and leaves the rules as is."""
        rule = self._primary_rule()
        boundary = make_credential_access_boundary([rule])
        expected_message = "The provided rule does not contain a valid 'google.auth.downscoped.AccessBoundaryRule'."

        # A plain string is not an access boundary rule.
        with pytest.raises(TypeError, match=expected_message):
            boundary.add_rule("invalid")

        assert len(boundary.rules) == 1
        assert boundary.rules[0] == rule

    def test_invalid_rules_type(self):
        """A rules list containing a non-rule is rejected with TypeError."""
        expected_message = "List of rules provided do not contain a valid 'google.auth.downscoped.AccessBoundaryRule'."

        with pytest.raises(TypeError, match=expected_message):
            make_credential_access_boundary(["invalid"])

    def test_invalid_rules_value(self):
        """Constructing with eleven rules is rejected with ValueError."""
        rule = self._primary_rule()
        too_many_rules = [rule for _ in range(11)]
        expected_message = (
            "Credential access boundary rules can have a maximum of 10 rules."
        )

        with pytest.raises(ValueError, match=expected_message):
            make_credential_access_boundary(too_many_rules)

    def test_to_json(self):
        """The JSON form lists each rule under accessBoundary.accessBoundaryRules."""
        boundary = make_credential_access_boundary([self._primary_rule()])
        expected = {
            "accessBoundary": {
                "accessBoundaryRules": [
                    {
                        "availablePermissions": AVAILABLE_PERMISSIONS,
                        "availableResource": AVAILABLE_RESOURCE,
                        "availabilityCondition": {
                            "expression": EXPRESSION,
                            "title": TITLE,
                            "description": DESCRIPTION,
                        },
                    }
                ]
            }
        }

        assert boundary.to_json() == expected
bojeil-google2f5c3a62021-07-09 13:27:02 -0700444
445
class TestCredentials(object):
    """Tests for ``downscoped.Credentials`` using fake source credentials
    and a mocked transport request."""

    @staticmethod
    def make_credentials(source_credentials=None, quota_project_id=None):
        """Build downscoped credentials bounded by the single primary rule.

        Args:
            source_credentials: Credentials to downscope. When omitted, a new
                ``SourceCredentials`` is created for this call. A default
                instance in the signature would be created once and shared by
                every test, leaking its token counter and expiry between them.
            quota_project_id: Optional quota project ID passed through to the
                downscoped credentials.

        Returns:
            The constructed ``downscoped.Credentials``.
        """
        if source_credentials is None:
            source_credentials = SourceCredentials()

        availability_condition = make_availability_condition(
            EXPRESSION, TITLE, DESCRIPTION
        )
        access_boundary_rule = make_access_boundary_rule(
            AVAILABLE_RESOURCE, AVAILABLE_PERMISSIONS, availability_condition
        )
        rules = [access_boundary_rule]
        credential_access_boundary = make_credential_access_boundary(rules)

        return downscoped.Credentials(
            source_credentials, credential_access_boundary, quota_project_id
        )

    @staticmethod
    def make_mock_request(data, status=http_client.OK):
        """Return a mocked transport request whose response carries ``data``.

        Args:
            data: JSON-serializable payload; it is dumped to JSON and UTF-8
                encoded to form the response body.
            status: HTTP status to report on the response.
        """
        response = mock.create_autospec(transport.Response, instance=True)
        response.status = status
        response.data = json.dumps(data).encode("utf-8")

        request = mock.create_autospec(transport.Request)
        request.return_value = response

        return request

    @staticmethod
    def assert_request_kwargs(request_kwargs, headers, request_data):
        """Asserts the request was called with the expected parameters.

        Args:
            request_kwargs: Keyword arguments the mocked request was called
                with.
            headers: Expected request headers.
            request_data: Expected form fields of the URL-encoded body.
        """
        assert request_kwargs["url"] == TOKEN_EXCHANGE_ENDPOINT
        assert request_kwargs["method"] == "POST"
        assert request_kwargs["headers"] == headers
        assert request_kwargs["body"] is not None
        body_tuples = urllib.parse.parse_qsl(request_kwargs["body"])
        # The length check catches repeated fields; the mapping comparison
        # catches missing, unexpected, and mismatched fields in one assertion.
        assert len(body_tuples) == len(request_data)
        decoded_body = {k.decode("utf-8"): v.decode("utf-8") for (k, v) in body_tuples}
        assert decoded_body == request_data

    def test_default_state(self):
        """Newly built credentials hold no token, expiry, or quota project."""
        credentials = self.make_credentials()

        # No token acquired yet.
        assert not credentials.token
        assert not credentials.valid
        # Expiration hasn't been set yet.
        assert not credentials.expiry
        assert not credentials.expired
        # No quota project ID set.
        assert not credentials.quota_project_id

    def test_with_quota_project(self):
        """``with_quota_project`` returns credentials carrying the given ID."""
        credentials = self.make_credentials()

        assert not credentials.quota_project_id

        quota_project_creds = credentials.with_quota_project("project-foo")

        assert quota_project_creds.quota_project_id == "project-foo"

    @mock.patch("google.auth._helpers.utcnow", return_value=datetime.datetime.min)
    def test_refresh(self, unused_utcnow):
        """A refresh exchanges the source token and stores token and expiry."""
        response = SUCCESS_RESPONSE.copy()
        # Test custom expiration to confirm expiry is set correctly.
        response["expires_in"] = 2800
        expected_expiry = datetime.datetime.min + datetime.timedelta(
            seconds=response["expires_in"]
        )
        headers = {"Content-Type": "application/x-www-form-urlencoded"}
        request_data = {
            "grant_type": GRANT_TYPE,
            "subject_token": "ACCESS_TOKEN_1",
            "subject_token_type": SUBJECT_TOKEN_TYPE,
            "requested_token_type": REQUESTED_TOKEN_TYPE,
            "options": urllib.parse.quote(json.dumps(CREDENTIAL_ACCESS_BOUNDARY_JSON)),
        }
        request = self.make_mock_request(status=http_client.OK, data=response)
        source_credentials = SourceCredentials()
        credentials = self.make_credentials(source_credentials=source_credentials)

        # Spy on calls to source credentials refresh to confirm the expected request
        # instance is used.
        with mock.patch.object(
            source_credentials, "refresh", wraps=source_credentials.refresh
        ) as wrapped_source_cred_refresh:
            credentials.refresh(request)

        self.assert_request_kwargs(request.call_args[1], headers, request_data)
        assert credentials.valid
        assert credentials.expiry == expected_expiry
        assert not credentials.expired
        assert credentials.token == response["access_token"]
        # Confirm source credentials called with the same request instance.
        wrapped_source_cred_refresh.assert_called_with(request)

    def test_refresh_token_exchange_error(self):
        """An error response from the token endpoint surfaces as OAuthError."""
        request = self.make_mock_request(
            status=http_client.BAD_REQUEST, data=ERROR_RESPONSE
        )
        credentials = self.make_credentials()

        with pytest.raises(exceptions.OAuthError) as excinfo:
            credentials.refresh(request)

        assert excinfo.match(
            r"Error code invalid_grant: Subject token is invalid. - https://tools.ietf.org/html/rfc6749"
        )
        assert not credentials.expired
        assert credentials.token is None

    def test_refresh_source_credentials_refresh_error(self):
        """A failing source credentials refresh propagates as RefreshError."""
        # Initialize downscoped credentials with source credentials that raise
        # an error on refresh.
        credentials = self.make_credentials(
            source_credentials=SourceCredentials(raise_error=True)
        )

        with pytest.raises(exceptions.RefreshError) as excinfo:
            credentials.refresh(mock.sentinel.request)

        assert excinfo.match(r"Failed to refresh access token in source credentials.")
        assert not credentials.expired
        assert credentials.token is None

    def test_apply_without_quota_project_id(self):
        """Without a quota project, only the authorization header is added."""
        headers = {}
        request = self.make_mock_request(status=http_client.OK, data=SUCCESS_RESPONSE)
        credentials = self.make_credentials()

        credentials.refresh(request)
        credentials.apply(headers)

        assert headers == {
            "authorization": "Bearer {}".format(SUCCESS_RESPONSE["access_token"])
        }

    def test_apply_with_quota_project_id(self):
        """With a quota project, x-goog-user-project is added as well."""
        headers = {"other": "header-value"}
        request = self.make_mock_request(status=http_client.OK, data=SUCCESS_RESPONSE)
        credentials = self.make_credentials(quota_project_id=QUOTA_PROJECT_ID)

        credentials.refresh(request)
        credentials.apply(headers)

        assert headers == {
            "other": "header-value",
            "authorization": "Bearer {}".format(SUCCESS_RESPONSE["access_token"]),
            "x-goog-user-project": QUOTA_PROJECT_ID,
        }

    def test_before_request(self):
        """The first call refreshes; the second reuses the stored token."""
        headers = {"other": "header-value"}
        request = self.make_mock_request(status=http_client.OK, data=SUCCESS_RESPONSE)
        credentials = self.make_credentials()

        # First call should call refresh, setting the token.
        credentials.before_request(request, "POST", "https://example.com/api", headers)

        assert headers == {
            "other": "header-value",
            "authorization": "Bearer {}".format(SUCCESS_RESPONSE["access_token"]),
        }

        # Second call shouldn't call refresh (request should be untouched).
        credentials.before_request(
            mock.sentinel.request, "POST", "https://example.com/api", headers
        )

        assert headers == {
            "other": "header-value",
            "authorization": "Bearer {}".format(SUCCESS_RESPONSE["access_token"]),
        }

    @mock.patch("google.auth._helpers.utcnow")
    def test_before_request_expired(self, utcnow):
        """A still-valid token is reused; once expired, a new one is fetched."""
        headers = {}
        request = self.make_mock_request(status=http_client.OK, data=SUCCESS_RESPONSE)
        credentials = self.make_credentials()
        credentials.token = "token"
        utcnow.return_value = datetime.datetime.min
        # Set the expiration to one second more than now plus the clock skew
        # accommodation. These credentials should be valid.
        credentials.expiry = (
            datetime.datetime.min + _helpers.CLOCK_SKEW + datetime.timedelta(seconds=1)
        )

        assert credentials.valid
        assert not credentials.expired

        credentials.before_request(request, "POST", "https://example.com/api", headers)

        # Cached token should be used.
        assert headers == {"authorization": "Bearer token"}

        # Next call should simulate 1 second passed.
        utcnow.return_value = datetime.datetime.min + datetime.timedelta(seconds=1)

        assert not credentials.valid
        assert credentials.expired

        credentials.before_request(request, "POST", "https://example.com/api", headers)

        # New token should be retrieved.
        assert headers == {
            "authorization": "Bearer {}".format(SUCCESS_RESPONSE["access_token"])
        }