feat: add support for workforce pool credentials (#868)

Workforce pools (external account credentials for non-Google users) are
organization-level resources, which means that issued workforce pool tokens
will not have any client project ID on token exchange as currently designed.

"To use a Google API, the client must identify the application to the server.
If the API requires authentication, the client must also identify the principal
running the application."

The application here is the client project. The token will identify the user
principal but not the application. This will result in APIs rejecting requests
authenticated with these tokens.

Note that passing an `x-goog-user-project` override header on the API request
is still not sufficient. The token is still expected to have a client project.

As a result, we have extended the spec to support an additional
`workforce_pool_user_project` field for these credentials (workforce pools),
which will be passed when exchanging an external token for a Google access
token. After the exchange, the issued access token will use the supplied
project as the client project. The underlying principal must still have the
`serviceusage.services.use` IAM permission to use the project for
billing/quota.

This field is not needed for flows with basic client authentication (e.g. when
a client ID is supplied). The client ID is sufficient to determine the client
project, and any additionally supplied `workforce_pool_user_project` value will
be ignored.

Note that this feature is not yet publicly usable.

The additional field has been added to the abstract external account credentials
`google.auth.external_account.Credentials` and the subclass
`google.auth.identity_pool.Credentials`.
diff --git a/tests/test_external_account.py b/tests/test_external_account.py
index df6174f..97f1564 100644
--- a/tests/test_external_account.py
+++ b/tests/test_external_account.py
@@ -37,6 +37,33 @@
     "//iam.googleapis.com/locations/eu/workforcePools/pool-id/providers/provider-id",
     "//iam.googleapis.com/locations/eu/workforcePools/workloadIdentityPools/providers/provider-id",
 ]
+# Workload identity pool audiences or invalid workforce pool audiences.
+TEST_NON_USER_AUDIENCES = [
+    # Legacy K8s audience format.
+    "identitynamespace:1f12345:my_provider",
+    (
+        "//iam.googleapis.com/projects/123456/locations/"
+        "global/workloadIdentityPools/pool-id/providers/"
+        "provider-id"
+    ),
+    (
+        "//iam.googleapis.com/projects/123456/locations/"
+        "eu/workloadIdentityPools/pool-id/providers/"
+        "provider-id"
+    ),
+    # Pool ID with workforcePools string.
+    (
+        "//iam.googleapis.com/projects/123456/locations/"
+        "global/workloadIdentityPools/workforcePools/providers/"
+        "provider-id"
+    ),
+    # Unrealistic / incorrect workforce pool audiences.
+    "//iamgoogleapis.com/locations/eu/workforcePools/pool-id/providers/provider-id",
+    "//iam.googleapiscom/locations/eu/workforcePools/pool-id/providers/provider-id",
+    "//iam.googleapis.com/locations/workforcePools/pool-id/providers/provider-id",
+    "//iam.googleapis.com/locations/eu/workforcePool/pool-id/providers/provider-id",
+    "//iam.googleapis.com/locations//workforcePool/pool-id/providers/provider-id",
+]
 
 
 class CredentialsImpl(external_account.Credentials):
@@ -52,6 +79,7 @@
         quota_project_id=None,
         scopes=None,
         default_scopes=None,
+        workforce_pool_user_project=None,
     ):
         super(CredentialsImpl, self).__init__(
             audience=audience,
@@ -64,6 +92,7 @@
             quota_project_id=quota_project_id,
             scopes=scopes,
             default_scopes=default_scopes,
+            workforce_pool_user_project=workforce_pool_user_project,
         )
         self._counter = 0
 
@@ -83,7 +112,12 @@
         "/locations/global/workloadIdentityPools/{}"
         "/providers/{}"
     ).format(PROJECT_NUMBER, POOL_ID, PROVIDER_ID)
