blob: 814369e43903bf8e68fb4699869eb16cace44fa0 [file] [log] [blame]
# Copyright 2016 Google Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
14
15import datetime
16
17from google.auth import credentials
18
19
class CredentialsImpl(credentials.Credentials):
    """Minimal concrete ``Credentials`` subclass used by the tests below."""

    def refresh(self, request):
        """Record ``request`` as the token.

        The tests pass a plain string as ``request`` so they can observe
        whether (and with which value) ``refresh`` was invoked.
        """
        self.token = request
23
24
def test_credentials_constructor():
    """Freshly constructed credentials are empty.

    Checks that ``token`` and ``expiry`` are unset and that the object
    reports itself as neither expired nor valid.
    """
    creds = CredentialsImpl()

    assert not creds.token
    assert not creds.expiry
    assert not creds.expired
    assert not creds.valid
31
32
def test_expired_and_valid():
    """``valid`` and ``expired`` follow the token and the expiry time."""
    creds = CredentialsImpl()
    creds.token = 'token'

    # With a token and no expiry set, the credentials count as usable.
    assert creds.valid
    assert not creds.expired

    # Move the expiry one minute into the past.
    one_minute_ago = (
        datetime.datetime.utcnow() - datetime.timedelta(seconds=60))
    creds.expiry = one_minute_ago

    assert not creds.valid
    assert creds.expired
45
46
def test_before_request():
    """``before_request`` refreshes only when needed and sets the header.

    ``CredentialsImpl.refresh`` stores the ``request`` argument as the
    token, so the value of ``token`` after each call reveals whether a
    refresh took place.
    """
    creds = CredentialsImpl()
    url = 'http://example.com'

    # Initial call: nothing is cached yet, so refresh runs and stores
    # the first request value as the token.
    first_headers = {}
    creds.before_request('token', url, 'GET', first_headers)
    assert creds.valid
    assert creds.token == 'token'
    assert first_headers['authorization'] == 'Bearer token'

    # Follow-up call: the token from the first call must be reused, so
    # the new request value is expected not to replace it.
    second_headers = {}
    creds.before_request('token2', url, 'GET', second_headers)
    assert creds.valid
    assert creds.token == 'token'
    assert second_headers['authorization'] == 'Bearer token'
66
67
class ScopedCredentialsImpl(credentials.Scoped, CredentialsImpl):
    """Test double combining ``Scoped`` with ``CredentialsImpl``."""

    @property
    def requires_scopes(self):
        """Return the parent implementation's ``requires_scopes`` value."""
        parent = super(ScopedCredentialsImpl, self)
        return parent.requires_scopes

    def with_scopes(self, scopes):
        """Not supported by this test double; always raises."""
        raise NotImplementedError
75
76
def test_scoped_credentials_constructor():
    """Newly built scoped credentials start with ``_scopes`` unset."""
    scoped = ScopedCredentialsImpl()

    assert scoped._scopes is None
80
81
def test_scoped_credentials_scopes():
    """``scopes`` mirrors ``_scopes`` and ``has_scopes`` checks membership."""
    scoped = ScopedCredentialsImpl()
    scoped._scopes = ['one', 'two']

    assert scoped.scopes == ['one', 'two']

    # Any subset of the assigned scopes is reported as present.
    assert scoped.has_scopes(['one'])
    assert scoped.has_scopes(['two'])
    assert scoped.has_scopes(['one', 'two'])

    # A scope that was never assigned is reported as missing.
    assert not scoped.has_scopes(['three'])
90
91
def test_scoped_credentials_requires_scopes():
    """The inherited ``requires_scopes`` value is falsy for the test double."""
    scoped = ScopedCredentialsImpl()

    assert not scoped.requires_scopes
Jon Wayne Parrottf89a3cf2016-10-31 10:52:57 -070095
96
class RequiresScopedCredentialsImpl(credentials.Scoped, CredentialsImpl):
    """Test double that demands scopes until some have been supplied."""

    def __init__(self, scopes=None):
        """Initialise the parent classes, then record ``scopes``."""
        super(RequiresScopedCredentialsImpl, self).__init__()
        self._scopes = scopes

    @property
    def requires_scopes(self):
        """Return ``True`` while ``self.scopes`` is empty or unset."""
        return not self.scopes

    def with_scopes(self, scopes):
        """Return a new instance of this class carrying ``scopes``."""
        return RequiresScopedCredentialsImpl(scopes=scopes)
108
109
def test_create_scoped_if_required_scoped():
    """``with_scopes_if_required`` returns a scoped copy when needed."""
    wanted_scopes = ['one', 'two']
    unscoped = RequiresScopedCredentialsImpl()

    scoped = credentials.with_scopes_if_required(unscoped, wanted_scopes)

    # A distinct object that now carries the requested scopes is expected.
    assert scoped is not unscoped
    assert not scoped.requires_scopes
    assert scoped.has_scopes(['one', 'two'])
118
119
def test_create_scoped_if_required_not_scopes():
    """Credentials that are not ``Scoped`` are handed back unchanged."""
    plain = CredentialsImpl()

    result = credentials.with_scopes_if_required(plain, ['one', 'two'])

    assert result is plain