feat: use self-signed jwt for service account (#665)
diff --git a/google/oauth2/service_account.py b/google/oauth2/service_account.py
index c4898a2..ed91011 100644
--- a/google/oauth2/service_account.py
+++ b/google/oauth2/service_account.py
@@ -126,6 +126,7 @@
service_account_email,
token_uri,
scopes=None,
+ default_scopes=None,
subject=None,
project_id=None,
quota_project_id=None,
@@ -135,8 +136,10 @@
Args:
signer (google.auth.crypt.Signer): The signer used to sign JWTs.
service_account_email (str): The service account's email.
- scopes (Sequence[str]): Scopes to request during the authorization
- grant.
+ scopes (Sequence[str]): User-defined scopes to request during the
+ authorization grant.
+ default_scopes (Sequence[str]): Default scopes passed by a
+ Google client library. Use 'scopes' for user-defined scopes.
token_uri (str): The OAuth 2.0 Token URI.
subject (str): For domain-wide delegation, the email address of the
user to for which to request delegated access.
@@ -155,6 +158,7 @@
super(Credentials, self).__init__()
self._scopes = scopes
+ self._default_scopes = default_scopes
self._signer = signer
self._service_account_email = service_account_email
self._subject = subject
@@ -162,6 +166,8 @@
self._quota_project_id = quota_project_id
self._token_uri = token_uri
+ self._jwt_credentials = None
+
if additional_claims is not None:
self._additional_claims = additional_claims
else:
@@ -249,11 +255,12 @@
return True if not self._scopes else False
@_helpers.copy_docstring(credentials.Scoped)
- def with_scopes(self, scopes):
+ def with_scopes(self, scopes, default_scopes=None):
return self.__class__(
self._signer,
service_account_email=self._service_account_email,
scopes=scopes,
+ default_scopes=default_scopes,
token_uri=self._token_uri,
subject=self._subject,
project_id=self._project_id,
@@ -275,6 +282,7 @@
self._signer,
service_account_email=self._service_account_email,
scopes=self._scopes,
+ default_scopes=self._default_scopes,
token_uri=self._token_uri,
subject=subject,
project_id=self._project_id,
@@ -301,6 +309,7 @@
self._signer,
service_account_email=self._service_account_email,
scopes=self._scopes,
+ default_scopes=self._default_scopes,
token_uri=self._token_uri,
subject=self._subject,
project_id=self._project_id,
@@ -314,6 +323,7 @@
return self.__class__(
self._signer,
service_account_email=self._service_account_email,
+ default_scopes=self._default_scopes,
scopes=self._scopes,
token_uri=self._token_uri,
subject=self._subject,
@@ -357,10 +367,30 @@
@_helpers.copy_docstring(credentials.Credentials)
def refresh(self, request):
- assertion = self._make_authorization_grant_assertion()
- access_token, expiry, _ = _client.jwt_grant(request, self._token_uri, assertion)
- self.token = access_token
- self.expiry = expiry
+ if self._jwt_credentials is not None:
+ self._jwt_credentials.refresh(request)
+ self.token = self._jwt_credentials.token
+ self.expiry = self._jwt_credentials.expiry
+ else:
+ assertion = self._make_authorization_grant_assertion()
+ access_token, expiry, _ = _client.jwt_grant(
+ request, self._token_uri, assertion
+ )
+ self.token = access_token
+ self.expiry = expiry
+
+ def _create_self_signed_jwt(self, audience):
+ """Create a self-signed JWT from the credentials if requirements are met.
+
+ Args:
+ audience (str): The service URL. ``https://[API_ENDPOINT]/``
+ """
+ # https://google.aip.dev/auth/4111
+ # If the user has not defined scopes, create a self-signed JWT.
+ if not self.scopes:
+ self._jwt_credentials = jwt.Credentials.from_signing_credentials(
+ self, audience
+ )
@_helpers.copy_docstring(credentials.Signing)
def sign_bytes(self, message):