+    WORKFORCE_AUDIENCE = (
+        "//iam.googleapis.com/locations/global/workforcePools/{}/providers/{}"
+    ).format(POOL_ID, PROVIDER_ID)
+    WORKFORCE_POOL_USER_PROJECT = "WORKFORCE_POOL_USER_PROJECT_NUMBER"
     SUBJECT_TOKEN_TYPE = "urn:ietf:params:oauth:token-type:jwt"
+    WORKFORCE_SUBJECT_TOKEN_TYPE = "urn:ietf:params:oauth:token-type:id_token"
     CREDENTIAL_SOURCE = {"file": "/var/run/secrets/goog.id/token"}
     SUCCESS_RESPONSE = {
         "access_token": "ACCESS_TOKEN",
@@ -147,6 +181,31 @@
         )
 
     @classmethod
+    def make_workforce_pool_credentials(
+        cls,
+        client_id=None,
+        client_secret=None,
+        quota_project_id=None,
+        scopes=None,
+        default_scopes=None,
+        service_account_impersonation_url=None,
+        workforce_pool_user_project=None,
+    ):
+        return CredentialsImpl(
+            audience=cls.WORKFORCE_AUDIENCE,
+            subject_token_type=cls.WORKFORCE_SUBJECT_TOKEN_TYPE,
+            token_url=cls.TOKEN_URL,
+            service_account_impersonation_url=service_account_impersonation_url,
+            credential_source=cls.CREDENTIAL_SOURCE,
+            client_id=client_id,
+            client_secret=client_secret,
+            quota_project_id=quota_project_id,
+            scopes=scopes,
+            default_scopes=default_scopes,
+            workforce_pool_user_project=workforce_pool_user_project,
+        )
+
+    @classmethod
     def make_mock_request(
         cls,
         status=http.client.OK,
@@ -230,6 +289,21 @@
         assert credentials.requires_scopes
         assert not credentials.quota_project_id
 
+    def test_nonworkforce_with_workforce_pool_user_project(self):
+        with pytest.raises(ValueError) as excinfo:
+            CredentialsImpl(
+                audience=self.AUDIENCE,
+                subject_token_type=self.SUBJECT_TOKEN_TYPE,
+                token_url=self.TOKEN_URL,
+                credential_source=self.CREDENTIAL_SOURCE,
+                workforce_pool_user_project=self.WORKFORCE_POOL_USER_PROJECT,
+            )
+
+        assert excinfo.match(
+            "workforce_pool_user_project should not be set for non-workforce "
+            "pool credentials"
+        )
+
     def test_with_scopes(self):
         credentials = self.make_credentials()
 
@@ -241,6 +315,23 @@
         assert scoped_credentials.has_scopes(["email"])
         assert not scoped_credentials.requires_scopes
 
+    def test_with_scopes_workforce_pool(self):
+        credentials = self.make_workforce_pool_credentials(
+            workforce_pool_user_project=self.WORKFORCE_POOL_USER_PROJECT
+        )
+
+        assert not credentials.scopes
+        assert credentials.requires_scopes
+
+        scoped_credentials = credentials.with_scopes(["email"])
+
+        assert scoped_credentials.has_scopes(["email"])
+        assert not scoped_credentials.requires_scopes
+        assert (
+            scoped_credentials.info.get("workforce_pool_user_project")
+            == self.WORKFORCE_POOL_USER_PROJECT
+        )
+
     def test_with_scopes_using_user_and_default_scopes(self):
         credentials = self.make_credentials()
 
@@ -296,6 +387,7 @@
             quota_project_id=self.QUOTA_PROJECT_ID,
             scopes=["email"],
             default_scopes=["default2"],
+            workforce_pool_user_project=None,
         )
 
     def test_with_quota_project(self):
@@ -308,6 +400,22 @@
 
         assert quota_project_creds.quota_project_id == "project-foo"
 
