Add google.auth.impersonated_credentials (#299)
diff --git a/docs/index.rst b/docs/index.rst
index 56e3eca..1eb3d86 100644
--- a/docs/index.rst
+++ b/docs/index.rst
@@ -16,6 +16,7 @@
- Support for signing and verifying :mod:`JWTs <google.auth.jwt>`.
- Support for verifying and decoding :mod:`ID Tokens <google.oauth2.id_token>`.
- Support for Google :mod:`Service Account credentials <google.oauth2.service_account>`.
+- Support for Google :mod:`Impersonated Credentials <google.auth.impersonated_credentials>`.
- Support for :mod:`Google Compute Engine credentials <google.auth.compute_engine>`.
- Support for :mod:`Google App Engine standard credentials <google.auth.app_engine>`.
- Support for various transports, including
diff --git a/docs/reference/google.auth.impersonated_credentials.rst b/docs/reference/google.auth.impersonated_credentials.rst
new file mode 100644
index 0000000..653708e
--- /dev/null
+++ b/docs/reference/google.auth.impersonated_credentials.rst
@@ -0,0 +1,7 @@
+google.auth.impersonated\_credentials module
+============================================
+
+.. automodule:: google.auth.impersonated_credentials
+ :members:
+ :inherited-members:
+ :show-inheritance:
diff --git a/docs/reference/google.auth.rst b/docs/reference/google.auth.rst
index 244d0bb..bc6740b 100644
--- a/docs/reference/google.auth.rst
+++ b/docs/reference/google.auth.rst
@@ -25,5 +25,6 @@
google.auth.environment_vars
google.auth.exceptions
google.auth.iam
+ google.auth.impersonated_credentials
google.auth.jwt
diff --git a/docs/user-guide.rst b/docs/user-guide.rst
index 060d9b8..7587917 100644
--- a/docs/user-guide.rst
+++ b/docs/user-guide.rst
@@ -205,6 +205,35 @@
.. _requests-oauthlib:
https://requests-oauthlib.readthedocs.io/en/latest/
+Impersonated credentials
+++++++++++++++++++++++++
+
+Impersonated Credentials allows one set of credentials issued to a user or service account
+to impersonate another. The target service account must grant the source credential
+the "Service Account Token Creator" IAM role::
+
+ from google.auth import impersonated_credentials
+
+ target_scopes = ['https://www.googleapis.com/auth/devstorage.read_only']
+ source_credentials = service_account.Credentials.from_service_account_file(
+ '/path/to/svc_account.json',
+ scopes=target_scopes)
+
+ target_credentials = impersonated_credentials.Credentials(
+ source_credentials=source_credentials,
+ target_principal='impersonated-account@_project_.iam.gserviceaccount.com',
+ target_scopes=target_scopes,
+ lifetime=500)
+ client = storage.Client(credentials=target_credentials)
+ buckets = client.list_buckets(project='your_project')
+ for bucket in buckets:
+ print bucket.name
+
+
+In the example above `source_credentials` does not have direct access to list buckets
+in the target project. Using `ImpersonatedCredentials` will allow the source_credentials
+to assume the identity of a target_principal that does have access
+
Making authenticated requests
-----------------------------
diff --git a/google/auth/impersonated_credentials.py b/google/auth/impersonated_credentials.py
new file mode 100644
index 0000000..ca625b8
--- /dev/null
+++ b/google/auth/impersonated_credentials.py
@@ -0,0 +1,239 @@
+# Copyright 2018 Google Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""Google Cloud Impersonated credentials.
+
+This module provides authentication for applications where local credentials
+impersonates a remote service account using `IAM Credentials API`_.
+
+This class can be used to impersonate a service account as long as the original
+Credential object has the "Service Account Token Creator" role on the target
+service account.
+
+ .. _IAM Credentials API:
+ https://cloud.google.com/iam/credentials/reference/rest/
+"""
+
+import copy
+from datetime import datetime
+import json
+
+import six
+from six.moves import http_client
+
+from google.auth import _helpers
+from google.auth import credentials
+from google.auth import exceptions
+
+_DEFAULT_TOKEN_LIFETIME_SECS = 3600 # 1 hour in seconds
+
+_IAM_SCOPE = ['https://www.googleapis.com/auth/iam']
+
+_IAM_ENDPOINT = ('https://iamcredentials.googleapis.com/v1/projects/-' +
+ '/serviceAccounts/{}:generateAccessToken')
+
+_REFRESH_ERROR = 'Unable to acquire impersonated credentials'
+_LIFETIME_ERROR = 'Credentials with lifetime set cannot be renewed'
+
+
+def _make_iam_token_request(request, principal, headers, body):
+ """Makes a request to the Google Cloud IAM service for an access token.
+ Args:
+ request (Request): The Request object to use.
+ principal (str): The principal to request an access token for.
+ headers (Mapping[str, str]): Map of headers to transmit.
+ body (Mapping[str, str]): JSON Payload body for the iamcredentials
+ API call.
+
+ Raises:
+ TransportError: Raised if there is an underlying HTTP connection
+ Error
+ DefaultCredentialsError: Raised if the impersonated credentials
+ are not available. Common reasons are
+ `iamcredentials.googleapis.com` is not enabled or the
+ `Service Account Token Creator` is not assigned
+ """
+ iam_endpoint = _IAM_ENDPOINT.format(principal)
+
+ body = json.dumps(body)
+
+ response = request(
+ url=iam_endpoint,
+ method='POST',
+ headers=headers,
+ body=body)
+
+ response_body = response.data.decode('utf-8')
+
+ if response.status != http_client.OK:
+ exceptions.RefreshError(_REFRESH_ERROR, response_body)
+
+ try:
+ token_response = json.loads(response.data.decode('utf-8'))
+ token = token_response['accessToken']
+ expiry = datetime.strptime(
+ token_response['expireTime'], '%Y-%m-%dT%H:%M:%SZ')
+
+ return token, expiry
+
+ except (KeyError, ValueError) as caught_exc:
+ new_exc = exceptions.RefreshError(
+ '{}: No access token or invalid expiration in response.'.format(
+ _REFRESH_ERROR),
+ response_body)
+ six.raise_from(new_exc, caught_exc)
+
+
+class Credentials(credentials.Credentials):
+ """This module defines impersonated credentials which are essentially
+ impersonated identities.
+
+ Impersonated Credentials allows credentials issued to a user or
+ service account to impersonate another. The target service account must
+ grant the originating credential principal the
+ `Service Account Token Creator`_ IAM role:
+
+ For more information about Token Creator IAM role and
+ IAMCredentials API, see
+ `Creating Short-Lived Service Account Credentials`_.
+
+ .. _Service Account Token Creator:
+ https://cloud.google.com/iam/docs/service-accounts#the_service_account_token_creator_role
+
+ .. _Creating Short-Lived Service Account Credentials:
+ https://cloud.google.com/iam/docs/creating-short-lived-service-account-credentials
+
+ Usage:
+
+ First grant source_credentials the `Service Account Token Creator`
+ role on the target account to impersonate. In this example, the
+ service account represented by svc_account.json has the
+ token creator role on
+ `impersonated-account@_project_.iam.gserviceaccount.com`.
+
+ Initialize a source credential which does not have access to
+ list bucket::
+
+ from google.oauth2 import service_acccount
+
+ target_scopes = [
+ 'https://www.googleapis.com/auth/devstorage.read_only']
+
+ source_credentials = (
+ service_account.Credentials.from_service_account_file(
+ '/path/to/svc_account.json',
+ scopes=target_scopes))
+
+ Now use the source credentials to acquire credentials to impersonate
+ another service account::
+
+ from google.auth import impersonated_credentials
+
+ target_credentials = impersonated_credentials.Credentials(
+ source_credentials=source_credentials,
+ target_principal='impersonated-account@_project_.iam.gserviceaccount.com',
+ target_scopes = target_scopes,
+ lifetime=500)
+
+ Resource access is granted::
+
+ client = storage.Client(credentials=target_credentials)
+ buckets = client.list_buckets(project='your_project')
+ for bucket in buckets:
+ print bucket.name
+ """
+
+ def __init__(self, source_credentials, target_principal,
+ target_scopes, delegates=None,
+ lifetime=None):
+ """
+ Args:
+ source_credentials (google.auth.Credentials): The source credential
+ used as to acquire the impersonated credentials.
+ target_principal (str): The service account to impersonate.
+ target_scopes (Sequence[str]): Scopes to request during the
+ authorization grant.
+ delegates (Sequence[str]): The chained list of delegates required
+ to grant the final access_token. If set, the sequence of
+ identities must have "Service Account Token Creator" capability
+ granted to the prceeding identity. For example, if set to
+ [serviceAccountB, serviceAccountC], the source_credential
+ must have the Token Creator role on serviceAccountB.
+ serviceAccountB must have the Token Creator on serviceAccountC.
+ Finally, C must have Token Creator on target_principal.
+ If left unset, source_credential must have that role on
+ target_principal.
+ lifetime (int): Number of seconds the delegated credential should
+ be valid for (upto 3600). If set, the credentials will
+ **not** get refreshed after expiration. If not set, the
+ credentials will be refreshed every 3600s.
+ """
+
+ super(Credentials, self).__init__()
+
+ self._source_credentials = copy.copy(source_credentials)
+ self._source_credentials._scopes = _IAM_SCOPE
+ self._target_principal = target_principal
+ self._target_scopes = target_scopes
+ self._delegates = delegates
+ self._lifetime = lifetime
+ self.token = None
+ self.expiry = _helpers.utcnow()
+
+ @_helpers.copy_docstring(credentials.Credentials)
+ def refresh(self, request):
+ if (self.token is not None and self._lifetime is not None):
+ self.expiry = _helpers.utcnow()
+ raise exceptions.RefreshError(_LIFETIME_ERROR)
+ self._source_credentials.refresh(request)
+ self._update_token(request)
+
+ @property
+ def expired(self):
+ return _helpers.utcnow() >= self.expiry
+
+ def _update_token(self, request):
+ """Updates credentials with a new access_token representing
+ the impersonated account.
+
+ Args:
+ request (google.auth.transport.requests.Request): Request object
+ to use for refreshing credentials.
+ """
+
+ # Refresh our source credentials.
+ self._source_credentials.refresh(request)
+
+ lifetime = self._lifetime
+ if (self._lifetime is None):
+ lifetime = _DEFAULT_TOKEN_LIFETIME_SECS
+
+ body = {
+ "delegates": self._delegates,
+ "scope": self._target_scopes,
+ "lifetime": str(lifetime) + "s"
+ }
+
+ headers = {
+ 'Content-Type': 'application/json',
+ }
+
+ # Apply the source credentials authentication info.
+ self._source_credentials.apply(headers)
+
+ self.token, self.expiry = _make_iam_token_request(
+ request=request,
+ principal=self._target_principal,
+ headers=headers,
+ body=body)
diff --git a/tests/test_impersonated_credentials.py b/tests/test_impersonated_credentials.py
new file mode 100644
index 0000000..74342ce
--- /dev/null
+++ b/tests/test_impersonated_credentials.py
@@ -0,0 +1,204 @@
+# Copyright 2018 Google Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import datetime
+import json
+import os
+
+import mock
+import pytest
+from six.moves import http_client
+
+from google.auth import _helpers
+from google.auth import crypt
+from google.auth import exceptions
+from google.auth import impersonated_credentials
+from google.auth import transport
+from google.auth.impersonated_credentials import Credentials
+from google.oauth2 import service_account
+
+DATA_DIR = os.path.join(os.path.dirname(__file__), '', 'data')
+
+with open(os.path.join(DATA_DIR, 'privatekey.pem'), 'rb') as fh:
+ PRIVATE_KEY_BYTES = fh.read()
+
+SERVICE_ACCOUNT_JSON_FILE = os.path.join(DATA_DIR, 'service_account.json')
+
+with open(SERVICE_ACCOUNT_JSON_FILE, 'r') as fh:
+ SERVICE_ACCOUNT_INFO = json.load(fh)
+
+SIGNER = crypt.RSASigner.from_string(PRIVATE_KEY_BYTES, '1')
+TOKEN_URI = 'https://example.com/oauth2/token'
+
+
+@pytest.fixture
+def mock_donor_credentials():
+ with mock.patch('google.oauth2._client.jwt_grant', autospec=True) as grant:
+ grant.return_value = (
+ "source token",
+ _helpers.utcnow() + datetime.timedelta(seconds=500),
+ {})
+ yield grant
+
+
+class TestImpersonatedCredentials(object):
+
+ SERVICE_ACCOUNT_EMAIL = 'service-account@example.com'
+ TARGET_PRINCIPAL = 'impersonated@project.iam.gserviceaccount.com'
+ TARGET_SCOPES = ['https://www.googleapis.com/auth/devstorage.read_only']
+ DELEGATES = []
+ LIFETIME = 3600
+ SOURCE_CREDENTIALS = service_account.Credentials(
+ SIGNER, SERVICE_ACCOUNT_EMAIL, TOKEN_URI)
+
+ def make_credentials(self, lifetime=LIFETIME):
+ return Credentials(
+ source_credentials=self.SOURCE_CREDENTIALS,
+ target_principal=self.TARGET_PRINCIPAL,
+ target_scopes=self.TARGET_SCOPES,
+ delegates=self.DELEGATES,
+ lifetime=lifetime)
+
+ def test_default_state(self):
+ credentials = self.make_credentials()
+ assert not credentials.valid
+ assert credentials.expired
+
+ def make_request(self, data, status=http_client.OK,
+ headers=None, side_effect=None):
+ response = mock.create_autospec(transport.Response, instance=False)
+ response.status = status
+ response.data = _helpers.to_bytes(data)
+ response.headers = headers or {}
+
+ request = mock.create_autospec(transport.Request, instance=False)
+ request.side_effect = side_effect
+ request.return_value = response
+
+ return request
+
+ def test_refresh_success(self, mock_donor_credentials):
+ credentials = self.make_credentials(lifetime=None)
+ token = 'token'
+
+ expire_time = (
+ _helpers.utcnow().replace(microsecond=0) +
+ datetime.timedelta(seconds=500)).isoformat('T') + 'Z'
+ response_body = {
+ "accessToken": token,
+ "expireTime": expire_time
+ }
+
+ request = self.make_request(
+ data=json.dumps(response_body),
+ status=http_client.OK)
+
+ credentials.refresh(request)
+
+ assert credentials.valid
+ assert not credentials.expired
+
+ def test_refresh_failure_malformed_expire_time(
+ self, mock_donor_credentials):
+ credentials = self.make_credentials(lifetime=None)
+ token = 'token'
+
+ expire_time = (
+ _helpers.utcnow() + datetime.timedelta(seconds=500)).isoformat('T')
+ response_body = {
+ "accessToken": token,
+ "expireTime": expire_time
+ }
+
+ request = self.make_request(
+ data=json.dumps(response_body),
+ status=http_client.OK)
+
+ with pytest.raises(exceptions.RefreshError) as excinfo:
+ credentials.refresh(request)
+
+ assert excinfo.match(impersonated_credentials._REFRESH_ERROR)
+
+ assert not credentials.valid
+ assert credentials.expired
+
+ def test_refresh_failure_lifetime_specified(self, mock_donor_credentials):
+ credentials = self.make_credentials(lifetime=500)
+ token = 'token'
+
+ expire_time = (
+ _helpers.utcnow().replace(microsecond=0) +
+ datetime.timedelta(seconds=500)).isoformat('T') + 'Z'
+ response_body = {
+ "accessToken": token,
+ "expireTime": expire_time
+ }
+
+ request = self.make_request(
+ data=json.dumps(response_body),
+ status=http_client.OK)
+
+ credentials.refresh(request)
+
+ with pytest.raises(exceptions.RefreshError) as excinfo:
+ credentials.refresh(request)
+
+ assert excinfo.match(impersonated_credentials._LIFETIME_ERROR)
+
+ assert not credentials.valid
+ assert credentials.expired
+
+ def test_refresh_failure_unauthorzed(self, mock_donor_credentials):
+ credentials = self.make_credentials(lifetime=None)
+
+ response_body = {
+ "error": {
+ "code": 403,
+ "message": "The caller does not have permission",
+ "status": "PERMISSION_DENIED"
+ }
+ }
+
+ request = self.make_request(
+ data=json.dumps(response_body),
+ status=http_client.UNAUTHORIZED)
+
+ with pytest.raises(exceptions.RefreshError) as excinfo:
+ credentials.refresh(request)
+
+ assert excinfo.match(impersonated_credentials._REFRESH_ERROR)
+
+ assert not credentials.valid
+ assert credentials.expired
+
+ def test_refresh_failure_http_error(self, mock_donor_credentials):
+ credentials = self.make_credentials(lifetime=None)
+
+ response_body = {}
+
+ request = self.make_request(
+ data=json.dumps(response_body),
+ status=http_client.HTTPException)
+
+ with pytest.raises(exceptions.RefreshError) as excinfo:
+ credentials.refresh(request)
+
+ assert excinfo.match(impersonated_credentials._REFRESH_ERROR)
+
+ assert not credentials.valid
+ assert credentials.expired
+
+ def test_expired(self):
+ credentials = self.make_credentials(lifetime=None)
+ assert credentials.expired