| # Copyright 2018 Google Inc. |
| # |
| # Licensed under the Apache License, Version 2.0 (the "License"); |
| # you may not use this file except in compliance with the License. |
| # You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, software |
| # distributed under the License is distributed on an "AS IS" BASIS, |
| # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| # See the License for the specific language governing permissions and |
| # limitations under the License. |
| |
| import datetime |
| import json |
| import os |
| |
| import mock |
| import pytest |
| from six.moves import http_client |
| |
| from google.auth import _helpers |
| from google.auth import crypt |
| from google.auth import exceptions |
| from google.auth import impersonated_credentials |
| from google.auth import transport |
| from google.auth.impersonated_credentials import Credentials |
| from google.oauth2 import service_account |
| |
# Locations of the on-disk fixtures shared by the tests in this module.
DATA_DIR = os.path.join(os.path.dirname(__file__), '', 'data')
SERVICE_ACCOUNT_JSON_FILE = os.path.join(DATA_DIR, 'service_account.json')

# Endpoint handed to the source service-account credentials.
TOKEN_URI = 'https://example.com/oauth2/token'

# Raw PEM bytes of the private key; used to build SIGNER below.
with open(os.path.join(DATA_DIR, 'privatekey.pem'), 'rb') as fh:
    PRIVATE_KEY_BYTES = fh.read()

# Parsed contents of the service-account JSON fixture.
with open(SERVICE_ACCOUNT_JSON_FILE, 'r') as fh:
    SERVICE_ACCOUNT_INFO = json.load(fh)

# RSA signer built from the fixture key, with key id '1'.
SIGNER = crypt.RSASigner.from_string(PRIVATE_KEY_BYTES, '1')
| |
| |
@pytest.fixture
def mock_donor_credentials():
    """Patch ``google.oauth2._client.jwt_grant`` for the duration of a test.

    The patched function returns a fixed ``"source token"``, an expiry 500
    seconds after the moment the fixture is set up, and an empty dict.

    Yields:
        The autospecced mock standing in for ``jwt_grant``.
    """
    target = 'google.oauth2._client.jwt_grant'
    with mock.patch(target, autospec=True) as grant:
        time_to_live = datetime.timedelta(seconds=500)
        expiry = _helpers.utcnow() + time_to_live
        grant.return_value = ("source token", expiry, {})
        yield grant
