Add support for imersonated_credentials.Sign, IDToken (#348)
diff --git a/google/auth/impersonated_credentials.py b/google/auth/impersonated_credentials.py
index 32dfe83..bb2bbf2 100644
--- a/google/auth/impersonated_credentials.py
+++ b/google/auth/impersonated_credentials.py
@@ -25,6 +25,7 @@
https://cloud.google.com/iam/credentials/reference/rest/
"""
+import base64
import copy
from datetime import datetime
import json
@@ -35,6 +36,8 @@
from google.auth import _helpers
from google.auth import credentials
from google.auth import exceptions
+from google.auth import jwt
+from google.auth.transport.requests import AuthorizedSession
_DEFAULT_TOKEN_LIFETIME_SECS = 3600 # 1 hour in seconds
@@ -43,8 +46,18 @@
_IAM_ENDPOINT = ('https://iamcredentials.googleapis.com/v1/projects/-' +
'/serviceAccounts/{}:generateAccessToken')
+_IAM_SIGN_ENDPOINT = ('https://iamcredentials.googleapis.com/v1/projects/-' +
+ '/serviceAccounts/{}:signBlob')
+
+_IAM_IDTOKEN_ENDPOINT = ('https://iamcredentials.googleapis.com/v1/' +
+ 'projects/-/serviceAccounts/{}:generateIdToken')
+
_REFRESH_ERROR = 'Unable to acquire impersonated credentials'
+_DEFAULT_TOKEN_LIFETIME_SECS = 3600 # 1 hour in seconds
+
+_DEFAULT_TOKEN_URI = 'https://oauth2.googleapis.com/token'
+
def _make_iam_token_request(request, principal, headers, body):
"""Makes a request to the Google Cloud IAM service for an access token.
@@ -94,7 +107,7 @@
six.raise_from(new_exc, caught_exc)
-class Credentials(credentials.Credentials):
+class Credentials(credentials.Credentials, credentials.Signing):
"""This module defines impersonated credentials which are essentially
impersonated identities.
@@ -153,7 +166,7 @@
client = storage.Client(credentials=target_credentials)
buckets = client.list_buckets(project='your_project')
for bucket in buckets:
- print bucket.name
+ print(bucket.name)
"""
def __init__(self, source_credentials, target_principal,
@@ -172,7 +185,8 @@
granted to the prceeding identity. For example, if set to
[serviceAccountB, serviceAccountC], the source_credential
must have the Token Creator role on serviceAccountB.
- serviceAccountB must have the Token Creator on serviceAccountC.
+ serviceAccountB must have the Token Creator on
+ serviceAccountC.
Finally, C must have Token Creator on target_principal.
If left unset, source_credential must have that role on
target_principal.
@@ -229,3 +243,108 @@
principal=self._target_principal,
headers=headers,
body=body)
+
+ def sign_bytes(self, message):
+
+ iam_sign_endpoint = _IAM_SIGN_ENDPOINT.format(self._target_principal)
+
+ body = {
+ "payload": base64.b64encode(message),
+ "delegates": self._delegates
+ }
+
+ headers = {
+ 'Content-Type': 'application/json',
+ }
+
+ authed_session = AuthorizedSession(self._source_credentials)
+
+ response = authed_session.post(
+ url=iam_sign_endpoint,
+ headers=headers,
+ json=body)
+
+ return base64.b64decode(response.json()['signedBlob'])
+
+ @property
+ def signer_email(self):
+ return self._target_principal
+
+ @property
+ def service_account_email(self):
+ return self._target_principal
+
+ @property
+ def signer(self):
+ return self
+
+
+class IDTokenCredentials(credentials.Credentials):
+ """Open ID Connect ID Token-based service account credentials.
+
+ """
+ def __init__(self, target_credentials,
+ target_audience=None, include_email=False):
+ """
+ Args:
+ target_credentials (google.auth.Credentials): The target
+ credential used as to acquire the id tokens for.
+ target_audience (string): Audience to issue the token for.
+ include_email (bool): Include email in IdToken
+ """
+ super(IDTokenCredentials, self).__init__()
+
+ if not isinstance(target_credentials,
+ Credentials):
+ raise exceptions.GoogleAuthError("Provided Credential must be "
+ "impersonated_credentials")
+ self._target_credentials = target_credentials
+ self._target_audience = target_audience
+ self._include_email = include_email
+
+ def from_credentials(self, target_credentials,
+ target_audience=None):
+ return self.__class__(
+ target_credentials=self._target_credentials,
+ target_audience=target_audience)
+
+ def with_target_audience(self, target_audience):
+ return self.__class__(
+ target_credentials=self._target_credentials,
+ target_audience=target_audience)
+
+ def with_include_email(self, include_email):
+ return self.__class__(
+ target_credentials=self._target_credentials,
+ target_audience=self._target_audience,
+ include_email=include_email)
+
+ @_helpers.copy_docstring(credentials.Credentials)
+ def refresh(self, request):
+
+ iam_sign_endpoint = _IAM_IDTOKEN_ENDPOINT.format(self.
+ _target_credentials.
+ signer_email)
+
+ body = {
+ "audience": self._target_audience,
+ "delegates": self._target_credentials._delegates,
+ "includeEmail": self._include_email
+ }
+
+ headers = {
+ 'Content-Type': 'application/json',
+ }
+
+ authed_session = AuthorizedSession(self._target_credentials.
+ _source_credentials)
+
+ response = authed_session.post(
+ url=iam_sign_endpoint,
+ headers=headers,
+ data=json.dumps(body).encode('utf-8'))
+
+ id_token = response.json()['token']
+ self.token = id_token
+ self.expiry = datetime.fromtimestamp(jwt.decode(id_token,
+ verify=False)['exp'])