blob: 74342ce03f68d4fc1d46949c1483e94219ae3f2c [file] [log] [blame]
# Copyright 2018 Google Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import datetime
import json
import os
import mock
import pytest
from six.moves import http_client
from google.auth import _helpers
from google.auth import crypt
from google.auth import exceptions
from google.auth import impersonated_credentials
from google.auth import transport
from google.auth.impersonated_credentials import Credentials
from google.oauth2 import service_account
# Token endpoint handed to the source service-account credentials below.
TOKEN_URI = 'https://example.com/oauth2/token'

# Fixture files are read from the ``data`` directory beside this module.
DATA_DIR = os.path.join(os.path.dirname(__file__), '', 'data')
SERVICE_ACCOUNT_JSON_FILE = os.path.join(DATA_DIR, 'service_account.json')

# Raw bytes of the private key file; they are fed to the RSA signer.
with open(os.path.join(DATA_DIR, 'privatekey.pem'), 'rb') as fh:
    PRIVATE_KEY_BYTES = fh.read()

# Parsed contents of the service account JSON fixture.
with open(SERVICE_ACCOUNT_JSON_FILE, 'r') as fh:
    SERVICE_ACCOUNT_INFO = json.load(fh)

# Signer built from the key bytes above, created with key id '1'.
SIGNER = crypt.RSASigner.from_string(PRIVATE_KEY_BYTES, '1')
@pytest.fixture
def mock_donor_credentials():
    """Patch ``google.oauth2._client.jwt_grant`` for the duration of a test.

    The autospec'd replacement returns the tuple
    ``("source token", expiry, {})``, where ``expiry`` is
    ``_helpers.utcnow()`` plus 500 seconds, computed when the fixture is
    set up.

    Yields:
        The mock object that stands in for ``jwt_grant``.
    """
    patcher = mock.patch('google.oauth2._client.jwt_grant', autospec=True)
    with patcher as grant:
        expiry = _helpers.utcnow() + datetime.timedelta(seconds=500)
        grant.return_value = ("source token", expiry, {})
        yield grant
