#!/usr/bin/env python3
"""A set of unit tests for httplib2.py."""
3
4__author__ = "Joe Gregorio (joe@bitworking.org)"
5__copyright__ = "Copyright 2006, Joe Gregorio"
6__contributors__ = ["Mark Pilgrim"]
7__license__ = "MIT"
8__version__ = "0.2 ($Rev: 118 $)"
9
10import base64
11import http.client
12import httplib2
13import io
14import os
15import pickle
16import socket
17import ssl
18import sys
19import time
20import unittest
21import urllib.parse
22
# Root URL that the tests join relative fixture paths onto before requesting them.
base = "http://bitworking.org/projects/httplib2/test/"
# Directory passed to httplib2.Http as its cache; HttpTest.setUp empties it.
cacheDirName = ".cache"
25
26
class CredentialsTest(unittest.TestCase):
    """Checks how httplib2.Credentials stores, scopes and clears logins."""

    def test(self):
        joe = ("joe", "password")
        fred = ("fred", "password2")
        creds = httplib2.Credentials()

        # A login added without a domain is returned for any domain queried.
        creds.add(*joe)
        self.assertEqual(joe, list(creds.iter("bitworking.org"))[0])
        self.assertEqual(joe, list(creds.iter(""))[0])

        # A login tied to a domain is only returned for that domain.
        creds.add(fred[0], fred[1], "wellformedweb.org")
        self.assertEqual(joe, list(creds.iter("bitworking.org"))[0])
        self.assertEqual(1, len(list(creds.iter("bitworking.org"))))
        self.assertEqual(2, len(list(creds.iter("wellformedweb.org"))))
        self.assertTrue(fred in list(creds.iter("wellformedweb.org")))

        # clear() drops everything, including the domain-less login.
        creds.clear()
        self.assertEqual(0, len(list(creds.iter("bitworking.org"))))
        creds.add(fred[0], fred[1], "wellformedweb.org")
        self.assertTrue(fred in list(creds.iter("wellformedweb.org")))
        self.assertEqual(0, len(list(creds.iter("bitworking.org"))))
        self.assertEqual(0, len(list(creds.iter(""))))
44
45
class ParserTest(unittest.TestCase):
    """Checks httplib2.parse_uri against a table of URIs and expected parts."""

    def testFromStd66(self):
        # Each row: (expected 5-tuple compared with parse_uri's result, URI).
        cases = [
            (("http", "example.com", "", None, None), "http://example.com"),
            (("https", "example.com", "", None, None), "https://example.com"),
            (
                ("https", "example.com:8080", "", None, None),
                "https://example.com:8080",
            ),
            (("http", "example.com", "/", None, None), "http://example.com/"),
            (
                ("http", "example.com", "/path", None, None),
                "http://example.com/path",
            ),
            (
                ("http", "example.com", "/path", "a=1&b=2", None),
                "http://example.com/path?a=1&b=2",
            ),
            (
                ("http", "example.com", "/path", "a=1&b=2", "fred"),
                "http://example.com/path?a=1&b=2#fred",
            ),
            # Same URI checked a second time.
            (
                ("http", "example.com", "/path", "a=1&b=2", "fred"),
                "http://example.com/path?a=1&b=2#fred",
            ),
        ]
        for expected, uri in cases:
            self.assertEqual(expected, httplib2.parse_uri(uri))
80
81
class UrlNormTest(unittest.TestCase):
    """Checks the normalised URI returned as the last element of httplib2.urlnorm."""

    def test(self):
        # A missing path becomes "/" and the host is lower-cased.
        self.assertEqual(
            "http://example.org/", httplib2.urlnorm("http://example.org")[-1]
        )
        self.assertEqual(
            "http://example.org/", httplib2.urlnorm("http://EXAMple.org")[-1]
        )
        self.assertEqual(
            "http://example.org/?=b", httplib2.urlnorm("http://EXAMple.org?=b")[-1]
        )
        self.assertEqual(
            "http://example.org/mypath?a=b",
            httplib2.urlnorm("http://EXAMple.org/mypath?a=b")[-1],
        )
        # An explicit port is kept as written.
        self.assertEqual(
            "http://localhost:80/", httplib2.urlnorm("http://localhost:80")[-1]
        )
        # Scheme and host case must not affect the result.
        self.assertEqual(
            httplib2.urlnorm("http://localhost:80/"),
            httplib2.urlnorm("HTTP://LOCALHOST:80"),
        )
        # Relative references are rejected.
        with self.assertRaises(
            httplib2.RelativeURIError,
            msg="Non-absolute URIs should raise an exception",
        ):
            httplib2.urlnorm("/")
109
110
class UrlSafenameTest(unittest.TestCase):
    """Checks the cache file names produced by httplib2.safename."""

    def test(self):
        # Test that different URIs end up generating different safe names
        self.assertEqual(
            "example.org,fred,a=b,58489f63a7a83c3b7794a6a398ee8b1f",
            httplib2.safename("http://example.org/fred/?a=b"),
        )
        self.assertEqual(
            "example.org,fred,a=b,8c5946d56fec453071f43329ff0be46b",
            httplib2.safename("http://example.org/fred?/a=b"),
        )
        self.assertEqual(
            "www.example.org,fred,a=b,499c44b8d844a011b67ea2c015116968",
            httplib2.safename("http://www.example.org/fred?/a=b"),
        )
        self.assertEqual(
            httplib2.safename(httplib2.urlnorm("http://www")[-1]),
            httplib2.safename(httplib2.urlnorm("http://WWW")[-1]),
        )
        self.assertEqual(
            "www.example.org,fred,a=b,692e843a333484ce0095b070497ab45d",
            httplib2.safename("https://www.example.org/fred?/a=b"),
        )
        self.assertNotEqual(
            httplib2.safename("http://www"), httplib2.safename("https://www")
        )

        # Test the max length limits
        uri = "http://" + ("w" * 200) + ".org"
        uri2 = "http://" + ("w" * 201) + ".org"
        self.assertNotEqual(httplib2.safename(uri2), httplib2.safename(uri))
        # Max length should be 200 + 1 (",") + 32
        self.assertEqual(233, len(httplib2.safename(uri2)))
        self.assertEqual(233, len(httplib2.safename(uri)))

        # Unicode. The former `sys.version_info >= (2, 3)` guard was always
        # true on Python 3, which this module requires (http.client,
        # urllib.parse), so the check now runs unconditionally.
        self.assertEqual(
            "xn--http,-4y1d.org,fred,a=b,579924c35db315e5a32e3d9963388193",
            httplib2.safename("http://\u2304.org/fred/?a=b"),
        )
150
151
class _MyResponse(io.BytesIO):
    """In-memory response: a bytes stream that also carries headers.

    The keyword arguments given to the constructor are stored as the
    ``headers`` dict and exposed through ``items()`` / ``iteritems()``.
    """

    def __init__(self, body, **kwargs):
        super().__init__(body)
        self.headers = kwargs

    def items(self):
        """Return the (name, value) view of the stored headers."""
        header_map = self.headers
        return header_map.items()

    def iteritems(self):
        """Return an iterator over the stored (name, value) header pairs."""
        pairs = self.headers.items()
        return iter(pairs)
162
163
class _MyHTTPConnection(object):
    """Stand-in connection class for tests that never touches the network.

    Every operation is a no-op except ``getresponse``, which always hands
    back a canned ``_MyResponse`` with body ``b"the body"`` and status 200.
    """

    def __init__(
        self,
        host,
        port=None,
        key_file=None,
        cert_file=None,
        strict=None,
        timeout=None,
        proxy_info=None,
    ):
        # key_file, cert_file, strict and proxy_info are accepted but not stored.
        self.host, self.port, self.timeout = host, port, timeout
        self.sock = None
        self.log = ""

    def set_debuglevel(self, level):
        """Accept and ignore the debug level."""

    def connect(self):
        """Do nothing; no socket is opened."""

    def close(self):
        """Do nothing; there is nothing to close."""

    def request(self, method, request_uri, body, headers):
        """Accept and discard the request."""

    def getresponse(self):
        """Return the canned successful response."""
        canned = _MyResponse(b"the body", status="200")
        return canned
198
199
class _MyHTTPBadStatusConnection(object):
    """Stand-in connection whose ``getresponse`` always raises BadStatusLine.

    The class-level ``num_calls`` counts ``getresponse`` invocations; it is
    reset to zero each time a new instance is constructed.
    """

    num_calls = 0

    def __init__(
        self,
        host,
        port=None,
        key_file=None,
        cert_file=None,
        strict=None,
        timeout=None,
        proxy_info=None,
    ):
        # key_file, cert_file, strict and proxy_info are accepted but not stored.
        self.host, self.port, self.timeout = host, port, timeout
        self.sock = None
        self.log = ""
        _MyHTTPBadStatusConnection.num_calls = 0

    def set_debuglevel(self, level):
        """Accept and ignore the debug level."""

    def connect(self):
        """Do nothing; no socket is opened."""

    def close(self):
        """Do nothing; there is nothing to close."""

    def request(self, method, request_uri, body, headers):
        """Accept and discard the request."""

    def getresponse(self):
        """Record the call on the class counter, then raise BadStatusLine."""
        counter_owner = _MyHTTPBadStatusConnection
        counter_owner.num_calls = counter_owner.num_calls + 1
        raise http.client.BadStatusLine("")
