feat: add fetch_id_token to support id_token adc (#469)
feat: id_token adc with gcloud cred
diff --git a/google/oauth2/id_token.py b/google/oauth2/id_token.py
index 1dbfb20..f559c6c 100644
--- a/google/oauth2/id_token.py
+++ b/google/oauth2/id_token.py
@@ -59,12 +59,16 @@
"""
import json
+import os
+import six
from six.moves import http_client
+from google.auth import environment_vars
from google.auth import exceptions
from google.auth import jwt
+
# The URL that provides public certificates for verifying ID tokens issued
# by Google's OAuth 2.0 authorization server.
_GOOGLE_OAUTH2_CERTS_URL = "https://www.googleapis.com/oauth2/v1/certs"
@@ -159,3 +163,90 @@
return verify_token(
id_token, request, audience=audience, certs_url=_GOOGLE_APIS_CERTS_URL
)
+
+
+def fetch_id_token(request, audience):
+    """Fetch the ID Token from the current environment.
+
+    This function acquires ID token from the environment in the following order:
+
+    1. If the application is running in Compute Engine, App Engine or Cloud Run,
+       then the ID token is obtained from the metadata server.
+    2. If the environment variable ``GOOGLE_APPLICATION_CREDENTIALS`` is set
+       to the path of a valid service account JSON file, then ID token is
+       acquired using this service account credentials.
+    3. If metadata server doesn't exist and no valid service account credentials
+       are found, :class:`~google.auth.exceptions.DefaultCredentialsError` will
+       be raised.
+
+    Example::
+
+        import google.oauth2.id_token
+        import google.auth.transport.requests
+
+        request = google.auth.transport.requests.Request()
+        target_audience = "https://pubsub.googleapis.com"
+
+        id_token = google.oauth2.id_token.fetch_id_token(request, target_audience)
+
+    Args:
+        request (google.auth.transport.Request): A callable used to make
+            HTTP requests.
+        audience (str): The audience that this ID token is intended for.
+
+    Returns:
+        str: The ID token.
+
+    Raises:
+        ~google.auth.exceptions.DefaultCredentialsError:
+            If metadata server doesn't exist and no valid service account
+            credentials are found.
+    """
+    # 1. First try to fetch ID token from metadata server if it exists. The code
+    # works for GAE and Cloud Run metadata server as well.
+    try:
+        from google.auth import compute_engine
+
+        credentials = compute_engine.IDTokenCredentials(
+            request, audience, use_metadata_identity_endpoint=True
+        )
+        credentials.refresh(request)
+        return credentials.token
+    except (ImportError, exceptions.TransportError):
+        pass
+
+    # 2. Try to use service account credentials to get ID token.
+
+    # Try to get credentials from the GOOGLE_APPLICATION_CREDENTIALS environment
+    # variable.
+    credentials_filename = os.environ.get(environment_vars.CREDENTIALS)
+    if not (credentials_filename and os.path.isfile(credentials_filename)):
+        raise exceptions.DefaultCredentialsError(
+            "Neither metadata server or valid service account credentials are found."
+        )
+
+    try:
+        with open(credentials_filename, "r") as f:
+            info = json.load(f)
+
+        # Reject anything that is not a service account key (for example other
+        # credential types, or JSON that is not an object) with a ValueError so
+        # it is reported as DefaultCredentialsError below, instead of handing
+        # an unusable value to the service_account module.
+        if not isinstance(info, dict) or info.get("type") != "service_account":
+            raise ValueError("The file is not a service account key file.")
+
+        from google.oauth2 import service_account
+
+        credentials = service_account.IDTokenCredentials.from_service_account_info(
+            info, target_audience=audience
+        )
+    except ValueError as caught_exc:
+        new_exc = exceptions.DefaultCredentialsError(
+            "Neither metadata server or valid service account credentials are found.",
+            caught_exc,
+        )
+        six.raise_from(new_exc, caught_exc)
+
+    credentials.refresh(request)
+    return credentials.token