Alex Yu | aa1b95b | 2018-07-26 23:23:35 -0400 | [diff] [blame] | 1 | #!/usr/bin/env python3 |
| 2 | """A set of unit tests for httplib2.py.""" |
| 3 | |
| 4 | __author__ = "Joe Gregorio (joe@bitworking.org)" |
| 5 | __copyright__ = "Copyright 2006, Joe Gregorio" |
| 6 | __contributors__ = ["Mark Pilgrim"] |
| 7 | __license__ = "MIT" |
| 8 | __version__ = "0.2 ($Rev: 118 $)" |
| 9 | |
| 10 | import base64 |
| 11 | import http.client |
| 12 | import httplib2 |
| 13 | import io |
| 14 | import os |
| 15 | import pickle |
| 16 | import socket |
| 17 | import ssl |
| 18 | import sys |
| 19 | import time |
| 20 | import unittest |
| 21 | import urllib.parse |
| 22 | |
| 23 | base = "http://bitworking.org/projects/httplib2/test/" |
| 24 | cacheDirName = ".cache" |
| 25 | |
| 26 | |
class CredentialsTest(unittest.TestCase):
    def test(self):
        """Credentials are matched per-domain; an empty domain matches any."""
        creds = httplib2.Credentials()
        creds.add("joe", "password")
        self.assertEqual(("joe", "password"), list(creds.iter("bitworking.org"))[0])
        self.assertEqual(("joe", "password"), list(creds.iter(""))[0])
        # A domain-scoped credential is visible only for that domain.
        creds.add("fred", "password2", "wellformedweb.org")
        self.assertEqual(("joe", "password"), list(creds.iter("bitworking.org"))[0])
        self.assertEqual(1, len(list(creds.iter("bitworking.org"))))
        self.assertEqual(2, len(list(creds.iter("wellformedweb.org"))))
        self.assertIn(("fred", "password2"), list(creds.iter("wellformedweb.org")))
        # clear() drops everything; re-adding restores only the new entry.
        creds.clear()
        self.assertEqual(0, len(list(creds.iter("bitworking.org"))))
        creds.add("fred", "password2", "wellformedweb.org")
        self.assertIn(("fred", "password2"), list(creds.iter("wellformedweb.org")))
        self.assertEqual(0, len(list(creds.iter("bitworking.org"))))
        self.assertEqual(0, len(list(creds.iter(""))))
| 44 | |
| 45 | |
class ParserTest(unittest.TestCase):
    def testFromStd66(self):
        """parse_uri splits a URI into (scheme, authority, path, query,
        fragment) tuples per the STD 66 / RFC 3986 reference regex.

        Rewritten as a data-driven loop; the original repeated the final
        fragment case twice verbatim, which added no coverage.
        """
        # (expected parse, uri); None marks an absent component.
        cases = [
            (("http", "example.com", "", None, None), "http://example.com"),
            (("https", "example.com", "", None, None), "https://example.com"),
            (
                ("https", "example.com:8080", "", None, None),
                "https://example.com:8080",
            ),
            (("http", "example.com", "/", None, None), "http://example.com/"),
            (("http", "example.com", "/path", None, None), "http://example.com/path"),
            (
                ("http", "example.com", "/path", "a=1&b=2", None),
                "http://example.com/path?a=1&b=2",
            ),
            (
                ("http", "example.com", "/path", "a=1&b=2", "fred"),
                "http://example.com/path?a=1&b=2#fred",
            ),
        ]
        for expected, uri in cases:
            self.assertEqual(expected, httplib2.parse_uri(uri))
| 80 | |
| 81 | |
class UrlNormTest(unittest.TestCase):
    def test(self):
        """urlnorm lower-cases scheme and authority and guarantees a path."""
        self.assertEqual(
            "http://example.org/", httplib2.urlnorm("http://example.org")[-1]
        )
        self.assertEqual(
            "http://example.org/", httplib2.urlnorm("http://EXAMple.org")[-1]
        )
        self.assertEqual(
            "http://example.org/?=b", httplib2.urlnorm("http://EXAMple.org?=b")[-1]
        )
        self.assertEqual(
            "http://example.org/mypath?a=b",
            httplib2.urlnorm("http://EXAMple.org/mypath?a=b")[-1],
        )
        self.assertEqual(
            "http://localhost:80/", httplib2.urlnorm("http://localhost:80")[-1]
        )
        self.assertEqual(
            httplib2.urlnorm("http://localhost:80/"),
            httplib2.urlnorm("HTTP://LOCALHOST:80"),
        )
        # Relative URIs cannot be normalized; assertRaises replaces the
        # hand-rolled try/fail/except dance of the original.
        self.assertRaises(httplib2.RelativeURIError, httplib2.urlnorm, "/")
| 109 | |
| 110 | |
class UrlSafenameTest(unittest.TestCase):
    def test(self):
        """safename maps a URI to a unique, filesystem-safe cache file name."""
        # Different URIs must generate different safe names.
        self.assertEqual(
            "example.org,fred,a=b,58489f63a7a83c3b7794a6a398ee8b1f",
            httplib2.safename("http://example.org/fred/?a=b"),
        )
        self.assertEqual(
            "example.org,fred,a=b,8c5946d56fec453071f43329ff0be46b",
            httplib2.safename("http://example.org/fred?/a=b"),
        )
        self.assertEqual(
            "www.example.org,fred,a=b,499c44b8d844a011b67ea2c015116968",
            httplib2.safename("http://www.example.org/fred?/a=b"),
        )
        self.assertEqual(
            httplib2.safename(httplib2.urlnorm("http://www")[-1]),
            httplib2.safename(httplib2.urlnorm("http://WWW")[-1]),
        )
        self.assertEqual(
            "www.example.org,fred,a=b,692e843a333484ce0095b070497ab45d",
            httplib2.safename("https://www.example.org/fred?/a=b"),
        )
        # Scheme is part of the name: http and https must differ.
        self.assertNotEqual(
            httplib2.safename("http://www"), httplib2.safename("https://www")
        )
        # Test the max length limits
        uri = "http://" + ("w" * 200) + ".org"
        uri2 = "http://" + ("w" * 201) + ".org"
        self.assertNotEqual(httplib2.safename(uri2), httplib2.safename(uri))
        # Max length should be 200 + 1 (",") + 32
        self.assertEqual(233, len(httplib2.safename(uri2)))
        self.assertEqual(233, len(httplib2.safename(uri)))
        # Unicode host gets IDNA-encoded.  The `sys.version_info >= (2, 3)`
        # guard from the Python 2 era was always true under this file's
        # python3 shebang and has been removed.
        self.assertEqual(
            "xn--http,-4y1d.org,fred,a=b,579924c35db315e5a32e3d9963388193",
            httplib2.safename("http://\u2304.org/fred/?a=b"),
        )
| 150 | |
| 151 | |
| 152 | class _MyResponse(io.BytesIO): |
| 153 | def __init__(self, body, **kwargs): |
| 154 | io.BytesIO.__init__(self, body) |
| 155 | self.headers = kwargs |
| 156 | |
| 157 | def items(self): |
| 158 | return self.headers.items() |
| 159 | |
| 160 | def iteritems(self): |
| 161 | return iter(self.headers.items()) |
| 162 | |
| 163 | |
| 164 | class _MyHTTPConnection(object): |
| 165 | "This class is just a mock of httplib.HTTPConnection used for testing" |
| 166 | |
| 167 | def __init__( |
| 168 | self, |
| 169 | host, |
| 170 | port=None, |
| 171 | key_file=None, |
| 172 | cert_file=None, |
| 173 | strict=None, |
| 174 | timeout=None, |
| 175 | proxy_info=None, |
| 176 | ): |
| 177 | self.host = host |
| 178 | self.port = port |
| 179 | self.timeout = timeout |
| 180 | self.log = "" |
| 181 | self.sock = None |
| 182 | |
| 183 | def set_debuglevel(self, level): |
| 184 | pass |
| 185 | |
| 186 | def connect(self): |
| 187 | "Connect to a host on a given port." |
| 188 | pass |
| 189 | |
| 190 | def close(self): |
| 191 | pass |
| 192 | |
| 193 | def request(self, method, request_uri, body, headers): |
| 194 | pass |
| 195 | |
| 196 | def getresponse(self): |
| 197 | return _MyResponse(b"the body", status="200") |
| 198 | |
| 199 | |
| 200 | class _MyHTTPBadStatusConnection(object): |
| 201 | "Mock of httplib.HTTPConnection that raises BadStatusLine." |
| 202 | |
| 203 | num_calls = 0 |
| 204 | |
| 205 | def __init__( |
| 206 | self, |
| 207 | host, |
| 208 | port=None, |
| 209 | key_file=None, |
| 210 | cert_file=None, |
| 211 | strict=None, |
| 212 | timeout=None, |
| 213 | proxy_info=None, |
| 214 | ): |
| 215 | self.host = host |
| 216 | self.port = port |
| 217 | self.timeout = timeout |
| 218 | self.log = "" |
| 219 | self.sock = None |
| 220 | _MyHTTPBadStatusConnection.num_calls = 0 |
| 221 | |
| 222 | def set_debuglevel(self, level): |
| 223 | pass |
| 224 | |
| 225 | def connect(self): |
| 226 | pass |
| 227 | |
| 228 | def close(self): |
| 229 | pass |
| 230 | |
| 231 | def request(self, method, request_uri, body, headers): |
| 232 | pass |
| 233 | |
| 234 | def getresponse(self): |
| 235 | _MyHTTPBadStatusConnection.num_calls += 1 |
| 236 | raise http.client.BadStatusLine("") |
| 237 | |
| 238 | |
| 239 | class HttpTest(unittest.TestCase): |
| 240 | def setUp(self): |
| 241 | if os.path.exists(cacheDirName): |
| 242 | [ |
| 243 | os.remove(os.path.join(cacheDirName, file)) |
| 244 | for file in os.listdir(cacheDirName) |
| 245 | ] |
| 246 | self.http = httplib2.Http(cacheDirName) |
| 247 | self.http.clear_credentials() |
| 248 | |
    def testIPv6NoSSL(self):
        """A plain-HTTP request to an IPv6 literal must not fail name resolution."""
        try:
            self.http.request("http://[::1]/")
        except socket.gaierror:
            # gaierror here would mean the [::1] literal was resolved with
            # the wrong address family.
            self.fail("should get the address family right for IPv6")
        except socket.error:
            # Even if IPv6 isn't installed on a machine it should just raise socket.error
            pass
| 257 | |
    def testIPv6SSL(self):
        """An HTTPS request to an IPv6 literal must not fail name resolution."""
        try:
            self.http.request("https://[::1]/")
        except socket.gaierror:
            # gaierror here would mean the [::1] literal was resolved with
            # the wrong address family.
            self.fail("should get the address family right for IPv6")
        except socket.error:
            # Even if IPv6 isn't installed on a machine it should just raise socket.error
            pass
| 266 | |
    def testConnectionType(self):
        """The connection_type argument substitutes the connection class used."""
        # _MyHTTPConnection performs no network I/O and always answers with
        # a canned 200 response whose body is b"the body".
        self.http.force_exception_to_status_code = False
        response, content = self.http.request(
            "http://bitworking.org", connection_type=_MyHTTPConnection
        )
        self.assertEqual(response["content-location"], "http://bitworking.org")
        self.assertEqual(content, b"the body")
