blob: 870275fcc2fba3defc3c99a438fb0157e421563b [file] [log] [blame]
pilgrim00a352e2009-05-29 04:04:44 +00001#!/usr/bin/env python3
2"""
3httplib2test
4
5A set of unit tests for httplib2.py.
6
7Requires Python 3.0 or later
8"""
9
10__author__ = "Joe Gregorio (joe@bitworking.org)"
11__copyright__ = "Copyright 2006, Joe Gregorio"
12__contributors__ = ["Mark Pilgrim"]
13__license__ = "MIT"
14__history__ = """ """
15__version__ = "0.2 ($Rev: 118 $)"
16
Joe Gregoriod760a1a2011-02-11 01:03:22 -050017import base64
pilgrim00a352e2009-05-29 04:04:44 +000018import http.client
19import httplib2
pilgrim00a352e2009-05-29 04:04:44 +000020import io
Joe Gregoriod760a1a2011-02-11 01:03:22 -050021import os
22import socket
Joe Gregorio732b11f2011-06-07 15:44:51 -040023import ssl
Joe Gregoriod760a1a2011-02-11 01:03:22 -050024import sys
25import time
26import unittest
27import urllib.parse
pilgrim00a352e2009-05-29 04:04:44 +000028
29# The test resources base uri
30base = 'http://bitworking.org/projects/httplib2/test/'
31#base = 'http://localhost/projects/httplib2/test/'
32cacheDirName = ".cache"
33
34
35class CredentialsTest(unittest.TestCase):
36 def test(self):
37 c = httplib2.Credentials()
38 c.add("joe", "password")
39 self.assertEqual(("joe", "password"), list(c.iter("bitworking.org"))[0])
40 self.assertEqual(("joe", "password"), list(c.iter(""))[0])
41 c.add("fred", "password2", "wellformedweb.org")
42 self.assertEqual(("joe", "password"), list(c.iter("bitworking.org"))[0])
43 self.assertEqual(1, len(list(c.iter("bitworking.org"))))
44 self.assertEqual(2, len(list(c.iter("wellformedweb.org"))))
45 self.assertTrue(("fred", "password2") in list(c.iter("wellformedweb.org")))
46 c.clear()
47 self.assertEqual(0, len(list(c.iter("bitworking.org"))))
48 c.add("fred", "password2", "wellformedweb.org")
49 self.assertTrue(("fred", "password2") in list(c.iter("wellformedweb.org")))
50 self.assertEqual(0, len(list(c.iter("bitworking.org"))))
51 self.assertEqual(0, len(list(c.iter(""))))
52
53
54class ParserTest(unittest.TestCase):
55 def testFromStd66(self):
56 self.assertEqual( ('http', 'example.com', '', None, None ), httplib2.parse_uri("http://example.com"))
57 self.assertEqual( ('https', 'example.com', '', None, None ), httplib2.parse_uri("https://example.com"))
58 self.assertEqual( ('https', 'example.com:8080', '', None, None ), httplib2.parse_uri("https://example.com:8080"))
59 self.assertEqual( ('http', 'example.com', '/', None, None ), httplib2.parse_uri("http://example.com/"))
60 self.assertEqual( ('http', 'example.com', '/path', None, None ), httplib2.parse_uri("http://example.com/path"))
61 self.assertEqual( ('http', 'example.com', '/path', 'a=1&b=2', None ), httplib2.parse_uri("http://example.com/path?a=1&b=2"))
62 self.assertEqual( ('http', 'example.com', '/path', 'a=1&b=2', 'fred' ), httplib2.parse_uri("http://example.com/path?a=1&b=2#fred"))
63 self.assertEqual( ('http', 'example.com', '/path', 'a=1&b=2', 'fred' ), httplib2.parse_uri("http://example.com/path?a=1&b=2#fred"))
64
65
66class UrlNormTest(unittest.TestCase):
67 def test(self):
68 self.assertEqual( "http://example.org/", httplib2.urlnorm("http://example.org")[-1])
69 self.assertEqual( "http://example.org/", httplib2.urlnorm("http://EXAMple.org")[-1])
70 self.assertEqual( "http://example.org/?=b", httplib2.urlnorm("http://EXAMple.org?=b")[-1])
71 self.assertEqual( "http://example.org/mypath?a=b", httplib2.urlnorm("http://EXAMple.org/mypath?a=b")[-1])
72 self.assertEqual( "http://localhost:80/", httplib2.urlnorm("http://localhost:80")[-1])
73 self.assertEqual( httplib2.urlnorm("http://localhost:80/"), httplib2.urlnorm("HTTP://LOCALHOST:80"))
74 try:
75 httplib2.urlnorm("/")
76 self.fail("Non-absolute URIs should raise an exception")
77 except httplib2.RelativeURIError:
78 pass
79
80class UrlSafenameTest(unittest.TestCase):
81 def test(self):
82 # Test that different URIs end up generating different safe names
83 self.assertEqual( "example.org,fred,a=b,58489f63a7a83c3b7794a6a398ee8b1f", httplib2.safename("http://example.org/fred/?a=b"))
84 self.assertEqual( "example.org,fred,a=b,8c5946d56fec453071f43329ff0be46b", httplib2.safename("http://example.org/fred?/a=b"))
85 self.assertEqual( "www.example.org,fred,a=b,499c44b8d844a011b67ea2c015116968", httplib2.safename("http://www.example.org/fred?/a=b"))
86 self.assertEqual( httplib2.safename(httplib2.urlnorm("http://www")[-1]), httplib2.safename(httplib2.urlnorm("http://WWW")[-1]))
87 self.assertEqual( "www.example.org,fred,a=b,692e843a333484ce0095b070497ab45d", httplib2.safename("https://www.example.org/fred?/a=b"))
88 self.assertNotEqual( httplib2.safename("http://www"), httplib2.safename("https://www"))
89 # Test the max length limits
90 uri = "http://" + ("w" * 200) + ".org"
91 uri2 = "http://" + ("w" * 201) + ".org"
92 self.assertNotEqual( httplib2.safename(uri2), httplib2.safename(uri))
93 # Max length should be 200 + 1 (",") + 32
94 self.assertEqual(233, len(httplib2.safename(uri2)))
95 self.assertEqual(233, len(httplib2.safename(uri)))
96 # Unicode
97 if sys.version_info >= (2,3):
98 self.assertEqual( "xn--http,-4y1d.org,fred,a=b,579924c35db315e5a32e3d9963388193", httplib2.safename("http://\u2304.org/fred/?a=b"))
99
100class _MyResponse(io.BytesIO):
101 def __init__(self, body, **kwargs):
102 io.BytesIO.__init__(self, body)
103 self.headers = kwargs
104
105 def items(self):
106 return self.headers.items()
107
108 def iteritems(self):
109 return iter(self.headers.items())
110
111
112class _MyHTTPConnection(object):
113 "This class is just a mock of httplib.HTTPConnection used for testing"
114
115 def __init__(self, host, port=None, key_file=None, cert_file=None,
116 strict=None, timeout=None, proxy_info=None):
117 self.host = host
118 self.port = port
119 self.timeout = timeout
120 self.log = ""
Joe Gregorio732b11f2011-06-07 15:44:51 -0400121 self.sock = None
pilgrim00a352e2009-05-29 04:04:44 +0000122
123 def set_debuglevel(self, level):
124 pass
125
126 def connect(self):
127 "Connect to a host on a given port."
128 pass
129
130 def close(self):
131 pass
132
133 def request(self, method, request_uri, body, headers):
134 pass
135
136 def getresponse(self):
137 return _MyResponse(b"the body", status="200")
138
139
140class HttpTest(unittest.TestCase):
141 def setUp(self):
142 if os.path.exists(cacheDirName):
143 [os.remove(os.path.join(cacheDirName, file)) for file in os.listdir(cacheDirName)]
144 self.http = httplib2.Http(cacheDirName)
145 self.http.clear_credentials()
146
Joe Gregorio756d3b32011-02-13 11:59:51 -0500147 def testIPv6NoSSL(self):
148 try:
149 self.http.request("http://[::1]/")
150 except socket.gaierror:
151 self.fail("should get the address family right for IPv6")
152 except socket.error:
153 # Even if IPv6 isn't installed on a machine it should just raise socket.error
154 pass
155
156 def testIPv6SSL(self):
157 try:
158 self.http.request("https://[::1]/")
159 except socket.gaierror:
160 self.fail("should get the address family right for IPv6")
161 except socket.error:
162 # Even if IPv6 isn't installed on a machine it should just raise socket.error
163 pass
164
pilgrim00a352e2009-05-29 04:04:44 +0000165 def testConnectionType(self):
166 self.http.force_exception_to_status_code = False
167 response, content = self.http.request("http://bitworking.org", connection_type=_MyHTTPConnection)
168 self.assertEqual(response['content-location'], "http://bitworking.org")
169 self.assertEqual(content, b"the body")
170
171 def testGetUnknownServer(self):
172 self.http.force_exception_to_status_code = False
173 try:
174 self.http.request("http://fred.bitworking.org/")
175 self.fail("An httplib2.ServerNotFoundError Exception must be thrown on an unresolvable server.")
176 except httplib2.ServerNotFoundError:
177 pass
178
179 # Now test with exceptions turned off
180 self.http.force_exception_to_status_code = True
181
182 (response, content) = self.http.request("http://fred.bitworking.org/")
183 self.assertEqual(response['content-type'], 'text/plain')
184 self.assertTrue(content.startswith(b"Unable to find"))
185 self.assertEqual(response.status, 400)
186
Joe Gregoriod760a1a2011-02-11 01:03:22 -0500187 def testGetConnectionRefused(self):
188 self.http.force_exception_to_status_code = False
189 try:
190 self.http.request("http://localhost:7777/")
191 self.fail("An socket.error exception must be thrown on Connection Refused.")
192 except socket.error:
193 pass
194
195 # Now test with exceptions turned off
196 self.http.force_exception_to_status_code = True
197
198 (response, content) = self.http.request("http://localhost:7777/")
199 self.assertEqual(response['content-type'], 'text/plain')
200 self.assertTrue(b"Connection refused" in content)
201 self.assertEqual(response.status, 400)
202
pilgrim00a352e2009-05-29 04:04:44 +0000203 def testGetIRI(self):
204 if sys.version_info >= (2,3):
205 uri = urllib.parse.urljoin(base, "reflector/reflector.cgi?d=\N{CYRILLIC CAPITAL LETTER DJE}")
206 (response, content) = self.http.request(uri, "GET")
207 d = self.reflector(content)
208 self.assertTrue('QUERY_STRING' in d)
209 self.assertTrue(d['QUERY_STRING'].find('%D0%82') > 0)
210
211 def testGetIsDefaultMethod(self):
212 # Test that GET is the default method
213 uri = urllib.parse.urljoin(base, "methods/method_reflector.cgi")
214 (response, content) = self.http.request(uri)
215 self.assertEqual(response['x-method'], "GET")
216
217 def testDifferentMethods(self):
218 # Test that all methods can be used
219 uri = urllib.parse.urljoin(base, "methods/method_reflector.cgi")
220 for method in ["GET", "PUT", "DELETE", "POST"]:
221 (response, content) = self.http.request(uri, method, body=b" ")
222 self.assertEqual(response['x-method'], method)
223
Joe Gregoriob628c0b2009-07-16 12:28:04 -0400224 def testHeadRead(self):
225 # Test that we don't try to read the response of a HEAD request
226 # since httplib blocks response.read() for HEAD requests.
227 # Oddly enough this doesn't appear as a problem when doing HEAD requests
228 # against Apache servers.
229 uri = "http://www.google.com/"
230 (response, content) = self.http.request(uri, "HEAD")
231 self.assertEqual(response.status, 200)
232 self.assertEqual(content, b"")
233
pilgrim00a352e2009-05-29 04:04:44 +0000234 def testGetNoCache(self):
235 # Test that can do a GET w/o the cache turned on.
236 http = httplib2.Http()
237 uri = urllib.parse.urljoin(base, "304/test_etag.txt")
238 (response, content) = http.request(uri, "GET")
239 self.assertEqual(response.status, 200)
240 self.assertEqual(response.previous, None)
241
Joe Gregorioe202d212009-07-16 14:57:52 -0400242 def testGetOnlyIfCachedCacheHit(self):
243 # Test that can do a GET with cache and 'only-if-cached'
244 uri = urllib.parse.urljoin(base, "304/test_etag.txt")
245 (response, content) = self.http.request(uri, "GET")
246 (response, content) = self.http.request(uri, "GET", headers={'cache-control': 'only-if-cached'})
247 self.assertEqual(response.fromcache, True)
248 self.assertEqual(response.status, 200)
249
pilgrim00a352e2009-05-29 04:04:44 +0000250 def testGetOnlyIfCachedCacheMiss(self):
251 # Test that can do a GET with no cache with 'only-if-cached'
pilgrim00a352e2009-05-29 04:04:44 +0000252 uri = urllib.parse.urljoin(base, "304/test_etag.txt")
Joe Gregorioe202d212009-07-16 14:57:52 -0400253 (response, content) = self.http.request(uri, "GET", headers={'cache-control': 'only-if-cached'})
pilgrim00a352e2009-05-29 04:04:44 +0000254 self.assertEqual(response.fromcache, False)
Joe Gregorioe202d212009-07-16 14:57:52 -0400255 self.assertEqual(response.status, 504)
pilgrim00a352e2009-05-29 04:04:44 +0000256
257 def testGetOnlyIfCachedNoCacheAtAll(self):
258 # Test that can do a GET with no cache with 'only-if-cached'
259 # Of course, there might be an intermediary beyond us
260 # that responds to the 'only-if-cached', so this
261 # test can't really be guaranteed to pass.
262 http = httplib2.Http()
263 uri = urllib.parse.urljoin(base, "304/test_etag.txt")
264 (response, content) = http.request(uri, "GET", headers={'cache-control': 'only-if-cached'})
265 self.assertEqual(response.fromcache, False)
Joe Gregorioe202d212009-07-16 14:57:52 -0400266 self.assertEqual(response.status, 504)
pilgrim00a352e2009-05-29 04:04:44 +0000267
268 def testUserAgent(self):
269 # Test that we provide a default user-agent
270 uri = urllib.parse.urljoin(base, "user-agent/test.cgi")
271 (response, content) = self.http.request(uri, "GET")
272 self.assertEqual(response.status, 200)
273 self.assertTrue(content.startswith(b"Python-httplib2/"))
274
275 def testUserAgentNonDefault(self):
276 # Test that the default user-agent can be over-ridden
277
278 uri = urllib.parse.urljoin(base, "user-agent/test.cgi")
279 (response, content) = self.http.request(uri, "GET", headers={'User-Agent': 'fred/1.0'})
280 self.assertEqual(response.status, 200)
281 self.assertTrue(content.startswith(b"fred/1.0"))
282
283 def testGet300WithLocation(self):
284 # Test the we automatically follow 300 redirects if a Location: header is provided
285 uri = urllib.parse.urljoin(base, "300/with-location-header.asis")
286 (response, content) = self.http.request(uri, "GET")
287 self.assertEqual(response.status, 200)
288 self.assertEqual(content, b"This is the final destination.\n")
289 self.assertEqual(response.previous.status, 300)
290 self.assertEqual(response.previous.fromcache, False)
291
292 # Confirm that the intermediate 300 is not cached
293 (response, content) = self.http.request(uri, "GET")
294 self.assertEqual(response.status, 200)
295 self.assertEqual(content, b"This is the final destination.\n")
296 self.assertEqual(response.previous.status, 300)
297 self.assertEqual(response.previous.fromcache, False)
298
299 def testGet300WithLocationNoRedirect(self):
300 # Test the we automatically follow 300 redirects if a Location: header is provided
301 self.http.follow_redirects = False
302 uri = urllib.parse.urljoin(base, "300/with-location-header.asis")
303 (response, content) = self.http.request(uri, "GET")
304 self.assertEqual(response.status, 300)
305
306 def testGet300WithoutLocation(self):
307 # Not giving a Location: header in a 300 response is acceptable
308 # In which case we just return the 300 response
309 uri = urllib.parse.urljoin(base, "300/without-location-header.asis")
310 (response, content) = self.http.request(uri, "GET")
311 self.assertEqual(response.status, 300)
312 self.assertTrue(response['content-type'].startswith("text/html"))
313 self.assertEqual(response.previous, None)
314
315 def testGet301(self):
316 # Test that we automatically follow 301 redirects
317 # and that we cache the 301 response
318 uri = urllib.parse.urljoin(base, "301/onestep.asis")
319 destination = urllib.parse.urljoin(base, "302/final-destination.txt")
320 (response, content) = self.http.request(uri, "GET")
321 self.assertEqual(response.status, 200)
322 self.assertTrue('content-location' in response)
323 self.assertEqual(response['content-location'], destination)
324 self.assertEqual(content, b"This is the final destination.\n")
325 self.assertEqual(response.previous.status, 301)
326 self.assertEqual(response.previous.fromcache, False)
327
328 (response, content) = self.http.request(uri, "GET")
329 self.assertEqual(response.status, 200)
330 self.assertEqual(response['content-location'], destination)
331 self.assertEqual(content, b"This is the final destination.\n")
332 self.assertEqual(response.previous.status, 301)
333 self.assertEqual(response.previous.fromcache, True)
334
Joe Gregorio0abd39f2011-02-13 21:40:09 -0500335 def testHead301(self):
336 # Test that we automatically follow 301 redirects
337 uri = urllib.parse.urljoin(base, "301/onestep.asis")
338 (response, content) = self.http.request(uri, "HEAD")
339 self.assertEqual(response.status, 200)
340 self.assertEqual(response.previous.status, 301)
341 self.assertEqual(response.previous.fromcache, False)
pilgrim00a352e2009-05-29 04:04:44 +0000342
343 def testGet301NoRedirect(self):
344 # Test that we automatically follow 301 redirects
345 # and that we cache the 301 response
346 self.http.follow_redirects = False
347 uri = urllib.parse.urljoin(base, "301/onestep.asis")
348 destination = urllib.parse.urljoin(base, "302/final-destination.txt")
349 (response, content) = self.http.request(uri, "GET")
350 self.assertEqual(response.status, 301)
351
352
353 def testGet302(self):
354 # Test that we automatically follow 302 redirects
355 # and that we DO NOT cache the 302 response
356 uri = urllib.parse.urljoin(base, "302/onestep.asis")
357 destination = urllib.parse.urljoin(base, "302/final-destination.txt")
358 (response, content) = self.http.request(uri, "GET")
359 self.assertEqual(response.status, 200)
360 self.assertEqual(response['content-location'], destination)
361 self.assertEqual(content, b"This is the final destination.\n")
362 self.assertEqual(response.previous.status, 302)
363 self.assertEqual(response.previous.fromcache, False)
364
365 uri = urllib.parse.urljoin(base, "302/onestep.asis")
366 (response, content) = self.http.request(uri, "GET")
367 self.assertEqual(response.status, 200)
368 self.assertEqual(response.fromcache, True)
369 self.assertEqual(response['content-location'], destination)
370 self.assertEqual(content, b"This is the final destination.\n")
371 self.assertEqual(response.previous.status, 302)
372 self.assertEqual(response.previous.fromcache, False)
373 self.assertEqual(response.previous['content-location'], uri)
374
375 uri = urllib.parse.urljoin(base, "302/twostep.asis")
376
377 (response, content) = self.http.request(uri, "GET")
378 self.assertEqual(response.status, 200)
379 self.assertEqual(response.fromcache, True)
380 self.assertEqual(content, b"This is the final destination.\n")
381 self.assertEqual(response.previous.status, 302)
382 self.assertEqual(response.previous.fromcache, False)
383
384 def testGet302RedirectionLimit(self):
385 # Test that we can set a lower redirection limit
386 # and that we raise an exception when we exceed
387 # that limit.
388 self.http.force_exception_to_status_code = False
389
390 uri = urllib.parse.urljoin(base, "302/twostep.asis")
391 try:
392 (response, content) = self.http.request(uri, "GET", redirections = 1)
393 self.fail("This should not happen")
394 except httplib2.RedirectLimit:
395 pass
396 except Exception as e:
397 self.fail("Threw wrong kind of exception ")
398
399 # Re-run the test with out the exceptions
Joe Gregorio850d92e2011-02-14 23:30:21 -0500400 self.http.force_exception_to_status_code = True
pilgrim00a352e2009-05-29 04:04:44 +0000401
402 (response, content) = self.http.request(uri, "GET", redirections = 1)
403 self.assertEqual(response.status, 500)
404 self.assertTrue(response.reason.startswith("Redirected more"))
405 self.assertEqual("302", response['status'])
406 self.assertTrue(content.startswith(b"<html>"))
407 self.assertTrue(response.previous != None)
408
409 def testGet302NoLocation(self):
410 # Test that we throw an exception when we get
411 # a 302 with no Location: header.
412 self.http.force_exception_to_status_code = False
413 uri = urllib.parse.urljoin(base, "302/no-location.asis")
414 try:
415 (response, content) = self.http.request(uri, "GET")
416 self.fail("Should never reach here")
417 except httplib2.RedirectMissingLocation:
418 pass
419 except Exception as e:
420 self.fail("Threw wrong kind of exception ")
421
422 # Re-run the test with out the exceptions
423 self.http.force_exception_to_status_code = True
424
425 (response, content) = self.http.request(uri, "GET")
426 self.assertEqual(response.status, 500)
427 self.assertTrue(response.reason.startswith("Redirected but"))
428 self.assertEqual("302", response['status'])
429 self.assertTrue(content.startswith(b"This is content"))
430
431 def testGet302ViaHttps(self):
432 # Google always redirects to http://google.com
Joe Gregorio1ae409f2011-05-24 11:43:44 -0400433 (response, content) = self.http.request("https://www.google.com", "GET")
pilgrim00a352e2009-05-29 04:04:44 +0000434 self.assertEqual(200, response.status)
435 self.assertEqual(302, response.previous.status)
436
437 def testGetViaHttps(self):
438 # Test that we can handle HTTPS
439 (response, content) = self.http.request("https://google.com/adsense/", "GET")
440 self.assertEqual(200, response.status)
441
442 def testGetViaHttpsSpecViolationOnLocation(self):
443 # Test that we follow redirects through HTTPS
444 # even if they violate the spec by including
445 # a relative Location: header instead of an
446 # absolute one.
447 (response, content) = self.http.request("https://google.com/adsense", "GET")
448 self.assertEqual(200, response.status)
449 self.assertNotEqual(None, response.previous)
450
451
452 def testGetViaHttpsKeyCert(self):
453 # At this point I can only test
454 # that the key and cert files are passed in
455 # correctly to httplib. It would be nice to have
456 # a real https endpoint to test against.
457 http = httplib2.Http(timeout=2)
458
459 http.add_certificate("akeyfile", "acertfile", "bitworking.org")
460 try:
Joe Gregorioa3934ae2011-06-06 16:39:56 -0400461 (response, content) = http.request("https://bitworking.org", "GET")
462 except AttributeError:
463 self.assertEqual(http.connections["https:bitworking.org"].key_file, "akeyfile")
464 self.assertEqual(http.connections["https:bitworking.org"].cert_file, "acertfile")
465 except IOError:
466 # Skip on 3.2
467 pass
pilgrim00a352e2009-05-29 04:04:44 +0000468
469 try:
470 (response, content) = http.request("https://notthere.bitworking.org", "GET")
Joe Gregorioa3934ae2011-06-06 16:39:56 -0400471 except httplib2.ServerNotFoundError:
472 self.assertEqual(http.connections["https:notthere.bitworking.org"].key_file, None)
473 self.assertEqual(http.connections["https:notthere.bitworking.org"].cert_file, None)
474 except IOError:
475 # Skip on 3.2
476 pass
pilgrim00a352e2009-05-29 04:04:44 +0000477
Joe Gregorio732b11f2011-06-07 15:44:51 -0400478 def testSslCertValidation(self):
479 # Test that we get an ssl.SSLError when specifying a non-existent CA
480 # certs file.
481 http = httplib2.Http(ca_certs='/nosuchfile')
482 self.assertRaises(IOError,
483 http.request, "https://www.google.com/", "GET")
pilgrim00a352e2009-05-29 04:04:44 +0000484
Joe Gregorio732b11f2011-06-07 15:44:51 -0400485 # Test that we get a SSLHandshakeError if we try to access
486 # https://www.google.com, using a CA cert file that doesn't contain
487 # the CA Gogole uses (i.e., simulating a cert that's not signed by a
488 # trusted CA).
489 other_ca_certs = os.path.join(
490 os.path.dirname(os.path.abspath(httplib2.__file__ )),
491 "test", "other_cacerts.txt")
492 http = httplib2.Http(ca_certs=other_ca_certs)
493 self.assertRaises(ssl.SSLError,
494 http.request,"https://www.google.com/", "GET")
pilgrim00a352e2009-05-29 04:04:44 +0000495
Joe Gregorio732b11f2011-06-07 15:44:51 -0400496 def testSniHostnameValidation(self):
497 self.http.request("https://google.com/", method="GET")
pilgrim00a352e2009-05-29 04:04:44 +0000498
499 def testGet303(self):
500 # Do a follow-up GET on a Location: header
501 # returned from a POST that gave a 303.
502 uri = urllib.parse.urljoin(base, "303/303.cgi")
503 (response, content) = self.http.request(uri, "POST", " ")
504 self.assertEqual(response.status, 200)
505 self.assertEqual(content, b"This is the final destination.\n")
506 self.assertEqual(response.previous.status, 303)
507
508 def testGet303NoRedirect(self):
509 # Do a follow-up GET on a Location: header
510 # returned from a POST that gave a 303.
511 self.http.follow_redirects = False
512 uri = urllib.parse.urljoin(base, "303/303.cgi")
513 (response, content) = self.http.request(uri, "POST", " ")
514 self.assertEqual(response.status, 303)
515
516 def test303ForDifferentMethods(self):
517 # Test that all methods can be used
518 uri = urllib.parse.urljoin(base, "303/redirect-to-reflector.cgi")
519 for (method, method_on_303) in [("PUT", "GET"), ("DELETE", "GET"), ("POST", "GET"), ("GET", "GET"), ("HEAD", "GET")]:
520 (response, content) = self.http.request(uri, method, body=b" ")
521 self.assertEqual(response['x-method'], method_on_303)
522
523 def testGet304(self):
524 # Test that we use ETags properly to validate our cache
525 uri = urllib.parse.urljoin(base, "304/test_etag.txt")
526 (response, content) = self.http.request(uri, "GET")
527 self.assertNotEqual(response['etag'], "")
528
529 (response, content) = self.http.request(uri, "GET")
530 (response, content) = self.http.request(uri, "GET", headers = {'cache-control': 'must-revalidate'})
531 self.assertEqual(response.status, 200)
532 self.assertEqual(response.fromcache, True)
533
534 cache_file_name = os.path.join(cacheDirName, httplib2.safename(httplib2.urlnorm(uri)[-1]))
535 f = open(cache_file_name, "r")
536 status_line = f.readline()
537 f.close()
538
539 self.assertTrue(status_line.startswith("status:"))
540
541 (response, content) = self.http.request(uri, "HEAD")
542 self.assertEqual(response.status, 200)
543 self.assertEqual(response.fromcache, True)
544
545 (response, content) = self.http.request(uri, "GET", headers = {'range': 'bytes=0-0'})
546 self.assertEqual(response.status, 206)
547 self.assertEqual(response.fromcache, False)
548
549 def testGetIgnoreEtag(self):
550 # Test that we can forcibly ignore ETags
551 uri = urllib.parse.urljoin(base, "reflector/reflector.cgi")
552 (response, content) = self.http.request(uri, "GET")
553 self.assertNotEqual(response['etag'], "")
554
555 (response, content) = self.http.request(uri, "GET", headers = {'cache-control': 'max-age=0'})
556 d = self.reflector(content)
557 self.assertTrue('HTTP_IF_NONE_MATCH' in d)
558
559 self.http.ignore_etag = True
560 (response, content) = self.http.request(uri, "GET", headers = {'cache-control': 'max-age=0'})
561 d = self.reflector(content)
562 self.assertEqual(response.fromcache, False)
563 self.assertFalse('HTTP_IF_NONE_MATCH' in d)
564
565 def testOverrideEtag(self):
566 # Test that we can forcibly ignore ETags
567 uri = urllib.parse.urljoin(base, "reflector/reflector.cgi")
568 (response, content) = self.http.request(uri, "GET")
569 self.assertNotEqual(response['etag'], "")
570
571 (response, content) = self.http.request(uri, "GET", headers = {'cache-control': 'max-age=0'})
572 d = self.reflector(content)
573 self.assertTrue('HTTP_IF_NONE_MATCH' in d)
574 self.assertNotEqual(d['HTTP_IF_NONE_MATCH'], "fred")
575
576 (response, content) = self.http.request(uri, "GET", headers = {'cache-control': 'max-age=0', 'if-none-match': 'fred'})
577 d = self.reflector(content)
578 self.assertTrue('HTTP_IF_NONE_MATCH' in d)
579 self.assertEqual(d['HTTP_IF_NONE_MATCH'], "fred")
580
581#MAP-commented this out because it consistently fails
582# def testGet304EndToEnd(self):
583# # Test that end to end headers get overwritten in the cache
584# uri = urllib.parse.urljoin(base, "304/end2end.cgi")
585# (response, content) = self.http.request(uri, "GET")
586# self.assertNotEqual(response['etag'], "")
587# old_date = response['date']
588# time.sleep(2)
589#
590# (response, content) = self.http.request(uri, "GET", headers = {'Cache-Control': 'max-age=0'})
591# # The response should be from the cache, but the Date: header should be updated.
592# new_date = response['date']
593# self.assertNotEqual(new_date, old_date)
594# self.assertEqual(response.status, 200)
595# self.assertEqual(response.fromcache, True)
596
597 def testGet304LastModified(self):
598 # Test that we can still handle a 304
599 # by only using the last-modified cache validator.
600 uri = urllib.parse.urljoin(base, "304/last-modified-only/last-modified-only.txt")
601 (response, content) = self.http.request(uri, "GET")
602
603 self.assertNotEqual(response['last-modified'], "")
604 (response, content) = self.http.request(uri, "GET")
605 (response, content) = self.http.request(uri, "GET")
606 self.assertEqual(response.status, 200)
607 self.assertEqual(response.fromcache, True)
608
609 def testGet307(self):
610 # Test that we do follow 307 redirects but
611 # do not cache the 307
612 uri = urllib.parse.urljoin(base, "307/onestep.asis")
613 (response, content) = self.http.request(uri, "GET")
614 self.assertEqual(response.status, 200)
615 self.assertEqual(content, b"This is the final destination.\n")
616 self.assertEqual(response.previous.status, 307)
617 self.assertEqual(response.previous.fromcache, False)
618
619 (response, content) = self.http.request(uri, "GET")
620 self.assertEqual(response.status, 200)
621 self.assertEqual(response.fromcache, True)
622 self.assertEqual(content, b"This is the final destination.\n")
623 self.assertEqual(response.previous.status, 307)
624 self.assertEqual(response.previous.fromcache, False)
625
626 def testGet410(self):
627 # Test that we pass 410's through
628 uri = urllib.parse.urljoin(base, "410/410.asis")
629 (response, content) = self.http.request(uri, "GET")
630 self.assertEqual(response.status, 410)
631
chris.dent@gmail.comae846ca2009-12-24 14:02:57 -0600632 def testVaryHeaderSimple(self):
633 """
634 RFC 2616 13.6
635 When the cache receives a subsequent request whose Request-URI
636 specifies one or more cache entries including a Vary header field,
637 the cache MUST NOT use such a cache entry to construct a response
638 to the new request unless all of the selecting request-headers
639 present in the new request match the corresponding stored
640 request-headers in the original request.
641 """
642 # test that the vary header is sent
643 uri = urllib.parse.urljoin(base, "vary/accept.asis")
644 (response, content) = self.http.request(uri, "GET", headers={'Accept': 'text/plain'})
645 self.assertEqual(response.status, 200)
646 self.assertTrue('vary' in response)
647
648 # get the resource again, from the cache since accept header in this
649 # request is the same as the request
650 (response, content) = self.http.request(uri, "GET", headers={'Accept': 'text/plain'})
651 self.assertEqual(response.status, 200)
652 self.assertEqual(response.fromcache, True, msg="Should be from cache")
653
654 # get the resource again, not from cache since Accept headers does not match
655 (response, content) = self.http.request(uri, "GET", headers={'Accept': 'text/html'})
656 self.assertEqual(response.status, 200)
657 self.assertEqual(response.fromcache, False, msg="Should not be from cache")
658
659 # get the resource again, without any Accept header, so again no match
660 (response, content) = self.http.request(uri, "GET")
661 self.assertEqual(response.status, 200)
662 self.assertEqual(response.fromcache, False, msg="Should not be from cache")
663
664 def testNoVary(self):
665 # when there is no vary, a different Accept header (e.g.) should not
666 # impact if the cache is used
667 # test that the vary header is not sent
668 uri = urllib.parse.urljoin(base, "vary/no-vary.asis")
669 (response, content) = self.http.request(uri, "GET", headers={'Accept': 'text/plain'})
670 self.assertEqual(response.status, 200)
671 self.assertFalse('vary' in response)
672
673 (response, content) = self.http.request(uri, "GET", headers={'Accept': 'text/plain'})
674 self.assertEqual(response.status, 200)
675 self.assertEqual(response.fromcache, True, msg="Should be from cache")
676
677 (response, content) = self.http.request(uri, "GET", headers={'Accept': 'text/html'})
678 self.assertEqual(response.status, 200)
679 self.assertEqual(response.fromcache, True, msg="Should be from cache")
680
681 def testVaryHeaderDouble(self):
682 uri = urllib.parse.urljoin(base, "vary/accept-double.asis")
683 (response, content) = self.http.request(uri, "GET", headers={
684 'Accept': 'text/plain', 'Accept-Language': 'da, en-gb;q=0.8, en;q=0.7'})
685 self.assertEqual(response.status, 200)
686 self.assertTrue('vary' in response)
687
688 # we are from cache
689 (response, content) = self.http.request(uri, "GET", headers={
690 'Accept': 'text/plain', 'Accept-Language': 'da, en-gb;q=0.8, en;q=0.7'})
691 self.assertEqual(response.fromcache, True, msg="Should be from cache")
692
693 (response, content) = self.http.request(uri, "GET", headers={'Accept': 'text/plain'})
694 self.assertEqual(response.status, 200)
695 self.assertEqual(response.fromcache, False)
696
697 # get the resource again, not from cache, varied headers don't match exact
698 (response, content) = self.http.request(uri, "GET", headers={'Accept-Language': 'da'})
699 self.assertEqual(response.status, 200)
700 self.assertEqual(response.fromcache, False, msg="Should not be from cache")
701
jcgregorio@localhost9e603da2010-05-13 23:42:11 -0400702 def testVaryUnusedHeader(self):
703 # A header's value is not considered to vary if it's not used at all.
704 uri = urllib.parse.urljoin(base, "vary/unused-header.asis")
705 (response, content) = self.http.request(uri, "GET", headers={
706 'Accept': 'text/plain'})
707 self.assertEqual(response.status, 200)
708 self.assertTrue('vary' in response)
709
710 # we are from cache
711 (response, content) = self.http.request(uri, "GET", headers={
712 'Accept': 'text/plain',})
713 self.assertEqual(response.fromcache, True, msg="Should be from cache")
714
pilgrim00a352e2009-05-29 04:04:44 +0000715 def testHeadGZip(self):
716 # Test that we don't try to decompress a HEAD response
717 uri = urllib.parse.urljoin(base, "gzip/final-destination.txt")
718 (response, content) = self.http.request(uri, "HEAD")
719 self.assertEqual(response.status, 200)
720 self.assertNotEqual(int(response['content-length']), 0)
721 self.assertEqual(content, b"")
722
723 def testGetGZip(self):
724 # Test that we support gzip compression
725 uri = urllib.parse.urljoin(base, "gzip/final-destination.txt")
726 (response, content) = self.http.request(uri, "GET")
727 self.assertEqual(response.status, 200)
728 self.assertFalse('content-encoding' in response)
729 self.assertTrue('-content-encoding' in response)
730 self.assertEqual(int(response['content-length']), len(b"This is the final destination.\n"))
731 self.assertEqual(content, b"This is the final destination.\n")
732
Joe Gregoriof952e7f2011-02-13 19:27:35 -0500733 def testPostAndGZipResponse(self):
734 uri = urllib.parse.urljoin(base, "gzip/post.cgi")
735 (response, content) = self.http.request(uri, "POST", body=" ")
736 self.assertEqual(response.status, 200)
737 self.assertFalse('content-encoding' in response)
738 self.assertTrue('-content-encoding' in response)
739
pilgrim00a352e2009-05-29 04:04:44 +0000740 def testGetGZipFailure(self):
741 # Test that we raise a good exception when the gzip fails
742 self.http.force_exception_to_status_code = False
743 uri = urllib.parse.urljoin(base, "gzip/failed-compression.asis")
744 try:
745 (response, content) = self.http.request(uri, "GET")
746 self.fail("Should never reach here")
747 except httplib2.FailedToDecompressContent:
748 pass
749 except Exception:
750 self.fail("Threw wrong kind of exception")
751
752 # Re-run the test with out the exceptions
753 self.http.force_exception_to_status_code = True
754
755 (response, content) = self.http.request(uri, "GET")
756 self.assertEqual(response.status, 500)
757 self.assertTrue(response.reason.startswith("Content purported"))
758
pilgrim00a352e2009-05-29 04:04:44 +0000759 def testIndividualTimeout(self):
760 uri = urllib.parse.urljoin(base, "timeout/timeout.cgi")
761 http = httplib2.Http(timeout=1)
762 http.force_exception_to_status_code = True
763
764 (response, content) = http.request(uri)
765 self.assertEqual(response.status, 408)
766 self.assertTrue(response.reason.startswith("Request Timeout"))
767 self.assertTrue(content.startswith(b"Request Timeout"))
768
769
770 def testGetDeflate(self):
771 # Test that we support deflate compression
772 uri = urllib.parse.urljoin(base, "deflate/deflated.asis")
773 (response, content) = self.http.request(uri, "GET")
774 self.assertEqual(response.status, 200)
775 self.assertFalse('content-encoding' in response)
776 self.assertEqual(int(response['content-length']), len("This is the final destination."))
777 self.assertEqual(content, b"This is the final destination.")
778
779 def testGetDeflateFailure(self):
780 # Test that we raise a good exception when the deflate fails
781 self.http.force_exception_to_status_code = False
782
783 uri = urllib.parse.urljoin(base, "deflate/failed-compression.asis")
784 try:
785 (response, content) = self.http.request(uri, "GET")
786 self.fail("Should never reach here")
787 except httplib2.FailedToDecompressContent:
788 pass
789 except Exception:
790 self.fail("Threw wrong kind of exception")
791
792 # Re-run the test with out the exceptions
793 self.http.force_exception_to_status_code = True
794
795 (response, content) = self.http.request(uri, "GET")
796 self.assertEqual(response.status, 500)
797 self.assertTrue(response.reason.startswith("Content purported"))
798
799 def testGetDuplicateHeaders(self):
800 # Test that duplicate headers get concatenated via ','
801 uri = urllib.parse.urljoin(base, "duplicate-headers/multilink.asis")
802 (response, content) = self.http.request(uri, "GET")
803 self.assertEqual(response.status, 200)
804 self.assertEqual(content, b"This is content\n")
805 self.assertEqual(response['link'].split(",")[0], '<http://bitworking.org>; rel="home"; title="BitWorking"')
806
807 def testGetCacheControlNoCache(self):
808 # Test Cache-Control: no-cache on requests
809 uri = urllib.parse.urljoin(base, "304/test_etag.txt")
810 (response, content) = self.http.request(uri, "GET")
811 self.assertNotEqual(response['etag'], "")
812 (response, content) = self.http.request(uri, "GET")
813 self.assertEqual(response.status, 200)
814 self.assertEqual(response.fromcache, True)
815
816 (response, content) = self.http.request(uri, "GET", headers={'Cache-Control': 'no-cache'})
817 self.assertEqual(response.status, 200)
818 self.assertEqual(response.fromcache, False)
819
820 def testGetCacheControlPragmaNoCache(self):
821 # Test Pragma: no-cache on requests
822 uri = urllib.parse.urljoin(base, "304/test_etag.txt")
823 (response, content) = self.http.request(uri, "GET")
824 self.assertNotEqual(response['etag'], "")
825 (response, content) = self.http.request(uri, "GET")
826 self.assertEqual(response.status, 200)
827 self.assertEqual(response.fromcache, True)
828
829 (response, content) = self.http.request(uri, "GET", headers={'Pragma': 'no-cache'})
830 self.assertEqual(response.status, 200)
831 self.assertEqual(response.fromcache, False)
832
833 def testGetCacheControlNoStoreRequest(self):
834 # A no-store request means that the response should not be stored.
835 uri = urllib.parse.urljoin(base, "304/test_etag.txt")
836
837 (response, content) = self.http.request(uri, "GET", headers={'Cache-Control': 'no-store'})
838 self.assertEqual(response.status, 200)
839 self.assertEqual(response.fromcache, False)
840
841 (response, content) = self.http.request(uri, "GET", headers={'Cache-Control': 'no-store'})
842 self.assertEqual(response.status, 200)
843 self.assertEqual(response.fromcache, False)
844
845 def testGetCacheControlNoStoreResponse(self):
846 # A no-store response means that the response should not be stored.
847 uri = urllib.parse.urljoin(base, "no-store/no-store.asis")
848
849 (response, content) = self.http.request(uri, "GET")
850 self.assertEqual(response.status, 200)
851 self.assertEqual(response.fromcache, False)
852
853 (response, content) = self.http.request(uri, "GET")
854 self.assertEqual(response.status, 200)
855 self.assertEqual(response.fromcache, False)
856
857 def testGetCacheControlNoCacheNoStoreRequest(self):
858 # Test that a no-store, no-cache clears the entry from the cache
859 # even if it was cached previously.
860 uri = urllib.parse.urljoin(base, "304/test_etag.txt")
861
862 (response, content) = self.http.request(uri, "GET")
863 (response, content) = self.http.request(uri, "GET")
864 self.assertEqual(response.fromcache, True)
865 (response, content) = self.http.request(uri, "GET", headers={'Cache-Control': 'no-store, no-cache'})
866 (response, content) = self.http.request(uri, "GET", headers={'Cache-Control': 'no-store, no-cache'})
867 self.assertEqual(response.status, 200)
868 self.assertEqual(response.fromcache, False)
869
870 def testUpdateInvalidatesCache(self):
871 # Test that calling PUT or DELETE on a
872 # URI that is cache invalidates that cache.
873 uri = urllib.parse.urljoin(base, "304/test_etag.txt")
874
875 (response, content) = self.http.request(uri, "GET")
876 (response, content) = self.http.request(uri, "GET")
877 self.assertEqual(response.fromcache, True)
878 (response, content) = self.http.request(uri, "DELETE")
879 self.assertEqual(response.status, 405)
880
881 (response, content) = self.http.request(uri, "GET")
882 self.assertEqual(response.fromcache, False)
883
884 def testUpdateUsesCachedETag(self):
885 # Test that we natively support http://www.w3.org/1999/04/Editing/
886 uri = urllib.parse.urljoin(base, "conditional-updates/test.cgi")
887
888 (response, content) = self.http.request(uri, "GET")
889 self.assertEqual(response.status, 200)
890 self.assertEqual(response.fromcache, False)
891 (response, content) = self.http.request(uri, "GET")
892 self.assertEqual(response.status, 200)
893 self.assertEqual(response.fromcache, True)
Joe Gregorio799b2072009-09-29 17:21:19 -0400894 (response, content) = self.http.request(uri, "PUT", body="foo")
pilgrim00a352e2009-05-29 04:04:44 +0000895 self.assertEqual(response.status, 200)
Joe Gregorio799b2072009-09-29 17:21:19 -0400896 (response, content) = self.http.request(uri, "PUT", body="foo")
pilgrim00a352e2009-05-29 04:04:44 +0000897 self.assertEqual(response.status, 412)
898
Joe Gregorio87a70d02011-05-24 14:06:09 -0400899
900 def testUpdatePatchUsesCachedETag(self):
901 # Test that we natively support http://www.w3.org/1999/04/Editing/
902 uri = urllib.parse.urljoin(base, "conditional-updates/test.cgi")
903
904 (response, content) = self.http.request(uri, "GET")
905 self.assertEqual(response.status, 200)
906 self.assertEqual(response.fromcache, False)
907 (response, content) = self.http.request(uri, "GET")
908 self.assertEqual(response.status, 200)
909 self.assertEqual(response.fromcache, True)
910 (response, content) = self.http.request(uri, "PATCH", body="foo")
911 self.assertEqual(response.status, 200)
912 (response, content) = self.http.request(uri, "PATCH", body="foo")
913 self.assertEqual(response.status, 412)
914
pilgrim00a352e2009-05-29 04:04:44 +0000915 def testUpdateUsesCachedETagAndOCMethod(self):
916 # Test that we natively support http://www.w3.org/1999/04/Editing/
917 uri = urllib.parse.urljoin(base, "conditional-updates/test.cgi")
918
919 (response, content) = self.http.request(uri, "GET")
920 self.assertEqual(response.status, 200)
921 self.assertEqual(response.fromcache, False)
922 (response, content) = self.http.request(uri, "GET")
923 self.assertEqual(response.status, 200)
924 self.assertEqual(response.fromcache, True)
925 self.http.optimistic_concurrency_methods.append("DELETE")
926 (response, content) = self.http.request(uri, "DELETE")
927 self.assertEqual(response.status, 200)
928
929
930 def testUpdateUsesCachedETagOverridden(self):
931 # Test that we natively support http://www.w3.org/1999/04/Editing/
932 uri = urllib.parse.urljoin(base, "conditional-updates/test.cgi")
933
934 (response, content) = self.http.request(uri, "GET")
935 self.assertEqual(response.status, 200)
936 self.assertEqual(response.fromcache, False)
937 (response, content) = self.http.request(uri, "GET")
938 self.assertEqual(response.status, 200)
939 self.assertEqual(response.fromcache, True)
Joe Gregorio799b2072009-09-29 17:21:19 -0400940 (response, content) = self.http.request(uri, "PUT", body="foo", headers={'if-match': 'fred'})
pilgrim00a352e2009-05-29 04:04:44 +0000941 self.assertEqual(response.status, 412)
942
943 def testBasicAuth(self):
944 # Test Basic Authentication
945 uri = urllib.parse.urljoin(base, "basic/file.txt")
946 (response, content) = self.http.request(uri, "GET")
947 self.assertEqual(response.status, 401)
948
949 uri = urllib.parse.urljoin(base, "basic/")
950 (response, content) = self.http.request(uri, "GET")
951 self.assertEqual(response.status, 401)
952
953 self.http.add_credentials('joe', 'password')
954 (response, content) = self.http.request(uri, "GET")
955 self.assertEqual(response.status, 200)
956
957 uri = urllib.parse.urljoin(base, "basic/file.txt")
958 (response, content) = self.http.request(uri, "GET")
959 self.assertEqual(response.status, 200)
960
961 def testBasicAuthWithDomain(self):
962 # Test Basic Authentication
963 uri = urllib.parse.urljoin(base, "basic/file.txt")
964 (response, content) = self.http.request(uri, "GET")
965 self.assertEqual(response.status, 401)
966
967 uri = urllib.parse.urljoin(base, "basic/")
968 (response, content) = self.http.request(uri, "GET")
969 self.assertEqual(response.status, 401)
970
971 self.http.add_credentials('joe', 'password', "example.org")
972 (response, content) = self.http.request(uri, "GET")
973 self.assertEqual(response.status, 401)
974
975 uri = urllib.parse.urljoin(base, "basic/file.txt")
976 (response, content) = self.http.request(uri, "GET")
977 self.assertEqual(response.status, 401)
978
979 domain = urllib.parse.urlparse(base)[1]
980 self.http.add_credentials('joe', 'password', domain)
981 (response, content) = self.http.request(uri, "GET")
982 self.assertEqual(response.status, 200)
983
984 uri = urllib.parse.urljoin(base, "basic/file.txt")
985 (response, content) = self.http.request(uri, "GET")
986 self.assertEqual(response.status, 200)
987
988
989
990
991
992
993 def testBasicAuthTwoDifferentCredentials(self):
994 # Test Basic Authentication with multiple sets of credentials
995 uri = urllib.parse.urljoin(base, "basic2/file.txt")
996 (response, content) = self.http.request(uri, "GET")
997 self.assertEqual(response.status, 401)
998
999 uri = urllib.parse.urljoin(base, "basic2/")
1000 (response, content) = self.http.request(uri, "GET")
1001 self.assertEqual(response.status, 401)
1002
1003 self.http.add_credentials('fred', 'barney')
1004 (response, content) = self.http.request(uri, "GET")
1005 self.assertEqual(response.status, 200)
1006
1007 uri = urllib.parse.urljoin(base, "basic2/file.txt")
1008 (response, content) = self.http.request(uri, "GET")
1009 self.assertEqual(response.status, 200)
1010
1011 def testBasicAuthNested(self):
1012 # Test Basic Authentication with resources
1013 # that are nested
1014 uri = urllib.parse.urljoin(base, "basic-nested/")
1015 (response, content) = self.http.request(uri, "GET")
1016 self.assertEqual(response.status, 401)
1017
1018 uri = urllib.parse.urljoin(base, "basic-nested/subdir")
1019 (response, content) = self.http.request(uri, "GET")
1020 self.assertEqual(response.status, 401)
1021
1022 # Now add in credentials one at a time and test.
1023 self.http.add_credentials('joe', 'password')
1024
1025 uri = urllib.parse.urljoin(base, "basic-nested/")
1026 (response, content) = self.http.request(uri, "GET")
1027 self.assertEqual(response.status, 200)
1028
1029 uri = urllib.parse.urljoin(base, "basic-nested/subdir")
1030 (response, content) = self.http.request(uri, "GET")
1031 self.assertEqual(response.status, 401)
1032
1033 self.http.add_credentials('fred', 'barney')
1034
1035 uri = urllib.parse.urljoin(base, "basic-nested/")
1036 (response, content) = self.http.request(uri, "GET")
1037 self.assertEqual(response.status, 200)
1038
1039 uri = urllib.parse.urljoin(base, "basic-nested/subdir")
1040 (response, content) = self.http.request(uri, "GET")
1041 self.assertEqual(response.status, 200)
1042
1043 def testDigestAuth(self):
1044 # Test that we support Digest Authentication
1045 uri = urllib.parse.urljoin(base, "digest/")
1046 (response, content) = self.http.request(uri, "GET")
1047 self.assertEqual(response.status, 401)
1048
1049 self.http.add_credentials('joe', 'password')
1050 (response, content) = self.http.request(uri, "GET")
1051 self.assertEqual(response.status, 200)
1052
1053 uri = urllib.parse.urljoin(base, "digest/file.txt")
1054 (response, content) = self.http.request(uri, "GET")
1055
1056 def testDigestAuthNextNonceAndNC(self):
1057 # Test that if the server sets nextnonce that we reset
1058 # the nonce count back to 1
1059 uri = urllib.parse.urljoin(base, "digest/file.txt")
1060 self.http.add_credentials('joe', 'password')
1061 (response, content) = self.http.request(uri, "GET", headers = {"cache-control":"no-cache"})
1062 info = httplib2._parse_www_authenticate(response, 'authentication-info')
1063 self.assertEqual(response.status, 200)
1064 (response, content) = self.http.request(uri, "GET", headers = {"cache-control":"no-cache"})
1065 info2 = httplib2._parse_www_authenticate(response, 'authentication-info')
1066 self.assertEqual(response.status, 200)
1067
1068 if 'nextnonce' in info:
1069 self.assertEqual(info2['nc'], 1)
1070
1071 def testDigestAuthStale(self):
1072 # Test that we can handle a nonce becoming stale
1073 uri = urllib.parse.urljoin(base, "digest-expire/file.txt")
1074 self.http.add_credentials('joe', 'password')
1075 (response, content) = self.http.request(uri, "GET", headers = {"cache-control":"no-cache"})
1076 info = httplib2._parse_www_authenticate(response, 'authentication-info')
1077 self.assertEqual(response.status, 200)
1078
1079 time.sleep(3)
1080 # Sleep long enough that the nonce becomes stale
1081
1082 (response, content) = self.http.request(uri, "GET", headers = {"cache-control":"no-cache"})
1083 self.assertFalse(response.fromcache)
1084 self.assertTrue(response._stale_digest)
1085 info3 = httplib2._parse_www_authenticate(response, 'authentication-info')
1086 self.assertEqual(response.status, 200)
1087
1088 def reflector(self, content):
1089 return dict( [tuple(x.split("=", 1)) for x in content.decode('utf-8').strip().split("\n")] )
1090
1091 def testReflector(self):
1092 uri = urllib.parse.urljoin(base, "reflector/reflector.cgi")
1093 (response, content) = self.http.request(uri, "GET")
1094 d = self.reflector(content)
1095 self.assertTrue('HTTP_USER_AGENT' in d)
1096
Joe Gregorio84cc10a2009-09-01 13:02:49 -04001097
1098 def testConnectionClose(self):
1099 uri = "http://www.google.com/"
1100 (response, content) = self.http.request(uri, "GET")
1101 for c in self.http.connections.values():
1102 self.assertNotEqual(None, c.sock)
1103 (response, content) = self.http.request(uri, "GET", headers={"connection": "close"})
1104 for c in self.http.connections.values():
1105 self.assertEqual(None, c.sock)
1106
pilgrim00a352e2009-05-29 04:04:44 +00001107try:
1108 import memcache
1109 class HttpTestMemCached(HttpTest):
1110 def setUp(self):
1111 self.cache = memcache.Client(['127.0.0.1:11211'], debug=0)
1112 #self.cache = memcache.Client(['10.0.0.4:11211'], debug=1)
1113 self.http = httplib2.Http(self.cache)
1114 self.cache.flush_all()
1115 # Not exactly sure why the sleep is needed here, but
1116 # if not present then some unit tests that rely on caching
1117 # fail. Memcached seems to lose some sets immediately
1118 # after a flush_all if the set is to a value that
1119 # was previously cached. (Maybe the flush is handled async?)
1120 time.sleep(1)
1121 self.http.clear_credentials()
1122except:
1123 pass
1124
1125
1126
1127# ------------------------------------------------------------------------
1128
1129class HttpPrivateTest(unittest.TestCase):
1130
1131 def testParseCacheControl(self):
1132 # Test that we can parse the Cache-Control header
1133 self.assertEqual({}, httplib2._parse_cache_control({}))
1134 self.assertEqual({'no-cache': 1}, httplib2._parse_cache_control({'cache-control': ' no-cache'}))
1135 cc = httplib2._parse_cache_control({'cache-control': ' no-cache, max-age = 7200'})
1136 self.assertEqual(cc['no-cache'], 1)
1137 self.assertEqual(cc['max-age'], '7200')
1138 cc = httplib2._parse_cache_control({'cache-control': ' , '})
1139 self.assertEqual(cc[''], 1)
1140
Joe Gregorioe314e8b2009-07-16 20:11:28 -04001141 try:
1142 cc = httplib2._parse_cache_control({'cache-control': 'Max-age=3600;post-check=1800,pre-check=3600'})
1143 self.assertTrue("max-age" in cc)
1144 except:
1145 self.fail("Should not throw exception")
1146
1147
1148
1149
pilgrim00a352e2009-05-29 04:04:44 +00001150 def testNormalizeHeaders(self):
1151 # Test that we normalize headers to lowercase
1152 h = httplib2._normalize_headers({'Cache-Control': 'no-cache', 'Other': 'Stuff'})
1153 self.assertTrue('cache-control' in h)
1154 self.assertTrue('other' in h)
1155 self.assertEqual('Stuff', h['other'])
1156
1157 def testExpirationModelTransparent(self):
1158 # Test that no-cache makes our request TRANSPARENT
1159 response_headers = {
1160 'cache-control': 'max-age=7200'
1161 }
1162 request_headers = {
1163 'cache-control': 'no-cache'
1164 }
1165 self.assertEqual("TRANSPARENT", httplib2._entry_disposition(response_headers, request_headers))
1166
1167 def testMaxAgeNonNumeric(self):
1168 # Test that no-cache makes our request TRANSPARENT
1169 response_headers = {
1170 'cache-control': 'max-age=fred, min-fresh=barney'
1171 }
1172 request_headers = {
1173 }
1174 self.assertEqual("STALE", httplib2._entry_disposition(response_headers, request_headers))
1175
1176
1177 def testExpirationModelNoCacheResponse(self):
1178 # The date and expires point to an entry that should be
1179 # FRESH, but the no-cache over-rides that.
1180 now = time.time()
1181 response_headers = {
1182 'date': time.strftime("%a, %d %b %Y %H:%M:%S GMT", time.gmtime(now)),
1183 'expires': time.strftime("%a, %d %b %Y %H:%M:%S GMT", time.gmtime(now+4)),
1184 'cache-control': 'no-cache'
1185 }
1186 request_headers = {
1187 }
1188 self.assertEqual("STALE", httplib2._entry_disposition(response_headers, request_headers))
1189
1190 def testExpirationModelStaleRequestMustReval(self):
1191 # must-revalidate forces STALE
1192 self.assertEqual("STALE", httplib2._entry_disposition({}, {'cache-control': 'must-revalidate'}))
1193
1194 def testExpirationModelStaleResponseMustReval(self):
1195 # must-revalidate forces STALE
1196 self.assertEqual("STALE", httplib2._entry_disposition({'cache-control': 'must-revalidate'}, {}))
1197
1198 def testExpirationModelFresh(self):
1199 response_headers = {
1200 'date': time.strftime("%a, %d %b %Y %H:%M:%S GMT", time.gmtime()),
1201 'cache-control': 'max-age=2'
1202 }
1203 request_headers = {
1204 }
1205 self.assertEqual("FRESH", httplib2._entry_disposition(response_headers, request_headers))
1206 time.sleep(3)
1207 self.assertEqual("STALE", httplib2._entry_disposition(response_headers, request_headers))
1208
1209 def testExpirationMaxAge0(self):
1210 response_headers = {
1211 'date': time.strftime("%a, %d %b %Y %H:%M:%S GMT", time.gmtime()),
1212 'cache-control': 'max-age=0'
1213 }
1214 request_headers = {
1215 }
1216 self.assertEqual("STALE", httplib2._entry_disposition(response_headers, request_headers))
1217
1218 def testExpirationModelDateAndExpires(self):
1219 now = time.time()
1220 response_headers = {
1221 'date': time.strftime("%a, %d %b %Y %H:%M:%S GMT", time.gmtime(now)),
1222 'expires': time.strftime("%a, %d %b %Y %H:%M:%S GMT", time.gmtime(now+2)),
1223 }
1224 request_headers = {
1225 }
1226 self.assertEqual("FRESH", httplib2._entry_disposition(response_headers, request_headers))
1227 time.sleep(3)
1228 self.assertEqual("STALE", httplib2._entry_disposition(response_headers, request_headers))
1229
1230 def testExpiresZero(self):
1231 now = time.time()
1232 response_headers = {
1233 'date': time.strftime("%a, %d %b %Y %H:%M:%S GMT", time.gmtime(now)),
1234 'expires': "0",
1235 }
1236 request_headers = {
1237 }
1238 self.assertEqual("STALE", httplib2._entry_disposition(response_headers, request_headers))
1239
1240 def testExpirationModelDateOnly(self):
1241 now = time.time()
1242 response_headers = {
1243 'date': time.strftime("%a, %d %b %Y %H:%M:%S GMT", time.gmtime(now+3)),
1244 }
1245 request_headers = {
1246 }
1247 self.assertEqual("STALE", httplib2._entry_disposition(response_headers, request_headers))
1248
1249 def testExpirationModelOnlyIfCached(self):
1250 response_headers = {
1251 }
1252 request_headers = {
1253 'cache-control': 'only-if-cached',
1254 }
1255 self.assertEqual("FRESH", httplib2._entry_disposition(response_headers, request_headers))
1256
1257 def testExpirationModelMaxAgeBoth(self):
1258 now = time.time()
1259 response_headers = {
1260 'date': time.strftime("%a, %d %b %Y %H:%M:%S GMT", time.gmtime(now)),
1261 'cache-control': 'max-age=2'
1262 }
1263 request_headers = {
1264 'cache-control': 'max-age=0'
1265 }
1266 self.assertEqual("STALE", httplib2._entry_disposition(response_headers, request_headers))
1267
1268 def testExpirationModelDateAndExpiresMinFresh1(self):
1269 now = time.time()
1270 response_headers = {
1271 'date': time.strftime("%a, %d %b %Y %H:%M:%S GMT", time.gmtime(now)),
1272 'expires': time.strftime("%a, %d %b %Y %H:%M:%S GMT", time.gmtime(now+2)),
1273 }
1274 request_headers = {
1275 'cache-control': 'min-fresh=2'
1276 }
1277 self.assertEqual("STALE", httplib2._entry_disposition(response_headers, request_headers))
1278
1279 def testExpirationModelDateAndExpiresMinFresh2(self):
1280 now = time.time()
1281 response_headers = {
1282 'date': time.strftime("%a, %d %b %Y %H:%M:%S GMT", time.gmtime(now)),
1283 'expires': time.strftime("%a, %d %b %Y %H:%M:%S GMT", time.gmtime(now+4)),
1284 }
1285 request_headers = {
1286 'cache-control': 'min-fresh=2'
1287 }
1288 self.assertEqual("FRESH", httplib2._entry_disposition(response_headers, request_headers))
1289
1290 def testParseWWWAuthenticateEmpty(self):
1291 res = httplib2._parse_www_authenticate({})
1292 self.assertEqual(len(list(res.keys())), 0)
1293
1294 def testParseWWWAuthenticate(self):
1295 # different uses of spaces around commas
1296 res = httplib2._parse_www_authenticate({ 'www-authenticate': 'Test realm="test realm" , foo=foo ,bar="bar", baz=baz,qux=qux'})
1297 self.assertEqual(len(list(res.keys())), 1)
1298 self.assertEqual(len(list(res['test'].keys())), 5)
1299
1300 # tokens with non-alphanum
1301 res = httplib2._parse_www_authenticate({ 'www-authenticate': 'T*!%#st realm=to*!%#en, to*!%#en="quoted string"'})
1302 self.assertEqual(len(list(res.keys())), 1)
1303 self.assertEqual(len(list(res['t*!%#st'].keys())), 2)
1304
1305 # quoted string with quoted pairs
1306 res = httplib2._parse_www_authenticate({ 'www-authenticate': 'Test realm="a \\"test\\" realm"'})
1307 self.assertEqual(len(list(res.keys())), 1)
1308 self.assertEqual(res['test']['realm'], 'a "test" realm')
1309
1310 def testParseWWWAuthenticateStrict(self):
1311 httplib2.USE_WWW_AUTH_STRICT_PARSING = 1;
1312 self.testParseWWWAuthenticate();
1313 httplib2.USE_WWW_AUTH_STRICT_PARSING = 0;
1314
1315 def testParseWWWAuthenticateBasic(self):
1316 res = httplib2._parse_www_authenticate({ 'www-authenticate': 'Basic realm="me"'})
1317 basic = res['basic']
1318 self.assertEqual('me', basic['realm'])
1319
1320 res = httplib2._parse_www_authenticate({ 'www-authenticate': 'Basic realm="me", algorithm="MD5"'})
1321 basic = res['basic']
1322 self.assertEqual('me', basic['realm'])
1323 self.assertEqual('MD5', basic['algorithm'])
1324
1325 res = httplib2._parse_www_authenticate({ 'www-authenticate': 'Basic realm="me", algorithm=MD5'})
1326 basic = res['basic']
1327 self.assertEqual('me', basic['realm'])
1328 self.assertEqual('MD5', basic['algorithm'])
1329
1330 def testParseWWWAuthenticateBasic2(self):
1331 res = httplib2._parse_www_authenticate({ 'www-authenticate': 'Basic realm="me",other="fred" '})
1332 basic = res['basic']
1333 self.assertEqual('me', basic['realm'])
1334 self.assertEqual('fred', basic['other'])
1335
1336 def testParseWWWAuthenticateBasic3(self):
1337 res = httplib2._parse_www_authenticate({ 'www-authenticate': 'Basic REAlm="me" '})
1338 basic = res['basic']
1339 self.assertEqual('me', basic['realm'])
1340
1341
1342 def testParseWWWAuthenticateDigest(self):
1343 res = httplib2._parse_www_authenticate({ 'www-authenticate':
1344 'Digest realm="testrealm@host.com", qop="auth,auth-int", nonce="dcd98b7102dd2f0e8b11d0f600bfb0c093", opaque="5ccc069c403ebaf9f0171e9517f40e41"'})
1345 digest = res['digest']
1346 self.assertEqual('testrealm@host.com', digest['realm'])
1347 self.assertEqual('auth,auth-int', digest['qop'])
1348
1349
1350 def testParseWWWAuthenticateMultiple(self):
1351 res = httplib2._parse_www_authenticate({ 'www-authenticate':
1352 'Digest realm="testrealm@host.com", qop="auth,auth-int", nonce="dcd98b7102dd2f0e8b11d0f600bfb0c093", opaque="5ccc069c403ebaf9f0171e9517f40e41" Basic REAlm="me" '})
1353 digest = res['digest']
1354 self.assertEqual('testrealm@host.com', digest['realm'])
1355 self.assertEqual('auth,auth-int', digest['qop'])
1356 self.assertEqual('dcd98b7102dd2f0e8b11d0f600bfb0c093', digest['nonce'])
1357 self.assertEqual('5ccc069c403ebaf9f0171e9517f40e41', digest['opaque'])
1358 basic = res['basic']
1359 self.assertEqual('me', basic['realm'])
1360
1361 def testParseWWWAuthenticateMultiple2(self):
1362 # Handle an added comma between challenges, which might get thrown in if the challenges were
1363 # originally sent in separate www-authenticate headers.
1364 res = httplib2._parse_www_authenticate({ 'www-authenticate':
1365 'Digest realm="testrealm@host.com", qop="auth,auth-int", nonce="dcd98b7102dd2f0e8b11d0f600bfb0c093", opaque="5ccc069c403ebaf9f0171e9517f40e41", Basic REAlm="me" '})
1366 digest = res['digest']
1367 self.assertEqual('testrealm@host.com', digest['realm'])
1368 self.assertEqual('auth,auth-int', digest['qop'])
1369 self.assertEqual('dcd98b7102dd2f0e8b11d0f600bfb0c093', digest['nonce'])
1370 self.assertEqual('5ccc069c403ebaf9f0171e9517f40e41', digest['opaque'])
1371 basic = res['basic']
1372 self.assertEqual('me', basic['realm'])
1373
1374 def testParseWWWAuthenticateMultiple3(self):
1375 # Handle an added comma between challenges, which might get thrown in if the challenges were
1376 # originally sent in separate www-authenticate headers.
1377 res = httplib2._parse_www_authenticate({ 'www-authenticate':
1378 'Digest realm="testrealm@host.com", qop="auth,auth-int", nonce="dcd98b7102dd2f0e8b11d0f600bfb0c093", opaque="5ccc069c403ebaf9f0171e9517f40e41", Basic REAlm="me", WSSE realm="foo", profile="UsernameToken"'})
1379 digest = res['digest']
1380 self.assertEqual('testrealm@host.com', digest['realm'])
1381 self.assertEqual('auth,auth-int', digest['qop'])
1382 self.assertEqual('dcd98b7102dd2f0e8b11d0f600bfb0c093', digest['nonce'])
1383 self.assertEqual('5ccc069c403ebaf9f0171e9517f40e41', digest['opaque'])
1384 basic = res['basic']
1385 self.assertEqual('me', basic['realm'])
1386 wsse = res['wsse']
1387 self.assertEqual('foo', wsse['realm'])
1388 self.assertEqual('UsernameToken', wsse['profile'])
1389
1390 def testParseWWWAuthenticateMultiple4(self):
1391 res = httplib2._parse_www_authenticate({ 'www-authenticate':
1392 'Digest realm="test-real.m@host.com", qop \t=\t"\tauth,auth-int", nonce="(*)&^&$%#",opaque="5ccc069c403ebaf9f0171e9517f40e41", Basic REAlm="me", WSSE realm="foo", profile="UsernameToken"'})
1393 digest = res['digest']
1394 self.assertEqual('test-real.m@host.com', digest['realm'])
1395 self.assertEqual('\tauth,auth-int', digest['qop'])
1396 self.assertEqual('(*)&^&$%#', digest['nonce'])
1397
1398 def testParseWWWAuthenticateMoreQuoteCombos(self):
1399 res = httplib2._parse_www_authenticate({'www-authenticate':'Digest realm="myrealm", nonce="Ygk86AsKBAA=3516200d37f9a3230352fde99977bd6d472d4306", algorithm=MD5, qop="auth", stale=true'})
1400 digest = res['digest']
1401 self.assertEqual('myrealm', digest['realm'])
1402
Joe Gregorio8d576032011-02-13 22:45:06 -05001403 def testParseWWWAuthenticateMalformed(self):
1404 try:
1405 res = httplib2._parse_www_authenticate({'www-authenticate':'OAuth "Facebook Platform" "invalid_token" "Invalid OAuth access token."'})
1406 self.fail("should raise an exception")
1407 except httplib2.MalformedHeader:
1408 pass
1409
pilgrim00a352e2009-05-29 04:04:44 +00001410 def testDigestObject(self):
1411 credentials = ('joe', 'password')
1412 host = None
1413 request_uri = '/projects/httplib2/test/digest/'
1414 headers = {}
1415 response = {
1416 'www-authenticate': 'Digest realm="myrealm", nonce="Ygk86AsKBAA=3516200d37f9a3230352fde99977bd6d472d4306", algorithm=MD5, qop="auth"'
1417 }
1418 content = b""
Joe Gregorio23c2f8d2011-06-13 14:06:23 -04001419
pilgrim00a352e2009-05-29 04:04:44 +00001420 d = httplib2.DigestAuthentication(credentials, host, request_uri, headers, response, content, None)
1421 d.request("GET", request_uri, headers, content, cnonce="33033375ec278a46")
Joe Gregorio23c2f8d2011-06-13 14:06:23 -04001422 our_request = "authorization: %s" % headers['authorization']
1423 working_request = 'authorization: Digest username="joe", realm="myrealm", nonce="Ygk86AsKBAA=3516200d37f9a3230352fde99977bd6d472d4306", uri="/projects/httplib2/test/digest/", algorithm=MD5, response="97ed129401f7cdc60e5db58a80f3ea8b", qop=auth, nc=00000001, cnonce="33033375ec278a46"'
pilgrim00a352e2009-05-29 04:04:44 +00001424 self.assertEqual(our_request, working_request)
1425
1426
1427 def testDigestObjectStale(self):
1428 credentials = ('joe', 'password')
1429 host = None
1430 request_uri = '/projects/httplib2/test/digest/'
1431 headers = {}
1432 response = httplib2.Response({ })
1433 response['www-authenticate'] = 'Digest realm="myrealm", nonce="Ygk86AsKBAA=3516200d37f9a3230352fde99977bd6d472d4306", algorithm=MD5, qop="auth", stale=true'
1434 response.status = 401
1435 content = b""
1436 d = httplib2.DigestAuthentication(credentials, host, request_uri, headers, response, content, None)
1437 # Returns true to force a retry
1438 self.assertTrue( d.response(response, content) )
1439
1440 def testDigestObjectAuthInfo(self):
1441 credentials = ('joe', 'password')
1442 host = None
1443 request_uri = '/projects/httplib2/test/digest/'
1444 headers = {}
1445 response = httplib2.Response({ })
1446 response['www-authenticate'] = 'Digest realm="myrealm", nonce="Ygk86AsKBAA=3516200d37f9a3230352fde99977bd6d472d4306", algorithm=MD5, qop="auth", stale=true'
1447 response['authentication-info'] = 'nextnonce="fred"'
1448 content = b""
1449 d = httplib2.DigestAuthentication(credentials, host, request_uri, headers, response, content, None)
1450 # Returns true to force a retry
1451 self.assertFalse( d.response(response, content) )
1452 self.assertEqual('fred', d.challenge['nonce'])
1453 self.assertEqual(1, d.challenge['nc'])
1454
1455 def testWsseAlgorithm(self):
1456 digest = httplib2._wsse_username_token("d36e316282959a9ed4c89851497a717f", "2003-12-15T14:43:07Z", "taadtaadpstcsm")
1457 expected = b"quR/EWLAV4xLf9Zqyw4pDmfV9OY="
1458 self.assertEqual(expected, digest)
1459
1460 def testEnd2End(self):
1461 # one end to end header
1462 response = {'content-type': 'application/atom+xml', 'te': 'deflate'}
1463 end2end = httplib2._get_end2end_headers(response)
1464 self.assertTrue('content-type' in end2end)
1465 self.assertTrue('te' not in end2end)
1466 self.assertTrue('connection' not in end2end)
1467
1468 # one end to end header that gets eliminated
1469 response = {'connection': 'content-type', 'content-type': 'application/atom+xml', 'te': 'deflate'}
1470 end2end = httplib2._get_end2end_headers(response)
1471 self.assertTrue('content-type' not in end2end)
1472 self.assertTrue('te' not in end2end)
1473 self.assertTrue('connection' not in end2end)
1474
1475 # Degenerate case of no headers
1476 response = {}
1477 end2end = httplib2._get_end2end_headers(response)
Joe Gregorio732b11f2011-06-07 15:44:51 -04001478 self.assertEqual(0, len(end2end))
pilgrim00a352e2009-05-29 04:04:44 +00001479
1480 # Degenerate case of connection referrring to a header not passed in
1481 response = {'connection': 'content-type'}
1482 end2end = httplib2._get_end2end_headers(response)
Joe Gregorio732b11f2011-06-07 15:44:51 -04001483 self.assertEqual(0, len(end2end))
pilgrim00a352e2009-05-29 04:04:44 +00001484
1485unittest.main()