237
238
239class HttpTest(unittest.TestCase):
240 def setUp(self):
241 if os.path.exists(cacheDirName):
242 [
243 os.remove(os.path.join(cacheDirName, file))
244 for file in os.listdir(cacheDirName)
245 ]
246 self.http = httplib2.Http(cacheDirName)
247 self.http.clear_credentials()
248
249 def testIPv6NoSSL(self):
250 try:
251 self.http.request("http://[::1]/")
252 except socket.gaierror:
253 self.fail("should get the address family right for IPv6")
254 except socket.error:
255 # Even if IPv6 isn't installed on a machine it should just raise socket.error
256 pass
257
258 def testIPv6SSL(self):
259 try:
260 self.http.request("https://[::1]/")
261 except socket.gaierror:
262 self.fail("should get the address family right for IPv6")
263 except socket.error:
264 # Even if IPv6 isn't installed on a machine it should just raise socket.error
265 pass
266
267 def testConnectionType(self):
268 self.http.force_exception_to_status_code = False
269 response, content = self.http.request(
270 "http://bitworking.org", connection_type=_MyHTTPConnection
271 )
272 self.assertEqual(response["content-location"], "http://bitworking.org")
273 self.assertEqual(content, b"the body")
274
275 def testBadStatusLineRetry(self):
276 old_retries = httplib2.RETRIES
277 httplib2.RETRIES = 1
278 self.http.force_exception_to_status_code = False
279 try:
280 response, content = self.http.request(
281 "http://bitworking.org", connection_type=_MyHTTPBadStatusConnection
282 )
283 except http.client.BadStatusLine:
284 self.assertEqual(2, _MyHTTPBadStatusConnection.num_calls)
285 httplib2.RETRIES = old_retries
286
287 def testGetUnknownServer(self):
288 self.http.force_exception_to_status_code = False
289 try:
290 self.http.request("http://fred.bitworking.org/")
291 self.fail(
292 "An httplib2.ServerNotFoundError Exception must be thrown on an unresolvable server."
293 )
294 except httplib2.ServerNotFoundError:
295 pass
296
297 # Now test with exceptions turned off
298 self.http.force_exception_to_status_code = True
299
300 (response, content) = self.http.request("http://fred.bitworking.org/")
301 self.assertEqual(response["content-type"], "text/plain")
302 self.assertTrue(content.startswith(b"Unable to find"))
303 self.assertEqual(response.status, 400)
304
305 def testGetConnectionRefused(self):
306 self.http.force_exception_to_status_code = False
307 try:
308 self.http.request("http://localhost:7777/")
309 self.fail("An socket.error exception must be thrown on Connection Refused.")
310 except socket.error:
311 pass
312
313 # Now test with exceptions turned off
314 self.http.force_exception_to_status_code = True
315
316 (response, content) = self.http.request("http://localhost:7777/")
317 self.assertEqual(response["content-type"], "text/plain")
318 self.assertTrue(b"Connection refused" in content)
319 self.assertEqual(response.status, 400)
320
321 def testGetIRI(self):
322 if sys.version_info >= (2, 3):
323 uri = urllib.parse.urljoin(
324 base, "reflector/reflector.cgi?d=\N{CYRILLIC CAPITAL LETTER DJE}"
325 )
326 (response, content) = self.http.request(uri, "GET")
327 d = self.reflector(content)
328 self.assertTrue("QUERY_STRING" in d)
329 self.assertTrue(d["QUERY_STRING"].find("%D0%82") > 0)
330
331 def testGetIsDefaultMethod(self):
332 # Test that GET is the default method
333 uri = urllib.parse.urljoin(base, "methods/method_reflector.cgi")
334 (response, content) = self.http.request(uri)
335 self.assertEqual(response["x-method"], "GET")
336
337 def testDifferentMethods(self):
338 # Test that all methods can be used
339 uri = urllib.parse.urljoin(base, "methods/method_reflector.cgi")
340 for method in ["GET", "PUT", "DELETE", "POST"]:
341 (response, content) = self.http.request(uri, method, body=b" ")
342 self.assertEqual(response["x-method"], method)
343
344 def testHeadRead(self):
345 # Test that we don't try to read the response of a HEAD request
346 # since httplib blocks response.read() for HEAD requests.
347 # Oddly enough this doesn't appear as a problem when doing HEAD requests
348 # against Apache servers.
349 uri = "http://www.google.com/"
350 (response, content) = self.http.request(uri, "HEAD")
351 self.assertEqual(response.status, 200)
352 self.assertEqual(content, b"")
353
354 def testGetNoCache(self):
355 # Test that can do a GET w/o the cache turned on.
356 http = httplib2.Http()
357 uri = urllib.parse.urljoin(base, "304/test_etag.txt")
358 (response, content) = http.request(uri, "GET")
359 self.assertEqual(response.status, 200)
360 self.assertEqual(response.previous, None)
361
362 def testGetOnlyIfCachedCacheHit(self):
363 # Test that can do a GET with cache and 'only-if-cached'
364 uri = urllib.parse.urljoin(base, "304/test_etag.txt")
365 (response, content) = self.http.request(uri, "GET")
366 (response, content) = self.http.request(
367 uri, "GET", headers={"cache-control": "only-if-cached"}
368 )
369 self.assertEqual(response.fromcache, True)
370 self.assertEqual(response.status, 200)
371
372 def testGetOnlyIfCachedCacheMiss(self):
373 # Test that can do a GET with no cache with 'only-if-cached'
374 uri = urllib.parse.urljoin(base, "304/test_etag.txt")
375 (response, content) = self.http.request(
376 uri, "GET", headers={"cache-control": "only-if-cached"}
377 )
378 self.assertEqual(response.fromcache, False)
379 self.assertEqual(response.status, 504)
380
381 def testGetOnlyIfCachedNoCacheAtAll(self):
382 # Test that can do a GET with no cache with 'only-if-cached'
383 # Of course, there might be an intermediary beyond us
384 # that responds to the 'only-if-cached', so this
385 # test can't really be guaranteed to pass.
386 http = httplib2.Http()
387 uri = urllib.parse.urljoin(base, "304/test_etag.txt")
388 (response, content) = http.request(
389 uri, "GET", headers={"cache-control": "only-if-cached"}
390 )
391 self.assertEqual(response.fromcache, False)
392 self.assertEqual(response.status, 504)
393
394 def testUserAgent(self):
395 # Test that we provide a default user-agent
396 uri = urllib.parse.urljoin(base, "user-agent/test.cgi")
397 (response, content) = self.http.request(uri, "GET")
398 self.assertEqual(response.status, 200)
399 self.assertTrue(content.startswith(b"Python-httplib2/"))
400
401 def testUserAgentNonDefault(self):
402 # Test that the default user-agent can be over-ridden
403
404 uri = urllib.parse.urljoin(base, "user-agent/test.cgi")
405 (response, content) = self.http.request(
406 uri, "GET", headers={"User-Agent": "fred/1.0"}
407 )
408 self.assertEqual(response.status, 200)
409 self.assertTrue(content.startswith(b"fred/1.0"))
410
411 def testGet300WithLocation(self):
412 # Test the we automatically follow 300 redirects if a Location: header is provided
413 uri = urllib.parse.urljoin(base, "300/with-location-header.asis")
414 (response, content) = self.http.request(uri, "GET")
415 self.assertEqual(response.status, 200)
416 self.assertEqual(content, b"This is the final destination.\n")
417 self.assertEqual(response.previous.status, 300)
418 self.assertEqual(response.previous.fromcache, False)
419
420 # Confirm that the intermediate 300 is not cached
421 (response, content) = self.http.request(uri, "GET")
422 self.assertEqual(response.status, 200)
423 self.assertEqual(content, b"This is the final destination.\n")
424 self.assertEqual(response.previous.status, 300)
425 self.assertEqual(response.previous.fromcache, False)
426
427 def testGet300WithLocationNoRedirect(self):
428 # Test the we automatically follow 300 redirects if a Location: header is provided
429 self.http.follow_redirects = False
430 uri = urllib.parse.urljoin(base, "300/with-location-header.asis")
431 (response, content) = self.http.request(uri, "GET")
432 self.assertEqual(response.status, 300)
433
434 def testGet300WithoutLocation(self):
435 # Not giving a Location: header in a 300 response is acceptable
436 # In which case we just return the 300 response
437 uri = urllib.parse.urljoin(base, "300/without-location-header.asis")
438 (response, content) = self.http.request(uri, "GET")
439 self.assertEqual(response.status, 300)
440 self.assertTrue(response["content-type"].startswith("text/html"))
441 self.assertEqual(response.previous, None)
442
443 def testGet301(self):
444 # Test that we automatically follow 301 redirects
445 # and that we cache the 301 response
446 uri = urllib.parse.urljoin(base, "301/onestep.asis")
447 destination = urllib.parse.urljoin(base, "302/final-destination.txt")
448 (response, content) = self.http.request(uri, "GET")
449 self.assertEqual(response.status, 200)
450 self.assertTrue("content-location" in response)
451 self.assertEqual(response["content-location"], destination)
452 self.assertEqual(content, b"This is the final destination.\n")
453 self.assertEqual(response.previous.status, 301)
454 self.assertEqual(response.previous.fromcache, False)
455
456 (response, content) = self.http.request(uri, "GET")
457 self.assertEqual(response.status, 200)
458 self.assertEqual(response["content-location"], destination)
459 self.assertEqual(content, b"This is the final destination.\n")
460 self.assertEqual(response.previous.status, 301)
461 self.assertEqual(response.previous.fromcache, True)
462
463 def testHead301(self):
464 # Test that we automatically follow 301 redirects
465 uri = urllib.parse.urljoin(base, "301/onestep.asis")
466 (response, content) = self.http.request(uri, "HEAD")
467 self.assertEqual(response.status, 200)
468 self.assertEqual(response.previous.status, 301)
469 self.assertEqual(response.previous.fromcache, False)
470
471 def testGet301NoRedirect(self):
472 # Test that we automatically follow 301 redirects
473 # and that we cache the 301 response
474 self.http.follow_redirects = False
475 uri = urllib.parse.urljoin(base, "301/onestep.asis")
476 destination = urllib.parse.urljoin(base, "302/final-destination.txt")
477 (response, content) = self.http.request(uri, "GET")
478 self.assertEqual(response.status, 301)
479
480 def testGet302(self):
481 # Test that we automatically follow 302 redirects
482 # and that we DO NOT cache the 302 response
483 uri = urllib.parse.urljoin(base, "302/onestep.asis")
484 destination = urllib.parse.urljoin(base, "302/final-destination.txt")
485 (response, content) = self.http.request(uri, "GET")
486 self.assertEqual(response.status, 200)
487 self.assertEqual(response["content-location"], destination)
488 self.assertEqual(content, b"This is the final destination.\n")
489 self.assertEqual(response.previous.status, 302)
490 self.assertEqual(response.previous.fromcache, False)
491
492 uri = urllib.parse.urljoin(base, "302/onestep.asis")
493 (response, content) = self.http.request(uri, "GET")
494 self.assertEqual(response.status, 200)
495 self.assertEqual(response.fromcache, True)
496 self.assertEqual(response["content-location"], destination)
497 self.assertEqual(content, b"This is the final destination.\n")
498 self.assertEqual(response.previous.status, 302)
499 self.assertEqual(response.previous.fromcache, False)
500 self.assertEqual(response.previous["content-location"], uri)
501
502 uri = urllib.parse.urljoin(base, "302/twostep.asis")
503
504 (response, content) = self.http.request(uri, "GET")
505 self.assertEqual(response.status, 200)
506 self.assertEqual(response.fromcache, True)
507 self.assertEqual(content, b"This is the final destination.\n")
508 self.assertEqual(response.previous.status, 302)
509 self.assertEqual(response.previous.fromcache, False)
510
511 def testGet302RedirectionLimit(self):
512 # Test that we can set a lower redirection limit
513 # and that we raise an exception when we exceed
514 # that limit.
515 self.http.force_exception_to_status_code = False
516
517 uri = urllib.parse.urljoin(base, "302/twostep.asis")
518 try:
519 (response, content) = self.http.request(uri, "GET", redirections=1)
520 self.fail("This should not happen")
521 except httplib2.RedirectLimit:
522 pass
523 except Exception as e:
524 self.fail("Threw wrong kind of exception ")
525
526 # Re-run the test with out the exceptions
527 self.http.force_exception_to_status_code = True
528
529 (response, content) = self.http.request(uri, "GET", redirections=1)
530 self.assertEqual(response.status, 500)
531 self.assertTrue(response.reason.startswith("Redirected more"))
532 self.assertEqual("302", response["status"])
533 self.assertTrue(content.startswith(b"<html>"))
534 self.assertTrue(response.previous != None)
535
536 def testGet302NoLocation(self):
537 # Test that we throw an exception when we get
538 # a 302 with no Location: header.
539 self.http.force_exception_to_status_code = False
540 uri = urllib.parse.urljoin(base, "302/no-location.asis")
541 try:
542 (response, content) = self.http.request(uri, "GET")
543 self.fail("Should never reach here")
544 except httplib2.RedirectMissingLocation:
545 pass
546 except Exception as e:
547 self.fail("Threw wrong kind of exception ")
548
549 # Re-run the test with out the exceptions
550 self.http.force_exception_to_status_code = True
551
552 (response, content) = self.http.request(uri, "GET")
553 self.assertEqual(response.status, 500)
554 self.assertTrue(response.reason.startswith("Redirected but"))
555 self.assertEqual("302", response["status"])
556 self.assertTrue(content.startswith(b"This is content"))
557
558 def testGet301ViaHttps(self):
559 # Google always redirects to http://google.com
560 (response, content) = self.http.request("https://code.google.com/apis/", "GET")
561 self.assertEqual(200, response.status)
562 self.assertEqual(301, response.previous.status)
563
564 def testGetViaHttps(self):
565 # Test that we can handle HTTPS
566 (response, content) = self.http.request("https://google.com/adsense/", "GET")
567 self.assertEqual(200, response.status)
568
569 def testGetViaHttpsSpecViolationOnLocation(self):
570 # Test that we follow redirects through HTTPS
571 # even if they violate the spec by including
572 # a relative Location: header instead of an
573 # absolute one.
574 (response, content) = self.http.request("https://google.com/adsense", "GET")
575 self.assertEqual(200, response.status)
576 self.assertNotEqual(None, response.previous)
577
578 def testGetViaHttpsKeyCert(self):
579 # At this point I can only test
580 # that the key and cert files are passed in
581 # correctly to httplib. It would be nice to have
582 # a real https endpoint to test against.
583 http = httplib2.Http(timeout=2)
584
585 http.add_certificate("akeyfile", "acertfile", "bitworking.org")
586 try:
587 (response, content) = http.request("https://bitworking.org", "GET")
588 except AttributeError:
589 self.assertEqual(
590 http.connections["https:bitworking.org"].key_file, "akeyfile"
591 )
592 self.assertEqual(
593 http.connections["https:bitworking.org"].cert_file, "acertfile"
594 )
595 except IOError:
596 # Skip on 3.2
597 pass
598
599 try:
600 (response, content) = http.request("https://notthere.bitworking.org", "GET")
601 except httplib2.ServerNotFoundError:
602 self.assertEqual(
603 http.connections["https:notthere.bitworking.org"].key_file, None
604 )
605 self.assertEqual(
606 http.connections["https:notthere.bitworking.org"].cert_file, None
607 )
608 except IOError:
609 # Skip on 3.2
610 pass
611
612 def testSslCertValidation(self):
613 # Test that we get an ssl.SSLError when specifying a non-existent CA
614 # certs file.
615 http = httplib2.Http(ca_certs="/nosuchfile")
616 self.assertRaises(IOError, http.request, "https://www.google.com/", "GET")
617
618 # Test that we get a SSLHandshakeError if we try to access
619 # https://www.google.com, using a CA cert file that doesn't contain
620 # the CA Google uses (i.e., simulating a cert that's not signed by a
621 # trusted CA).
622 other_ca_certs = os.path.join(
623 os.path.dirname(os.path.abspath(httplib2.__file__)),
624 "test",
625 "other_cacerts.txt",
626 )
627 http = httplib2.Http(ca_certs=other_ca_certs)
628 self.assertRaises(ssl.SSLError, http.request, "https://www.google.com/", "GET")
629
630 def testSniHostnameValidation(self):
631 self.http.request("https://google.com/", method="GET")
632
633 def testGet303(self):
634 # Do a follow-up GET on a Location: header
635 # returned from a POST that gave a 303.
636 uri = urllib.parse.urljoin(base, "303/303.cgi")
637 (response, content) = self.http.request(uri, "POST", " ")
638 self.assertEqual(response.status, 200)
639 self.assertEqual(content, b"This is the final destination.\n")
640 self.assertEqual(response.previous.status, 303)
641
642 def testGet303NoRedirect(self):
643 # Do a follow-up GET on a Location: header
644 # returned from a POST that gave a 303.
645 self.http.follow_redirects = False
646 uri = urllib.parse.urljoin(base, "303/303.cgi")
647 (response, content) = self.http.request(uri, "POST", " ")
648 self.assertEqual(response.status, 303)
649
650 def test303ForDifferentMethods(self):
651 # Test that all methods can be used
652 uri = urllib.parse.urljoin(base, "303/redirect-to-reflector.cgi")
653 for (method, method_on_303) in [
654 ("PUT", "GET"),
655 ("DELETE", "GET"),
656 ("POST", "GET"),
657 ("GET", "GET"),
658 ("HEAD", "GET"),
659 ]:
660 (response, content) = self.http.request(uri, method, body=b" ")
661 self.assertEqual(response["x-method"], method_on_303)
662
663 def testGet304(self):
664 # Test that we use ETags properly to validate our cache
665 uri = urllib.parse.urljoin(base, "304/test_etag.txt")
666 (response, content) = self.http.request(
667 uri, "GET", headers={"accept-encoding": "identity"}
668 )
669 self.assertNotEqual(response["etag"], "")
670
671 (response, content) = self.http.request(
672 uri, "GET", headers={"accept-encoding": "identity"}
673 )
674 (response, content) = self.http.request(
675 uri,
676 "GET",
677 headers={"accept-encoding": "identity", "cache-control": "must-revalidate"},
678 )
679 self.assertEqual(response.status, 200)
680 self.assertEqual(response.fromcache, True)
681
682 cache_file_name = os.path.join(
683 cacheDirName, httplib2.safename(httplib2.urlnorm(uri)[-1])
684 )
685 f = open(cache_file_name, "r")
686 status_line = f.readline()
687 f.close()
688
689 self.assertTrue(status_line.startswith("status:"))
690
691 (response, content) = self.http.request(
692 uri, "HEAD", headers={"accept-encoding": "identity"}
693 )
694 self.assertEqual(response.status, 200)
695 self.assertEqual(response.fromcache, True)
696
697 (response, content) = self.http.request(
698 uri, "GET", headers={"accept-encoding": "identity", "range": "bytes=0-0"}
699 )
700 self.assertEqual(response.status, 206)
701 self.assertEqual(response.fromcache, False)
702
703 def testGetIgnoreEtag(self):
704 # Test that we can forcibly ignore ETags
705 uri = urllib.parse.urljoin(base, "reflector/reflector.cgi")
706 (response, content) = self.http.request(
707 uri, "GET", headers={"accept-encoding": "identity"}
708 )
709 self.assertNotEqual(response["etag"], "")
710
711 (response, content) = self.http.request(
712 uri,
713 "GET",
714 headers={"accept-encoding": "identity", "cache-control": "max-age=0"},
715 )
716 d = self.reflector(content)
717 self.assertTrue("HTTP_IF_NONE_MATCH" in d)
718
719 self.http.ignore_etag = True
720 (response, content) = self.http.request(
721 uri,
722 "GET",
723 headers={"accept-encoding": "identity", "cache-control": "max-age=0"},
724 )
725 d = self.reflector(content)
726 self.assertEqual(response.fromcache, False)
727 self.assertFalse("HTTP_IF_NONE_MATCH" in d)
728
729 def testOverrideEtag(self):
730 # Test that we can forcibly ignore ETags
731 uri = urllib.parse.urljoin(base, "reflector/reflector.cgi")
732 (response, content) = self.http.request(
733 uri, "GET", headers={"accept-encoding": "identity"}
734 )
735 self.assertNotEqual(response["etag"], "")
736
737 (response, content) = self.http.request(
738 uri,
739 "GET",
740 headers={"accept-encoding": "identity", "cache-control": "max-age=0"},
741 )
742 d = self.reflector(content)
743 self.assertTrue("HTTP_IF_NONE_MATCH" in d)
744 self.assertNotEqual(d["HTTP_IF_NONE_MATCH"], "fred")
745
746 (response, content) = self.http.request(
747 uri,
748 "GET",
749 headers={
750 "accept-encoding": "identity",
751 "cache-control": "max-age=0",
752 "if-none-match": "fred",
753 },
754 )
755 d = self.reflector(content)
756 self.assertTrue("HTTP_IF_NONE_MATCH" in d)
757 self.assertEqual(d["HTTP_IF_NONE_MATCH"], "fred")
758
759 # MAP-commented this out because it consistently fails
760 # def testGet304EndToEnd(self):
761 # # Test that end to end headers get overwritten in the cache
762 # uri = urllib.parse.urljoin(base, "304/end2end.cgi")
763 # (response, content) = self.http.request(uri, "GET")
764 # self.assertNotEqual(response['etag'], "")
765 # old_date = response['date']
766 # time.sleep(2)
767 #
768 # (response, content) = self.http.request(uri, "GET", headers = {'Cache-Control': 'max-age=0'})
769 # # The response should be from the cache, but the Date: header should be updated.
770 # new_date = response['date']
771 # self.assertNotEqual(new_date, old_date)
772 # self.assertEqual(response.status, 200)
773 # self.assertEqual(response.fromcache, True)
774
775 def testGet304LastModified(self):
776 # Test that we can still handle a 304
777 # by only using the last-modified cache validator.
778 uri = urllib.parse.urljoin(
779 base, "304/last-modified-only/last-modified-only.txt"
780 )
781 (response, content) = self.http.request(uri, "GET")
782
783 self.assertNotEqual(response["last-modified"], "")
784 (response, content) = self.http.request(uri, "GET")
785 (response, content) = self.http.request(uri, "GET")
786 self.assertEqual(response.status, 200)
787 self.assertEqual(response.fromcache, True)
788
789 def testGet307(self):
790 # Test that we do follow 307 redirects but
791 # do not cache the 307
792 uri = urllib.parse.urljoin(base, "307/onestep.asis")
793 (response, content) = self.http.request(uri, "GET")
794 self.assertEqual(response.status, 200)
795 self.assertEqual(content, b"This is the final destination.\n")
796 self.assertEqual(response.previous.status, 307)
797 self.assertEqual(response.previous.fromcache, False)
798
799 (response, content) = self.http.request(uri, "GET")
800 self.assertEqual(response.status, 200)
801 self.assertEqual(response.fromcache, True)
802 self.assertEqual(content, b"This is the final destination.\n")
803 self.assertEqual(response.previous.status, 307)
804 self.assertEqual(response.previous.fromcache, False)
805
806 def testGet410(self):
807 # Test that we pass 410's through
808 uri = urllib.parse.urljoin(base, "410/410.asis")
809 (response, content) = self.http.request(uri, "GET")
810 self.assertEqual(response.status, 410)
811
812 def testVaryHeaderSimple(self):
813 """RFC 2616 13.6 When the cache receives a subsequent request whose Request-URI specifies one or more cache entries including a Vary header field, the cache MUST NOT use such a cache entry to construct a response to the new request unless all of the selecting request-headers present in the new request match the corresponding stored request-headers in the original request.
814
815 """
816 # test that the vary header is sent
817 uri = urllib.parse.urljoin(base, "vary/accept.asis")
818 (response, content) = self.http.request(
819 uri, "GET", headers={"Accept": "text/plain"}
820 )
821 self.assertEqual(response.status, 200)
822 self.assertTrue("vary" in response)
823
824 # get the resource again, from the cache since accept header in this
825 # request is the same as the request
826 (response, content) = self.http.request(
827 uri, "GET", headers={"Accept": "text/plain"}
828 )
829 self.assertEqual(response.status, 200)
830 self.assertEqual(response.fromcache, True, msg="Should be from cache")
831
832 # get the resource again, not from cache since Accept headers does not match
833 (response, content) = self.http.request(
834 uri, "GET", headers={"Accept": "text/html"}
835 )
836 self.assertEqual(response.status, 200)
837 self.assertEqual(response.fromcache, False, msg="Should not be from cache")
838
839 # get the resource again, without any Accept header, so again no match
840 (response, content) = self.http.request(uri, "GET")
841 self.assertEqual(response.status, 200)
842 self.assertEqual(response.fromcache, False, msg="Should not be from cache")
843
844 def testNoVary(self):
845 pass
846 # when there is no vary, a different Accept header (e.g.) should not
847 # impact if the cache is used
848 # test that the vary header is not sent
849 # uri = urllib.parse.urljoin(base, "vary/no-vary.asis")
850 # (response, content) = self.http.request(uri, "GET", headers={'Accept': 'text/plain'})
851 # self.assertEqual(response.status, 200)
852 # self.assertFalse('vary' in response)
853 #
854 # (response, content) = self.http.request(uri, "GET", headers={'Accept': 'text/plain'})
855 # self.assertEqual(response.status, 200)
856 # self.assertEqual(response.fromcache, True, msg="Should be from cache")
857 #
858 # (response, content) = self.http.request(uri, "GET", headers={'Accept': 'text/html'})
859 # self.assertEqual(response.status, 200)
860 # self.assertEqual(response.fromcache, True, msg="Should be from cache")
861
def testVaryHeaderDouble(self):
    # Exercises a resource whose cached entry is only reused when both the
    # Accept and Accept-Language request headers repeat the original values.
    uri = urllib.parse.urljoin(base, "vary/accept-double.asis")
    full_headers = {
        "Accept": "text/plain",
        "Accept-Language": "da, en-gb;q=0.8, en;q=0.7",
    }

    # Initial fetch; a fresh dict is passed on every call so each request
    # receives its own headers object, as in separate literal dicts.
    resp, _ = self.http.request(uri, "GET", headers=dict(full_headers))
    self.assertEqual(resp.status, 200)
    self.assertTrue("vary" in resp)

    # Same two headers again: answered from the cache.
    resp, _ = self.http.request(uri, "GET", headers=dict(full_headers))
    self.assertEqual(resp.fromcache, True, msg="Should be from cache")

    # Accept-Language missing: must not be answered from the cache.
    resp, _ = self.http.request(uri, "GET", headers={"Accept": "text/plain"})
    self.assertEqual(resp.status, 200)
    self.assertEqual(resp.fromcache, False)

    # Accept missing and Accept-Language different: the varied headers do
    # not match exactly, so again not from the cache.
    resp, _ = self.http.request(uri, "GET", headers={"Accept-Language": "da"})
    self.assertEqual(resp.status, 200)
    self.assertEqual(resp.fromcache, False, msg="Should not be from cache")