| 274 | |
| 275 | def testBadStatusLineRetry(self): |
| 276 | old_retries = httplib2.RETRIES |
| 277 | httplib2.RETRIES = 1 |
| 278 | self.http.force_exception_to_status_code = False |
| 279 | try: |
| 280 | response, content = self.http.request( |
| 281 | "http://bitworking.org", connection_type=_MyHTTPBadStatusConnection |
| 282 | ) |
| 283 | except http.client.BadStatusLine: |
| 284 | self.assertEqual(2, _MyHTTPBadStatusConnection.num_calls) |
| 285 | httplib2.RETRIES = old_retries |
| 286 | |
| 287 | def testGetUnknownServer(self): |
| 288 | self.http.force_exception_to_status_code = False |
| 289 | try: |
| 290 | self.http.request("http://fred.bitworking.org/") |
| 291 | self.fail( |
| 292 | "An httplib2.ServerNotFoundError Exception must be thrown on an unresolvable server." |
| 293 | ) |
| 294 | except httplib2.ServerNotFoundError: |
| 295 | pass |
| 296 | |
| 297 | # Now test with exceptions turned off |
| 298 | self.http.force_exception_to_status_code = True |
| 299 | |
| 300 | (response, content) = self.http.request("http://fred.bitworking.org/") |
| 301 | self.assertEqual(response["content-type"], "text/plain") |
| 302 | self.assertTrue(content.startswith(b"Unable to find")) |
| 303 | self.assertEqual(response.status, 400) |
| 304 | |
| 305 | def testGetConnectionRefused(self): |
| 306 | self.http.force_exception_to_status_code = False |
| 307 | try: |
| 308 | self.http.request("http://localhost:7777/") |
| 309 | self.fail("An socket.error exception must be thrown on Connection Refused.") |
| 310 | except socket.error: |
| 311 | pass |
| 312 | |
| 313 | # Now test with exceptions turned off |
| 314 | self.http.force_exception_to_status_code = True |
| 315 | |
| 316 | (response, content) = self.http.request("http://localhost:7777/") |
| 317 | self.assertEqual(response["content-type"], "text/plain") |
| 318 | self.assertTrue(b"Connection refused" in content) |
| 319 | self.assertEqual(response.status, 400) |
| 320 | |
| 321 | def testGetIRI(self): |
| 322 | if sys.version_info >= (2, 3): |
| 323 | uri = urllib.parse.urljoin( |
| 324 | base, "reflector/reflector.cgi?d=\N{CYRILLIC CAPITAL LETTER DJE}" |
| 325 | ) |
| 326 | (response, content) = self.http.request(uri, "GET") |
| 327 | d = self.reflector(content) |
| 328 | self.assertTrue("QUERY_STRING" in d) |
| 329 | self.assertTrue(d["QUERY_STRING"].find("%D0%82") > 0) |
| 330 | |
| 331 | def testGetIsDefaultMethod(self): |
| 332 | # Test that GET is the default method |
| 333 | uri = urllib.parse.urljoin(base, "methods/method_reflector.cgi") |
| 334 | (response, content) = self.http.request(uri) |
| 335 | self.assertEqual(response["x-method"], "GET") |
| 336 | |
| 337 | def testDifferentMethods(self): |
| 338 | # Test that all methods can be used |
| 339 | uri = urllib.parse.urljoin(base, "methods/method_reflector.cgi") |
| 340 | for method in ["GET", "PUT", "DELETE", "POST"]: |
| 341 | (response, content) = self.http.request(uri, method, body=b" ") |
| 342 | self.assertEqual(response["x-method"], method) |
| 343 | |
| 344 | def testHeadRead(self): |
| 345 | # Test that we don't try to read the response of a HEAD request |
| 346 | # since httplib blocks response.read() for HEAD requests. |
| 347 | # Oddly enough this doesn't appear as a problem when doing HEAD requests |
| 348 | # against Apache servers. |
| 349 | uri = "http://www.google.com/" |
| 350 | (response, content) = self.http.request(uri, "HEAD") |
| 351 | self.assertEqual(response.status, 200) |
| 352 | self.assertEqual(content, b"") |
| 353 | |
| 354 | def testGetNoCache(self): |
| 355 | # Test that can do a GET w/o the cache turned on. |
| 356 | http = httplib2.Http() |
| 357 | uri = urllib.parse.urljoin(base, "304/test_etag.txt") |
| 358 | (response, content) = http.request(uri, "GET") |
| 359 | self.assertEqual(response.status, 200) |
| 360 | self.assertEqual(response.previous, None) |
| 361 | |
    def testGetOnlyIfCachedCacheHit(self):
        """'only-if-cached' is served from the cache after a priming GET."""
        # First request populates the cache ...
        uri = urllib.parse.urljoin(base, "304/test_etag.txt")
        (response, content) = self.http.request(uri, "GET")
        # ... so the second must be answered from it without a fetch.
        (response, content) = self.http.request(
            uri, "GET", headers={"cache-control": "only-if-cached"}
        )
        self.assertEqual(response.fromcache, True)
        self.assertEqual(response.status, 200)
| 371 | |
| 372 | def testGetOnlyIfCachedCacheMiss(self): |
| 373 | # Test that can do a GET with no cache with 'only-if-cached' |
| 374 | uri = urllib.parse.urljoin(base, "304/test_etag.txt") |
| 375 | (response, content) = self.http.request( |
| 376 | uri, "GET", headers={"cache-control": "only-if-cached"} |
| 377 | ) |
| 378 | self.assertEqual(response.fromcache, False) |
| 379 | self.assertEqual(response.status, 504) |
| 380 | |
    def testGetOnlyIfCachedNoCacheAtAll(self):
        """'only-if-cached' with no cache configured at all yields a 504."""
        # Of course, there might be an intermediary beyond us
        # that responds to the 'only-if-cached', so this
        # test can't really be guaranteed to pass.
        http = httplib2.Http()
        uri = urllib.parse.urljoin(base, "304/test_etag.txt")
        (response, content) = http.request(
            uri, "GET", headers={"cache-control": "only-if-cached"}
        )
        self.assertEqual(response.fromcache, False)
        self.assertEqual(response.status, 504)
| 393 | |
| 394 | def testUserAgent(self): |
| 395 | # Test that we provide a default user-agent |
| 396 | uri = urllib.parse.urljoin(base, "user-agent/test.cgi") |
| 397 | (response, content) = self.http.request(uri, "GET") |
| 398 | self.assertEqual(response.status, 200) |
| 399 | self.assertTrue(content.startswith(b"Python-httplib2/")) |
| 400 | |
| 401 | def testUserAgentNonDefault(self): |
| 402 | # Test that the default user-agent can be over-ridden |
| 403 | |
| 404 | uri = urllib.parse.urljoin(base, "user-agent/test.cgi") |
| 405 | (response, content) = self.http.request( |
| 406 | uri, "GET", headers={"User-Agent": "fred/1.0"} |
| 407 | ) |
| 408 | self.assertEqual(response.status, 200) |
| 409 | self.assertTrue(content.startswith(b"fred/1.0")) |
| 410 | |
    def testGet300WithLocation(self):
        """A 300 with a Location header is followed automatically, and the
        intermediate 300 response is never cached."""
        uri = urllib.parse.urljoin(base, "300/with-location-header.asis")
        (response, content) = self.http.request(uri, "GET")
        self.assertEqual(response.status, 200)
        self.assertEqual(content, b"This is the final destination.\n")
        self.assertEqual(response.previous.status, 300)
        self.assertEqual(response.previous.fromcache, False)

        # Confirm that the intermediate 300 is not cached
        (response, content) = self.http.request(uri, "GET")
        self.assertEqual(response.status, 200)
        self.assertEqual(content, b"This is the final destination.\n")
        self.assertEqual(response.previous.status, 300)
        self.assertEqual(response.previous.fromcache, False)
| 426 | |
| 427 | def testGet300WithLocationNoRedirect(self): |
| 428 | # Test the we automatically follow 300 redirects if a Location: header is provided |
| 429 | self.http.follow_redirects = False |
| 430 | uri = urllib.parse.urljoin(base, "300/with-location-header.asis") |
| 431 | (response, content) = self.http.request(uri, "GET") |
| 432 | self.assertEqual(response.status, 300) |
| 433 | |
| 434 | def testGet300WithoutLocation(self): |
| 435 | # Not giving a Location: header in a 300 response is acceptable |
| 436 | # In which case we just return the 300 response |
| 437 | uri = urllib.parse.urljoin(base, "300/without-location-header.asis") |
| 438 | (response, content) = self.http.request(uri, "GET") |
| 439 | self.assertEqual(response.status, 300) |
| 440 | self.assertTrue(response["content-type"].startswith("text/html")) |
| 441 | self.assertEqual(response.previous, None) |
| 442 | |
    def testGet301(self):
        """A 301 redirect is followed, and the 301 itself is cached."""
        uri = urllib.parse.urljoin(base, "301/onestep.asis")
        destination = urllib.parse.urljoin(base, "302/final-destination.txt")
        (response, content) = self.http.request(uri, "GET")
        self.assertEqual(response.status, 200)
        self.assertTrue("content-location" in response)
        self.assertEqual(response["content-location"], destination)
        self.assertEqual(content, b"This is the final destination.\n")
        self.assertEqual(response.previous.status, 301)
        self.assertEqual(response.previous.fromcache, False)

        # Second time around: the 301 must be answered from the cache.
        (response, content) = self.http.request(uri, "GET")
        self.assertEqual(response.status, 200)
        self.assertEqual(response["content-location"], destination)
        self.assertEqual(content, b"This is the final destination.\n")
        self.assertEqual(response.previous.status, 301)
        self.assertEqual(response.previous.fromcache, True)
| 462 | |
| 463 | def testHead301(self): |
| 464 | # Test that we automatically follow 301 redirects |
| 465 | uri = urllib.parse.urljoin(base, "301/onestep.asis") |
| 466 | (response, content) = self.http.request(uri, "HEAD") |
| 467 | self.assertEqual(response.status, 200) |
| 468 | self.assertEqual(response.previous.status, 301) |
| 469 | self.assertEqual(response.previous.fromcache, False) |
| 470 | |
| 471 | def testGet301NoRedirect(self): |
| 472 | # Test that we automatically follow 301 redirects |
| 473 | # and that we cache the 301 response |
| 474 | self.http.follow_redirects = False |
| 475 | uri = urllib.parse.urljoin(base, "301/onestep.asis") |
| 476 | destination = urllib.parse.urljoin(base, "302/final-destination.txt") |
| 477 | (response, content) = self.http.request(uri, "GET") |
| 478 | self.assertEqual(response.status, 301) |
| 479 | |
    def testGet302(self):
        """A 302 redirect is followed, but the 302 itself is NOT cached
        (only the final destination may be)."""
        uri = urllib.parse.urljoin(base, "302/onestep.asis")
        destination = urllib.parse.urljoin(base, "302/final-destination.txt")
        (response, content) = self.http.request(uri, "GET")
        self.assertEqual(response.status, 200)
        self.assertEqual(response["content-location"], destination)
        self.assertEqual(content, b"This is the final destination.\n")
        self.assertEqual(response.previous.status, 302)
        self.assertEqual(response.previous.fromcache, False)

        # Repeat: the destination comes from cache, the 302 does not.
        uri = urllib.parse.urljoin(base, "302/onestep.asis")
        (response, content) = self.http.request(uri, "GET")
        self.assertEqual(response.status, 200)
        self.assertEqual(response.fromcache, True)
        self.assertEqual(response["content-location"], destination)
        self.assertEqual(content, b"This is the final destination.\n")
        self.assertEqual(response.previous.status, 302)
        self.assertEqual(response.previous.fromcache, False)
        self.assertEqual(response.previous["content-location"], uri)

        # A two-hop redirect chain behaves the same way.
        uri = urllib.parse.urljoin(base, "302/twostep.asis")

        (response, content) = self.http.request(uri, "GET")
        self.assertEqual(response.status, 200)
        self.assertEqual(response.fromcache, True)
        self.assertEqual(content, b"This is the final destination.\n")
        self.assertEqual(response.previous.status, 302)
        self.assertEqual(response.previous.fromcache, False)