+    def test_with_quota_project_workforce_pool(self):
+        credentials = self.make_workforce_pool_credentials(
+            workforce_pool_user_project=self.WORKFORCE_POOL_USER_PROJECT
+        )
+
+        assert not credentials.scopes
+        assert not credentials.quota_project_id
+
+        quota_project_creds = credentials.with_quota_project("project-foo")
+
+        assert quota_project_creds.quota_project_id == "project-foo"
+        assert (
+            quota_project_creds.info.get("workforce_pool_user_project")
+            == self.WORKFORCE_POOL_USER_PROJECT
+        )
+
     def test_with_quota_project_full_options_propagated(self):
         credentials = self.make_credentials(
             client_id=CLIENT_ID,
@@ -336,6 +444,7 @@
             quota_project_id="project-foo",
             scopes=self.SCOPES,
             default_scopes=["default1"],
+            workforce_pool_user_project=None,
         )
 
     def test_with_invalid_impersonation_target_principal(self):
@@ -359,6 +468,20 @@
             "credential_source": self.CREDENTIAL_SOURCE.copy(),
         }
 
+    def test_info_workforce_pool(self):
+        credentials = self.make_workforce_pool_credentials(
+            workforce_pool_user_project=self.WORKFORCE_POOL_USER_PROJECT
+        )
+
+        assert credentials.info == {
+            "type": "external_account",
+            "audience": self.WORKFORCE_AUDIENCE,
+            "subject_token_type": self.WORKFORCE_SUBJECT_TOKEN_TYPE,
+            "token_url": self.TOKEN_URL,
+            "credential_source": self.CREDENTIAL_SOURCE.copy(),
+            "workforce_pool_user_project": self.WORKFORCE_POOL_USER_PROJECT,
+        }
+
     def test_info_with_full_options(self):
         credentials = self.make_credentials(
             client_id=CLIENT_ID,
@@ -391,36 +514,7 @@
 
         assert credentials.service_account_email == SERVICE_ACCOUNT_EMAIL
 
-    @pytest.mark.parametrize(
-        "audience",
-        # Workload identity pool audiences or invalid workforce pool audiences.
-        [
-            # Legacy K8s audience format.
-            "identitynamespace:1f12345:my_provider",
-            (
-                "//iam.googleapis.com/projects/123456/locations/"
-                "global/workloadIdentityPools/pool-id/providers/"
-                "provider-id"
-            ),
-            (
-                "//iam.googleapis.com/projects/123456/locations/"
-                "eu/workloadIdentityPools/pool-id/providers/"
-                "provider-id"
-            ),
-            # Pool ID with workforcePools string.
-            (
-                "//iam.googleapis.com/projects/123456/locations/"
-                "global/workloadIdentityPools/workforcePools/providers/"
-                "provider-id"
-            ),
-            # Unrealistic / incorrect workforce pool audiences.
-            "//iamgoogleapis.com/locations/eu/workforcePools/pool-id/providers/provider-id",
-            "//iam.googleapiscom/locations/eu/workforcePools/pool-id/providers/provider-id",
-            "//iam.googleapis.com/locations/workforcePools/pool-id/providers/provider-id",
-            "//iam.googleapis.com/locations/eu/workforcePool/pool-id/providers/provider-id",
-            "//iam.googleapis.com/locations//workforcePool/pool-id/providers/provider-id",
-        ],
-    )
+    @pytest.mark.parametrize("audience", TEST_NON_USER_AUDIENCES)
     def test_is_user_with_non_users(self, audience):
         credentials = CredentialsImpl(
             audience=audience,
@@ -458,6 +552,43 @@
         # not a user.
         assert credentials.is_user is False
 
+    @pytest.mark.parametrize("audience", TEST_NON_USER_AUDIENCES)
+    def test_is_workforce_pool_with_non_users(self, audience):
+        credentials = CredentialsImpl(
+            audience=audience,
+            subject_token_type=self.SUBJECT_TOKEN_TYPE,
+            token_url=self.TOKEN_URL,
+            credential_source=self.CREDENTIAL_SOURCE,
+        )
+
+        assert credentials.is_workforce_pool is False
+
+    @pytest.mark.parametrize("audience", TEST_USER_AUDIENCES)
+    def test_is_workforce_pool_with_users(self, audience):
+        credentials = CredentialsImpl(
+            audience=audience,
+            subject_token_type=self.SUBJECT_TOKEN_TYPE,
+            token_url=self.TOKEN_URL,
+            credential_source=self.CREDENTIAL_SOURCE,
+        )
+
+        assert credentials.is_workforce_pool is True
+
+    @pytest.mark.parametrize("audience", TEST_USER_AUDIENCES)
+    def test_is_workforce_pool_with_users_and_impersonation(self, audience):
+        # Initialize the credentials with workforce audience and service account
+        # impersonation.
+        credentials = CredentialsImpl(
+            audience=audience,
+            subject_token_type=self.SUBJECT_TOKEN_TYPE,
+            token_url=self.TOKEN_URL,
+            credential_source=self.CREDENTIAL_SOURCE,
+            service_account_impersonation_url=self.SERVICE_ACCOUNT_IMPERSONATION_URL,
+        )
+
+        # Even though impersonation is used, is_workforce_pool should still return True.
+        assert credentials.is_workforce_pool is True
+
     @mock.patch("google.auth._helpers.utcnow", return_value=datetime.datetime.min)
     def test_refresh_without_client_auth_success(self, unused_utcnow):
         response = self.SUCCESS_RESPONSE.copy()
@@ -485,6 +616,110 @@
         assert not credentials.expired
         assert credentials.token == response["access_token"]
 
+    @mock.patch("google.auth._helpers.utcnow", return_value=datetime.datetime.min)
+    def test_refresh_workforce_without_client_auth_success(self, unused_utcnow):
+        response = self.SUCCESS_RESPONSE.copy()
+        # Test custom expiration to confirm expiry is set correctly.
+        response["expires_in"] = 2800
+        expected_expiry = datetime.datetime.min + datetime.timedelta(
+            seconds=response["expires_in"]
+        )
+        headers = {"Content-Type": "application/x-www-form-urlencoded"}
+        request_data = {
+            "grant_type": "urn:ietf:params:oauth:grant-type:token-exchange",
+            "audience": self.WORKFORCE_AUDIENCE,
+            "requested_token_type": "urn:ietf:params:oauth:token-type:access_token",
+            "subject_token": "subject_token_0",
+            "subject_token_type": self.WORKFORCE_SUBJECT_TOKEN_TYPE,
+            "options": urllib.parse.quote(
+                json.dumps({"userProject": self.WORKFORCE_POOL_USER_PROJECT})
+            ),
+        }
+        request = self.make_mock_request(status=http.client.OK, data=response)
+        credentials = self.make_workforce_pool_credentials(
+            workforce_pool_user_project=self.WORKFORCE_POOL_USER_PROJECT
+        )
+
+        credentials.refresh(request)
+
+        self.assert_token_request_kwargs(request.call_args[1], headers, request_data)
+        assert credentials.valid
+        assert credentials.expiry == expected_expiry
+        assert not credentials.expired
+        assert credentials.token == response["access_token"]
+
+    @mock.patch("google.auth._helpers.utcnow", return_value=datetime.datetime.min)
+    def test_refresh_workforce_with_client_auth_success(self, unused_utcnow):
+        response = self.SUCCESS_RESPONSE.copy()
+        # Test custom expiration to confirm expiry is set correctly.
+        response["expires_in"] = 2800
+        expected_expiry = datetime.datetime.min + datetime.timedelta(
+            seconds=response["expires_in"]
+        )
+        headers = {
+            "Content-Type": "application/x-www-form-urlencoded",
+            "Authorization": "Basic {}".format(BASIC_AUTH_ENCODING),
+        }
+        request_data = {
+            "grant_type": "urn:ietf:params:oauth:grant-type:token-exchange",
+            "audience": self.WORKFORCE_AUDIENCE,
+            "requested_token_type": "urn:ietf:params:oauth:token-type:access_token",
+            "subject_token": "subject_token_0",
+            "subject_token_type": self.WORKFORCE_SUBJECT_TOKEN_TYPE,
+        }
+        request = self.make_mock_request(status=http.client.OK, data=response)
+        # Client Auth will have higher priority over workforce_pool_user_project.
+        credentials = self.make_workforce_pool_credentials(
+            client_id=CLIENT_ID,
+            client_secret=CLIENT_SECRET,
+            workforce_pool_user_project=self.WORKFORCE_POOL_USER_PROJECT,
+        )
+
+        credentials.refresh(request)
+
+        self.assert_token_request_kwargs(request.call_args[1], headers, request_data)
+        assert credentials.valid
+        assert credentials.expiry == expected_expiry
+        assert not credentials.expired
+        assert credentials.token == response["access_token"]
+
+    @mock.patch("google.auth._helpers.utcnow", return_value=datetime.datetime.min)
+    def test_refresh_workforce_with_client_auth_and_no_workforce_project_success(
+        self, unused_utcnow
+    ):
+        response = self.SUCCESS_RESPONSE.copy()
+        # Test custom expiration to confirm expiry is set correctly.
+        response["expires_in"] = 2800
+        expected_expiry = datetime.datetime.min + datetime.timedelta(
+            seconds=response["expires_in"]
+        )
+        headers = {
+            "Content-Type": "application/x-www-form-urlencoded",
+            "Authorization": "Basic {}".format(BASIC_AUTH_ENCODING),
+        }
+        request_data = {
+            "grant_type": "urn:ietf:params:oauth:grant-type:token-exchange",
+            "audience": self.WORKFORCE_AUDIENCE,
+            "requested_token_type": "urn:ietf:params:oauth:token-type:access_token",
+            "subject_token": "subject_token_0",
+            "subject_token_type": self.WORKFORCE_SUBJECT_TOKEN_TYPE,
+        }
+        request = self.make_mock_request(status=http.client.OK, data=response)
+        # Client Auth will be sufficient for user project determination.
+        credentials = self.make_workforce_pool_credentials(
+            client_id=CLIENT_ID,
+            client_secret=CLIENT_SECRET,
+            workforce_pool_user_project=None,
+        )
+
+        credentials.refresh(request)
+
+        self.assert_token_request_kwargs(request.call_args[1], headers, request_data)
+        assert credentials.valid
+        assert credentials.expiry == expected_expiry
+        assert not credentials.expired
+        assert credentials.token == response["access_token"]
+
     def test_refresh_impersonation_without_client_auth_success(self):
         # Simulate service account access token expires in 2800 seconds.
         expire_time = (
@@ -549,6 +784,74 @@
         assert not credentials.expired
         assert credentials.token == impersonation_response["accessToken"]
 
+    def test_refresh_workforce_impersonation_without_client_auth_success(self):
+        # Simulate service account access token expires in 2800 seconds.
+        expire_time = (
+            _helpers.utcnow().replace(microsecond=0) + datetime.timedelta(seconds=2800)
+        ).isoformat("T") + "Z"
+        expected_expiry = datetime.datetime.strptime(expire_time, "%Y-%m-%dT%H:%M:%SZ")
+        # STS token exchange request/response.
+        token_response = self.SUCCESS_RESPONSE.copy()
+        token_headers = {"Content-Type": "application/x-www-form-urlencoded"}
+        token_request_data = {
+            "grant_type": "urn:ietf:params:oauth:grant-type:token-exchange",
+            "audience": self.WORKFORCE_AUDIENCE,
+            "requested_token_type": "urn:ietf:params:oauth:token-type:access_token",
+            "subject_token": "subject_token_0",
+            "subject_token_type": self.WORKFORCE_SUBJECT_TOKEN_TYPE,
+            "scope": "https://www.googleapis.com/auth/iam",
+            "options": urllib.parse.quote(
+                json.dumps({"userProject": self.WORKFORCE_POOL_USER_PROJECT})
+            ),
+        }
+        # Service account impersonation request/response.
+        impersonation_response = {
+            "accessToken": "SA_ACCESS_TOKEN",
+            "expireTime": expire_time,
+        }
+        impersonation_headers = {
+            "Content-Type": "application/json",
+            "authorization": "Bearer {}".format(token_response["access_token"]),
+        }
+        impersonation_request_data = {
+            "delegates": None,
+            "scope": self.SCOPES,
+            "lifetime": "3600s",
+        }
+        # Initialize mock request to handle token exchange and service account
+        # impersonation request.
+        request = self.make_mock_request(
+            status=http.client.OK,
+            data=token_response,
+            impersonation_status=http.client.OK,
+            impersonation_data=impersonation_response,
+        )
+        # Initialize credentials with service account impersonation.
+        credentials = self.make_workforce_pool_credentials(
+            service_account_impersonation_url=self.SERVICE_ACCOUNT_IMPERSONATION_URL,
+            scopes=self.SCOPES,
+            workforce_pool_user_project=self.WORKFORCE_POOL_USER_PROJECT,
+        )
+
+        credentials.refresh(request)
+
+        # Only 2 requests should be processed.
+        assert len(request.call_args_list) == 2
+        # Verify token exchange request parameters.
+        self.assert_token_request_kwargs(
+            request.call_args_list[0][1], token_headers, token_request_data
+        )
+        # Verify service account impersonation request parameters.
+        self.assert_impersonation_request_kwargs(
+            request.call_args_list[1][1],
+            impersonation_headers,
+            impersonation_request_data,
+        )
+        assert credentials.valid
+        assert credentials.expiry == expected_expiry
+        assert not credentials.expired
+        assert credentials.token == impersonation_response["accessToken"]
+
     def test_refresh_without_client_auth_success_explicit_user_scopes_ignore_default_scopes(
         self,
     ):
@@ -822,6 +1125,22 @@
             "authorization": "Bearer {}".format(self.SUCCESS_RESPONSE["access_token"])
         }
 
+    def test_apply_workforce_without_quota_project_id(self):
+        headers = {}
+        request = self.make_mock_request(
+            status=http.client.OK, data=self.SUCCESS_RESPONSE
+        )
+        credentials = self.make_workforce_pool_credentials(
+            workforce_pool_user_project=self.WORKFORCE_POOL_USER_PROJECT
+        )
+
+        credentials.refresh(request)
+        credentials.apply(headers)
+
+        assert headers == {
+            "authorization": "Bearer {}".format(self.SUCCESS_RESPONSE["access_token"])
+        }
+
     def test_apply_impersonation_without_quota_project_id(self):
         expire_time = (
             _helpers.utcnow().replace(microsecond=0) + datetime.timedelta(seconds=3600)
@@ -926,6 +1245,31 @@
             "authorization": "Bearer {}".format(self.SUCCESS_RESPONSE["access_token"]),
         }
 
+    def test_before_request_workforce(self):
+        headers = {"other": "header-value"}
+        request = self.make_mock_request(
+            status=http.client.OK, data=self.SUCCESS_RESPONSE
+        )
+        credentials = self.make_workforce_pool_credentials(
+            workforce_pool_user_project=self.WORKFORCE_POOL_USER_PROJECT
+        )
+
+        # First call should call refresh, setting the token.
+        credentials.before_request(request, "POST", "https://example.com/api", headers)
+
+        assert headers == {
+            "other": "header-value",
+            "authorization": "Bearer {}".format(self.SUCCESS_RESPONSE["access_token"]),
+        }
+
+        # Second call shouldn't call refresh.
+        credentials.before_request(request, "POST", "https://example.com/api", headers)
+
+        assert headers == {
+            "other": "header-value",
+            "authorization": "Bearer {}".format(self.SUCCESS_RESPONSE["access_token"]),
+        }
+
     def test_before_request_impersonation(self):
         expire_time = (
             _helpers.utcnow().replace(microsecond=0) + datetime.timedelta(seconds=3600)
@@ -1091,6 +1435,17 @@
 
         assert credentials.project_number == self.PROJECT_NUMBER
 
+    def test_project_number_workforce(self):
+        credentials = CredentialsImpl(
+            audience=self.WORKFORCE_AUDIENCE,
+            subject_token_type=self.WORKFORCE_SUBJECT_TOKEN_TYPE,
+            token_url=self.TOKEN_URL,
+            credential_source=self.CREDENTIAL_SOURCE,
+            workforce_pool_user_project=self.WORKFORCE_POOL_USER_PROJECT,
+        )
+
+        assert credentials.project_number is None
+
     def test_project_id_without_scopes(self):
         # Initialize credentials with no scopes.
         credentials = CredentialsImpl(
@@ -1190,6 +1545,68 @@
         # No additional requests.
         assert len(request.call_args_list) == 3
 
+    def test_workforce_pool_get_project_id_cloud_resource_manager_success(self):
+        # STS token exchange request/response.
+        token_headers = {"Content-Type": "application/x-www-form-urlencoded"}
+        token_request_data = {
+            "grant_type": "urn:ietf:params:oauth:grant-type:token-exchange",
+            "audience": self.WORKFORCE_AUDIENCE,
+            "requested_token_type": "urn:ietf:params:oauth:token-type:access_token",
+            "subject_token": "subject_token_0",
+            "subject_token_type": self.WORKFORCE_SUBJECT_TOKEN_TYPE,
+            "scope": "scope1 scope2",
+            "options": urllib.parse.quote(
+                json.dumps({"userProject": self.WORKFORCE_POOL_USER_PROJECT})
+            ),
+        }
+        # Initialize mock request to handle token exchange and cloud resource
+        # manager request.
+        request = self.make_mock_request(
+            status=http.client.OK,
+            data=self.SUCCESS_RESPONSE.copy(),
+            cloud_resource_manager_status=http.client.OK,
+            cloud_resource_manager_data=self.CLOUD_RESOURCE_MANAGER_SUCCESS_RESPONSE,
+        )
+        credentials = self.make_workforce_pool_credentials(
+            scopes=self.SCOPES,
+            quota_project_id=self.QUOTA_PROJECT_ID,
+            workforce_pool_user_project=self.WORKFORCE_POOL_USER_PROJECT,
+        )
+
+        # Expected project ID from cloud resource manager response should be returned.
+        project_id = credentials.get_project_id(request)
+
+        assert project_id == self.PROJECT_ID
+        # 2 requests should be processed.
+        assert len(request.call_args_list) == 2
+        # Verify token exchange request parameters.
+        self.assert_token_request_kwargs(
+            request.call_args_list[0][1], token_headers, token_request_data
+        )
+        # In the process of getting project ID, an access token should be
+        # retrieved.
+        assert credentials.valid
+        assert not credentials.expired
+        assert credentials.token == self.SUCCESS_RESPONSE["access_token"]
+        # Verify cloud resource manager request parameters.
+        self.assert_resource_manager_request_kwargs(
+            request.call_args_list[1][1],
+            self.WORKFORCE_POOL_USER_PROJECT,
+            {
+                "x-goog-user-project": self.QUOTA_PROJECT_ID,
+                "authorization": "Bearer {}".format(
+                    self.SUCCESS_RESPONSE["access_token"]
+                ),
+            },
+        )
+
+        # Calling get_project_id again should return the cached project_id.
+        project_id = credentials.get_project_id(request)
+
+        assert project_id == self.PROJECT_ID
+        # No additional requests.
+        assert len(request.call_args_list) == 2
+
     def test_get_project_id_cloud_resource_manager_error(self):
         # Simulate resource doesn't have sufficient permissions to access
         # cloud resource manager.