fix: add fetch_id_token_credentials (#866)

diff --git a/google/oauth2/id_token.py b/google/oauth2/id_token.py
index 20d3ac1..74899ae 100644
--- a/google/oauth2/id_token.py
+++ b/google/oauth2/id_token.py
@@ -64,6 +64,7 @@
 from google.auth import environment_vars
 from google.auth import exceptions
 from google.auth import jwt
+import google.auth.transport.requests
 
 
 # The URL that provides public certificates for verifying ID tokens issued
@@ -201,6 +202,101 @@
     )
 
 
+def fetch_id_token_credentials(audience, request=None):
+    """Create the ID Token credentials from the current environment.
+
+    This function acquires ID token from the environment in the following order.
+    See https://google.aip.dev/auth/4110.
+
+    1. If the environment variable ``GOOGLE_APPLICATION_CREDENTIALS`` is set
+       to the path of a valid service account JSON file, then ID token is
+       acquired using this service account credentials.
+    2. If the application is running in Compute Engine, App Engine or Cloud Run,
+       then the ID token are obtained from the metadata server.
+    3. If metadata server doesn't exist and no valid service account credentials
+       are found, :class:`~google.auth.exceptions.DefaultCredentialsError` will
+       be raised.
+
+    Example::
+
+        import google.oauth2.id_token
+        import google.auth.transport.requests
+
+        request = google.auth.transport.requests.Request()
+        target_audience = "https://pubsub.googleapis.com"
+
+        # Create ID token credentials.
+        credentials = google.oauth2.id_token.fetch_id_token_credentials(target_audience, request=request)
+
+        # Refresh the credential to obtain an ID token.
+        credentials.refresh(request)
+
+        id_token = credentials.token
+        id_token_expiry = credentials.expiry
+
+    Args:
+        audience (str): The audience that this ID token is intended for.
+        request (Optional[google.auth.transport.Request]): A callable used to make
+            HTTP requests. A request object will be created if not provided.
+
+    Returns:
+        google.auth.credentials.Credentials: The ID token credentials.
+
+    Raises:
+        ~google.auth.exceptions.DefaultCredentialsError:
+            If metadata server doesn't exist and no valid service account
+            credentials are found.
+    """
+    # 1. Try to get credentials from the GOOGLE_APPLICATION_CREDENTIALS environment
+    # variable.
+    credentials_filename = os.environ.get(environment_vars.CREDENTIALS)
+    if credentials_filename:
+        if not (
+            os.path.exists(credentials_filename)
+            and os.path.isfile(credentials_filename)
+        ):
+            raise exceptions.DefaultCredentialsError(
+                "GOOGLE_APPLICATION_CREDENTIALS path is either not found or invalid."
+            )
+
+        try:
+            with open(credentials_filename, "r") as f:
+                from google.oauth2 import service_account
+
+                info = json.load(f)
+                if info.get("type") == "service_account":
+                    return service_account.IDTokenCredentials.from_service_account_info(
+                        info, target_audience=audience
+                    )
+        except ValueError as caught_exc:
+            new_exc = exceptions.DefaultCredentialsError(
+                "GOOGLE_APPLICATION_CREDENTIALS is not valid service account credentials.",
+                caught_exc,
+            )
+            six.raise_from(new_exc, caught_exc)
+
+    # 2. Try to fetch ID token from metada server if it exists. The code
+    # works for GAE and Cloud Run metadata server as well.
+    try:
+        from google.auth import compute_engine
+        from google.auth.compute_engine import _metadata
+
+        # Create a request object if not provided.
+        if not request:
+            request = google.auth.transport.requests.Request()
+
+        if _metadata.ping(request):
+            return compute_engine.IDTokenCredentials(
+                request, audience, use_metadata_identity_endpoint=True
+            )
+    except (ImportError, exceptions.TransportError):
+        pass
+
+    raise exceptions.DefaultCredentialsError(
+        "Neither metadata server or valid service account credentials are found."
+    )
+
+
 def fetch_id_token(request, audience):
     """Fetch the ID Token from the current environment.
 
@@ -239,51 +335,6 @@
             If metadata server doesn't exist and no valid service account
             credentials are found.
     """