| 510 | |
| 511 | def testGet302RedirectionLimit(self): |
| 512 | # Test that we can set a lower redirection limit |
| 513 | # and that we raise an exception when we exceed |
| 514 | # that limit. |
| 515 | self.http.force_exception_to_status_code = False |
| 516 | |
| 517 | uri = urllib.parse.urljoin(base, "302/twostep.asis") |
| 518 | try: |
| 519 | (response, content) = self.http.request(uri, "GET", redirections=1) |
| 520 | self.fail("This should not happen") |
| 521 | except httplib2.RedirectLimit: |
| 522 | pass |
| 523 | except Exception as e: |
| 524 | self.fail("Threw wrong kind of exception ") |
| 525 | |
| 526 | # Re-run the test with out the exceptions |
| 527 | self.http.force_exception_to_status_code = True |
| 528 | |
| 529 | (response, content) = self.http.request(uri, "GET", redirections=1) |
| 530 | self.assertEqual(response.status, 500) |
| 531 | self.assertTrue(response.reason.startswith("Redirected more")) |
| 532 | self.assertEqual("302", response["status"]) |
| 533 | self.assertTrue(content.startswith(b"<html>")) |
| 534 | self.assertTrue(response.previous != None) |
| 535 | |
| 536 | def testGet302NoLocation(self): |
| 537 | # Test that we throw an exception when we get |
| 538 | # a 302 with no Location: header. |
| 539 | self.http.force_exception_to_status_code = False |
| 540 | uri = urllib.parse.urljoin(base, "302/no-location.asis") |
| 541 | try: |
| 542 | (response, content) = self.http.request(uri, "GET") |
| 543 | self.fail("Should never reach here") |
| 544 | except httplib2.RedirectMissingLocation: |
| 545 | pass |
| 546 | except Exception as e: |
| 547 | self.fail("Threw wrong kind of exception ") |
| 548 | |
| 549 | # Re-run the test with out the exceptions |
| 550 | self.http.force_exception_to_status_code = True |
| 551 | |
| 552 | (response, content) = self.http.request(uri, "GET") |
| 553 | self.assertEqual(response.status, 500) |
| 554 | self.assertTrue(response.reason.startswith("Redirected but")) |
| 555 | self.assertEqual("302", response["status"]) |
| 556 | self.assertTrue(content.startswith(b"This is content")) |
| 557 | |
    def testGet301ViaHttps(self):
        """301 redirects are followed over HTTPS as well."""
        # NOTE(review): relies on code.google.com issuing a 301 -- an
        # external-service dependency that may rot over time.
        (response, content) = self.http.request("https://code.google.com/apis/", "GET")
        self.assertEqual(200, response.status)
        self.assertEqual(301, response.previous.status)
| 563 | |
| 564 | def testGetViaHttps(self): |
| 565 | # Test that we can handle HTTPS |
| 566 | (response, content) = self.http.request("https://google.com/adsense/", "GET") |
| 567 | self.assertEqual(200, response.status) |
| 568 | |
    def testGetViaHttpsSpecViolationOnLocation(self):
        """Redirects over HTTPS are followed even when the Location header
        violates the spec by being relative instead of absolute."""
        (response, content) = self.http.request("https://google.com/adsense", "GET")
        self.assertEqual(200, response.status)
        # A redirect happened, so a previous response must be present.
        self.assertNotEqual(None, response.previous)
| 577 | |
    def testGetViaHttpsKeyCert(self):
        """add_certificate() wires key/cert files through to the connection.

        Only verifies that the files are handed to httplib for the matching
        host (and NOT for other hosts); there is no real client-cert endpoint
        to test against.  AttributeError / ServerNotFoundError are the
        expected failure modes of connecting with the bogus file names.
        """
        http = httplib2.Http(timeout=2)

        http.add_certificate("akeyfile", "acertfile", "bitworking.org")
        try:
            (response, content) = http.request("https://bitworking.org", "GET")
        except AttributeError:
            # The connection failed, but the fake key/cert must have been
            # attached to the connection object for this host.
            self.assertEqual(
                http.connections["https:bitworking.org"].key_file, "akeyfile"
            )
            self.assertEqual(
                http.connections["https:bitworking.org"].cert_file, "acertfile"
            )
        except IOError:
            # Skip on 3.2
            pass

        try:
            (response, content) = http.request("https://notthere.bitworking.org", "GET")
        except httplib2.ServerNotFoundError:
            # No certificate was registered for this host.
            self.assertEqual(
                http.connections["https:notthere.bitworking.org"].key_file, None
            )
            self.assertEqual(
                http.connections["https:notthere.bitworking.org"].cert_file, None
            )
        except IOError:
            # Skip on 3.2
            pass
| 611 | |
    def testSslCertValidation(self):
        """CA-certificate problems surface as IOError / ssl.SSLError."""
        # Test that we get an ssl.SSLError when specifying a non-existent CA
        # certs file.
        http = httplib2.Http(ca_certs="/nosuchfile")
        self.assertRaises(IOError, http.request, "https://www.google.com/", "GET")

        # Test that we get a SSLHandshakeError if we try to access
        # https://www.google.com, using a CA cert file that doesn't contain
        # the CA Google uses (i.e., simulating a cert that's not signed by a
        # trusted CA).
        other_ca_certs = os.path.join(
            os.path.dirname(os.path.abspath(httplib2.__file__)),
            "test",
            "other_cacerts.txt",
        )
        http = httplib2.Http(ca_certs=other_ca_certs)
        self.assertRaises(ssl.SSLError, http.request, "https://www.google.com/", "GET")
| 629 | |
| 630 | def testSniHostnameValidation(self): |
| 631 | self.http.request("https://google.com/", method="GET") |
| 632 | |
| 633 | def testGet303(self): |
| 634 | # Do a follow-up GET on a Location: header |
| 635 | # returned from a POST that gave a 303. |
| 636 | uri = urllib.parse.urljoin(base, "303/303.cgi") |
| 637 | (response, content) = self.http.request(uri, "POST", " ") |
| 638 | self.assertEqual(response.status, 200) |
| 639 | self.assertEqual(content, b"This is the final destination.\n") |
| 640 | self.assertEqual(response.previous.status, 303) |
| 641 | |
| 642 | def testGet303NoRedirect(self): |
| 643 | # Do a follow-up GET on a Location: header |
| 644 | # returned from a POST that gave a 303. |
| 645 | self.http.follow_redirects = False |
| 646 | uri = urllib.parse.urljoin(base, "303/303.cgi") |
| 647 | (response, content) = self.http.request(uri, "POST", " ") |
| 648 | self.assertEqual(response.status, 303) |
| 649 | |
| 650 | def test303ForDifferentMethods(self): |
| 651 | # Test that all methods can be used |
| 652 | uri = urllib.parse.urljoin(base, "303/redirect-to-reflector.cgi") |
| 653 | for (method, method_on_303) in [ |
| 654 | ("PUT", "GET"), |
| 655 | ("DELETE", "GET"), |
| 656 | ("POST", "GET"), |
| 657 | ("GET", "GET"), |
| 658 | ("HEAD", "GET"), |
| 659 | ]: |
| 660 | (response, content) = self.http.request(uri, method, body=b" ") |
| 661 | self.assertEqual(response["x-method"], method_on_303) |
| 662 | |
| 663 | def testGet304(self): |
| 664 | # Test that we use ETags properly to validate our cache |
| 665 | uri = urllib.parse.urljoin(base, "304/test_etag.txt") |
| 666 | (response, content) = self.http.request( |
| 667 | uri, "GET", headers={"accept-encoding": "identity"} |
| 668 | ) |
| 669 | self.assertNotEqual(response["etag"], "") |
| 670 | |
| 671 | (response, content) = self.http.request( |
| 672 | uri, "GET", headers={"accept-encoding": "identity"} |
| 673 | ) |
| 674 | (response, content) = self.http.request( |
| 675 | uri, |
| 676 | "GET", |
| 677 | headers={"accept-encoding": "identity", "cache-control": "must-revalidate"}, |
| 678 | ) |
| 679 | self.assertEqual(response.status, 200) |
| 680 | self.assertEqual(response.fromcache, True) |
| 681 | |
| 682 | cache_file_name = os.path.join( |
| 683 | cacheDirName, httplib2.safename(httplib2.urlnorm(uri)[-1]) |
| 684 | ) |
| 685 | f = open(cache_file_name, "r") |
| 686 | status_line = f.readline() |
| 687 | f.close() |
| 688 | |
| 689 | self.assertTrue(status_line.startswith("status:")) |
| 690 | |
| 691 | (response, content) = self.http.request( |
| 692 | uri, "HEAD", headers={"accept-encoding": "identity"} |
| 693 | ) |
| 694 | self.assertEqual(response.status, 200) |
| 695 | self.assertEqual(response.fromcache, True) |
| 696 | |
| 697 | (response, content) = self.http.request( |
| 698 | uri, "GET", headers={"accept-encoding": "identity", "range": "bytes=0-0"} |
| 699 | ) |
| 700 | self.assertEqual(response.status, 206) |
| 701 | self.assertEqual(response.fromcache, False) |
| 702 | |
    def testGetIgnoreEtag(self):
        """ignore_etag=True suppresses the If-None-Match request header."""
        # Prime the cache; the reflector echoes the CGI environment back.
        uri = urllib.parse.urljoin(base, "reflector/reflector.cgi")
        (response, content) = self.http.request(
            uri, "GET", headers={"accept-encoding": "identity"}
        )
        self.assertNotEqual(response["etag"], "")

        # With a cached ETag, revalidation sends If-None-Match ...
        (response, content) = self.http.request(
            uri,
            "GET",
            headers={"accept-encoding": "identity", "cache-control": "max-age=0"},
        )
        d = self.reflector(content)
        self.assertTrue("HTTP_IF_NONE_MATCH" in d)

        # ... unless etags are explicitly ignored.
        self.http.ignore_etag = True
        (response, content) = self.http.request(
            uri,
            "GET",
            headers={"accept-encoding": "identity", "cache-control": "max-age=0"},
        )
        d = self.reflector(content)
        self.assertEqual(response.fromcache, False)
        self.assertFalse("HTTP_IF_NONE_MATCH" in d)
| 728 | |
    def testOverrideEtag(self):
        """A caller-supplied if-none-match header overrides the cached ETag."""
        # Prime the cache; the reflector echoes the CGI environment back.
        uri = urllib.parse.urljoin(base, "reflector/reflector.cgi")
        (response, content) = self.http.request(
            uri, "GET", headers={"accept-encoding": "identity"}
        )
        self.assertNotEqual(response["etag"], "")

        # Default revalidation uses the ETag from the cache.
        (response, content) = self.http.request(
            uri,
            "GET",
            headers={"accept-encoding": "identity", "cache-control": "max-age=0"},
        )
        d = self.reflector(content)
        self.assertTrue("HTTP_IF_NONE_MATCH" in d)
        self.assertNotEqual(d["HTTP_IF_NONE_MATCH"], "fred")

        # An explicit if-none-match header wins over the cached value.
        (response, content) = self.http.request(
            uri,
            "GET",
            headers={
                "accept-encoding": "identity",
                "cache-control": "max-age=0",
                "if-none-match": "fred",
            },
        )
        d = self.reflector(content)
        self.assertTrue("HTTP_IF_NONE_MATCH" in d)
        self.assertEqual(d["HTTP_IF_NONE_MATCH"], "fred")
| 758 | |
| 759 | # MAP-commented this out because it consistently fails |
| 760 | # def testGet304EndToEnd(self): |
| 761 | # # Test that end to end headers get overwritten in the cache |
| 762 | # uri = urllib.parse.urljoin(base, "304/end2end.cgi") |
| 763 | # (response, content) = self.http.request(uri, "GET") |
| 764 | # self.assertNotEqual(response['etag'], "") |
| 765 | # old_date = response['date'] |
| 766 | # time.sleep(2) |
| 767 | # |
| 768 | # (response, content) = self.http.request(uri, "GET", headers = {'Cache-Control': 'max-age=0'}) |
| 769 | # # The response should be from the cache, but the Date: header should be updated. |
| 770 | # new_date = response['date'] |
| 771 | # self.assertNotEqual(new_date, old_date) |
| 772 | # self.assertEqual(response.status, 200) |
| 773 | # self.assertEqual(response.fromcache, True) |
| 774 | |
    def testGet304LastModified(self):
        """304 handling works with only a Last-Modified validator (no ETag)."""
        # Test that we can still handle a 304
        # by only using the last-modified cache validator.
        uri = urllib.parse.urljoin(
            base, "304/last-modified-only/last-modified-only.txt"
        )
        (response, content) = self.http.request(uri, "GET")

        self.assertNotEqual(response["last-modified"], "")
        # Two follow-up requests: the first may revalidate; the second must
        # come from the cache using Last-Modified alone.
        (response, content) = self.http.request(uri, "GET")
        (response, content) = self.http.request(uri, "GET")
        self.assertEqual(response.status, 200)
        self.assertEqual(response.fromcache, True)