898
def testVaryUnusedHeader(self):
    # A header that is never used in the request is not considered to vary.
    uri = urllib.parse.urljoin(base, "vary/unused-header.asis")
    accept_plain = {"Accept": "text/plain"}

    first, _ = self.http.request(uri, "GET", headers=dict(accept_plain))
    self.assertEqual(first.status, 200)
    self.assertTrue("vary" in first)

    # The identical request repeated must be served from the cache.
    second, _ = self.http.request(uri, "GET", headers=dict(accept_plain))
    self.assertEqual(second.fromcache, True, msg="Should be from cache")
913
def testHeadGZip(self):
    # HEAD on a gzip-served resource: no attempt should be made to
    # decompress, the body stays empty while content-length is non-zero.
    target = urllib.parse.urljoin(base, "gzip/final-destination.txt")
    resp, body = self.http.request(target, "HEAD")
    self.assertEqual(resp.status, 200)
    declared_length = int(resp["content-length"])
    self.assertNotEqual(declared_length, 0)
    self.assertEqual(body, b"")
921
def testGetGZip(self):
    # gzip support: the body arrives decoded, the original header is
    # exposed under "-content-encoding", and content-length matches the
    # decoded payload.
    target = urllib.parse.urljoin(base, "gzip/final-destination.txt")
    resp, body = self.http.request(target, "GET")
    self.assertEqual(resp.status, 200)
    self.assertFalse("content-encoding" in resp)
    self.assertTrue("-content-encoding" in resp)
    decoded_length = len(b"This is the final destination.\n")
    self.assertEqual(int(resp["content-length"]), decoded_length)
    self.assertEqual(body, b"This is the final destination.\n")