class TestImpersonatedCredentials(object):
    """Tests for ``impersonated_credentials.Credentials``.

    Every test builds the credentials under test from the class-level
    constants below, using a service account credential as the source, and
    drives ``refresh`` with a mocked transport request.
    """

    SERVICE_ACCOUNT_EMAIL = 'service-account@example.com'
    TARGET_PRINCIPAL = 'impersonated@project.iam.gserviceaccount.com'
    TARGET_SCOPES = ['https://www.googleapis.com/auth/devstorage.read_only']
    DELEGATES = []
    LIFETIME = 3600
    # Source credentials shared by every test in this class.
    SOURCE_CREDENTIALS = service_account.Credentials(
        SIGNER, SERVICE_ACCOUNT_EMAIL, TOKEN_URI)

    def make_credentials(self, lifetime=LIFETIME):
        """Return impersonated credentials built from the class constants.

        Args:
            lifetime: Value forwarded as the ``lifetime`` argument of
                ``Credentials``; defaults to ``LIFETIME``.

        Returns:
            A new ``Credentials`` instance.
        """
        kwargs = dict(
            source_credentials=self.SOURCE_CREDENTIALS,
            target_principal=self.TARGET_PRINCIPAL,
            target_scopes=self.TARGET_SCOPES,
            delegates=self.DELEGATES,
            lifetime=lifetime)
        return Credentials(**kwargs)

    def test_default_state(self):
        """Freshly built credentials are not valid and report expired."""
        fresh = self.make_credentials()

        assert not fresh.valid
        assert fresh.expired

    def make_request(self, data, status=http_client.OK,
                     headers=None, side_effect=None):
        """Build a mocked transport request that returns a canned response.

        Args:
            data: Response payload; converted with ``_helpers.to_bytes``.
            status: Value stored on the response's ``status`` attribute.
            headers: Response headers; an empty dict is used when falsy.
            side_effect: Assigned to the request mock's ``side_effect``.

        Returns:
            An autospec'd ``transport.Request`` mock whose return value is
            the canned response.
        """
        fake_response = mock.create_autospec(
            transport.Response, instance=False)
        fake_response.status = status
        fake_response.data = _helpers.to_bytes(data)
        fake_response.headers = headers if headers else {}

        fake_request = mock.create_autospec(
            transport.Request, instance=False)
        fake_request.return_value = fake_response
        fake_request.side_effect = side_effect
        return fake_request

    def test_refresh_success(self, mock_donor_credentials):
        """A well-formed token response leaves the credentials valid."""
        credentials = self.make_credentials(lifetime=None)
        token = 'token'

        # Whole-second timestamp 500 seconds ahead, suffixed with 'Z'.
        expiry = _helpers.utcnow().replace(microsecond=0)
        expiry = expiry + datetime.timedelta(seconds=500)
        expire_time = expiry.isoformat('T') + 'Z'

        payload = json.dumps({
            "accessToken": token,
            "expireTime": expire_time
        })
        request = self.make_request(data=payload, status=http_client.OK)

        credentials.refresh(request)

        assert credentials.valid
        assert not credentials.expired

    def test_refresh_failure_malformed_expire_time(
            self, mock_donor_credentials):
        """An ``expireTime`` without the 'Z' suffix raises ``RefreshError``.

        The timestamp here also keeps its microseconds, unlike the one
        used in the success test.
        """
        credentials = self.make_credentials(lifetime=None)
        token = 'token'

        expiry = _helpers.utcnow() + datetime.timedelta(seconds=500)
        expire_time = expiry.isoformat('T')

        payload = json.dumps({
            "accessToken": token,
            "expireTime": expire_time
        })
        request = self.make_request(data=payload, status=http_client.OK)

        with pytest.raises(exceptions.RefreshError) as excinfo:
            credentials.refresh(request)

        assert excinfo.match(impersonated_credentials._REFRESH_ERROR)

        assert not credentials.valid
        assert credentials.expired

    def test_refresh_failure_lifetime_specified(self, mock_donor_credentials):
        """With ``lifetime=500`` a second refresh raises ``RefreshError``.

        The first ``refresh`` call is made outside ``pytest.raises`` and is
        expected to succeed; only the repeated call must fail, with a
        message matching ``_LIFETIME_ERROR``.
        """
        credentials = self.make_credentials(lifetime=500)
        token = 'token'

        expiry = _helpers.utcnow().replace(microsecond=0)
        expiry = expiry + datetime.timedelta(seconds=500)
        expire_time = expiry.isoformat('T') + 'Z'

        payload = json.dumps({
            "accessToken": token,
            "expireTime": expire_time
        })
        request = self.make_request(data=payload, status=http_client.OK)

        # Initial refresh; must not raise.
        credentials.refresh(request)

        with pytest.raises(exceptions.RefreshError) as excinfo:
            credentials.refresh(request)

        assert excinfo.match(impersonated_credentials._LIFETIME_ERROR)

        assert not credentials.valid
        assert credentials.expired

    def test_refresh_failure_unauthorzed(self, mock_donor_credentials):
        """An UNAUTHORIZED response with an error body raises on refresh."""
        credentials = self.make_credentials(lifetime=None)

        # NOTE(review): the body reports code 403 / PERMISSION_DENIED while
        # the response status is UNAUTHORIZED -- confirm this is intended.
        error_detail = {
            "code": 403,
            "message": "The caller does not have permission",
            "status": "PERMISSION_DENIED"
        }
        payload = json.dumps({"error": error_detail})
        request = self.make_request(
            data=payload, status=http_client.UNAUTHORIZED)

        with pytest.raises(exceptions.RefreshError) as excinfo:
            credentials.refresh(request)

        assert excinfo.match(impersonated_credentials._REFRESH_ERROR)

        assert not credentials.valid
        assert credentials.expired

    def test_refresh_failure_http_error(self, mock_donor_credentials):
        """An empty JSON body with a non-OK status raises on refresh."""
        credentials = self.make_credentials(lifetime=None)

        # NOTE(review): ``status`` is the HTTPException class itself, not a
        # numeric status code -- confirm this is the intended stand-in.
        payload = json.dumps({})
        request = self.make_request(
            data=payload, status=http_client.HTTPException)

        with pytest.raises(exceptions.RefreshError) as excinfo:
            credentials.refresh(request)

        assert excinfo.match(impersonated_credentials._REFRESH_ERROR)

        assert not credentials.valid
        assert credentials.expired

    def test_expired(self):
        """Credentials that were never refreshed report expired."""
        never_refreshed = self.make_credentials(lifetime=None)

        assert never_refreshed.expired