| 788 | |
    def testGet307(self):
        """307 redirects are followed; the final body is cached, the 307 is not."""
        # Test that we do follow 307 redirects but
        # do not cache the 307
        uri = urllib.parse.urljoin(base, "307/onestep.asis")
        (response, content) = self.http.request(uri, "GET")
        self.assertEqual(response.status, 200)
        self.assertEqual(content, b"This is the final destination.\n")
        # response.previous is the intermediate 307 hop.
        self.assertEqual(response.previous.status, 307)
        self.assertEqual(response.previous.fromcache, False)

        (response, content) = self.http.request(uri, "GET")
        self.assertEqual(response.status, 200)
        self.assertEqual(response.fromcache, True)
        self.assertEqual(content, b"This is the final destination.\n")
        self.assertEqual(response.previous.status, 307)
        self.assertEqual(response.previous.fromcache, False)
| 805 | |
| 806 | def testGet410(self): |
| 807 | # Test that we pass 410's through |
| 808 | uri = urllib.parse.urljoin(base, "410/410.asis") |
| 809 | (response, content) = self.http.request(uri, "GET") |
| 810 | self.assertEqual(response.status, 410) |
| 811 | |
| 812 | def testVaryHeaderSimple(self): |
| 813 | """RFC 2616 13.6 When the cache receives a subsequent request whose Request-URI specifies one or more cache entries including a Vary header field, the cache MUST NOT use such a cache entry to construct a response to the new request unless all of the selecting request-headers present in the new request match the corresponding stored request-headers in the original request. |
| 814 | |
| 815 | """ |
| 816 | # test that the vary header is sent |
| 817 | uri = urllib.parse.urljoin(base, "vary/accept.asis") |
| 818 | (response, content) = self.http.request( |
| 819 | uri, "GET", headers={"Accept": "text/plain"} |
| 820 | ) |
| 821 | self.assertEqual(response.status, 200) |
| 822 | self.assertTrue("vary" in response) |
| 823 | |
| 824 | # get the resource again, from the cache since accept header in this |
| 825 | # request is the same as the request |
| 826 | (response, content) = self.http.request( |
| 827 | uri, "GET", headers={"Accept": "text/plain"} |
| 828 | ) |
| 829 | self.assertEqual(response.status, 200) |
| 830 | self.assertEqual(response.fromcache, True, msg="Should be from cache") |
| 831 | |
| 832 | # get the resource again, not from cache since Accept headers does not match |
| 833 | (response, content) = self.http.request( |
| 834 | uri, "GET", headers={"Accept": "text/html"} |
| 835 | ) |
| 836 | self.assertEqual(response.status, 200) |
| 837 | self.assertEqual(response.fromcache, False, msg="Should not be from cache") |
| 838 | |
| 839 | # get the resource again, without any Accept header, so again no match |
| 840 | (response, content) = self.http.request(uri, "GET") |
| 841 | self.assertEqual(response.status, 200) |
| 842 | self.assertEqual(response.fromcache, False, msg="Should not be from cache") |
| 843 | |
    def testNoVary(self):
        """Disabled placeholder: without a Vary header, a different Accept
        header should not prevent a cache hit. Body kept for reference."""
        pass
        # when there is no vary, a different Accept header (e.g.) should not
        # impact if the cache is used
        # test that the vary header is not sent
        # uri = urllib.parse.urljoin(base, "vary/no-vary.asis")
        # (response, content) = self.http.request(uri, "GET", headers={'Accept': 'text/plain'})
        # self.assertEqual(response.status, 200)
        # self.assertFalse('vary' in response)
        #
        # (response, content) = self.http.request(uri, "GET", headers={'Accept': 'text/plain'})
        # self.assertEqual(response.status, 200)
        # self.assertEqual(response.fromcache, True, msg="Should be from cache")
        #
        # (response, content) = self.http.request(uri, "GET", headers={'Accept': 'text/html'})
        # self.assertEqual(response.status, 200)
        # self.assertEqual(response.fromcache, True, msg="Should be from cache")
| 861 | |
    def testVaryHeaderDouble(self):
        """With two varied headers, a cache hit requires BOTH to match."""
        uri = urllib.parse.urljoin(base, "vary/accept-double.asis")
        (response, content) = self.http.request(
            uri,
            "GET",
            headers={
                "Accept": "text/plain",
                "Accept-Language": "da, en-gb;q=0.8, en;q=0.7",
            },
        )
        self.assertEqual(response.status, 200)
        self.assertTrue("vary" in response)

        # we are from cache
        (response, content) = self.http.request(
            uri,
            "GET",
            headers={
                "Accept": "text/plain",
                "Accept-Language": "da, en-gb;q=0.8, en;q=0.7",
            },
        )
        self.assertEqual(response.fromcache, True, msg="Should be from cache")

        # Missing Accept-Language -> no cache hit.
        (response, content) = self.http.request(
            uri, "GET", headers={"Accept": "text/plain"}
        )
        self.assertEqual(response.status, 200)
        self.assertEqual(response.fromcache, False)

        # get the resource again, not from cache, varied headers don't match exact
        (response, content) = self.http.request(
            uri, "GET", headers={"Accept-Language": "da"}
        )
        self.assertEqual(response.status, 200)
        self.assertEqual(response.fromcache, False, msg="Should not be from cache")
| 898 | |
    def testVaryUnusedHeader(self):
        """Headers named by Vary but absent from the request don't block caching."""
        # A header's value is not considered to vary if it's not used at all.
        uri = urllib.parse.urljoin(base, "vary/unused-header.asis")
        (response, content) = self.http.request(
            uri, "GET", headers={"Accept": "text/plain"}
        )
        self.assertEqual(response.status, 200)
        self.assertTrue("vary" in response)

        # we are from cache
        (response, content) = self.http.request(
            uri, "GET", headers={"Accept": "text/plain"}
        )
        self.assertEqual(response.fromcache, True, msg="Should be from cache")
| 913 | |
| 914 | def testHeadGZip(self): |
| 915 | # Test that we don't try to decompress a HEAD response |
| 916 | uri = urllib.parse.urljoin(base, "gzip/final-destination.txt") |
| 917 | (response, content) = self.http.request(uri, "HEAD") |
| 918 | self.assertEqual(response.status, 200) |
| 919 | self.assertNotEqual(int(response["content-length"]), 0) |
| 920 | self.assertEqual(content, b"") |
| 921 | |
    def testGetGZip(self):
        """gzip bodies are transparently decoded; the original encoding is
        preserved under the synthetic '-content-encoding' key."""
        # Test that we support gzip compression
        uri = urllib.parse.urljoin(base, "gzip/final-destination.txt")
        (response, content) = self.http.request(uri, "GET")
        self.assertEqual(response.status, 200)
        self.assertFalse("content-encoding" in response)
        self.assertTrue("-content-encoding" in response)
        # content-length reflects the DECODED body length.
        self.assertEqual(
            int(response["content-length"]), len(b"This is the final destination.\n")
        )
        self.assertEqual(content, b"This is the final destination.\n")
| 933 | |
    def testPostAndGZipResponse(self):
        """A gzip-encoded POST response is decoded like a GET response."""
        uri = urllib.parse.urljoin(base, "gzip/post.cgi")
        (response, content) = self.http.request(uri, "POST", body=" ")
        self.assertEqual(response.status, 200)
        self.assertFalse("content-encoding" in response)
        self.assertTrue("-content-encoding" in response)
| 940 | |
| 941 | def testGetGZipFailure(self): |
| 942 | # Test that we raise a good exception when the gzip fails |
| 943 | self.http.force_exception_to_status_code = False |
| 944 | uri = urllib.parse.urljoin(base, "gzip/failed-compression.asis") |
| 945 | try: |
| 946 | (response, content) = self.http.request(uri, "GET") |
| 947 | self.fail("Should never reach here") |
| 948 | except httplib2.FailedToDecompressContent: |
| 949 | pass |
| 950 | except Exception: |
| 951 | self.fail("Threw wrong kind of exception") |
| 952 | |
| 953 | # Re-run the test with out the exceptions |
| 954 | self.http.force_exception_to_status_code = True |
| 955 | |
| 956 | (response, content) = self.http.request(uri, "GET") |
| 957 | self.assertEqual(response.status, 500) |
| 958 | self.assertTrue(response.reason.startswith("Content purported")) |
| 959 | |
    def testIndividualTimeout(self):
        """A per-Http timeout converts to a synthetic 408 response."""
        uri = urllib.parse.urljoin(base, "timeout/timeout.cgi")
        # Fresh Http instance so the 1-second timeout doesn't affect others.
        http = httplib2.Http(timeout=1)
        http.force_exception_to_status_code = True

        (response, content) = http.request(uri)
        self.assertEqual(response.status, 408)
        self.assertTrue(response.reason.startswith("Request Timeout"))
        self.assertTrue(content.startswith(b"Request Timeout"))
| 969 | |
    def testGetDeflate(self):
        """deflate-encoded bodies are transparently decoded."""
        # Test that we support deflate compression
        uri = urllib.parse.urljoin(base, "deflate/deflated.asis")
        (response, content) = self.http.request(uri, "GET")
        self.assertEqual(response.status, 200)
        self.assertFalse("content-encoding" in response)
        # content-length reflects the DECODED body length.
        self.assertEqual(
            int(response["content-length"]), len("This is the final destination.")
        )
        self.assertEqual(content, b"This is the final destination.")
| 980 | |
| 981 | def testGetDeflateFailure(self): |
| 982 | # Test that we raise a good exception when the deflate fails |
| 983 | self.http.force_exception_to_status_code = False |
| 984 | |
| 985 | uri = urllib.parse.urljoin(base, "deflate/failed-compression.asis") |
| 986 | try: |
| 987 | (response, content) = self.http.request(uri, "GET") |
| 988 | self.fail("Should never reach here") |
| 989 | except httplib2.FailedToDecompressContent: |
| 990 | pass |
| 991 | except Exception: |
| 992 | self.fail("Threw wrong kind of exception") |
| 993 | |
| 994 | # Re-run the test with out the exceptions |
| 995 | self.http.force_exception_to_status_code = True |
| 996 | |
| 997 | (response, content) = self.http.request(uri, "GET") |
| 998 | self.assertEqual(response.status, 500) |
| 999 | self.assertTrue(response.reason.startswith("Content purported")) |
| 1000 | |
    def testGetDuplicateHeaders(self):
        """Repeated response headers are joined with ',' into one value."""
        # Test that duplicate headers get concatenated via ','
        uri = urllib.parse.urljoin(base, "duplicate-headers/multilink.asis")
        (response, content) = self.http.request(uri, "GET")
        self.assertEqual(response.status, 200)
        self.assertEqual(content, b"This is content\n")
        self.assertEqual(
            response["link"].split(",")[0],
            '<http://bitworking.org>; rel="home"; title="BitWorking"',
        )
| 1011 | |
    def testGetCacheControlNoCache(self):
        """A request Cache-Control: no-cache bypasses a fresh cache entry."""
        # Test Cache-Control: no-cache on requests
        uri = urllib.parse.urljoin(base, "304/test_etag.txt")
        (response, content) = self.http.request(
            uri, "GET", headers={"accept-encoding": "identity"}
        )
        self.assertNotEqual(response["etag"], "")
        # Second plain request: served from cache.
        (response, content) = self.http.request(
            uri, "GET", headers={"accept-encoding": "identity"}
        )
        self.assertEqual(response.status, 200)
        self.assertEqual(response.fromcache, True)

        (response, content) = self.http.request(
            uri,
            "GET",
            headers={"accept-encoding": "identity", "Cache-Control": "no-cache"},
        )
        self.assertEqual(response.status, 200)
        self.assertEqual(response.fromcache, False)