-    # 1. Try to get credentials from the GOOGLE_APPLICATION_CREDENTIALS environment
-    # variable.
-    credentials_filename = os.environ.get(environment_vars.CREDENTIALS)
-    if credentials_filename:
-        if not (
-            os.path.exists(credentials_filename)
-            and os.path.isfile(credentials_filename)
-        ):
-            raise exceptions.DefaultCredentialsError(
-                "GOOGLE_APPLICATION_CREDENTIALS path is either not found or invalid."
-            )
-
-        try:
-            with open(credentials_filename, "r") as f:
-                from google.oauth2 import service_account
-
-                info = json.load(f)
-                if info.get("type") == "service_account":
-                    credentials = service_account.IDTokenCredentials.from_service_account_info(
-                        info, target_audience=audience
-                    )
-                    credentials.refresh(request)
-                    return credentials.token
-        except ValueError as caught_exc:
-            new_exc = exceptions.DefaultCredentialsError(
-                "GOOGLE_APPLICATION_CREDENTIALS is not valid service account credentials.",
-                caught_exc,
-            )
-            six.raise_from(new_exc, caught_exc)
-
-    # 2. Try to fetch ID token from metada server if it exists. The code
-    # works for GAE and Cloud Run metadata server as well.
-    try:
-        from google.auth import compute_engine
-        from google.auth.compute_engine import _metadata
-
-        if _metadata.ping(request):
-            credentials = compute_engine.IDTokenCredentials(
-                request, audience, use_metadata_identity_endpoint=True
-            )
-            credentials.refresh(request)
-            return credentials.token
-    except (ImportError, exceptions.TransportError):
-        pass
-
-    raise exceptions.DefaultCredentialsError(
-        "Neither metadata server or valid service account credentials are found."
-    )
+    id_token_credentials = fetch_id_token_credentials(audience, request=request)
+    id_token_credentials.refresh(request)
+    return id_token_credentials.token
diff --git a/tests/oauth2/test_id_token.py b/tests/oauth2/test_id_token.py
index a612c58..ccfaaaf 100644
--- a/tests/oauth2/test_id_token.py
+++ b/tests/oauth2/test_id_token.py
@@ -21,13 +21,13 @@
 from google.auth import environment_vars
 from google.auth import exceptions
 from google.auth import transport
-import google.auth.compute_engine._metadata
 from google.oauth2 import id_token
 from google.oauth2 import service_account
 
 SERVICE_ACCOUNT_FILE = os.path.join(
     os.path.dirname(__file__), "../data/service_account.json"
 )
+ID_TOKEN_AUDIENCE = "https://pubsub.googleapis.com"
 
 
 def make_request(status, data=None):
@@ -201,37 +201,45 @@
     )
 
 
-def test_fetch_id_token_from_metadata_server(monkeypatch):
+def test_fetch_id_token_credentials_optional_request(monkeypatch):
     monkeypatch.delenv(environment_vars.CREDENTIALS, raising=False)
 
-    def mock_init(self, request, audience, use_metadata_identity_endpoint):
-        assert use_metadata_identity_endpoint
-        self.token = "id_token"
+    # Test a request object is created if not provided
+    with mock.patch("google.auth.compute_engine._metadata.ping", return_value=True):
+        with mock.patch(
+            "google.auth.compute_engine.IDTokenCredentials.__init__", return_value=None
+        ):
+            with mock.patch(
+                "google.auth.transport.requests.Request.__init__", return_value=None
+            ) as mock_request:
+                id_token.fetch_id_token_credentials(ID_TOKEN_AUDIENCE)
+            mock_request.assert_called()
+
+
+def test_fetch_id_token_credentials_from_metadata_server(monkeypatch):
+    monkeypatch.delenv(environment_vars.CREDENTIALS, raising=False)
+
+    mock_req = mock.Mock()
 
     with mock.patch("google.auth.compute_engine._metadata.ping", return_value=True):
-        with mock.patch.multiple(
-            google.auth.compute_engine.IDTokenCredentials,
-            __init__=mock_init,
-            refresh=mock.Mock(),
-        ):
-            request = mock.Mock()
-            token = id_token.fetch_id_token(request, "https://pubsub.googleapis.com")
-            assert token == "id_token"
+        with mock.patch(
+            "google.auth.compute_engine.IDTokenCredentials.__init__", return_value=None
+        ) as mock_init:
+            id_token.fetch_id_token_credentials(ID_TOKEN_AUDIENCE, request=mock_req)
+        mock_init.assert_called_once_with(
+            mock_req, ID_TOKEN_AUDIENCE, use_metadata_identity_endpoint=True
+        )
 
 
-def test_fetch_id_token_from_explicit_cred_json_file(monkeypatch):
+def test_fetch_id_token_credentials_from_explicit_cred_json_file(monkeypatch):
     monkeypatch.setenv(environment_vars.CREDENTIALS, SERVICE_ACCOUNT_FILE)
 