933
def testPostAndGZipResponse(self):
    # A POST answered with a gzip-encoded body gets the same header
    # treatment as a GET: "content-encoding" is replaced by
    # "-content-encoding" in the response.
    endpoint = urllib.parse.urljoin(base, "gzip/post.cgi")
    resp, _ = self.http.request(endpoint, "POST", body=" ")
    self.assertEqual(resp.status, 200)
    self.assertFalse("content-encoding" in resp)
    self.assertTrue("-content-encoding" in resp)
940
def testGetGZipFailure(self):
    # A response that claims gzip encoding but cannot be decoded must
    # raise FailedToDecompressContent when exceptions are enabled, and be
    # turned into a 500 response when exceptions are mapped to statuses.
    uri = urllib.parse.urljoin(base, "gzip/failed-compression.asis")

    self.http.force_exception_to_status_code = False
    # assertRaises is used instead of try/except + self.fail():
    # self.fail() raises AssertionError, which a broad `except Exception`
    # arm would itself catch and misreport as "wrong kind of exception".
    with self.assertRaises(httplib2.FailedToDecompressContent):
        self.http.request(uri, "GET")

    # Re-run the request with exceptions converted to status codes.
    self.http.force_exception_to_status_code = True

    (response, content) = self.http.request(uri, "GET")
    self.assertEqual(response.status, 500)
    self.assertTrue(response.reason.startswith("Content purported"))
959
def testIndividualTimeout(self):
    # Uses its own client with a 1 second timeout and exceptions mapped to
    # status codes; the request is expected to come back as a 408.
    client = httplib2.Http(timeout=1)
    client.force_exception_to_status_code = True
    slow_uri = urllib.parse.urljoin(base, "timeout/timeout.cgi")

    resp, body = client.request(slow_uri)
    self.assertEqual(resp.status, 408)
    self.assertTrue(resp.reason.startswith("Request Timeout"))
    self.assertTrue(body.startswith(b"Request Timeout"))
969
def testGetDeflate(self):
    # deflate support: the body arrives decoded, no content-encoding
    # header remains, and content-length reflects the decoded payload.
    target = urllib.parse.urljoin(base, "deflate/deflated.asis")
    resp, body = self.http.request(target, "GET")
    self.assertEqual(resp.status, 200)
    self.assertFalse("content-encoding" in resp)
    decoded_length = len("This is the final destination.")
    self.assertEqual(int(resp["content-length"]), decoded_length)
    self.assertEqual(body, b"This is the final destination.")
980
def testGetDeflateFailure(self):
    # A response that claims deflate encoding but cannot be decoded must
    # raise FailedToDecompressContent when exceptions are enabled, and be
    # turned into a 500 response when exceptions are mapped to statuses.
    uri = urllib.parse.urljoin(base, "deflate/failed-compression.asis")

    self.http.force_exception_to_status_code = False
    # assertRaises is used instead of try/except + self.fail():
    # self.fail() raises AssertionError, which a broad `except Exception`
    # arm would itself catch and misreport as "wrong kind of exception".
    with self.assertRaises(httplib2.FailedToDecompressContent):
        self.http.request(uri, "GET")

    # Re-run the request with exceptions converted to status codes.
    self.http.force_exception_to_status_code = True

    (response, content) = self.http.request(uri, "GET")
    self.assertEqual(response.status, 500)
    self.assertTrue(response.reason.startswith("Content purported"))
1000
def testGetDuplicateHeaders(self):
    # Duplicate response headers are expected to be joined with ','; the
    # first element of the joined "link" value is checked.
    target = urllib.parse.urljoin(base, "duplicate-headers/multilink.asis")
    resp, body = self.http.request(target, "GET")
    self.assertEqual(resp.status, 200)
    self.assertEqual(body, b"This is content\n")
    first_link = resp["link"].split(",")[0]
    self.assertEqual(
        first_link, '<http://bitworking.org>; rel="home"; title="BitWorking"'
    )
1011
def testGetCacheControlNoCache(self):
    # Cache-Control: no-cache on a request must bypass an existing cache
    # entry for that URI.
    uri = urllib.parse.urljoin(base, "304/test_etag.txt")
    identity = {"accept-encoding": "identity"}

    # Prime the cache and confirm the resource carries an ETag.
    resp, _ = self.http.request(uri, "GET", headers=dict(identity))
    self.assertNotEqual(resp["etag"], "")

    # A plain repeat is served from the cache.
    resp, _ = self.http.request(uri, "GET", headers=dict(identity))
    self.assertEqual(resp.status, 200)
    self.assertEqual(resp.fromcache, True)

    # The same request with no-cache is not.
    no_cache = {"accept-encoding": "identity", "Cache-Control": "no-cache"}
    resp, _ = self.http.request(uri, "GET", headers=no_cache)
    self.assertEqual(resp.status, 200)
    self.assertEqual(resp.fromcache, False)
