chore: blacken (#375)
diff --git a/google/auth/jwt.py b/google/auth/jwt.py
index d63c50b..a30c575 100644
--- a/google/auth/jwt.py
+++ b/google/auth/jwt.py
@@ -79,36 +79,30 @@
if key_id is None:
key_id = signer.key_id
- header.update({'typ': 'JWT', 'alg': 'RS256'})
+ header.update({"typ": "JWT", "alg": "RS256"})
if key_id is not None:
- header['kid'] = key_id
+ header["kid"] = key_id
segments = [
- _helpers.unpadded_urlsafe_b64encode(
- json.dumps(header).encode('utf-8')
- ),
- _helpers.unpadded_urlsafe_b64encode(
- json.dumps(payload).encode('utf-8')
- ),
+ _helpers.unpadded_urlsafe_b64encode(json.dumps(header).encode("utf-8")),
+ _helpers.unpadded_urlsafe_b64encode(json.dumps(payload).encode("utf-8")),
]
- signing_input = b'.'.join(segments)
+ signing_input = b".".join(segments)
signature = signer.sign(signing_input)
- segments.append(
- _helpers.unpadded_urlsafe_b64encode(signature)
- )
+ segments.append(_helpers.unpadded_urlsafe_b64encode(signature))
- return b'.'.join(segments)
+ return b".".join(segments)
def _decode_jwt_segment(encoded_section):
"""Decodes a single JWT segment."""
section_bytes = _helpers.padded_urlsafe_b64decode(encoded_section)
try:
- return json.loads(section_bytes.decode('utf-8'))
+ return json.loads(section_bytes.decode("utf-8"))
except ValueError as caught_exc:
- new_exc = ValueError('Can\'t parse segment: {0}'.format(section_bytes))
+ new_exc = ValueError("Can't parse segment: {0}".format(section_bytes))
six.raise_from(new_exc, caught_exc)
@@ -127,12 +121,11 @@
"""
token = _helpers.to_bytes(token)
- if token.count(b'.') != 2:
- raise ValueError(
- 'Wrong number of segments in token: {0}'.format(token))
+ if token.count(b".") != 2:
+ raise ValueError("Wrong number of segments in token: {0}".format(token))
- encoded_header, encoded_payload, signature = token.split(b'.')
- signed_section = encoded_header + b'.' + encoded_payload
+ encoded_header, encoded_payload, signature = token.split(b".")
+ signed_section = encoded_header + b"." + encoded_payload
signature = _helpers.padded_urlsafe_b64decode(signature)
# Parse segments
@@ -172,26 +165,25 @@
now = _helpers.datetime_to_secs(_helpers.utcnow())
# Make sure the iat and exp claims are present.
- for key in ('iat', 'exp'):
+ for key in ("iat", "exp"):
if key not in payload:
- raise ValueError(
- 'Token does not contain required claim {}'.format(key))
+ raise ValueError("Token does not contain required claim {}".format(key))
# Make sure the token wasn't issued in the future.
- iat = payload['iat']
+ iat = payload["iat"]
# Err on the side of accepting a token that is slightly early to account
# for clock skew.
earliest = iat - _helpers.CLOCK_SKEW_SECS
if now < earliest:
- raise ValueError('Token used too early, {} < {}'.format(now, iat))
+ raise ValueError("Token used too early, {} < {}".format(now, iat))
# Make sure the token hasn't expired.
- exp = payload['exp']
+ exp = payload["exp"]
# Err on the side of accepting a token that is slightly out of date
# to account for clock skew.
latest = exp + _helpers.CLOCK_SKEW_SECS
if latest < now:
- raise ValueError('Token expired, {} < {}'.format(latest, now))
+ raise ValueError("Token expired, {} < {}".format(latest, now))
def decode(token, certs=None, verify=True, audience=None):
@@ -224,11 +216,10 @@
# If certs is specified as a dictionary of key IDs to certificates, then
# use the certificate identified by the key ID in the token header.
if isinstance(certs, collections.Mapping):
- key_id = header.get('kid')
+ key_id = header.get("kid")
if key_id:
if key_id not in certs:
- raise ValueError(
- 'Certificate for key id {} not found.'.format(key_id))
+ raise ValueError("Certificate for key id {} not found.".format(key_id))
certs_to_check = [certs[key_id]]
# If there's no key id in the header, check against all of the certs.
else:
@@ -238,24 +229,25 @@
# Verify that the signature matches the message.
if not crypt.verify_signature(signed_section, signature, certs_to_check):
- raise ValueError('Could not verify token signature.')
+ raise ValueError("Could not verify token signature.")
# Verify the issued-at and expiration times in the payload.
_verify_iat_and_exp(payload)
# Check audience.
if audience is not None:
- claim_audience = payload.get('aud')
+ claim_audience = payload.get("aud")
if audience != claim_audience:
raise ValueError(
- 'Token has wrong audience {}, expected {}'.format(
- claim_audience, audience))
+ "Token has wrong audience {}, expected {}".format(
+ claim_audience, audience
+ )
+ )
return payload
-class Credentials(google.auth.credentials.Signing,
- google.auth.credentials.Credentials):
+class Credentials(google.auth.credentials.Signing, google.auth.credentials.Credentials):
"""Credentials that use a JWT as the bearer token.
These credentials require an "audience" claim. This claim identifies the
@@ -305,9 +297,15 @@
new_credentials = credentials.with_claims(audience=new_audience)
"""
- def __init__(self, signer, issuer, subject, audience,
- additional_claims=None,
- token_lifetime=_DEFAULT_TOKEN_LIFETIME_SECS):
+ def __init__(
+ self,
+ signer,
+ issuer,
+ subject,
+ audience,
+ additional_claims=None,
+ token_lifetime=_DEFAULT_TOKEN_LIFETIME_SECS,
+ ):
"""
Args:
signer (google.auth.crypt.Signer): The signer used to sign JWTs.
@@ -348,8 +346,8 @@
Raises:
ValueError: If the info is not in the expected format.
"""
- kwargs.setdefault('subject', info['client_email'])
- kwargs.setdefault('issuer', info['client_email'])
+ kwargs.setdefault("subject", info["client_email"])
+ kwargs.setdefault("issuer", info["client_email"])
return cls(signer, **kwargs)
@classmethod
@@ -367,8 +365,7 @@
Raises:
ValueError: If the info is not in the expected format.
"""
- signer = _service_account_info.from_dict(
- info, require=['client_email'])
+ signer = _service_account_info.from_dict(info, require=["client_email"])
return cls._from_signer_and_info(signer, info, **kwargs)
@classmethod
@@ -384,7 +381,8 @@
google.auth.jwt.Credentials: The constructed credentials.
"""
info, signer = _service_account_info.from_filename(
- filename, require=['client_email'])
+ filename, require=["client_email"]
+ )
return cls._from_signer_and_info(signer, info, **kwargs)
@classmethod
@@ -415,15 +413,13 @@
Returns:
google.auth.jwt.Credentials: A new Credentials instance.
"""
- kwargs.setdefault('issuer', credentials.signer_email)
- kwargs.setdefault('subject', credentials.signer_email)
- return cls(
- credentials.signer,
- audience=audience,
- **kwargs)
+ kwargs.setdefault("issuer", credentials.signer_email)
+ kwargs.setdefault("subject", credentials.signer_email)
+ return cls(credentials.signer, audience=audience, **kwargs)
- def with_claims(self, issuer=None, subject=None, audience=None,
- additional_claims=None):
+ def with_claims(
+ self, issuer=None, subject=None, audience=None, additional_claims=None
+ ):
"""Returns a copy of these credentials with modified claims.
Args:
@@ -448,7 +444,8 @@
issuer=issuer if issuer is not None else self._issuer,
subject=subject if subject is not None else self._subject,
audience=audience if audience is not None else self._audience,
- additional_claims=new_additional_claims)
+ additional_claims=new_additional_claims,
+ )
def _make_jwt(self):
"""Make a signed JWT.
@@ -461,11 +458,11 @@
expiry = now + lifetime
payload = {
- 'iss': self._issuer,
- 'sub': self._subject,
- 'iat': _helpers.datetime_to_secs(now),
- 'exp': _helpers.datetime_to_secs(expiry),
- 'aud': self._audience,
+ "iss": self._issuer,
+ "sub": self._subject,
+ "iat": _helpers.datetime_to_secs(now),
+ "exp": _helpers.datetime_to_secs(expiry),
+ "aud": self._audience,
}
payload.update(self._additional_claims)
@@ -500,8 +497,8 @@
class OnDemandCredentials(
- google.auth.credentials.Signing,
- google.auth.credentials.Credentials):
+ google.auth.credentials.Signing, google.auth.credentials.Credentials
+):
"""On-demand JWT credentials.
Like :class:`Credentials`, this class uses a JWT as the bearer token for
@@ -519,10 +516,15 @@
.. _grpc: http://www.grpc.io/
"""
- def __init__(self, signer, issuer, subject,
- additional_claims=None,
- token_lifetime=_DEFAULT_TOKEN_LIFETIME_SECS,
- max_cache_size=_DEFAULT_MAX_CACHE_SIZE):
+ def __init__(
+ self,
+ signer,
+ issuer,
+ subject,
+ additional_claims=None,
+ token_lifetime=_DEFAULT_TOKEN_LIFETIME_SECS,
+ max_cache_size=_DEFAULT_MAX_CACHE_SIZE,
+ ):
"""
Args:
signer (google.auth.crypt.Signer): The signer used to sign JWTs.
@@ -563,8 +565,8 @@
Raises:
ValueError: If the info is not in the expected format.
"""
- kwargs.setdefault('subject', info['client_email'])
- kwargs.setdefault('issuer', info['client_email'])
+ kwargs.setdefault("subject", info["client_email"])
+ kwargs.setdefault("issuer", info["client_email"])
return cls(signer, **kwargs)
@classmethod
@@ -582,8 +584,7 @@
Raises:
ValueError: If the info is not in the expected format.
"""
- signer = _service_account_info.from_dict(
- info, require=['client_email'])
+ signer = _service_account_info.from_dict(info, require=["client_email"])
return cls._from_signer_and_info(signer, info, **kwargs)
@classmethod
@@ -599,7 +600,8 @@
google.auth.jwt.OnDemandCredentials: The constructed credentials.
"""
info, signer = _service_account_info.from_filename(
- filename, require=['client_email'])
+ filename, require=["client_email"]
+ )
return cls._from_signer_and_info(signer, info, **kwargs)
@classmethod
@@ -626,8 +628,8 @@
Returns:
google.auth.jwt.OnDemandCredentials: A new OnDemandCredentials instance.
"""
- kwargs.setdefault('issuer', credentials.signer_email)
- kwargs.setdefault('subject', credentials.signer_email)
+ kwargs.setdefault("issuer", credentials.signer_email)
+ kwargs.setdefault("subject", credentials.signer_email)
return cls(credentials.signer, **kwargs)
def with_claims(self, issuer=None, subject=None, additional_claims=None):
@@ -653,7 +655,8 @@
issuer=issuer if issuer is not None else self._issuer,
subject=subject if subject is not None else self._subject,
additional_claims=new_additional_claims,
- max_cache_size=self._cache.maxsize)
+ max_cache_size=self._cache.maxsize,
+ )
@property
def valid(self):
@@ -678,11 +681,11 @@
expiry = now + lifetime
payload = {
- 'iss': self._issuer,
- 'sub': self._subject,
- 'iat': _helpers.datetime_to_secs(now),
- 'exp': _helpers.datetime_to_secs(expiry),
- 'aud': audience,
+ "iss": self._issuer,
+ "sub": self._subject,
+ "iat": _helpers.datetime_to_secs(now),
+ "exp": _helpers.datetime_to_secs(expiry),
+ "aud": audience,
}
payload.update(self._additional_claims)
@@ -725,7 +728,8 @@
# pylint: disable=unused-argument
# (pylint doesn't correctly recognize overridden methods.)
raise exceptions.RefreshError(
- 'OnDemandCredentials can not be directly refreshed.')
+ "OnDemandCredentials can not be directly refreshed."
+ )
def before_request(self, request, method, url, headers):
"""Performs credential-specific before request logic.
@@ -743,7 +747,8 @@
parts = urllib.parse.urlsplit(url)
# Strip query string and fragment
audience = urllib.parse.urlunsplit(
- (parts.scheme, parts.netloc, parts.path, "", ""))
+ (parts.scheme, parts.netloc, parts.path, "", "")
+ )
token = self._get_jwt_for_audience(audience)
self.apply(headers, token=token)