-    def mock_refresh(self, request):
-        self.token = "id_token"
-
-    with mock.patch.object(service_account.IDTokenCredentials, "refresh", mock_refresh):
-        request = mock.Mock()
-        token = id_token.fetch_id_token(request, "https://pubsub.googleapis.com")
-        assert token == "id_token"
+    cred = id_token.fetch_id_token_credentials(ID_TOKEN_AUDIENCE)
+    assert isinstance(cred, service_account.IDTokenCredentials)
+    assert cred._target_audience == ID_TOKEN_AUDIENCE
 
 
-def test_fetch_id_token_no_cred_exists(monkeypatch):
+def test_fetch_id_token_credentials_no_cred_exists(monkeypatch):
     monkeypatch.delenv(environment_vars.CREDENTIALS, raising=False)
 
     with mock.patch(
@@ -239,22 +247,20 @@
         side_effect=exceptions.TransportError(),
     ):
         with pytest.raises(exceptions.DefaultCredentialsError) as excinfo:
-            request = mock.Mock()
-            id_token.fetch_id_token(request, "https://pubsub.googleapis.com")
+            id_token.fetch_id_token_credentials(ID_TOKEN_AUDIENCE)
         assert excinfo.match(
             r"Neither metadata server or valid service account credentials are found."
         )
 
     with mock.patch("google.auth.compute_engine._metadata.ping", return_value=False):
         with pytest.raises(exceptions.DefaultCredentialsError) as excinfo:
-            request = mock.Mock()
-            id_token.fetch_id_token(request, "https://pubsub.googleapis.com")
+            id_token.fetch_id_token_credentials(ID_TOKEN_AUDIENCE)
         assert excinfo.match(
             r"Neither metadata server or valid service account credentials are found."
         )
 
 
-def test_fetch_id_token_invalid_cred_file_type(monkeypatch):
+def test_fetch_id_token_credentials_invalid_cred_file_type(monkeypatch):
     user_credentials_file = os.path.join(
         os.path.dirname(__file__), "../data/authorized_user.json"
     )
@@ -262,32 +268,44 @@
 
     with mock.patch("google.auth.compute_engine._metadata.ping", return_value=False):
         with pytest.raises(exceptions.DefaultCredentialsError) as excinfo:
-            request = mock.Mock()
-            id_token.fetch_id_token(request, "https://pubsub.googleapis.com")
+            id_token.fetch_id_token_credentials(ID_TOKEN_AUDIENCE)
         assert excinfo.match(
             r"Neither metadata server or valid service account credentials are found."
         )
 
 
-def test_fetch_id_token_invalid_json(monkeypatch):
+def test_fetch_id_token_credentials_invalid_json(monkeypatch):
     not_json_file = os.path.join(os.path.dirname(__file__), "../data/public_cert.pem")
     monkeypatch.setenv(environment_vars.CREDENTIALS, not_json_file)
 
     with pytest.raises(exceptions.DefaultCredentialsError) as excinfo:
-        request = mock.Mock()
-        id_token.fetch_id_token(request, "https://pubsub.googleapis.com")
+        id_token.fetch_id_token_credentials(ID_TOKEN_AUDIENCE)
     assert excinfo.match(
         r"GOOGLE_APPLICATION_CREDENTIALS is not valid service account credentials."
     )
 
 
-def test_fetch_id_token_invalid_cred_path(monkeypatch):
+def test_fetch_id_token_credentials_invalid_cred_path(monkeypatch):
     not_json_file = os.path.join(os.path.dirname(__file__), "../data/not_exists.json")
     monkeypatch.setenv(environment_vars.CREDENTIALS, not_json_file)
 
     with pytest.raises(exceptions.DefaultCredentialsError) as excinfo:
-        request = mock.Mock()
-        id_token.fetch_id_token(request, "https://pubsub.googleapis.com")
+        id_token.fetch_id_token_credentials(ID_TOKEN_AUDIENCE)
     assert excinfo.match(
         r"GOOGLE_APPLICATION_CREDENTIALS path is either not found or invalid."
     )
+
+
+def test_fetch_id_token(monkeypatch):
+    mock_cred = mock.MagicMock()
+    mock_cred.token = "token"
+
+    mock_req = mock.Mock()
+
+    with mock.patch(
+        "google.oauth2.id_token.fetch_id_token_credentials", return_value=mock_cred
+    ) as mock_fetch:
+        token = id_token.fetch_id_token(mock_req, ID_TOKEN_AUDIENCE)
+    mock_fetch.assert_called_once_with(ID_TOKEN_AUDIENCE, request=mock_req)
+    mock_cred.refresh.assert_called_once_with(mock_req)
+    assert token == "token"