| 1032 | |
    def testGetCacheControlPragmaNoCache(self):
        """The HTTP/1.0 'Pragma: no-cache' request header also bypasses the cache."""
        # Test Pragma: no-cache on requests
        uri = urllib.parse.urljoin(base, "304/test_etag.txt")
        (response, content) = self.http.request(
            uri, "GET", headers={"accept-encoding": "identity"}
        )
        self.assertNotEqual(response["etag"], "")
        # Second plain request: served from cache.
        (response, content) = self.http.request(
            uri, "GET", headers={"accept-encoding": "identity"}
        )
        self.assertEqual(response.status, 200)
        self.assertEqual(response.fromcache, True)

        (response, content) = self.http.request(
            uri, "GET", headers={"accept-encoding": "identity", "Pragma": "no-cache"}
        )
        self.assertEqual(response.status, 200)
        self.assertEqual(response.fromcache, False)
| 1051 | |
    def testGetCacheControlNoStoreRequest(self):
        """A request with no-store prevents the response from being cached."""
        # A no-store request means that the response should not be stored.
        uri = urllib.parse.urljoin(base, "304/test_etag.txt")

        (response, content) = self.http.request(
            uri, "GET", headers={"Cache-Control": "no-store"}
        )
        self.assertEqual(response.status, 200)
        self.assertEqual(response.fromcache, False)

        # Second identical request must still miss: nothing was stored.
        (response, content) = self.http.request(
            uri, "GET", headers={"Cache-Control": "no-store"}
        )
        self.assertEqual(response.status, 200)
        self.assertEqual(response.fromcache, False)
| 1067 | |
    def testGetCacheControlNoStoreResponse(self):
        """A response with no-store is never written to the cache."""
        # A no-store response means that the response should not be stored.
        uri = urllib.parse.urljoin(base, "no-store/no-store.asis")

        (response, content) = self.http.request(uri, "GET")
        self.assertEqual(response.status, 200)
        self.assertEqual(response.fromcache, False)

        # Second request must also miss: the first was never stored.
        (response, content) = self.http.request(uri, "GET")
        self.assertEqual(response.status, 200)
        self.assertEqual(response.fromcache, False)
| 1079 | |
    def testGetCacheControlNoCacheNoStoreRequest(self):
        """'no-store, no-cache' on a request evicts an existing cache entry."""
        # Test that a no-store, no-cache clears the entry from the cache
        # even if it was cached previously.
        uri = urllib.parse.urljoin(base, "304/test_etag.txt")

        # Prime the cache first so there is an entry to clear.
        (response, content) = self.http.request(uri, "GET")
        (response, content) = self.http.request(uri, "GET")
        self.assertEqual(response.fromcache, True)
        (response, content) = self.http.request(
            uri, "GET", headers={"Cache-Control": "no-store, no-cache"}
        )
        (response, content) = self.http.request(
            uri, "GET", headers={"Cache-Control": "no-store, no-cache"}
        )
        self.assertEqual(response.status, 200)
        self.assertEqual(response.fromcache, False)
| 1096 | |
    def testUpdateInvalidatesCache(self):
        """A DELETE on a cached URI invalidates its cache entry, even when
        the server rejects the method (405 here)."""
        # Test that calling PUT or DELETE on a
        # URI that is cache invalidates that cache.
        uri = urllib.parse.urljoin(base, "304/test_etag.txt")

        (response, content) = self.http.request(uri, "GET")
        (response, content) = self.http.request(uri, "GET")
        self.assertEqual(response.fromcache, True)
        (response, content) = self.http.request(uri, "DELETE")
        self.assertEqual(response.status, 405)

        (response, content) = self.http.request(uri, "GET")
        self.assertEqual(response.fromcache, False)
| 1110 | |
    def testUpdateUsesCachedETag(self):
        """PUT automatically sends the cached ETag as if-match; the second
        PUT fails with 412 because the first changed the entity."""
        # Test that we natively support http://www.w3.org/1999/04/Editing/
        uri = urllib.parse.urljoin(base, "conditional-updates/test.cgi")

        (response, content) = self.http.request(uri, "GET")
        self.assertEqual(response.status, 200)
        self.assertEqual(response.fromcache, False)
        (response, content) = self.http.request(uri, "GET")
        self.assertEqual(response.status, 200)
        self.assertEqual(response.fromcache, True)
        (response, content) = self.http.request(uri, "PUT", body="foo")
        self.assertEqual(response.status, 200)
        (response, content) = self.http.request(uri, "PUT", body="foo")
        self.assertEqual(response.status, 412)
| 1125 | |
    def testUpdatePatchUsesCachedETag(self):
        """PATCH also sends the cached ETag as if-match (see the PUT variant
        above for the 200-then-412 pattern)."""
        # Test that we natively support http://www.w3.org/1999/04/Editing/
        uri = urllib.parse.urljoin(base, "conditional-updates/test.cgi")

        (response, content) = self.http.request(uri, "GET")
        self.assertEqual(response.status, 200)
        self.assertEqual(response.fromcache, False)
        (response, content) = self.http.request(uri, "GET")
        self.assertEqual(response.status, 200)
        self.assertEqual(response.fromcache, True)
        (response, content) = self.http.request(uri, "PATCH", body="foo")
        self.assertEqual(response.status, 200)
        (response, content) = self.http.request(uri, "PATCH", body="foo")
        self.assertEqual(response.status, 412)
| 1140 | |
    def testUpdateUsesCachedETagAndOCMethod(self):
        """Adding DELETE to optimistic_concurrency_methods makes it send the
        cached ETag as if-match too."""
        # Test that we natively support http://www.w3.org/1999/04/Editing/
        uri = urllib.parse.urljoin(base, "conditional-updates/test.cgi")

        (response, content) = self.http.request(uri, "GET")
        self.assertEqual(response.status, 200)
        self.assertEqual(response.fromcache, False)
        (response, content) = self.http.request(uri, "GET")
        self.assertEqual(response.status, 200)
        self.assertEqual(response.fromcache, True)
        # NOTE(review): mutates self.http in place — assumes each test gets
        # a fresh Http instance from setUp; confirm against the fixture.
        self.http.optimistic_concurrency_methods.append("DELETE")
        (response, content) = self.http.request(uri, "DELETE")
        self.assertEqual(response.status, 200)
| 1154 | |
    def testUpdateUsesCachedETagOverridden(self):
        """An explicit if-match header overrides the cached ETag on PUT."""
        # Test that we natively support http://www.w3.org/1999/04/Editing/
        uri = urllib.parse.urljoin(base, "conditional-updates/test.cgi")

        (response, content) = self.http.request(uri, "GET")
        self.assertEqual(response.status, 200)
        self.assertEqual(response.fromcache, False)
        (response, content) = self.http.request(uri, "GET")
        self.assertEqual(response.status, 200)
        self.assertEqual(response.fromcache, True)
        # The bogus 'fred' ETag must be sent instead of the cached one,
        # so the server rejects the update with 412.
        (response, content) = self.http.request(
            uri, "PUT", body="foo", headers={"if-match": "fred"}
        )
        self.assertEqual(response.status, 412)
| 1169 | |
    def testBasicAuth(self):
        """Basic auth: 401 without credentials, 200 once they are added."""
        # Test Basic Authentication
        uri = urllib.parse.urljoin(base, "basic/file.txt")
        (response, content) = self.http.request(uri, "GET")
        self.assertEqual(response.status, 401)

        uri = urllib.parse.urljoin(base, "basic/")
        (response, content) = self.http.request(uri, "GET")
        self.assertEqual(response.status, 401)

        self.http.add_credentials("joe", "password")
        (response, content) = self.http.request(uri, "GET")
        self.assertEqual(response.status, 200)

        # Credentials learned on the directory also apply to files below it.
        uri = urllib.parse.urljoin(base, "basic/file.txt")
        (response, content) = self.http.request(uri, "GET")
        self.assertEqual(response.status, 200)
| 1187 | |
    def testBasicAuthWithDomain(self):
        """Domain-scoped credentials are only sent to the matching host."""
        # Test Basic Authentication
        uri = urllib.parse.urljoin(base, "basic/file.txt")
        (response, content) = self.http.request(uri, "GET")
        self.assertEqual(response.status, 401)

        uri = urllib.parse.urljoin(base, "basic/")
        (response, content) = self.http.request(uri, "GET")
        self.assertEqual(response.status, 401)

        # Credentials scoped to a DIFFERENT domain must not be used.
        self.http.add_credentials("joe", "password", "example.org")
        (response, content) = self.http.request(uri, "GET")
        self.assertEqual(response.status, 401)

        uri = urllib.parse.urljoin(base, "basic/file.txt")
        (response, content) = self.http.request(uri, "GET")
        self.assertEqual(response.status, 401)

        # Credentials scoped to the test server's own host succeed.
        domain = urllib.parse.urlparse(base)[1]
        self.http.add_credentials("joe", "password", domain)
        (response, content) = self.http.request(uri, "GET")
        self.assertEqual(response.status, 200)

        uri = urllib.parse.urljoin(base, "basic/file.txt")
        (response, content) = self.http.request(uri, "GET")
        self.assertEqual(response.status, 200)
| 1214 | |
    def testBasicAuthTwoDifferentCredentials(self):
        """A second credential set ('fred') works on the second realm."""
        # Test Basic Authentication with multiple sets of credentials
        uri = urllib.parse.urljoin(base, "basic2/file.txt")
        (response, content) = self.http.request(uri, "GET")
        self.assertEqual(response.status, 401)

        uri = urllib.parse.urljoin(base, "basic2/")
        (response, content) = self.http.request(uri, "GET")
        self.assertEqual(response.status, 401)

        self.http.add_credentials("fred", "barney")
        (response, content) = self.http.request(uri, "GET")
        self.assertEqual(response.status, 200)

        uri = urllib.parse.urljoin(base, "basic2/file.txt")
        (response, content) = self.http.request(uri, "GET")
        self.assertEqual(response.status, 200)
| 1232 | |
    def testBasicAuthNested(self):
        """Nested realms need their own credentials: 'joe' unlocks the outer
        directory only; 'fred' is needed for the subdir."""
        # Test Basic Authentication with resources
        # that are nested
        uri = urllib.parse.urljoin(base, "basic-nested/")
        (response, content) = self.http.request(uri, "GET")
        self.assertEqual(response.status, 401)

        uri = urllib.parse.urljoin(base, "basic-nested/subdir")
        (response, content) = self.http.request(uri, "GET")
        self.assertEqual(response.status, 401)

        # Now add in credentials one at a time and test.
        self.http.add_credentials("joe", "password")

        uri = urllib.parse.urljoin(base, "basic-nested/")
        (response, content) = self.http.request(uri, "GET")
        self.assertEqual(response.status, 200)

        uri = urllib.parse.urljoin(base, "basic-nested/subdir")
        (response, content) = self.http.request(uri, "GET")
        self.assertEqual(response.status, 401)

        self.http.add_credentials("fred", "barney")

        uri = urllib.parse.urljoin(base, "basic-nested/")
        (response, content) = self.http.request(uri, "GET")
        self.assertEqual(response.status, 200)

        uri = urllib.parse.urljoin(base, "basic-nested/subdir")
        (response, content) = self.http.request(uri, "GET")
        self.assertEqual(response.status, 200)
