Sergey Shepelev | 0112eff | 2017-05-05 06:46:43 +0300 | [diff] [blame^] | 1 | import httplib2 |
| 2 | import pytest |
| 3 | import tests |
| 4 | from six.moves import urllib |
| 5 | |
| 6 | |
| 7 | def test_credentials(): |
| 8 | c = httplib2.Credentials() |
| 9 | c.add('joe', 'password') |
| 10 | assert tuple(c.iter('bitworking.org'))[0] == ('joe', 'password') |
| 11 | assert tuple(c.iter(''))[0] == ('joe', 'password') |
| 12 | c.add('fred', 'password2', 'wellformedweb.org') |
| 13 | assert tuple(c.iter('bitworking.org'))[0] == ('joe', 'password') |
| 14 | assert len(tuple(c.iter('bitworking.org'))) == 1 |
| 15 | assert len(tuple(c.iter('wellformedweb.org'))) == 2 |
| 16 | assert ('fred', 'password2') in tuple(c.iter('wellformedweb.org')) |
| 17 | c.clear() |
| 18 | assert len(tuple(c.iter('bitworking.org'))) == 0 |
| 19 | c.add('fred', 'password2', 'wellformedweb.org') |
| 20 | assert ('fred', 'password2') in tuple(c.iter('wellformedweb.org')) |
| 21 | assert len(tuple(c.iter('bitworking.org'))) == 0 |
| 22 | assert len(tuple(c.iter(''))) == 0 |
| 23 | |
| 24 | |
| 25 | def test_basic(): |
| 26 | # Test Basic Authentication |
| 27 | http = httplib2.Http() |
| 28 | password = tests.gen_password() |
| 29 | handler = tests.http_reflect_with_auth(allow_scheme='basic', allow_credentials=(('joe', password),)) |
| 30 | with tests.server_request(handler, request_count=3) as uri: |
| 31 | response, content = http.request(uri, 'GET') |
| 32 | assert response.status == 401 |
| 33 | http.add_credentials('joe', password) |
| 34 | response, content = http.request(uri, 'GET') |
| 35 | assert response.status == 200 |
| 36 | |
| 37 | |
| 38 | def test_basic_for_domain(): |
| 39 | # Test Basic Authentication |
| 40 | http = httplib2.Http() |
| 41 | password = tests.gen_password() |
| 42 | handler = tests.http_reflect_with_auth(allow_scheme='basic', allow_credentials=(('joe', password),)) |
| 43 | with tests.server_request(handler, request_count=4) as uri: |
| 44 | response, content = http.request(uri, 'GET') |
| 45 | assert response.status == 401 |
| 46 | http.add_credentials('joe', password, 'example.org') |
| 47 | response, content = http.request(uri, 'GET') |
| 48 | assert response.status == 401 |
| 49 | domain = urllib.parse.urlparse(uri)[1] |
| 50 | http.add_credentials('joe', password, domain) |
| 51 | response, content = http.request(uri, 'GET') |
| 52 | assert response.status == 200 |
| 53 | |
| 54 | |
| 55 | def test_basic_two_credentials(): |
| 56 | # Test Basic Authentication with multiple sets of credentials |
| 57 | http = httplib2.Http() |
| 58 | password1 = tests.gen_password() |
| 59 | password2 = tests.gen_password() |
| 60 | allowed = [('joe', password1)] # exploit shared mutable list |
| 61 | handler = tests.http_reflect_with_auth(allow_scheme='basic', allow_credentials=allowed) |
| 62 | with tests.server_request(handler, request_count=7) as uri: |
| 63 | http.add_credentials('fred', password2) |
| 64 | response, content = http.request(uri, 'GET') |
| 65 | assert response.status == 401 |
| 66 | http.add_credentials('joe', password1) |
| 67 | response, content = http.request(uri, 'GET') |
| 68 | assert response.status == 200 |
| 69 | allowed[0] = ('fred', password2) |
| 70 | response, content = http.request(uri, 'GET') |
| 71 | assert response.status == 200 |
| 72 | |
| 73 | |
| 74 | def test_digest(): |
| 75 | # Test that we support Digest Authentication |
| 76 | http = httplib2.Http() |
| 77 | password = tests.gen_password() |
| 78 | handler = tests.http_reflect_with_auth(allow_scheme='digest', allow_credentials=(('joe', password),)) |
| 79 | with tests.server_request(handler, request_count=3) as uri: |
| 80 | response, content = http.request(uri, 'GET') |
| 81 | assert response.status == 401 |
| 82 | http.add_credentials('joe', password) |
| 83 | response, content = http.request(uri, 'GET') |
| 84 | assert response.status == 200, content.decode() |
| 85 | |
| 86 | |
| 87 | def test_digest_next_nonce_nc(): |
| 88 | # Test that if the server sets nextnonce that we reset |
| 89 | # the nonce count back to 1 |
| 90 | http = httplib2.Http() |
| 91 | password = tests.gen_password() |
| 92 | grenew_nonce = [None] |
| 93 | handler = tests.http_reflect_with_auth( |
| 94 | allow_scheme='digest', |
| 95 | allow_credentials=(('joe', password),), |
| 96 | out_renew_nonce=grenew_nonce, |
| 97 | ) |
| 98 | with tests.server_request(handler, request_count=5) as uri: |
| 99 | http.add_credentials('joe', password) |
| 100 | response1, _ = http.request(uri, 'GET') |
| 101 | info = httplib2._parse_www_authenticate(response1, 'authentication-info') |
| 102 | assert response1.status == 200 |
| 103 | assert info.get('digest', {}).get('nc') == '00000001', info |
| 104 | assert not info.get('digest', {}).get('nextnonce'), info |
| 105 | response2, _ = http.request(uri, 'GET') |
| 106 | info2 = httplib2._parse_www_authenticate(response2, 'authentication-info') |
| 107 | assert info2.get('digest', {}).get('nc') == '00000002', info2 |
| 108 | grenew_nonce[0]() |
| 109 | response3, content = http.request(uri, 'GET') |
| 110 | info3 = httplib2._parse_www_authenticate(response3, 'authentication-info') |
| 111 | assert response3.status == 200 |
| 112 | assert info3.get('digest', {}).get('nc') == '00000001', info3 |
| 113 | |
| 114 | |
| 115 | def test_digest_auth_stale(): |
| 116 | # Test that we can handle a nonce becoming stale |
| 117 | http = httplib2.Http() |
| 118 | password = tests.gen_password() |
| 119 | grenew_nonce = [None] |
| 120 | requests = [] |
| 121 | handler = tests.http_reflect_with_auth( |
| 122 | allow_scheme='digest', |
| 123 | allow_credentials=(('joe', password),), |
| 124 | out_renew_nonce=grenew_nonce, |
| 125 | out_requests=requests, |
| 126 | ) |
| 127 | with tests.server_request(handler, request_count=4) as uri: |
| 128 | http.add_credentials('joe', password) |
| 129 | response, _ = http.request(uri, 'GET') |
| 130 | assert response.status == 200 |
| 131 | info = httplib2._parse_www_authenticate(requests[0][1].headers, 'www-authenticate') |
| 132 | grenew_nonce[0]() |
| 133 | response, _ = http.request(uri, 'GET') |
| 134 | assert response.status == 200 |
| 135 | assert not response.fromcache |
| 136 | assert getattr(response, '_stale_digest', False) |
| 137 | info2 = httplib2._parse_www_authenticate(requests[2][1].headers, 'www-authenticate') |
| 138 | nonce1 = info.get('digest', {}).get('nonce', '') |
| 139 | nonce2 = info2.get('digest', {}).get('nonce', '') |
| 140 | assert nonce1 != '' |
| 141 | assert nonce2 != '' |
| 142 | assert nonce1 != nonce2, (nonce1, nonce2) |
| 143 | |
| 144 | |
| 145 | @pytest.mark.parametrize( |
| 146 | 'data', ( |
| 147 | ({}, {}), |
| 148 | ({'www-authenticate': ''}, {}), |
| 149 | ({'www-authenticate': 'Test realm="test realm" , foo=foo ,bar="bar", baz=baz,qux=qux'}, |
| 150 | {'test': {'realm': 'test realm', 'foo': 'foo', 'bar': 'bar', 'baz': 'baz', 'qux': 'qux'}}), |
| 151 | ({'www-authenticate': 'T*!%#st realm=to*!%#en, to*!%#en="quoted string"'}, |
| 152 | {'t*!%#st': {'realm': 'to*!%#en', 'to*!%#en': 'quoted string'}}), |
| 153 | ({'www-authenticate': 'Test realm="a \\"test\\" realm"'}, |
| 154 | {'test': {'realm': 'a "test" realm'}}), |
| 155 | ({'www-authenticate': 'Basic realm="me"'}, |
| 156 | {'basic': {'realm': 'me'}}), |
| 157 | ({'www-authenticate': 'Basic realm="me", algorithm="MD5"'}, |
| 158 | {'basic': {'realm': 'me', 'algorithm': 'MD5'}}), |
| 159 | ({'www-authenticate': 'Basic realm="me", algorithm=MD5'}, |
| 160 | {'basic': {'realm': 'me', 'algorithm': 'MD5'}}), |
| 161 | ({'www-authenticate': 'Basic realm="me",other="fred" '}, |
| 162 | {'basic': {'realm': 'me', 'other': 'fred'}}), |
| 163 | ({'www-authenticate': 'Basic REAlm="me" '}, |
| 164 | {'basic': {'realm': 'me'}}), |
| 165 | ({'www-authenticate': 'Digest realm="digest1", qop="auth,auth-int", nonce="7102dd2", opaque="e9517f"'}, |
| 166 | {'digest': {'realm': 'digest1', 'qop': 'auth,auth-int', 'nonce': '7102dd2', 'opaque': 'e9517f'}}), |
| 167 | # multiple schema choice |
| 168 | ({'www-authenticate': 'Digest realm="multi-d", nonce="8b11d0f6", opaque="cc069c" Basic realm="multi-b" '}, |
| 169 | {'digest': {'realm': 'multi-d', 'nonce': '8b11d0f6', 'opaque': 'cc069c'}, |
| 170 | 'basic': {'realm': 'multi-b'}}), |
| 171 | # FIXME |
| 172 | # comma between schemas (glue for multiple headers with same name) |
| 173 | # ({'www-authenticate': 'Digest realm="2-comma-d", qop="auth-int", nonce="c0c8ff1", Basic realm="2-comma-b"'}, |
| 174 | # {'digest': {'realm': '2-comma-d', 'qop': 'auth-int', 'nonce': 'c0c8ff1'}, |
| 175 | # 'basic': {'realm': '2-comma-b'}}), |
| 176 | # FIXME |
| 177 | # comma between schemas + WSSE (glue for multiple headers with same name) |
| 178 | # ({'www-authenticate': 'Digest realm="com3d", Basic realm="com3b", WSSE realm="com3w", profile="token"'}, |
| 179 | # {'digest': {'realm': 'com3d'}, 'basic': {'realm': 'com3b'}, 'wsse': {'realm': 'com3w', profile': 'token'}}), |
| 180 | # FIXME |
| 181 | # multiple syntax figures |
| 182 | # ({'www-authenticate': |
| 183 | # 'Digest realm="brig", qop \t=\t"\tauth,auth-int", nonce="(*)&^&$%#",opaque="5ccc"' + |
| 184 | # ', Basic REAlm="zoo", WSSE realm="very", profile="UsernameToken"'}, |
| 185 | # {'digest': {'realm': 'brig', 'qop': 'auth,auth-int', 'nonce': '(*)&^&$%#', 'opaque': '5ccc'}, |
| 186 | # 'basic': {'realm': 'zoo'}, |
| 187 | # 'wsse': {'realm': 'very', 'profile': 'UsernameToken'}}), |
| 188 | # more quote combos |
| 189 | ({'www-authenticate': 'Digest realm="myrealm", nonce="KBAA=3", algorithm=MD5, qop="auth", stale=true'}, |
| 190 | {'digest': {'realm': 'myrealm', 'nonce': 'KBAA=3', 'algorithm': 'MD5', 'qop': 'auth', 'stale': 'true'}}), |
| 191 | ), ids=lambda data: str(data[0])) |
| 192 | @pytest.mark.parametrize('strict', (True, False), ids=('strict', 'relax')) |
| 193 | def test_parse_www_authenticate_correct(data, strict): |
| 194 | headers, info = data |
| 195 | # FIXME: move strict to parse argument |
| 196 | httplib2.USE_WWW_AUTH_STRICT_PARSING = strict |
| 197 | try: |
| 198 | assert httplib2._parse_www_authenticate(headers) == info |
| 199 | finally: |
| 200 | httplib2.USE_WWW_AUTH_STRICT_PARSING = 0 |
| 201 | |
| 202 | |
| 203 | def test_parse_www_authenticate_malformed(): |
| 204 | # TODO: test (and fix) header value 'barbqwnbm-bb...:asd' leads to dead loop |
| 205 | with tests.assert_raises(httplib2.MalformedHeader): |
| 206 | httplib2._parse_www_authenticate( |
| 207 | {'www-authenticate': 'OAuth "Facebook Platform" "invalid_token" "Invalid OAuth access token."'} |
| 208 | ) |
| 209 | |
| 210 | |
| 211 | def test_digest_object(): |
| 212 | credentials = ('joe', 'password') |
| 213 | host = None |
| 214 | request_uri = '/test/digest/' |
| 215 | headers = {} |
| 216 | response = { |
| 217 | 'www-authenticate': 'Digest realm="myrealm", nonce="KBAA=35", algorithm=MD5, qop="auth"' |
| 218 | } |
| 219 | content = b'' |
| 220 | |
| 221 | d = httplib2.DigestAuthentication(credentials, host, request_uri, headers, response, content, None) |
| 222 | d.request('GET', request_uri, headers, content, cnonce="33033375ec278a46") |
| 223 | our_request = 'authorization: ' + headers['authorization'] |
| 224 | working_request = ( |
| 225 | 'authorization: Digest username="joe", realm="myrealm", nonce="KBAA=35", uri="/test/digest/"' + |
| 226 | ', algorithm=MD5, response="de6d4a123b80801d0e94550411b6283f", qop=auth, nc=00000001, cnonce="33033375ec278a46"' |
| 227 | ) |
| 228 | assert our_request == working_request |
| 229 | |
| 230 | |
| 231 | def test_digest_object_with_opaque(): |
| 232 | credentials = ('joe', 'password') |
| 233 | host = None |
| 234 | request_uri = '/digest/opaque/' |
| 235 | headers = {} |
| 236 | response = { |
| 237 | 'www-authenticate': 'Digest realm="myrealm", nonce="30352fd", algorithm=MD5, qop="auth", opaque="atestopaque"', |
| 238 | } |
| 239 | content = '' |
| 240 | |
| 241 | d = httplib2.DigestAuthentication(credentials, host, request_uri, headers, response, content, None) |
| 242 | d.request('GET', request_uri, headers, content, cnonce="5ec2") |
| 243 | our_request = 'authorization: ' + headers['authorization'] |
| 244 | working_request = ( |
| 245 | 'authorization: Digest username="joe", realm="myrealm", nonce="30352fd", uri="/digest/opaque/", algorithm=MD5' + |
| 246 | ', response="a1fab43041f8f3789a447f48018bee48", qop=auth, nc=00000001, cnonce="5ec2", opaque="atestopaque"' |
| 247 | ) |
| 248 | assert our_request == working_request |
| 249 | |
| 250 | |
| 251 | def test_digest_object_stale(): |
| 252 | credentials = ('joe', 'password') |
| 253 | host = None |
| 254 | request_uri = '/digest/stale/' |
| 255 | headers = {} |
| 256 | response = httplib2.Response({}) |
| 257 | response['www-authenticate'] = 'Digest realm="myrealm", nonce="bd669f", algorithm=MD5, qop="auth", stale=true' |
| 258 | response.status = 401 |
| 259 | content = b'' |
| 260 | d = httplib2.DigestAuthentication(credentials, host, request_uri, headers, response, content, None) |
| 261 | # Returns true to force a retry |
| 262 | assert d.response(response, content) |
| 263 | |
| 264 | |
| 265 | def test_digest_object_auth_info(): |
| 266 | credentials = ('joe', 'password') |
| 267 | host = None |
| 268 | request_uri = '/digest/nextnonce/' |
| 269 | headers = {} |
| 270 | response = httplib2.Response({}) |
| 271 | response['www-authenticate'] = 'Digest realm="myrealm", nonce="barney", algorithm=MD5, qop="auth", stale=true' |
| 272 | response['authentication-info'] = 'nextnonce="fred"' |
| 273 | content = b'' |
| 274 | d = httplib2.DigestAuthentication(credentials, host, request_uri, headers, response, content, None) |
| 275 | # Returns true to force a retry |
| 276 | assert not d.response(response, content) |
| 277 | assert d.challenge['nonce'] == 'fred' |
| 278 | assert d.challenge['nc'] == 1 |
| 279 | |
| 280 | |
| 281 | def test_wsse_algorithm(): |
| 282 | digest = httplib2._wsse_username_token('d36e316282959a9ed4c89851497a717f', '2003-12-15T14:43:07Z', 'taadtaadpstcsm') |
| 283 | expected = b'quR/EWLAV4xLf9Zqyw4pDmfV9OY=' |
| 284 | assert expected == digest |