blob: 5e7e930968ea6d347b8bf420dca475a5b339681c [file] [log] [blame]
pilgrim00a352e2009-05-29 04:04:44 +00001#!/usr/bin/env python3
2"""
3httplib2test
4
5A set of unit tests for httplib2.py.
6
7Requires Python 3.0 or later
8"""
9
10__author__ = "Joe Gregorio (joe@bitworking.org)"
11__copyright__ = "Copyright 2006, Joe Gregorio"
12__contributors__ = ["Mark Pilgrim"]
13__license__ = "MIT"
14__history__ = """ """
15__version__ = "0.2 ($Rev: 118 $)"
16
Joe Gregoriod760a1a2011-02-11 01:03:22 -050017import base64
pilgrim00a352e2009-05-29 04:04:44 +000018import http.client
19import httplib2
pilgrim00a352e2009-05-29 04:04:44 +000020import io
Joe Gregoriod760a1a2011-02-11 01:03:22 -050021import os
22import socket
23import sys
24import time
25import unittest
26import urllib.parse
pilgrim00a352e2009-05-29 04:04:44 +000027
28# The test resources base uri
29base = 'http://bitworking.org/projects/httplib2/test/'
30#base = 'http://localhost/projects/httplib2/test/'
31cacheDirName = ".cache"
32
33
34class CredentialsTest(unittest.TestCase):
35 def test(self):
36 c = httplib2.Credentials()
37 c.add("joe", "password")
38 self.assertEqual(("joe", "password"), list(c.iter("bitworking.org"))[0])
39 self.assertEqual(("joe", "password"), list(c.iter(""))[0])
40 c.add("fred", "password2", "wellformedweb.org")
41 self.assertEqual(("joe", "password"), list(c.iter("bitworking.org"))[0])
42 self.assertEqual(1, len(list(c.iter("bitworking.org"))))
43 self.assertEqual(2, len(list(c.iter("wellformedweb.org"))))
44 self.assertTrue(("fred", "password2") in list(c.iter("wellformedweb.org")))
45 c.clear()
46 self.assertEqual(0, len(list(c.iter("bitworking.org"))))
47 c.add("fred", "password2", "wellformedweb.org")
48 self.assertTrue(("fred", "password2") in list(c.iter("wellformedweb.org")))
49 self.assertEqual(0, len(list(c.iter("bitworking.org"))))
50 self.assertEqual(0, len(list(c.iter(""))))
51
52
53class ParserTest(unittest.TestCase):
54 def testFromStd66(self):
55 self.assertEqual( ('http', 'example.com', '', None, None ), httplib2.parse_uri("http://example.com"))
56 self.assertEqual( ('https', 'example.com', '', None, None ), httplib2.parse_uri("https://example.com"))
57 self.assertEqual( ('https', 'example.com:8080', '', None, None ), httplib2.parse_uri("https://example.com:8080"))
58 self.assertEqual( ('http', 'example.com', '/', None, None ), httplib2.parse_uri("http://example.com/"))
59 self.assertEqual( ('http', 'example.com', '/path', None, None ), httplib2.parse_uri("http://example.com/path"))
60 self.assertEqual( ('http', 'example.com', '/path', 'a=1&b=2', None ), httplib2.parse_uri("http://example.com/path?a=1&b=2"))
61 self.assertEqual( ('http', 'example.com', '/path', 'a=1&b=2', 'fred' ), httplib2.parse_uri("http://example.com/path?a=1&b=2#fred"))
62 self.assertEqual( ('http', 'example.com', '/path', 'a=1&b=2', 'fred' ), httplib2.parse_uri("http://example.com/path?a=1&b=2#fred"))
63
64
65class UrlNormTest(unittest.TestCase):
66 def test(self):
67 self.assertEqual( "http://example.org/", httplib2.urlnorm("http://example.org")[-1])
68 self.assertEqual( "http://example.org/", httplib2.urlnorm("http://EXAMple.org")[-1])
69 self.assertEqual( "http://example.org/?=b", httplib2.urlnorm("http://EXAMple.org?=b")[-1])
70 self.assertEqual( "http://example.org/mypath?a=b", httplib2.urlnorm("http://EXAMple.org/mypath?a=b")[-1])
71 self.assertEqual( "http://localhost:80/", httplib2.urlnorm("http://localhost:80")[-1])
72 self.assertEqual( httplib2.urlnorm("http://localhost:80/"), httplib2.urlnorm("HTTP://LOCALHOST:80"))
73 try:
74 httplib2.urlnorm("/")
75 self.fail("Non-absolute URIs should raise an exception")
76 except httplib2.RelativeURIError:
77 pass
78
79class UrlSafenameTest(unittest.TestCase):
80 def test(self):
81 # Test that different URIs end up generating different safe names
82 self.assertEqual( "example.org,fred,a=b,58489f63a7a83c3b7794a6a398ee8b1f", httplib2.safename("http://example.org/fred/?a=b"))
83 self.assertEqual( "example.org,fred,a=b,8c5946d56fec453071f43329ff0be46b", httplib2.safename("http://example.org/fred?/a=b"))
84 self.assertEqual( "www.example.org,fred,a=b,499c44b8d844a011b67ea2c015116968", httplib2.safename("http://www.example.org/fred?/a=b"))
85 self.assertEqual( httplib2.safename(httplib2.urlnorm("http://www")[-1]), httplib2.safename(httplib2.urlnorm("http://WWW")[-1]))
86 self.assertEqual( "www.example.org,fred,a=b,692e843a333484ce0095b070497ab45d", httplib2.safename("https://www.example.org/fred?/a=b"))
87 self.assertNotEqual( httplib2.safename("http://www"), httplib2.safename("https://www"))
88 # Test the max length limits
89 uri = "http://" + ("w" * 200) + ".org"
90 uri2 = "http://" + ("w" * 201) + ".org"
91 self.assertNotEqual( httplib2.safename(uri2), httplib2.safename(uri))
92 # Max length should be 200 + 1 (",") + 32
93 self.assertEqual(233, len(httplib2.safename(uri2)))
94 self.assertEqual(233, len(httplib2.safename(uri)))
95 # Unicode
96 if sys.version_info >= (2,3):
97 self.assertEqual( "xn--http,-4y1d.org,fred,a=b,579924c35db315e5a32e3d9963388193", httplib2.safename("http://\u2304.org/fred/?a=b"))
98
99class _MyResponse(io.BytesIO):
100 def __init__(self, body, **kwargs):
101 io.BytesIO.__init__(self, body)
102 self.headers = kwargs
103
104 def items(self):
105 return self.headers.items()
106
107 def iteritems(self):
108 return iter(self.headers.items())
109
110
111class _MyHTTPConnection(object):
112 "This class is just a mock of httplib.HTTPConnection used for testing"
113
114 def __init__(self, host, port=None, key_file=None, cert_file=None,
115 strict=None, timeout=None, proxy_info=None):
116 self.host = host
117 self.port = port
118 self.timeout = timeout
119 self.log = ""
120
121 def set_debuglevel(self, level):
122 pass
123
124 def connect(self):
125 "Connect to a host on a given port."
126 pass
127
128 def close(self):
129 pass
130
131 def request(self, method, request_uri, body, headers):
132 pass
133
134 def getresponse(self):
135 return _MyResponse(b"the body", status="200")
136
137
138class HttpTest(unittest.TestCase):
139 def setUp(self):
140 if os.path.exists(cacheDirName):
141 [os.remove(os.path.join(cacheDirName, file)) for file in os.listdir(cacheDirName)]
142 self.http = httplib2.Http(cacheDirName)
143 self.http.clear_credentials()
144
Joe Gregorio756d3b32011-02-13 11:59:51 -0500145 def testIPv6NoSSL(self):
146 try:
147 self.http.request("http://[::1]/")
148 except socket.gaierror:
149 self.fail("should get the address family right for IPv6")
150 except socket.error:
151 # Even if IPv6 isn't installed on a machine it should just raise socket.error
152 pass
153
154 def testIPv6SSL(self):
155 try:
156 self.http.request("https://[::1]/")
157 except socket.gaierror:
158 self.fail("should get the address family right for IPv6")
159 except socket.error:
160 # Even if IPv6 isn't installed on a machine it should just raise socket.error
161 pass
162
pilgrim00a352e2009-05-29 04:04:44 +0000163 def testConnectionType(self):
164 self.http.force_exception_to_status_code = False
165 response, content = self.http.request("http://bitworking.org", connection_type=_MyHTTPConnection)
166 self.assertEqual(response['content-location'], "http://bitworking.org")
167 self.assertEqual(content, b"the body")
168
169 def testGetUnknownServer(self):
170 self.http.force_exception_to_status_code = False
171 try:
172 self.http.request("http://fred.bitworking.org/")
173 self.fail("An httplib2.ServerNotFoundError Exception must be thrown on an unresolvable server.")
174 except httplib2.ServerNotFoundError:
175 pass
176
177 # Now test with exceptions turned off
178 self.http.force_exception_to_status_code = True
179
180 (response, content) = self.http.request("http://fred.bitworking.org/")
181 self.assertEqual(response['content-type'], 'text/plain')
182 self.assertTrue(content.startswith(b"Unable to find"))
183 self.assertEqual(response.status, 400)
184
Joe Gregoriod760a1a2011-02-11 01:03:22 -0500185 def testGetConnectionRefused(self):
186 self.http.force_exception_to_status_code = False
187 try:
188 self.http.request("http://localhost:7777/")
189 self.fail("An socket.error exception must be thrown on Connection Refused.")
190 except socket.error:
191 pass
192
193 # Now test with exceptions turned off
194 self.http.force_exception_to_status_code = True
195
196 (response, content) = self.http.request("http://localhost:7777/")
197 self.assertEqual(response['content-type'], 'text/plain')
198 self.assertTrue(b"Connection refused" in content)
199 self.assertEqual(response.status, 400)
200
pilgrim00a352e2009-05-29 04:04:44 +0000201 def testGetIRI(self):
202 if sys.version_info >= (2,3):
203 uri = urllib.parse.urljoin(base, "reflector/reflector.cgi?d=\N{CYRILLIC CAPITAL LETTER DJE}")
204 (response, content) = self.http.request(uri, "GET")
205 d = self.reflector(content)
206 self.assertTrue('QUERY_STRING' in d)
207 self.assertTrue(d['QUERY_STRING'].find('%D0%82') > 0)
208
209 def testGetIsDefaultMethod(self):
210 # Test that GET is the default method
211 uri = urllib.parse.urljoin(base, "methods/method_reflector.cgi")
212 (response, content) = self.http.request(uri)
213 self.assertEqual(response['x-method'], "GET")
214
215 def testDifferentMethods(self):
216 # Test that all methods can be used
217 uri = urllib.parse.urljoin(base, "methods/method_reflector.cgi")
218 for method in ["GET", "PUT", "DELETE", "POST"]:
219 (response, content) = self.http.request(uri, method, body=b" ")
220 self.assertEqual(response['x-method'], method)
221
Joe Gregoriob628c0b2009-07-16 12:28:04 -0400222 def testHeadRead(self):
223 # Test that we don't try to read the response of a HEAD request
224 # since httplib blocks response.read() for HEAD requests.
225 # Oddly enough this doesn't appear as a problem when doing HEAD requests
226 # against Apache servers.
227 uri = "http://www.google.com/"
228 (response, content) = self.http.request(uri, "HEAD")
229 self.assertEqual(response.status, 200)
230 self.assertEqual(content, b"")
231
pilgrim00a352e2009-05-29 04:04:44 +0000232 def testGetNoCache(self):
233 # Test that can do a GET w/o the cache turned on.
234 http = httplib2.Http()
235 uri = urllib.parse.urljoin(base, "304/test_etag.txt")
236 (response, content) = http.request(uri, "GET")
237 self.assertEqual(response.status, 200)
238 self.assertEqual(response.previous, None)
239
Joe Gregorioe202d212009-07-16 14:57:52 -0400240 def testGetOnlyIfCachedCacheHit(self):
241 # Test that can do a GET with cache and 'only-if-cached'
242 uri = urllib.parse.urljoin(base, "304/test_etag.txt")
243 (response, content) = self.http.request(uri, "GET")
244 (response, content) = self.http.request(uri, "GET", headers={'cache-control': 'only-if-cached'})
245 self.assertEqual(response.fromcache, True)
246 self.assertEqual(response.status, 200)
247
pilgrim00a352e2009-05-29 04:04:44 +0000248 def testGetOnlyIfCachedCacheMiss(self):
249 # Test that can do a GET with no cache with 'only-if-cached'
pilgrim00a352e2009-05-29 04:04:44 +0000250 uri = urllib.parse.urljoin(base, "304/test_etag.txt")
Joe Gregorioe202d212009-07-16 14:57:52 -0400251 (response, content) = self.http.request(uri, "GET", headers={'cache-control': 'only-if-cached'})
pilgrim00a352e2009-05-29 04:04:44 +0000252 self.assertEqual(response.fromcache, False)
Joe Gregorioe202d212009-07-16 14:57:52 -0400253 self.assertEqual(response.status, 504)
pilgrim00a352e2009-05-29 04:04:44 +0000254
255 def testGetOnlyIfCachedNoCacheAtAll(self):
256 # Test that can do a GET with no cache with 'only-if-cached'
257 # Of course, there might be an intermediary beyond us
258 # that responds to the 'only-if-cached', so this
259 # test can't really be guaranteed to pass.
260 http = httplib2.Http()
261 uri = urllib.parse.urljoin(base, "304/test_etag.txt")
262 (response, content) = http.request(uri, "GET", headers={'cache-control': 'only-if-cached'})
263 self.assertEqual(response.fromcache, False)
Joe Gregorioe202d212009-07-16 14:57:52 -0400264 self.assertEqual(response.status, 504)
pilgrim00a352e2009-05-29 04:04:44 +0000265
266 def testUserAgent(self):
267 # Test that we provide a default user-agent
268 uri = urllib.parse.urljoin(base, "user-agent/test.cgi")
269 (response, content) = self.http.request(uri, "GET")
270 self.assertEqual(response.status, 200)
271 self.assertTrue(content.startswith(b"Python-httplib2/"))
272
273 def testUserAgentNonDefault(self):
274 # Test that the default user-agent can be over-ridden
275
276 uri = urllib.parse.urljoin(base, "user-agent/test.cgi")
277 (response, content) = self.http.request(uri, "GET", headers={'User-Agent': 'fred/1.0'})
278 self.assertEqual(response.status, 200)
279 self.assertTrue(content.startswith(b"fred/1.0"))
280
281 def testGet300WithLocation(self):
282 # Test the we automatically follow 300 redirects if a Location: header is provided
283 uri = urllib.parse.urljoin(base, "300/with-location-header.asis")
284 (response, content) = self.http.request(uri, "GET")
285 self.assertEqual(response.status, 200)
286 self.assertEqual(content, b"This is the final destination.\n")
287 self.assertEqual(response.previous.status, 300)
288 self.assertEqual(response.previous.fromcache, False)
289
290 # Confirm that the intermediate 300 is not cached
291 (response, content) = self.http.request(uri, "GET")
292 self.assertEqual(response.status, 200)
293 self.assertEqual(content, b"This is the final destination.\n")
294 self.assertEqual(response.previous.status, 300)
295 self.assertEqual(response.previous.fromcache, False)
296
297 def testGet300WithLocationNoRedirect(self):
298 # Test the we automatically follow 300 redirects if a Location: header is provided
299 self.http.follow_redirects = False
300 uri = urllib.parse.urljoin(base, "300/with-location-header.asis")
301 (response, content) = self.http.request(uri, "GET")
302 self.assertEqual(response.status, 300)
303
304 def testGet300WithoutLocation(self):
305 # Not giving a Location: header in a 300 response is acceptable
306 # In which case we just return the 300 response
307 uri = urllib.parse.urljoin(base, "300/without-location-header.asis")
308 (response, content) = self.http.request(uri, "GET")
309 self.assertEqual(response.status, 300)
310 self.assertTrue(response['content-type'].startswith("text/html"))
311 self.assertEqual(response.previous, None)
312
313 def testGet301(self):
314 # Test that we automatically follow 301 redirects
315 # and that we cache the 301 response
316 uri = urllib.parse.urljoin(base, "301/onestep.asis")
317 destination = urllib.parse.urljoin(base, "302/final-destination.txt")
318 (response, content) = self.http.request(uri, "GET")
319 self.assertEqual(response.status, 200)
320 self.assertTrue('content-location' in response)
321 self.assertEqual(response['content-location'], destination)
322 self.assertEqual(content, b"This is the final destination.\n")
323 self.assertEqual(response.previous.status, 301)
324 self.assertEqual(response.previous.fromcache, False)
325
326 (response, content) = self.http.request(uri, "GET")
327 self.assertEqual(response.status, 200)
328 self.assertEqual(response['content-location'], destination)
329 self.assertEqual(content, b"This is the final destination.\n")
330 self.assertEqual(response.previous.status, 301)
331 self.assertEqual(response.previous.fromcache, True)
332
Joe Gregorio0abd39f2011-02-13 21:40:09 -0500333 def testHead301(self):
334 # Test that we automatically follow 301 redirects
335 uri = urllib.parse.urljoin(base, "301/onestep.asis")
336 (response, content) = self.http.request(uri, "HEAD")
337 self.assertEqual(response.status, 200)
338 self.assertEqual(response.previous.status, 301)
339 self.assertEqual(response.previous.fromcache, False)
pilgrim00a352e2009-05-29 04:04:44 +0000340
341 def testGet301NoRedirect(self):
342 # Test that we automatically follow 301 redirects
343 # and that we cache the 301 response
344 self.http.follow_redirects = False
345 uri = urllib.parse.urljoin(base, "301/onestep.asis")
346 destination = urllib.parse.urljoin(base, "302/final-destination.txt")
347 (response, content) = self.http.request(uri, "GET")
348 self.assertEqual(response.status, 301)
349
350
351 def testGet302(self):
352 # Test that we automatically follow 302 redirects
353 # and that we DO NOT cache the 302 response
354 uri = urllib.parse.urljoin(base, "302/onestep.asis")
355 destination = urllib.parse.urljoin(base, "302/final-destination.txt")
356 (response, content) = self.http.request(uri, "GET")
357 self.assertEqual(response.status, 200)
358 self.assertEqual(response['content-location'], destination)
359 self.assertEqual(content, b"This is the final destination.\n")
360 self.assertEqual(response.previous.status, 302)
361 self.assertEqual(response.previous.fromcache, False)
362
363 uri = urllib.parse.urljoin(base, "302/onestep.asis")
364 (response, content) = self.http.request(uri, "GET")
365 self.assertEqual(response.status, 200)
366 self.assertEqual(response.fromcache, True)
367 self.assertEqual(response['content-location'], destination)
368 self.assertEqual(content, b"This is the final destination.\n")
369 self.assertEqual(response.previous.status, 302)
370 self.assertEqual(response.previous.fromcache, False)
371 self.assertEqual(response.previous['content-location'], uri)
372
373 uri = urllib.parse.urljoin(base, "302/twostep.asis")
374
375 (response, content) = self.http.request(uri, "GET")
376 self.assertEqual(response.status, 200)
377 self.assertEqual(response.fromcache, True)
378 self.assertEqual(content, b"This is the final destination.\n")
379 self.assertEqual(response.previous.status, 302)
380 self.assertEqual(response.previous.fromcache, False)
381
382 def testGet302RedirectionLimit(self):
383 # Test that we can set a lower redirection limit
384 # and that we raise an exception when we exceed
385 # that limit.
386 self.http.force_exception_to_status_code = False
387
388 uri = urllib.parse.urljoin(base, "302/twostep.asis")
389 try:
390 (response, content) = self.http.request(uri, "GET", redirections = 1)
391 self.fail("This should not happen")
392 except httplib2.RedirectLimit:
393 pass
394 except Exception as e:
395 self.fail("Threw wrong kind of exception ")
396
397 # Re-run the test with out the exceptions
398 self.http.force_exception_to_status_code = True
399
400 (response, content) = self.http.request(uri, "GET", redirections = 1)
401 self.assertEqual(response.status, 500)
402 self.assertTrue(response.reason.startswith("Redirected more"))
403 self.assertEqual("302", response['status'])
404 self.assertTrue(content.startswith(b"<html>"))
405 self.assertTrue(response.previous != None)
406
407 def testGet302NoLocation(self):
408 # Test that we throw an exception when we get
409 # a 302 with no Location: header.
410 self.http.force_exception_to_status_code = False
411 uri = urllib.parse.urljoin(base, "302/no-location.asis")
412 try:
413 (response, content) = self.http.request(uri, "GET")
414 self.fail("Should never reach here")
415 except httplib2.RedirectMissingLocation:
416 pass
417 except Exception as e:
418 self.fail("Threw wrong kind of exception ")
419
420 # Re-run the test with out the exceptions
421 self.http.force_exception_to_status_code = True
422
423 (response, content) = self.http.request(uri, "GET")
424 self.assertEqual(response.status, 500)
425 self.assertTrue(response.reason.startswith("Redirected but"))
426 self.assertEqual("302", response['status'])
427 self.assertTrue(content.startswith(b"This is content"))
428
429 def testGet302ViaHttps(self):
430 # Google always redirects to http://google.com
431 (response, content) = self.http.request("https://google.com", "GET")
432 self.assertEqual(200, response.status)
433 self.assertEqual(302, response.previous.status)
434
435 def testGetViaHttps(self):
436 # Test that we can handle HTTPS
437 (response, content) = self.http.request("https://google.com/adsense/", "GET")
438 self.assertEqual(200, response.status)
439
440 def testGetViaHttpsSpecViolationOnLocation(self):
441 # Test that we follow redirects through HTTPS
442 # even if they violate the spec by including
443 # a relative Location: header instead of an
444 # absolute one.
445 (response, content) = self.http.request("https://google.com/adsense", "GET")
446 self.assertEqual(200, response.status)
447 self.assertNotEqual(None, response.previous)
448
449
450 def testGetViaHttpsKeyCert(self):
451 # At this point I can only test
452 # that the key and cert files are passed in
453 # correctly to httplib. It would be nice to have
454 # a real https endpoint to test against.
455 http = httplib2.Http(timeout=2)
456
457 http.add_certificate("akeyfile", "acertfile", "bitworking.org")
458 try:
459 (response, content) = http.request("https://bitworking.org", "GET")
460 except:
461 pass
462 self.assertEqual(http.connections["https:bitworking.org"].key_file, "akeyfile")
463 self.assertEqual(http.connections["https:bitworking.org"].cert_file, "acertfile")
464
465 try:
466 (response, content) = http.request("https://notthere.bitworking.org", "GET")
467 except:
468 pass
469 self.assertEqual(http.connections["https:notthere.bitworking.org"].key_file, None)
470 self.assertEqual(http.connections["https:notthere.bitworking.org"].cert_file, None)
471
472
473
474
475 def testGet303(self):
476 # Do a follow-up GET on a Location: header
477 # returned from a POST that gave a 303.
478 uri = urllib.parse.urljoin(base, "303/303.cgi")
479 (response, content) = self.http.request(uri, "POST", " ")
480 self.assertEqual(response.status, 200)
481 self.assertEqual(content, b"This is the final destination.\n")
482 self.assertEqual(response.previous.status, 303)
483
484 def testGet303NoRedirect(self):
485 # Do a follow-up GET on a Location: header
486 # returned from a POST that gave a 303.
487 self.http.follow_redirects = False
488 uri = urllib.parse.urljoin(base, "303/303.cgi")
489 (response, content) = self.http.request(uri, "POST", " ")
490 self.assertEqual(response.status, 303)
491
492 def test303ForDifferentMethods(self):
493 # Test that all methods can be used
494 uri = urllib.parse.urljoin(base, "303/redirect-to-reflector.cgi")
495 for (method, method_on_303) in [("PUT", "GET"), ("DELETE", "GET"), ("POST", "GET"), ("GET", "GET"), ("HEAD", "GET")]:
496 (response, content) = self.http.request(uri, method, body=b" ")
497 self.assertEqual(response['x-method'], method_on_303)
498
499 def testGet304(self):
500 # Test that we use ETags properly to validate our cache
501 uri = urllib.parse.urljoin(base, "304/test_etag.txt")
502 (response, content) = self.http.request(uri, "GET")
503 self.assertNotEqual(response['etag'], "")
504
505 (response, content) = self.http.request(uri, "GET")
506 (response, content) = self.http.request(uri, "GET", headers = {'cache-control': 'must-revalidate'})
507 self.assertEqual(response.status, 200)
508 self.assertEqual(response.fromcache, True)
509
510 cache_file_name = os.path.join(cacheDirName, httplib2.safename(httplib2.urlnorm(uri)[-1]))
511 f = open(cache_file_name, "r")
512 status_line = f.readline()
513 f.close()
514
515 self.assertTrue(status_line.startswith("status:"))
516
517 (response, content) = self.http.request(uri, "HEAD")
518 self.assertEqual(response.status, 200)
519 self.assertEqual(response.fromcache, True)
520
521 (response, content) = self.http.request(uri, "GET", headers = {'range': 'bytes=0-0'})
522 self.assertEqual(response.status, 206)
523 self.assertEqual(response.fromcache, False)
524
525 def testGetIgnoreEtag(self):
526 # Test that we can forcibly ignore ETags
527 uri = urllib.parse.urljoin(base, "reflector/reflector.cgi")
528 (response, content) = self.http.request(uri, "GET")
529 self.assertNotEqual(response['etag'], "")
530
531 (response, content) = self.http.request(uri, "GET", headers = {'cache-control': 'max-age=0'})
532 d = self.reflector(content)
533 self.assertTrue('HTTP_IF_NONE_MATCH' in d)
534
535 self.http.ignore_etag = True
536 (response, content) = self.http.request(uri, "GET", headers = {'cache-control': 'max-age=0'})
537 d = self.reflector(content)
538 self.assertEqual(response.fromcache, False)
539 self.assertFalse('HTTP_IF_NONE_MATCH' in d)
540
541 def testOverrideEtag(self):
542 # Test that we can forcibly ignore ETags
543 uri = urllib.parse.urljoin(base, "reflector/reflector.cgi")
544 (response, content) = self.http.request(uri, "GET")
545 self.assertNotEqual(response['etag'], "")
546
547 (response, content) = self.http.request(uri, "GET", headers = {'cache-control': 'max-age=0'})
548 d = self.reflector(content)
549 self.assertTrue('HTTP_IF_NONE_MATCH' in d)
550 self.assertNotEqual(d['HTTP_IF_NONE_MATCH'], "fred")
551
552 (response, content) = self.http.request(uri, "GET", headers = {'cache-control': 'max-age=0', 'if-none-match': 'fred'})
553 d = self.reflector(content)
554 self.assertTrue('HTTP_IF_NONE_MATCH' in d)
555 self.assertEqual(d['HTTP_IF_NONE_MATCH'], "fred")
556
557#MAP-commented this out because it consistently fails
558# def testGet304EndToEnd(self):
559# # Test that end to end headers get overwritten in the cache
560# uri = urllib.parse.urljoin(base, "304/end2end.cgi")
561# (response, content) = self.http.request(uri, "GET")
562# self.assertNotEqual(response['etag'], "")
563# old_date = response['date']
564# time.sleep(2)
565#
566# (response, content) = self.http.request(uri, "GET", headers = {'Cache-Control': 'max-age=0'})
567# # The response should be from the cache, but the Date: header should be updated.
568# new_date = response['date']
569# self.assertNotEqual(new_date, old_date)
570# self.assertEqual(response.status, 200)
571# self.assertEqual(response.fromcache, True)
572
573 def testGet304LastModified(self):
574 # Test that we can still handle a 304
575 # by only using the last-modified cache validator.
576 uri = urllib.parse.urljoin(base, "304/last-modified-only/last-modified-only.txt")
577 (response, content) = self.http.request(uri, "GET")
578
579 self.assertNotEqual(response['last-modified'], "")
580 (response, content) = self.http.request(uri, "GET")
581 (response, content) = self.http.request(uri, "GET")
582 self.assertEqual(response.status, 200)
583 self.assertEqual(response.fromcache, True)
584
585 def testGet307(self):
586 # Test that we do follow 307 redirects but
587 # do not cache the 307
588 uri = urllib.parse.urljoin(base, "307/onestep.asis")
589 (response, content) = self.http.request(uri, "GET")
590 self.assertEqual(response.status, 200)
591 self.assertEqual(content, b"This is the final destination.\n")
592 self.assertEqual(response.previous.status, 307)
593 self.assertEqual(response.previous.fromcache, False)
594
595 (response, content) = self.http.request(uri, "GET")
596 self.assertEqual(response.status, 200)
597 self.assertEqual(response.fromcache, True)
598 self.assertEqual(content, b"This is the final destination.\n")
599 self.assertEqual(response.previous.status, 307)
600 self.assertEqual(response.previous.fromcache, False)
601
602 def testGet410(self):
603 # Test that we pass 410's through
604 uri = urllib.parse.urljoin(base, "410/410.asis")
605 (response, content) = self.http.request(uri, "GET")
606 self.assertEqual(response.status, 410)
607
chris.dent@gmail.comae846ca2009-12-24 14:02:57 -0600608 def testVaryHeaderSimple(self):
609 """
610 RFC 2616 13.6
611 When the cache receives a subsequent request whose Request-URI
612 specifies one or more cache entries including a Vary header field,
613 the cache MUST NOT use such a cache entry to construct a response
614 to the new request unless all of the selecting request-headers
615 present in the new request match the corresponding stored
616 request-headers in the original request.
617 """
618 # test that the vary header is sent
619 uri = urllib.parse.urljoin(base, "vary/accept.asis")
620 (response, content) = self.http.request(uri, "GET", headers={'Accept': 'text/plain'})
621 self.assertEqual(response.status, 200)
622 self.assertTrue('vary' in response)
623
624 # get the resource again, from the cache since accept header in this
625 # request is the same as the request
626 (response, content) = self.http.request(uri, "GET", headers={'Accept': 'text/plain'})
627 self.assertEqual(response.status, 200)
628 self.assertEqual(response.fromcache, True, msg="Should be from cache")
629
630 # get the resource again, not from cache since Accept headers does not match
631 (response, content) = self.http.request(uri, "GET", headers={'Accept': 'text/html'})
632 self.assertEqual(response.status, 200)
633 self.assertEqual(response.fromcache, False, msg="Should not be from cache")
634
635 # get the resource again, without any Accept header, so again no match
636 (response, content) = self.http.request(uri, "GET")
637 self.assertEqual(response.status, 200)
638 self.assertEqual(response.fromcache, False, msg="Should not be from cache")
639
640 def testNoVary(self):
641 # when there is no vary, a different Accept header (e.g.) should not
642 # impact if the cache is used
643 # test that the vary header is not sent
644 uri = urllib.parse.urljoin(base, "vary/no-vary.asis")
645 (response, content) = self.http.request(uri, "GET", headers={'Accept': 'text/plain'})
646 self.assertEqual(response.status, 200)
647 self.assertFalse('vary' in response)
648
649 (response, content) = self.http.request(uri, "GET", headers={'Accept': 'text/plain'})
650 self.assertEqual(response.status, 200)
651 self.assertEqual(response.fromcache, True, msg="Should be from cache")
652
653 (response, content) = self.http.request(uri, "GET", headers={'Accept': 'text/html'})
654 self.assertEqual(response.status, 200)
655 self.assertEqual(response.fromcache, True, msg="Should be from cache")
656
657 def testVaryHeaderDouble(self):
658 uri = urllib.parse.urljoin(base, "vary/accept-double.asis")
659 (response, content) = self.http.request(uri, "GET", headers={
660 'Accept': 'text/plain', 'Accept-Language': 'da, en-gb;q=0.8, en;q=0.7'})
661 self.assertEqual(response.status, 200)
662 self.assertTrue('vary' in response)
663
664 # we are from cache
665 (response, content) = self.http.request(uri, "GET", headers={
666 'Accept': 'text/plain', 'Accept-Language': 'da, en-gb;q=0.8, en;q=0.7'})
667 self.assertEqual(response.fromcache, True, msg="Should be from cache")
668
669 (response, content) = self.http.request(uri, "GET", headers={'Accept': 'text/plain'})
670 self.assertEqual(response.status, 200)
671 self.assertEqual(response.fromcache, False)
672
673 # get the resource again, not from cache, varied headers don't match exact
674 (response, content) = self.http.request(uri, "GET", headers={'Accept-Language': 'da'})
675 self.assertEqual(response.status, 200)
676 self.assertEqual(response.fromcache, False, msg="Should not be from cache")
677
jcgregorio@localhost9e603da2010-05-13 23:42:11 -0400678 def testVaryUnusedHeader(self):
679 # A header's value is not considered to vary if it's not used at all.
680 uri = urllib.parse.urljoin(base, "vary/unused-header.asis")
681 (response, content) = self.http.request(uri, "GET", headers={
682 'Accept': 'text/plain'})
683 self.assertEqual(response.status, 200)
684 self.assertTrue('vary' in response)
685
686 # we are from cache
687 (response, content) = self.http.request(uri, "GET", headers={
688 'Accept': 'text/plain',})
689 self.assertEqual(response.fromcache, True, msg="Should be from cache")
690
pilgrim00a352e2009-05-29 04:04:44 +0000691 def testHeadGZip(self):
692 # Test that we don't try to decompress a HEAD response
693 uri = urllib.parse.urljoin(base, "gzip/final-destination.txt")
694 (response, content) = self.http.request(uri, "HEAD")
695 self.assertEqual(response.status, 200)
696 self.assertNotEqual(int(response['content-length']), 0)
697 self.assertEqual(content, b"")
698
699 def testGetGZip(self):
700 # Test that we support gzip compression
701 uri = urllib.parse.urljoin(base, "gzip/final-destination.txt")
702 (response, content) = self.http.request(uri, "GET")
703 self.assertEqual(response.status, 200)
704 self.assertFalse('content-encoding' in response)
705 self.assertTrue('-content-encoding' in response)
706 self.assertEqual(int(response['content-length']), len(b"This is the final destination.\n"))
707 self.assertEqual(content, b"This is the final destination.\n")
708
Joe Gregoriof952e7f2011-02-13 19:27:35 -0500709 def testPostAndGZipResponse(self):
710 uri = urllib.parse.urljoin(base, "gzip/post.cgi")
711 (response, content) = self.http.request(uri, "POST", body=" ")
712 self.assertEqual(response.status, 200)
713 self.assertFalse('content-encoding' in response)
714 self.assertTrue('-content-encoding' in response)
715
pilgrim00a352e2009-05-29 04:04:44 +0000716 def testGetGZipFailure(self):
717 # Test that we raise a good exception when the gzip fails
718 self.http.force_exception_to_status_code = False
719 uri = urllib.parse.urljoin(base, "gzip/failed-compression.asis")
720 try:
721 (response, content) = self.http.request(uri, "GET")
722 self.fail("Should never reach here")
723 except httplib2.FailedToDecompressContent:
724 pass
725 except Exception:
726 self.fail("Threw wrong kind of exception")
727
728 # Re-run the test with out the exceptions
729 self.http.force_exception_to_status_code = True
730
731 (response, content) = self.http.request(uri, "GET")
732 self.assertEqual(response.status, 500)
733 self.assertTrue(response.reason.startswith("Content purported"))
734
735 def testTimeout(self):
736 self.http.force_exception_to_status_code = True
737 uri = urllib.parse.urljoin(base, "timeout/timeout.cgi")
738 try:
739 import socket
740 socket.setdefaulttimeout(1)
741 except:
742 # Don't run the test if we can't set the timeout
743 return
744 (response, content) = self.http.request(uri)
745 self.assertEqual(response.status, 408)
746 self.assertTrue(response.reason.startswith("Request Timeout"))
747 self.assertTrue(content.startswith(b"Request Timeout"))
748
749 def testIndividualTimeout(self):
750 uri = urllib.parse.urljoin(base, "timeout/timeout.cgi")
751 http = httplib2.Http(timeout=1)
752 http.force_exception_to_status_code = True
753
754 (response, content) = http.request(uri)
755 self.assertEqual(response.status, 408)
756 self.assertTrue(response.reason.startswith("Request Timeout"))
757 self.assertTrue(content.startswith(b"Request Timeout"))
758
759
760 def testGetDeflate(self):
761 # Test that we support deflate compression
762 uri = urllib.parse.urljoin(base, "deflate/deflated.asis")
763 (response, content) = self.http.request(uri, "GET")
764 self.assertEqual(response.status, 200)
765 self.assertFalse('content-encoding' in response)
766 self.assertEqual(int(response['content-length']), len("This is the final destination."))
767 self.assertEqual(content, b"This is the final destination.")
768
769 def testGetDeflateFailure(self):
770 # Test that we raise a good exception when the deflate fails
771 self.http.force_exception_to_status_code = False
772
773 uri = urllib.parse.urljoin(base, "deflate/failed-compression.asis")
774 try:
775 (response, content) = self.http.request(uri, "GET")
776 self.fail("Should never reach here")
777 except httplib2.FailedToDecompressContent:
778 pass
779 except Exception:
780 self.fail("Threw wrong kind of exception")
781
782 # Re-run the test with out the exceptions
783 self.http.force_exception_to_status_code = True
784
785 (response, content) = self.http.request(uri, "GET")
786 self.assertEqual(response.status, 500)
787 self.assertTrue(response.reason.startswith("Content purported"))
788
789 def testGetDuplicateHeaders(self):
790 # Test that duplicate headers get concatenated via ','
791 uri = urllib.parse.urljoin(base, "duplicate-headers/multilink.asis")
792 (response, content) = self.http.request(uri, "GET")
793 self.assertEqual(response.status, 200)
794 self.assertEqual(content, b"This is content\n")
795 self.assertEqual(response['link'].split(",")[0], '<http://bitworking.org>; rel="home"; title="BitWorking"')
796
797 def testGetCacheControlNoCache(self):
798 # Test Cache-Control: no-cache on requests
799 uri = urllib.parse.urljoin(base, "304/test_etag.txt")
800 (response, content) = self.http.request(uri, "GET")
801 self.assertNotEqual(response['etag'], "")
802 (response, content) = self.http.request(uri, "GET")
803 self.assertEqual(response.status, 200)
804 self.assertEqual(response.fromcache, True)
805
806 (response, content) = self.http.request(uri, "GET", headers={'Cache-Control': 'no-cache'})
807 self.assertEqual(response.status, 200)
808 self.assertEqual(response.fromcache, False)
809
810 def testGetCacheControlPragmaNoCache(self):
811 # Test Pragma: no-cache on requests
812 uri = urllib.parse.urljoin(base, "304/test_etag.txt")
813 (response, content) = self.http.request(uri, "GET")
814 self.assertNotEqual(response['etag'], "")
815 (response, content) = self.http.request(uri, "GET")
816 self.assertEqual(response.status, 200)
817 self.assertEqual(response.fromcache, True)
818
819 (response, content) = self.http.request(uri, "GET", headers={'Pragma': 'no-cache'})
820 self.assertEqual(response.status, 200)
821 self.assertEqual(response.fromcache, False)
822
823 def testGetCacheControlNoStoreRequest(self):
824 # A no-store request means that the response should not be stored.
825 uri = urllib.parse.urljoin(base, "304/test_etag.txt")
826
827 (response, content) = self.http.request(uri, "GET", headers={'Cache-Control': 'no-store'})
828 self.assertEqual(response.status, 200)
829 self.assertEqual(response.fromcache, False)
830
831 (response, content) = self.http.request(uri, "GET", headers={'Cache-Control': 'no-store'})
832 self.assertEqual(response.status, 200)
833 self.assertEqual(response.fromcache, False)
834
835 def testGetCacheControlNoStoreResponse(self):
836 # A no-store response means that the response should not be stored.
837 uri = urllib.parse.urljoin(base, "no-store/no-store.asis")
838
839 (response, content) = self.http.request(uri, "GET")
840 self.assertEqual(response.status, 200)
841 self.assertEqual(response.fromcache, False)
842
843 (response, content) = self.http.request(uri, "GET")
844 self.assertEqual(response.status, 200)
845 self.assertEqual(response.fromcache, False)
846
847 def testGetCacheControlNoCacheNoStoreRequest(self):
848 # Test that a no-store, no-cache clears the entry from the cache
849 # even if it was cached previously.
850 uri = urllib.parse.urljoin(base, "304/test_etag.txt")
851
852 (response, content) = self.http.request(uri, "GET")
853 (response, content) = self.http.request(uri, "GET")
854 self.assertEqual(response.fromcache, True)
855 (response, content) = self.http.request(uri, "GET", headers={'Cache-Control': 'no-store, no-cache'})
856 (response, content) = self.http.request(uri, "GET", headers={'Cache-Control': 'no-store, no-cache'})
857 self.assertEqual(response.status, 200)
858 self.assertEqual(response.fromcache, False)
859
860 def testUpdateInvalidatesCache(self):
861 # Test that calling PUT or DELETE on a
862 # URI that is cache invalidates that cache.
863 uri = urllib.parse.urljoin(base, "304/test_etag.txt")
864
865 (response, content) = self.http.request(uri, "GET")
866 (response, content) = self.http.request(uri, "GET")
867 self.assertEqual(response.fromcache, True)
868 (response, content) = self.http.request(uri, "DELETE")
869 self.assertEqual(response.status, 405)
870
871 (response, content) = self.http.request(uri, "GET")
872 self.assertEqual(response.fromcache, False)
873
874 def testUpdateUsesCachedETag(self):
875 # Test that we natively support http://www.w3.org/1999/04/Editing/
876 uri = urllib.parse.urljoin(base, "conditional-updates/test.cgi")
877
878 (response, content) = self.http.request(uri, "GET")
879 self.assertEqual(response.status, 200)
880 self.assertEqual(response.fromcache, False)
881 (response, content) = self.http.request(uri, "GET")
882 self.assertEqual(response.status, 200)
883 self.assertEqual(response.fromcache, True)
Joe Gregorio799b2072009-09-29 17:21:19 -0400884 (response, content) = self.http.request(uri, "PUT", body="foo")
pilgrim00a352e2009-05-29 04:04:44 +0000885 self.assertEqual(response.status, 200)
Joe Gregorio799b2072009-09-29 17:21:19 -0400886 (response, content) = self.http.request(uri, "PUT", body="foo")
pilgrim00a352e2009-05-29 04:04:44 +0000887 self.assertEqual(response.status, 412)
888
889 def testUpdateUsesCachedETagAndOCMethod(self):
890 # Test that we natively support http://www.w3.org/1999/04/Editing/
891 uri = urllib.parse.urljoin(base, "conditional-updates/test.cgi")
892
893 (response, content) = self.http.request(uri, "GET")
894 self.assertEqual(response.status, 200)
895 self.assertEqual(response.fromcache, False)
896 (response, content) = self.http.request(uri, "GET")
897 self.assertEqual(response.status, 200)
898 self.assertEqual(response.fromcache, True)
899 self.http.optimistic_concurrency_methods.append("DELETE")
900 (response, content) = self.http.request(uri, "DELETE")
901 self.assertEqual(response.status, 200)
902
903
904 def testUpdateUsesCachedETagOverridden(self):
905 # Test that we natively support http://www.w3.org/1999/04/Editing/
906 uri = urllib.parse.urljoin(base, "conditional-updates/test.cgi")
907
908 (response, content) = self.http.request(uri, "GET")
909 self.assertEqual(response.status, 200)
910 self.assertEqual(response.fromcache, False)
911 (response, content) = self.http.request(uri, "GET")
912 self.assertEqual(response.status, 200)
913 self.assertEqual(response.fromcache, True)
Joe Gregorio799b2072009-09-29 17:21:19 -0400914 (response, content) = self.http.request(uri, "PUT", body="foo", headers={'if-match': 'fred'})
pilgrim00a352e2009-05-29 04:04:44 +0000915 self.assertEqual(response.status, 412)
916
917 def testBasicAuth(self):
918 # Test Basic Authentication
919 uri = urllib.parse.urljoin(base, "basic/file.txt")
920 (response, content) = self.http.request(uri, "GET")
921 self.assertEqual(response.status, 401)
922
923 uri = urllib.parse.urljoin(base, "basic/")
924 (response, content) = self.http.request(uri, "GET")
925 self.assertEqual(response.status, 401)
926
927 self.http.add_credentials('joe', 'password')
928 (response, content) = self.http.request(uri, "GET")
929 self.assertEqual(response.status, 200)
930
931 uri = urllib.parse.urljoin(base, "basic/file.txt")
932 (response, content) = self.http.request(uri, "GET")
933 self.assertEqual(response.status, 200)
934
935 def testBasicAuthWithDomain(self):
936 # Test Basic Authentication
937 uri = urllib.parse.urljoin(base, "basic/file.txt")
938 (response, content) = self.http.request(uri, "GET")
939 self.assertEqual(response.status, 401)
940
941 uri = urllib.parse.urljoin(base, "basic/")
942 (response, content) = self.http.request(uri, "GET")
943 self.assertEqual(response.status, 401)
944
945 self.http.add_credentials('joe', 'password', "example.org")
946 (response, content) = self.http.request(uri, "GET")
947 self.assertEqual(response.status, 401)
948
949 uri = urllib.parse.urljoin(base, "basic/file.txt")
950 (response, content) = self.http.request(uri, "GET")
951 self.assertEqual(response.status, 401)
952
953 domain = urllib.parse.urlparse(base)[1]
954 self.http.add_credentials('joe', 'password', domain)
955 (response, content) = self.http.request(uri, "GET")
956 self.assertEqual(response.status, 200)
957
958 uri = urllib.parse.urljoin(base, "basic/file.txt")
959 (response, content) = self.http.request(uri, "GET")
960 self.assertEqual(response.status, 200)
961
962
963
964
965
966
967 def testBasicAuthTwoDifferentCredentials(self):
968 # Test Basic Authentication with multiple sets of credentials
969 uri = urllib.parse.urljoin(base, "basic2/file.txt")
970 (response, content) = self.http.request(uri, "GET")
971 self.assertEqual(response.status, 401)
972
973 uri = urllib.parse.urljoin(base, "basic2/")
974 (response, content) = self.http.request(uri, "GET")
975 self.assertEqual(response.status, 401)
976
977 self.http.add_credentials('fred', 'barney')
978 (response, content) = self.http.request(uri, "GET")
979 self.assertEqual(response.status, 200)
980
981 uri = urllib.parse.urljoin(base, "basic2/file.txt")
982 (response, content) = self.http.request(uri, "GET")
983 self.assertEqual(response.status, 200)
984
985 def testBasicAuthNested(self):
986 # Test Basic Authentication with resources
987 # that are nested
988 uri = urllib.parse.urljoin(base, "basic-nested/")
989 (response, content) = self.http.request(uri, "GET")
990 self.assertEqual(response.status, 401)
991
992 uri = urllib.parse.urljoin(base, "basic-nested/subdir")
993 (response, content) = self.http.request(uri, "GET")
994 self.assertEqual(response.status, 401)
995
996 # Now add in credentials one at a time and test.
997 self.http.add_credentials('joe', 'password')
998
999 uri = urllib.parse.urljoin(base, "basic-nested/")
1000 (response, content) = self.http.request(uri, "GET")
1001 self.assertEqual(response.status, 200)
1002
1003 uri = urllib.parse.urljoin(base, "basic-nested/subdir")
1004 (response, content) = self.http.request(uri, "GET")
1005 self.assertEqual(response.status, 401)
1006
1007 self.http.add_credentials('fred', 'barney')
1008
1009 uri = urllib.parse.urljoin(base, "basic-nested/")
1010 (response, content) = self.http.request(uri, "GET")
1011 self.assertEqual(response.status, 200)
1012
1013 uri = urllib.parse.urljoin(base, "basic-nested/subdir")
1014 (response, content) = self.http.request(uri, "GET")
1015 self.assertEqual(response.status, 200)
1016
1017 def testDigestAuth(self):
1018 # Test that we support Digest Authentication
1019 uri = urllib.parse.urljoin(base, "digest/")
1020 (response, content) = self.http.request(uri, "GET")
1021 self.assertEqual(response.status, 401)
1022
1023 self.http.add_credentials('joe', 'password')
1024 (response, content) = self.http.request(uri, "GET")
1025 self.assertEqual(response.status, 200)
1026
1027 uri = urllib.parse.urljoin(base, "digest/file.txt")
1028 (response, content) = self.http.request(uri, "GET")
1029
1030 def testDigestAuthNextNonceAndNC(self):
1031 # Test that if the server sets nextnonce that we reset
1032 # the nonce count back to 1
1033 uri = urllib.parse.urljoin(base, "digest/file.txt")
1034 self.http.add_credentials('joe', 'password')
1035 (response, content) = self.http.request(uri, "GET", headers = {"cache-control":"no-cache"})
1036 info = httplib2._parse_www_authenticate(response, 'authentication-info')
1037 self.assertEqual(response.status, 200)
1038 (response, content) = self.http.request(uri, "GET", headers = {"cache-control":"no-cache"})
1039 info2 = httplib2._parse_www_authenticate(response, 'authentication-info')
1040 self.assertEqual(response.status, 200)
1041
1042 if 'nextnonce' in info:
1043 self.assertEqual(info2['nc'], 1)
1044
1045 def testDigestAuthStale(self):
1046 # Test that we can handle a nonce becoming stale
1047 uri = urllib.parse.urljoin(base, "digest-expire/file.txt")
1048 self.http.add_credentials('joe', 'password')
1049 (response, content) = self.http.request(uri, "GET", headers = {"cache-control":"no-cache"})
1050 info = httplib2._parse_www_authenticate(response, 'authentication-info')
1051 self.assertEqual(response.status, 200)
1052
1053 time.sleep(3)
1054 # Sleep long enough that the nonce becomes stale
1055
1056 (response, content) = self.http.request(uri, "GET", headers = {"cache-control":"no-cache"})
1057 self.assertFalse(response.fromcache)
1058 self.assertTrue(response._stale_digest)
1059 info3 = httplib2._parse_www_authenticate(response, 'authentication-info')
1060 self.assertEqual(response.status, 200)
1061
1062 def reflector(self, content):
1063 return dict( [tuple(x.split("=", 1)) for x in content.decode('utf-8').strip().split("\n")] )
1064
1065 def testReflector(self):
1066 uri = urllib.parse.urljoin(base, "reflector/reflector.cgi")
1067 (response, content) = self.http.request(uri, "GET")
1068 d = self.reflector(content)
1069 self.assertTrue('HTTP_USER_AGENT' in d)
1070
Joe Gregorio84cc10a2009-09-01 13:02:49 -04001071
1072 def testConnectionClose(self):
1073 uri = "http://www.google.com/"
1074 (response, content) = self.http.request(uri, "GET")
1075 for c in self.http.connections.values():
1076 self.assertNotEqual(None, c.sock)
1077 (response, content) = self.http.request(uri, "GET", headers={"connection": "close"})
1078 for c in self.http.connections.values():
1079 self.assertEqual(None, c.sock)
1080
pilgrim00a352e2009-05-29 04:04:44 +00001081try:
1082 import memcache
1083 class HttpTestMemCached(HttpTest):
1084 def setUp(self):
1085 self.cache = memcache.Client(['127.0.0.1:11211'], debug=0)
1086 #self.cache = memcache.Client(['10.0.0.4:11211'], debug=1)
1087 self.http = httplib2.Http(self.cache)
1088 self.cache.flush_all()
1089 # Not exactly sure why the sleep is needed here, but
1090 # if not present then some unit tests that rely on caching
1091 # fail. Memcached seems to lose some sets immediately
1092 # after a flush_all if the set is to a value that
1093 # was previously cached. (Maybe the flush is handled async?)
1094 time.sleep(1)
1095 self.http.clear_credentials()
1096except:
1097 pass
1098
1099
1100
1101# ------------------------------------------------------------------------
1102
1103class HttpPrivateTest(unittest.TestCase):
1104
1105 def testParseCacheControl(self):
1106 # Test that we can parse the Cache-Control header
1107 self.assertEqual({}, httplib2._parse_cache_control({}))
1108 self.assertEqual({'no-cache': 1}, httplib2._parse_cache_control({'cache-control': ' no-cache'}))
1109 cc = httplib2._parse_cache_control({'cache-control': ' no-cache, max-age = 7200'})
1110 self.assertEqual(cc['no-cache'], 1)
1111 self.assertEqual(cc['max-age'], '7200')
1112 cc = httplib2._parse_cache_control({'cache-control': ' , '})
1113 self.assertEqual(cc[''], 1)
1114
Joe Gregorioe314e8b2009-07-16 20:11:28 -04001115 try:
1116 cc = httplib2._parse_cache_control({'cache-control': 'Max-age=3600;post-check=1800,pre-check=3600'})
1117 self.assertTrue("max-age" in cc)
1118 except:
1119 self.fail("Should not throw exception")
1120
1121
1122
1123
pilgrim00a352e2009-05-29 04:04:44 +00001124 def testNormalizeHeaders(self):
1125 # Test that we normalize headers to lowercase
1126 h = httplib2._normalize_headers({'Cache-Control': 'no-cache', 'Other': 'Stuff'})
1127 self.assertTrue('cache-control' in h)
1128 self.assertTrue('other' in h)
1129 self.assertEqual('Stuff', h['other'])
1130
1131 def testExpirationModelTransparent(self):
1132 # Test that no-cache makes our request TRANSPARENT
1133 response_headers = {
1134 'cache-control': 'max-age=7200'
1135 }
1136 request_headers = {
1137 'cache-control': 'no-cache'
1138 }
1139 self.assertEqual("TRANSPARENT", httplib2._entry_disposition(response_headers, request_headers))
1140
1141 def testMaxAgeNonNumeric(self):
1142 # Test that no-cache makes our request TRANSPARENT
1143 response_headers = {
1144 'cache-control': 'max-age=fred, min-fresh=barney'
1145 }
1146 request_headers = {
1147 }
1148 self.assertEqual("STALE", httplib2._entry_disposition(response_headers, request_headers))
1149
1150
1151 def testExpirationModelNoCacheResponse(self):
1152 # The date and expires point to an entry that should be
1153 # FRESH, but the no-cache over-rides that.
1154 now = time.time()
1155 response_headers = {
1156 'date': time.strftime("%a, %d %b %Y %H:%M:%S GMT", time.gmtime(now)),
1157 'expires': time.strftime("%a, %d %b %Y %H:%M:%S GMT", time.gmtime(now+4)),
1158 'cache-control': 'no-cache'
1159 }
1160 request_headers = {
1161 }
1162 self.assertEqual("STALE", httplib2._entry_disposition(response_headers, request_headers))
1163
1164 def testExpirationModelStaleRequestMustReval(self):
1165 # must-revalidate forces STALE
1166 self.assertEqual("STALE", httplib2._entry_disposition({}, {'cache-control': 'must-revalidate'}))
1167
1168 def testExpirationModelStaleResponseMustReval(self):
1169 # must-revalidate forces STALE
1170 self.assertEqual("STALE", httplib2._entry_disposition({'cache-control': 'must-revalidate'}, {}))
1171
1172 def testExpirationModelFresh(self):
1173 response_headers = {
1174 'date': time.strftime("%a, %d %b %Y %H:%M:%S GMT", time.gmtime()),
1175 'cache-control': 'max-age=2'
1176 }
1177 request_headers = {
1178 }
1179 self.assertEqual("FRESH", httplib2._entry_disposition(response_headers, request_headers))
1180 time.sleep(3)
1181 self.assertEqual("STALE", httplib2._entry_disposition(response_headers, request_headers))
1182
1183 def testExpirationMaxAge0(self):
1184 response_headers = {
1185 'date': time.strftime("%a, %d %b %Y %H:%M:%S GMT", time.gmtime()),
1186 'cache-control': 'max-age=0'
1187 }
1188 request_headers = {
1189 }
1190 self.assertEqual("STALE", httplib2._entry_disposition(response_headers, request_headers))
1191
1192 def testExpirationModelDateAndExpires(self):
1193 now = time.time()
1194 response_headers = {
1195 'date': time.strftime("%a, %d %b %Y %H:%M:%S GMT", time.gmtime(now)),
1196 'expires': time.strftime("%a, %d %b %Y %H:%M:%S GMT", time.gmtime(now+2)),
1197 }
1198 request_headers = {
1199 }
1200 self.assertEqual("FRESH", httplib2._entry_disposition(response_headers, request_headers))
1201 time.sleep(3)
1202 self.assertEqual("STALE", httplib2._entry_disposition(response_headers, request_headers))
1203
1204 def testExpiresZero(self):
1205 now = time.time()
1206 response_headers = {
1207 'date': time.strftime("%a, %d %b %Y %H:%M:%S GMT", time.gmtime(now)),
1208 'expires': "0",
1209 }
1210 request_headers = {
1211 }
1212 self.assertEqual("STALE", httplib2._entry_disposition(response_headers, request_headers))
1213
1214 def testExpirationModelDateOnly(self):
1215 now = time.time()
1216 response_headers = {
1217 'date': time.strftime("%a, %d %b %Y %H:%M:%S GMT", time.gmtime(now+3)),
1218 }
1219 request_headers = {
1220 }
1221 self.assertEqual("STALE", httplib2._entry_disposition(response_headers, request_headers))
1222
1223 def testExpirationModelOnlyIfCached(self):
1224 response_headers = {
1225 }
1226 request_headers = {
1227 'cache-control': 'only-if-cached',
1228 }
1229 self.assertEqual("FRESH", httplib2._entry_disposition(response_headers, request_headers))
1230
1231 def testExpirationModelMaxAgeBoth(self):
1232 now = time.time()
1233 response_headers = {
1234 'date': time.strftime("%a, %d %b %Y %H:%M:%S GMT", time.gmtime(now)),
1235 'cache-control': 'max-age=2'
1236 }
1237 request_headers = {
1238 'cache-control': 'max-age=0'
1239 }
1240 self.assertEqual("STALE", httplib2._entry_disposition(response_headers, request_headers))
1241
1242 def testExpirationModelDateAndExpiresMinFresh1(self):
1243 now = time.time()
1244 response_headers = {
1245 'date': time.strftime("%a, %d %b %Y %H:%M:%S GMT", time.gmtime(now)),
1246 'expires': time.strftime("%a, %d %b %Y %H:%M:%S GMT", time.gmtime(now+2)),
1247 }
1248 request_headers = {
1249 'cache-control': 'min-fresh=2'
1250 }
1251 self.assertEqual("STALE", httplib2._entry_disposition(response_headers, request_headers))
1252
1253 def testExpirationModelDateAndExpiresMinFresh2(self):
1254 now = time.time()
1255 response_headers = {
1256 'date': time.strftime("%a, %d %b %Y %H:%M:%S GMT", time.gmtime(now)),
1257 'expires': time.strftime("%a, %d %b %Y %H:%M:%S GMT", time.gmtime(now+4)),
1258 }
1259 request_headers = {
1260 'cache-control': 'min-fresh=2'
1261 }
1262 self.assertEqual("FRESH", httplib2._entry_disposition(response_headers, request_headers))
1263
1264 def testParseWWWAuthenticateEmpty(self):
1265 res = httplib2._parse_www_authenticate({})
1266 self.assertEqual(len(list(res.keys())), 0)
1267
1268 def testParseWWWAuthenticate(self):
1269 # different uses of spaces around commas
1270 res = httplib2._parse_www_authenticate({ 'www-authenticate': 'Test realm="test realm" , foo=foo ,bar="bar", baz=baz,qux=qux'})
1271 self.assertEqual(len(list(res.keys())), 1)
1272 self.assertEqual(len(list(res['test'].keys())), 5)
1273
1274 # tokens with non-alphanum
1275 res = httplib2._parse_www_authenticate({ 'www-authenticate': 'T*!%#st realm=to*!%#en, to*!%#en="quoted string"'})
1276 self.assertEqual(len(list(res.keys())), 1)
1277 self.assertEqual(len(list(res['t*!%#st'].keys())), 2)
1278
1279 # quoted string with quoted pairs
1280 res = httplib2._parse_www_authenticate({ 'www-authenticate': 'Test realm="a \\"test\\" realm"'})
1281 self.assertEqual(len(list(res.keys())), 1)
1282 self.assertEqual(res['test']['realm'], 'a "test" realm')
1283
1284 def testParseWWWAuthenticateStrict(self):
1285 httplib2.USE_WWW_AUTH_STRICT_PARSING = 1;
1286 self.testParseWWWAuthenticate();
1287 httplib2.USE_WWW_AUTH_STRICT_PARSING = 0;
1288
1289 def testParseWWWAuthenticateBasic(self):
1290 res = httplib2._parse_www_authenticate({ 'www-authenticate': 'Basic realm="me"'})
1291 basic = res['basic']
1292 self.assertEqual('me', basic['realm'])
1293
1294 res = httplib2._parse_www_authenticate({ 'www-authenticate': 'Basic realm="me", algorithm="MD5"'})
1295 basic = res['basic']
1296 self.assertEqual('me', basic['realm'])
1297 self.assertEqual('MD5', basic['algorithm'])
1298
1299 res = httplib2._parse_www_authenticate({ 'www-authenticate': 'Basic realm="me", algorithm=MD5'})
1300 basic = res['basic']
1301 self.assertEqual('me', basic['realm'])
1302 self.assertEqual('MD5', basic['algorithm'])
1303
1304 def testParseWWWAuthenticateBasic2(self):
1305 res = httplib2._parse_www_authenticate({ 'www-authenticate': 'Basic realm="me",other="fred" '})
1306 basic = res['basic']
1307 self.assertEqual('me', basic['realm'])
1308 self.assertEqual('fred', basic['other'])
1309
1310 def testParseWWWAuthenticateBasic3(self):
1311 res = httplib2._parse_www_authenticate({ 'www-authenticate': 'Basic REAlm="me" '})
1312 basic = res['basic']
1313 self.assertEqual('me', basic['realm'])
1314
1315
1316 def testParseWWWAuthenticateDigest(self):
1317 res = httplib2._parse_www_authenticate({ 'www-authenticate':
1318 'Digest realm="testrealm@host.com", qop="auth,auth-int", nonce="dcd98b7102dd2f0e8b11d0f600bfb0c093", opaque="5ccc069c403ebaf9f0171e9517f40e41"'})
1319 digest = res['digest']
1320 self.assertEqual('testrealm@host.com', digest['realm'])
1321 self.assertEqual('auth,auth-int', digest['qop'])
1322
1323
1324 def testParseWWWAuthenticateMultiple(self):
1325 res = httplib2._parse_www_authenticate({ 'www-authenticate':
1326 'Digest realm="testrealm@host.com", qop="auth,auth-int", nonce="dcd98b7102dd2f0e8b11d0f600bfb0c093", opaque="5ccc069c403ebaf9f0171e9517f40e41" Basic REAlm="me" '})
1327 digest = res['digest']
1328 self.assertEqual('testrealm@host.com', digest['realm'])
1329 self.assertEqual('auth,auth-int', digest['qop'])
1330 self.assertEqual('dcd98b7102dd2f0e8b11d0f600bfb0c093', digest['nonce'])
1331 self.assertEqual('5ccc069c403ebaf9f0171e9517f40e41', digest['opaque'])
1332 basic = res['basic']
1333 self.assertEqual('me', basic['realm'])
1334
1335 def testParseWWWAuthenticateMultiple2(self):
1336 # Handle an added comma between challenges, which might get thrown in if the challenges were
1337 # originally sent in separate www-authenticate headers.
1338 res = httplib2._parse_www_authenticate({ 'www-authenticate':
1339 'Digest realm="testrealm@host.com", qop="auth,auth-int", nonce="dcd98b7102dd2f0e8b11d0f600bfb0c093", opaque="5ccc069c403ebaf9f0171e9517f40e41", Basic REAlm="me" '})
1340 digest = res['digest']
1341 self.assertEqual('testrealm@host.com', digest['realm'])
1342 self.assertEqual('auth,auth-int', digest['qop'])
1343 self.assertEqual('dcd98b7102dd2f0e8b11d0f600bfb0c093', digest['nonce'])
1344 self.assertEqual('5ccc069c403ebaf9f0171e9517f40e41', digest['opaque'])
1345 basic = res['basic']
1346 self.assertEqual('me', basic['realm'])
1347
1348 def testParseWWWAuthenticateMultiple3(self):
1349 # Handle an added comma between challenges, which might get thrown in if the challenges were
1350 # originally sent in separate www-authenticate headers.
1351 res = httplib2._parse_www_authenticate({ 'www-authenticate':
1352 'Digest realm="testrealm@host.com", qop="auth,auth-int", nonce="dcd98b7102dd2f0e8b11d0f600bfb0c093", opaque="5ccc069c403ebaf9f0171e9517f40e41", Basic REAlm="me", WSSE realm="foo", profile="UsernameToken"'})
1353 digest = res['digest']
1354 self.assertEqual('testrealm@host.com', digest['realm'])
1355 self.assertEqual('auth,auth-int', digest['qop'])
1356 self.assertEqual('dcd98b7102dd2f0e8b11d0f600bfb0c093', digest['nonce'])
1357 self.assertEqual('5ccc069c403ebaf9f0171e9517f40e41', digest['opaque'])
1358 basic = res['basic']
1359 self.assertEqual('me', basic['realm'])
1360 wsse = res['wsse']
1361 self.assertEqual('foo', wsse['realm'])
1362 self.assertEqual('UsernameToken', wsse['profile'])
1363
1364 def testParseWWWAuthenticateMultiple4(self):
1365 res = httplib2._parse_www_authenticate({ 'www-authenticate':
1366 'Digest realm="test-real.m@host.com", qop \t=\t"\tauth,auth-int", nonce="(*)&^&$%#",opaque="5ccc069c403ebaf9f0171e9517f40e41", Basic REAlm="me", WSSE realm="foo", profile="UsernameToken"'})
1367 digest = res['digest']
1368 self.assertEqual('test-real.m@host.com', digest['realm'])
1369 self.assertEqual('\tauth,auth-int', digest['qop'])
1370 self.assertEqual('(*)&^&$%#', digest['nonce'])
1371
1372 def testParseWWWAuthenticateMoreQuoteCombos(self):
1373 res = httplib2._parse_www_authenticate({'www-authenticate':'Digest realm="myrealm", nonce="Ygk86AsKBAA=3516200d37f9a3230352fde99977bd6d472d4306", algorithm=MD5, qop="auth", stale=true'})
1374 digest = res['digest']
1375 self.assertEqual('myrealm', digest['realm'])
1376
1377 def testDigestObject(self):
1378 credentials = ('joe', 'password')
1379 host = None
1380 request_uri = '/projects/httplib2/test/digest/'
1381 headers = {}
1382 response = {
1383 'www-authenticate': 'Digest realm="myrealm", nonce="Ygk86AsKBAA=3516200d37f9a3230352fde99977bd6d472d4306", algorithm=MD5, qop="auth"'
1384 }
1385 content = b""
1386
1387 d = httplib2.DigestAuthentication(credentials, host, request_uri, headers, response, content, None)
1388 d.request("GET", request_uri, headers, content, cnonce="33033375ec278a46")
1389 our_request = "Authorization: %s" % headers['Authorization']
1390 working_request = 'Authorization: Digest username="joe", realm="myrealm", nonce="Ygk86AsKBAA=3516200d37f9a3230352fde99977bd6d472d4306", uri="/projects/httplib2/test/digest/", algorithm=MD5, response="97ed129401f7cdc60e5db58a80f3ea8b", qop=auth, nc=00000001, cnonce="33033375ec278a46"'
1391 self.assertEqual(our_request, working_request)
1392
1393
1394 def testDigestObjectStale(self):
1395 credentials = ('joe', 'password')
1396 host = None
1397 request_uri = '/projects/httplib2/test/digest/'
1398 headers = {}
1399 response = httplib2.Response({ })
1400 response['www-authenticate'] = 'Digest realm="myrealm", nonce="Ygk86AsKBAA=3516200d37f9a3230352fde99977bd6d472d4306", algorithm=MD5, qop="auth", stale=true'
1401 response.status = 401
1402 content = b""
1403 d = httplib2.DigestAuthentication(credentials, host, request_uri, headers, response, content, None)
1404 # Returns true to force a retry
1405 self.assertTrue( d.response(response, content) )
1406
1407 def testDigestObjectAuthInfo(self):
1408 credentials = ('joe', 'password')
1409 host = None
1410 request_uri = '/projects/httplib2/test/digest/'
1411 headers = {}
1412 response = httplib2.Response({ })
1413 response['www-authenticate'] = 'Digest realm="myrealm", nonce="Ygk86AsKBAA=3516200d37f9a3230352fde99977bd6d472d4306", algorithm=MD5, qop="auth", stale=true'
1414 response['authentication-info'] = 'nextnonce="fred"'
1415 content = b""
1416 d = httplib2.DigestAuthentication(credentials, host, request_uri, headers, response, content, None)
1417 # Returns true to force a retry
1418 self.assertFalse( d.response(response, content) )
1419 self.assertEqual('fred', d.challenge['nonce'])
1420 self.assertEqual(1, d.challenge['nc'])
1421
1422 def testWsseAlgorithm(self):
1423 digest = httplib2._wsse_username_token("d36e316282959a9ed4c89851497a717f", "2003-12-15T14:43:07Z", "taadtaadpstcsm")
1424 expected = b"quR/EWLAV4xLf9Zqyw4pDmfV9OY="
1425 self.assertEqual(expected, digest)
1426
1427 def testEnd2End(self):
1428 # one end to end header
1429 response = {'content-type': 'application/atom+xml', 'te': 'deflate'}
1430 end2end = httplib2._get_end2end_headers(response)
1431 self.assertTrue('content-type' in end2end)
1432 self.assertTrue('te' not in end2end)
1433 self.assertTrue('connection' not in end2end)
1434
1435 # one end to end header that gets eliminated
1436 response = {'connection': 'content-type', 'content-type': 'application/atom+xml', 'te': 'deflate'}
1437 end2end = httplib2._get_end2end_headers(response)
1438 self.assertTrue('content-type' not in end2end)
1439 self.assertTrue('te' not in end2end)
1440 self.assertTrue('connection' not in end2end)
1441
1442 # Degenerate case of no headers
1443 response = {}
1444 end2end = httplib2._get_end2end_headers(response)
1445 self.assertEquals(0, len(end2end))
1446
1447 # Degenerate case of connection referrring to a header not passed in
1448 response = {'connection': 'content-type'}
1449 end2end = httplib2._get_end2end_headers(response)
1450 self.assertEquals(0, len(end2end))
1451
1452unittest.main()