blob: 9945401d4ffc9f0864c858da4dc76b690178e11a [file] [log] [blame]
# Copyright 2018 Google Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
14
15import datetime
16import json
17import os
18
19import mock
20import pytest
21from six.moves import http_client
22
23from google.auth import _helpers
24from google.auth import crypt
25from google.auth import exceptions
26from google.auth import impersonated_credentials
27from google.auth import transport
28from google.auth.impersonated_credentials import Credentials
29from google.oauth2 import service_account
30
# Locations of the on-disk fixtures, resolved relative to this test module.
DATA_DIR = os.path.join(os.path.dirname(__file__), 'data')
SERVICE_ACCOUNT_JSON_FILE = os.path.join(DATA_DIR, 'service_account.json')

# Raw PEM bytes of the private key used to build SIGNER below.
with open(os.path.join(DATA_DIR, 'privatekey.pem'), 'rb') as fh:
    PRIVATE_KEY_BYTES = fh.read()

# Parsed contents of the service account JSON fixture.
with open(SERVICE_ACCOUNT_JSON_FILE, 'r') as fh:
    SERVICE_ACCOUNT_INFO = json.load(fh)

# Canned ID token returned by the mocked ID-token endpoint. The signature
# segment is the literal text "redacted".
ID_TOKEN_DATA = ('eyJhbGciOiJSUzI1NiIsImtpZCI6ImRmMzc1ODkwOGI3OTIyOTNhZDk3N2Ew'
                 'Yjk5MWQ5OGE3N2Y0ZWVlY2QiLCJ0eXAiOiJKV1QifQ.eyJhdWQiOiJodHRwc'
                 'zovL2Zvby5iYXIiLCJhenAiOiIxMDIxMDE1NTA4MzQyMDA3MDg1NjgiLCJle'
                 'HAiOjE1NjQ0NzUwNTEsImlhdCI6MTU2NDQ3MTQ1MSwiaXNzIjoiaHR0cHM6L'
                 'y9hY2NvdW50cy5nb29nbGUuY29tIiwic3ViIjoiMTAyMTAxNTUwODM0MjAwN'
                 'zA4NTY4In0.redacted')
# Expiry (seconds since the epoch) that the tests expect for ID_TOKEN_DATA.
# NOTE(review): presumably mirrors the token's "exp" claim -- confirm.
ID_TOKEN_EXPIRY = 1564475051

# Signer (key id '1') and token endpoint used for the source credentials.
SIGNER = crypt.RSASigner.from_string(PRIVATE_KEY_BYTES, '1')
TOKEN_URI = 'https://example.com/oauth2/token'
51
52
@pytest.fixture
def mock_donor_credentials():
    """Patch ``google.oauth2._client.jwt_grant`` for the duration of a test.

    The patched function returns a fixed ``"source token"``, an expiry 500
    seconds after the moment the fixture is set up, and an empty dict.

    Yields:
        The autospec'd mock that replaces ``jwt_grant``.
    """
    patcher = mock.patch('google.oauth2._client.jwt_grant', autospec=True)
    with patcher as jwt_grant:
        expiry = _helpers.utcnow() + datetime.timedelta(seconds=500)
        jwt_grant.return_value = ("source token", expiry, {})
        yield jwt_grant
61
62
class MockResponse(object):
    """Minimal stand-in for an HTTP response carrying a JSON payload.

    Declared as a new-style class so it behaves the same on Python 2 and
    Python 3, matching the other class in this module.

    Args:
        json_data: The object that :meth:`json` hands back.
        status_code: The status code to expose as ``status_code``.
    """

    def __init__(self, json_data, status_code):
        self.json_data = json_data
        self.status_code = status_code

    def json(self):
        """Return the payload supplied at construction time, unchanged."""
        return self.json_data
70
71
@pytest.fixture
def mock_authorizedsession_sign():
    """Patch ``AuthorizedSession.request`` to answer with a signing response.

    Every call to the patched method returns a ``MockResponse`` with status
    ``http_client.OK`` whose JSON payload holds key id ``"1"`` and a
    ``signedBlob`` that is the base64 encoding of ``signature``.

    Yields:
        The autospec'd mock that replaces ``AuthorizedSession.request``.
    """
    sign_response = MockResponse(
        {"keyId": "1", "signedBlob": "c2lnbmF0dXJl"},
        http_client.OK)
    target = 'google.auth.transport.requests.AuthorizedSession.request'
    with mock.patch(target, autospec=True) as auth_session:
        auth_session.return_value = sign_response
        yield auth_session