1032
def testGetCacheControlPragmaNoCache(self):
    # Pragma: no-cache on a request must bypass an existing cache entry
    # for that URI.
    uri = urllib.parse.urljoin(base, "304/test_etag.txt")
    identity = {"accept-encoding": "identity"}

    # Prime the cache and confirm the resource carries an ETag.
    resp, _ = self.http.request(uri, "GET", headers=dict(identity))
    self.assertNotEqual(resp["etag"], "")

    # A plain repeat is served from the cache.
    resp, _ = self.http.request(uri, "GET", headers=dict(identity))
    self.assertEqual(resp.status, 200)
    self.assertEqual(resp.fromcache, True)

    # The same request with Pragma: no-cache is not.
    pragma = {"accept-encoding": "identity", "Pragma": "no-cache"}
    resp, _ = self.http.request(uri, "GET", headers=pragma)
    self.assertEqual(resp.status, 200)
    self.assertEqual(resp.fromcache, False)
1051
def testGetCacheControlNoStoreRequest(self):
    # A request sent with no-store must not leave a cache entry behind,
    # so two consecutive such requests both miss the cache.
    uri = urllib.parse.urljoin(base, "304/test_etag.txt")

    for _attempt in range(2):
        resp, _ = self.http.request(
            uri, "GET", headers={"Cache-Control": "no-store"}
        )
        self.assertEqual(resp.status, 200)
        self.assertEqual(resp.fromcache, False)
1067
def testGetCacheControlNoStoreResponse(self):
    # A response marked no-store must not be stored, so two consecutive
    # plain GETs both miss the cache.
    uri = urllib.parse.urljoin(base, "no-store/no-store.asis")

    for _attempt in range(2):
        resp, _ = self.http.request(uri, "GET")
        self.assertEqual(resp.status, 200)
        self.assertEqual(resp.fromcache, False)
1079
def testGetCacheControlNoCacheNoStoreRequest(self):
    # "no-store, no-cache" on a request must bypass the cache even when
    # the URI was cached by earlier requests.
    uri = urllib.parse.urljoin(base, "304/test_etag.txt")

    # Prime the cache, then confirm the repeat is a cache hit.
    self.http.request(uri, "GET")
    resp, _ = self.http.request(uri, "GET")
    self.assertEqual(resp.fromcache, True)

    # Two requests with the directives; only the second one is inspected.
    for _attempt in range(2):
        resp, _ = self.http.request(
            uri, "GET", headers={"Cache-Control": "no-store, no-cache"}
        )
    self.assertEqual(resp.status, 200)
    self.assertEqual(resp.fromcache, False)
1096
def testUpdateInvalidatesCache(self):
    # Issuing PUT or DELETE against a cached URI invalidates the cached
    # entry; here a DELETE that the server answers with 405 is used.
    uri = urllib.parse.urljoin(base, "304/test_etag.txt")

    self.http.request(uri, "GET")
    cached, _ = self.http.request(uri, "GET")
    self.assertEqual(cached.fromcache, True)

    rejected, _ = self.http.request(uri, "DELETE")
    self.assertEqual(rejected.status, 405)

    refetched, _ = self.http.request(uri, "GET")
    self.assertEqual(refetched.fromcache, False)
1110
def testUpdateUsesCachedETag(self):
    # Native support for http://www.w3.org/1999/04/Editing/ with PUT.
    uri = urllib.parse.urljoin(base, "conditional-updates/test.cgi")

    # First GET comes from the network, the second from the cache.
    for expect_cached in (False, True):
        resp, _ = self.http.request(uri, "GET")
        self.assertEqual(resp.status, 200)
        self.assertEqual(resp.fromcache, expect_cached)

    # First PUT is accepted; repeating it is rejected with 412.
    for expected_status in (200, 412):
        resp, _ = self.http.request(uri, "PUT", body="foo")
        self.assertEqual(resp.status, expected_status)
1125
def testUpdatePatchUsesCachedETag(self):
    # Native support for http://www.w3.org/1999/04/Editing/ with PATCH.
    uri = urllib.parse.urljoin(base, "conditional-updates/test.cgi")

    fetched, _ = self.http.request(uri, "GET")
    self.assertEqual(fetched.status, 200)
    self.assertEqual(fetched.fromcache, False)

    cached, _ = self.http.request(uri, "GET")
    self.assertEqual(cached.status, 200)
    self.assertEqual(cached.fromcache, True)

    # First PATCH is accepted; repeating it is rejected with 412.
    accepted, _ = self.http.request(uri, "PATCH", body="foo")
    self.assertEqual(accepted.status, 200)
    rejected, _ = self.http.request(uri, "PATCH", body="foo")
    self.assertEqual(rejected.status, 412)
1140
def testUpdateUsesCachedETagAndOCMethod(self):
    # Native support for http://www.w3.org/1999/04/Editing/, with DELETE
    # added to the client's optimistic-concurrency method list.
    uri = urllib.parse.urljoin(base, "conditional-updates/test.cgi")

    # First GET comes from the network, the second from the cache.
    for expect_cached in (False, True):
        resp, _ = self.http.request(uri, "GET")
        self.assertEqual(resp.status, 200)
        self.assertEqual(resp.fromcache, expect_cached)

    self.http.optimistic_concurrency_methods.append("DELETE")
    deleted, _ = self.http.request(uri, "DELETE")
    self.assertEqual(deleted.status, 200)
1154
def testUpdateUsesCachedETagOverridden(self):
    # Native support for http://www.w3.org/1999/04/Editing/, where the
    # caller supplies its own if-match header and the PUT is rejected.
    uri = urllib.parse.urljoin(base, "conditional-updates/test.cgi")

    # First GET comes from the network, the second from the cache.
    for expect_cached in (False, True):
        resp, _ = self.http.request(uri, "GET")
        self.assertEqual(resp.status, 200)
        self.assertEqual(resp.fromcache, expect_cached)

    explicit = {"if-match": "fred"}
    rejected, _ = self.http.request(uri, "PUT", body="foo", headers=explicit)
    self.assertEqual(rejected.status, 412)
1169
def testBasicAuth(self):
    # Basic Authentication: 401 without credentials, 200 once they have
    # been added to the client.
    file_uri = urllib.parse.urljoin(base, "basic/file.txt")
    dir_uri = urllib.parse.urljoin(base, "basic/")

    for protected in (file_uri, dir_uri):
        resp, _ = self.http.request(protected, "GET")
        self.assertEqual(resp.status, 401)

    self.http.add_credentials("joe", "password")

    for protected in (dir_uri, file_uri):
        resp, _ = self.http.request(protected, "GET")
        self.assertEqual(resp.status, 200)
1187
def testBasicAuthWithDomain(self):
    # Basic Authentication with credentials bound to a domain: they are
    # only applied when the bound domain matches the request's host.
    file_uri = urllib.parse.urljoin(base, "basic/file.txt")
    dir_uri = urllib.parse.urljoin(base, "basic/")

    # No credentials at all.
    for protected in (file_uri, dir_uri):
        resp, _ = self.http.request(protected, "GET")
        self.assertEqual(resp.status, 401)

    # Credentials bound to an unrelated domain are not used.
    self.http.add_credentials("joe", "password", "example.org")
    for protected in (dir_uri, file_uri):
        resp, _ = self.http.request(protected, "GET")
        self.assertEqual(resp.status, 401)

    # Credentials bound to the test server's own domain are used; the
    # file URI is requested twice.
    own_domain = urllib.parse.urlparse(base).netloc
    self.http.add_credentials("joe", "password", own_domain)
    for _attempt in range(2):
        resp, _ = self.http.request(file_uri, "GET")
        self.assertEqual(resp.status, 200)
1214
def testBasicAuthTwoDifferentCredentials(self):
    # Basic Authentication against the "basic2" area, which is opened by
    # the fred/barney credentials.
    file_uri = urllib.parse.urljoin(base, "basic2/file.txt")
    dir_uri = urllib.parse.urljoin(base, "basic2/")

    for protected in (file_uri, dir_uri):
        resp, _ = self.http.request(protected, "GET")
        self.assertEqual(resp.status, 401)

    self.http.add_credentials("fred", "barney")

    for protected in (dir_uri, file_uri):
        resp, _ = self.http.request(protected, "GET")
        self.assertEqual(resp.status, 200)
1232
def testBasicAuthNested(self):
    # Basic Authentication with nested protected resources: the outer
    # directory and its subdirectory open with different credentials.
    outer_uri = urllib.parse.urljoin(base, "basic-nested/")
    inner_uri = urllib.parse.urljoin(base, "basic-nested/subdir")

    # Each step optionally adds one credential pair, then checks the
    # status of the outer and inner resource in that order.
    steps = (
        (None, 401, 401),
        (("joe", "password"), 200, 401),
        (("fred", "barney"), 200, 200),
    )
    for credentials, outer_status, inner_status in steps:
        if credentials is not None:
            self.http.add_credentials(*credentials)

        resp, _ = self.http.request(outer_uri, "GET")
        self.assertEqual(resp.status, outer_status)

        resp, _ = self.http.request(inner_uri, "GET")
        self.assertEqual(resp.status, inner_status)
1264
def testDigestAuth(self):
    # Digest Authentication: 401 without credentials, 200 once they have
    # been added to the client.
    uri = urllib.parse.urljoin(base, "digest/")
    (response, content) = self.http.request(uri, "GET")
    self.assertEqual(response.status, 401)

    self.http.add_credentials("joe", "password")
    (response, content) = self.http.request(uri, "GET")
    self.assertEqual(response.status, 200)

    # A file inside the protected area must be reachable with the same
    # credentials; without this assertion the request was issued but its
    # outcome never checked.
    uri = urllib.parse.urljoin(base, "digest/file.txt")
    (response, content) = self.http.request(uri, "GET")
    self.assertEqual(response.status, 200)
1277
def testDigestAuthNextNonceAndNC(self):
    # When the server sets nextnonce, the nonce count is expected to be
    # reset back to 1 on the following request.
    uri = urllib.parse.urljoin(base, "digest/file.txt")
    self.http.add_credentials("joe", "password")
    bypass_cache = {"cache-control": "no-cache"}

    resp, _ = self.http.request(uri, "GET", headers=dict(bypass_cache))
    first_info = httplib2._parse_www_authenticate(resp, "authentication-info")
    self.assertEqual(resp.status, 200)

    resp, _ = self.http.request(uri, "GET", headers=dict(bypass_cache))
    second_info = httplib2._parse_www_authenticate(resp, "authentication-info")
    self.assertEqual(resp.status, 200)

    # Only checked when the first response actually offered a nextnonce.
    if "nextnonce" in first_info:
        self.assertEqual(second_info["nc"], 1)
1296
def testDigestAuthStale(self):
    # A nonce that has gone stale between two requests must be handled:
    # the second request still succeeds and is flagged as stale-digest.
    uri = urllib.parse.urljoin(base, "digest-expire/file.txt")
    self.http.add_credentials("joe", "password")
    bypass_cache = {"cache-control": "no-cache"}

    resp, _ = self.http.request(uri, "GET", headers=dict(bypass_cache))
    # The parsed values are not inspected; the calls only need to succeed.
    info_before = httplib2._parse_www_authenticate(resp, "authentication-info")
    self.assertEqual(resp.status, 200)

    # Wait long enough for the nonce to become stale.
    time.sleep(3)

    resp, _ = self.http.request(uri, "GET", headers=dict(bypass_cache))
    self.assertFalse(resp.fromcache)
    self.assertTrue(resp._stale_digest)
    info_after = httplib2._parse_www_authenticate(resp, "authentication-info")
    self.assertEqual(resp.status, 200)
