feat: workload identity federation support (#698)

Using workload identity federation, applications can access Google Cloud resources from Amazon Web Services (AWS), Microsoft Azure or any identity provider that supports OpenID Connect (OIDC). Workload identity federation is recommended for non-Google Cloud environments as it avoids the need to download, manage and store service account private keys locally.

This includes a rollforward of the [previous reverted PR](https://github.com/googleapis/google-auth-library-python/pull/686) and the [fix](https://github.com/googleapis/google-auth-library-python/pull/686) to not pass scopes to user credentials from `google.auth.default()`.
diff --git a/google/auth/_default.py b/google/auth/_default.py
index 3b8c281..8e3b210 100644
--- a/google/auth/_default.py
+++ b/google/auth/_default.py
@@ -34,7 +34,8 @@
 # Valid types accepted for file-based credentials.
 _AUTHORIZED_USER_TYPE = "authorized_user"
 _SERVICE_ACCOUNT_TYPE = "service_account"
-_VALID_TYPES = (_AUTHORIZED_USER_TYPE, _SERVICE_ACCOUNT_TYPE)
+_EXTERNAL_ACCOUNT_TYPE = "external_account"
+_VALID_TYPES = (_AUTHORIZED_USER_TYPE, _SERVICE_ACCOUNT_TYPE, _EXTERNAL_ACCOUNT_TYPE)
 
 # Help message when no credentials can be found.
 _HELP_MESSAGE = """\
@@ -70,12 +71,12 @@
 
 
 def load_credentials_from_file(
-    filename, scopes=None, default_scopes=None, quota_project_id=None
+    filename, scopes=None, default_scopes=None, quota_project_id=None, request=None
 ):
     """Loads Google credentials from a file.
 
-    The credentials file must be a service account key or stored authorized
-    user credentials.
+    The credentials file must be a service account key, stored authorized
+    user credentials or external account credentials.
 
     Args:
         filename (str): The full path to the credentials file.
@@ -85,12 +86,18 @@
         default_scopes (Optional[Sequence[str]]): Default scopes passed by a
             Google client library. Use 'scopes' for user-defined scopes.
         quota_project_id (Optional[str]):  The project ID used for
-                quota and billing.
+            quota and billing.
+        request (Optional[google.auth.transport.Request]): An object used to make
+            HTTP requests. This is used to determine the associated project ID
+            for a workload identity pool resource (external account credentials).
+            If not specified, then it will use a
+            google.auth.transport.requests.Request client to make requests.
 
     Returns:
         Tuple[google.auth.credentials.Credentials, Optional[str]]: Loaded
             credentials and the project ID. Authorized user credentials do not
-            have the project ID information.
+            have the project ID information. External account credentials project
+            IDs may not always be determined.
 
     Raises:
         google.auth.exceptions.DefaultCredentialsError: if the file is in the
@@ -146,6 +153,18 @@
             credentials = credentials.with_quota_project(quota_project_id)
         return credentials, info.get("project_id")
 
+    elif credential_type == _EXTERNAL_ACCOUNT_TYPE:
+        credentials, project_id = _get_external_account_credentials(
+            info,
+            filename,
+            scopes=scopes,
+            default_scopes=default_scopes,
+            request=request,
+        )
+        if quota_project_id:
+            credentials = credentials.with_quota_project(quota_project_id)
+        return credentials, project_id
+
     else:
         raise exceptions.DefaultCredentialsError(
             "The file {file} does not have a valid type. "
@@ -252,6 +271,65 @@
         return None, None
 
 
+def _get_external_account_credentials(
+    info, filename, scopes=None, default_scopes=None, request=None
+):
+    """Loads external account Credentials from the parsed external account info.
+
+    The credentials information must correspond to a supported external account
+    credentials type.
+
+    Args:
+        info (Mapping[str, str]): The external account info in Google format.
+        filename (str): The full path to the credentials file.
+        scopes (Optional[Sequence[str]]): The list of scopes for the credentials. If
+            specified, the credentials will automatically be scoped if
+            necessary.
+        default_scopes (Optional[Sequence[str]]): Default scopes passed by a
+            Google client library. Use 'scopes' for user-defined scopes.
+        request (Optional[google.auth.transport.Request]): An object used to make
+            HTTP requests. This is used to determine the associated project ID
+            for a workload identity pool resource (external account credentials).
+            If not specified, then it will use a
+            google.auth.transport.requests.Request client to make requests.
+
+    Returns:
+        Tuple[google.auth.credentials.Credentials, Optional[str]]: Loaded
+            credentials and the project ID. External account credentials project
+            IDs may not always be determined.
+
+    Raises:
+        google.auth.exceptions.DefaultCredentialsError: if the info dictionary
+            is in the wrong format or is missing required information.
+    """
+    # There are currently 2 types of external_account credentials.
+    try:
+        # Check if the configuration corresponds to AWS credentials.
+        from google.auth import aws
+
+        credentials = aws.Credentials.from_info(
+            info, scopes=scopes, default_scopes=default_scopes
+        )
+    except ValueError:
+        try:
+            # Check if the configuration corresponds to Identity Pool credentials.
+            from google.auth import identity_pool
+
+            credentials = identity_pool.Credentials.from_info(
+                info, scopes=scopes, default_scopes=default_scopes
+            )
+        except ValueError:
+            # If the configuration is invalid or does not correspond to any
+            # supported external_account credentials, raise an error.
+            raise exceptions.DefaultCredentialsError(
+                "Failed to load external account credentials from {}".format(filename)
+            )
+    if request is None:
+        request = google.auth.transport.requests.Request()
+
+    return credentials, credentials.get_project_id(request=request)
+
+
 def default(scopes=None, request=None, quota_project_id=None, default_scopes=None):
     """Gets the default credentials for the current environment.
 
@@ -265,6 +343,15 @@
        loaded and returned. The project ID returned is the project ID defined
        in the service account file if available (some older files do not
        contain project ID information).
+
+       If the environment variable is set to the path of a valid external
+       account JSON configuration file (workload identity federation), then the
+       configuration file is used to determine and retrieve the external
+       credentials from the current environment (AWS, Azure, etc).
+       These will then be exchanged for Google access tokens via the Google STS
+       endpoint.
+       The project ID returned in this case is the one corresponding to the
+       underlying workload identity pool resource if determinable.
     2. If the `Google Cloud SDK`_ is installed and has application default
        credentials set they are loaded and returned.
 
@@ -310,11 +397,15 @@
         scopes (Sequence[str]): The list of scopes for the credentials. If
             specified, the credentials will automatically be scoped if
             necessary.
-        request (google.auth.transport.Request): An object used to make
-            HTTP requests. This is used to detect whether the application
-            is running on Compute Engine. If not specified, then it will
-            use the standard library http client to make requests.
-        quota_project_id (Optional[str]):  The project ID used for
+        request (Optional[google.auth.transport.Request]): An object used to make
+            HTTP requests. This is used to either detect whether the application
+            is running on Compute Engine or to determine the associated project
+            ID for a workload identity pool resource (external account
+            credentials). If not specified, then it will either use the standard
+            library http client to make requests for Compute Engine credentials
+            or a google.auth.transport.requests.Request client for external
+            account credentials.
+        quota_project_id (Optional[str]): The project ID used for
             quota and billing.
         default_scopes (Optional[Sequence[str]]): Default scopes passed by a
             Google client library. Use 'scopes' for user-defined scopes.
@@ -336,6 +427,10 @@
     )
 
     checkers = (
+        # Avoid passing scopes here to prevent passing scopes to user credentials.
+        # with_scopes_if_required() below will ensure scopes/default scopes are
+        # safely set on the returned credentials since requires_scopes will
+        # guard against setting scopes on user credentials.
         _get_explicit_environ_credentials,
         _get_gcloud_sdk_credentials,
         _get_gae_credentials,
@@ -348,6 +443,17 @@
             credentials = with_scopes_if_required(
                 credentials, scopes, default_scopes=default_scopes
             )
+
+            # For external account credentials, scopes are required to determine
+            # the project ID. Try to get the project ID again if not yet
+            # determined.
+            if not project_id and callable(
+                getattr(credentials, "get_project_id", None)
+            ):
+                if request is None:
+                    request = google.auth.transport.requests.Request()
+                project_id = credentials.get_project_id(request=request)
+
             if quota_project_id:
                 credentials = credentials.with_quota_project(quota_project_id)
 
diff --git a/google/auth/aws.py b/google/auth/aws.py
new file mode 100644
index 0000000..b362dd3
--- /dev/null
+++ b/google/auth/aws.py
@@ -0,0 +1,714 @@
+# Copyright 2020 Google LLC
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""AWS Credentials and AWS Signature V4 Request Signer.
+
+This module provides credentials to access Google Cloud resources from Amazon
+Web Services (AWS) workloads. These credentials are recommended over the
+use of service account credentials in AWS as they do not involve the management
+of long-lived service account private keys.
+
+AWS Credentials are initialized using external_account arguments which are
+typically loaded from the external credentials JSON file.
+Unlike other Credentials that can be initialized with a list of explicit
+arguments, secrets or credentials, external account clients use the
+environment and hints/guidelines provided by the external_account JSON
+file to retrieve credentials and exchange them for Google access tokens.
+
+This module also provides a basic implementation of the
+`AWS Signature Version 4`_ request signing algorithm.
+
+AWS Credentials use serialized signed requests to the
+`AWS STS GetCallerIdentity`_ API that can be exchanged for Google access tokens
+via the GCP STS endpoint.
+
+.. _AWS Signature Version 4: https://docs.aws.amazon.com/general/latest/gr/signature-version-4.html
+.. _AWS STS GetCallerIdentity: https://docs.aws.amazon.com/STS/latest/APIReference/API_GetCallerIdentity.html
+"""
+
+import hashlib
+import hmac
+import io
+import json
+import os
+import re
+
+from six.moves import http_client
+from six.moves import urllib
+
+from google.auth import _helpers
+from google.auth import environment_vars
+from google.auth import exceptions
+from google.auth import external_account
+
+# AWS Signature Version 4 signing algorithm identifier.
+_AWS_ALGORITHM = "AWS4-HMAC-SHA256"
+# The termination string for the AWS credential scope value as defined in
+# https://docs.aws.amazon.com/general/latest/gr/sigv4-create-string-to-sign.html
+_AWS_REQUEST_TYPE = "aws4_request"
+# The AWS authorization header name for the security session token if available.
+_AWS_SECURITY_TOKEN_HEADER = "x-amz-security-token"
+# The AWS authorization header name for the auto-generated date.
+_AWS_DATE_HEADER = "x-amz-date"
+
+
+class RequestSigner(object):
+    """Implements an AWS request signer based on the AWS Signature Version 4 signing
+    process.
+    https://docs.aws.amazon.com/general/latest/gr/signature-version-4.html
+    """
+
+    def __init__(self, region_name):
+        """Instantiates an AWS request signer used to compute authenticated signed
+        requests to AWS APIs based on the AWS Signature Version 4 signing process.
+
+        Args:
+            region_name (str): The AWS region to use.
+        """
+
+        self._region_name = region_name
+
+    def get_request_options(
+        self,
+        aws_security_credentials,
+        url,
+        method,
+        request_payload="",
+        additional_headers={},
+    ):
+        """Generates the signed request for the provided HTTP request for calling
+        an AWS API. This follows the steps described at:
+        https://docs.aws.amazon.com/general/latest/gr/sigv4_signing.html
+
+        Args:
+            aws_security_credentials (Mapping[str, str]): A dictionary containing
+                the AWS security credentials.
+            url (str): The AWS service URL containing the canonical URI and
+                query string.
+            method (str): The HTTP method used to call this API.
+            request_payload (Optional[str]): The optional request payload if
+                available.
+            additional_headers (Optional[Mapping[str, str]]): The optional
+                additional headers needed for the requested AWS API.
+
+        Returns:
+            Mapping[str, str]: The AWS signed request dictionary object.
+        """
+        # Get AWS credentials.
+        access_key = aws_security_credentials.get("access_key_id")
+        secret_key = aws_security_credentials.get("secret_access_key")
+        security_token = aws_security_credentials.get("security_token")
+
+        additional_headers = additional_headers or {}
+
+        uri = urllib.parse.urlparse(url)
+        # Validate provided URL.
+        if not uri.hostname or uri.scheme != "https":
+            raise ValueError("Invalid AWS service URL")
+
+        header_map = _generate_authentication_header_map(
+            host=uri.hostname,
+            canonical_uri=os.path.normpath(uri.path or "/"),
+            canonical_querystring=_get_canonical_querystring(uri.query),
+            method=method,
+            region=self._region_name,
+            access_key=access_key,
+            secret_key=secret_key,
+            security_token=security_token,
+            request_payload=request_payload,
+            additional_headers=additional_headers,
+        )
+        headers = {
+            "Authorization": header_map.get("authorization_header"),
+            "host": uri.hostname,
+        }
+        # Add x-amz-date if available.
+        if "amz_date" in header_map:
+            headers[_AWS_DATE_HEADER] = header_map.get("amz_date")
+        # Append additional optional headers, eg. X-Amz-Target, Content-Type, etc.
+        for key in additional_headers:
+            headers[key] = additional_headers[key]
+
+        # Add session token if available.
+        if security_token is not None:
+            headers[_AWS_SECURITY_TOKEN_HEADER] = security_token
+
+        signed_request = {"url": url, "method": method, "headers": headers}
+        if request_payload:
+            signed_request["data"] = request_payload
+        return signed_request
+
+
+def _get_canonical_querystring(query):
+    """Generates the canonical query string given a raw query string.
+    Logic is based on
+    https://docs.aws.amazon.com/general/latest/gr/sigv4-create-canonical-request.html
+
+    Args:
+        query (str): The raw query string.
+
+    Returns:
+        str: The canonical query string.
+    """
+    # Parse raw query string.
+    querystring = urllib.parse.parse_qs(query)
+    querystring_encoded_map = {}
+    for key in querystring:
+        quote_key = urllib.parse.quote(key, safe="-_.~")
+        # URI encode key.
+        querystring_encoded_map[quote_key] = []
+        for item in querystring[key]:
+            # For each key, URI encode all values for that key.
+            querystring_encoded_map[quote_key].append(
+                urllib.parse.quote(item, safe="-_.~")
+            )
+        # Sort values for each key.
+        querystring_encoded_map[quote_key].sort()
+    # Sort keys.
+    sorted_keys = list(querystring_encoded_map.keys())
+    sorted_keys.sort()
+    # Reconstruct the query string. Preserve keys with multiple values.
+    querystring_encoded_pairs = []
+    for key in sorted_keys:
+        for item in querystring_encoded_map[key]:
+            querystring_encoded_pairs.append("{}={}".format(key, item))
+    return "&".join(querystring_encoded_pairs)
+
+
+def _sign(key, msg):
+    """Creates the HMAC-SHA256 hash of the provided message using the provided
+    key.
+
+    Args:
+        key (str): The HMAC-SHA256 key to use.
+        msg (str): The message to hash.
+
+    Returns:
+        str: The computed hash bytes.
+    """
+    return hmac.new(key, msg.encode("utf-8"), hashlib.sha256).digest()
+
+
+def _get_signing_key(key, date_stamp, region_name, service_name):
+    """Calculates the signing key used to calculate the signature for
+    AWS Signature Version 4 based on:
+    https://docs.aws.amazon.com/general/latest/gr/sigv4-calculate-signature.html
+
+    Args:
+        key (str): The AWS secret access key.
+        date_stamp (str): The '%Y%m%d' date format.
+        region_name (str): The AWS region.
+        service_name (str): The AWS service name, eg. sts.
+
+    Returns:
+        str: The signing key bytes.
+    """
+    k_date = _sign(("AWS4" + key).encode("utf-8"), date_stamp)
+    k_region = _sign(k_date, region_name)
+    k_service = _sign(k_region, service_name)
+    k_signing = _sign(k_service, "aws4_request")
+    return k_signing
+
+
+def _generate_authentication_header_map(
+    host,
+    canonical_uri,
+    canonical_querystring,
+    method,
+    region,
+    access_key,
+    secret_key,
+    security_token,
+    request_payload="",
+    additional_headers={},
+):
+    """Generates the authentication header map needed for generating the AWS
+    Signature Version 4 signed request.
+
+    Args:
+        host (str): The AWS service URL hostname.
+        canonical_uri (str): The AWS service URL path name.
+        canonical_querystring (str): The AWS service URL query string.
+        method (str): The HTTP method used to call this API.
+        region (str): The AWS region.
+        access_key (str): The AWS access key ID.
+        secret_key (str): The AWS secret access key.
+        security_token (Optional[str]): The AWS security session token. This is
+            available for temporary sessions.
+        request_payload (Optional[str]): The optional request payload if
+            available.
+        additional_headers (Optional[Mapping[str, str]]): The optional
+            additional headers needed for the requested AWS API.
+
+    Returns:
+        Mapping[str, str]: The AWS authentication header dictionary object.
+            This contains the x-amz-date and authorization header information.
+    """
+    # iam.amazonaws.com host => iam service.
+    # sts.us-east-2.amazonaws.com host => sts service.
+    service_name = host.split(".")[0]
+
+    current_time = _helpers.utcnow()
+    amz_date = current_time.strftime("%Y%m%dT%H%M%SZ")
+    date_stamp = current_time.strftime("%Y%m%d")
+
+    # Change all additional headers to be lower case.
+    full_headers = {}
+    for key in additional_headers:
+        full_headers[key.lower()] = additional_headers[key]
+    # Add AWS session token if available.
+    if security_token is not None:
+        full_headers[_AWS_SECURITY_TOKEN_HEADER] = security_token
+
+    # Required headers
+    full_headers["host"] = host
+    # Do not use the generated x-amz-date if a date header is provided.
+    # Previously the date header was not prefixed with x-amz- and could be
+    # provided manually.
+    # https://github.com/boto/botocore/blob/879f8440a4e9ace5d3cf145ce8b3d5e5ffb892ef/tests/unit/auth/aws4_testsuite/get-header-value-trim.req
+    if "date" not in full_headers:
+        full_headers[_AWS_DATE_HEADER] = amz_date
+
+    # Header keys need to be sorted alphabetically.
+    canonical_headers = ""
+    header_keys = list(full_headers.keys())
+    header_keys.sort()
+    for key in header_keys:
+        canonical_headers = "{}{}:{}\n".format(
+            canonical_headers, key, full_headers[key]
+        )
+    signed_headers = ";".join(header_keys)
+
+    payload_hash = hashlib.sha256((request_payload or "").encode("utf-8")).hexdigest()
+
+    # https://docs.aws.amazon.com/general/latest/gr/sigv4-create-canonical-request.html
+    canonical_request = "{}\n{}\n{}\n{}\n{}\n{}".format(
+        method,
+        canonical_uri,
+        canonical_querystring,
+        canonical_headers,
+        signed_headers,
+        payload_hash,
+    )
+
+    credential_scope = "{}/{}/{}/{}".format(
+        date_stamp, region, service_name, _AWS_REQUEST_TYPE
+    )
+
+    # https://docs.aws.amazon.com/general/latest/gr/sigv4-create-string-to-sign.html
+    string_to_sign = "{}\n{}\n{}\n{}".format(
+        _AWS_ALGORITHM,
+        amz_date,
+        credential_scope,
+        hashlib.sha256(canonical_request.encode("utf-8")).hexdigest(),
+    )
+
+    # https://docs.aws.amazon.com/general/latest/gr/sigv4-calculate-signature.html
+    signing_key = _get_signing_key(secret_key, date_stamp, region, service_name)
+    signature = hmac.new(
+        signing_key, string_to_sign.encode("utf-8"), hashlib.sha256
+    ).hexdigest()
+
+    # https://docs.aws.amazon.com/general/latest/gr/sigv4-add-signature-to-request.html
+    authorization_header = "{} Credential={}/{}, SignedHeaders={}, Signature={}".format(
+        _AWS_ALGORITHM, access_key, credential_scope, signed_headers, signature
+    )
+
+    authentication_header = {"authorization_header": authorization_header}
+    # Do not use generated x-amz-date if the date header is provided.
+    if "date" not in full_headers:
+        authentication_header["amz_date"] = amz_date
+    return authentication_header
+
+
+class Credentials(external_account.Credentials):
+    """AWS external account credentials.
+    This is used to exchange serialized AWS signature v4 signed requests to
+    AWS STS GetCallerIdentity service for Google access tokens.
+    """
+
+    def __init__(
+        self,
+        audience,
+        subject_token_type,
+        token_url,
+        credential_source=None,
+        service_account_impersonation_url=None,
+        client_id=None,
+        client_secret=None,
+        quota_project_id=None,
+        scopes=None,
+        default_scopes=None,
+    ):
+        """Instantiates an AWS workload external account credentials object.
+
+        Args:
+            audience (str): The STS audience field.
+            subject_token_type (str): The subject token type.
+            token_url (str): The STS endpoint URL.
+            credential_source (Mapping): The credential source dictionary used
+                to provide instructions on how to retrieve the external
+                credential to be exchanged for Google access tokens.
+            service_account_impersonation_url (Optional[str]): The optional
+                service account impersonation getAccessToken URL.
+            client_id (Optional[str]): The optional client ID.
+            client_secret (Optional[str]): The optional client secret.
+            quota_project_id (Optional[str]): The optional quota project ID.
+            scopes (Optional[Sequence[str]]): Optional scopes to request during
+                the authorization grant.
+            default_scopes (Optional[Sequence[str]]): Default scopes passed by a
+                Google client library. Use 'scopes' for user-defined scopes.
+
+        Raises:
+            google.auth.exceptions.RefreshError: If an error is encountered during
+                access token retrieval logic.
+            ValueError: For invalid parameters.
+
+        .. note:: Typically one of the helper constructors
+            :meth:`from_file` or
+            :meth:`from_info` is used instead of calling the constructor directly.
+        """
+        super(Credentials, self).__init__(
+            audience=audience,
+            subject_token_type=subject_token_type,
+            token_url=token_url,
+            credential_source=credential_source,
+            service_account_impersonation_url=service_account_impersonation_url,
+            client_id=client_id,
+            client_secret=client_secret,
+            quota_project_id=quota_project_id,
+            scopes=scopes,
+            default_scopes=default_scopes,
+        )
+        credential_source = credential_source or {}
+        self._environment_id = credential_source.get("environment_id") or ""
+        self._region_url = credential_source.get("region_url")
+        self._security_credentials_url = credential_source.get("url")
+        self._cred_verification_url = credential_source.get(
+            "regional_cred_verification_url"
+        )
+        self._region = None
+        self._request_signer = None
+        self._target_resource = audience
+
+        # Get the environment ID. Currently, only one version supported (v1).
+        matches = re.match(r"^(aws)([\d]+)$", self._environment_id)
+        if matches:
+            env_id, env_version = matches.groups()
+        else:
+            env_id, env_version = (None, None)
+
+        if env_id != "aws" or self._cred_verification_url is None:
+            raise ValueError("No valid AWS 'credential_source' provided")
+        elif int(env_version or "") != 1:
+            raise ValueError(
+                "aws version '{}' is not supported in the current build.".format(
+                    env_version
+                )
+            )
+
+    def retrieve_subject_token(self, request):
+        """Retrieves the subject token using the credential_source object.
+        The subject token is a serialized `AWS GetCallerIdentity signed request`_.
+
+        The logic is summarized as:
+
+        Retrieve the AWS region from the AWS_REGION environment variable or from
+        the AWS metadata server availability-zone if not found in the
+        environment variable.
+
+        Check AWS credentials in environment variables. If not found, retrieve
+        from the AWS metadata server security-credentials endpoint.
+
+        When retrieving AWS credentials from the metadata server
+        security-credentials endpoint, the AWS role needs to be determined by
+        calling the security-credentials endpoint without any argument. Then the
+        credentials can be retrieved via: security-credentials/role_name
+
+        Generate the signed request to AWS STS GetCallerIdentity action.
+
+        Inject x-goog-cloud-target-resource into header and serialize the
+        signed request. This will be the subject-token to pass to GCP STS.
+
+        .. _AWS GetCallerIdentity signed request:
+            https://cloud.google.com/iam/docs/access-resources-aws#exchange-token
+
+        Args:
+            request (google.auth.transport.Request): A callable used to make
+                HTTP requests.
+        Returns:
+            str: The retrieved subject token.
+        """
+        # Initialize the request signer if not yet initialized after determining
+        # the current AWS region.
+        if self._request_signer is None:
+            self._region = self._get_region(request, self._region_url)
+            self._request_signer = RequestSigner(self._region)
+
+        # Retrieve the AWS security credentials needed to generate the signed
+        # request.
+        aws_security_credentials = self._get_security_credentials(request)
+        # Generate the signed request to AWS STS GetCallerIdentity API.
+        # Use the required regional endpoint. Otherwise, the request will fail.
+        request_options = self._request_signer.get_request_options(
+            aws_security_credentials,
+            self._cred_verification_url.replace("{region}", self._region),
+            "POST",
+        )
+        # The GCP STS endpoint expects the headers to be formatted as:
+        # [
+        #   {key: 'x-amz-date', value: '...'},
+        #   {key: 'Authorization', value: '...'},
+        #   ...
+        # ]
+        # And then serialized as:
+        # quote(json.dumps({
+        #   url: '...',
+        #   method: 'POST',
+        #   headers: [{key: 'x-amz-date', value: '...'}, ...]
+        # }))
+        request_headers = request_options.get("headers")
+        # The full, canonical resource name of the workload identity pool
+        # provider, with or without the HTTPS prefix.
+        # Including this header as part of the signature is recommended to
+        # ensure data integrity.
+        request_headers["x-goog-cloud-target-resource"] = self._target_resource
+
+        # Serialize AWS signed request.
+        # Keeping inner keys in sorted order makes testing easier for Python
+        # versions <=3.5 as the stringified JSON string would have a predictable
+        # key order.
+        aws_signed_req = {}
+        aws_signed_req["url"] = request_options.get("url")
+        aws_signed_req["method"] = request_options.get("method")
+        aws_signed_req["headers"] = []
+        # Reformat header to GCP STS expected format.
+        for key in sorted(request_headers.keys()):
+            aws_signed_req["headers"].append(
+                {"key": key, "value": request_headers[key]}
+            )
+
+        return urllib.parse.quote(
+            json.dumps(aws_signed_req, separators=(",", ":"), sort_keys=True)
+        )
+
+    def _get_region(self, request, url):
+        """Retrieves the current AWS region from either the AWS_REGION
+        environment variable or from the AWS metadata server.
+
+        Args:
+            request (google.auth.transport.Request): A callable used to make
+                HTTP requests.
+            url (str): The AWS metadata server region URL.
+
+        Returns:
+            str: The current AWS region.
+
+        Raises:
+            google.auth.exceptions.RefreshError: If an error occurs while
+                retrieving the AWS region.
+        """
+        # The AWS metadata server is not available in some AWS environments
+        # such as AWS Lambda. There, the region is provided via the
+        # AWS_REGION environment variable instead.
+        env_aws_region = os.environ.get(environment_vars.AWS_REGION)
+        if env_aws_region is not None:
+            return env_aws_region
+
+        if not self._region_url:
+            raise exceptions.RefreshError("Unable to determine AWS region")
+        response = request(url=self._region_url, method="GET")
+
+        # Support both string and bytes type response.data.
+        response_body = (
+            response.data.decode("utf-8")
+            if hasattr(response.data, "decode")
+            else response.data
+        )
+
+        if response.status != 200:
+            raise exceptions.RefreshError(
+                "Unable to retrieve AWS region", response_body
+            )
+
+        # This endpoint will return the region in format: us-east-2b.
+        # Only the us-east-2 part should be used.
+        return response_body[:-1]
+
+    def _get_security_credentials(self, request):
+        """Retrieves the AWS security credentials required for signing AWS
+        requests from either the AWS security credentials environment variables
+        or from the AWS metadata server.
+
+        Args:
+            request (google.auth.transport.Request): A callable used to make
+                HTTP requests.
+
+        Returns:
+            Mapping[str, str]: The AWS security credentials dictionary object.
+
+        Raises:
+            google.auth.exceptions.RefreshError: If an error occurs while
+                retrieving the AWS security credentials.
+        """
+
+        # Check environment variables for permanent credentials first.
+        # https://docs.aws.amazon.com/general/latest/gr/aws-sec-cred-types.html
+        env_aws_access_key_id = os.environ.get(environment_vars.AWS_ACCESS_KEY_ID)
+        env_aws_secret_access_key = os.environ.get(
+            environment_vars.AWS_SECRET_ACCESS_KEY
+        )
+        # This is normally not available for permanent credentials.
+        env_aws_session_token = os.environ.get(environment_vars.AWS_SESSION_TOKEN)
+        if env_aws_access_key_id and env_aws_secret_access_key:
+            return {
+                "access_key_id": env_aws_access_key_id,
+                "secret_access_key": env_aws_secret_access_key,
+                "security_token": env_aws_session_token,
+            }
+
+        # Get role name.
+        role_name = self._get_metadata_role_name(request)
+
+        # Get security credentials.
+        credentials = self._get_metadata_security_credentials(request, role_name)
+
+        return {
+            "access_key_id": credentials.get("AccessKeyId"),
+            "secret_access_key": credentials.get("SecretAccessKey"),
+            "security_token": credentials.get("Token"),
+        }
+
+    def _get_metadata_security_credentials(self, request, role_name):
+        """Retrieves the AWS security credentials required for signing AWS
+        requests from the AWS metadata server.
+
+        Args:
+            request (google.auth.transport.Request): A callable used to make
+                HTTP requests.
+            role_name (str): The AWS role name required by the AWS metadata
+                server security_credentials endpoint in order to return the
+                credentials.
+
+        Returns:
+            Mapping[str, str]: The AWS metadata server security credentials
+                response.
+
+        Raises:
+            google.auth.exceptions.RefreshError: If an error occurs while
+                retrieving the AWS security credentials.
+        """
+        headers = {"Content-Type": "application/json"}
+        response = request(
+            url="{}/{}".format(self._security_credentials_url, role_name),
+            method="GET",
+            headers=headers,
+        )
+
+        # support both string and bytes type response.data
+        response_body = (
+            response.data.decode("utf-8")
+            if hasattr(response.data, "decode")
+            else response.data
+        )
+
+        if response.status != http_client.OK:
+            raise exceptions.RefreshError(
+                "Unable to retrieve AWS security credentials", response_body
+            )
+
+        credentials_response = json.loads(response_body)
+
+        return credentials_response
+
+    def _get_metadata_role_name(self, request):
+        """Retrieves the AWS role currently attached to the current AWS
+        workload by querying the AWS metadata server. This is needed for the
+        AWS metadata server security credentials endpoint in order to retrieve
+        the AWS security credentials needed to sign requests to AWS APIs.
+
+        Args:
+            request (google.auth.transport.Request): A callable used to make
+                HTTP requests.
+
+        Returns:
+            str: The AWS role name.
+
+        Raises:
+            google.auth.exceptions.RefreshError: If an error occurs while
+                retrieving the AWS role name.
+        """
+        if self._security_credentials_url is None:
+            raise exceptions.RefreshError(
+                "Unable to determine the AWS metadata server security credentials endpoint"
+            )
+        response = request(url=self._security_credentials_url, method="GET")
+
+        # support both string and bytes type response.data
+        response_body = (
+            response.data.decode("utf-8")
+            if hasattr(response.data, "decode")
+            else response.data
+        )
+
+        if response.status != http_client.OK:
+            raise exceptions.RefreshError(
+                "Unable to retrieve AWS role name", response_body
+            )
+
+        return response_body
+
+    @classmethod
+    def from_info(cls, info, **kwargs):
+        """Creates an AWS Credentials instance from parsed external account info.
+
+        Args:
+            info (Mapping[str, str]): The AWS external account info in Google
+                format.
+            kwargs: Additional arguments to pass to the constructor.
+
+        Returns:
+            google.auth.aws.Credentials: The constructed credentials.
+
+        Raises:
+            ValueError: For invalid parameters.
+        """
+        return cls(
+            audience=info.get("audience"),
+            subject_token_type=info.get("subject_token_type"),
+            token_url=info.get("token_url"),
+            service_account_impersonation_url=info.get(
+                "service_account_impersonation_url"
+            ),
+            client_id=info.get("client_id"),
+            client_secret=info.get("client_secret"),
+            credential_source=info.get("credential_source"),
+            quota_project_id=info.get("quota_project_id"),
+            **kwargs
+        )
+
+    @classmethod
+    def from_file(cls, filename, **kwargs):
+        """Creates an AWS Credentials instance from an external account json file.
+
+        Args:
+            filename (str): The path to the AWS external account json file.
+            kwargs: Additional arguments to pass to the constructor.
+
+        Returns:
+            google.auth.aws.Credentials: The constructed credentials.
+        """
+        with io.open(filename, "r", encoding="utf-8") as json_file:
+            data = json.load(json_file)
+            return cls.from_info(data, **kwargs)
diff --git a/google/auth/environment_vars.py b/google/auth/environment_vars.py
index 46a8926..416bab0 100644
--- a/google/auth/environment_vars.py
+++ b/google/auth/environment_vars.py
@@ -59,3 +59,13 @@
 
 The default value is false. Users have to explicitly set this value to true
 in order to use client certificate to establish a mutual TLS channel."""
+
+# AWS environment variables used with AWS workload identity pools to retrieve
+# AWS security credentials and the AWS region needed to create a serialized
+# signed request to the AWS STS GetCallerIdentity API that can be exchanged
+# for a Google access token via the GCP STS endpoint.
+# When not available the AWS metadata server is used to retrieve these values.
+AWS_ACCESS_KEY_ID = "AWS_ACCESS_KEY_ID"
+AWS_SECRET_ACCESS_KEY = "AWS_SECRET_ACCESS_KEY"
+AWS_SESSION_TOKEN = "AWS_SESSION_TOKEN"
+AWS_REGION = "AWS_REGION"
diff --git a/google/auth/exceptions.py b/google/auth/exceptions.py
index da06d86..b6f686b 100644
--- a/google/auth/exceptions.py
+++ b/google/auth/exceptions.py
@@ -43,3 +43,8 @@
 
 class ClientCertError(GoogleAuthError):
     """Used to indicate that client certificate is missing or invalid."""
+
+
+class OAuthError(GoogleAuthError):
+    """Used to indicate an error occurred during an OAuth related HTTP
+    request."""
diff --git a/google/auth/external_account.py b/google/auth/external_account.py
new file mode 100644
index 0000000..0429ee0
--- /dev/null
+++ b/google/auth/external_account.py
@@ -0,0 +1,305 @@
+# Copyright 2020 Google LLC
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""External Account Credentials.
+
+This module provides credentials that exchange workload identity pool external
+credentials for Google access tokens. This facilitates accessing Google Cloud
+Platform resources from on-prem and non-Google Cloud platforms (e.g. AWS,
+Microsoft Azure, OIDC identity providers), using native credentials retrieved
+from the current environment without the need to copy, save and manage
+long-lived service account credentials.
+
+Specifically, this is intended to use access tokens acquired using the GCP STS
+token exchange endpoint following the `OAuth 2.0 Token Exchange`_ spec.
+
+.. _OAuth 2.0 Token Exchange: https://tools.ietf.org/html/rfc8693
+"""
+
+import abc
+import datetime
+import json
+
+import six
+
+from google.auth import _helpers
+from google.auth import credentials
+from google.auth import exceptions
+from google.auth import impersonated_credentials
+from google.oauth2 import sts
+from google.oauth2 import utils
+
+# The token exchange grant_type used for exchanging credentials.
+_STS_GRANT_TYPE = "urn:ietf:params:oauth:grant-type:token-exchange"
+# The token exchange requested_token_type. This is always an access_token.
+_STS_REQUESTED_TOKEN_TYPE = "urn:ietf:params:oauth:token-type:access_token"
+# Cloud resource manager URL used to retrieve project information.
+_CLOUD_RESOURCE_MANAGER = "https://cloudresourcemanager.googleapis.com/v1/projects/"
+
+
+@six.add_metaclass(abc.ABCMeta)
+class Credentials(credentials.Scoped, credentials.CredentialsWithQuotaProject):
+    """Base class for all external account credentials.
+
+    This is used to instantiate Credentials for exchanging external account
+    credentials for Google access tokens and authorizing requests to Google APIs.
+    The base class implements the common logic for exchanging external account
+    credentials for Google access tokens.
+    """
+
+    def __init__(
+        self,
+        audience,
+        subject_token_type,
+        token_url,
+        credential_source,
+        service_account_impersonation_url=None,
+        client_id=None,
+        client_secret=None,
+        quota_project_id=None,
+        scopes=None,
+        default_scopes=None,
+    ):
+        """Instantiates an external account credentials object.
+
+        Args:
+            audience (str): The STS audience field.
+            subject_token_type (str): The subject token type.
+            token_url (str): The STS endpoint URL.
+            credential_source (Mapping): The credential source dictionary.
+            service_account_impersonation_url (Optional[str]): The optional service account
+                impersonation generateAccessToken URL.
+            client_id (Optional[str]): The optional client ID.
+            client_secret (Optional[str]): The optional client secret.
+            quota_project_id (Optional[str]): The optional quota project ID.
+            scopes (Optional[Sequence[str]]): Optional scopes to request during the
+                authorization grant.
+            default_scopes (Optional[Sequence[str]]): Default scopes passed by a
+                Google client library. Use 'scopes' for user-defined scopes.
+        Raises:
+            google.auth.exceptions.RefreshError: If the target principal cannot
+                be determined from the service account impersonation URL.
+        """
+        super(Credentials, self).__init__()
+        self._audience = audience
+        self._subject_token_type = subject_token_type
+        self._token_url = token_url
+        self._credential_source = credential_source
+        self._service_account_impersonation_url = service_account_impersonation_url
+        self._client_id = client_id
+        self._client_secret = client_secret
+        self._quota_project_id = quota_project_id
+        self._scopes = scopes
+        self._default_scopes = default_scopes
+
+        if self._client_id:
+            self._client_auth = utils.ClientAuthentication(
+                utils.ClientAuthType.basic, self._client_id, self._client_secret
+            )
+        else:
+            self._client_auth = None
+        self._sts_client = sts.Client(self._token_url, self._client_auth)
+
+        if self._service_account_impersonation_url:
+            self._impersonated_credentials = self._initialize_impersonated_credentials()
+        else:
+            self._impersonated_credentials = None
+        self._project_id = None
+
+    @property
+    def requires_scopes(self):
+        """Checks if the credentials requires scopes.
+
+        Returns:
+            bool: True if there are no scopes set otherwise False.
+        """
+        return not self._scopes and not self._default_scopes
+
+    @property
+    def project_number(self):
+        """Optional[str]: The project number corresponding to the workload identity pool."""
+
+        # STS audience pattern:
+        # //iam.googleapis.com/projects/$PROJECT_NUMBER/locations/...
+        components = self._audience.split("/")
+        try:
+            project_index = components.index("projects")
+            if project_index + 1 < len(components):
+                return components[project_index + 1] or None
+        except ValueError:
+            return None
+
+    @_helpers.copy_docstring(credentials.Scoped)
+    def with_scopes(self, scopes, default_scopes=None):
+        return self.__class__(
+            audience=self._audience,
+            subject_token_type=self._subject_token_type,
+            token_url=self._token_url,
+            credential_source=self._credential_source,
+            service_account_impersonation_url=self._service_account_impersonation_url,
+            client_id=self._client_id,
+            client_secret=self._client_secret,
+            quota_project_id=self._quota_project_id,
+            scopes=scopes,
+            default_scopes=default_scopes,
+        )
+
+    @abc.abstractmethod
+    def retrieve_subject_token(self, request):
+        """Retrieves the subject token using the credential_source object.
+
+        Args:
+            request (google.auth.transport.Request): A callable used to make
+                HTTP requests.
+        Returns:
+            str: The retrieved subject token.
+        """
+        # pylint: disable=missing-raises-doc
+        # (pylint doesn't recognize that this is abstract)
+        raise NotImplementedError("retrieve_subject_token must be implemented")
+
+    def get_project_id(self, request):
+        """Retrieves the project ID corresponding to the workload identity pool.
+
+        When not determinable, None is returned.
+
+        This is introduced to support the current pattern of using the Auth library:
+
+            credentials, project_id = google.auth.default()
+
+        The resource may not have permission (resourcemanager.projects.get) to
+        call this API or the required scopes may not be selected:
+        https://cloud.google.com/resource-manager/reference/rest/v1/projects/get#authorization-scopes
+
+        Args:
+            request (google.auth.transport.Request): A callable used to make
+                HTTP requests.
+        Returns:
+            Optional[str]: The project ID corresponding to the workload identity pool
+                if determinable.
+        """
+        if self._project_id:
+            # If already retrieved, return the cached project ID value.
+            return self._project_id
+        scopes = self._scopes if self._scopes is not None else self._default_scopes
+        # Scopes are required in order to retrieve a valid access token.
+        if self.project_number and scopes:
+            headers = {}
+            url = _CLOUD_RESOURCE_MANAGER + self.project_number
+            self.before_request(request, "GET", url, headers)
+            response = request(url=url, method="GET", headers=headers)
+
+            response_body = (
+                response.data.decode("utf-8")
+                if hasattr(response.data, "decode")
+                else response.data
+            )
+            response_data = json.loads(response_body)
+
+            if response.status == 200:
+                # Cache result as this field is immutable.
+                self._project_id = response_data.get("projectId")
+                return self._project_id
+
+        return None
+
+    @_helpers.copy_docstring(credentials.Credentials)
+    def refresh(self, request):
+        scopes = self._scopes if self._scopes is not None else self._default_scopes
+        if self._impersonated_credentials:
+            self._impersonated_credentials.refresh(request)
+            self.token = self._impersonated_credentials.token
+            self.expiry = self._impersonated_credentials.expiry
+        else:
+            now = _helpers.utcnow()
+            response_data = self._sts_client.exchange_token(
+                request=request,
+                grant_type=_STS_GRANT_TYPE,
+                subject_token=self.retrieve_subject_token(request),
+                subject_token_type=self._subject_token_type,
+                audience=self._audience,
+                scopes=scopes,
+                requested_token_type=_STS_REQUESTED_TOKEN_TYPE,
+            )
+            self.token = response_data.get("access_token")
+            lifetime = datetime.timedelta(seconds=response_data.get("expires_in"))
+            self.expiry = now + lifetime
+
+    @_helpers.copy_docstring(credentials.CredentialsWithQuotaProject)
+    def with_quota_project(self, quota_project_id):
+        # Return copy of instance with the provided quota project ID.
+        return self.__class__(
+            audience=self._audience,
+            subject_token_type=self._subject_token_type,
+            token_url=self._token_url,
+            credential_source=self._credential_source,
+            service_account_impersonation_url=self._service_account_impersonation_url,
+            client_id=self._client_id,
+            client_secret=self._client_secret,
+            quota_project_id=quota_project_id,
+            scopes=self._scopes,
+            default_scopes=self._default_scopes,
+        )
+
+    def _initialize_impersonated_credentials(self):
+        """Generates impersonated credentials.
+
+        For more details, see `projects.serviceAccounts.generateAccessToken`_.
+
+        .. _projects.serviceAccounts.generateAccessToken: https://cloud.google.com/iam/docs/reference/credentials/rest/v1/projects.serviceAccounts/generateAccessToken
+
+        Returns:
+            impersonated_credentials.Credentials: The impersonated credentials
+                object.
+
+        Raises:
+            google.auth.exceptions.RefreshError: If the target principal cannot
+                be determined from the service account impersonation URL.
+        """
+        # Return copy of instance with no service account impersonation.
+        source_credentials = self.__class__(
+            audience=self._audience,
+            subject_token_type=self._subject_token_type,
+            token_url=self._token_url,
+            credential_source=self._credential_source,
+            service_account_impersonation_url=None,
+            client_id=self._client_id,
+            client_secret=self._client_secret,
+            quota_project_id=self._quota_project_id,
+            scopes=self._scopes,
+            default_scopes=self._default_scopes,
+        )
+
+        # Determine target_principal.
+        start_index = self._service_account_impersonation_url.rfind("/")
+        end_index = self._service_account_impersonation_url.find(":generateAccessToken")
+        if start_index != -1 and end_index != -1 and start_index < end_index:
+            start_index = start_index + 1
+            target_principal = self._service_account_impersonation_url[
+                start_index:end_index
+            ]
+        else:
+            raise exceptions.RefreshError(
+                "Unable to determine target principal from service account impersonation URL."
+            )
+
+        scopes = self._scopes if self._scopes is not None else self._default_scopes
+        # Initialize and return impersonated credentials.
+        return impersonated_credentials.Credentials(
+            source_credentials=source_credentials,
+            target_principal=target_principal,
+            target_scopes=scopes,
+            quota_project_id=self._quota_project_id,
+            iam_endpoint_override=self._service_account_impersonation_url,
+        )
diff --git a/google/auth/identity_pool.py b/google/auth/identity_pool.py
new file mode 100644
index 0000000..5362199
--- /dev/null
+++ b/google/auth/identity_pool.py
@@ -0,0 +1,279 @@
+# Copyright 2020 Google LLC
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""Identity Pool Credentials.
+
+This module provides credentials to access Google Cloud resources from on-prem
+or non-Google Cloud platforms which support external credentials (e.g. OIDC ID
+tokens) retrieved from local file locations or local servers. This includes
+Microsoft Azure and OIDC identity providers (e.g. K8s workloads registered with
+Hub with Hub workload identity enabled).
+
+These credentials are recommended over the use of service account credentials
+in on-prem/non-Google Cloud platforms as they do not involve the management of
+long-lived service account private keys.
+
+Identity Pool Credentials are initialized using external_account
+arguments which are typically loaded from an external credentials file or
+an external credentials URL. Unlike other Credentials that can be initialized
+with a list of explicit arguments, secrets or credentials, external account
+clients use the environment and hints/guidelines provided by the
+external_account JSON file to retrieve credentials and exchange them for Google
+access tokens.
+"""
+
+try:
+    from collections.abc import Mapping
+# Python 2.7 compatibility
+except ImportError:  # pragma: NO COVER
+    from collections import Mapping
+import io
+import json
+import os
+
+from google.auth import _helpers
+from google.auth import exceptions
+from google.auth import external_account
+
+
+class Credentials(external_account.Credentials):
+    """External account credentials sourced from files and URLs."""
+
+    def __init__(
+        self,
+        audience,
+        subject_token_type,
+        token_url,
+        credential_source,
+        service_account_impersonation_url=None,
+        client_id=None,
+        client_secret=None,
+        quota_project_id=None,
+        scopes=None,
+        default_scopes=None,
+    ):
+        """Instantiates an external account credentials object from a file/URL.
+
+        Args:
+            audience (str): The STS audience field.
+            subject_token_type (str): The subject token type.
+            token_url (str): The STS endpoint URL.
+            credential_source (Mapping): The credential source dictionary used to
+                provide instructions on how to retrieve external credential to be
+                exchanged for Google access tokens.
+
+                Example credential_source for url-sourced credential::
+
+                    {
+                        "url": "http://www.example.com",
+                        "format": {
+                            "type": "json",
+                            "subject_token_field_name": "access_token",
+                        },
+                        "headers": {"foo": "bar"},
+                    }
+
+                Example credential_source for file-sourced credential::
+
+                    {
+                        "file": "/path/to/token/file.txt"
+                    }
+
+            service_account_impersonation_url (Optional[str]): The optional service account
+                impersonation generateAccessToken URL.
+            client_id (Optional[str]): The optional client ID.
+            client_secret (Optional[str]): The optional client secret.
+            quota_project_id (Optional[str]): The optional quota project ID.
+            scopes (Optional[Sequence[str]]): Optional scopes to request during the
+                authorization grant.
+            default_scopes (Optional[Sequence[str]]): Default scopes passed by a
+                Google client library. Use 'scopes' for user-defined scopes.
+
+        Raises:
+            google.auth.exceptions.RefreshError: If an error is encountered during
+                access token retrieval logic.
+            ValueError: For invalid parameters.
+
+        .. note:: Typically one of the helper constructors
+            :meth:`from_file` or
+            :meth:`from_info` is used instead of calling the constructor directly.
+        """
+
+        super(Credentials, self).__init__(
+            audience=audience,
+            subject_token_type=subject_token_type,
+            token_url=token_url,
+            credential_source=credential_source,
+            service_account_impersonation_url=service_account_impersonation_url,
+            client_id=client_id,
+            client_secret=client_secret,
+            quota_project_id=quota_project_id,
+            scopes=scopes,
+            default_scopes=default_scopes,
+        )
+        if not isinstance(credential_source, Mapping):
+            self._credential_source_file = None
+            self._credential_source_url = None
+        else:
+            self._credential_source_file = credential_source.get("file")
+            self._credential_source_url = credential_source.get("url")
+            self._credential_source_headers = credential_source.get("headers")
+            credential_source_format = credential_source.get("format", {})
+            # Get credential_source format type. When not provided, this
+            # defaults to text.
+            self._credential_source_format_type = (
+                credential_source_format.get("type") or "text"
+            )
+            # environment_id is only supported in AWS or dedicated future external
+            # account credentials.
+            if "environment_id" in credential_source:
+                raise ValueError(
+                    "Invalid Identity Pool credential_source field 'environment_id'"
+                )
+            if self._credential_source_format_type not in ["text", "json"]:
+                raise ValueError(
+                    "Invalid credential_source format '{}'".format(
+                        self._credential_source_format_type
+                    )
+                )
+            # For JSON types, get the required subject_token field name.
+            if self._credential_source_format_type == "json":
+                self._credential_source_field_name = credential_source_format.get(
+                    "subject_token_field_name"
+                )
+                if self._credential_source_field_name is None:
+                    raise ValueError(
+                        "Missing subject_token_field_name for JSON credential_source format"
+                    )
+            else:
+                self._credential_source_field_name = None
+
+        if self._credential_source_file and self._credential_source_url:
+            raise ValueError(
+                "Ambiguous credential_source. 'file' is mutually exclusive with 'url'."
+            )
+        if not self._credential_source_file and not self._credential_source_url:
+            raise ValueError(
+                "Missing credential_source. A 'file' or 'url' must be provided."
+            )
+
+    @_helpers.copy_docstring(external_account.Credentials)
+    def retrieve_subject_token(self, request):
+        return self._parse_token_data(
+            self._get_token_data(request),
+            self._credential_source_format_type,
+            self._credential_source_field_name,
+        )
+
+    def _get_token_data(self, request):
+        if self._credential_source_file:
+            return self._get_file_data(self._credential_source_file)
+        else:
+            return self._get_url_data(
+                request, self._credential_source_url, self._credential_source_headers
+            )
+
+    def _get_file_data(self, filename):
+        if not os.path.exists(filename):
+            raise exceptions.RefreshError("File '{}' was not found.".format(filename))
+
+        with io.open(filename, "r", encoding="utf-8") as file_obj:
+            return file_obj.read(), filename
+
+    def _get_url_data(self, request, url, headers):
+        response = request(url=url, method="GET", headers=headers)
+
+        # support both string and bytes type response.data
+        response_body = (
+            response.data.decode("utf-8")
+            if hasattr(response.data, "decode")
+            else response.data
+        )
+
+        if response.status != 200:
+            raise exceptions.RefreshError(
+                "Unable to retrieve Identity Pool subject token", response_body
+            )
+
+        return response_body, url
+
+    def _parse_token_data(
+        self, token_content, format_type="text", subject_token_field_name=None
+    ):
+        content, filename = token_content
+        if format_type == "text":
+            token = content
+        else:
+            try:
+                # Parse file content as JSON.
+                response_data = json.loads(content)
+                # Get the subject_token.
+                token = response_data[subject_token_field_name]
+            except (KeyError, ValueError):
+                raise exceptions.RefreshError(
+                    "Unable to parse subject_token from JSON file '{}' using key '{}'".format(
+                        filename, subject_token_field_name
+                    )
+                )
+        if not token:
+            raise exceptions.RefreshError(
+                "Missing subject_token in the credential_source file"
+            )
+        return token
+
+    @classmethod
+    def from_info(cls, info, **kwargs):
+        """Creates an Identity Pool Credentials instance from parsed external account info.
+
+        Args:
+            info (Mapping[str, str]): The Identity Pool external account info in Google
+                format.
+            kwargs: Additional arguments to pass to the constructor.
+
+        Returns:
+            google.auth.identity_pool.Credentials: The constructed
+                credentials.
+
+        Raises:
+            ValueError: For invalid parameters.
+        """
+        return cls(
+            audience=info.get("audience"),
+            subject_token_type=info.get("subject_token_type"),
+            token_url=info.get("token_url"),
+            service_account_impersonation_url=info.get(
+                "service_account_impersonation_url"
+            ),
+            client_id=info.get("client_id"),
+            client_secret=info.get("client_secret"),
+            credential_source=info.get("credential_source"),
+            quota_project_id=info.get("quota_project_id"),
+            **kwargs
+        )
+
+    @classmethod
+    def from_file(cls, filename, **kwargs):
+        """Creates an IdentityPool Credentials instance from an external account json file.
+
+        Args:
+            filename (str): The path to the IdentityPool external account json file.
+            kwargs: Additional arguments to pass to the constructor.
+
+        Returns:
+            google.auth.identity_pool.Credentials: The constructed
+                credentials.
+        """
+        with io.open(filename, "r", encoding="utf-8") as json_file:
+            data = json.load(json_file)
+            return cls.from_info(data, **kwargs)
diff --git a/google/auth/impersonated_credentials.py b/google/auth/impersonated_credentials.py
index 4d15837..b8a6c49 100644
--- a/google/auth/impersonated_credentials.py
+++ b/google/auth/impersonated_credentials.py
@@ -65,7 +65,9 @@
 _DEFAULT_TOKEN_URI = "https://oauth2.googleapis.com/token"
 
 
-def _make_iam_token_request(request, principal, headers, body):
+def _make_iam_token_request(
+    request, principal, headers, body, iam_endpoint_override=None
+):
     """Makes a request to the Google Cloud IAM service for an access token.
     Args:
         request (Request): The Request object to use.
@@ -73,6 +75,9 @@
         headers (Mapping[str, str]): Map of headers to transmit.
         body (Mapping[str, str]): JSON Payload body for the iamcredentials
             API call.
+        iam_endpoint_override (Optional[str]): The full IAM endpoint override
+            with the target_principal embedded. This is useful when supporting
+            impersonation with regional endpoints.
 
     Raises:
         google.auth.exceptions.TransportError: Raised if there is an underlying
@@ -82,7 +87,7 @@
             `iamcredentials.googleapis.com` is not enabled or the
             `Service Account Token Creator` is not assigned
     """
-    iam_endpoint = _IAM_ENDPOINT.format(principal)
+    iam_endpoint = iam_endpoint_override or _IAM_ENDPOINT.format(principal)
 
     body = json.dumps(body).encode("utf-8")
 
@@ -185,6 +190,7 @@
         delegates=None,
         lifetime=_DEFAULT_TOKEN_LIFETIME_SECS,
         quota_project_id=None,
+        iam_endpoint_override=None,
     ):
         """
         Args:
@@ -209,6 +215,9 @@
             quota_project_id (Optional[str]): The project ID used for quota and billing.
                 This project may be different from the project used to
                 create the credentials.
+            iam_endpoint_override (Optional[str]): The full IAM endpoint override
+                with the target_principal embedded. This is useful when supporting
+                impersonation with regional endpoints.
         """
 
         super(Credentials, self).__init__()
@@ -226,6 +235,7 @@
         self.token = None
         self.expiry = _helpers.utcnow()
         self._quota_project_id = quota_project_id
+        self._iam_endpoint_override = iam_endpoint_override
 
     @_helpers.copy_docstring(credentials.Credentials)
     def refresh(self, request):
@@ -260,6 +270,7 @@
             principal=self._target_principal,
             headers=headers,
             body=body,
+            iam_endpoint_override=self._iam_endpoint_override,
         )
 
     def sign_bytes(self, message):
@@ -302,6 +313,7 @@
             delegates=self._delegates,
             lifetime=self._lifetime,
             quota_project_id=quota_project_id,
+            iam_endpoint_override=self._iam_endpoint_override,
         )