feat: add support for workforce pool credentials (#868)
Workforce pools (external account credentials for non-Google users) are
organization-level resources, which means that issued workforce pool tokens
will not have any client project ID on token exchange as currently designed.
"To use a Google API, the client must identify the application to the server.
If the API requires authentication, the client must also identify the principal
running the application."
The application here is the client project. The token will identify the user
principal but not the application. This will result in APIs rejecting requests
authenticated with these tokens.
Note that passing an `x-goog-user-project` override header on the API request is
still not sufficient. The token is still expected to have a client project.
As a result, we have extended the spec to support an additional
`workforce_pool_user_project` field for these credentials (workforce pools), which
will be passed when exchanging an external token for a Google access token. After
the exchange, the issued access token will use the supplied project as the client
project. The underlying principal must still have the `serviceusage.services.use`
IAM permission to use the project for billing/quota.
This field is not needed for flows with basic client authentication (e.g. when a
client ID is supplied). The client ID is sufficient to determine the client
project, and any additionally supplied `workforce_pool_user_project` value will be
ignored.
Note that this feature is not yet publicly usable.
The additional field has been added to the abstract external account credentials
`google.auth.external_account.Credentials` and the subclass
`google.auth.identity_pool.Credentials`.
diff --git a/tests/test_identity_pool.py b/tests/test_identity_pool.py
index efe11b0..e90e288 100644
--- a/tests/test_identity_pool.py
+++ b/tests/test_identity_pool.py
@@ -53,6 +53,11 @@
TOKEN_URL = "https://sts.googleapis.com/v1/token"
SUBJECT_TOKEN_TYPE = "urn:ietf:params:oauth:token-type:jwt"
AUDIENCE = "//iam.googleapis.com/projects/123456/locations/global/workloadIdentityPools/POOL_ID/providers/PROVIDER_ID"
+WORKFORCE_AUDIENCE = (
+ "//iam.googleapis.com/locations/global/workforcePools/POOL_ID/providers/PROVIDER_ID"
+)
+WORKFORCE_SUBJECT_TOKEN_TYPE = "urn:ietf:params:oauth:token-type:id_token"
+WORKFORCE_POOL_USER_PROJECT = "WORKFORCE_POOL_USER_PROJECT_NUMBER"
class TestCredentials(object):
@@ -158,6 +163,7 @@
credential_data=None,
scopes=None,
default_scopes=None,
+ workforce_pool_user_project=None,
):
"""Utility to assert that a credentials are initialized with the expected
attributes by calling refresh functionality and confirming response matches
@@ -183,6 +189,10 @@
"subject_token": subject_token,
"subject_token_type": subject_token_type,
}
+ if workforce_pool_user_project:
+ token_request_data["options"] = urllib.parse.quote(
+ json.dumps({"userProject": workforce_pool_user_project})
+ )
if service_account_impersonation_url:
# Service account impersonation request/response.
@@ -250,6 +260,8 @@
@classmethod
def make_credentials(
cls,
+ audience=AUDIENCE,
+ subject_token_type=SUBJECT_TOKEN_TYPE,
client_id=None,
client_secret=None,
quota_project_id=None,
@@ -257,10 +269,11 @@
default_scopes=None,
service_account_impersonation_url=None,
credential_source=None,
+ workforce_pool_user_project=None,
):
return identity_pool.Credentials(
- audience=AUDIENCE,
- subject_token_type=SUBJECT_TOKEN_TYPE,
+ audience=audience,
+ subject_token_type=subject_token_type,
token_url=TOKEN_URL,
service_account_impersonation_url=service_account_impersonation_url,
credential_source=credential_source,
@@ -269,6 +282,7 @@
quota_project_id=quota_project_id,
scopes=scopes,
default_scopes=default_scopes,
+ workforce_pool_user_project=workforce_pool_user_project,
)
@mock.patch.object(identity_pool.Credentials, "__init__", return_value=None)
@@ -297,6 +311,7 @@
client_secret=CLIENT_SECRET,
credential_source=self.CREDENTIAL_SOURCE_TEXT,
quota_project_id=QUOTA_PROJECT_ID,
+ workforce_pool_user_project=None,
)
@mock.patch.object(identity_pool.Credentials, "__init__", return_value=None)
@@ -321,6 +336,33 @@
client_secret=None,
credential_source=self.CREDENTIAL_SOURCE_TEXT,
quota_project_id=None,
+ workforce_pool_user_project=None,
+ )
+
+ @mock.patch.object(identity_pool.Credentials, "__init__", return_value=None)
+ def test_from_info_workforce_pool(self, mock_init):
+ credentials = identity_pool.Credentials.from_info(
+ {
+ "audience": WORKFORCE_AUDIENCE,
+ "subject_token_type": WORKFORCE_SUBJECT_TOKEN_TYPE,
+ "token_url": TOKEN_URL,
+ "credential_source": self.CREDENTIAL_SOURCE_TEXT,
+ "workforce_pool_user_project": WORKFORCE_POOL_USER_PROJECT,
+ }
+ )
+
+ # Confirm identity_pool.Credentials instantiated with expected attributes.
+ assert isinstance(credentials, identity_pool.Credentials)
+ mock_init.assert_called_once_with(
+ audience=WORKFORCE_AUDIENCE,
+ subject_token_type=WORKFORCE_SUBJECT_TOKEN_TYPE,
+ token_url=TOKEN_URL,
+ service_account_impersonation_url=None,
+ client_id=None,
+ client_secret=None,
+ credential_source=self.CREDENTIAL_SOURCE_TEXT,
+ quota_project_id=None,
+ workforce_pool_user_project=WORKFORCE_POOL_USER_PROJECT,
)
@mock.patch.object(identity_pool.Credentials, "__init__", return_value=None)
@@ -350,6 +392,7 @@
client_secret=CLIENT_SECRET,
credential_source=self.CREDENTIAL_SOURCE_TEXT,
quota_project_id=QUOTA_PROJECT_ID,
+ workforce_pool_user_project=None,
)
@mock.patch.object(identity_pool.Credentials, "__init__", return_value=None)
@@ -375,6 +418,46 @@
client_secret=None,
credential_source=self.CREDENTIAL_SOURCE_TEXT,
quota_project_id=None,
+ workforce_pool_user_project=None,
+ )
+
+ @mock.patch.object(identity_pool.Credentials, "__init__", return_value=None)
+ def test_from_file_workforce_pool(self, mock_init, tmpdir):
+ info = {
+ "audience": WORKFORCE_AUDIENCE,
+ "subject_token_type": WORKFORCE_SUBJECT_TOKEN_TYPE,
+ "token_url": TOKEN_URL,
+ "credential_source": self.CREDENTIAL_SOURCE_TEXT,
+ "workforce_pool_user_project": WORKFORCE_POOL_USER_PROJECT,
+ }
+ config_file = tmpdir.join("config.json")
+ config_file.write(json.dumps(info))
+ credentials = identity_pool.Credentials.from_file(str(config_file))
+
+ # Confirm identity_pool.Credentials instantiated with expected attributes.
+ assert isinstance(credentials, identity_pool.Credentials)
+ mock_init.assert_called_once_with(
+ audience=WORKFORCE_AUDIENCE,
+ subject_token_type=WORKFORCE_SUBJECT_TOKEN_TYPE,
+ token_url=TOKEN_URL,
+ service_account_impersonation_url=None,
+ client_id=None,
+ client_secret=None,
+ credential_source=self.CREDENTIAL_SOURCE_TEXT,
+ quota_project_id=None,
+ workforce_pool_user_project=WORKFORCE_POOL_USER_PROJECT,
+ )
+
+ def test_constructor_nonworkforce_with_workforce_pool_user_project(self):
+ with pytest.raises(ValueError) as excinfo:
+ self.make_credentials(
+ audience=AUDIENCE,
+ workforce_pool_user_project=WORKFORCE_POOL_USER_PROJECT,
+ )
+
+ assert excinfo.match(
+ "workforce_pool_user_project should not be set for non-workforce "
+ "pool credentials"
)
def test_constructor_invalid_options(self):
@@ -430,6 +513,23 @@
r"Missing subject_token_field_name for JSON credential_source format"
)
+ def test_info_with_workforce_pool_user_project(self):
+ credentials = self.make_credentials(
+ audience=WORKFORCE_AUDIENCE,
+ subject_token_type=WORKFORCE_SUBJECT_TOKEN_TYPE,
+ credential_source=self.CREDENTIAL_SOURCE_TEXT_URL.copy(),
+ workforce_pool_user_project=WORKFORCE_POOL_USER_PROJECT,
+ )
+
+ assert credentials.info == {
+ "type": "external_account",
+ "audience": WORKFORCE_AUDIENCE,
+ "subject_token_type": WORKFORCE_SUBJECT_TOKEN_TYPE,
+ "token_url": TOKEN_URL,
+ "credential_source": self.CREDENTIAL_SOURCE_TEXT_URL,
+ "workforce_pool_user_project": WORKFORCE_POOL_USER_PROJECT,
+ }
+
def test_info_with_file_credential_source(self):
credentials = self.make_credentials(
credential_source=self.CREDENTIAL_SOURCE_TEXT_URL.copy()
@@ -557,6 +657,115 @@
default_scopes=["ignored"],
)
+ def test_refresh_workforce_success_with_client_auth_without_impersonation(self):
+ credentials = self.make_credentials(
+ audience=WORKFORCE_AUDIENCE,
+ subject_token_type=WORKFORCE_SUBJECT_TOKEN_TYPE,
+ client_id=CLIENT_ID,
+ client_secret=CLIENT_SECRET,
+ # Test with text format type.
+ credential_source=self.CREDENTIAL_SOURCE_TEXT,
+ scopes=SCOPES,
+ # This will be ignored in favor of client auth.
+ workforce_pool_user_project=WORKFORCE_POOL_USER_PROJECT,
+ )
+
+ self.assert_underlying_credentials_refresh(
+ credentials=credentials,
+ audience=WORKFORCE_AUDIENCE,
+ subject_token=TEXT_FILE_SUBJECT_TOKEN,
+ subject_token_type=WORKFORCE_SUBJECT_TOKEN_TYPE,
+ token_url=TOKEN_URL,
+ service_account_impersonation_url=None,
+ basic_auth_encoding=BASIC_AUTH_ENCODING,
+ quota_project_id=None,
+ used_scopes=SCOPES,
+ scopes=SCOPES,
+ workforce_pool_user_project=None,
+ )
+
+ def test_refresh_workforce_success_with_client_auth_and_no_workforce_project(self):
+ credentials = self.make_credentials(
+ audience=WORKFORCE_AUDIENCE,
+ subject_token_type=WORKFORCE_SUBJECT_TOKEN_TYPE,
+ client_id=CLIENT_ID,
+ client_secret=CLIENT_SECRET,
+ # Test with text format type.
+ credential_source=self.CREDENTIAL_SOURCE_TEXT,
+ scopes=SCOPES,
+ # This is not needed when client Auth is used.
+ workforce_pool_user_project=None,
+ )
+
+ self.assert_underlying_credentials_refresh(
+ credentials=credentials,
+ audience=WORKFORCE_AUDIENCE,
+ subject_token=TEXT_FILE_SUBJECT_TOKEN,
+ subject_token_type=WORKFORCE_SUBJECT_TOKEN_TYPE,
+ token_url=TOKEN_URL,
+ service_account_impersonation_url=None,
+ basic_auth_encoding=BASIC_AUTH_ENCODING,
+ quota_project_id=None,
+ used_scopes=SCOPES,
+ scopes=SCOPES,
+ workforce_pool_user_project=None,
+ )
+
+ def test_refresh_workforce_success_without_client_auth_without_impersonation(self):
+ credentials = self.make_credentials(
+ audience=WORKFORCE_AUDIENCE,
+ subject_token_type=WORKFORCE_SUBJECT_TOKEN_TYPE,
+ client_id=None,
+ client_secret=None,
+ # Test with text format type.
+ credential_source=self.CREDENTIAL_SOURCE_TEXT,
+ scopes=SCOPES,
+ # This will not be ignored as client auth is not used.
+ workforce_pool_user_project=WORKFORCE_POOL_USER_PROJECT,
+ )
+
+ self.assert_underlying_credentials_refresh(
+ credentials=credentials,
+ audience=WORKFORCE_AUDIENCE,
+ subject_token=TEXT_FILE_SUBJECT_TOKEN,
+ subject_token_type=WORKFORCE_SUBJECT_TOKEN_TYPE,
+ token_url=TOKEN_URL,
+ service_account_impersonation_url=None,
+ basic_auth_encoding=None,
+ quota_project_id=None,
+ used_scopes=SCOPES,
+ scopes=SCOPES,
+ workforce_pool_user_project=WORKFORCE_POOL_USER_PROJECT,
+ )
+
+ def test_refresh_workforce_success_without_client_auth_with_impersonation(self):
+ credentials = self.make_credentials(
+ audience=WORKFORCE_AUDIENCE,
+ subject_token_type=WORKFORCE_SUBJECT_TOKEN_TYPE,
+ client_id=None,
+ client_secret=None,
+ service_account_impersonation_url=SERVICE_ACCOUNT_IMPERSONATION_URL,
+ # Test with text format type.
+ credential_source=self.CREDENTIAL_SOURCE_TEXT,
+ scopes=SCOPES,
+ # This will not be ignored as client auth is not used.
+ workforce_pool_user_project=WORKFORCE_POOL_USER_PROJECT,
+ )
+
+ self.assert_underlying_credentials_refresh(
+ credentials=credentials,
+ audience=WORKFORCE_AUDIENCE,
+ subject_token=TEXT_FILE_SUBJECT_TOKEN,
+ subject_token_type=WORKFORCE_SUBJECT_TOKEN_TYPE,
+ token_url=TOKEN_URL,
+ service_account_impersonation_url=SERVICE_ACCOUNT_IMPERSONATION_URL,
+ basic_auth_encoding=None,
+ quota_project_id=None,
+ used_scopes=SCOPES,
+ scopes=SCOPES,
+ workforce_pool_user_project=WORKFORCE_POOL_USER_PROJECT,
+ )
+
def test_refresh_text_file_success_without_impersonation_use_default_scopes(self):
credentials = self.make_credentials(
client_id=CLIENT_ID,