blob: e9ffa8a79e8fad5931153d821ec0ef3c73e056cb [file] [log] [blame]
arithmetic172882293fe2021-04-14 11:22:13 -07001# Copyright 2021 Google LLC
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7# http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14
15import copy
16
17import mock
18import pytest
19
20from google.auth import exceptions
21from google.oauth2 import reauth
22
23
24MOCK_REQUEST = mock.Mock()
25CHALLENGES_RESPONSE_TEMPLATE = {
26 "status": "CHALLENGE_REQUIRED",
27 "sessionId": "123",
28 "challenges": [
29 {
30 "status": "READY",
31 "challengeId": 1,
32 "challengeType": "PASSWORD",
33 "securityKey": {},
34 }
35 ],
36}
37CHALLENGES_RESPONSE_AUTHENTICATED = {
38 "status": "AUTHENTICATED",
39 "sessionId": "123",
40 "encodedProofOfReauthToken": "new_rapt_token",
41}
42
43
44class MockChallenge(object):
45 def __init__(self, name, locally_eligible, challenge_input):
46 self.name = name
47 self.is_locally_eligible = locally_eligible
48 self.challenge_input = challenge_input
49
50 def obtain_challenge_input(self, metadata):
51 return self.challenge_input
52
53
54def test_is_interactive():
55 with mock.patch("sys.stdin.isatty", return_value=True):
56 assert reauth.is_interactive()
57
58
59def test__get_challenges():
60 with mock.patch(
61 "google.oauth2._client._token_endpoint_request"
62 ) as mock_token_endpoint_request:
63 reauth._get_challenges(MOCK_REQUEST, ["SAML"], "token")
64 mock_token_endpoint_request.assert_called_with(
65 MOCK_REQUEST,
66 reauth._REAUTH_API + ":start",
67 {"supportedChallengeTypes": ["SAML"]},
68 access_token="token",
69 use_json=True,
70 )
71
72
73def test__get_challenges_with_scopes():
74 with mock.patch(
75 "google.oauth2._client._token_endpoint_request"
76 ) as mock_token_endpoint_request:
77 reauth._get_challenges(
78 MOCK_REQUEST, ["SAML"], "token", requested_scopes=["scope"]
79 )
80 mock_token_endpoint_request.assert_called_with(
81 MOCK_REQUEST,
82 reauth._REAUTH_API + ":start",
83 {
84 "supportedChallengeTypes": ["SAML"],
85 "oauthScopesForDomainPolicyLookup": ["scope"],
86 },
87 access_token="token",
88 use_json=True,
89 )
90
91
92def test__send_challenge_result():
93 with mock.patch(
94 "google.oauth2._client._token_endpoint_request"
95 ) as mock_token_endpoint_request:
96 reauth._send_challenge_result(
97 MOCK_REQUEST, "123", "1", {"credential": "password"}, "token"
98 )
99 mock_token_endpoint_request.assert_called_with(
100 MOCK_REQUEST,
101 reauth._REAUTH_API + "/123:continue",
102 {
103 "sessionId": "123",
104 "challengeId": "1",
105 "action": "RESPOND",
106 "proposalResponse": {"credential": "password"},
107 },
108 access_token="token",
109 use_json=True,
110 )
111
112
113def test__run_next_challenge_not_ready():
114 challenges_response = copy.deepcopy(CHALLENGES_RESPONSE_TEMPLATE)
115 challenges_response["challenges"][0]["status"] = "STATUS_UNSPECIFIED"
116 assert (
117 reauth._run_next_challenge(challenges_response, MOCK_REQUEST, "token") is None
118 )
119
120
121def test__run_next_challenge_not_supported():
122 challenges_response = copy.deepcopy(CHALLENGES_RESPONSE_TEMPLATE)
123 challenges_response["challenges"][0]["challengeType"] = "CHALLENGE_TYPE_UNSPECIFIED"
124 with pytest.raises(exceptions.ReauthFailError) as excinfo:
125 reauth._run_next_challenge(challenges_response, MOCK_REQUEST, "token")
126 assert excinfo.match(r"Unsupported challenge type CHALLENGE_TYPE_UNSPECIFIED")
127
128
129def test__run_next_challenge_not_locally_eligible():
130 mock_challenge = MockChallenge("PASSWORD", False, "challenge_input")
131 with mock.patch(
132 "google.oauth2.challenges.AVAILABLE_CHALLENGES", {"PASSWORD": mock_challenge}
133 ):
134 with pytest.raises(exceptions.ReauthFailError) as excinfo:
135 reauth._run_next_challenge(
136 CHALLENGES_RESPONSE_TEMPLATE, MOCK_REQUEST, "token"
137 )
138 assert excinfo.match(r"Challenge PASSWORD is not locally eligible")
139
140
141def test__run_next_challenge_no_challenge_input():
142 mock_challenge = MockChallenge("PASSWORD", True, None)
143 with mock.patch(
144 "google.oauth2.challenges.AVAILABLE_CHALLENGES", {"PASSWORD": mock_challenge}
145 ):
146 assert (
147 reauth._run_next_challenge(
148 CHALLENGES_RESPONSE_TEMPLATE, MOCK_REQUEST, "token"
149 )
150 is None
151 )
152
153
154def test__run_next_challenge_success():
155 mock_challenge = MockChallenge("PASSWORD", True, {"credential": "password"})
156 with mock.patch(
157 "google.oauth2.challenges.AVAILABLE_CHALLENGES", {"PASSWORD": mock_challenge}
158 ):
159 with mock.patch(
160 "google.oauth2.reauth._send_challenge_result"
161 ) as mock_send_challenge_result:
162 reauth._run_next_challenge(
163 CHALLENGES_RESPONSE_TEMPLATE, MOCK_REQUEST, "token"
164 )
165 mock_send_challenge_result.assert_called_with(
166 MOCK_REQUEST, "123", 1, {"credential": "password"}, "token"
167 )
168
169
170def test__obtain_rapt_authenticated():
171 with mock.patch(
172 "google.oauth2.reauth._get_challenges",
173 return_value=CHALLENGES_RESPONSE_AUTHENTICATED,
174 ):
175 assert reauth._obtain_rapt(MOCK_REQUEST, "token", None) == "new_rapt_token"
176
177
178def test__obtain_rapt_authenticated_after_run_next_challenge():
179 with mock.patch(
180 "google.oauth2.reauth._get_challenges",
181 return_value=CHALLENGES_RESPONSE_TEMPLATE,
182 ):
183 with mock.patch(
184 "google.oauth2.reauth._run_next_challenge",
185 side_effect=[
186 CHALLENGES_RESPONSE_TEMPLATE,
187 CHALLENGES_RESPONSE_AUTHENTICATED,
188 ],
189 ):
190 with mock.patch("google.oauth2.reauth.is_interactive", return_value=True):
191 assert (
192 reauth._obtain_rapt(MOCK_REQUEST, "token", None) == "new_rapt_token"
193 )
194
195
196def test__obtain_rapt_unsupported_status():
197 challenges_response = copy.deepcopy(CHALLENGES_RESPONSE_TEMPLATE)
198 challenges_response["status"] = "STATUS_UNSPECIFIED"
199 with mock.patch(
200 "google.oauth2.reauth._get_challenges", return_value=challenges_response
201 ):
202 with pytest.raises(exceptions.ReauthFailError) as excinfo:
203 reauth._obtain_rapt(MOCK_REQUEST, "token", None)
204 assert excinfo.match(r"API error: STATUS_UNSPECIFIED")
205
206
207def test__obtain_rapt_not_interactive():
208 with mock.patch(
209 "google.oauth2.reauth._get_challenges",
210 return_value=CHALLENGES_RESPONSE_TEMPLATE,
211 ):
212 with mock.patch("google.oauth2.reauth.is_interactive", return_value=False):
213 with pytest.raises(exceptions.ReauthFailError) as excinfo:
214 reauth._obtain_rapt(MOCK_REQUEST, "token", None)
215 assert excinfo.match(r"not in an interactive session")
216
217
218def test__obtain_rapt_not_authenticated():
219 with mock.patch(
220 "google.oauth2.reauth._get_challenges",
221 return_value=CHALLENGES_RESPONSE_TEMPLATE,
222 ):
223 with mock.patch("google.oauth2.reauth.RUN_CHALLENGE_RETRY_LIMIT", 0):
224 with pytest.raises(exceptions.ReauthFailError) as excinfo:
225 reauth._obtain_rapt(MOCK_REQUEST, "token", None)
226 assert excinfo.match(r"Reauthentication failed")
227
228
229def test_get_rapt_token():
230 with mock.patch(
231 "google.oauth2._client.refresh_grant", return_value=("token", None, None, None)
232 ) as mock_refresh_grant:
233 with mock.patch(
234 "google.oauth2.reauth._obtain_rapt", return_value="new_rapt_token"
235 ) as mock_obtain_rapt:
236 assert (
237 reauth.get_rapt_token(
238 MOCK_REQUEST,
239 "client_id",
240 "client_secret",
241 "refresh_token",
242 "token_uri",
243 )
244 == "new_rapt_token"
245 )
246 mock_refresh_grant.assert_called_with(
247 request=MOCK_REQUEST,
248 client_id="client_id",
249 client_secret="client_secret",
250 refresh_token="refresh_token",
251 token_uri="token_uri",
252 scopes=[reauth._REAUTH_SCOPE],
253 )
254 mock_obtain_rapt.assert_called_with(
255 MOCK_REQUEST, "token", requested_scopes=None
256 )
257
258
259def test_refresh_grant_failed():
260 with mock.patch(
261 "google.oauth2._client._token_endpoint_request_no_throw"
262 ) as mock_token_request:
263 mock_token_request.return_value = (False, {"error": "Bad request"})
264 with pytest.raises(exceptions.RefreshError) as excinfo:
265 reauth.refresh_grant(
266 MOCK_REQUEST,
267 "token_uri",
268 "refresh_token",
269 "client_id",
270 "client_secret",
271 scopes=["foo", "bar"],
272 rapt_token="rapt_token",
273 )
274 assert excinfo.match(r"Bad request")
275 mock_token_request.assert_called_with(
276 MOCK_REQUEST,
277 "token_uri",
278 {
279 "grant_type": "refresh_token",
280 "client_id": "client_id",
281 "client_secret": "client_secret",
282 "refresh_token": "refresh_token",
283 "scope": "foo bar",
284 "rapt": "rapt_token",
285 },
286 )
287
288
289def test_refresh_grant_success():
290 with mock.patch(
291 "google.oauth2._client._token_endpoint_request_no_throw"
292 ) as mock_token_request:
293 mock_token_request.side_effect = [
294 (False, {"error": "invalid_grant", "error_subtype": "rapt_required"}),
295 (True, {"access_token": "access_token"}),
296 ]
297 with mock.patch(
298 "google.oauth2.reauth.get_rapt_token", return_value="new_rapt_token"
299 ):
300 assert reauth.refresh_grant(
301 MOCK_REQUEST, "token_uri", "refresh_token", "client_id", "client_secret"
302 ) == (
303 "access_token",
304 "refresh_token",
305 None,
306 {"access_token": "access_token"},
307 "new_rapt_token",
308 )