Add `google.oauth2.service_account.IDTokenCredentials`. (#234)
diff --git a/google/auth/app_engine.py b/google/auth/app_engine.py
index fa13f8e..f47dae1 100644
--- a/google/auth/app_engine.py
+++ b/google/auth/app_engine.py
@@ -136,7 +136,7 @@
@_helpers.copy_docstring(credentials.Scoped)
def with_scopes(self, scopes):
- return Credentials(
+ return self.__class__(
scopes=scopes, service_account_id=self._service_account_id)
@_helpers.copy_docstring(credentials.Signing)
diff --git a/google/auth/jwt.py b/google/auth/jwt.py
index 0253376..6957374 100644
--- a/google/auth/jwt.py
+++ b/google/auth/jwt.py
@@ -438,7 +438,7 @@
new_additional_claims = copy.deepcopy(self._additional_claims)
new_additional_claims.update(additional_claims or {})
- return Credentials(
+ return self.__class__(
self._signer,
issuer=issuer if issuer is not None else self._issuer,
subject=subject if subject is not None else self._subject,
@@ -643,7 +643,7 @@
new_additional_claims = copy.deepcopy(self._additional_claims)
new_additional_claims.update(additional_claims or {})
- return OnDemandCredentials(
+ return self.__class__(
self._signer,
issuer=issuer if issuer is not None else self._issuer,
subject=subject if subject is not None else self._subject,
diff --git a/google/oauth2/_client.py b/google/oauth2/_client.py
index 66251df..dc35be2 100644
--- a/google/oauth2/_client.py
+++ b/google/oauth2/_client.py
@@ -32,6 +32,7 @@
from google.auth import _helpers
from google.auth import exceptions
+from google.auth import jwt
_URLENCODED_CONTENT_TYPE = 'application/x-www-form-urlencoded'
_JWT_GRANT_TYPE = 'urn:ietf:params:oauth:grant-type:jwt-bearer'
@@ -155,6 +156,51 @@
return access_token, expiry, response_data
+def id_token_jwt_grant(request, token_uri, assertion):
+ """Implements the JWT Profile for OAuth 2.0 Authorization Grants, but
+ requests an OpenID Connect ID Token instead of an access token.
+
+ This is a variant on the standard JWT Profile that is currently unique
+ to Google. This was added for the benefit of authenticating to services
+ that require ID Tokens instead of access tokens or JWT bearer tokens.
+
+ Args:
+ request (google.auth.transport.Request): A callable used to make
+ HTTP requests.
+ token_uri (str): The OAuth 2.0 authorization server's token endpoint
+ URI.
+ assertion (str): JWT token signed by a service account. The token's
+ payload must include a ``target_audience`` claim.
+
+ Returns:
+ Tuple[str, Optional[datetime], Mapping[str, str]]:
+ The (encoded) Open ID Connect ID Token, expiration, and additional
+ data returned by the endpoint.
+
+ Raises:
+ google.auth.exceptions.RefreshError: If the token endpoint returned
+ an error.
+ """
+ body = {
+ 'assertion': assertion,
+ 'grant_type': _JWT_GRANT_TYPE,
+ }
+
+ response_data = _token_endpoint_request(request, token_uri, body)
+
+ try:
+ id_token = response_data['id_token']
+ except KeyError as caught_exc:
+ new_exc = exceptions.RefreshError(
+ 'No ID token in response.', response_data)
+ six.raise_from(new_exc, caught_exc)
+
+ payload = jwt.decode(id_token, verify=False)
+ expiry = datetime.datetime.utcfromtimestamp(payload['exp'])
+
+ return id_token, expiry, response_data
+
+
def refresh_grant(request, token_uri, refresh_token, client_id, client_secret):
"""Implements the OAuth 2.0 refresh token grant.
diff --git a/google/oauth2/service_account.py b/google/oauth2/service_account.py
index 54bd8d6..c60c565 100644
--- a/google/oauth2/service_account.py
+++ b/google/oauth2/service_account.py
@@ -230,7 +230,7 @@
@_helpers.copy_docstring(credentials.Scoped)
def with_scopes(self, scopes):
- return Credentials(
+ return self.__class__(
self._signer,
service_account_email=self._service_account_email,
scopes=scopes,
@@ -249,7 +249,7 @@
google.auth.service_account.Credentials: A new credentials
instance.
"""
- return Credentials(
+ return self.__class__(
self._signer,
service_account_email=self._service_account_email,
scopes=self._scopes,
@@ -273,7 +273,7 @@
new_additional_claims = copy.deepcopy(self._additional_claims)
new_additional_claims.update(additional_claims or {})
- return Credentials(
+ return self.__class__(
self._signer,
service_account_email=self._service_account_email,
scopes=self._scopes,
@@ -336,3 +336,207 @@
@_helpers.copy_docstring(credentials.Signing)
def signer_email(self):
return self._service_account_email
+
+
+class IDTokenCredentials(credentials.Signing, credentials.Credentials):
+ """Open ID Connect ID Token-based service account credentials.
+
+ These credentials are largely similar to :class:`.Credentials`, but instead
+ of using an OAuth 2.0 Access Token as the bearer token, they use an Open
+ ID Connect ID Token as the bearer token. These credentials are useful when
+ communicating to services that require ID Tokens and can not accept access
+ tokens.
+
+ Usually, you'll create these credentials with one of the helper
+ constructors. To create credentials using a Google service account
+ private key JSON file::
+
+ credentials = (
+ service_account.IDTokenCredentials.from_service_account_file(
+ 'service-account.json'))
+
+ Or if you already have the service account file loaded::
+
+ service_account_info = json.load(open('service_account.json'))
+ credentials = (
+ service_account.IDTokenCredentials.from_service_account_info(
+ service_account_info))
+
+ Both helper methods pass on arguments to the constructor, so you can
+ specify additional scopes and a subject if necessary::
+
+ credentials = (
+ service_account.IDTokenCredentials.from_service_account_file(
+ 'service-account.json',
+ scopes=['email'],
+ subject='user@example.com'))
+`
+ The credentials are considered immutable. If you want to modify the scopes
+ or the subject used for delegation, use :meth:`with_scopes` or
+ :meth:`with_subject`::
+
+ scoped_credentials = credentials.with_scopes(['email'])
+ delegated_credentials = credentials.with_subject(subject)
+
+ """
+ def __init__(self, signer, service_account_email, token_uri,
+ target_audience, additional_claims=None):
+ """
+ Args:
+ signer (google.auth.crypt.Signer): The signer used to sign JWTs.
+ service_account_email (str): The service account's email.
+ token_uri (str): The OAuth 2.0 Token URI.
+ target_audience (str): The intended audience for these credentials,
+ used when requesting the ID Token. The ID Token's ``aud`` claim
+ will be set to this string.
+ additional_claims (Mapping[str, str]): Any additional claims for
+ the JWT assertion used in the authorization grant.
+
+ .. note:: Typically one of the helper constructors
+ :meth:`from_service_account_file` or
+ :meth:`from_service_account_info` are used instead of calling the
+ constructor directly.
+ """
+ super(IDTokenCredentials, self).__init__()
+ self._signer = signer
+ self._service_account_email = service_account_email
+ self._token_uri = token_uri
+ self._target_audience = target_audience
+
+ if additional_claims is not None:
+ self._additional_claims = additional_claims
+ else:
+ self._additional_claims = {}
+
+ @classmethod
+ def _from_signer_and_info(cls, signer, info, **kwargs):
+ """Creates a credentials instance from a signer and service account
+ info.
+
+ Args:
+ signer (google.auth.crypt.Signer): The signer used to sign JWTs.
+ info (Mapping[str, str]): The service account info.
+ kwargs: Additional arguments to pass to the constructor.
+
+ Returns:
+ google.auth.jwt.IDTokenCredentials: The constructed credentials.
+
+ Raises:
+ ValueError: If the info is not in the expected format.
+ """
+ kwargs.setdefault('service_account_email', info['client_email'])
+ kwargs.setdefault('token_uri', info['token_uri'])
+ return cls(signer, **kwargs)
+
+ @classmethod
+ def from_service_account_info(cls, info, **kwargs):
+ """Creates a credentials instance from parsed service account info.
+
+ Args:
+ info (Mapping[str, str]): The service account info in Google
+ format.
+ kwargs: Additional arguments to pass to the constructor.
+
+ Returns:
+ google.auth.service_account.IDTokenCredentials: The constructed
+ credentials.
+
+ Raises:
+ ValueError: If the info is not in the expected format.
+ """
+ signer = _service_account_info.from_dict(
+ info, require=['client_email', 'token_uri'])
+ return cls._from_signer_and_info(signer, info, **kwargs)
+
+ @classmethod
+ def from_service_account_file(cls, filename, **kwargs):
+ """Creates a credentials instance from a service account json file.
+
+ Args:
+ filename (str): The path to the service account json file.
+ kwargs: Additional arguments to pass to the constructor.
+
+ Returns:
+ google.auth.service_account.IDTokenCredentials: The constructed
+ credentials.
+ """
+ info, signer = _service_account_info.from_filename(
+ filename, require=['client_email', 'token_uri'])
+ return cls._from_signer_and_info(signer, info, **kwargs)
+
+ def with_target_audience(self, target_audience):
+ """Create a copy of these credentials with the specified target
+ audience.
+
+ Args:
+ target_audience (str): The intended audience for these credentials,
+ used when requesting the ID Token.
+
+ Returns:
+ google.auth.service_account.IDTokenCredentials: A new credentials
+ instance.
+ """
+ return self.__class__(
+ self._signer,
+ service_account_email=self._service_account_email,
+ token_uri=self._token_uri,
+ target_audience=target_audience,
+ additional_claims=self._additional_claims.copy())
+
+ def _make_authorization_grant_assertion(self):
+ """Create the OAuth 2.0 assertion.
+
+ This assertion is used during the OAuth 2.0 grant to acquire an
+ ID token.
+
+ Returns:
+ bytes: The authorization grant assertion.
+ """
+ now = _helpers.utcnow()
+ lifetime = datetime.timedelta(seconds=_DEFAULT_TOKEN_LIFETIME_SECS)
+ expiry = now + lifetime
+
+ payload = {
+ 'iat': _helpers.datetime_to_secs(now),
+ 'exp': _helpers.datetime_to_secs(expiry),
+ # The issuer must be the service account email.
+ 'iss': self.service_account_email,
+ # The audience must be the auth token endpoint's URI
+ 'aud': self._token_uri,
+ # The target audience specifies which service the ID token is
+ # intended for.
+ 'target_audience': self._target_audience
+ }
+
+ payload.update(self._additional_claims)
+
+ token = jwt.encode(self._signer, payload)
+
+ return token
+
+ @_helpers.copy_docstring(credentials.Credentials)
+ def refresh(self, request):
+ assertion = self._make_authorization_grant_assertion()
+ access_token, expiry, _ = _client.id_token_jwt_grant(
+ request, self._token_uri, assertion)
+ self.token = access_token
+ self.expiry = expiry
+
+ @property
+ def service_account_email(self):
+ """The service account email."""
+ return self._service_account_email
+
+ @_helpers.copy_docstring(credentials.Signing)
+ def sign_bytes(self, message):
+ return self._signer.sign(message)
+
+ @property
+ @_helpers.copy_docstring(credentials.Signing)
+ def signer(self):
+ return self._signer
+
+ @property
+ @_helpers.copy_docstring(credentials.Signing)
+ def signer_email(self):
+ return self._service_account_email
diff --git a/tests/oauth2/test__client.py b/tests/oauth2/test__client.py
index 6aeb3d1..3ec7fc6 100644
--- a/tests/oauth2/test__client.py
+++ b/tests/oauth2/test__client.py
@@ -14,6 +14,7 @@
import datetime
import json
+import os
import mock
import pytest
@@ -21,11 +22,22 @@
from six.moves import http_client
from six.moves import urllib
+from google.auth import _helpers
+from google.auth import crypt
from google.auth import exceptions
+from google.auth import jwt
from google.auth import transport
from google.oauth2 import _client
+DATA_DIR = os.path.join(os.path.dirname(__file__), '..', 'data')
+
+with open(os.path.join(DATA_DIR, 'privatekey.pem'), 'rb') as fh:
+ PRIVATE_KEY_BYTES = fh.read()
+
+SIGNER = crypt.RSASigner.from_string(PRIVATE_KEY_BYTES, '1')
+
+
def test__handle_error_response():
response_data = json.dumps({
'error': 'help',
@@ -129,6 +141,42 @@
_client.jwt_grant(request, 'http://example.com', 'assertion_value')
+def test_id_token_jwt_grant():
+ now = _helpers.utcnow()
+ id_token_expiry = _helpers.datetime_to_secs(now)
+ id_token = jwt.encode(SIGNER, {'exp': id_token_expiry}).decode('utf-8')
+ request = make_request({
+ 'id_token': id_token,
+ 'extra': 'data'})
+
+ token, expiry, extra_data = _client.id_token_jwt_grant(
+ request, 'http://example.com', 'assertion_value')
+
+ # Check request call
+ verify_request_params(request, {
+ 'grant_type': _client._JWT_GRANT_TYPE,
+ 'assertion': 'assertion_value'
+ })
+
+ # Check result
+ assert token == id_token
+ # JWT does not store microseconds
+ now = now.replace(microsecond=0)
+ assert expiry == now
+ assert extra_data['extra'] == 'data'
+
+
+def test_id_token_jwt_grant_no_access_token():
+ request = make_request({
+ # No access token.
+ 'expires_in': 500,
+ 'extra': 'data'})
+
+ with pytest.raises(exceptions.RefreshError):
+ _client.id_token_jwt_grant(
+ request, 'http://example.com', 'assertion_value')
+
+
@mock.patch('google.auth._helpers.utcnow', return_value=datetime.datetime.min)
def test_refresh_grant(unused_utcnow):
request = make_request({
diff --git a/tests/oauth2/test_service_account.py b/tests/oauth2/test_service_account.py
index 9c235db..54ac0f5 100644
--- a/tests/oauth2/test_service_account.py
+++ b/tests/oauth2/test_service_account.py
@@ -216,3 +216,126 @@
# Credentials should now be valid.
assert credentials.valid
+
+
+class TestIDTokenCredentials(object):
+ SERVICE_ACCOUNT_EMAIL = 'service-account@example.com'
+ TOKEN_URI = 'https://example.com/oauth2/token'
+ TARGET_AUDIENCE = 'https://example.com'
+
+ @classmethod
+ def make_credentials(cls):
+ return service_account.IDTokenCredentials(
+ SIGNER, cls.SERVICE_ACCOUNT_EMAIL, cls.TOKEN_URI,
+ cls.TARGET_AUDIENCE)
+
+ def test_from_service_account_info(self):
+ credentials = (
+ service_account.IDTokenCredentials.from_service_account_info(
+ SERVICE_ACCOUNT_INFO,
+ target_audience=self.TARGET_AUDIENCE))
+
+ assert (credentials._signer.key_id ==
+ SERVICE_ACCOUNT_INFO['private_key_id'])
+ assert (credentials.service_account_email ==
+ SERVICE_ACCOUNT_INFO['client_email'])
+ assert credentials._token_uri == SERVICE_ACCOUNT_INFO['token_uri']
+ assert credentials._target_audience == self.TARGET_AUDIENCE
+
+ def test_from_service_account_file(self):
+ info = SERVICE_ACCOUNT_INFO.copy()
+
+ credentials = (
+ service_account.IDTokenCredentials.from_service_account_file(
+ SERVICE_ACCOUNT_JSON_FILE,
+ target_audience=self.TARGET_AUDIENCE))
+
+ assert credentials.service_account_email == info['client_email']
+ assert credentials._signer.key_id == info['private_key_id']
+ assert credentials._token_uri == info['token_uri']
+ assert credentials._target_audience == self.TARGET_AUDIENCE
+
+ def test_default_state(self):
+ credentials = self.make_credentials()
+ assert not credentials.valid
+ # Expiration hasn't been set yet
+ assert not credentials.expired
+
+ def test_sign_bytes(self):
+ credentials = self.make_credentials()
+ to_sign = b'123'
+ signature = credentials.sign_bytes(to_sign)
+ assert crypt.verify_signature(to_sign, signature, PUBLIC_CERT_BYTES)
+
+ def test_signer(self):
+ credentials = self.make_credentials()
+ assert isinstance(credentials.signer, crypt.Signer)
+
+ def test_signer_email(self):
+ credentials = self.make_credentials()
+ assert credentials.signer_email == self.SERVICE_ACCOUNT_EMAIL
+
+ def test_with_target_audience(self):
+ credentials = self.make_credentials()
+ new_credentials = credentials.with_target_audience(
+ 'https://new.example.com')
+ assert new_credentials._target_audience == 'https://new.example.com'
+
+ def test__make_authorization_grant_assertion(self):
+ credentials = self.make_credentials()
+ token = credentials._make_authorization_grant_assertion()
+ payload = jwt.decode(token, PUBLIC_CERT_BYTES)
+ assert payload['iss'] == self.SERVICE_ACCOUNT_EMAIL
+ assert payload['aud'] == self.TOKEN_URI
+ assert payload['target_audience'] == self.TARGET_AUDIENCE
+
+ @mock.patch('google.oauth2._client.id_token_jwt_grant', autospec=True)
+ def test_refresh_success(self, id_token_jwt_grant):
+ credentials = self.make_credentials()
+ token = 'token'
+ id_token_jwt_grant.return_value = (
+ token,
+ _helpers.utcnow() + datetime.timedelta(seconds=500),
+ {})
+ request = mock.create_autospec(transport.Request, instance=True)
+
+ # Refresh credentials
+ credentials.refresh(request)
+
+ # Check jwt grant call.
+ assert id_token_jwt_grant.called
+
+ called_request, token_uri, assertion = id_token_jwt_grant.call_args[0]
+ assert called_request == request
+ assert token_uri == credentials._token_uri
+ assert jwt.decode(assertion, PUBLIC_CERT_BYTES)
+ # No further assertion done on the token, as there are separate tests
+ # for checking the authorization grant assertion.
+
+ # Check that the credentials have the token.
+ assert credentials.token == token
+
+ # Check that the credentials are valid (have a token and are not
+ # expired)
+ assert credentials.valid
+
+ @mock.patch('google.oauth2._client.id_token_jwt_grant', autospec=True)
+ def test_before_request_refreshes(self, id_token_jwt_grant):
+ credentials = self.make_credentials()
+ token = 'token'
+ id_token_jwt_grant.return_value = (
+ token, _helpers.utcnow() + datetime.timedelta(seconds=500), None)
+ request = mock.create_autospec(transport.Request, instance=True)
+
+ # Credentials should start as invalid
+ assert not credentials.valid
+
+ # before_request should cause a refresh
+ credentials.before_request(
+ request, 'GET', 'http://example.com?a=1#3', {})
+
+ # The refresh endpoint should've been called.
+ assert id_token_jwt_grant.called
+
+ # Credentials should now be valid.
+ assert credentials.valid