feat: define useful properties on `google.auth.external_account.Credentials` (#770)
This includes the following properties:
- `info`: This is the reverse of `from_info` defined on subclasses and is useful for
serializing external account credentials.
- `service_account_email`: This is the corresponding service account email if impersonation is used.
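- `is_user`: This is `False` for workload identity pools and `True` for workforce pools (not yet supported).
This is determined mainly from the STS audience. When service account impersonation is used, it is always `False`, because the credentials then represent a service account rather than a user.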
While these properties will primarily facilitate integration with gcloud, they are public and useful in other contexts as well.
diff --git a/tests/test_aws.py b/tests/test_aws.py
index 7c7ee36..9ca08d5 100644
--- a/tests/test_aws.py
+++ b/tests/test_aws.py
@@ -919,6 +919,19 @@
assert excinfo.match(r"aws version '3' is not supported in the current build.")
+ def test_info(self):
+ credentials = self.make_credentials(
+ credential_source=self.CREDENTIAL_SOURCE.copy()
+ )
+
+ assert credentials.info == {
+ "type": "external_account",
+ "audience": AUDIENCE,
+ "subject_token_type": SUBJECT_TOKEN_TYPE,
+ "token_url": TOKEN_URL,
+ "credential_source": self.CREDENTIAL_SOURCE,
+ }
+
def test_retrieve_subject_token_missing_region_url(self):
# When AWS_REGION envvar is not available, region_url is required for
# determining the current AWS region.
diff --git a/tests/test_external_account.py b/tests/test_external_account.py
index 8f8d980..7390fb9 100644
--- a/tests/test_external_account.py
+++ b/tests/test_external_account.py
@@ -31,6 +31,12 @@
# Base64 encoding of "username:password"
BASIC_AUTH_ENCODING = "dXNlcm5hbWU6cGFzc3dvcmQ="
SERVICE_ACCOUNT_EMAIL = "service-1234@service-name.iam.gserviceaccount.com"
+# List of valid workforce pool audiences.
+TEST_USER_AUDIENCES = [
+ "//iam.googleapis.com/locations/global/workforcePools/pool-id/providers/provider-id",
+ "//iam.googleapis.com/locations/eu/workforcePools/pool-id/providers/provider-id",
+ "//iam.googleapis.com/locations/eu/workforcePools/workloadIdentityPools/providers/provider-id",
+]
class CredentialsImpl(external_account.Credentials):
@@ -342,6 +348,116 @@
r"Unable to determine target principal from service account impersonation URL."
)
+ def test_info(self):
+ credentials = self.make_credentials()
+
+ assert credentials.info == {
+ "type": "external_account",
+ "audience": self.AUDIENCE,
+ "subject_token_type": self.SUBJECT_TOKEN_TYPE,
+ "token_url": self.TOKEN_URL,
+ "credential_source": self.CREDENTIAL_SOURCE.copy(),
+ }
+
+ def test_info_with_full_options(self):
+ credentials = self.make_credentials(
+ client_id=CLIENT_ID,
+ client_secret=CLIENT_SECRET,
+ quota_project_id=self.QUOTA_PROJECT_ID,
+ service_account_impersonation_url=self.SERVICE_ACCOUNT_IMPERSONATION_URL,
+ )
+
+ assert credentials.info == {
+ "type": "external_account",
+ "audience": self.AUDIENCE,
+ "subject_token_type": self.SUBJECT_TOKEN_TYPE,
+ "token_url": self.TOKEN_URL,
+ "service_account_impersonation_url": self.SERVICE_ACCOUNT_IMPERSONATION_URL,
+ "credential_source": self.CREDENTIAL_SOURCE.copy(),
+ "quota_project_id": self.QUOTA_PROJECT_ID,
+ "client_id": CLIENT_ID,
+ "client_secret": CLIENT_SECRET,
+ }
+
+ def test_service_account_email_without_impersonation(self):
+ credentials = self.make_credentials()
+
+ assert credentials.service_account_email is None
+
+ def test_service_account_email_with_impersonation(self):
+ credentials = self.make_credentials(
+ service_account_impersonation_url=self.SERVICE_ACCOUNT_IMPERSONATION_URL
+ )
+
+ assert credentials.service_account_email == SERVICE_ACCOUNT_EMAIL
+
+ @pytest.mark.parametrize(
+ "audience",
+ # Workload identity pool audiences or invalid workforce pool audiences.
+ [
+ # Legacy K8s audience format.
+ "identitynamespace:1f12345:my_provider",
+ (
+ "//iam.googleapis.com/projects/123456/locations/"
+ "global/workloadIdentityPools/pool-id/providers/"
+ "provider-id"
+ ),
+ (
+ "//iam.googleapis.com/projects/123456/locations/"
+ "eu/workloadIdentityPools/pool-id/providers/"
+ "provider-id"
+ ),
+ # Pool ID with workforcePools string.
+ (
+ "//iam.googleapis.com/projects/123456/locations/"
+ "global/workloadIdentityPools/workforcePools/providers/"
+ "provider-id"
+ ),
+ # Unrealistic / incorrect workforce pool audiences.
+ "//iamgoogleapis.com/locations/eu/workforcePools/pool-id/providers/provider-id",
+ "//iam.googleapiscom/locations/eu/workforcePools/pool-id/providers/provider-id",
+ "//iam.googleapis.com/locations/workforcePools/pool-id/providers/provider-id",
+ "//iam.googleapis.com/locations/eu/workforcePool/pool-id/providers/provider-id",
+ "//iam.googleapis.com/locations//workforcePool/pool-id/providers/provider-id",
+ ],
+ )
+ def test_is_user_with_non_users(self, audience):
+ credentials = CredentialsImpl(
+ audience=audience,
+ subject_token_type=self.SUBJECT_TOKEN_TYPE,
+ token_url=self.TOKEN_URL,
+ credential_source=self.CREDENTIAL_SOURCE,
+ )
+
+ assert credentials.is_user is False
+
+ @pytest.mark.parametrize("audience", TEST_USER_AUDIENCES)
+ def test_is_user_with_users(self, audience):
+ credentials = CredentialsImpl(
+ audience=audience,
+ subject_token_type=self.SUBJECT_TOKEN_TYPE,
+ token_url=self.TOKEN_URL,
+ credential_source=self.CREDENTIAL_SOURCE,
+ )
+
+ assert credentials.is_user is True
+
+ @pytest.mark.parametrize("audience", TEST_USER_AUDIENCES)
+ def test_is_user_with_users_and_impersonation(self, audience):
+ # Initialize the credentials with service account impersonation.
+ credentials = CredentialsImpl(
+ audience=audience,
+ subject_token_type=self.SUBJECT_TOKEN_TYPE,
+ token_url=self.TOKEN_URL,
+ credential_source=self.CREDENTIAL_SOURCE,
+ service_account_impersonation_url=self.SERVICE_ACCOUNT_IMPERSONATION_URL,
+ )
+
+ # Even though the audience is for a workforce pool, since service account
+ # impersonation is used, the credentials will represent a service account and
+ # not a user.
+ assert credentials.is_user is False
+
@mock.patch("google.auth._helpers.utcnow", return_value=datetime.datetime.min)
def test_refresh_without_client_auth_success(self, unused_utcnow):
response = self.SUCCESS_RESPONSE.copy()
diff --git a/tests/test_identity_pool.py b/tests/test_identity_pool.py
index 90a0e25..b529268 100644
--- a/tests/test_identity_pool.py
+++ b/tests/test_identity_pool.py
@@ -430,6 +430,32 @@
r"Missing subject_token_field_name for JSON credential_source format"
)
+ def test_info_with_file_credential_source(self):
+ credentials = self.make_credentials(
+ credential_source=self.CREDENTIAL_SOURCE_TEXT_URL.copy()
+ )
+
+ assert credentials.info == {
+ "type": "external_account",
+ "audience": AUDIENCE,
+ "subject_token_type": SUBJECT_TOKEN_TYPE,
+ "token_url": TOKEN_URL,
+ "credential_source": self.CREDENTIAL_SOURCE_TEXT_URL,
+ }
+
+ def test_info_with_url_credential_source(self):
+ credentials = self.make_credentials(
+ credential_source=self.CREDENTIAL_SOURCE_JSON_URL.copy()
+ )
+
+ assert credentials.info == {
+ "type": "external_account",
+ "audience": AUDIENCE,
+ "subject_token_type": SUBJECT_TOKEN_TYPE,
+ "token_url": TOKEN_URL,
+ "credential_source": self.CREDENTIAL_SOURCE_JSON_URL,
+ }
+
def test_retrieve_subject_token_missing_subject_token(self, tmpdir):
# Provide empty text file.
empty_file = tmpdir.join("empty.txt")