| 1264 | |
| 1265 | def testDigestAuth(self): |
| 1266 | # Test that we support Digest Authentication |
| 1267 | uri = urllib.parse.urljoin(base, "digest/") |
| 1268 | (response, content) = self.http.request(uri, "GET") |
| 1269 | self.assertEqual(response.status, 401) |
| 1270 | |
| 1271 | self.http.add_credentials("joe", "password") |
| 1272 | (response, content) = self.http.request(uri, "GET") |
| 1273 | self.assertEqual(response.status, 200) |
| 1274 | |
| 1275 | uri = urllib.parse.urljoin(base, "digest/file.txt") |
| 1276 | (response, content) = self.http.request(uri, "GET") |
| 1277 | |
    def testDigestAuthNextNonceAndNC(self):
        """If the server sends nextnonce, the nonce count restarts at 1."""
        # Test that if the server sets nextnonce that we reset
        # the nonce count back to 1
        uri = urllib.parse.urljoin(base, "digest/file.txt")
        self.http.add_credentials("joe", "password")
        (response, content) = self.http.request(
            uri, "GET", headers={"cache-control": "no-cache"}
        )
        info = httplib2._parse_www_authenticate(response, "authentication-info")
        self.assertEqual(response.status, 200)
        (response, content) = self.http.request(
            uri, "GET", headers={"cache-control": "no-cache"}
        )
        info2 = httplib2._parse_www_authenticate(response, "authentication-info")
        self.assertEqual(response.status, 200)

        # Only meaningful when the server actually issued a nextnonce.
        # NOTE(review): assumes the parsed "nc" value is an int, not the
        # raw hex string — confirm against _parse_www_authenticate.
        if "nextnonce" in info:
            self.assertEqual(info2["nc"], 1)
| 1296 | |
| 1297 | def testDigestAuthStale(self): |
| 1298 | # Test that we can handle a nonce becoming stale |
| 1299 | uri = urllib.parse.urljoin(base, "digest-expire/file.txt") |
| 1300 | self.http.add_credentials("joe", "password") |
| 1301 | (response, content) = self.http.request( |
| 1302 | uri, "GET", headers={"cache-control": "no-cache"} |
| 1303 | ) |
| 1304 | info = httplib2._parse_www_authenticate(response, "authentication-info") |
| 1305 | self.assertEqual(response.status, 200) |
| 1306 | |
| 1307 | time.sleep(3) |
| 1308 | # Sleep long enough that the nonce becomes stale |
| 1309 | |
| 1310 | (response, content) = self.http.request( |
| 1311 | uri, "GET", headers={"cache-control": "no-cache"} |
| 1312 | ) |
| 1313 | self.assertFalse(response.fromcache) |
| 1314 | self.assertTrue(response._stale_digest) |
| 1315 | info3 = httplib2._parse_www_authenticate(response, "authentication-info") |
| 1316 | self.assertEqual(response.status, 200) |
| 1317 | |
| 1318 | def reflector(self, content): |
| 1319 | return dict( |
| 1320 | [ |
| 1321 | tuple(x.split("=", 1)) |
| 1322 | for x in content.decode("utf-8").strip().split("\n") |
| 1323 | ] |
| 1324 | ) |
| 1325 | |
    def testReflector(self):
        """The reflector CGI echoes CGI environment vars we can parse."""
        uri = urllib.parse.urljoin(base, "reflector/reflector.cgi")
        (response, content) = self.http.request(uri, "GET")
        d = self.reflector(content)
        self.assertTrue("HTTP_USER_AGENT" in d)
| 1331 | |
| 1332 | def testConnectionClose(self): |
| 1333 | uri = "http://www.google.com/" |
| 1334 | (response, content) = self.http.request(uri, "GET") |
| 1335 | for c in self.http.connections.values(): |
| 1336 | self.assertNotEqual(None, c.sock) |
| 1337 | (response, content) = self.http.request( |
| 1338 | uri, "GET", headers={"connection": "close"} |
| 1339 | ) |
| 1340 | for c in self.http.connections.values(): |
| 1341 | self.assertEqual(None, c.sock) |
| 1342 | |
    def testPickleHttp(self):
        """An Http instance round-trips through pickle attribute-for-attribute."""
        pickled_http = pickle.dumps(self.http)
        new_http = pickle.loads(pickled_http)

        self.assertEqual(
            sorted(new_http.__dict__.keys()), sorted(self.http.__dict__.keys())
        )
        for key in new_http.__dict__:
            # Credential/certificate stores and the cache wrap their data,
            # so compare the wrapped payloads rather than the wrappers.
            if key in ("certificates", "credentials"):
                self.assertEqual(
                    new_http.__dict__[key].credentials,
                    self.http.__dict__[key].credentials,
                )
            elif key == "cache":
                self.assertEqual(
                    new_http.__dict__[key].cache, self.http.__dict__[key].cache
                )
            else:
                self.assertEqual(new_http.__dict__[key], self.http.__dict__[key])
| 1362 | |
    def testPickleHttpWithConnection(self):
        """Live connections are dropped (not pickled) when an Http is pickled."""
        # _MyHTTPConnection is a test double defined elsewhere in this file.
        self.http.request("http://bitworking.org", connection_type=_MyHTTPConnection)
        pickled_http = pickle.dumps(self.http)
        new_http = pickle.loads(pickled_http)

        self.assertEqual(list(self.http.connections.keys()), ["http:bitworking.org"])
        self.assertEqual(new_http.connections, {})
| 1370 | |
    def testPickleCustomRequestHttp(self):
        """A custom per-instance request method is excluded from the pickle."""
        def dummy_request(*args, **kwargs):
            # NOTE(review): new_request is late-bound and undefined here;
            # harmless only because dummy_request is never actually called.
            return new_request(*args, **kwargs)

        dummy_request.dummy_attr = "dummy_value"

        self.http.request = dummy_request
        pickled_http = pickle.dumps(self.http)
        # The pickle bytes must not contain the bound 'request' attribute.
        self.assertFalse(b"S'request'" in pickled_http)
| 1380 | |
| 1381 | |
try:
    import memcache

    class HttpTestMemCached(HttpTest):
        """Re-run the whole HttpTest suite against a memcached-backed cache."""

        def setUp(self):
            self.cache = memcache.Client(["127.0.0.1:11211"], debug=0)
            # self.cache = memcache.Client(['10.0.0.4:11211'], debug=1)
            self.http = httplib2.Http(self.cache)
            self.cache.flush_all()
            # Not exactly sure why the sleep is needed here, but
            # if not present then some unit tests that rely on caching
            # fail. Memcached seems to lose some sets immediately
            # after a flush_all if the set is to a value that
            # was previously cached. (Maybe the flush is handled async?)
            time.sleep(1)
            self.http.clear_credentials()


except ImportError:
    # memcache is optional: skip these tests when it isn't installed.
    # (Was a bare ``except:``, which also hid real errors raised while
    # defining the class, e.g. a typo'd name.)
    pass
| 1402 | |
| 1403 | # ------------------------------------------------------------------------ |
| 1404 | |
| 1405 | |
| 1406 | class HttpPrivateTest(unittest.TestCase): |
| 1407 | def testParseCacheControl(self): |
| 1408 | # Test that we can parse the Cache-Control header |
| 1409 | self.assertEqual({}, httplib2._parse_cache_control({})) |
| 1410 | self.assertEqual( |
| 1411 | {"no-cache": 1}, |
| 1412 | httplib2._parse_cache_control({"cache-control": " no-cache"}), |
| 1413 | ) |
| 1414 | cc = httplib2._parse_cache_control( |
| 1415 | {"cache-control": " no-cache, max-age = 7200"} |
| 1416 | ) |
| 1417 | self.assertEqual(cc["no-cache"], 1) |
| 1418 | self.assertEqual(cc["max-age"], "7200") |
| 1419 | cc = httplib2._parse_cache_control({"cache-control": " , "}) |
| 1420 | self.assertEqual(cc[""], 1) |
| 1421 | |
| 1422 | try: |
| 1423 | cc = httplib2._parse_cache_control( |
| 1424 | {"cache-control": "Max-age=3600;post-check=1800,pre-check=3600"} |
| 1425 | ) |
| 1426 | self.assertTrue("max-age" in cc) |
| 1427 | except: |
| 1428 | self.fail("Should not throw exception") |
| 1429 | |
| 1430 | def testNormalizeHeaders(self): |
| 1431 | # Test that we normalize headers to lowercase |
| 1432 | h = httplib2._normalize_headers({"Cache-Control": "no-cache", "Other": "Stuff"}) |
| 1433 | self.assertTrue("cache-control" in h) |
| 1434 | self.assertTrue("other" in h) |
| 1435 | self.assertEqual("Stuff", h["other"]) |
| 1436 | |
    def testConvertByteStr(self):
        """_convert_byte_str decodes bytes, passes str through, rejects others."""
        with self.assertRaises(TypeError):
            httplib2._convert_byte_str(4)
        self.assertEqual("Hello World", httplib2._convert_byte_str(b"Hello World"))
        self.assertEqual("Bye World", httplib2._convert_byte_str("Bye World"))
| 1442 | |
    def testExpirationModelTransparent(self):
        # Test that no-cache makes our request TRANSPARENT
        # (the fresh max-age=7200 response is bypassed entirely).
        response_headers = {"cache-control": "max-age=7200"}
        request_headers = {"cache-control": "no-cache"}
        self.assertEqual(
            "TRANSPARENT",
            httplib2._entry_disposition(response_headers, request_headers),
        )
| 1451 | |
    def testMaxAgeNonNumeric(self):
        # A non-numeric max-age cannot prove freshness, so the entry is STALE.
        # (The previous comment about no-cache/TRANSPARENT was copied from
        # the test above and did not describe this case.)
        response_headers = {"cache-control": "max-age=fred, min-fresh=barney"}
        request_headers = {}
        self.assertEqual(
            "STALE", httplib2._entry_disposition(response_headers, request_headers)
        )
| 1459 | |
    def testExpirationModelNoCacheResponse(self):
        # The date and expires point to an entry that should be
        # FRESH, but the no-cache over-rides that.
        now = time.time()
        response_headers = {
            "date": time.strftime("%a, %d %b %Y %H:%M:%S GMT", time.gmtime(now)),
            "expires": time.strftime("%a, %d %b %Y %H:%M:%S GMT", time.gmtime(now + 4)),
            "cache-control": "no-cache",
        }
        request_headers = {}
        self.assertEqual(
            "STALE", httplib2._entry_disposition(response_headers, request_headers)
        )
