feat: add reauth feature to user credentials (#727)
* feat: add reauth support to oauth2 credentials
* update
diff --git a/tests/oauth2/test__client.py b/tests/oauth2/test__client.py
index c3ae2af..54686df 100644
--- a/tests/oauth2/test__client.py
+++ b/tests/oauth2/test__client.py
@@ -48,7 +48,7 @@
def test__handle_error_response():
- response_data = json.dumps({"error": "help", "error_description": "I'm alive"})
+ response_data = {"error": "help", "error_description": "I'm alive"}
with pytest.raises(exceptions.RefreshError) as excinfo:
_client._handle_error_response(response_data)
@@ -57,12 +57,12 @@
def test__handle_error_response_non_json():
- response_data = "Help, I'm alive"
+ response_data = {"foo": "bar"}
with pytest.raises(exceptions.RefreshError) as excinfo:
_client._handle_error_response(response_data)
- assert excinfo.match(r"Help, I\'m alive")
+ assert excinfo.match(r"{\"foo\": \"bar\"}")
@mock.patch("google.auth._helpers.utcnow", return_value=datetime.datetime.min)
@@ -95,7 +95,7 @@
request.assert_called_with(
method="POST",
url="http://example.com",
- headers={"content-type": "application/x-www-form-urlencoded"},
+ headers={"Content-Type": "application/x-www-form-urlencoded"},
body="test=params".encode("utf-8"),
)
@@ -103,6 +103,32 @@
assert result == {"test": "response"}
+def test__token_endpoint_request_use_json():
+ request = make_request({"test": "response"})
+
+ result = _client._token_endpoint_request(
+ request,
+ "http://example.com",
+ {"test": "params"},
+ access_token="access_token",
+ use_json=True,
+ )
+
+ # Check request call
+ request.assert_called_with(
+ method="POST",
+ url="http://example.com",
+ headers={
+ "Content-Type": "application/json",
+ "Authorization": "Bearer access_token",
+ },
+ body=b'{"test": "params"}',
+ )
+
+ # Check result
+ assert result == {"test": "response"}
+
+
def test__token_endpoint_request_error():
request = make_request({}, status=http_client.BAD_REQUEST)
@@ -220,7 +246,12 @@
)
token, refresh_token, expiry, extra_data = _client.refresh_grant(
- request, "http://example.com", "refresh_token", "client_id", "client_secret"
+ request,
+ "http://example.com",
+ "refresh_token",
+ "client_id",
+ "client_secret",
+ rapt_token="rapt_token",
)
# Check request call
@@ -231,6 +262,7 @@
"refresh_token": "refresh_token",
"client_id": "client_id",
"client_secret": "client_secret",
+ "rapt": "rapt_token",
},
)
diff --git a/tests/oauth2/test_challenges.py b/tests/oauth2/test_challenges.py
new file mode 100644
index 0000000..019b908
--- /dev/null
+++ b/tests/oauth2/test_challenges.py
@@ -0,0 +1,132 @@
+# Copyright 2021 Google LLC
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""Tests for the challenges module."""
+
+import base64
+import sys
+
+import mock
+import pytest
+import pyu2f
+
+from google.auth import exceptions
+from google.oauth2 import challenges
+
+
+def test_get_user_password():
+ with mock.patch("getpass.getpass", return_value="foo"):
+ assert challenges.get_user_password("") == "foo"
+
+
+def test_security_key():
+ metadata = {
+ "status": "READY",
+ "challengeId": 2,
+ "challengeType": "SECURITY_KEY",
+ "securityKey": {
+ "applicationId": "security_key_application_id",
+ "challenges": [
+ {
+ "keyHandle": "some_key",
+ "challenge": base64.urlsafe_b64encode(
+ "some_challenge".encode("ascii")
+ ).decode("ascii"),
+ }
+ ],
+ },
+ }
+ mock_key = mock.Mock()
+
+ challenge = challenges.SecurityKeyChallenge()
+
+ # Test the case where the security key challenge succeeds.
+ with mock.patch("pyu2f.model.RegisteredKey", return_value=mock_key):
+ with mock.patch(
+ "pyu2f.convenience.authenticator.CompositeAuthenticator.Authenticate"
+ ) as mock_authenticate:
+ mock_authenticate.return_value = "security key response"
+ assert challenge.name == "SECURITY_KEY"
+ assert challenge.is_locally_eligible
+ assert challenge.obtain_challenge_input(metadata) == {
+ "securityKey": "security key response"
+ }
+ mock_authenticate.assert_called_with(
+ "security_key_application_id",
+ [{"key": mock_key, "challenge": b"some_challenge"}],
+ print_callback=sys.stderr.write,
+ )
+
+ # Test various types of exceptions.
+ with mock.patch("pyu2f.model.RegisteredKey", return_value=mock_key):
+ with mock.patch(
+ "pyu2f.convenience.authenticator.CompositeAuthenticator.Authenticate"
+ ) as mock_authenticate:
+ mock_authenticate.side_effect = pyu2f.errors.U2FError(
+ pyu2f.errors.U2FError.DEVICE_INELIGIBLE
+ )
+ assert challenge.obtain_challenge_input(metadata) is None
+
+ with mock.patch(
+ "pyu2f.convenience.authenticator.CompositeAuthenticator.Authenticate"
+ ) as mock_authenticate:
+ mock_authenticate.side_effect = pyu2f.errors.U2FError(
+ pyu2f.errors.U2FError.TIMEOUT
+ )
+ assert challenge.obtain_challenge_input(metadata) is None
+
+ with mock.patch(
+ "pyu2f.convenience.authenticator.CompositeAuthenticator.Authenticate"
+ ) as mock_authenticate:
+ mock_authenticate.side_effect = pyu2f.errors.U2FError(
+ pyu2f.errors.U2FError.BAD_REQUEST
+ )
+ with pytest.raises(pyu2f.errors.U2FError):
+ challenge.obtain_challenge_input(metadata)
+
+ with mock.patch(
+ "pyu2f.convenience.authenticator.CompositeAuthenticator.Authenticate"
+ ) as mock_authenticate:
+ mock_authenticate.side_effect = pyu2f.errors.NoDeviceFoundError()
+ assert challenge.obtain_challenge_input(metadata) is None
+
+ with mock.patch(
+ "pyu2f.convenience.authenticator.CompositeAuthenticator.Authenticate"
+ ) as mock_authenticate:
+ mock_authenticate.side_effect = pyu2f.errors.UnsupportedVersionException()
+ with pytest.raises(pyu2f.errors.UnsupportedVersionException):
+ challenge.obtain_challenge_input(metadata)
+
+ with mock.patch.dict("sys.modules"):
+ sys.modules["pyu2f"] = None
+ with pytest.raises(exceptions.ReauthFailError) as excinfo:
+ challenge.obtain_challenge_input(metadata)
+ assert excinfo.match(r"pyu2f dependency is required")
+
+
+@mock.patch("getpass.getpass", return_value="foo")
+def test_password_challenge(getpass_mock):
+ challenge = challenges.PasswordChallenge()
+
+ with mock.patch("getpass.getpass", return_value="foo"):
+ assert challenge.is_locally_eligible
+ assert challenge.name == "PASSWORD"
+ assert challenges.PasswordChallenge().obtain_challenge_input({}) == {
+ "credential": "foo"
+ }
+
+ with mock.patch("getpass.getpass", return_value=None):
+ assert challenges.PasswordChallenge().obtain_challenge_input({}) == {
+ "credential": " "
+ }
diff --git a/tests/oauth2/test_credentials.py b/tests/oauth2/test_credentials.py
index b885d29..4a387a5 100644
--- a/tests/oauth2/test_credentials.py
+++ b/tests/oauth2/test_credentials.py
@@ -38,6 +38,7 @@
class TestCredentials(object):
TOKEN_URI = "https://example.com/oauth2/token"
REFRESH_TOKEN = "refresh_token"
+ RAPT_TOKEN = "rapt_token"
CLIENT_ID = "client_id"
CLIENT_SECRET = "client_secret"
@@ -49,6 +50,7 @@
token_uri=cls.TOKEN_URI,
client_id=cls.CLIENT_ID,
client_secret=cls.CLIENT_SECRET,
+ rapt_token=cls.RAPT_TOKEN,
)
def test_default_state(self):
@@ -63,14 +65,16 @@
assert credentials.token_uri == self.TOKEN_URI
assert credentials.client_id == self.CLIENT_ID
assert credentials.client_secret == self.CLIENT_SECRET
+ assert credentials.rapt_token == self.RAPT_TOKEN
- @mock.patch("google.oauth2._client.refresh_grant", autospec=True)
+ @mock.patch("google.oauth2.reauth.refresh_grant", autospec=True)
@mock.patch(
"google.auth._helpers.utcnow",
return_value=datetime.datetime.min + _helpers.CLOCK_SKEW,
)
def test_refresh_success(self, unused_utcnow, refresh_grant):
token = "token"
+ new_rapt_token = "new_rapt_token"
expiry = _helpers.utcnow() + datetime.timedelta(seconds=500)
grant_response = {"id_token": mock.sentinel.id_token}
refresh_grant.return_value = (
@@ -82,6 +86,8 @@
expiry,
# Extra data
grant_response,
+ # rapt token
+ new_rapt_token,
)
request = mock.create_autospec(transport.Request)
@@ -98,12 +104,14 @@
self.CLIENT_ID,
self.CLIENT_SECRET,
None,
+ self.RAPT_TOKEN,
)
# Check that the credentials have the token and expiry
assert credentials.token == token
assert credentials.expiry == expiry
assert credentials.id_token == mock.sentinel.id_token
+ assert credentials.rapt_token == new_rapt_token
# Check that the credentials are valid (have a token and are not
# expired)
@@ -118,7 +126,7 @@
request.assert_not_called()
- @mock.patch("google.oauth2._client.refresh_grant", autospec=True)
+ @mock.patch("google.oauth2.reauth.refresh_grant", autospec=True)
@mock.patch(
"google.auth._helpers.utcnow",
return_value=datetime.datetime.min + _helpers.CLOCK_SKEW,
@@ -129,8 +137,9 @@
scopes = ["email", "profile"]
default_scopes = ["https://www.googleapis.com/auth/cloud-platform"]
token = "token"
+ new_rapt_token = "new_rapt_token"
expiry = _helpers.utcnow() + datetime.timedelta(seconds=500)
- grant_response = {"id_token": mock.sentinel.id_token}
+ grant_response = {"id_token": mock.sentinel.id_token, "scope": "email profile"}
refresh_grant.return_value = (
# Access token
token,
@@ -140,6 +149,8 @@
expiry,
# Extra data
grant_response,
+ # rapt token
+ new_rapt_token,
)
request = mock.create_autospec(transport.Request)
@@ -151,6 +162,7 @@
client_secret=self.CLIENT_SECRET,
scopes=scopes,
default_scopes=default_scopes,
+ rapt_token=self.RAPT_TOKEN,
)
# Refresh credentials
@@ -164,6 +176,7 @@
self.CLIENT_ID,
self.CLIENT_SECRET,
scopes,
+ self.RAPT_TOKEN,
)
# Check that the credentials have the token and expiry
@@ -171,12 +184,13 @@
assert creds.expiry == expiry
assert creds.id_token == mock.sentinel.id_token
assert creds.has_scopes(scopes)
+ assert creds.rapt_token == new_rapt_token
# Check that the credentials are valid (have a token and are not
# expired.)
assert creds.valid
- @mock.patch("google.oauth2._client.refresh_grant", autospec=True)
+ @mock.patch("google.oauth2.reauth.refresh_grant", autospec=True)
@mock.patch(
"google.auth._helpers.utcnow",
return_value=datetime.datetime.min + _helpers.CLOCK_SKEW,
@@ -186,6 +200,7 @@
):
default_scopes = ["email", "profile"]
token = "token"
+ new_rapt_token = "new_rapt_token"
expiry = _helpers.utcnow() + datetime.timedelta(seconds=500)
grant_response = {"id_token": mock.sentinel.id_token}
refresh_grant.return_value = (
@@ -197,6 +212,8 @@
expiry,
# Extra data
grant_response,
+ # rapt token
+ new_rapt_token,
)
request = mock.create_autospec(transport.Request)
@@ -207,6 +224,7 @@
client_id=self.CLIENT_ID,
client_secret=self.CLIENT_SECRET,
default_scopes=default_scopes,
+ rapt_token=self.RAPT_TOKEN,
)
# Refresh credentials
@@ -220,6 +238,7 @@
self.CLIENT_ID,
self.CLIENT_SECRET,
default_scopes,
+ self.RAPT_TOKEN,
)
# Check that the credentials have the token and expiry
@@ -227,12 +246,13 @@
assert creds.expiry == expiry
assert creds.id_token == mock.sentinel.id_token
assert creds.has_scopes(default_scopes)
+ assert creds.rapt_token == new_rapt_token
# Check that the credentials are valid (have a token and are not
# expired.)
assert creds.valid
- @mock.patch("google.oauth2._client.refresh_grant", autospec=True)
+ @mock.patch("google.oauth2.reauth.refresh_grant", autospec=True)
@mock.patch(
"google.auth._helpers.utcnow",
return_value=datetime.datetime.min + _helpers.CLOCK_SKEW,
@@ -242,6 +262,7 @@
):
scopes = ["email", "profile"]
token = "token"
+ new_rapt_token = "new_rapt_token"
expiry = _helpers.utcnow() + datetime.timedelta(seconds=500)
grant_response = {
"id_token": mock.sentinel.id_token,
@@ -256,6 +277,8 @@
expiry,
# Extra data
grant_response,
+ # rapt token
+ new_rapt_token,
)
request = mock.create_autospec(transport.Request)
@@ -266,6 +289,7 @@
client_id=self.CLIENT_ID,
client_secret=self.CLIENT_SECRET,
scopes=scopes,
+ rapt_token=self.RAPT_TOKEN,
)
# Refresh credentials
@@ -279,6 +303,7 @@
self.CLIENT_ID,
self.CLIENT_SECRET,
scopes,
+ self.RAPT_TOKEN,
)
# Check that the credentials have the token and expiry
@@ -286,12 +311,13 @@
assert creds.expiry == expiry
assert creds.id_token == mock.sentinel.id_token
assert creds.has_scopes(scopes)
+ assert creds.rapt_token == new_rapt_token
# Check that the credentials are valid (have a token and are not
# expired.)
assert creds.valid
- @mock.patch("google.oauth2._client.refresh_grant", autospec=True)
+ @mock.patch("google.oauth2.reauth.refresh_grant", autospec=True)
@mock.patch(
"google.auth._helpers.utcnow",
return_value=datetime.datetime.min + _helpers.CLOCK_SKEW,
@@ -302,10 +328,11 @@
scopes = ["email", "profile"]
scopes_returned = ["email"]
token = "token"
+ new_rapt_token = "new_rapt_token"
expiry = _helpers.utcnow() + datetime.timedelta(seconds=500)
grant_response = {
"id_token": mock.sentinel.id_token,
- "scopes": " ".join(scopes_returned),
+ "scope": " ".join(scopes_returned),
}
refresh_grant.return_value = (
# Access token
@@ -316,6 +343,8 @@
expiry,
# Extra data
grant_response,
+ # rapt token
+ new_rapt_token,
)
request = mock.create_autospec(transport.Request)
@@ -326,6 +355,7 @@
client_id=self.CLIENT_ID,
client_secret=self.CLIENT_SECRET,
scopes=scopes,
+ rapt_token=self.RAPT_TOKEN,
)
# Refresh credentials
@@ -342,6 +372,7 @@
self.CLIENT_ID,
self.CLIENT_SECRET,
scopes,
+ self.RAPT_TOKEN,
)
# Check that the credentials have the token and expiry
@@ -349,6 +380,7 @@
assert creds.expiry == expiry
assert creds.id_token == mock.sentinel.id_token
assert creds.has_scopes(scopes)
+ assert creds.rapt_token == new_rapt_token
# Check that the credentials are valid (have a token and are not
# expired.)
diff --git a/tests/oauth2/test_reauth.py b/tests/oauth2/test_reauth.py
new file mode 100644
index 0000000..e9ffa8a
--- /dev/null
+++ b/tests/oauth2/test_reauth.py
@@ -0,0 +1,308 @@
+# Copyright 2021 Google LLC
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import copy
+
+import mock
+import pytest
+
+from google.auth import exceptions
+from google.oauth2 import reauth
+
+
+MOCK_REQUEST = mock.Mock()
+CHALLENGES_RESPONSE_TEMPLATE = {
+ "status": "CHALLENGE_REQUIRED",
+ "sessionId": "123",
+ "challenges": [
+ {
+ "status": "READY",
+ "challengeId": 1,
+ "challengeType": "PASSWORD",
+ "securityKey": {},
+ }
+ ],
+}
+CHALLENGES_RESPONSE_AUTHENTICATED = {
+ "status": "AUTHENTICATED",
+ "sessionId": "123",
+ "encodedProofOfReauthToken": "new_rapt_token",
+}
+
+
+class MockChallenge(object):
+ def __init__(self, name, locally_eligible, challenge_input):
+ self.name = name
+ self.is_locally_eligible = locally_eligible
+ self.challenge_input = challenge_input
+
+ def obtain_challenge_input(self, metadata):
+ return self.challenge_input
+
+
+def test_is_interactive():
+ with mock.patch("sys.stdin.isatty", return_value=True):
+ assert reauth.is_interactive()
+
+
+def test__get_challenges():
+ with mock.patch(
+ "google.oauth2._client._token_endpoint_request"
+ ) as mock_token_endpoint_request:
+ reauth._get_challenges(MOCK_REQUEST, ["SAML"], "token")
+ mock_token_endpoint_request.assert_called_with(
+ MOCK_REQUEST,
+ reauth._REAUTH_API + ":start",
+ {"supportedChallengeTypes": ["SAML"]},
+ access_token="token",
+ use_json=True,
+ )
+
+
+def test__get_challenges_with_scopes():
+ with mock.patch(
+ "google.oauth2._client._token_endpoint_request"
+ ) as mock_token_endpoint_request:
+ reauth._get_challenges(
+ MOCK_REQUEST, ["SAML"], "token", requested_scopes=["scope"]
+ )
+ mock_token_endpoint_request.assert_called_with(
+ MOCK_REQUEST,
+ reauth._REAUTH_API + ":start",
+ {
+ "supportedChallengeTypes": ["SAML"],
+ "oauthScopesForDomainPolicyLookup": ["scope"],
+ },
+ access_token="token",
+ use_json=True,
+ )
+
+
+def test__send_challenge_result():
+ with mock.patch(
+ "google.oauth2._client._token_endpoint_request"
+ ) as mock_token_endpoint_request:
+ reauth._send_challenge_result(
+ MOCK_REQUEST, "123", "1", {"credential": "password"}, "token"
+ )
+ mock_token_endpoint_request.assert_called_with(
+ MOCK_REQUEST,
+ reauth._REAUTH_API + "/123:continue",
+ {
+ "sessionId": "123",
+ "challengeId": "1",
+ "action": "RESPOND",
+ "proposalResponse": {"credential": "password"},
+ },
+ access_token="token",
+ use_json=True,
+ )
+
+
+def test__run_next_challenge_not_ready():
+ challenges_response = copy.deepcopy(CHALLENGES_RESPONSE_TEMPLATE)
+ challenges_response["challenges"][0]["status"] = "STATUS_UNSPECIFIED"
+ assert (
+ reauth._run_next_challenge(challenges_response, MOCK_REQUEST, "token") is None
+ )
+
+
+def test__run_next_challenge_not_supported():
+ challenges_response = copy.deepcopy(CHALLENGES_RESPONSE_TEMPLATE)
+ challenges_response["challenges"][0]["challengeType"] = "CHALLENGE_TYPE_UNSPECIFIED"
+ with pytest.raises(exceptions.ReauthFailError) as excinfo:
+ reauth._run_next_challenge(challenges_response, MOCK_REQUEST, "token")
+ assert excinfo.match(r"Unsupported challenge type CHALLENGE_TYPE_UNSPECIFIED")
+
+
+def test__run_next_challenge_not_locally_eligible():
+ mock_challenge = MockChallenge("PASSWORD", False, "challenge_input")
+ with mock.patch(
+ "google.oauth2.challenges.AVAILABLE_CHALLENGES", {"PASSWORD": mock_challenge}
+ ):
+ with pytest.raises(exceptions.ReauthFailError) as excinfo:
+ reauth._run_next_challenge(
+ CHALLENGES_RESPONSE_TEMPLATE, MOCK_REQUEST, "token"
+ )
+ assert excinfo.match(r"Challenge PASSWORD is not locally eligible")
+
+
+def test__run_next_challenge_no_challenge_input():
+ mock_challenge = MockChallenge("PASSWORD", True, None)
+ with mock.patch(
+ "google.oauth2.challenges.AVAILABLE_CHALLENGES", {"PASSWORD": mock_challenge}
+ ):
+ assert (
+ reauth._run_next_challenge(
+ CHALLENGES_RESPONSE_TEMPLATE, MOCK_REQUEST, "token"
+ )
+ is None
+ )
+
+
+def test__run_next_challenge_success():
+ mock_challenge = MockChallenge("PASSWORD", True, {"credential": "password"})
+ with mock.patch(
+ "google.oauth2.challenges.AVAILABLE_CHALLENGES", {"PASSWORD": mock_challenge}
+ ):
+ with mock.patch(
+ "google.oauth2.reauth._send_challenge_result"
+ ) as mock_send_challenge_result:
+ reauth._run_next_challenge(
+ CHALLENGES_RESPONSE_TEMPLATE, MOCK_REQUEST, "token"
+ )
+ mock_send_challenge_result.assert_called_with(
+ MOCK_REQUEST, "123", 1, {"credential": "password"}, "token"
+ )
+
+
+def test__obtain_rapt_authenticated():
+ with mock.patch(
+ "google.oauth2.reauth._get_challenges",
+ return_value=CHALLENGES_RESPONSE_AUTHENTICATED,
+ ):
+ assert reauth._obtain_rapt(MOCK_REQUEST, "token", None) == "new_rapt_token"
+
+
+def test__obtain_rapt_authenticated_after_run_next_challenge():
+ with mock.patch(
+ "google.oauth2.reauth._get_challenges",
+ return_value=CHALLENGES_RESPONSE_TEMPLATE,
+ ):
+ with mock.patch(
+ "google.oauth2.reauth._run_next_challenge",
+ side_effect=[
+ CHALLENGES_RESPONSE_TEMPLATE,
+ CHALLENGES_RESPONSE_AUTHENTICATED,
+ ],
+ ):
+ with mock.patch("google.oauth2.reauth.is_interactive", return_value=True):
+ assert (
+ reauth._obtain_rapt(MOCK_REQUEST, "token", None) == "new_rapt_token"
+ )
+
+
+def test__obtain_rapt_unsupported_status():
+ challenges_response = copy.deepcopy(CHALLENGES_RESPONSE_TEMPLATE)
+ challenges_response["status"] = "STATUS_UNSPECIFIED"
+ with mock.patch(
+ "google.oauth2.reauth._get_challenges", return_value=challenges_response
+ ):
+ with pytest.raises(exceptions.ReauthFailError) as excinfo:
+ reauth._obtain_rapt(MOCK_REQUEST, "token", None)
+ assert excinfo.match(r"API error: STATUS_UNSPECIFIED")
+
+
+def test__obtain_rapt_not_interactive():
+ with mock.patch(
+ "google.oauth2.reauth._get_challenges",
+ return_value=CHALLENGES_RESPONSE_TEMPLATE,
+ ):
+ with mock.patch("google.oauth2.reauth.is_interactive", return_value=False):
+ with pytest.raises(exceptions.ReauthFailError) as excinfo:
+ reauth._obtain_rapt(MOCK_REQUEST, "token", None)
+ assert excinfo.match(r"not in an interactive session")
+
+
+def test__obtain_rapt_not_authenticated():
+ with mock.patch(
+ "google.oauth2.reauth._get_challenges",
+ return_value=CHALLENGES_RESPONSE_TEMPLATE,
+ ):
+ with mock.patch("google.oauth2.reauth.RUN_CHALLENGE_RETRY_LIMIT", 0):
+ with pytest.raises(exceptions.ReauthFailError) as excinfo:
+ reauth._obtain_rapt(MOCK_REQUEST, "token", None)
+ assert excinfo.match(r"Reauthentication failed")
+
+
+def test_get_rapt_token():
+ with mock.patch(
+ "google.oauth2._client.refresh_grant", return_value=("token", None, None, None)
+ ) as mock_refresh_grant:
+ with mock.patch(
+ "google.oauth2.reauth._obtain_rapt", return_value="new_rapt_token"
+ ) as mock_obtain_rapt:
+ assert (
+ reauth.get_rapt_token(
+ MOCK_REQUEST,
+ "client_id",
+ "client_secret",
+ "refresh_token",
+ "token_uri",
+ )
+ == "new_rapt_token"
+ )
+ mock_refresh_grant.assert_called_with(
+ request=MOCK_REQUEST,
+ client_id="client_id",
+ client_secret="client_secret",
+ refresh_token="refresh_token",
+ token_uri="token_uri",
+ scopes=[reauth._REAUTH_SCOPE],
+ )
+ mock_obtain_rapt.assert_called_with(
+ MOCK_REQUEST, "token", requested_scopes=None
+ )
+
+
+def test_refresh_grant_failed():
+ with mock.patch(
+ "google.oauth2._client._token_endpoint_request_no_throw"
+ ) as mock_token_request:
+ mock_token_request.return_value = (False, {"error": "Bad request"})
+ with pytest.raises(exceptions.RefreshError) as excinfo:
+ reauth.refresh_grant(
+ MOCK_REQUEST,
+ "token_uri",
+ "refresh_token",
+ "client_id",
+ "client_secret",
+ scopes=["foo", "bar"],
+ rapt_token="rapt_token",
+ )
+ assert excinfo.match(r"Bad request")
+ mock_token_request.assert_called_with(
+ MOCK_REQUEST,
+ "token_uri",
+ {
+ "grant_type": "refresh_token",
+ "client_id": "client_id",
+ "client_secret": "client_secret",
+ "refresh_token": "refresh_token",
+ "scope": "foo bar",
+ "rapt": "rapt_token",
+ },
+ )
+
+
+def test_refresh_grant_success():
+ with mock.patch(
+ "google.oauth2._client._token_endpoint_request_no_throw"
+ ) as mock_token_request:
+ mock_token_request.side_effect = [
+ (False, {"error": "invalid_grant", "error_subtype": "rapt_required"}),
+ (True, {"access_token": "access_token"}),
+ ]
+ with mock.patch(
+ "google.oauth2.reauth.get_rapt_token", return_value="new_rapt_token"
+ ):
+ assert reauth.refresh_grant(
+ MOCK_REQUEST, "token_uri", "refresh_token", "client_id", "client_secret"
+ ) == (
+ "access_token",
+ "refresh_token",
+ None,
+ {"access_token": "access_token"},
+ "new_rapt_token",
+ )