Add downscoping to oauth2 credentials (#309)
diff --git a/tests/oauth2/test__client.py b/tests/oauth2/test__client.py
index 3ec7fc6..5a4a567 100644
--- a/tests/oauth2/test__client.py
+++ b/tests/oauth2/test__client.py
@@ -37,6 +37,11 @@
SIGNER = crypt.RSASigner.from_string(PRIVATE_KEY_BYTES, '1')
+# The same two OAuth scopes, once as a list and once as a single
+# space-delimited string.
+SCOPES_AS_LIST = ['https://www.googleapis.com/auth/pubsub',
+ 'https://www.googleapis.com/auth/logging.write']
+SCOPES_AS_STRING = ('https://www.googleapis.com/auth/pubsub'
+ ' https://www.googleapis.com/auth/logging.write')
+
def test__handle_error_response():
response_data = json.dumps({
@@ -204,6 +209,35 @@
assert extra_data['extra'] == 'data'
+@mock.patch('google.auth._helpers.utcnow', return_value=datetime.datetime.min)
+def test_refresh_grant_with_scopes(unused_utcnow):
+ # refresh_grant is called with the scopes as a list; the request is
+ # expected to carry them as one space-delimited 'scope' parameter.
+ request = make_request({
+ 'access_token': 'token',
+ 'refresh_token': 'new_refresh_token',
+ 'expires_in': 500,
+ 'extra': 'data',
+ 'scope': SCOPES_AS_STRING})
+
+ token, refresh_token, expiry, extra_data = _client.refresh_grant(
+ request, 'http://example.com', 'refresh_token', 'client_id',
+ 'client_secret', SCOPES_AS_LIST)
+
+ # Check request call.
+ verify_request_params(request, {
+ 'grant_type': _client._REFRESH_GRANT_TYPE,
+ 'refresh_token': 'refresh_token',
+ 'client_id': 'client_id',
+ 'client_secret': 'client_secret',
+ 'scope': SCOPES_AS_STRING
+ })
+
+ # Check result. utcnow is patched to datetime.min, so the expected expiry
+ # is exactly 'expires_in' (500) seconds after it.
+ assert token == 'token'
+ assert refresh_token == 'new_refresh_token'
+ assert expiry == datetime.datetime.min + datetime.timedelta(seconds=500)
+ assert extra_data['extra'] == 'data'
+
+
def test_refresh_grant_no_access_token():
request = make_request({
# No access token.
diff --git a/tests/oauth2/test_credentials.py b/tests/oauth2/test_credentials.py
index 922c3bb..3231509 100644
--- a/tests/oauth2/test_credentials.py
+++ b/tests/oauth2/test_credentials.py
@@ -86,7 +86,7 @@
# Check jwt grant call.
refresh_grant.assert_called_with(
request, self.TOKEN_URI, self.REFRESH_TOKEN, self.CLIENT_ID,
- self.CLIENT_SECRET)
+ self.CLIENT_SECRET, None)
# Check that the credentials have the token and expiry
assert credentials.token == token
@@ -107,6 +107,143 @@
request.assert_not_called()
+ @mock.patch('google.oauth2._client.refresh_grant', autospec=True)
+ @mock.patch(
+ 'google.auth._helpers.utcnow',
+ return_value=datetime.datetime.min + _helpers.CLOCK_SKEW)
+ def test_credentials_with_scopes_requested_refresh_success(
+ self, unused_utcnow, refresh_grant):
+ # Credentials are created with scopes, and the mocked grant response
+ # carries no 'scopes' entry; the refresh is expected to succeed.
+ scopes = ['email', 'profile']
+ token = 'token'
+ expiry = _helpers.utcnow() + datetime.timedelta(seconds=500)
+ grant_response = {'id_token': mock.sentinel.id_token}
+ refresh_grant.return_value = (
+ # Access token
+ token,
+ # New refresh token
+ None,
+ # Expiry
+ expiry,
+ # Extra data
+ grant_response)
+
+ request = mock.create_autospec(transport.Request)
+ creds = credentials.Credentials(
+ token=None, refresh_token=self.REFRESH_TOKEN,
+ token_uri=self.TOKEN_URI, client_id=self.CLIENT_ID,
+ client_secret=self.CLIENT_SECRET, scopes=scopes)
+
+ # Refresh credentials
+ creds.refresh(request)
+
+ # Check the refresh grant call: the scopes are passed as the last
+ # positional argument.
+ refresh_grant.assert_called_with(
+ request, self.TOKEN_URI, self.REFRESH_TOKEN, self.CLIENT_ID,
+ self.CLIENT_SECRET, scopes)
+
+ # Check that the credentials have the token, expiry, id token and scopes
+ assert creds.token == token
+ assert creds.expiry == expiry
+ assert creds.id_token == mock.sentinel.id_token
+ assert creds.has_scopes(scopes)
+
+ # Check that the credentials are valid (have a token and are not
+ # expired.)
+ assert creds.valid
+
+ @mock.patch('google.oauth2._client.refresh_grant', autospec=True)
+ @mock.patch(
+ 'google.auth._helpers.utcnow',
+ return_value=datetime.datetime.min + _helpers.CLOCK_SKEW)
+ def test_credentials_with_scopes_returned_refresh_success(
+ self, unused_utcnow, refresh_grant):
+ # The mocked grant response echoes back every requested scope as a
+ # space-delimited string; the refresh is expected to succeed.
+ scopes = ['email', 'profile']
+ token = 'token'
+ expiry = _helpers.utcnow() + datetime.timedelta(seconds=500)
+ # NOTE(review): the key used here is 'scopes', while the token payload
+ # in tests/oauth2/test__client.py uses 'scope' -- confirm which key
+ # Credentials.refresh actually reads.
+ grant_response = {'id_token': mock.sentinel.id_token,
+ 'scopes': ' '.join(scopes)}
+ refresh_grant.return_value = (
+ # Access token
+ token,
+ # New refresh token
+ None,
+ # Expiry
+ expiry,
+ # Extra data
+ grant_response)
+
+ request = mock.create_autospec(transport.Request)
+ creds = credentials.Credentials(
+ token=None, refresh_token=self.REFRESH_TOKEN,
+ token_uri=self.TOKEN_URI, client_id=self.CLIENT_ID,
+ client_secret=self.CLIENT_SECRET, scopes=scopes)
+
+ # Refresh credentials
+ creds.refresh(request)
+
+ # Check the refresh grant call: the scopes are passed as the last
+ # positional argument.
+ refresh_grant.assert_called_with(
+ request, self.TOKEN_URI, self.REFRESH_TOKEN, self.CLIENT_ID,
+ self.CLIENT_SECRET, scopes)
+
+ # Check that the credentials have the token, expiry, id token and scopes
+ assert creds.token == token
+ assert creds.expiry == expiry
+ assert creds.id_token == mock.sentinel.id_token
+ assert creds.has_scopes(scopes)
+
+ # Check that the credentials are valid (have a token and are not
+ # expired.)
+ assert creds.valid
+
+ @mock.patch('google.oauth2._client.refresh_grant', autospec=True)
+ @mock.patch(
+ 'google.auth._helpers.utcnow',
+ return_value=datetime.datetime.min + _helpers.CLOCK_SKEW)
+ def test_credentials_with_scopes_refresh_failure_raises_refresh_error(
+ self, unused_utcnow, refresh_grant):
+ # 'email' and 'profile' are requested but the mocked grant response
+ # only returns 'email'; refresh is expected to raise RefreshError.
+ scopes = ['email', 'profile']
+ scopes_returned = ['email']
+ token = 'token'
+ expiry = _helpers.utcnow() + datetime.timedelta(seconds=500)
+ # NOTE(review): the key used here is 'scopes', while the token payload
+ # in tests/oauth2/test__client.py uses 'scope' -- confirm which key
+ # Credentials.refresh actually reads.
+ grant_response = {'id_token': mock.sentinel.id_token,
+ 'scopes': ' '.join(scopes_returned)}
+ refresh_grant.return_value = (
+ # Access token
+ token,
+ # New refresh token
+ None,
+ # Expiry
+ expiry,
+ # Extra data
+ grant_response)
+
+ request = mock.create_autospec(transport.Request)
+ creds = credentials.Credentials(
+ token=None, refresh_token=self.REFRESH_TOKEN,
+ token_uri=self.TOKEN_URI, client_id=self.CLIENT_ID,
+ client_secret=self.CLIENT_SECRET, scopes=scopes)
+
+ # Refreshing must raise because not every requested scope came back
+ with pytest.raises(exceptions.RefreshError,
+ match='Not all requested scopes were granted'):
+ creds.refresh(request)
+
+ # Check the refresh grant call: the scopes are passed as the last
+ # positional argument.
+ refresh_grant.assert_called_with(
+ request, self.TOKEN_URI, self.REFRESH_TOKEN, self.CLIENT_ID,
+ self.CLIENT_SECRET, scopes)
+
+ # NOTE(review): these assertions run after RefreshError was raised, so
+ # they pin that the token, expiry and id token are stored on the
+ # credentials even though the refresh failed -- confirm this is intended.
+ assert creds.token == token
+ assert creds.expiry == expiry
+ assert creds.id_token == mock.sentinel.id_token
+ assert creds.has_scopes(scopes)
+
+ # NOTE(review): the credentials are asserted to be valid (have a token
+ # and are not expired) despite the failed refresh -- confirm intended.
+ assert creds.valid
+
def test_from_authorized_user_info(self):
info = AUTH_USER_INFO.copy()