| 1473 | |
| 1474 | def testExpirationModelStaleRequestMustReval(self): |
| 1475 | # must-revalidate forces STALE |
| 1476 | self.assertEqual( |
| 1477 | "STALE", |
| 1478 | httplib2._entry_disposition({}, {"cache-control": "must-revalidate"}), |
| 1479 | ) |
| 1480 | |
| 1481 | def testExpirationModelStaleResponseMustReval(self): |
| 1482 | # must-revalidate forces STALE |
| 1483 | self.assertEqual( |
| 1484 | "STALE", |
| 1485 | httplib2._entry_disposition({"cache-control": "must-revalidate"}, {}), |
| 1486 | ) |
| 1487 | |
| 1488 | def testExpirationModelFresh(self): |
| 1489 | response_headers = { |
| 1490 | "date": time.strftime("%a, %d %b %Y %H:%M:%S GMT", time.gmtime()), |
| 1491 | "cache-control": "max-age=2", |
| 1492 | } |
| 1493 | request_headers = {} |
| 1494 | self.assertEqual( |
| 1495 | "FRESH", httplib2._entry_disposition(response_headers, request_headers) |
| 1496 | ) |
| 1497 | time.sleep(3) |
| 1498 | self.assertEqual( |
| 1499 | "STALE", httplib2._entry_disposition(response_headers, request_headers) |
| 1500 | ) |
| 1501 | |
| 1502 | def testExpirationMaxAge0(self): |
| 1503 | response_headers = { |
| 1504 | "date": time.strftime("%a, %d %b %Y %H:%M:%S GMT", time.gmtime()), |
| 1505 | "cache-control": "max-age=0", |
| 1506 | } |
| 1507 | request_headers = {} |
| 1508 | self.assertEqual( |
| 1509 | "STALE", httplib2._entry_disposition(response_headers, request_headers) |
| 1510 | ) |
| 1511 | |
| 1512 | def testExpirationModelDateAndExpires(self): |
| 1513 | now = time.time() |
| 1514 | response_headers = { |
| 1515 | "date": time.strftime("%a, %d %b %Y %H:%M:%S GMT", time.gmtime(now)), |
| 1516 | "expires": time.strftime("%a, %d %b %Y %H:%M:%S GMT", time.gmtime(now + 2)), |
| 1517 | } |
| 1518 | request_headers = {} |
| 1519 | self.assertEqual( |
| 1520 | "FRESH", httplib2._entry_disposition(response_headers, request_headers) |
| 1521 | ) |
| 1522 | time.sleep(3) |
| 1523 | self.assertEqual( |
| 1524 | "STALE", httplib2._entry_disposition(response_headers, request_headers) |
| 1525 | ) |
| 1526 | |
| 1527 | def testExpiresZero(self): |
| 1528 | now = time.time() |
| 1529 | response_headers = { |
| 1530 | "date": time.strftime("%a, %d %b %Y %H:%M:%S GMT", time.gmtime(now)), |
| 1531 | "expires": "0", |
| 1532 | } |
| 1533 | request_headers = {} |
| 1534 | self.assertEqual( |
| 1535 | "STALE", httplib2._entry_disposition(response_headers, request_headers) |
| 1536 | ) |
| 1537 | |
| 1538 | def testExpirationModelDateOnly(self): |
| 1539 | now = time.time() |
| 1540 | response_headers = { |
| 1541 | "date": time.strftime("%a, %d %b %Y %H:%M:%S GMT", time.gmtime(now + 3)) |
| 1542 | } |
| 1543 | request_headers = {} |
| 1544 | self.assertEqual( |
| 1545 | "STALE", httplib2._entry_disposition(response_headers, request_headers) |
| 1546 | ) |
| 1547 | |
| 1548 | def testExpirationModelOnlyIfCached(self): |
| 1549 | response_headers = {} |
| 1550 | request_headers = {"cache-control": "only-if-cached"} |
| 1551 | self.assertEqual( |
| 1552 | "FRESH", httplib2._entry_disposition(response_headers, request_headers) |
| 1553 | ) |
| 1554 | |
| 1555 | def testExpirationModelMaxAgeBoth(self): |
| 1556 | now = time.time() |
| 1557 | response_headers = { |
| 1558 | "date": time.strftime("%a, %d %b %Y %H:%M:%S GMT", time.gmtime(now)), |
| 1559 | "cache-control": "max-age=2", |
| 1560 | } |
| 1561 | request_headers = {"cache-control": "max-age=0"} |
| 1562 | self.assertEqual( |
| 1563 | "STALE", httplib2._entry_disposition(response_headers, request_headers) |
| 1564 | ) |
| 1565 | |
| 1566 | def testExpirationModelDateAndExpiresMinFresh1(self): |
| 1567 | now = time.time() |
| 1568 | response_headers = { |
| 1569 | "date": time.strftime("%a, %d %b %Y %H:%M:%S GMT", time.gmtime(now)), |
| 1570 | "expires": time.strftime("%a, %d %b %Y %H:%M:%S GMT", time.gmtime(now + 2)), |
| 1571 | } |
| 1572 | request_headers = {"cache-control": "min-fresh=2"} |
| 1573 | self.assertEqual( |
| 1574 | "STALE", httplib2._entry_disposition(response_headers, request_headers) |
| 1575 | ) |
| 1576 | |
| 1577 | def testExpirationModelDateAndExpiresMinFresh2(self): |
| 1578 | now = time.time() |
| 1579 | response_headers = { |
| 1580 | "date": time.strftime("%a, %d %b %Y %H:%M:%S GMT", time.gmtime(now)), |
| 1581 | "expires": time.strftime("%a, %d %b %Y %H:%M:%S GMT", time.gmtime(now + 4)), |
| 1582 | } |
| 1583 | request_headers = {"cache-control": "min-fresh=2"} |
| 1584 | self.assertEqual( |
| 1585 | "FRESH", httplib2._entry_disposition(response_headers, request_headers) |
| 1586 | ) |
| 1587 | |
| 1588 | def testParseWWWAuthenticateEmpty(self): |
| 1589 | res = httplib2._parse_www_authenticate({}) |
| 1590 | self.assertEqual(len(list(res.keys())), 0) |
| 1591 | |
| 1592 | def testParseWWWAuthenticate(self): |
| 1593 | # different uses of spaces around commas |
| 1594 | res = httplib2._parse_www_authenticate( |
| 1595 | { |
| 1596 | "www-authenticate": 'Test realm="test realm" , foo=foo ,bar="bar", baz=baz,qux=qux' |
| 1597 | } |
| 1598 | ) |
| 1599 | self.assertEqual(len(list(res.keys())), 1) |
| 1600 | self.assertEqual(len(list(res["test"].keys())), 5) |
| 1601 | |
| 1602 | # tokens with non-alphanum |
| 1603 | res = httplib2._parse_www_authenticate( |
| 1604 | {"www-authenticate": 'T*!%#st realm=to*!%#en, to*!%#en="quoted string"'} |
| 1605 | ) |
| 1606 | self.assertEqual(len(list(res.keys())), 1) |
| 1607 | self.assertEqual(len(list(res["t*!%#st"].keys())), 2) |
| 1608 | |
| 1609 | # quoted string with quoted pairs |
| 1610 | res = httplib2._parse_www_authenticate( |
| 1611 | {"www-authenticate": 'Test realm="a \\"test\\" realm"'} |
| 1612 | ) |
| 1613 | self.assertEqual(len(list(res.keys())), 1) |
| 1614 | self.assertEqual(res["test"]["realm"], 'a "test" realm') |
| 1615 | |
| 1616 | def testParseWWWAuthenticateStrict(self): |
| 1617 | httplib2.USE_WWW_AUTH_STRICT_PARSING = 1 |
| 1618 | self.testParseWWWAuthenticate() |
| 1619 | httplib2.USE_WWW_AUTH_STRICT_PARSING = 0 |
| 1620 | |
| 1621 | def testParseWWWAuthenticateBasic(self): |
| 1622 | res = httplib2._parse_www_authenticate({"www-authenticate": 'Basic realm="me"'}) |
| 1623 | basic = res["basic"] |
| 1624 | self.assertEqual("me", basic["realm"]) |
| 1625 | |
| 1626 | res = httplib2._parse_www_authenticate( |
| 1627 | {"www-authenticate": 'Basic realm="me", algorithm="MD5"'} |
| 1628 | ) |
| 1629 | basic = res["basic"] |
| 1630 | self.assertEqual("me", basic["realm"]) |
| 1631 | self.assertEqual("MD5", basic["algorithm"]) |
| 1632 | |
| 1633 | res = httplib2._parse_www_authenticate( |
| 1634 | {"www-authenticate": 'Basic realm="me", algorithm=MD5'} |
| 1635 | ) |
| 1636 | basic = res["basic"] |
| 1637 | self.assertEqual("me", basic["realm"]) |
| 1638 | self.assertEqual("MD5", basic["algorithm"]) |
| 1639 | |
| 1640 | def testParseWWWAuthenticateBasic2(self): |
| 1641 | res = httplib2._parse_www_authenticate( |
| 1642 | {"www-authenticate": 'Basic realm="me",other="fred" '} |
| 1643 | ) |
| 1644 | basic = res["basic"] |
| 1645 | self.assertEqual("me", basic["realm"]) |
| 1646 | self.assertEqual("fred", basic["other"]) |
| 1647 | |
| 1648 | def testParseWWWAuthenticateBasic3(self): |
| 1649 | res = httplib2._parse_www_authenticate( |
| 1650 | {"www-authenticate": 'Basic REAlm="me" '} |
| 1651 | ) |
| 1652 | basic = res["basic"] |
| 1653 | self.assertEqual("me", basic["realm"]) |
| 1654 | |
| 1655 | def testParseWWWAuthenticateDigest(self): |
| 1656 | res = httplib2._parse_www_authenticate( |
| 1657 | { |
| 1658 | "www-authenticate": 'Digest realm="testrealm@host.com", qop="auth,auth-int", nonce="dcd98b7102dd2f0e8b11d0f600bfb0c093", opaque="5ccc069c403ebaf9f0171e9517f40e41"' |
| 1659 | } |
| 1660 | ) |
| 1661 | digest = res["digest"] |
| 1662 | self.assertEqual("testrealm@host.com", digest["realm"]) |
| 1663 | self.assertEqual("auth,auth-int", digest["qop"]) |
| 1664 | |
| 1665 | def testParseWWWAuthenticateMultiple(self): |
| 1666 | res = httplib2._parse_www_authenticate( |
| 1667 | { |
| 1668 | "www-authenticate": 'Digest realm="testrealm@host.com", qop="auth,auth-int", nonce="dcd98b7102dd2f0e8b11d0f600bfb0c093", opaque="5ccc069c403ebaf9f0171e9517f40e41" Basic REAlm="me" ' |
| 1669 | } |
| 1670 | ) |
| 1671 | digest = res["digest"] |
| 1672 | self.assertEqual("testrealm@host.com", digest["realm"]) |
| 1673 | self.assertEqual("auth,auth-int", digest["qop"]) |
| 1674 | self.assertEqual("dcd98b7102dd2f0e8b11d0f600bfb0c093", digest["nonce"]) |
| 1675 | self.assertEqual("5ccc069c403ebaf9f0171e9517f40e41", digest["opaque"]) |
| 1676 | basic = res["basic"] |
| 1677 | self.assertEqual("me", basic["realm"]) |
| 1678 | |
| 1679 | def testParseWWWAuthenticateMultiple2(self): |
| 1680 | # Handle an added comma between challenges, which might get thrown in if the challenges were |
| 1681 | # originally sent in separate www-authenticate headers. |
| 1682 | res = httplib2._parse_www_authenticate( |
| 1683 | { |
| 1684 | "www-authenticate": 'Digest realm="testrealm@host.com", qop="auth,auth-int", nonce="dcd98b7102dd2f0e8b11d0f600bfb0c093", opaque="5ccc069c403ebaf9f0171e9517f40e41", Basic REAlm="me" ' |
| 1685 | } |
| 1686 | ) |
| 1687 | digest = res["digest"] |
| 1688 | self.assertEqual("testrealm@host.com", digest["realm"]) |
| 1689 | self.assertEqual("auth,auth-int", digest["qop"]) |
| 1690 | self.assertEqual("dcd98b7102dd2f0e8b11d0f600bfb0c093", digest["nonce"]) |
| 1691 | self.assertEqual("5ccc069c403ebaf9f0171e9517f40e41", digest["opaque"]) |
| 1692 | basic = res["basic"] |
| 1693 | self.assertEqual("me", basic["realm"]) |
| 1694 | |
| 1695 | def testParseWWWAuthenticateMultiple3(self): |
| 1696 | # Handle an added comma between challenges, which might get thrown in if the challenges were |
| 1697 | # originally sent in separate www-authenticate headers. |
| 1698 | res = httplib2._parse_www_authenticate( |
| 1699 | { |
| 1700 | "www-authenticate": 'Digest realm="testrealm@host.com", qop="auth,auth-int", nonce="dcd98b7102dd2f0e8b11d0f600bfb0c093", opaque="5ccc069c403ebaf9f0171e9517f40e41", Basic REAlm="me", WSSE realm="foo", profile="UsernameToken"' |
| 1701 | } |
| 1702 | ) |
| 1703 | digest = res["digest"] |
| 1704 | self.assertEqual("testrealm@host.com", digest["realm"]) |
| 1705 | self.assertEqual("auth,auth-int", digest["qop"]) |
| 1706 | self.assertEqual("dcd98b7102dd2f0e8b11d0f600bfb0c093", digest["nonce"]) |
| 1707 | self.assertEqual("5ccc069c403ebaf9f0171e9517f40e41", digest["opaque"]) |
| 1708 | basic = res["basic"] |
| 1709 | self.assertEqual("me", basic["realm"]) |
| 1710 | wsse = res["wsse"] |
| 1711 | self.assertEqual("foo", wsse["realm"]) |
| 1712 | self.assertEqual("UsernameToken", wsse["profile"]) |
| 1713 | |
| 1714 | def testParseWWWAuthenticateMultiple4(self): |
| 1715 | res = httplib2._parse_www_authenticate( |
| 1716 | { |
| 1717 | "www-authenticate": 'Digest realm="test-real.m@host.com", qop \t=\t"\tauth,auth-int", nonce="(*)&^&$%#",opaque="5ccc069c403ebaf9f0171e9517f40e41", Basic REAlm="me", WSSE realm="foo", profile="UsernameToken"' |
| 1718 | } |
| 1719 | ) |
| 1720 | digest = res["digest"] |
| 1721 | self.assertEqual("test-real.m@host.com", digest["realm"]) |
| 1722 | self.assertEqual("\tauth,auth-int", digest["qop"]) |
| 1723 | self.assertEqual("(*)&^&$%#", digest["nonce"]) |
| 1724 | |
| 1725 | def testParseWWWAuthenticateMoreQuoteCombos(self): |
| 1726 | res = httplib2._parse_www_authenticate( |
| 1727 | { |
| 1728 | "www-authenticate": 'Digest realm="myrealm", nonce="Ygk86AsKBAA=3516200d37f9a3230352fde99977bd6d472d4306", algorithm=MD5, qop="auth", stale=true' |
| 1729 | } |
| 1730 | ) |
| 1731 | digest = res["digest"] |
| 1732 | self.assertEqual("myrealm", digest["realm"]) |
| 1733 | |
| 1734 | def testParseWWWAuthenticateMalformed(self): |
| 1735 | try: |
| 1736 | res = httplib2._parse_www_authenticate( |
| 1737 | { |
| 1738 | "www-authenticate": 'OAuth "Facebook Platform" "invalid_token" "Invalid OAuth access token."' |
| 1739 | } |
| 1740 | ) |
| 1741 | self.fail("should raise an exception") |
| 1742 | except httplib2.MalformedHeader: |
| 1743 | pass |
| 1744 | |
| 1745 | def testDigestObject(self): |
| 1746 | credentials = ("joe", "password") |
| 1747 | host = None |
| 1748 | request_uri = "/projects/httplib2/test/digest/" |
| 1749 | headers = {} |
| 1750 | response = { |
| 1751 | "www-authenticate": 'Digest realm="myrealm", ' |
| 1752 | 'nonce="Ygk86AsKBAA=3516200d37f9a3230352fde99977bd6d472d4306", ' |
| 1753 | 'algorithm=MD5, qop="auth"' |
| 1754 | } |
| 1755 | content = b"" |
| 1756 | |
| 1757 | d = httplib2.DigestAuthentication( |
| 1758 | credentials, host, request_uri, headers, response, content, None |
| 1759 | ) |
| 1760 | d.request("GET", request_uri, headers, content, cnonce="33033375ec278a46") |
| 1761 | our_request = "authorization: %s" % headers["authorization"] |
| 1762 | working_request = ( |
| 1763 | 'authorization: Digest username="joe", realm="myrealm", ' |
| 1764 | 'nonce="Ygk86AsKBAA=3516200d37f9a3230352fde99977bd6d472d4306",' |
| 1765 | ' uri="/projects/httplib2/test/digest/", algorithm=MD5, ' |
| 1766 | 'response="97ed129401f7cdc60e5db58a80f3ea8b", qop=auth, ' |
| 1767 | 'nc=00000001, cnonce="33033375ec278a46"' |
| 1768 | ) |
| 1769 | self.assertEqual(our_request, working_request) |
| 1770 | |
| 1771 | def testDigestObjectWithOpaque(self): |
| 1772 | credentials = ("joe", "password") |
| 1773 | host = None |
| 1774 | request_uri = "/projects/httplib2/test/digest/" |
| 1775 | headers = {} |
| 1776 | response = { |
| 1777 | "www-authenticate": 'Digest realm="myrealm", ' |
| 1778 | 'nonce="Ygk86AsKBAA=3516200d37f9a3230352fde99977bd6d472d4306", ' |
| 1779 | 'algorithm=MD5, qop="auth", opaque="atestopaque"' |
| 1780 | } |
| 1781 | content = "" |
| 1782 | |
| 1783 | d = httplib2.DigestAuthentication( |
| 1784 | credentials, host, request_uri, headers, response, content, None |
| 1785 | ) |
| 1786 | d.request("GET", request_uri, headers, content, cnonce="33033375ec278a46") |
| 1787 | our_request = "authorization: %s" % headers["authorization"] |
| 1788 | working_request = ( |
| 1789 | 'authorization: Digest username="joe", realm="myrealm", ' |
| 1790 | 'nonce="Ygk86AsKBAA=3516200d37f9a3230352fde99977bd6d472d4306",' |
| 1791 | ' uri="/projects/httplib2/test/digest/", algorithm=MD5, ' |
| 1792 | 'response="97ed129401f7cdc60e5db58a80f3ea8b", qop=auth, ' |
| 1793 | 'nc=00000001, cnonce="33033375ec278a46", ' |
| 1794 | 'opaque="atestopaque"' |
| 1795 | ) |
| 1796 | self.assertEqual(our_request, working_request) |
| 1797 | |
| 1798 | def testDigestObjectStale(self): |
| 1799 | credentials = ("joe", "password") |
| 1800 | host = None |
| 1801 | request_uri = "/projects/httplib2/test/digest/" |
| 1802 | headers = {} |
| 1803 | response = httplib2.Response({}) |
| 1804 | response["www-authenticate"] = ( |
| 1805 | 'Digest realm="myrealm", ' |
| 1806 | 'nonce="Ygk86AsKBAA=3516200d37f9a3230352fde99977bd6d472d4306",' |
| 1807 | ' algorithm=MD5, qop="auth", stale=true' |
| 1808 | ) |
| 1809 | response.status = 401 |
| 1810 | content = b"" |
| 1811 | d = httplib2.DigestAuthentication( |
| 1812 | credentials, host, request_uri, headers, response, content, None |
| 1813 | ) |
| 1814 | # Returns true to force a retry |
| 1815 | self.assertTrue(d.response(response, content)) |
| 1816 | |
    def testDigestObjectAuthInfo(self):
        # The Authentication-Info header supplies nextnonce="fred"; response()
        # should absorb it into the stored challenge rather than force a retry.
        credentials = ("joe", "password")
        host = None
        request_uri = "/projects/httplib2/test/digest/"
        headers = {}
        response = httplib2.Response({})
        response["www-authenticate"] = (
            'Digest realm="myrealm", '
            'nonce="Ygk86AsKBAA=3516200d37f9a3230352fde99977bd6d472d4306",'
            ' algorithm=MD5, qop="auth", stale=true'
        )
        response["authentication-info"] = 'nextnonce="fred"'
        content = b""
        d = httplib2.DigestAuthentication(
            credentials, host, request_uri, headers, response, content, None
        )
        # Returns False: nextnonce was accepted, so no retry is needed; the
        # challenge's nonce is replaced and the nonce count resets to 1.
        self.assertFalse(d.response(response, content))
        self.assertEqual("fred", d.challenge["nonce"])
        self.assertEqual(1, d.challenge["nc"])