| |
| |
class TestImpersonatedCredentials(object):
    """Tests for ``impersonated_credentials.Credentials``.

    The source credentials are a service account built from the module's
    fixture signer.  Tests that refresh use the ``mock_donor_credentials``
    fixture (which patches ``jwt_grant``) and pass ``refresh`` a mocked
    transport built by ``make_request``.
    """

    SERVICE_ACCOUNT_EMAIL = 'service-account@example.com'
    TARGET_PRINCIPAL = 'impersonated@project.iam.gserviceaccount.com'
    TARGET_SCOPES = ['https://www.googleapis.com/auth/devstorage.read_only']
    DELEGATES = []
    LIFETIME = 3600
    # Source credentials shared by every impersonated credential under test.
    SOURCE_CREDENTIALS = service_account.Credentials(
        SIGNER, SERVICE_ACCOUNT_EMAIL, TOKEN_URI)

    def make_credentials(self, lifetime=LIFETIME):
        """Build impersonated credentials from the class-level fixtures.

        Args:
            lifetime: Value forwarded as the ``lifetime`` argument of
                ``Credentials``; defaults to ``LIFETIME``.

        Returns:
            A new ``impersonated_credentials.Credentials`` instance.
        """
        return Credentials(
            source_credentials=self.SOURCE_CREDENTIALS,
            target_principal=self.TARGET_PRINCIPAL,
            target_scopes=self.TARGET_SCOPES,
            delegates=self.DELEGATES,
            lifetime=lifetime)

    def test_default_state(self):
        """Freshly built credentials are neither valid nor unexpired."""
        credentials = self.make_credentials()
        assert not credentials.valid
        assert credentials.expired

    def make_request(self, data, status=http_client.OK,
                     headers=None, side_effect=None):
        """Build a mocked transport request returning a canned response.

        Args:
            data: Response payload; converted to bytes for ``response.data``.
            status: Value stored as ``response.status``.
            headers: Response headers; an empty dict when omitted.
            side_effect: Optional side effect installed on the request mock.

        Returns:
            An autospecced ``transport.Request`` mock whose return value is
            the canned ``transport.Response`` mock.
        """
        response = mock.create_autospec(transport.Response, instance=False)
        response.status = status
        response.data = _helpers.to_bytes(data)
        response.headers = headers or {}

        request = mock.create_autospec(transport.Request, instance=False)
        request.side_effect = side_effect
        request.return_value = response

        return request

    def test_refresh_success(self, mock_donor_credentials):
        """A well-formed token response leaves the credentials valid."""
        credentials = self.make_credentials(lifetime=None)
        token = 'token'

        # Whole-second UTC timestamp with a trailing 'Z'.
        expire_time = (
            _helpers.utcnow().replace(microsecond=0) +
            datetime.timedelta(seconds=500)).isoformat('T') + 'Z'
        response_body = {
            "accessToken": token,
            "expireTime": expire_time
        }

        request = self.make_request(
            data=json.dumps(response_body),
            status=http_client.OK)

        credentials.refresh(request)

        assert credentials.valid
        assert not credentials.expired

    def test_refresh_failure_malformed_expire_time(
            self, mock_donor_credentials):
        """An ``expireTime`` in an unexpected format raises RefreshError."""
        credentials = self.make_credentials(lifetime=None)
        token = 'token'

        # Unlike the success case, this timestamp has no trailing 'Z' and is
        # not truncated to whole seconds.
        expire_time = (
            _helpers.utcnow() + datetime.timedelta(seconds=500)).isoformat('T')
        response_body = {
            "accessToken": token,
            "expireTime": expire_time
        }

        request = self.make_request(
            data=json.dumps(response_body),
            status=http_client.OK)

        with pytest.raises(exceptions.RefreshError) as excinfo:
            credentials.refresh(request)

        assert excinfo.match(impersonated_credentials._REFRESH_ERROR)

        assert not credentials.valid
        assert credentials.expired

    def test_refresh_failure_lifetime_specified(self, mock_donor_credentials):
        """With an explicit lifetime, a second refresh raises RefreshError."""
        credentials = self.make_credentials(lifetime=500)
        token = 'token'

        expire_time = (
            _helpers.utcnow().replace(microsecond=0) +
            datetime.timedelta(seconds=500)).isoformat('T') + 'Z'
        response_body = {
            "accessToken": token,
            "expireTime": expire_time
        }

        request = self.make_request(
            data=json.dumps(response_body),
            status=http_client.OK)

        # The first refresh is expected to succeed; only the repeated
        # refresh below must fail with the lifetime error.
        credentials.refresh(request)

        with pytest.raises(exceptions.RefreshError) as excinfo:
            credentials.refresh(request)

        assert excinfo.match(impersonated_credentials._LIFETIME_ERROR)

        assert not credentials.valid
        assert credentials.expired

    def test_refresh_failure_unauthorzed(self, mock_donor_credentials):
        """An error response with a non-OK status raises RefreshError.

        The method name keeps its historical spelling so that existing test
        IDs stay stable.
        """
        credentials = self.make_credentials(lifetime=None)

        # NOTE(review): the body reports a 403 / PERMISSION_DENIED error while
        # the HTTP status below is 401 (UNAUTHORIZED) -- confirm which one is
        # intended.
        response_body = {
            "error": {
                "code": 403,
                "message": "The caller does not have permission",
                "status": "PERMISSION_DENIED"
            }
        }

        request = self.make_request(
            data=json.dumps(response_body),
            status=http_client.UNAUTHORIZED)

        with pytest.raises(exceptions.RefreshError) as excinfo:
            credentials.refresh(request)

        assert excinfo.match(impersonated_credentials._REFRESH_ERROR)

        assert not credentials.valid
        assert credentials.expired

    def test_refresh_failure_http_error(self, mock_donor_credentials):
        """A server-error response with no token payload raises RefreshError."""
        credentials = self.make_credentials(lifetime=None)

        response_body = {}

        # Use a genuine HTTP status code.  The previous value,
        # http_client.HTTPException, is an exception class and only behaved
        # as "not OK" by accident of comparison.
        request = self.make_request(
            data=json.dumps(response_body),
            status=http_client.INTERNAL_SERVER_ERROR)

        with pytest.raises(exceptions.RefreshError) as excinfo:
            credentials.refresh(request)

        assert excinfo.match(impersonated_credentials._REFRESH_ERROR)

        assert not credentials.valid
        assert credentials.expired

    def test_expired(self):
        """Credentials that were never refreshed report as expired."""
        credentials = self.make_credentials(lifetime=None)
        assert credentials.expired