1317
def reflector(self, content):
    # Parse a UTF-8 encoded body made of "KEY=value" lines into a dict.
    #
    # content: bytes, one "KEY=value" pair per line.
    # Returns: dict mapping each key to its value; only the first '=' on
    # a line separates key from value, so values may contain '='.
    #
    # Blank lines are skipped and a line without '=' maps to an empty
    # value, so an empty or partially malformed body yields a dict the
    # caller can assert on instead of an opaque ValueError from dict().
    pairs = {}
    for line in content.decode("utf-8").strip().split("\n"):
        if not line:
            continue
        key, _, value = line.partition("=")
        pairs[key] = value
    return pairs
1325
def testReflector(self):
    # The reflector CGI's output, parsed by self.reflector, must contain
    # an HTTP_USER_AGENT entry.
    endpoint = urllib.parse.urljoin(base, "reflector/reflector.cgi")
    _, body = self.http.request(endpoint, "GET")
    reflected = self.reflector(body)
    self.assertTrue("HTTP_USER_AGENT" in reflected)
1330 self.assertTrue("HTTP_USER_AGENT" in d)
1331
1332 def testConnectionClose(self):
1333 uri = "http://www.google.com/"
1334 (response, content) = self.http.request(uri, "GET")
1335 for c in self.http.connections.values():
1336 self.assertNotEqual(None, c.sock)
1337 (response, content) = self.http.request(
1338 uri, "GET", headers={"connection": "close"}
1339 )
1340 for c in self.http.connections.values():
1341 self.assertEqual(None, c.sock)
1342
def testPickleHttp(self):
    # Round-trip the client through pickle and compare its attributes
    # with the original, one key at a time.
    restored = pickle.loads(pickle.dumps(self.http))
    restored_state = restored.__dict__
    original_state = self.http.__dict__

    self.assertEqual(sorted(restored_state.keys()), sorted(original_state.keys()))

    for key in restored_state:
        new_value = restored_state[key]
        old_value = original_state[key]
        if key in ("certificates", "credentials"):
            # Compared through their .credentials attribute.
            self.assertEqual(new_value.credentials, old_value.credentials)
        elif key == "cache":
            # Compared through the .cache attribute.
            self.assertEqual(new_value.cache, old_value.cache)
        else:
            self.assertEqual(new_value, old_value)
1362
def testPickleHttpWithConnection(self):
    # A client holding a live connection entry must still pickle; the
    # restored copy starts with no connections while the original keeps
    # its single entry.
    self.http.request("http://bitworking.org", connection_type=_MyHTTPConnection)
    restored = pickle.loads(pickle.dumps(self.http))

    live_keys = list(self.http.connections.keys())
    self.assertEqual(live_keys, ["http:bitworking.org"])
    self.assertEqual(restored.connections, {})
1370
def testPickleCustomRequestHttp(self):
    # Replace the client's request attribute with a local function that
    # carries a custom attribute, then check that the pickled bytes do
    # not contain the "request" key.
    def dummy_request(*args, **kwargs):
        # Only assigned below, never called in this test.
        return new_request(*args, **kwargs)

    dummy_request.dummy_attr = "dummy_value"
    self.http.request = dummy_request

    serialized = pickle.dumps(self.http)
    self.assertFalse(b"S'request'" in serialized)
1380
1381
# Optional memcached-backed run of the HttpTest suite. The class is only
# defined when the `memcache` client module can be imported.
try:
    import memcache
except Exception:
    # Best-effort: without a usable memcache module the memcached-backed
    # suite is simply not defined. `Exception` rather than a bare except so
    # that KeyboardInterrupt/SystemExit still propagate.
    pass
else:
    # Defined outside the try body so that an error in the class
    # definition itself is reported instead of being silently swallowed.

    class HttpTestMemCached(HttpTest):
        def setUp(self):
            self.cache = memcache.Client(["127.0.0.1:11211"], debug=0)
            # self.cache = memcache.Client(['10.0.0.4:11211'], debug=1)
            self.http = httplib2.Http(self.cache)
            self.cache.flush_all()
            # Not exactly sure why the sleep is needed here, but
            # if not present then some unit tests that rely on caching
            # fail. Memcached seems to lose some sets immediately
            # after a flush_all if the set is to a value that
            # was previously cached. (Maybe the flush is handled async?)
            time.sleep(1)
            self.http.clear_credentials()
1402
1403# ------------------------------------------------------------------------
1404
1405
1406class HttpPrivateTest(unittest.TestCase):
def testParseCacheControl(self):
    # Checks parsing of the Cache-Control header into a dict of
    # directives: bare directives map to 1, valued ones to their string.
    self.assertEqual({}, httplib2._parse_cache_control({}))
    self.assertEqual(
        {"no-cache": 1},
        httplib2._parse_cache_control({"cache-control": " no-cache"}),
    )
    cc = httplib2._parse_cache_control(
        {"cache-control": " no-cache, max-age = 7200"}
    )
    self.assertEqual(cc["no-cache"], 1)
    self.assertEqual(cc["max-age"], "7200")
    cc = httplib2._parse_cache_control({"cache-control": " , "})
    self.assertEqual(cc[""], 1)

    # A header mixing ';' and ',' separators must parse without raising.
    # No try/except wrapper: any exception fails the test with its real
    # traceback, and a failed assertion is not relabelled as "threw".
    cc = httplib2._parse_cache_control(
        {"cache-control": "Max-age=3600;post-check=1800,pre-check=3600"}
    )
    self.assertTrue("max-age" in cc)
1429
def testNormalizeHeaders(self):
    # Header names are normalized to lowercase; the value keeps its case.
    raw = {"Cache-Control": "no-cache", "Other": "Stuff"}
    normalized = httplib2._normalize_headers(raw)
    for lowered in ("cache-control", "other"):
        self.assertTrue(lowered in normalized)
    self.assertEqual("Stuff", normalized["other"])
1436
def testConvertByteStr(self):
    # An int is rejected with TypeError; bytes and str both come back as
    # the equivalent str.
    with self.assertRaises(TypeError):
        httplib2._convert_byte_str(4)

    from_bytes = httplib2._convert_byte_str(b"Hello World")
    self.assertEqual("Hello World", from_bytes)

    from_text = httplib2._convert_byte_str("Bye World")
    self.assertEqual("Bye World", from_text)
1442
def testExpirationModelTransparent(self):
    # no-cache on the request makes the entry disposition TRANSPARENT,
    # even though the response allows caching via max-age.
    disposition = httplib2._entry_disposition(
        {"cache-control": "max-age=7200"}, {"cache-control": "no-cache"}
    )
    self.assertEqual("TRANSPARENT", disposition)
1451
def testMaxAgeNonNumeric(self):
    # Non-numeric max-age / min-fresh values in the response must not
    # raise; the entry is simply reported as STALE.
    disposition = httplib2._entry_disposition(
        {"cache-control": "max-age=fred, min-fresh=barney"}, {}
    )
    self.assertEqual("STALE", disposition)
1459
def testExpirationModelNoCacheResponse(self):
    # Date and Expires alone would make the entry FRESH, but no-cache on
    # the response overrides that and yields STALE.
    http_date = "%a, %d %b %Y %H:%M:%S GMT"
    now = time.time()
    response_headers = {
        "date": time.strftime(http_date, time.gmtime(now)),
        "expires": time.strftime(http_date, time.gmtime(now + 4)),
        "cache-control": "no-cache",
    }
    disposition = httplib2._entry_disposition(response_headers, {})
    self.assertEqual("STALE", disposition)
1473
def testExpirationModelStaleRequestMustReval(self):
    # must-revalidate on the request forces STALE.
    request_headers = {"cache-control": "must-revalidate"}
    disposition = httplib2._entry_disposition({}, request_headers)
    self.assertEqual("STALE", disposition)
1480
def testExpirationModelStaleResponseMustReval(self):
    # must-revalidate on the response forces STALE.
    response_headers = {"cache-control": "must-revalidate"}
    disposition = httplib2._entry_disposition(response_headers, {})
    self.assertEqual("STALE", disposition)
1487
def testExpirationModelFresh(self):
    # With Date set to now and max-age=2 the entry is FRESH immediately
    # and STALE once three seconds have passed.
    stamp = time.strftime("%a, %d %b %Y %H:%M:%S GMT", time.gmtime())
    response_headers = {"date": stamp, "cache-control": "max-age=2"}
    request_headers = {}

    before = httplib2._entry_disposition(response_headers, request_headers)
    self.assertEqual("FRESH", before)

    time.sleep(3)

    after = httplib2._entry_disposition(response_headers, request_headers)
    self.assertEqual("STALE", after)
1501
def testExpirationMaxAge0(self):
    # max-age=0 on the response means the entry is STALE right away.
    stamp = time.strftime("%a, %d %b %Y %H:%M:%S GMT", time.gmtime())
    response_headers = {"date": stamp, "cache-control": "max-age=0"}
    disposition = httplib2._entry_disposition(response_headers, {})
    self.assertEqual("STALE", disposition)
1511
def testExpirationModelDateAndExpires(self):
    # With Expires two seconds after Date the entry is FRESH immediately
    # and STALE once three seconds have passed.
    http_date = "%a, %d %b %Y %H:%M:%S GMT"
    now = time.time()
    response_headers = {
        "date": time.strftime(http_date, time.gmtime(now)),
        "expires": time.strftime(http_date, time.gmtime(now + 2)),
    }
    request_headers = {}

    before = httplib2._entry_disposition(response_headers, request_headers)
    self.assertEqual("FRESH", before)

    time.sleep(3)

    after = httplib2._entry_disposition(response_headers, request_headers)
    self.assertEqual("STALE", after)
1526
def testExpiresZero(self):
    # An Expires value of "0" (not a date) results in STALE.
    now = time.time()
    stamp = time.strftime("%a, %d %b %Y %H:%M:%S GMT", time.gmtime(now))
    response_headers = {"date": stamp, "expires": "0"}
    disposition = httplib2._entry_disposition(response_headers, {})
    self.assertEqual("STALE", disposition)
1537
def testExpirationModelDateOnly(self):
    # A response carrying only a Date header (here three seconds in the
    # future) and no freshness information is STALE.
    now = time.time()
    stamp = time.strftime("%a, %d %b %Y %H:%M:%S GMT", time.gmtime(now + 3))
    disposition = httplib2._entry_disposition({"date": stamp}, {})
    self.assertEqual("STALE", disposition)