| 1837 | |
| 1838 | def testWsseAlgorithm(self): |
| 1839 | digest = httplib2._wsse_username_token( |
| 1840 | "d36e316282959a9ed4c89851497a717f", "2003-12-15T14:43:07Z", "taadtaadpstcsm" |
| 1841 | ) |
| 1842 | expected = b"quR/EWLAV4xLf9Zqyw4pDmfV9OY=" |
| 1843 | self.assertEqual(expected, digest) |
| 1844 | |
| 1845 | def testEnd2End(self): |
| 1846 | # one end to end header |
| 1847 | response = {"content-type": "application/atom+xml", "te": "deflate"} |
| 1848 | end2end = httplib2._get_end2end_headers(response) |
| 1849 | self.assertTrue("content-type" in end2end) |
| 1850 | self.assertTrue("te" not in end2end) |
| 1851 | self.assertTrue("connection" not in end2end) |
| 1852 | |
| 1853 | # one end to end header that gets eliminated |
| 1854 | response = { |
| 1855 | "connection": "content-type", |
| 1856 | "content-type": "application/atom+xml", |
| 1857 | "te": "deflate", |
| 1858 | } |
| 1859 | end2end = httplib2._get_end2end_headers(response) |
| 1860 | self.assertTrue("content-type" not in end2end) |
| 1861 | self.assertTrue("te" not in end2end) |
| 1862 | self.assertTrue("connection" not in end2end) |
| 1863 | |
| 1864 | # Degenerate case of no headers |
| 1865 | response = {} |
| 1866 | end2end = httplib2._get_end2end_headers(response) |
| 1867 | self.assertEqual(0, len(end2end)) |
| 1868 | |
| 1869 | # Degenerate case of connection referrring to a header not passed in |
| 1870 | response = {"connection": "content-type"} |
| 1871 | end2end = httplib2._get_end2end_headers(response) |
| 1872 | self.assertEqual(0, len(end2end)) |
| 1873 | |
| 1874 | |
class TestProxyInfo(unittest.TestCase):
    """Proxy configuration parsing from URLs and environment variables."""

    def setUp(self):
        # Snapshot the environment so each test may mutate it freely.
        self.orig_env = dict(os.environ)

    def tearDown(self):
        # Restore the environment exactly as it was before the test.
        os.environ.clear()
        os.environ.update(self.orig_env)

    def test_from_url(self):
        info = httplib2.proxy_info_from_url("http://myproxy.example.com")
        self.assertEqual("myproxy.example.com", info.proxy_host)
        self.assertEqual(80, info.proxy_port)  # scheme default
        self.assertIsNone(info.proxy_user)

    def test_from_url_ident(self):
        # URL with userinfo and explicit port.
        info = httplib2.proxy_info_from_url("http://zoidberg:fish@someproxy:99")
        self.assertEqual("someproxy", info.proxy_host)
        self.assertEqual(99, info.proxy_port)
        self.assertEqual("zoidberg", info.proxy_user)
        self.assertEqual("fish", info.proxy_pass)

    def test_from_env(self):
        os.environ["http_proxy"] = "http://myproxy.example.com:8080"
        info = httplib2.proxy_info_from_environment()
        self.assertEqual("myproxy.example.com", info.proxy_host)
        self.assertEqual(8080, info.proxy_port)

    def test_from_env_no_proxy(self):
        # The https scheme must pick https_proxy, not http_proxy.
        os.environ["http_proxy"] = "http://myproxy.example.com:80"
        os.environ["https_proxy"] = "http://myproxy.example.com:81"
        info = httplib2.proxy_info_from_environment("https")
        self.assertEqual("myproxy.example.com", info.proxy_host)
        self.assertEqual(81, info.proxy_port)

    def test_from_env_none(self):
        # With no proxy variables set, there is no proxy info at all.
        os.environ.clear()
        self.assertIsNone(httplib2.proxy_info_from_environment())

    def test_proxy_headers(self):
        custom_headers = {"key0": "val0", "key1": "val1"}
        info = httplib2.ProxyInfo(
            httplib2.socks.PROXY_TYPE_HTTP,
            "localhost",
            1234,
            proxy_headers=custom_headers,
        )
        self.assertEqual(custom_headers, info.proxy_headers)

    def test_proxy_init(self):
        # regression: HTTPConnectionWithTimeout must initialize when
        # proxy_info is not supplied. NOTE: performs a real network request.
        connection = httplib2.HTTPConnectionWithTimeout("www.google.com", 80)
        connection.request("GET", "/")
        connection.close()
| 1926 | |
| 1927 | |
if __name__ == "__main__":
    # Run the full test suite when executed directly as a script.
    unittest.main()