bojeil-google | 5dcd2b1 | 2021-02-09 11:05:00 -0800 | [diff] [blame] | 1 | # Copyright 2020 Google LLC |
| 2 | # |
| 3 | # Licensed under the Apache License, Version 2.0 (the "License"); |
| 4 | # you may not use this file except in compliance with the License. |
| 5 | # You may obtain a copy of the License at |
| 6 | # |
| 7 | # http://www.apache.org/licenses/LICENSE-2.0 |
| 8 | # |
| 9 | # Unless required by applicable law or agreed to in writing, software |
| 10 | # distributed under the License is distributed on an "AS IS" BASIS, |
| 11 | # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| 12 | # See the License for the specific language governing permissions and |
| 13 | # limitations under the License. |
| 14 | |
| 15 | import json |
| 16 | |
| 17 | import pytest |
| 18 | |
| 19 | from google.auth import exceptions |
| 20 | from google.oauth2 import utils |
| 21 | |
| 22 | |
| 23 | CLIENT_ID = "username" |
| 24 | CLIENT_SECRET = "password" |
| 25 | # Base64 encoding of "username:password" |
| 26 | BASIC_AUTH_ENCODING = "dXNlcm5hbWU6cGFzc3dvcmQ=" |
| 27 | # Base64 encoding of "username:" |
| 28 | BASIC_AUTH_ENCODING_SECRETLESS = "dXNlcm5hbWU6" |
| 29 | |
| 30 | |
| 31 | class AuthHandler(utils.OAuthClientAuthHandler): |
| 32 | def __init__(self, client_auth=None): |
| 33 | super(AuthHandler, self).__init__(client_auth) |
| 34 | |
| 35 | def apply_client_authentication_options( |
| 36 | self, headers, request_body=None, bearer_token=None |
| 37 | ): |
| 38 | return super(AuthHandler, self).apply_client_authentication_options( |
| 39 | headers, request_body, bearer_token |
| 40 | ) |
| 41 | |
| 42 | |
| 43 | class TestClientAuthentication(object): |
| 44 | @classmethod |
| 45 | def make_client_auth(cls, client_secret=None): |
| 46 | return utils.ClientAuthentication( |
| 47 | utils.ClientAuthType.basic, CLIENT_ID, client_secret |
| 48 | ) |
| 49 | |
| 50 | def test_initialization_with_client_secret(self): |
| 51 | client_auth = self.make_client_auth(CLIENT_SECRET) |
| 52 | |
| 53 | assert client_auth.client_auth_type == utils.ClientAuthType.basic |
| 54 | assert client_auth.client_id == CLIENT_ID |
| 55 | assert client_auth.client_secret == CLIENT_SECRET |
| 56 | |
| 57 | def test_initialization_no_client_secret(self): |
| 58 | client_auth = self.make_client_auth() |
| 59 | |
| 60 | assert client_auth.client_auth_type == utils.ClientAuthType.basic |
| 61 | assert client_auth.client_id == CLIENT_ID |
| 62 | assert client_auth.client_secret is None |
| 63 | |
| 64 | |
| 65 | class TestOAuthClientAuthHandler(object): |
| 66 | CLIENT_AUTH_BASIC = utils.ClientAuthentication( |
| 67 | utils.ClientAuthType.basic, CLIENT_ID, CLIENT_SECRET |
| 68 | ) |
| 69 | CLIENT_AUTH_BASIC_SECRETLESS = utils.ClientAuthentication( |
| 70 | utils.ClientAuthType.basic, CLIENT_ID |
| 71 | ) |
| 72 | CLIENT_AUTH_REQUEST_BODY = utils.ClientAuthentication( |
| 73 | utils.ClientAuthType.request_body, CLIENT_ID, CLIENT_SECRET |
| 74 | ) |
| 75 | CLIENT_AUTH_REQUEST_BODY_SECRETLESS = utils.ClientAuthentication( |
| 76 | utils.ClientAuthType.request_body, CLIENT_ID |
| 77 | ) |
| 78 | |
| 79 | @classmethod |
| 80 | def make_oauth_client_auth_handler(cls, client_auth=None): |
| 81 | return AuthHandler(client_auth) |
| 82 | |
| 83 | def test_apply_client_authentication_options_none(self): |
| 84 | headers = {"Content-Type": "application/json"} |
| 85 | request_body = {"foo": "bar"} |
| 86 | auth_handler = self.make_oauth_client_auth_handler() |
| 87 | |
| 88 | auth_handler.apply_client_authentication_options(headers, request_body) |
| 89 | |
| 90 | assert headers == {"Content-Type": "application/json"} |
| 91 | assert request_body == {"foo": "bar"} |
| 92 | |
| 93 | def test_apply_client_authentication_options_basic(self): |
| 94 | headers = {"Content-Type": "application/json"} |
| 95 | request_body = {"foo": "bar"} |
| 96 | auth_handler = self.make_oauth_client_auth_handler(self.CLIENT_AUTH_BASIC) |
| 97 | |
| 98 | auth_handler.apply_client_authentication_options(headers, request_body) |
| 99 | |
| 100 | assert headers == { |
| 101 | "Content-Type": "application/json", |
| 102 | "Authorization": "Basic {}".format(BASIC_AUTH_ENCODING), |
| 103 | } |
| 104 | assert request_body == {"foo": "bar"} |
| 105 | |
| 106 | def test_apply_client_authentication_options_basic_nosecret(self): |
| 107 | headers = {"Content-Type": "application/json"} |
| 108 | request_body = {"foo": "bar"} |
| 109 | auth_handler = self.make_oauth_client_auth_handler( |
| 110 | self.CLIENT_AUTH_BASIC_SECRETLESS |
| 111 | ) |
| 112 | |
| 113 | auth_handler.apply_client_authentication_options(headers, request_body) |
| 114 | |
| 115 | assert headers == { |
| 116 | "Content-Type": "application/json", |
| 117 | "Authorization": "Basic {}".format(BASIC_AUTH_ENCODING_SECRETLESS), |
| 118 | } |
| 119 | assert request_body == {"foo": "bar"} |
| 120 | |
| 121 | def test_apply_client_authentication_options_request_body(self): |
| 122 | headers = {"Content-Type": "application/json"} |
| 123 | request_body = {"foo": "bar"} |
| 124 | auth_handler = self.make_oauth_client_auth_handler( |
| 125 | self.CLIENT_AUTH_REQUEST_BODY |
| 126 | ) |
| 127 | |
| 128 | auth_handler.apply_client_authentication_options(headers, request_body) |
| 129 | |
| 130 | assert headers == {"Content-Type": "application/json"} |
| 131 | assert request_body == { |
| 132 | "foo": "bar", |
| 133 | "client_id": CLIENT_ID, |
| 134 | "client_secret": CLIENT_SECRET, |
| 135 | } |
| 136 | |
| 137 | def test_apply_client_authentication_options_request_body_nosecret(self): |
| 138 | headers = {"Content-Type": "application/json"} |
| 139 | request_body = {"foo": "bar"} |
| 140 | auth_handler = self.make_oauth_client_auth_handler( |
| 141 | self.CLIENT_AUTH_REQUEST_BODY_SECRETLESS |
| 142 | ) |
| 143 | |
| 144 | auth_handler.apply_client_authentication_options(headers, request_body) |
| 145 | |
| 146 | assert headers == {"Content-Type": "application/json"} |
| 147 | assert request_body == { |
| 148 | "foo": "bar", |
| 149 | "client_id": CLIENT_ID, |
| 150 | "client_secret": "", |
| 151 | } |
| 152 | |
| 153 | def test_apply_client_authentication_options_request_body_no_body(self): |
| 154 | headers = {"Content-Type": "application/json"} |
| 155 | auth_handler = self.make_oauth_client_auth_handler( |
| 156 | self.CLIENT_AUTH_REQUEST_BODY |
| 157 | ) |
| 158 | |
| 159 | with pytest.raises(exceptions.OAuthError) as excinfo: |
| 160 | auth_handler.apply_client_authentication_options(headers) |
| 161 | |
| 162 | assert excinfo.match(r"HTTP request does not support request-body") |
| 163 | |
| 164 | def test_apply_client_authentication_options_bearer_token(self): |
| 165 | bearer_token = "ACCESS_TOKEN" |
| 166 | headers = {"Content-Type": "application/json"} |
| 167 | request_body = {"foo": "bar"} |
| 168 | auth_handler = self.make_oauth_client_auth_handler() |
| 169 | |
| 170 | auth_handler.apply_client_authentication_options( |
| 171 | headers, request_body, bearer_token |
| 172 | ) |
| 173 | |
| 174 | assert headers == { |
| 175 | "Content-Type": "application/json", |
| 176 | "Authorization": "Bearer {}".format(bearer_token), |
| 177 | } |
| 178 | assert request_body == {"foo": "bar"} |
| 179 | |
| 180 | def test_apply_client_authentication_options_bearer_and_basic(self): |
| 181 | bearer_token = "ACCESS_TOKEN" |
| 182 | headers = {"Content-Type": "application/json"} |
| 183 | request_body = {"foo": "bar"} |
| 184 | auth_handler = self.make_oauth_client_auth_handler(self.CLIENT_AUTH_BASIC) |
| 185 | |
| 186 | auth_handler.apply_client_authentication_options( |
| 187 | headers, request_body, bearer_token |
| 188 | ) |
| 189 | |
| 190 | # Bearer token should have higher priority. |
| 191 | assert headers == { |
| 192 | "Content-Type": "application/json", |
| 193 | "Authorization": "Bearer {}".format(bearer_token), |
| 194 | } |
| 195 | assert request_body == {"foo": "bar"} |
| 196 | |
| 197 | def test_apply_client_authentication_options_bearer_and_request_body(self): |
| 198 | bearer_token = "ACCESS_TOKEN" |
| 199 | headers = {"Content-Type": "application/json"} |
| 200 | request_body = {"foo": "bar"} |
| 201 | auth_handler = self.make_oauth_client_auth_handler( |
| 202 | self.CLIENT_AUTH_REQUEST_BODY |
| 203 | ) |
| 204 | |
| 205 | auth_handler.apply_client_authentication_options( |
| 206 | headers, request_body, bearer_token |
| 207 | ) |
| 208 | |
| 209 | # Bearer token should have higher priority. |
| 210 | assert headers == { |
| 211 | "Content-Type": "application/json", |
| 212 | "Authorization": "Bearer {}".format(bearer_token), |
| 213 | } |
| 214 | assert request_body == {"foo": "bar"} |
| 215 | |
| 216 | |
| 217 | def test__handle_error_response_code_only(): |
| 218 | error_resp = {"error": "unsupported_grant_type"} |
| 219 | response_data = json.dumps(error_resp) |
| 220 | |
| 221 | with pytest.raises(exceptions.OAuthError) as excinfo: |
| 222 | utils.handle_error_response(response_data) |
| 223 | |
| 224 | assert excinfo.match(r"Error code unsupported_grant_type") |
| 225 | |
| 226 | |
| 227 | def test__handle_error_response_code_description(): |
| 228 | error_resp = { |
| 229 | "error": "unsupported_grant_type", |
| 230 | "error_description": "The provided grant_type is unsupported", |
| 231 | } |
| 232 | response_data = json.dumps(error_resp) |
| 233 | |
| 234 | with pytest.raises(exceptions.OAuthError) as excinfo: |
| 235 | utils.handle_error_response(response_data) |
| 236 | |
| 237 | assert excinfo.match( |
| 238 | r"Error code unsupported_grant_type: The provided grant_type is unsupported" |
| 239 | ) |
| 240 | |
| 241 | |
| 242 | def test__handle_error_response_code_description_uri(): |
| 243 | error_resp = { |
| 244 | "error": "unsupported_grant_type", |
| 245 | "error_description": "The provided grant_type is unsupported", |
| 246 | "error_uri": "https://tools.ietf.org/html/rfc6749", |
| 247 | } |
| 248 | response_data = json.dumps(error_resp) |
| 249 | |
| 250 | with pytest.raises(exceptions.OAuthError) as excinfo: |
| 251 | utils.handle_error_response(response_data) |
| 252 | |
| 253 | assert excinfo.match( |
| 254 | r"Error code unsupported_grant_type: The provided grant_type is unsupported - https://tools.ietf.org/html/rfc6749" |
| 255 | ) |
| 256 | |
| 257 | |
| 258 | def test__handle_error_response_non_json(): |
| 259 | response_data = "Oops, something wrong happened" |
| 260 | |
| 261 | with pytest.raises(exceptions.OAuthError) as excinfo: |
| 262 | utils.handle_error_response(response_data) |
| 263 | |
| 264 | assert excinfo.match(r"Oops, something wrong happened") |