Add google.auth.impersonated_credentials (#299)
diff --git a/tests/test_impersonated_credentials.py b/tests/test_impersonated_credentials.py
new file mode 100644
index 0000000..74342ce
--- /dev/null
+++ b/tests/test_impersonated_credentials.py
@@ -0,0 +1,204 @@
+# Copyright 2018 Google Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import datetime
+import json
+import os
+
+import mock
+import pytest
+from six.moves import http_client
+
+from google.auth import _helpers
+from google.auth import crypt
+from google.auth import exceptions
+from google.auth import impersonated_credentials
+from google.auth import transport
+from google.auth.impersonated_credentials import Credentials
+from google.oauth2 import service_account
+
+# Shared test inputs: key material and service-account info from ``data/``.
+DATA_DIR = os.path.join(os.path.dirname(__file__), 'data')
+SERVICE_ACCOUNT_JSON_FILE = os.path.join(DATA_DIR, 'service_account.json')
+
+with open(os.path.join(DATA_DIR, 'privatekey.pem'), 'rb') as fh:
+    PRIVATE_KEY_BYTES = fh.read()
+
+with open(SERVICE_ACCOUNT_JSON_FILE, 'r') as fh:
+    SERVICE_ACCOUNT_INFO = json.load(fh)
+
+SIGNER = crypt.RSASigner.from_string(PRIVATE_KEY_BYTES, '1')
+TOKEN_URI = 'https://example.com/oauth2/token'
+
+
+@pytest.fixture
+def mock_donor_credentials():
+    """Patch ``jwt_grant`` to hand back a canned token expiring in 500s."""
+    source_expiry = _helpers.utcnow() + datetime.timedelta(seconds=500)
+    patcher = mock.patch('google.oauth2._client.jwt_grant', autospec=True)
+    with patcher as grant_mock:
+        grant_mock.return_value = ("source token", source_expiry, {})
+        yield grant_mock
+
+
+class TestImpersonatedCredentials(object):
+    """Exercise refresh and expiry of impersonated ``Credentials``."""
+
+    SERVICE_ACCOUNT_EMAIL = 'service-account@example.com'
+    TARGET_PRINCIPAL = 'impersonated@project.iam.gserviceaccount.com'
+    TARGET_SCOPES = ['https://www.googleapis.com/auth/devstorage.read_only']
+    DELEGATES = []
+    LIFETIME = 3600
+    SOURCE_CREDENTIALS = service_account.Credentials(
+        SIGNER, SERVICE_ACCOUNT_EMAIL, TOKEN_URI)
+
+    def make_credentials(self, lifetime=LIFETIME):
+        """Build impersonated credentials from the class-level settings."""
+        return Credentials(
+            source_credentials=self.SOURCE_CREDENTIALS,
+            target_principal=self.TARGET_PRINCIPAL,
+            target_scopes=self.TARGET_SCOPES,
+            delegates=self.DELEGATES,
+            lifetime=lifetime)
+
+    def test_default_state(self):
+        """Credentials that were never refreshed are invalid and expired."""
+        credentials = self.make_credentials()
+        assert not credentials.valid
+        assert credentials.expired
+
+    def make_request(self, data, status=http_client.OK,
+                     headers=None, side_effect=None):
+        """Return a mocked transport request that yields a canned response."""
+        response = mock.create_autospec(transport.Response, instance=False)
+        response.status = status
+        response.data = _helpers.to_bytes(data)
+        response.headers = headers or {}
+
+        request = mock.create_autospec(transport.Request, instance=False)
+        request.side_effect = side_effect
+        request.return_value = response
+
+        return request
+
+    def test_refresh_success(self, mock_donor_credentials):
+        """A well-formed token response leaves the credentials valid."""
+        credentials = self.make_credentials(lifetime=None)
+
+        expire_time = (
+            _helpers.utcnow().replace(microsecond=0) +
+            datetime.timedelta(seconds=500)).isoformat('T') + 'Z'
+        response_body = {"accessToken": 'token', "expireTime": expire_time}
+
+        request = self.make_request(
+            data=json.dumps(response_body),
+            status=http_client.OK)
+
+        credentials.refresh(request)
+
+        assert credentials.valid
+        assert not credentials.expired
+
+    def test_refresh_failure_malformed_expire_time(
+            self, mock_donor_credentials):
+        """A non-conforming ``expireTime`` makes refresh raise RefreshError."""
+        credentials = self.make_credentials(lifetime=None)
+
+        # Unlike the success case: no microsecond truncation, no 'Z' suffix.
+        expire_time = (
+            _helpers.utcnow() + datetime.timedelta(seconds=500)).isoformat('T')
+        response_body = {"accessToken": 'token', "expireTime": expire_time}
+
+        request = self.make_request(
+            data=json.dumps(response_body),
+            status=http_client.OK)
+
+        with pytest.raises(exceptions.RefreshError) as excinfo:
+            credentials.refresh(request)
+
+        assert excinfo.match(impersonated_credentials._REFRESH_ERROR)
+
+        assert not credentials.valid
+        assert credentials.expired
+
+    def test_refresh_failure_lifetime_specified(self, mock_donor_credentials):
+        """With an explicit lifetime, a second refresh raises RefreshError."""
+        credentials = self.make_credentials(lifetime=500)
+
+        expire_time = (
+            _helpers.utcnow().replace(microsecond=0) +
+            datetime.timedelta(seconds=500)).isoformat('T') + 'Z'
+        response_body = {"accessToken": 'token', "expireTime": expire_time}
+
+        request = self.make_request(
+            data=json.dumps(response_body),
+            status=http_client.OK)
+
+        # The first refresh must pass; only the repeated one should fail.
+        credentials.refresh(request)
+
+        with pytest.raises(exceptions.RefreshError) as excinfo:
+            credentials.refresh(request)
+
+        assert excinfo.match(impersonated_credentials._LIFETIME_ERROR)
+
+        assert not credentials.valid
+        assert credentials.expired
+
+    def test_refresh_failure_unauthorized(self, mock_donor_credentials):
+        """An UNAUTHORIZED response with an error body raises RefreshError."""
+        credentials = self.make_credentials(lifetime=None)
+
+        response_body = {
+            "error": {
+                "code": 403,
+                "message": "The caller does not have permission",
+                "status": "PERMISSION_DENIED"
+            }
+        }
+
+        request = self.make_request(
+            data=json.dumps(response_body),
+            status=http_client.UNAUTHORIZED)
+
+        with pytest.raises(exceptions.RefreshError) as excinfo:
+            credentials.refresh(request)
+
+        assert excinfo.match(impersonated_credentials._REFRESH_ERROR)
+
+        assert not credentials.valid
+        assert credentials.expired
+
+    def test_refresh_failure_http_error(self, mock_donor_credentials):
+        """A server-error status with an empty body raises RefreshError."""
+        credentials = self.make_credentials(lifetime=None)
+
+        # Use a real integer status code; ``http_client.HTTPException`` is an
+        # exception class and cannot stand in for a response status.
+        request = self.make_request(
+            data=json.dumps({}),
+            status=http_client.INTERNAL_SERVER_ERROR)
+
+        with pytest.raises(exceptions.RefreshError) as excinfo:
+            credentials.refresh(request)
+
+        assert excinfo.match(impersonated_credentials._REFRESH_ERROR)
+
+        assert not credentials.valid
+        assert credentials.expired
+
+    def test_expired(self):
+        """Unrefreshed credentials report themselves as expired."""
+        credentials = self.make_credentials(lifetime=None)
+        assert credentials.expired