chore: blacken (#375)
diff --git a/tests/oauth2/test__client.py b/tests/oauth2/test__client.py
index 6fc4c3b..c415a1f 100644
--- a/tests/oauth2/test__client.py
+++ b/tests/oauth2/test__client.py
@@ -30,42 +30,44 @@
from google.oauth2 import _client
-DATA_DIR = os.path.join(os.path.dirname(__file__), '..', 'data')
+DATA_DIR = os.path.join(os.path.dirname(__file__), "..", "data")
-with open(os.path.join(DATA_DIR, 'privatekey.pem'), 'rb') as fh:
+with open(os.path.join(DATA_DIR, "privatekey.pem"), "rb") as fh:
PRIVATE_KEY_BYTES = fh.read()
-SIGNER = crypt.RSASigner.from_string(PRIVATE_KEY_BYTES, '1')
+SIGNER = crypt.RSASigner.from_string(PRIVATE_KEY_BYTES, "1")
-SCOPES_AS_LIST = ['https://www.googleapis.com/auth/pubsub',
- 'https://www.googleapis.com/auth/logging.write']
-SCOPES_AS_STRING = ('https://www.googleapis.com/auth/pubsub'
- ' https://www.googleapis.com/auth/logging.write')
+SCOPES_AS_LIST = [
+ "https://www.googleapis.com/auth/pubsub",
+ "https://www.googleapis.com/auth/logging.write",
+]
+SCOPES_AS_STRING = (
+ "https://www.googleapis.com/auth/pubsub"
+ " https://www.googleapis.com/auth/logging.write"
+)
def test__handle_error_response():
- response_data = json.dumps({
- 'error': 'help',
- 'error_description': 'I\'m alive'})
+ response_data = json.dumps({"error": "help", "error_description": "I'm alive"})
with pytest.raises(exceptions.RefreshError) as excinfo:
_client._handle_error_response(response_data)
- assert excinfo.match(r'help: I\'m alive')
+ assert excinfo.match(r"help: I\'m alive")
def test__handle_error_response_non_json():
- response_data = 'Help, I\'m alive'
+ response_data = "Help, I'm alive"
with pytest.raises(exceptions.RefreshError) as excinfo:
_client._handle_error_response(response_data)
- assert excinfo.match(r'Help, I\'m alive')
+ assert excinfo.match(r"Help, I\'m alive")
-@mock.patch('google.auth._helpers.utcnow', return_value=datetime.datetime.min)
+@mock.patch("google.auth._helpers.utcnow", return_value=datetime.datetime.min)
def test__parse_expiry(unused_utcnow):
- result = _client._parse_expiry({'expires_in': 500})
+ result = _client._parse_expiry({"expires_in": 500})
assert result == datetime.datetime.min + datetime.timedelta(seconds=500)
@@ -76,188 +78,214 @@
def make_request(response_data, status=http_client.OK):
response = mock.create_autospec(transport.Response, instance=True)
response.status = status
- response.data = json.dumps(response_data).encode('utf-8')
+ response.data = json.dumps(response_data).encode("utf-8")
request = mock.create_autospec(transport.Request)
request.return_value = response
return request
def test__token_endpoint_request():
- request = make_request({'test': 'response'})
+ request = make_request({"test": "response"})
result = _client._token_endpoint_request(
- request, 'http://example.com', {'test': 'params'})
+ request, "http://example.com", {"test": "params"}
+ )
# Check request call
request.assert_called_with(
- method='POST',
- url='http://example.com',
- headers={'content-type': 'application/x-www-form-urlencoded'},
- body='test=params')
+ method="POST",
+ url="http://example.com",
+ headers={"content-type": "application/x-www-form-urlencoded"},
+ body="test=params",
+ )
# Check result
- assert result == {'test': 'response'}
+ assert result == {"test": "response"}
def test__token_endpoint_request_error():
request = make_request({}, status=http_client.BAD_REQUEST)
with pytest.raises(exceptions.RefreshError):
- _client._token_endpoint_request(request, 'http://example.com', {})
+ _client._token_endpoint_request(request, "http://example.com", {})
def test__token_endpoint_request_internal_failure_error():
- request = make_request({'error': 'internal_failure',
- 'error_description': 'internal_failure'},
- status=http_client.BAD_REQUEST)
+ request = make_request(
+ {"error": "internal_failure", "error_description": "internal_failure"},
+ status=http_client.BAD_REQUEST,
+ )
with pytest.raises(exceptions.RefreshError):
_client._token_endpoint_request(
- request, 'http://example.com',
- {'error': 'internal_failure',
- 'error_description': 'internal_failure'})
+ request,
+ "http://example.com",
+ {"error": "internal_failure", "error_description": "internal_failure"},
+ )
def verify_request_params(request, params):
- request_body = request.call_args[1]['body']
+ request_body = request.call_args[1]["body"]
request_params = urllib.parse.parse_qs(request_body)
for key, value in six.iteritems(params):
assert request_params[key][0] == value
-@mock.patch('google.auth._helpers.utcnow', return_value=datetime.datetime.min)
+@mock.patch("google.auth._helpers.utcnow", return_value=datetime.datetime.min)
def test_jwt_grant(utcnow):
- request = make_request({
- 'access_token': 'token',
- 'expires_in': 500,
- 'extra': 'data'})
+ request = make_request(
+ {"access_token": "token", "expires_in": 500, "extra": "data"}
+ )
token, expiry, extra_data = _client.jwt_grant(
- request, 'http://example.com', 'assertion_value')
+ request, "http://example.com", "assertion_value"
+ )
# Check request call
- verify_request_params(request, {
- 'grant_type': _client._JWT_GRANT_TYPE,
- 'assertion': 'assertion_value'
- })
+ verify_request_params(
+ request, {"grant_type": _client._JWT_GRANT_TYPE, "assertion": "assertion_value"}
+ )
# Check result
- assert token == 'token'
+ assert token == "token"
assert expiry == utcnow() + datetime.timedelta(seconds=500)
- assert extra_data['extra'] == 'data'
+ assert extra_data["extra"] == "data"
def test_jwt_grant_no_access_token():
- request = make_request({
- # No access token.
- 'expires_in': 500,
- 'extra': 'data'})
+ request = make_request(
+ {
+ # No access token.
+ "expires_in": 500,
+ "extra": "data",
+ }
+ )
with pytest.raises(exceptions.RefreshError):
- _client.jwt_grant(request, 'http://example.com', 'assertion_value')
+ _client.jwt_grant(request, "http://example.com", "assertion_value")
def test_id_token_jwt_grant():
now = _helpers.utcnow()
id_token_expiry = _helpers.datetime_to_secs(now)
- id_token = jwt.encode(SIGNER, {'exp': id_token_expiry}).decode('utf-8')
- request = make_request({
- 'id_token': id_token,
- 'extra': 'data'})
+ id_token = jwt.encode(SIGNER, {"exp": id_token_expiry}).decode("utf-8")
+ request = make_request({"id_token": id_token, "extra": "data"})
token, expiry, extra_data = _client.id_token_jwt_grant(
- request, 'http://example.com', 'assertion_value')
+ request, "http://example.com", "assertion_value"
+ )
# Check request call
- verify_request_params(request, {
- 'grant_type': _client._JWT_GRANT_TYPE,
- 'assertion': 'assertion_value'
- })
+ verify_request_params(
+ request, {"grant_type": _client._JWT_GRANT_TYPE, "assertion": "assertion_value"}
+ )
# Check result
assert token == id_token
# JWT does not store microseconds
now = now.replace(microsecond=0)
assert expiry == now
- assert extra_data['extra'] == 'data'
+ assert extra_data["extra"] == "data"
def test_id_token_jwt_grant_no_access_token():
- request = make_request({
- # No access token.
- 'expires_in': 500,
- 'extra': 'data'})
+ request = make_request(
+ {
+ # No access token.
+ "expires_in": 500,
+ "extra": "data",
+ }
+ )
with pytest.raises(exceptions.RefreshError):
- _client.id_token_jwt_grant(
- request, 'http://example.com', 'assertion_value')
+ _client.id_token_jwt_grant(request, "http://example.com", "assertion_value")
-@mock.patch('google.auth._helpers.utcnow', return_value=datetime.datetime.min)
+@mock.patch("google.auth._helpers.utcnow", return_value=datetime.datetime.min)
def test_refresh_grant(unused_utcnow):
- request = make_request({
- 'access_token': 'token',
- 'refresh_token': 'new_refresh_token',
- 'expires_in': 500,
- 'extra': 'data'})
+ request = make_request(
+ {
+ "access_token": "token",
+ "refresh_token": "new_refresh_token",
+ "expires_in": 500,
+ "extra": "data",
+ }
+ )
token, refresh_token, expiry, extra_data = _client.refresh_grant(
- request, 'http://example.com', 'refresh_token', 'client_id',
- 'client_secret')
+ request, "http://example.com", "refresh_token", "client_id", "client_secret"
+ )
# Check request call
- verify_request_params(request, {
- 'grant_type': _client._REFRESH_GRANT_TYPE,
- 'refresh_token': 'refresh_token',
- 'client_id': 'client_id',
- 'client_secret': 'client_secret'
- })
+ verify_request_params(
+ request,
+ {
+ "grant_type": _client._REFRESH_GRANT_TYPE,
+ "refresh_token": "refresh_token",
+ "client_id": "client_id",
+ "client_secret": "client_secret",
+ },
+ )
# Check result
- assert token == 'token'
- assert refresh_token == 'new_refresh_token'
+ assert token == "token"
+ assert refresh_token == "new_refresh_token"
assert expiry == datetime.datetime.min + datetime.timedelta(seconds=500)
- assert extra_data['extra'] == 'data'
+ assert extra_data["extra"] == "data"
-@mock.patch('google.auth._helpers.utcnow', return_value=datetime.datetime.min)
+@mock.patch("google.auth._helpers.utcnow", return_value=datetime.datetime.min)
def test_refresh_grant_with_scopes(unused_utcnow):
- request = make_request({
- 'access_token': 'token',
- 'refresh_token': 'new_refresh_token',
- 'expires_in': 500,
- 'extra': 'data',
- 'scope': SCOPES_AS_STRING})
+ request = make_request(
+ {
+ "access_token": "token",
+ "refresh_token": "new_refresh_token",
+ "expires_in": 500,
+ "extra": "data",
+ "scope": SCOPES_AS_STRING,
+ }
+ )
token, refresh_token, expiry, extra_data = _client.refresh_grant(
- request, 'http://example.com', 'refresh_token', 'client_id',
- 'client_secret', SCOPES_AS_LIST)
+ request,
+ "http://example.com",
+ "refresh_token",
+ "client_id",
+ "client_secret",
+ SCOPES_AS_LIST,
+ )
# Check request call.
- verify_request_params(request, {
- 'grant_type': _client._REFRESH_GRANT_TYPE,
- 'refresh_token': 'refresh_token',
- 'client_id': 'client_id',
- 'client_secret': 'client_secret',
- 'scope': SCOPES_AS_STRING
- })
+ verify_request_params(
+ request,
+ {
+ "grant_type": _client._REFRESH_GRANT_TYPE,
+ "refresh_token": "refresh_token",
+ "client_id": "client_id",
+ "client_secret": "client_secret",
+ "scope": SCOPES_AS_STRING,
+ },
+ )
# Check result.
- assert token == 'token'
- assert refresh_token == 'new_refresh_token'
+ assert token == "token"
+ assert refresh_token == "new_refresh_token"
assert expiry == datetime.datetime.min + datetime.timedelta(seconds=500)
- assert extra_data['extra'] == 'data'
+ assert extra_data["extra"] == "data"
def test_refresh_grant_no_access_token():
- request = make_request({
- # No access token.
- 'refresh_token': 'new_refresh_token',
- 'expires_in': 500,
- 'extra': 'data'})
+ request = make_request(
+ {
+ # No access token.
+ "refresh_token": "new_refresh_token",
+ "expires_in": 500,
+ "extra": "data",
+ }
+ )
with pytest.raises(exceptions.RefreshError):
_client.refresh_grant(
- request, 'http://example.com', 'refresh_token', 'client_id',
- 'client_secret')
+ request, "http://example.com", "refresh_token", "client_id", "client_secret"
+ )
diff --git a/tests/oauth2/test_credentials.py b/tests/oauth2/test_credentials.py
index 3231509..8bfdd7e 100644
--- a/tests/oauth2/test_credentials.py
+++ b/tests/oauth2/test_credentials.py
@@ -25,26 +25,29 @@
from google.oauth2 import credentials
-DATA_DIR = os.path.join(os.path.dirname(__file__), '..', 'data')
+DATA_DIR = os.path.join(os.path.dirname(__file__), "..", "data")
-AUTH_USER_JSON_FILE = os.path.join(DATA_DIR, 'authorized_user.json')
+AUTH_USER_JSON_FILE = os.path.join(DATA_DIR, "authorized_user.json")
-with open(AUTH_USER_JSON_FILE, 'r') as fh:
+with open(AUTH_USER_JSON_FILE, "r") as fh:
AUTH_USER_INFO = json.load(fh)
class TestCredentials(object):
- TOKEN_URI = 'https://example.com/oauth2/token'
- REFRESH_TOKEN = 'refresh_token'
- CLIENT_ID = 'client_id'
- CLIENT_SECRET = 'client_secret'
+ TOKEN_URI = "https://example.com/oauth2/token"
+ REFRESH_TOKEN = "refresh_token"
+ CLIENT_ID = "client_id"
+ CLIENT_SECRET = "client_secret"
@classmethod
def make_credentials(cls):
return credentials.Credentials(
- token=None, refresh_token=cls.REFRESH_TOKEN,
- token_uri=cls.TOKEN_URI, client_id=cls.CLIENT_ID,
- client_secret=cls.CLIENT_SECRET)
+ token=None,
+ refresh_token=cls.REFRESH_TOKEN,
+ token_uri=cls.TOKEN_URI,
+ client_id=cls.CLIENT_ID,
+ client_secret=cls.CLIENT_SECRET,
+ )
def test_default_state(self):
credentials = self.make_credentials()
@@ -59,14 +62,15 @@
assert credentials.client_id == self.CLIENT_ID
assert credentials.client_secret == self.CLIENT_SECRET
- @mock.patch('google.oauth2._client.refresh_grant', autospec=True)
+ @mock.patch("google.oauth2._client.refresh_grant", autospec=True)
@mock.patch(
- 'google.auth._helpers.utcnow',
- return_value=datetime.datetime.min + _helpers.CLOCK_SKEW)
+ "google.auth._helpers.utcnow",
+ return_value=datetime.datetime.min + _helpers.CLOCK_SKEW,
+ )
def test_refresh_success(self, unused_utcnow, refresh_grant):
- token = 'token'
+ token = "token"
expiry = _helpers.utcnow() + datetime.timedelta(seconds=500)
- grant_response = {'id_token': mock.sentinel.id_token}
+ grant_response = {"id_token": mock.sentinel.id_token}
refresh_grant.return_value = (
# Access token
token,
@@ -75,7 +79,8 @@
# Expiry,
expiry,
# Extra data
- grant_response)
+ grant_response,
+ )
request = mock.create_autospec(transport.Request)
credentials = self.make_credentials()
@@ -85,8 +90,13 @@
# Check jwt grant call.
refresh_grant.assert_called_with(
- request, self.TOKEN_URI, self.REFRESH_TOKEN, self.CLIENT_ID,
- self.CLIENT_SECRET, None)
+ request,
+ self.TOKEN_URI,
+ self.REFRESH_TOKEN,
+ self.CLIENT_ID,
+ self.CLIENT_SECRET,
+ None,
+ )
# Check that the credentials have the token and expiry
assert credentials.token == token
@@ -99,24 +109,25 @@
def test_refresh_no_refresh_token(self):
request = mock.create_autospec(transport.Request)
- credentials_ = credentials.Credentials(
- token=None, refresh_token=None)
+ credentials_ = credentials.Credentials(token=None, refresh_token=None)
- with pytest.raises(exceptions.RefreshError, match='necessary fields'):
+ with pytest.raises(exceptions.RefreshError, match="necessary fields"):
credentials_.refresh(request)
request.assert_not_called()
- @mock.patch('google.oauth2._client.refresh_grant', autospec=True)
+ @mock.patch("google.oauth2._client.refresh_grant", autospec=True)
@mock.patch(
- 'google.auth._helpers.utcnow',
- return_value=datetime.datetime.min + _helpers.CLOCK_SKEW)
+ "google.auth._helpers.utcnow",
+ return_value=datetime.datetime.min + _helpers.CLOCK_SKEW,
+ )
def test_credentials_with_scopes_requested_refresh_success(
- self, unused_utcnow, refresh_grant):
- scopes = ['email', 'profile']
- token = 'token'
+ self, unused_utcnow, refresh_grant
+ ):
+ scopes = ["email", "profile"]
+ token = "token"
expiry = _helpers.utcnow() + datetime.timedelta(seconds=500)
- grant_response = {'id_token': mock.sentinel.id_token}
+ grant_response = {"id_token": mock.sentinel.id_token}
refresh_grant.return_value = (
# Access token
token,
@@ -125,21 +136,31 @@
# Expiry,
expiry,
# Extra data
- grant_response)
+ grant_response,
+ )
request = mock.create_autospec(transport.Request)
creds = credentials.Credentials(
- token=None, refresh_token=self.REFRESH_TOKEN,
- token_uri=self.TOKEN_URI, client_id=self.CLIENT_ID,
- client_secret=self.CLIENT_SECRET, scopes=scopes)
+ token=None,
+ refresh_token=self.REFRESH_TOKEN,
+ token_uri=self.TOKEN_URI,
+ client_id=self.CLIENT_ID,
+ client_secret=self.CLIENT_SECRET,
+ scopes=scopes,
+ )
# Refresh credentials
creds.refresh(request)
# Check jwt grant call.
refresh_grant.assert_called_with(
- request, self.TOKEN_URI, self.REFRESH_TOKEN, self.CLIENT_ID,
- self.CLIENT_SECRET, scopes)
+ request,
+ self.TOKEN_URI,
+ self.REFRESH_TOKEN,
+ self.CLIENT_ID,
+ self.CLIENT_SECRET,
+ scopes,
+ )
# Check that the credentials have the token and expiry
assert creds.token == token
@@ -151,17 +172,21 @@
# expired.)
assert creds.valid
- @mock.patch('google.oauth2._client.refresh_grant', autospec=True)
+ @mock.patch("google.oauth2._client.refresh_grant", autospec=True)
@mock.patch(
- 'google.auth._helpers.utcnow',
- return_value=datetime.datetime.min + _helpers.CLOCK_SKEW)
+ "google.auth._helpers.utcnow",
+ return_value=datetime.datetime.min + _helpers.CLOCK_SKEW,
+ )
def test_credentials_with_scopes_returned_refresh_success(
- self, unused_utcnow, refresh_grant):
- scopes = ['email', 'profile']
- token = 'token'
+ self, unused_utcnow, refresh_grant
+ ):
+ scopes = ["email", "profile"]
+ token = "token"
expiry = _helpers.utcnow() + datetime.timedelta(seconds=500)
- grant_response = {'id_token': mock.sentinel.id_token,
- 'scopes': ' '.join(scopes)}
+ grant_response = {
+ "id_token": mock.sentinel.id_token,
+ "scopes": " ".join(scopes),
+ }
refresh_grant.return_value = (
# Access token
token,
@@ -170,21 +195,31 @@
# Expiry,
expiry,
# Extra data
- grant_response)
+ grant_response,
+ )
request = mock.create_autospec(transport.Request)
creds = credentials.Credentials(
- token=None, refresh_token=self.REFRESH_TOKEN,
- token_uri=self.TOKEN_URI, client_id=self.CLIENT_ID,
- client_secret=self.CLIENT_SECRET, scopes=scopes)
+ token=None,
+ refresh_token=self.REFRESH_TOKEN,
+ token_uri=self.TOKEN_URI,
+ client_id=self.CLIENT_ID,
+ client_secret=self.CLIENT_SECRET,
+ scopes=scopes,
+ )
# Refresh credentials
creds.refresh(request)
# Check jwt grant call.
refresh_grant.assert_called_with(
- request, self.TOKEN_URI, self.REFRESH_TOKEN, self.CLIENT_ID,
- self.CLIENT_SECRET, scopes)
+ request,
+ self.TOKEN_URI,
+ self.REFRESH_TOKEN,
+ self.CLIENT_ID,
+ self.CLIENT_SECRET,
+ scopes,
+ )
# Check that the credentials have the token and expiry
assert creds.token == token
@@ -196,18 +231,22 @@
# expired.)
assert creds.valid
- @mock.patch('google.oauth2._client.refresh_grant', autospec=True)
+ @mock.patch("google.oauth2._client.refresh_grant", autospec=True)
@mock.patch(
- 'google.auth._helpers.utcnow',
- return_value=datetime.datetime.min + _helpers.CLOCK_SKEW)
+ "google.auth._helpers.utcnow",
+ return_value=datetime.datetime.min + _helpers.CLOCK_SKEW,
+ )
def test_credentials_with_scopes_refresh_failure_raises_refresh_error(
- self, unused_utcnow, refresh_grant):
- scopes = ['email', 'profile']
- scopes_returned = ['email']
- token = 'token'
+ self, unused_utcnow, refresh_grant
+ ):
+ scopes = ["email", "profile"]
+ scopes_returned = ["email"]
+ token = "token"
expiry = _helpers.utcnow() + datetime.timedelta(seconds=500)
- grant_response = {'id_token': mock.sentinel.id_token,
- 'scopes': ' '.join(scopes_returned)}
+ grant_response = {
+ "id_token": mock.sentinel.id_token,
+ "scopes": " ".join(scopes_returned),
+ }
refresh_grant.return_value = (
# Access token
token,
@@ -216,23 +255,34 @@
# Expiry,
expiry,
# Extra data
- grant_response)
+ grant_response,
+ )
request = mock.create_autospec(transport.Request)
creds = credentials.Credentials(
- token=None, refresh_token=self.REFRESH_TOKEN,
- token_uri=self.TOKEN_URI, client_id=self.CLIENT_ID,
- client_secret=self.CLIENT_SECRET, scopes=scopes)
+ token=None,
+ refresh_token=self.REFRESH_TOKEN,
+ token_uri=self.TOKEN_URI,
+ client_id=self.CLIENT_ID,
+ client_secret=self.CLIENT_SECRET,
+ scopes=scopes,
+ )
# Refresh credentials
- with pytest.raises(exceptions.RefreshError,
- match='Not all requested scopes were granted'):
+ with pytest.raises(
+ exceptions.RefreshError, match="Not all requested scopes were granted"
+ ):
creds.refresh(request)
# Check jwt grant call.
refresh_grant.assert_called_with(
- request, self.TOKEN_URI, self.REFRESH_TOKEN, self.CLIENT_ID,
- self.CLIENT_SECRET, scopes)
+ request,
+ self.TOKEN_URI,
+ self.REFRESH_TOKEN,
+ self.CLIENT_ID,
+ self.CLIENT_SECRET,
+ scopes,
+ )
# Check that the credentials have the token and expiry
assert creds.token == token
@@ -248,37 +298,36 @@
info = AUTH_USER_INFO.copy()
creds = credentials.Credentials.from_authorized_user_info(info)
- assert creds.client_secret == info['client_secret']
- assert creds.client_id == info['client_id']
- assert creds.refresh_token == info['refresh_token']
+ assert creds.client_secret == info["client_secret"]
+ assert creds.client_id == info["client_id"]
+ assert creds.refresh_token == info["refresh_token"]
assert creds.token_uri == credentials._GOOGLE_OAUTH2_TOKEN_ENDPOINT
assert creds.scopes is None
- scopes = ['email', 'profile']
- creds = credentials.Credentials.from_authorized_user_info(
- info, scopes)
- assert creds.client_secret == info['client_secret']
- assert creds.client_id == info['client_id']
- assert creds.refresh_token == info['refresh_token']
+ scopes = ["email", "profile"]
+ creds = credentials.Credentials.from_authorized_user_info(info, scopes)
+ assert creds.client_secret == info["client_secret"]
+ assert creds.client_id == info["client_id"]
+ assert creds.refresh_token == info["refresh_token"]
assert creds.token_uri == credentials._GOOGLE_OAUTH2_TOKEN_ENDPOINT
assert creds.scopes == scopes
def test_from_authorized_user_file(self):
info = AUTH_USER_INFO.copy()
- creds = credentials.Credentials.from_authorized_user_file(
- AUTH_USER_JSON_FILE)
- assert creds.client_secret == info['client_secret']
- assert creds.client_id == info['client_id']
- assert creds.refresh_token == info['refresh_token']
+ creds = credentials.Credentials.from_authorized_user_file(AUTH_USER_JSON_FILE)
+ assert creds.client_secret == info["client_secret"]
+ assert creds.client_id == info["client_id"]
+ assert creds.refresh_token == info["refresh_token"]
assert creds.token_uri == credentials._GOOGLE_OAUTH2_TOKEN_ENDPOINT
assert creds.scopes is None
- scopes = ['email', 'profile']
+ scopes = ["email", "profile"]
creds = credentials.Credentials.from_authorized_user_file(
- AUTH_USER_JSON_FILE, scopes)
- assert creds.client_secret == info['client_secret']
- assert creds.client_id == info['client_id']
- assert creds.refresh_token == info['refresh_token']
+ AUTH_USER_JSON_FILE, scopes
+ )
+ assert creds.client_secret == info["client_secret"]
+ assert creds.client_id == info["client_id"]
+ assert creds.refresh_token == info["refresh_token"]
assert creds.token_uri == credentials._GOOGLE_OAUTH2_TOKEN_ENDPOINT
assert creds.scopes == scopes
diff --git a/tests/oauth2/test_id_token.py b/tests/oauth2/test_id_token.py
index 360f92f..980a8e9 100644
--- a/tests/oauth2/test_id_token.py
+++ b/tests/oauth2/test_id_token.py
@@ -27,7 +27,7 @@
response.status = status
if data is not None:
- response.data = json.dumps(data).encode('utf-8')
+ response.data = json.dumps(data).encode("utf-8")
request = mock.create_autospec(transport.Request)
request.return_value = response
@@ -35,12 +35,12 @@
def test__fetch_certs_success():
- certs = {'1': 'cert'}
+ certs = {"1": "cert"}
request = make_request(200, certs)
returned_certs = id_token._fetch_certs(request, mock.sentinel.cert_url)
- request.assert_called_once_with(mock.sentinel.cert_url, method='GET')
+ request.assert_called_once_with(mock.sentinel.cert_url, method="GET")
assert returned_certs == certs
@@ -50,66 +50,67 @@
with pytest.raises(exceptions.TransportError):
id_token._fetch_certs(request, mock.sentinel.cert_url)
- request.assert_called_once_with(mock.sentinel.cert_url, method='GET')
+ request.assert_called_once_with(mock.sentinel.cert_url, method="GET")
-@mock.patch('google.auth.jwt.decode', autospec=True)
-@mock.patch('google.oauth2.id_token._fetch_certs', autospec=True)
+@mock.patch("google.auth.jwt.decode", autospec=True)
+@mock.patch("google.oauth2.id_token._fetch_certs", autospec=True)
def test_verify_token(_fetch_certs, decode):
result = id_token.verify_token(mock.sentinel.token, mock.sentinel.request)
assert result == decode.return_value
_fetch_certs.assert_called_once_with(
- mock.sentinel.request, id_token._GOOGLE_OAUTH2_CERTS_URL)
+ mock.sentinel.request, id_token._GOOGLE_OAUTH2_CERTS_URL
+ )
decode.assert_called_once_with(
- mock.sentinel.token,
- certs=_fetch_certs.return_value,
- audience=None)
+ mock.sentinel.token, certs=_fetch_certs.return_value, audience=None
+ )
-@mock.patch('google.auth.jwt.decode', autospec=True)
-@mock.patch('google.oauth2.id_token._fetch_certs', autospec=True)
+@mock.patch("google.auth.jwt.decode", autospec=True)
+@mock.patch("google.oauth2.id_token._fetch_certs", autospec=True)
def test_verify_token_args(_fetch_certs, decode):
result = id_token.verify_token(
mock.sentinel.token,
mock.sentinel.request,
audience=mock.sentinel.audience,
- certs_url=mock.sentinel.certs_url)
+ certs_url=mock.sentinel.certs_url,
+ )
assert result == decode.return_value
- _fetch_certs.assert_called_once_with(
- mock.sentinel.request, mock.sentinel.certs_url)
+ _fetch_certs.assert_called_once_with(mock.sentinel.request, mock.sentinel.certs_url)
decode.assert_called_once_with(
mock.sentinel.token,
certs=_fetch_certs.return_value,
- audience=mock.sentinel.audience)
+ audience=mock.sentinel.audience,
+ )
-@mock.patch('google.oauth2.id_token.verify_token', autospec=True)
+@mock.patch("google.oauth2.id_token.verify_token", autospec=True)
def test_verify_oauth2_token(verify_token):
result = id_token.verify_oauth2_token(
- mock.sentinel.token,
- mock.sentinel.request,
- audience=mock.sentinel.audience)
+ mock.sentinel.token, mock.sentinel.request, audience=mock.sentinel.audience
+ )
assert result == verify_token.return_value
verify_token.assert_called_once_with(
mock.sentinel.token,
mock.sentinel.request,
audience=mock.sentinel.audience,
- certs_url=id_token._GOOGLE_OAUTH2_CERTS_URL)
+ certs_url=id_token._GOOGLE_OAUTH2_CERTS_URL,
+ )
-@mock.patch('google.oauth2.id_token.verify_token', autospec=True)
+@mock.patch("google.oauth2.id_token.verify_token", autospec=True)
def test_verify_firebase_token(verify_token):
result = id_token.verify_firebase_token(
- mock.sentinel.token,
- mock.sentinel.request,
- audience=mock.sentinel.audience)
+ mock.sentinel.token, mock.sentinel.request, audience=mock.sentinel.audience
+ )
assert result == verify_token.return_value
verify_token.assert_called_once_with(
mock.sentinel.token,
mock.sentinel.request,
audience=mock.sentinel.audience,
- certs_url=id_token._GOOGLE_APIS_CERTS_URL)
+ certs_url=id_token._GOOGLE_APIS_CERTS_URL,
+ )
diff --git a/tests/oauth2/test_service_account.py b/tests/oauth2/test_service_account.py
index 54ac0f5..0f9d460 100644
--- a/tests/oauth2/test_service_account.py
+++ b/tests/oauth2/test_service_account.py
@@ -25,58 +25,58 @@
from google.oauth2 import service_account
-DATA_DIR = os.path.join(os.path.dirname(__file__), '..', 'data')
+DATA_DIR = os.path.join(os.path.dirname(__file__), "..", "data")
-with open(os.path.join(DATA_DIR, 'privatekey.pem'), 'rb') as fh:
+with open(os.path.join(DATA_DIR, "privatekey.pem"), "rb") as fh:
PRIVATE_KEY_BYTES = fh.read()
-with open(os.path.join(DATA_DIR, 'public_cert.pem'), 'rb') as fh:
+with open(os.path.join(DATA_DIR, "public_cert.pem"), "rb") as fh:
PUBLIC_CERT_BYTES = fh.read()
-with open(os.path.join(DATA_DIR, 'other_cert.pem'), 'rb') as fh:
+with open(os.path.join(DATA_DIR, "other_cert.pem"), "rb") as fh:
OTHER_CERT_BYTES = fh.read()
-SERVICE_ACCOUNT_JSON_FILE = os.path.join(DATA_DIR, 'service_account.json')
+SERVICE_ACCOUNT_JSON_FILE = os.path.join(DATA_DIR, "service_account.json")
-with open(SERVICE_ACCOUNT_JSON_FILE, 'r') as fh:
+with open(SERVICE_ACCOUNT_JSON_FILE, "r") as fh:
SERVICE_ACCOUNT_INFO = json.load(fh)
-SIGNER = crypt.RSASigner.from_string(PRIVATE_KEY_BYTES, '1')
+SIGNER = crypt.RSASigner.from_string(PRIVATE_KEY_BYTES, "1")
class TestCredentials(object):
- SERVICE_ACCOUNT_EMAIL = 'service-account@example.com'
- TOKEN_URI = 'https://example.com/oauth2/token'
+ SERVICE_ACCOUNT_EMAIL = "service-account@example.com"
+ TOKEN_URI = "https://example.com/oauth2/token"
@classmethod
def make_credentials(cls):
return service_account.Credentials(
- SIGNER, cls.SERVICE_ACCOUNT_EMAIL, cls.TOKEN_URI)
+ SIGNER, cls.SERVICE_ACCOUNT_EMAIL, cls.TOKEN_URI
+ )
def test_from_service_account_info(self):
credentials = service_account.Credentials.from_service_account_info(
- SERVICE_ACCOUNT_INFO)
+ SERVICE_ACCOUNT_INFO
+ )
- assert (credentials._signer.key_id ==
- SERVICE_ACCOUNT_INFO['private_key_id'])
- assert (credentials.service_account_email ==
- SERVICE_ACCOUNT_INFO['client_email'])
- assert credentials._token_uri == SERVICE_ACCOUNT_INFO['token_uri']
+ assert credentials._signer.key_id == SERVICE_ACCOUNT_INFO["private_key_id"]
+ assert credentials.service_account_email == SERVICE_ACCOUNT_INFO["client_email"]
+ assert credentials._token_uri == SERVICE_ACCOUNT_INFO["token_uri"]
def test_from_service_account_info_args(self):
info = SERVICE_ACCOUNT_INFO.copy()
- scopes = ['email', 'profile']
- subject = 'subject'
- additional_claims = {'meta': 'data'}
+ scopes = ["email", "profile"]
+ subject = "subject"
+ additional_claims = {"meta": "data"}
credentials = service_account.Credentials.from_service_account_info(
- info, scopes=scopes, subject=subject,
- additional_claims=additional_claims)
+ info, scopes=scopes, subject=subject, additional_claims=additional_claims
+ )
- assert credentials.service_account_email == info['client_email']
- assert credentials.project_id == info['project_id']
- assert credentials._signer.key_id == info['private_key_id']
- assert credentials._token_uri == info['token_uri']
+ assert credentials.service_account_email == info["client_email"]
+ assert credentials.project_id == info["project_id"]
+ assert credentials._signer.key_id == info["private_key_id"]
+ assert credentials._token_uri == info["token_uri"]
assert credentials._scopes == scopes
assert credentials._subject == subject
assert credentials._additional_claims == additional_claims
@@ -85,27 +85,31 @@
info = SERVICE_ACCOUNT_INFO.copy()
credentials = service_account.Credentials.from_service_account_file(
- SERVICE_ACCOUNT_JSON_FILE)
+ SERVICE_ACCOUNT_JSON_FILE
+ )
- assert credentials.service_account_email == info['client_email']
- assert credentials.project_id == info['project_id']
- assert credentials._signer.key_id == info['private_key_id']
- assert credentials._token_uri == info['token_uri']
+ assert credentials.service_account_email == info["client_email"]
+ assert credentials.project_id == info["project_id"]
+ assert credentials._signer.key_id == info["private_key_id"]
+ assert credentials._token_uri == info["token_uri"]
def test_from_service_account_file_args(self):
info = SERVICE_ACCOUNT_INFO.copy()
- scopes = ['email', 'profile']
- subject = 'subject'
- additional_claims = {'meta': 'data'}
+ scopes = ["email", "profile"]
+ subject = "subject"
+ additional_claims = {"meta": "data"}
credentials = service_account.Credentials.from_service_account_file(
- SERVICE_ACCOUNT_JSON_FILE, subject=subject,
- scopes=scopes, additional_claims=additional_claims)
+ SERVICE_ACCOUNT_JSON_FILE,
+ subject=subject,
+ scopes=scopes,
+ additional_claims=additional_claims,
+ )
- assert credentials.service_account_email == info['client_email']
- assert credentials.project_id == info['project_id']
- assert credentials._signer.key_id == info['private_key_id']
- assert credentials._token_uri == info['token_uri']
+ assert credentials.service_account_email == info["client_email"]
+ assert credentials.project_id == info["project_id"]
+ assert credentials._signer.key_id == info["private_key_id"]
+ assert credentials._token_uri == info["token_uri"]
assert credentials._scopes == scopes
assert credentials._subject == subject
assert credentials._additional_claims == additional_claims
@@ -120,7 +124,7 @@
def test_sign_bytes(self):
credentials = self.make_credentials()
- to_sign = b'123'
+ to_sign = b"123"
signature = credentials.sign_bytes(to_sign)
assert crypt.verify_signature(to_sign, signature, PUBLIC_CERT_BYTES)
@@ -134,46 +138,47 @@
def test_create_scoped(self):
credentials = self.make_credentials()
- scopes = ['email', 'profile']
+ scopes = ["email", "profile"]
credentials = credentials.with_scopes(scopes)
assert credentials._scopes == scopes
def test_with_claims(self):
credentials = self.make_credentials()
- new_credentials = credentials.with_claims({'meep': 'moop'})
- assert new_credentials._additional_claims == {'meep': 'moop'}
+ new_credentials = credentials.with_claims({"meep": "moop"})
+ assert new_credentials._additional_claims == {"meep": "moop"}
def test__make_authorization_grant_assertion(self):
credentials = self.make_credentials()
token = credentials._make_authorization_grant_assertion()
payload = jwt.decode(token, PUBLIC_CERT_BYTES)
- assert payload['iss'] == self.SERVICE_ACCOUNT_EMAIL
- assert payload['aud'] == self.TOKEN_URI
+ assert payload["iss"] == self.SERVICE_ACCOUNT_EMAIL
+ assert payload["aud"] == self.TOKEN_URI
def test__make_authorization_grant_assertion_scoped(self):
credentials = self.make_credentials()
- scopes = ['email', 'profile']
+ scopes = ["email", "profile"]
credentials = credentials.with_scopes(scopes)
token = credentials._make_authorization_grant_assertion()
payload = jwt.decode(token, PUBLIC_CERT_BYTES)
- assert payload['scope'] == 'email profile'
+ assert payload["scope"] == "email profile"
def test__make_authorization_grant_assertion_subject(self):
credentials = self.make_credentials()
- subject = 'user@example.com'
+ subject = "user@example.com"
credentials = credentials.with_subject(subject)
token = credentials._make_authorization_grant_assertion()
payload = jwt.decode(token, PUBLIC_CERT_BYTES)
- assert payload['sub'] == subject
+ assert payload["sub"] == subject
- @mock.patch('google.oauth2._client.jwt_grant', autospec=True)
+ @mock.patch("google.oauth2._client.jwt_grant", autospec=True)
def test_refresh_success(self, jwt_grant):
credentials = self.make_credentials()
- token = 'token'
+ token = "token"
jwt_grant.return_value = (
token,
_helpers.utcnow() + datetime.timedelta(seconds=500),
- {})
+ {},
+ )
request = mock.create_autospec(transport.Request, instance=True)
# Refresh credentials
@@ -196,20 +201,22 @@
# expired)
assert credentials.valid
- @mock.patch('google.oauth2._client.jwt_grant', autospec=True)
+ @mock.patch("google.oauth2._client.jwt_grant", autospec=True)
def test_before_request_refreshes(self, jwt_grant):
credentials = self.make_credentials()
- token = 'token'
+ token = "token"
jwt_grant.return_value = (
- token, _helpers.utcnow() + datetime.timedelta(seconds=500), None)
+ token,
+ _helpers.utcnow() + datetime.timedelta(seconds=500),
+ None,
+ )
request = mock.create_autospec(transport.Request, instance=True)
# Credentials should start as invalid
assert not credentials.valid
# before_request should cause a refresh
- credentials.before_request(
- request, 'GET', 'http://example.com?a=1#3', {})
+ credentials.before_request(request, "GET", "http://example.com?a=1#3", {})
# The refresh endpoint should've been called.
assert jwt_grant.called
@@ -219,40 +226,36 @@
class TestIDTokenCredentials(object):
- SERVICE_ACCOUNT_EMAIL = 'service-account@example.com'
- TOKEN_URI = 'https://example.com/oauth2/token'
- TARGET_AUDIENCE = 'https://example.com'
+ SERVICE_ACCOUNT_EMAIL = "service-account@example.com"
+ TOKEN_URI = "https://example.com/oauth2/token"
+ TARGET_AUDIENCE = "https://example.com"
@classmethod
def make_credentials(cls):
return service_account.IDTokenCredentials(
- SIGNER, cls.SERVICE_ACCOUNT_EMAIL, cls.TOKEN_URI,
- cls.TARGET_AUDIENCE)
+ SIGNER, cls.SERVICE_ACCOUNT_EMAIL, cls.TOKEN_URI, cls.TARGET_AUDIENCE
+ )
def test_from_service_account_info(self):
- credentials = (
- service_account.IDTokenCredentials.from_service_account_info(
- SERVICE_ACCOUNT_INFO,
- target_audience=self.TARGET_AUDIENCE))
+ credentials = service_account.IDTokenCredentials.from_service_account_info(
+ SERVICE_ACCOUNT_INFO, target_audience=self.TARGET_AUDIENCE
+ )
- assert (credentials._signer.key_id ==
- SERVICE_ACCOUNT_INFO['private_key_id'])
- assert (credentials.service_account_email ==
- SERVICE_ACCOUNT_INFO['client_email'])
- assert credentials._token_uri == SERVICE_ACCOUNT_INFO['token_uri']
+ assert credentials._signer.key_id == SERVICE_ACCOUNT_INFO["private_key_id"]
+ assert credentials.service_account_email == SERVICE_ACCOUNT_INFO["client_email"]
+ assert credentials._token_uri == SERVICE_ACCOUNT_INFO["token_uri"]
assert credentials._target_audience == self.TARGET_AUDIENCE
def test_from_service_account_file(self):
info = SERVICE_ACCOUNT_INFO.copy()
- credentials = (
- service_account.IDTokenCredentials.from_service_account_file(
- SERVICE_ACCOUNT_JSON_FILE,
- target_audience=self.TARGET_AUDIENCE))
+ credentials = service_account.IDTokenCredentials.from_service_account_file(
+ SERVICE_ACCOUNT_JSON_FILE, target_audience=self.TARGET_AUDIENCE
+ )
- assert credentials.service_account_email == info['client_email']
- assert credentials._signer.key_id == info['private_key_id']
- assert credentials._token_uri == info['token_uri']
+ assert credentials.service_account_email == info["client_email"]
+ assert credentials._signer.key_id == info["private_key_id"]
+ assert credentials._token_uri == info["token_uri"]
assert credentials._target_audience == self.TARGET_AUDIENCE
def test_default_state(self):
@@ -263,7 +266,7 @@
def test_sign_bytes(self):
credentials = self.make_credentials()
- to_sign = b'123'
+ to_sign = b"123"
signature = credentials.sign_bytes(to_sign)
assert crypt.verify_signature(to_sign, signature, PUBLIC_CERT_BYTES)
@@ -277,26 +280,26 @@
def test_with_target_audience(self):
credentials = self.make_credentials()
- new_credentials = credentials.with_target_audience(
- 'https://new.example.com')
- assert new_credentials._target_audience == 'https://new.example.com'
+ new_credentials = credentials.with_target_audience("https://new.example.com")
+ assert new_credentials._target_audience == "https://new.example.com"
def test__make_authorization_grant_assertion(self):
credentials = self.make_credentials()
token = credentials._make_authorization_grant_assertion()
payload = jwt.decode(token, PUBLIC_CERT_BYTES)
- assert payload['iss'] == self.SERVICE_ACCOUNT_EMAIL
- assert payload['aud'] == self.TOKEN_URI
- assert payload['target_audience'] == self.TARGET_AUDIENCE
+ assert payload["iss"] == self.SERVICE_ACCOUNT_EMAIL
+ assert payload["aud"] == self.TOKEN_URI
+ assert payload["target_audience"] == self.TARGET_AUDIENCE
- @mock.patch('google.oauth2._client.id_token_jwt_grant', autospec=True)
+ @mock.patch("google.oauth2._client.id_token_jwt_grant", autospec=True)
def test_refresh_success(self, id_token_jwt_grant):
credentials = self.make_credentials()
- token = 'token'
+ token = "token"
id_token_jwt_grant.return_value = (
token,
_helpers.utcnow() + datetime.timedelta(seconds=500),
- {})
+ {},
+ )
request = mock.create_autospec(transport.Request, instance=True)
# Refresh credentials
@@ -319,20 +322,22 @@
# expired)
assert credentials.valid
- @mock.patch('google.oauth2._client.id_token_jwt_grant', autospec=True)
+ @mock.patch("google.oauth2._client.id_token_jwt_grant", autospec=True)
def test_before_request_refreshes(self, id_token_jwt_grant):
credentials = self.make_credentials()
- token = 'token'
+ token = "token"
id_token_jwt_grant.return_value = (
- token, _helpers.utcnow() + datetime.timedelta(seconds=500), None)
+ token,
+ _helpers.utcnow() + datetime.timedelta(seconds=500),
+ None,
+ )
request = mock.create_autospec(transport.Request, instance=True)
# Credentials should start as invalid
assert not credentials.valid
# before_request should cause a refresh
- credentials.before_request(
- request, 'GET', 'http://example.com?a=1#3', {})
+ credentials.before_request(request, "GET", "http://example.com?a=1#3", {})
# The refresh endpoint should've been called.
assert id_token_jwt_grant.called