feat: define useful properties on `google.auth.external_account.Credentials` (#770)
This includes the following properties:
- `info`: This is the reverse of `from_info` defined on subclasses and useful to
serialize external account credentials.
- `service_account_email`: This is the corresponding service account email if impersonation is used.
- `is_user`: This is `False` for workload identity pools and `True` for workforce pools (not yet supported).
This can be mainly determined from the STS audience.
While the properties will primarily facilitate integration with gcloud, they are publicly useful for other contexts.
diff --git a/google/auth/external_account.py b/google/auth/external_account.py
index 0429ee0..e40c652 100644
--- a/google/auth/external_account.py
+++ b/google/auth/external_account.py
@@ -28,8 +28,10 @@
"""
import abc
+import copy
import datetime
import json
+import re
import six
@@ -40,6 +42,8 @@
from google.oauth2 import sts
from google.oauth2 import utils
+# External account JSON type identifier.
+_EXTERNAL_ACCOUNT_JSON_TYPE = "external_account"
# The token exchange grant_type used for exchanging credentials.
_STS_GRANT_TYPE = "urn:ietf:params:oauth:grant-type:token-exchange"
# The token exchange requested_token_type. This is always an access_token.
@@ -118,6 +122,76 @@
self._project_id = None
@property
+ def info(self):
+ """Generates the dictionary representation of the current credentials.
+
+ Returns:
+ Mapping: The dictionary representation of the credentials. This is the
+ reverse of "from_info" defined on the subclasses of this class. It is
+ useful for serializing the current credentials so it can deserialized
+ later.
+ """
+ config_info = {
+ "type": _EXTERNAL_ACCOUNT_JSON_TYPE,
+ "audience": self._audience,
+ "subject_token_type": self._subject_token_type,
+ "token_url": self._token_url,
+ "service_account_impersonation_url": self._service_account_impersonation_url,
+ "credential_source": copy.deepcopy(self._credential_source),
+ "quota_project_id": self._quota_project_id,
+ "client_id": self._client_id,
+ "client_secret": self._client_secret,
+ }
+ # Remove None fields in the info dictionary.
+ for k, v in dict(config_info).items():
+ if v is None:
+ del config_info[k]
+
+ return config_info
+
+ @property
+ def service_account_email(self):
+ """Returns the service account email if service account impersonation is used.
+
+ Returns:
+ Optional[str]: The service account email if impersonation is used. Otherwise
+ None is returned.
+ """
+ if self._service_account_impersonation_url:
+ # Parse email from URL. The formal looks as follows:
+ # https://iamcredentials.googleapis.com/v1/projects/-/serviceAccounts/name@project-id.iam.gserviceaccount.com:generateAccessToken
+ url = self._service_account_impersonation_url
+ start_index = url.rfind("/")
+ end_index = url.find(":generateAccessToken")
+ if start_index != -1 and end_index != -1 and start_index < end_index:
+ start_index = start_index + 1
+ return url[start_index:end_index]
+ return None
+
+ @property
+ def is_user(self):
+ """Returns whether the credentials represent a user (True) or workload (False).
+ Workloads behave similarly to service accounts. Currently workloads will use
+ service account impersonation but will eventually not require impersonation.
+ As a result, this property is more reliable than the service account email
+ property in determining if the credentials represent a user or workload.
+
+ Returns:
+ bool: True if the credentials represent a user. False if they represent a
+ workload.
+ """
+ # If service account impersonation is used, the credentials will always represent a
+ # service account.
+ if self._service_account_impersonation_url:
+ return False
+ # Workforce pools representing users have the following audience format:
+ # //iam.googleapis.com/locations/$location/workforcePools/$poolId/providers/$providerId
+ p = re.compile(r"//iam\.googleapis\.com/locations/[^/]+/workforcePools/")
+ if p.match(self._audience):
+ return True
+ return False
+
+ @property
def requires_scopes(self):
"""Checks if the credentials requires scopes.
@@ -282,14 +356,8 @@
)
# Determine target_principal.
- start_index = self._service_account_impersonation_url.rfind("/")
- end_index = self._service_account_impersonation_url.find(":generateAccessToken")
- if start_index != -1 and end_index != -1 and start_index < end_index:
- start_index = start_index + 1
- target_principal = self._service_account_impersonation_url[
- start_index:end_index
- ]
- else:
+ target_principal = self.service_account_email
+ if not target_principal:
raise exceptions.RefreshError(
"Unable to determine target principal from service account impersonation URL."
)