1547
def testExpirationModelOnlyIfCached(self):
    # only-if-cached on the request yields FRESH even with no response
    # headers at all.
    request_headers = {"cache-control": "only-if-cached"}
    disposition = httplib2._entry_disposition({}, request_headers)
    self.assertEqual("FRESH", disposition)
1554
def testExpirationModelMaxAgeBoth(self):
    # The response allows max-age=2 but the request asks for max-age=0;
    # the result is STALE.
    now = time.time()
    stamp = time.strftime("%a, %d %b %Y %H:%M:%S GMT", time.gmtime(now))
    response_headers = {"date": stamp, "cache-control": "max-age=2"}
    request_headers = {"cache-control": "max-age=0"}
    disposition = httplib2._entry_disposition(response_headers, request_headers)
    self.assertEqual("STALE", disposition)
1565
def testExpirationModelDateAndExpiresMinFresh1(self):
    # Expires is two seconds after Date and the request demands
    # min-fresh=2; the result is STALE.
    http_date = "%a, %d %b %Y %H:%M:%S GMT"
    now = time.time()
    response_headers = {
        "date": time.strftime(http_date, time.gmtime(now)),
        "expires": time.strftime(http_date, time.gmtime(now + 2)),
    }
    request_headers = {"cache-control": "min-fresh=2"}
    disposition = httplib2._entry_disposition(response_headers, request_headers)
    self.assertEqual("STALE", disposition)
1576
def testExpirationModelDateAndExpiresMinFresh2(self):
    # Expires is four seconds after Date and the request demands
    # min-fresh=2; the result is FRESH.
    http_date = "%a, %d %b %Y %H:%M:%S GMT"
    now = time.time()
    response_headers = {
        "date": time.strftime(http_date, time.gmtime(now)),
        "expires": time.strftime(http_date, time.gmtime(now + 4)),
    }
    request_headers = {"cache-control": "min-fresh=2"}
    disposition = httplib2._entry_disposition(response_headers, request_headers)
    self.assertEqual("FRESH", disposition)
1587
def testParseWWWAuthenticateEmpty(self):
    # Parsing headers without any www-authenticate entry yields a result
    # with no keys.
    parsed = httplib2._parse_www_authenticate({})
    key_count = len(list(parsed.keys()))
    self.assertEqual(key_count, 0)
1591
def testParseWWWAuthenticate(self):
    # Covers three syntactic corner cases of www-authenticate parsing.

    # 1. Varying whitespace around the commas between parameters.
    spaced = 'Test realm="test realm" , foo=foo ,bar="bar", baz=baz,qux=qux'
    parsed = httplib2._parse_www_authenticate({"www-authenticate": spaced})
    self.assertEqual(len(list(parsed.keys())), 1)
    self.assertEqual(len(list(parsed["test"].keys())), 5)

    # 2. Tokens made of non-alphanumeric characters.
    symbols = 'T*!%#st realm=to*!%#en, to*!%#en="quoted string"'
    parsed = httplib2._parse_www_authenticate({"www-authenticate": symbols})
    self.assertEqual(len(list(parsed.keys())), 1)
    self.assertEqual(len(list(parsed["t*!%#st"].keys())), 2)

    # 3. A quoted string containing quoted pairs (escaped quotes).
    escaped = 'Test realm="a \\"test\\" realm"'
    parsed = httplib2._parse_www_authenticate({"www-authenticate": escaped})
    self.assertEqual(len(list(parsed.keys())), 1)
    self.assertEqual(parsed["test"]["realm"], 'a "test" realm')
1615
def testParseWWWAuthenticateStrict(self):
    # Re-runs testParseWWWAuthenticate with the module-level strict
    # parsing flag switched on.
    previous = httplib2.USE_WWW_AUTH_STRICT_PARSING
    httplib2.USE_WWW_AUTH_STRICT_PARSING = 1
    try:
        self.testParseWWWAuthenticate()
    finally:
        # The flag is global to httplib2: restore it even when the
        # delegated test fails, so strict mode cannot leak into the
        # tests that run afterwards.
        httplib2.USE_WWW_AUTH_STRICT_PARSING = previous
1620
def testParseWWWAuthenticateBasic(self):
    # Basic challenges: realm alone, then realm plus an algorithm
    # parameter written once quoted and once as a bare token.
    parsed = httplib2._parse_www_authenticate({"www-authenticate": 'Basic realm="me"'})
    basic = parsed["basic"]
    self.assertEqual("me", basic["realm"])

    with_algorithm = (
        'Basic realm="me", algorithm="MD5"',
        'Basic realm="me", algorithm=MD5',
    )
    for challenge in with_algorithm:
        parsed = httplib2._parse_www_authenticate({"www-authenticate": challenge})
        basic = parsed["basic"]
        self.assertEqual("me", basic["realm"])
        self.assertEqual("MD5", basic["algorithm"])
1639
def testParseWWWAuthenticateBasic2(self):
    # Two quoted parameters with no space after the comma and a trailing
    # space at the end of the header value.
    challenge = 'Basic realm="me",other="fred" '
    parsed = httplib2._parse_www_authenticate({"www-authenticate": challenge})
    basic = parsed["basic"]
    self.assertEqual("me", basic["realm"])
    self.assertEqual("fred", basic["other"])
1647
def testParseWWWAuthenticateBasic3(self):
    # A mixed-case parameter name ("REAlm") is looked up as lowercase
    # "realm" in the parsed result.
    challenge = 'Basic REAlm="me" '
    parsed = httplib2._parse_www_authenticate({"www-authenticate": challenge})
    basic = parsed["basic"]
    self.assertEqual("me", basic["realm"])
1654
def testParseWWWAuthenticateDigest(self):
    # A Digest challenge whose qop value itself contains a comma inside
    # the quotes.
    challenge = 'Digest realm="testrealm@host.com", qop="auth,auth-int", nonce="dcd98b7102dd2f0e8b11d0f600bfb0c093", opaque="5ccc069c403ebaf9f0171e9517f40e41"'
    parsed = httplib2._parse_www_authenticate({"www-authenticate": challenge})
    digest = parsed["digest"]
    self.assertEqual("testrealm@host.com", digest["realm"])
    self.assertEqual("auth,auth-int", digest["qop"])
1664
def testParseWWWAuthenticateMultiple(self):
    # A Digest challenge followed by a Basic challenge in one header
    # value, separated only by a space.
    challenge = 'Digest realm="testrealm@host.com", qop="auth,auth-int", nonce="dcd98b7102dd2f0e8b11d0f600bfb0c093", opaque="5ccc069c403ebaf9f0171e9517f40e41" Basic REAlm="me" '
    parsed = httplib2._parse_www_authenticate({"www-authenticate": challenge})

    digest = parsed["digest"]
    expected_digest = (
        ("realm", "testrealm@host.com"),
        ("qop", "auth,auth-int"),
        ("nonce", "dcd98b7102dd2f0e8b11d0f600bfb0c093"),
        ("opaque", "5ccc069c403ebaf9f0171e9517f40e41"),
    )
    for field, value in expected_digest:
        self.assertEqual(value, digest[field])

    basic = parsed["basic"]
    self.assertEqual("me", basic["realm"])
1678
def testParseWWWAuthenticateMultiple2(self):
    # Same as the two-challenge case, but with a comma between the
    # challenges -- which may appear when they were originally sent in
    # separate www-authenticate headers.
    challenge = 'Digest realm="testrealm@host.com", qop="auth,auth-int", nonce="dcd98b7102dd2f0e8b11d0f600bfb0c093", opaque="5ccc069c403ebaf9f0171e9517f40e41", Basic REAlm="me" '
    parsed = httplib2._parse_www_authenticate({"www-authenticate": challenge})

    digest = parsed["digest"]
    expected_digest = (
        ("realm", "testrealm@host.com"),
        ("qop", "auth,auth-int"),
        ("nonce", "dcd98b7102dd2f0e8b11d0f600bfb0c093"),
        ("opaque", "5ccc069c403ebaf9f0171e9517f40e41"),
    )
    for field, value in expected_digest:
        self.assertEqual(value, digest[field])

    basic = parsed["basic"]
    self.assertEqual("me", basic["realm"])
1694
def testParseWWWAuthenticateMultiple3(self):
    # Three comma-separated challenges (Digest, Basic, WSSE) in a single
    # header value, as may happen when separate www-authenticate headers
    # were merged.
    challenge = 'Digest realm="testrealm@host.com", qop="auth,auth-int", nonce="dcd98b7102dd2f0e8b11d0f600bfb0c093", opaque="5ccc069c403ebaf9f0171e9517f40e41", Basic REAlm="me", WSSE realm="foo", profile="UsernameToken"'
    parsed = httplib2._parse_www_authenticate({"www-authenticate": challenge})

    digest = parsed["digest"]
    expected_digest = (
        ("realm", "testrealm@host.com"),
        ("qop", "auth,auth-int"),
        ("nonce", "dcd98b7102dd2f0e8b11d0f600bfb0c093"),
        ("opaque", "5ccc069c403ebaf9f0171e9517f40e41"),
    )
    for field, value in expected_digest:
        self.assertEqual(value, digest[field])

    basic = parsed["basic"]
    self.assertEqual("me", basic["realm"])

    wsse = parsed["wsse"]
    self.assertEqual("foo", wsse["realm"])
    self.assertEqual("UsernameToken", wsse["profile"])
1713
def testParseWWWAuthenticateMultiple4(self):
    """Parse a Digest challenge containing awkward spacing and punctuation.

    The header has tabs around the ``=`` of ``qop``, a tab inside the quoted
    ``qop`` value, a nonce made of punctuation characters, and no space
    after the comma that precedes ``opaque``.  Only the Digest parameters
    are checked; the Basic and WSSE challenges that follow are not.
    """
    awkward = 'Digest realm="test-real.m@host.com", qop \t=\t"\tauth,auth-int", nonce="(*)&^&$%#",opaque="5ccc069c403ebaf9f0171e9517f40e41", Basic REAlm="me", WSSE realm="foo", profile="UsernameToken"'
    parsed = httplib2._parse_www_authenticate({"www-authenticate": awkward})

    digest_fields = parsed["digest"]
    checks = (
        ("realm", "test-real.m@host.com"),
        # The tab inside the quotes is part of the value and must survive.
        ("qop", "\tauth,auth-int"),
        ("nonce", "(*)&^&$%#"),
    )
    for field, wanted in checks:
        self.assertEqual(wanted, digest_fields[field])
1724
def testParseWWWAuthenticateMoreQuoteCombos(self):
    """Parse a Digest challenge that mixes quoted and unquoted values.

    ``algorithm=MD5`` and ``stale=true`` are unquoted while the other
    parameters are quoted; the test only checks that the realm is read
    correctly.
    """
    mixed_quotes = 'Digest realm="myrealm", nonce="Ygk86AsKBAA=3516200d37f9a3230352fde99977bd6d472d4306", algorithm=MD5, qop="auth", stale=true'
    parsed = httplib2._parse_www_authenticate({"www-authenticate": mixed_quotes})
    self.assertEqual("myrealm", parsed["digest"]["realm"])