82
83
@pytest.fixture
def mock_authorizedsession_idtoken():
    """Patch ``AuthorizedSession.request`` to answer with an ID token.

    Every call to the patched method returns a ``MockResponse`` with status
    ``http_client.OK`` whose JSON payload is ``{"token": ID_TOKEN_DATA}``.

    Yields:
        The autospec'd mock that replaces ``AuthorizedSession.request``.
    """
    id_token_response = MockResponse({"token": ID_TOKEN_DATA}, http_client.OK)
    target = 'google.auth.transport.requests.AuthorizedSession.request'
    with mock.patch(target, autospec=True) as auth_session:
        auth_session.return_value = id_token_response
        yield auth_session
93
94
class TestImpersonatedCredentials(object):
    """Tests for ``impersonated_credentials.Credentials`` and
    ``impersonated_credentials.IDTokenCredentials``.

    The source credentials are service account credentials built from the
    module-level ``SIGNER``. Token endpoint responses are supplied through
    mocked transport requests (see :meth:`make_request`) and through the
    module's fixtures.
    """

    SERVICE_ACCOUNT_EMAIL = 'service-account@example.com'
    TARGET_PRINCIPAL = 'impersonated@project.iam.gserviceaccount.com'
    TARGET_SCOPES = ['https://www.googleapis.com/auth/devstorage.read_only']
    DELEGATES = []
    LIFETIME = 3600
    SOURCE_CREDENTIALS = service_account.Credentials(
        SIGNER, SERVICE_ACCOUNT_EMAIL, TOKEN_URI)

    def make_credentials(self, lifetime=LIFETIME,
                         target_principal=TARGET_PRINCIPAL):
        """Build impersonated credentials on top of ``SOURCE_CREDENTIALS``.

        Args:
            lifetime: Value forwarded as the ``lifetime`` argument.
            target_principal: Value forwarded as ``target_principal``.

        Returns:
            A new ``impersonated_credentials.Credentials`` instance.
        """
        return Credentials(
            source_credentials=self.SOURCE_CREDENTIALS,
            target_principal=target_principal,
            target_scopes=self.TARGET_SCOPES,
            delegates=self.DELEGATES,
            lifetime=lifetime)

    def test_default_state(self):
        credentials = self.make_credentials()
        assert not credentials.valid
        assert credentials.expired

    def make_request(self, data, status=http_client.OK,
                     headers=None, side_effect=None):
        """Create a mocked transport request returning a canned response.

        Args:
            data: Response payload; converted with ``_helpers.to_bytes``.
            status: Value exposed as the response's ``status``.
            headers: Response headers; an empty dict when falsy.
            side_effect: Optional side effect installed on the request mock.

        Returns:
            A mock spec'd on ``transport.Request`` whose return value is a
            mock spec'd on ``transport.Response``.
        """
        response = mock.create_autospec(transport.Response, instance=False)
        response.status = status
        response.data = _helpers.to_bytes(data)
        response.headers = headers or {}

        request = mock.create_autospec(transport.Request, instance=False)
        request.side_effect = side_effect
        request.return_value = response

        return request

    def _token_response(self, expire_time):
        """Serialize an access-token response carrying ``expire_time``."""
        return json.dumps({
            "accessToken": 'token',
            "expireTime": expire_time
        })

    def _refreshed_credentials(self):
        """Refresh fresh credentials against a well-formed token response.

        The response's ``expireTime`` is 500 seconds from now, truncated to
        whole seconds and suffixed with ``Z``. Asserts that the credentials
        are valid and unexpired after the refresh.

        Returns:
            A ``(credentials, request)`` tuple: the refreshed credentials
            and the mocked request that was used to refresh them.
        """
        credentials = self.make_credentials(lifetime=None)
        expire_time = (
            _helpers.utcnow().replace(microsecond=0) +
            datetime.timedelta(seconds=500)).isoformat('T') + 'Z'
        request = self.make_request(
            data=self._token_response(expire_time),
            status=http_client.OK)

        credentials.refresh(request)

        assert credentials.valid
        assert not credentials.expired

        return credentials, request

    def _assert_refresh_fails(self, request):
        """Assert that refreshing with ``request`` raises ``RefreshError``.

        Also checks that the error matches
        ``impersonated_credentials._REFRESH_ERROR`` and that the credentials
        remain invalid and expired afterwards.
        """
        credentials = self.make_credentials(lifetime=None)

        with pytest.raises(exceptions.RefreshError) as excinfo:
            credentials.refresh(request)

        assert excinfo.match(impersonated_credentials._REFRESH_ERROR)

        assert not credentials.valid
        assert credentials.expired

    def test_refresh_success(self, mock_donor_credentials):
        self._refreshed_credentials()

    def test_refresh_failure_malformed_expire_time(
            self, mock_donor_credentials):
        # Unlike the well-formed responses built by _refreshed_credentials,
        # this timestamp keeps its microseconds and has no trailing 'Z'.
        expire_time = (
            _helpers.utcnow() + datetime.timedelta(seconds=500)).isoformat('T')

        request = self.make_request(
            data=self._token_response(expire_time),
            status=http_client.OK)

        self._assert_refresh_fails(request)

    def test_refresh_failure_unauthorzed(self, mock_donor_credentials):
        # NOTE(review): the body reports code 403 / PERMISSION_DENIED while
        # the HTTP status used is UNAUTHORIZED -- confirm which is intended.
        response_body = {
            "error": {
                "code": 403,
                "message": "The caller does not have permission",
                "status": "PERMISSION_DENIED"
            }
        }

        request = self.make_request(
            data=json.dumps(response_body),
            status=http_client.UNAUTHORIZED)

        self._assert_refresh_fails(request)

    def test_refresh_failure_http_error(self, mock_donor_credentials):
        # Use a real HTTP status code; the response status must be an
        # integer code, not the HTTPException class.
        request = self.make_request(
            data=json.dumps({}),
            status=http_client.INTERNAL_SERVER_ERROR)

        self._assert_refresh_fails(request)

    def test_expired(self):
        credentials = self.make_credentials(lifetime=None)
        assert credentials.expired

    def test_signer(self):
        credentials = self.make_credentials()
        assert isinstance(credentials.signer,
                          impersonated_credentials.Credentials)

    def test_signer_email(self):
        credentials = self.make_credentials(
            target_principal=self.TARGET_PRINCIPAL)
        assert credentials.signer_email == self.TARGET_PRINCIPAL

    def test_service_account_email(self):
        credentials = self.make_credentials(
            target_principal=self.TARGET_PRINCIPAL)
        assert credentials.service_account_email == self.TARGET_PRINCIPAL

    def test_sign_bytes(self, mock_donor_credentials,
                        mock_authorizedsession_sign):
        credentials, _ = self._refreshed_credentials()

        # The mocked signing endpoint returns base64("signature").
        signature = credentials.sign_bytes(b'signed bytes')
        assert signature == b'signature'

    def test_id_token_success(self, mock_donor_credentials,
                              mock_authorizedsession_idtoken):
        credentials, request = self._refreshed_credentials()
        target_audience = 'https://foo.bar'

        id_creds = impersonated_credentials.IDTokenCredentials(
            credentials, target_audience=target_audience)
        id_creds.refresh(request)

        assert id_creds.token == ID_TOKEN_DATA
        assert id_creds.expiry == datetime.datetime.fromtimestamp(
            ID_TOKEN_EXPIRY)

    def test_id_token_from_credential(self, mock_donor_credentials,
                                      mock_authorizedsession_idtoken):
        credentials, request = self._refreshed_credentials()
        target_audience = 'https://foo.bar'

        id_creds = impersonated_credentials.IDTokenCredentials(
            credentials, target_audience=target_audience)
        id_creds = id_creds.from_credentials(target_credentials=credentials)
        id_creds.refresh(request)

        assert id_creds.token == ID_TOKEN_DATA

    def test_id_token_with_target_audience(self, mock_donor_credentials,
                                           mock_authorizedsession_idtoken):
        credentials, request = self._refreshed_credentials()
        target_audience = 'https://foo.bar'

        id_creds = impersonated_credentials.IDTokenCredentials(
            credentials)
        id_creds = id_creds.with_target_audience(
            target_audience=target_audience)
        id_creds.refresh(request)

        assert id_creds.token == ID_TOKEN_DATA
        assert id_creds.expiry == datetime.datetime.fromtimestamp(
            ID_TOKEN_EXPIRY)

    def test_id_token_invalid_cred(self, mock_donor_credentials,
                                   mock_authorizedsession_idtoken):
        credentials = None

        with pytest.raises(exceptions.GoogleAuthError) as excinfo:
            impersonated_credentials.IDTokenCredentials(credentials)

        assert excinfo.match('Provided Credential must be'
                             ' impersonated_credentials')

    def test_id_token_with_include_email(self, mock_donor_credentials,
                                         mock_authorizedsession_idtoken):
        credentials, request = self._refreshed_credentials()
        target_audience = 'https://foo.bar'

        id_creds = impersonated_credentials.IDTokenCredentials(
            credentials, target_audience=target_audience)
        id_creds = id_creds.with_include_email(True)
        id_creds.refresh(request)

        assert id_creds.token == ID_TOKEN_DATA