Add `google.oauth2.service_account.IDTokenCredentials`. (#234)
diff --git a/google/auth/app_engine.py b/google/auth/app_engine.py
index fa13f8e..f47dae1 100644
--- a/google/auth/app_engine.py
+++ b/google/auth/app_engine.py
@@ -136,7 +136,7 @@
@_helpers.copy_docstring(credentials.Scoped)
def with_scopes(self, scopes):
- return Credentials(
+ return self.__class__(
scopes=scopes, service_account_id=self._service_account_id)
@_helpers.copy_docstring(credentials.Signing)
diff --git a/google/auth/jwt.py b/google/auth/jwt.py
index 0253376..6957374 100644
--- a/google/auth/jwt.py
+++ b/google/auth/jwt.py
@@ -438,7 +438,7 @@
new_additional_claims = copy.deepcopy(self._additional_claims)
new_additional_claims.update(additional_claims or {})
- return Credentials(
+ return self.__class__(
self._signer,
issuer=issuer if issuer is not None else self._issuer,
subject=subject if subject is not None else self._subject,
@@ -643,7 +643,7 @@
new_additional_claims = copy.deepcopy(self._additional_claims)
new_additional_claims.update(additional_claims or {})
- return OnDemandCredentials(
+ return self.__class__(
self._signer,
issuer=issuer if issuer is not None else self._issuer,
subject=subject if subject is not None else self._subject,
diff --git a/google/oauth2/_client.py b/google/oauth2/_client.py
index 66251df..dc35be2 100644
--- a/google/oauth2/_client.py
+++ b/google/oauth2/_client.py
@@ -32,6 +32,7 @@
from google.auth import _helpers
from google.auth import exceptions
+from google.auth import jwt
_URLENCODED_CONTENT_TYPE = 'application/x-www-form-urlencoded'
_JWT_GRANT_TYPE = 'urn:ietf:params:oauth:grant-type:jwt-bearer'
@@ -155,6 +156,51 @@
return access_token, expiry, response_data
+def id_token_jwt_grant(request, token_uri, assertion):
+ """Implements the JWT Profile for OAuth 2.0 Authorization Grants, but
+ requests an OpenID Connect ID Token instead of an access token.
+
+ This is a variant on the standard JWT Profile that is currently unique
+ to Google. This was added for the benefit of authenticating to services
+ that require ID Tokens instead of access tokens or JWT bearer tokens.
+
+ Args:
+ request (google.auth.transport.Request): A callable used to make
+ HTTP requests.
+ token_uri (str): The OAuth 2.0 authorization server's token endpoint
+ URI.
+ assertion (str): JWT token signed by a service account. The token's
+ payload must include a ``target_audience`` claim.
+
+ Returns:
+ Tuple[str, Optional[datetime], Mapping[str, str]]:
+ The (encoded) Open ID Connect ID Token, expiration, and additional
+ data returned by the endpoint.
+
+ Raises:
+ google.auth.exceptions.RefreshError: If the token endpoint returned
+ an error.
+ """
+ body = {
+ 'assertion': assertion,
+ 'grant_type': _JWT_GRANT_TYPE,
+ }
+
+ response_data = _token_endpoint_request(request, token_uri, body)
+
+ try:
+ id_token = response_data['id_token']
+ except KeyError as caught_exc:
+ new_exc = exceptions.RefreshError(
+ 'No ID token in response.', response_data)
+ six.raise_from(new_exc, caught_exc)
+
+ payload = jwt.decode(id_token, verify=False)
+ expiry = datetime.datetime.utcfromtimestamp(payload['exp'])
+
+ return id_token, expiry, response_data
+
+
def refresh_grant(request, token_uri, refresh_token, client_id, client_secret):
"""Implements the OAuth 2.0 refresh token grant.
diff --git a/google/oauth2/service_account.py b/google/oauth2/service_account.py
index 54bd8d6..c60c565 100644
--- a/google/oauth2/service_account.py
+++ b/google/oauth2/service_account.py
@@ -230,7 +230,7 @@
@_helpers.copy_docstring(credentials.Scoped)
def with_scopes(self, scopes):
- return Credentials(
+ return self.__class__(
self._signer,
service_account_email=self._service_account_email,
scopes=scopes,
@@ -249,7 +249,7 @@
google.auth.service_account.Credentials: A new credentials
instance.
"""
- return Credentials(
+ return self.__class__(
self._signer,
service_account_email=self._service_account_email,
scopes=self._scopes,
@@ -273,7 +273,7 @@
new_additional_claims = copy.deepcopy(self._additional_claims)
new_additional_claims.update(additional_claims or {})
- return Credentials(
+ return self.__class__(
self._signer,
service_account_email=self._service_account_email,
scopes=self._scopes,
@@ -336,3 +336,207 @@
@_helpers.copy_docstring(credentials.Signing)
def signer_email(self):
return self._service_account_email
+
+
+class IDTokenCredentials(credentials.Signing, credentials.Credentials):
+ """Open ID Connect ID Token-based service account credentials.
+
+ These credentials are largely similar to :class:`.Credentials`, but instead
+ of using an OAuth 2.0 Access Token as the bearer token, they use an Open
+ ID Connect ID Token as the bearer token. These credentials are useful when
+ communicating to services that require ID Tokens and can not accept access
+ tokens.
+
+ Usually, you'll create these credentials with one of the helper
+ constructors. To create credentials using a Google service account
+ private key JSON file::
+
+ credentials = (
+ service_account.IDTokenCredentials.from_service_account_file(
+ 'service-account.json'))
+
+ Or if you already have the service account file loaded::
+
+ service_account_info = json.load(open('service_account.json'))
+ credentials = (
+ service_account.IDTokenCredentials.from_service_account_info(
+ service_account_info))
+
+ Both helper methods pass on arguments to the constructor, so you can
+ specify additional scopes and a subject if necessary::
+
+ credentials = (
+ service_account.IDTokenCredentials.from_service_account_file(
+ 'service-account.json',
+ scopes=['email'],
+ subject='user@example.com'))
+`
+ The credentials are considered immutable. If you want to modify the scopes
+ or the subject used for delegation, use :meth:`with_scopes` or
+ :meth:`with_subject`::
+
+ scoped_credentials = credentials.with_scopes(['email'])
+ delegated_credentials = credentials.with_subject(subject)
+
+ """
+ def __init__(self, signer, service_account_email, token_uri,
+ target_audience, additional_claims=None):
+ """
+ Args:
+ signer (google.auth.crypt.Signer): The signer used to sign JWTs.
+ service_account_email (str): The service account's email.
+ token_uri (str): The OAuth 2.0 Token URI.
+ target_audience (str): The intended audience for these credentials,
+ used when requesting the ID Token. The ID Token's ``aud`` claim
+ will be set to this string.
+ additional_claims (Mapping[str, str]): Any additional claims for
+ the JWT assertion used in the authorization grant.
+
+ .. note:: Typically one of the helper constructors
+ :meth:`from_service_account_file` or
+ :meth:`from_service_account_info` are used instead of calling the
+ constructor directly.
+ """
+ super(IDTokenCredentials, self).__init__()
+ self._signer = signer
+ self._service_account_email = service_account_email
+ self._token_uri = token_uri
+ self._target_audience = target_audience
+
+ if additional_claims is not None:
+ self._additional_claims = additional_claims
+ else:
+ self._additional_claims = {}
+
+ @classmethod
+ def _from_signer_and_info(cls, signer, info, **kwargs):
+ """Creates a credentials instance from a signer and service account
+ info.
+
+ Args:
+ signer (google.auth.crypt.Signer): The signer used to sign JWTs.
+ info (Mapping[str, str]): The service account info.
+ kwargs: Additional arguments to pass to the constructor.
+
+ Returns:
+ google.auth.jwt.IDTokenCredentials: The constructed credentials.
+
+ Raises:
+ ValueError: If the info is not in the expected format.
+ """
+ kwargs.setdefault('service_account_email', info['client_email'])
+ kwargs.setdefault('token_uri', info['token_uri'])
+ return cls(signer, **kwargs)
+
+ @classmethod
+ def from_service_account_info(cls, info, **kwargs):
+ """Creates a credentials instance from parsed service account info.
+
+ Args:
+ info (Mapping[str, str]): The service account info in Google
+ format.
+ kwargs: Additional arguments to pass to the constructor.
+
+ Returns:
+ google.auth.service_account.IDTokenCredentials: The constructed
+ credentials.
+
+ Raises:
+ ValueError: If the info is not in the expected format.
+ """
+ signer = _service_account_info.from_dict(
+ info, require=['client_email', 'token_uri'])
+ return cls._from_signer_and_info(signer, info, **kwargs)
+
+ @classmethod
+ def from_service_account_file(cls, filename, **kwargs):
+ """Creates a credentials instance from a service account json file.
+
+ Args:
+ filename (str): The path to the service account json file.
+ kwargs: Additional arguments to pass to the constructor.
+
+ Returns:
+ google.auth.service_account.IDTokenCredentials: The constructed
+ credentials.
+ """
+ info, signer = _service_account_info.from_filename(
+ filename, require=['client_email', 'token_uri'])
+ return cls._from_signer_and_info(signer, info, **kwargs)
+
+ def with_target_audience(self, target_audience):
+ """Create a copy of these credentials with the specified target
+ audience.
+
+ Args:
+ target_audience (str): The intended audience for these credentials,
+ used when requesting the ID Token.
+
+ Returns:
+ google.auth.service_account.IDTokenCredentials: A new credentials
+ instance.
+ """
+ return self.__class__(
+ self._signer,
+ service_account_email=self._service_account_email,
+ token_uri=self._token_uri,
+ target_audience=target_audience,
+ additional_claims=self._additional_claims.copy())
+
+ def _make_authorization_grant_assertion(self):
+ """Create the OAuth 2.0 assertion.
+
+ This assertion is used during the OAuth 2.0 grant to acquire an
+ ID token.
+
+ Returns:
+ bytes: The authorization grant assertion.
+ """
+ now = _helpers.utcnow()
+ lifetime = datetime.timedelta(seconds=_DEFAULT_TOKEN_LIFETIME_SECS)
+ expiry = now + lifetime
+
+ payload = {
+ 'iat': _helpers.datetime_to_secs(now),
+ 'exp': _helpers.datetime_to_secs(expiry),
+ # The issuer must be the service account email.
+ 'iss': self.service_account_email,
+ # The audience must be the auth token endpoint's URI
+ 'aud': self._token_uri,
+ # The target audience specifies which service the ID token is
+ # intended for.
+ 'target_audience': self._target_audience
+ }
+
+ payload.update(self._additional_claims)
+
+ token = jwt.encode(self._signer, payload)
+
+ return token
+
+ @_helpers.copy_docstring(credentials.Credentials)
+ def refresh(self, request):
+ assertion = self._make_authorization_grant_assertion()
+ access_token, expiry, _ = _client.id_token_jwt_grant(
+ request, self._token_uri, assertion)
+ self.token = access_token
+ self.expiry = expiry
+
+ @property
+ def service_account_email(self):
+ """The service account email."""
+ return self._service_account_email
+
+ @_helpers.copy_docstring(credentials.Signing)
+ def sign_bytes(self, message):
+ return self._signer.sign(message)
+
+ @property
+ @_helpers.copy_docstring(credentials.Signing)
+ def signer(self):
+ return self._signer
+
+ @property
+ @_helpers.copy_docstring(credentials.Signing)
+ def signer_email(self):
+ return self._service_account_email