1733
def testParseWWWAuthenticateMalformed(self):
    """A challenge made only of bare quoted strings must be rejected.

    The OAuth challenge below has no ``key=value`` parameters, just three
    quoted strings, and the parser is expected to raise
    ``httplib2.MalformedHeader`` for it.
    """
    malformed = {
        "www-authenticate": 'OAuth "Facebook Platform" "invalid_token" "Invalid OAuth access token."'
    }
    # assertRaises replaces the manual try / self.fail / except-pass pattern
    # and drops the unused result variable; msg keeps the original failure text.
    with self.assertRaises(
        httplib2.MalformedHeader, msg="should raise an exception"
    ):
        httplib2._parse_www_authenticate(malformed)
1744
def testDigestObject(self):
    """Check the Authorization header built for a ``qop="auth"`` challenge.

    A fixed ``cnonce`` is passed to ``request`` so that the generated header
    can be compared against a known-good value.
    """
    uri = "/projects/httplib2/test/digest/"
    outgoing_headers = {}
    body = b""
    challenge = {
        "www-authenticate": 'Digest realm="myrealm", '
        'nonce="Ygk86AsKBAA=3516200d37f9a3230352fde99977bd6d472d4306", '
        'algorithm=MD5, qop="auth"'
    }

    # Host and the trailing http argument are both None in this test.
    auth = httplib2.DigestAuthentication(
        ("joe", "password"), None, uri, outgoing_headers, challenge, body, None
    )
    # request() is expected to fill in outgoing_headers["authorization"].
    auth.request("GET", uri, outgoing_headers, body, cnonce="33033375ec278a46")

    produced = "authorization: %s" % outgoing_headers["authorization"]
    known_good = (
        'authorization: Digest username="joe", realm="myrealm", '
        'nonce="Ygk86AsKBAA=3516200d37f9a3230352fde99977bd6d472d4306",'
        ' uri="/projects/httplib2/test/digest/", algorithm=MD5, '
        'response="97ed129401f7cdc60e5db58a80f3ea8b", qop=auth, '
        'nc=00000001, cnonce="33033375ec278a46"'
    )
    self.assertEqual(produced, known_good)
1770
def testDigestObjectWithOpaque(self):
    """Check that ``opaque`` from the challenge is echoed in Authorization.

    Same scenario as the plain Digest test, but the challenge also carries
    ``opaque="atestopaque"``, which must appear at the end of the generated
    header.  A fixed ``cnonce`` makes the expected value deterministic.
    """
    credentials = ("joe", "password")
    host = None
    request_uri = "/projects/httplib2/test/digest/"
    headers = {}
    response = {
        "www-authenticate": 'Digest realm="myrealm", '
        'nonce="Ygk86AsKBAA=3516200d37f9a3230352fde99977bd6d472d4306", '
        'algorithm=MD5, qop="auth", opaque="atestopaque"'
    }
    # Bytes, not str: the other Digest tests in this file all pass b"".
    content = b""

    d = httplib2.DigestAuthentication(
        credentials, host, request_uri, headers, response, content, None
    )
    # request() is expected to fill in headers["authorization"].
    d.request("GET", request_uri, headers, content, cnonce="33033375ec278a46")
    our_request = "authorization: %s" % headers["authorization"]
    working_request = (
        'authorization: Digest username="joe", realm="myrealm", '
        'nonce="Ygk86AsKBAA=3516200d37f9a3230352fde99977bd6d472d4306",'
        ' uri="/projects/httplib2/test/digest/", algorithm=MD5, '
        'response="97ed129401f7cdc60e5db58a80f3ea8b", qop=auth, '
        'nc=00000001, cnonce="33033375ec278a46", '
        'opaque="atestopaque"'
    )
    self.assertEqual(our_request, working_request)
1797
def testDigestObjectStale(self):
    """A 401 response whose Digest challenge says ``stale=true`` asks for a retry."""
    uri = "/projects/httplib2/test/digest/"
    body = b""

    stale_reply = httplib2.Response({})
    stale_reply["www-authenticate"] = (
        'Digest realm="myrealm", '
        'nonce="Ygk86AsKBAA=3516200d37f9a3230352fde99977bd6d472d4306",'
        ' algorithm=MD5, qop="auth", stale=true'
    )
    stale_reply.status = 401

    auth = httplib2.DigestAuthentication(
        ("joe", "password"), None, uri, {}, stale_reply, body, None
    )
    # A truthy result from response() means the request should be retried.
    retry = auth.response(stale_reply, body)
    self.assertTrue(retry)
1816
def testDigestObjectAuthInfo(self):
    """Check handling of ``nextnonce`` from an Authentication-Info header.

    Unlike the stale test, no 401 status is set on the response here.  The
    test expects ``response()`` to return a falsy value (no retry), the
    challenge nonce to be replaced by the ``nextnonce`` value, and the nonce
    count ``nc`` to be 1 afterwards.
    """
    uri = "/projects/httplib2/test/digest/"
    body = b""

    reply = httplib2.Response({})
    reply["www-authenticate"] = (
        'Digest realm="myrealm", '
        'nonce="Ygk86AsKBAA=3516200d37f9a3230352fde99977bd6d472d4306",'
        ' algorithm=MD5, qop="auth", stale=true'
    )
    reply["authentication-info"] = 'nextnonce="fred"'

    auth = httplib2.DigestAuthentication(
        ("joe", "password"), None, uri, {}, reply, body, None
    )
    # No retry is expected in this scenario.
    wants_retry = auth.response(reply, body)
    self.assertFalse(wants_retry)
    self.assertEqual("fred", auth.challenge["nonce"])
    self.assertEqual(1, auth.challenge["nc"])
1837
def testWsseAlgorithm(self):
    """Check ``_wsse_username_token`` against a known digest for fixed inputs."""
    token_inputs = (
        "d36e316282959a9ed4c89851497a717f",
        "2003-12-15T14:43:07Z",
        "taadtaadpstcsm",
    )
    # The expected value is a bytes object, not str.
    known_digest = b"quR/EWLAV4xLf9Zqyw4pDmfV9OY="
    self.assertEqual(known_digest, httplib2._wsse_username_token(*token_inputs))
1844
def testEnd2End(self):
    """Check which headers ``_get_end2end_headers`` keeps.

    Four cases are covered: a normal header next to ``te``; a header that
    is itself named in ``connection``; no headers at all; and a
    ``connection`` value naming a header that is not present.

    Membership is checked with assertIn / assertNotIn rather than
    assertTrue(x in y) so that a failure reports both the key and the
    collection that was searched.
    """
    # one end to end header
    response = {"content-type": "application/atom+xml", "te": "deflate"}
    end2end = httplib2._get_end2end_headers(response)
    self.assertIn("content-type", end2end)
    self.assertNotIn("te", end2end)
    self.assertNotIn("connection", end2end)

    # one end to end header that gets eliminated
    response = {
        "connection": "content-type",
        "content-type": "application/atom+xml",
        "te": "deflate",
    }
    end2end = httplib2._get_end2end_headers(response)
    self.assertNotIn("content-type", end2end)
    self.assertNotIn("te", end2end)
    self.assertNotIn("connection", end2end)

    # Degenerate case of no headers
    response = {}
    end2end = httplib2._get_end2end_headers(response)
    self.assertEqual(0, len(end2end))

    # Degenerate case of connection referring to a header not passed in
    response = {"connection": "content-type"}
    end2end = httplib2._get_end2end_headers(response)
    self.assertEqual(0, len(end2end))
1873
1874
class TestProxyInfo(unittest.TestCase):
    """Tests for building proxy settings from URLs and environment variables.

    Several tests modify ``os.environ``; ``setUp`` takes a snapshot of it and
    ``tearDown`` restores that snapshot so the tests do not affect each other
    or the calling process.
    """

    def setUp(self):
        """Remember the environment as it was before the test ran."""
        self.orig_env = dict(os.environ)

    def tearDown(self):
        """Put the environment back exactly as ``setUp`` found it."""
        os.environ.clear()
        os.environ.update(self.orig_env)

    def test_from_url(self):
        """A bare http URL yields the host, port 80 and no user."""
        pi = httplib2.proxy_info_from_url("http://myproxy.example.com")
        self.assertEqual(pi.proxy_host, "myproxy.example.com")
        self.assertEqual(pi.proxy_port, 80)
        self.assertIsNone(pi.proxy_user)

    def test_from_url_ident(self):
        """User, password and an explicit port are all taken from the URL."""
        pi = httplib2.proxy_info_from_url("http://zoidberg:fish@someproxy:99")
        self.assertEqual(pi.proxy_host, "someproxy")
        self.assertEqual(pi.proxy_port, 99)
        self.assertEqual(pi.proxy_user, "zoidberg")
        self.assertEqual(pi.proxy_pass, "fish")

    def test_from_env(self):
        """With no argument, the proxy is read from ``http_proxy``."""
        os.environ["http_proxy"] = "http://myproxy.example.com:8080"
        pi = httplib2.proxy_info_from_environment()
        self.assertEqual(pi.proxy_host, "myproxy.example.com")
        self.assertEqual(pi.proxy_port, 8080)

    def test_from_env_no_proxy(self):
        """Asking for "https" selects ``https_proxy`` over ``http_proxy``."""
        os.environ["http_proxy"] = "http://myproxy.example.com:80"
        os.environ["https_proxy"] = "http://myproxy.example.com:81"
        pi = httplib2.proxy_info_from_environment("https")
        self.assertEqual(pi.proxy_host, "myproxy.example.com")
        self.assertEqual(pi.proxy_port, 81)

    def test_from_env_none(self):
        """An empty environment produces no proxy info at all."""
        os.environ.clear()
        pi = httplib2.proxy_info_from_environment()
        self.assertIsNone(pi)

    def test_proxy_headers(self):
        """Headers passed as ``proxy_headers`` are exposed unchanged."""
        headers = {"key0": "val0", "key1": "val1"}
        pi = httplib2.ProxyInfo(
            httplib2.socks.PROXY_TYPE_HTTP, "localhost", 1234, proxy_headers=headers
        )
        self.assertEqual(pi.proxy_headers, headers)

    # regression: ensure that httplib2.HTTPConnectionWithTimeout initializes when proxy_info is not supplied
    def test_proxy_init(self):
        """Build and use a connection without passing ``proxy_info``.

        NOTE(review): this issues a request to www.google.com, so it
        presumably needs network access to pass.
        """
        connection = httplib2.HTTPConnectionWithTimeout("www.google.com", 80)
        try:
            connection.request("GET", "/")
        finally:
            # Close even if request() raises, so a failure does not leave
            # the connection open.
            connection.close()
1926
1927
# Run every test case in this module when the file is executed directly.
if __name__ == "__main__":
    unittest.main()