chore: blacken (#375)
diff --git a/google/auth/impersonated_credentials.py b/google/auth/impersonated_credentials.py
index bb2bbf2..70fa5dc 100644
--- a/google/auth/impersonated_credentials.py
+++ b/google/auth/impersonated_credentials.py
@@ -41,22 +41,28 @@
_DEFAULT_TOKEN_LIFETIME_SECS = 3600 # 1 hour in seconds
-_IAM_SCOPE = ['https://www.googleapis.com/auth/iam']
+_IAM_SCOPE = ["https://www.googleapis.com/auth/iam"]
-_IAM_ENDPOINT = ('https://iamcredentials.googleapis.com/v1/projects/-' +
- '/serviceAccounts/{}:generateAccessToken')
+_IAM_ENDPOINT = (
+ "https://iamcredentials.googleapis.com/v1/projects/-"
+ + "/serviceAccounts/{}:generateAccessToken"
+)
-_IAM_SIGN_ENDPOINT = ('https://iamcredentials.googleapis.com/v1/projects/-' +
- '/serviceAccounts/{}:signBlob')
+_IAM_SIGN_ENDPOINT = (
+ "https://iamcredentials.googleapis.com/v1/projects/-"
+ + "/serviceAccounts/{}:signBlob"
+)
-_IAM_IDTOKEN_ENDPOINT = ('https://iamcredentials.googleapis.com/v1/' +
- 'projects/-/serviceAccounts/{}:generateIdToken')
+_IAM_IDTOKEN_ENDPOINT = (
+ "https://iamcredentials.googleapis.com/v1/"
+ + "projects/-/serviceAccounts/{}:generateIdToken"
+)
-_REFRESH_ERROR = 'Unable to acquire impersonated credentials'
+_REFRESH_ERROR = "Unable to acquire impersonated credentials"
_DEFAULT_TOKEN_LIFETIME_SECS = 3600 # 1 hour in seconds
-_DEFAULT_TOKEN_URI = 'https://oauth2.googleapis.com/token'
+_DEFAULT_TOKEN_URI = "https://oauth2.googleapis.com/token"
def _make_iam_token_request(request, principal, headers, body):
@@ -80,34 +86,31 @@
body = json.dumps(body)
- response = request(
- url=iam_endpoint,
- method='POST',
- headers=headers,
- body=body)
+ response = request(url=iam_endpoint, method="POST", headers=headers, body=body)
- response_body = response.data.decode('utf-8')
+ response_body = response.data.decode("utf-8")
if response.status != http_client.OK:
exceptions.RefreshError(_REFRESH_ERROR, response_body)
try:
- token_response = json.loads(response.data.decode('utf-8'))
- token = token_response['accessToken']
- expiry = datetime.strptime(
- token_response['expireTime'], '%Y-%m-%dT%H:%M:%SZ')
+ token_response = json.loads(response.data.decode("utf-8"))
+ token = token_response["accessToken"]
+ expiry = datetime.strptime(token_response["expireTime"], "%Y-%m-%dT%H:%M:%SZ")
return token, expiry
except (KeyError, ValueError) as caught_exc:
new_exc = exceptions.RefreshError(
- '{}: No access token or invalid expiration in response.'.format(
- _REFRESH_ERROR),
- response_body)
+ "{}: No access token or invalid expiration in response.".format(
+ _REFRESH_ERROR
+ ),
+ response_body,
+ )
six.raise_from(new_exc, caught_exc)
-class Credentials(credentials.Credentials, credentials.Signing):
+class Credentials(credentials.Credentials, credentials.Signing):
"""This module defines impersonated credentials which are essentially
impersonated identities.
@@ -169,9 +172,14 @@
print(bucket.name)
"""
- def __init__(self, source_credentials, target_principal,
- target_scopes, delegates=None,
- lifetime=_DEFAULT_TOKEN_LIFETIME_SECS):
+ def __init__(
+ self,
+ source_credentials,
+ target_principal,
+ target_scopes,
+ delegates=None,
+ lifetime=_DEFAULT_TOKEN_LIFETIME_SECS,
+ ):
"""
Args:
source_credentials (google.auth.Credentials): The source credential
@@ -228,12 +236,10 @@
body = {
"delegates": self._delegates,
"scope": self._target_scopes,
- "lifetime": str(self._lifetime) + "s"
+ "lifetime": str(self._lifetime) + "s",
}
- headers = {
- 'Content-Type': 'application/json',
- }
+ headers = {"Content-Type": "application/json"}
# Apply the source credentials authentication info.
self._source_credentials.apply(headers)
@@ -242,29 +248,24 @@
request=request,
principal=self._target_principal,
headers=headers,
- body=body)
+ body=body,
+ )
def sign_bytes(self, message):
iam_sign_endpoint = _IAM_SIGN_ENDPOINT.format(self._target_principal)
- body = {
- "payload": base64.b64encode(message),
- "delegates": self._delegates
- }
+ body = {"payload": base64.b64encode(message), "delegates": self._delegates}
- headers = {
- 'Content-Type': 'application/json',
- }
+ headers = {"Content-Type": "application/json"}
authed_session = AuthorizedSession(self._source_credentials)
response = authed_session.post(
- url=iam_sign_endpoint,
- headers=headers,
- json=body)
+ url=iam_sign_endpoint, headers=headers, json=body
+ )
- return base64.b64decode(response.json()['signedBlob'])
+ return base64.b64decode(response.json()["signedBlob"])
@property
def signer_email(self):
@@ -283,8 +284,8 @@
"""Open ID Connect ID Token-based service account credentials.
"""
- def __init__(self, target_credentials,
- target_audience=None, include_email=False):
+
+ def __init__(self, target_credentials, target_audience=None, include_email=False):
"""
Args:
target_credentials (google.auth.Credentials): The target
@@ -294,57 +295,54 @@
"""
super(IDTokenCredentials, self).__init__()
- if not isinstance(target_credentials,
- Credentials):
- raise exceptions.GoogleAuthError("Provided Credential must be "
- "impersonated_credentials")
+ if not isinstance(target_credentials, Credentials):
+ raise exceptions.GoogleAuthError(
+ "Provided Credential must be " "impersonated_credentials"
+ )
self._target_credentials = target_credentials
self._target_audience = target_audience
self._include_email = include_email
- def from_credentials(self, target_credentials,
- target_audience=None):
+ def from_credentials(self, target_credentials, target_audience=None):
return self.__class__(
- target_credentials=self._target_credentials,
- target_audience=target_audience)
+ target_credentials=self._target_credentials, target_audience=target_audience
+ )
def with_target_audience(self, target_audience):
return self.__class__(
- target_credentials=self._target_credentials,
- target_audience=target_audience)
+ target_credentials=self._target_credentials, target_audience=target_audience
+ )
def with_include_email(self, include_email):
return self.__class__(
target_credentials=self._target_credentials,
target_audience=self._target_audience,
- include_email=include_email)
+ include_email=include_email,
+ )
@_helpers.copy_docstring(credentials.Credentials)
def refresh(self, request):
- iam_sign_endpoint = _IAM_IDTOKEN_ENDPOINT.format(self.
- _target_credentials.
- signer_email)
+ iam_sign_endpoint = _IAM_IDTOKEN_ENDPOINT.format(
+ self._target_credentials.signer_email
+ )
body = {
"audience": self._target_audience,
"delegates": self._target_credentials._delegates,
- "includeEmail": self._include_email
+ "includeEmail": self._include_email,
}
- headers = {
- 'Content-Type': 'application/json',
- }
+ headers = {"Content-Type": "application/json"}
- authed_session = AuthorizedSession(self._target_credentials.
- _source_credentials)
+ authed_session = AuthorizedSession(self._target_credentials._source_credentials)
response = authed_session.post(
url=iam_sign_endpoint,
headers=headers,
- data=json.dumps(body).encode('utf-8'))
+ data=json.dumps(body).encode("utf-8"),
+ )
- id_token = response.json()['token']
+ id_token = response.json()["token"]
self.token = id_token
- self.expiry = datetime.fromtimestamp(jwt.decode(id_token,
- verify=False)['exp'])
+ self.expiry = datetime.fromtimestamp(jwt.decode(id_token, verify=False)["exp"])