blob: 264677cdf1477b38038bd82e4b719b89d0b15f8b [file] [log] [blame]
pilgrim00a352e2009-05-29 04:04:44 +00001#!/usr/bin/env python3
2"""
3httplib2test
4
5A set of unit tests for httplib2.py.
6
7Requires Python 3.0 or later
8"""
9
10__author__ = "Joe Gregorio (joe@bitworking.org)"
11__copyright__ = "Copyright 2006, Joe Gregorio"
12__contributors__ = ["Mark Pilgrim"]
13__license__ = "MIT"
14__history__ = """ """
15__version__ = "0.2 ($Rev: 118 $)"
16
17
18import sys
19import unittest
20import http.client
21import httplib2
22import os
23import urllib.parse
24import time
25import base64
26import io
27
# Root URI under which the remote test resources are served.
base = "http://bitworking.org/projects/httplib2/test/"
# Alternative root for running against a local copy of the resources:
#base = 'http://localhost/projects/httplib2/test/'
# Directory handed to httplib2.Http() as its on-disk cache by the tests.
cacheDirName = '.cache'
32
33
class CredentialsTest(unittest.TestCase):
    def test(self):
        # Walk a Credentials store through add(), iter() and clear(), using
        # both a domain-less entry and a domain-scoped entry.
        store = httplib2.Credentials()
        joe = ("joe", "password")
        fred = ("fred", "password2")

        # Added without a domain: expected back for any domain, even "".
        store.add("joe", "password")
        self.assertEqual(joe, list(store.iter("bitworking.org"))[0])
        self.assertEqual(joe, list(store.iter(""))[0])

        # Added for one domain: expected back for that domain only.
        store.add("fred", "password2", "wellformedweb.org")
        self.assertEqual(joe, list(store.iter("bitworking.org"))[0])
        bitworking_matches = list(store.iter("bitworking.org"))
        self.assertEqual(1, len(bitworking_matches))
        wellformed_matches = list(store.iter("wellformedweb.org"))
        self.assertEqual(2, len(wellformed_matches))
        self.assertTrue(fred in list(store.iter("wellformedweb.org")))

        # clear() must drop everything that was added so far.
        store.clear()
        self.assertEqual(0, len(list(store.iter("bitworking.org"))))

        # With only the scoped entry present, other domains get nothing.
        store.add("fred", "password2", "wellformedweb.org")
        self.assertTrue(fred in list(store.iter("wellformedweb.org")))
        for other_domain in ("bitworking.org", ""):
            self.assertEqual(0, len(list(store.iter(other_domain))))
51
52
class ParserTest(unittest.TestCase):
    def testFromStd66(self):
        # Each entry pairs a URI with the 5-tuple parse_uri() is expected to
        # return for it (by the look of the data: scheme, authority, path,
        # query, fragment). The original list asserted the final case twice
        # verbatim; the duplicate has been dropped.
        cases = [
            ("http://example.com",
             ('http', 'example.com', '', None, None)),
            ("https://example.com",
             ('https', 'example.com', '', None, None)),
            ("https://example.com:8080",
             ('https', 'example.com:8080', '', None, None)),
            ("http://example.com/",
             ('http', 'example.com', '/', None, None)),
            ("http://example.com/path",
             ('http', 'example.com', '/path', None, None)),
            ("http://example.com/path?a=1&b=2",
             ('http', 'example.com', '/path', 'a=1&b=2', None)),
            ("http://example.com/path?a=1&b=2#fred",
             ('http', 'example.com', '/path', 'a=1&b=2', 'fred')),
        ]
        for uri, expected in cases:
            self.assertEqual(expected, httplib2.parse_uri(uri))
63
64
class UrlNormTest(unittest.TestCase):
    def test(self):
        # (expected normalized URI, input URI); the normalized URI is the
        # last element of what urlnorm() returns.
        expectations = [
            ("http://example.org/", "http://example.org"),
            ("http://example.org/", "http://EXAMple.org"),
            ("http://example.org/?=b", "http://EXAMple.org?=b"),
            ("http://example.org/mypath?a=b", "http://EXAMple.org/mypath?a=b"),
            ("http://localhost:80/", "http://localhost:80"),
        ]
        for normalized, raw in expectations:
            self.assertEqual(normalized, httplib2.urlnorm(raw)[-1])

        # Case and a missing trailing slash must not change the full result.
        lower = httplib2.urlnorm("http://localhost:80/")
        upper = httplib2.urlnorm("HTTP://LOCALHOST:80")
        self.assertEqual(lower, upper)

        # A relative reference is rejected outright.
        try:
            httplib2.urlnorm("/")
        except httplib2.RelativeURIError:
            pass
        else:
            self.fail("Non-absolute URIs should raise an exception")
78
class UrlSafenameTest(unittest.TestCase):
    def test(self):
        # Test that different URIs end up generating different safe names
        self.assertEqual(
            "example.org,fred,a=b,58489f63a7a83c3b7794a6a398ee8b1f",
            httplib2.safename("http://example.org/fred/?a=b"))
        self.assertEqual(
            "example.org,fred,a=b,8c5946d56fec453071f43329ff0be46b",
            httplib2.safename("http://example.org/fred?/a=b"))
        self.assertEqual(
            "www.example.org,fred,a=b,499c44b8d844a011b67ea2c015116968",
            httplib2.safename("http://www.example.org/fred?/a=b"))
        self.assertEqual(
            httplib2.safename(httplib2.urlnorm("http://www")[-1]),
            httplib2.safename(httplib2.urlnorm("http://WWW")[-1]))
        self.assertEqual(
            "www.example.org,fred,a=b,692e843a333484ce0095b070497ab45d",
            httplib2.safename("https://www.example.org/fred?/a=b"))
        self.assertNotEqual(
            httplib2.safename("http://www"),
            httplib2.safename("https://www"))

        # Test the max length limits
        uri = "http://" + ("w" * 200) + ".org"
        uri2 = "http://" + ("w" * 201) + ".org"
        self.assertNotEqual(httplib2.safename(uri2), httplib2.safename(uri))
        # Max length should be 200 + 1 (",") + 32
        self.assertEqual(233, len(httplib2.safename(uri2)))
        self.assertEqual(233, len(httplib2.safename(uri)))

        # Unicode. The former `sys.version_info >= (2,3)` guard was always
        # true for this Python 3-only module, so the check runs unconditionally.
        self.assertEqual(
            "xn--http,-4y1d.org,fred,a=b,579924c35db315e5a32e3d9963388193",
            httplib2.safename("http://\u2304.org/fred/?a=b"))
98
class _MyResponse(io.BytesIO):
    """In-memory stand-in for an HTTP response, used by the mock connection.

    The body is readable through the inherited ``io.BytesIO`` interface and
    the headers are whatever keyword arguments were given to the constructor.
    """

    def __init__(self, body, **kwargs):
        # body: bytes served by read(); kwargs: header name -> header value.
        super().__init__(body)
        self.headers = kwargs

    def items(self):
        """Return the headers as a view of (name, value) pairs."""
        return self.headers.items()

    def iteritems(self):
        """Return an iterator over the (name, value) header pairs."""
        return iter(self.headers.items())
109
110
class _MyHTTPConnection:
    """Mock of http.client.HTTPConnection used for testing.

    It never touches the network: every method is a no-op except
    getresponse(), which always returns the same canned response.
    """

    def __init__(self, host, port=None, key_file=None, cert_file=None,
                 strict=None, timeout=None, proxy_info=None):
        # Only host, port and timeout are recorded; the remaining arguments
        # are accepted and ignored.
        self.host = host
        self.port = port
        self.timeout = timeout
        self.log = ""

    def set_debuglevel(self, level):
        """Accept and ignore the debug level."""
        pass

    def connect(self):
        """Pretend to connect to a host on a given port."""
        pass

    def close(self):
        """Pretend to close the connection."""
        pass

    def request(self, method, request_uri, body=None, headers=None):
        """Accept and discard a request.

        body and headers are optional, as they are on the real
        http.client.HTTPConnection.request().
        """
        pass

    def getresponse(self):
        """Return a canned response: body b"the body", header status="200"."""
        return _MyResponse(b"the body", status="200")
136
137
138class HttpTest(unittest.TestCase):
def setUp(self):
    # Start every test with an empty on-disk cache and no stored credentials.
    # A plain loop replaces the original list comprehension, which was used
    # only for its side effects.
    if os.path.exists(cacheDirName):
        for entry in os.listdir(cacheDirName):
            os.remove(os.path.join(cacheDirName, entry))
    self.http = httplib2.Http(cacheDirName)
    self.http.clear_credentials()
144
def testConnectionType(self):
    # A caller-supplied connection_type must be used for the request: the
    # body checked below is the canned one served by _MyHTTPConnection.
    self.http.force_exception_to_status_code = False
    target = "http://bitworking.org"
    response, content = self.http.request(target, connection_type=_MyHTTPConnection)
    self.assertEqual(response['content-location'], "http://bitworking.org")
    self.assertEqual(content, b"the body")
150
def testGetUnknownServer(self):
    unresolvable = "http://fred.bitworking.org/"

    # With exceptions enabled the lookup failure must surface as
    # ServerNotFoundError.
    self.http.force_exception_to_status_code = False
    try:
        self.http.request(unresolvable)
    except httplib2.ServerNotFoundError:
        pass
    else:
        self.fail("An httplib2.ServerNotFoundError Exception must be thrown on an unresolvable server.")

    # Now test with exceptions turned off
    self.http.force_exception_to_status_code = True

    response, content = self.http.request(unresolvable)
    self.assertEqual(response['content-type'], 'text/plain')
    self.assertTrue(content.startswith(b"Unable to find"))
    self.assertEqual(response.status, 400)
166
def testGetIRI(self):
    # A non-ASCII character in the query string must reach the server
    # percent-encoded (%D0%82 is the UTF-8 encoding of U+0402). The former
    # `sys.version_info >= (2,3)` guard was always true on Python 3 and has
    # been removed.
    uri = urllib.parse.urljoin(base, "reflector/reflector.cgi?d=\N{CYRILLIC CAPITAL LETTER DJE}")
    (response, content) = self.http.request(uri, "GET")
    d = self.reflector(content)
    self.assertTrue('QUERY_STRING' in d)
    self.assertTrue(d['QUERY_STRING'].find('%D0%82') > 0)
174
def testGetIsDefaultMethod(self):
    # request() called without a method must issue a GET; the reflector
    # resource reports the method it saw in the x-method header.
    reflector_uri = urllib.parse.urljoin(base, "methods/method_reflector.cgi")
    response, _body = self.http.request(reflector_uri)
    self.assertEqual(response['x-method'], "GET")
180
def testDifferentMethods(self):
    # Every listed method must be sent as given; the reflector resource
    # reports the method it saw in the x-method header.
    reflector_uri = urllib.parse.urljoin(base, "methods/method_reflector.cgi")
    for verb in ("GET", "PUT", "DELETE", "POST"):
        response, _body = self.http.request(reflector_uri, verb, body=b" ")
        self.assertEqual(response['x-method'], verb)
187
def testHeadRead(self):
    # Test that we don't try to read the response of a HEAD request
    # since httplib blocks response.read() for HEAD requests.
    # Oddly enough this doesn't appear as a problem when doing HEAD requests
    # against Apache servers.
    response, content = self.http.request("http://www.google.com/", "HEAD")
    self.assertEqual(response.status, 200)
    self.assertEqual(content, b"")
197
def testGetNoCache(self):
    # A client built without a cache directory must still serve a plain GET,
    # with no previous response attached.
    uncached_client = httplib2.Http()
    target = urllib.parse.urljoin(base, "304/test_etag.txt")
    response, _body = uncached_client.request(target, "GET")
    self.assertEqual(response.status, 200)
    self.assertEqual(response.previous, None)
205
def testGetOnlyIfCachedCacheHit(self):
    # After a first GET has populated the cache, an 'only-if-cached' GET
    # for the same URI must be answered from the cache.
    target = urllib.parse.urljoin(base, "304/test_etag.txt")
    self.http.request(target, "GET")
    cached_only = {'cache-control': 'only-if-cached'}
    response, _body = self.http.request(target, "GET", headers=cached_only)
    self.assertEqual(response.fromcache, True)
    self.assertEqual(response.status, 200)
213
def testGetOnlyIfCachedCacheMiss(self):
    # An 'only-if-cached' GET for a URI that has not been fetched yet is
    # expected to come back as a 504 that is not from the cache.
    target = urllib.parse.urljoin(base, "304/test_etag.txt")
    cached_only = {'cache-control': 'only-if-cached'}
    response, _body = self.http.request(target, "GET", headers=cached_only)
    self.assertEqual(response.fromcache, False)
    self.assertEqual(response.status, 504)
pilgrim00a352e2009-05-29 04:04:44 +0000220
def testGetOnlyIfCachedNoCacheAtAll(self):
    # Same 'only-if-cached' request, but from a client that was built
    # without any cache. An intermediary beyond us might answer the
    # 'only-if-cached' itself, so this test cannot be guaranteed to pass.
    uncached_client = httplib2.Http()
    target = urllib.parse.urljoin(base, "304/test_etag.txt")
    cached_only = {'cache-control': 'only-if-cached'}
    response, _body = uncached_client.request(target, "GET", headers=cached_only)
    self.assertEqual(response.fromcache, False)
    self.assertEqual(response.status, 504)
pilgrim00a352e2009-05-29 04:04:44 +0000231
def testUserAgent(self):
    # With no User-Agent supplied, the body returned by the test resource
    # must start with the library's default agent string.
    agent_uri = urllib.parse.urljoin(base, "user-agent/test.cgi")
    response, content = self.http.request(agent_uri, "GET")
    self.assertEqual(response.status, 200)
    self.assertTrue(content.startswith(b"Python-httplib2/"))
238
def testUserAgentNonDefault(self):
    # A caller-supplied User-Agent header must replace the default one.
    agent_uri = urllib.parse.urljoin(base, "user-agent/test.cgi")
    custom_agent = {'User-Agent': 'fred/1.0'}
    response, content = self.http.request(agent_uri, "GET", headers=custom_agent)
    self.assertEqual(response.status, 200)
    self.assertTrue(content.startswith(b"fred/1.0"))
246
def testGet300WithLocation(self):
    # A 300 that carries a Location: header is followed automatically.
    # The request is made twice with identical expectations; the second
    # round confirms that the intermediate 300 is not cached.
    target = urllib.parse.urljoin(base, "300/with-location-header.asis")
    for _round in range(2):
        response, content = self.http.request(target, "GET")
        self.assertEqual(response.status, 200)
        self.assertEqual(content, b"This is the final destination.\n")
        self.assertEqual(response.previous.status, 300)
        self.assertEqual(response.previous.fromcache, False)
262
def testGet300WithLocationNoRedirect(self):
    # With follow_redirects switched off, the 300 itself is returned even
    # though the resource supplies a Location: header.
    self.http.follow_redirects = False
    target = urllib.parse.urljoin(base, "300/with-location-header.asis")
    response, _body = self.http.request(target, "GET")
    self.assertEqual(response.status, 300)
269
def testGet300WithoutLocation(self):
    # A 300 response may legitimately omit the Location: header; in that
    # case the 300 is handed back as-is, with no previous response.
    target = urllib.parse.urljoin(base, "300/without-location-header.asis")
    response, _body = self.http.request(target, "GET")
    self.assertEqual(response.status, 300)
    content_type = response['content-type']
    self.assertTrue(content_type.startswith("text/html"))
    self.assertEqual(response.previous, None)
278
def testGet301(self):
    # 301 redirects are followed automatically, and the 301 response itself
    # is cached: on the second request it must come from the cache.
    start = urllib.parse.urljoin(base, "301/onestep.asis")
    final = urllib.parse.urljoin(base, "302/final-destination.txt")
    final_body = b"This is the final destination.\n"

    # First request: the redirect is fetched from the network.
    response, content = self.http.request(start, "GET")
    self.assertEqual(response.status, 200)
    self.assertTrue('content-location' in response)
    self.assertEqual(response['content-location'], final)
    self.assertEqual(content, final_body)
    self.assertEqual(response.previous.status, 301)
    self.assertEqual(response.previous.fromcache, False)

    # Second request: the redirect is served from the cache.
    response, content = self.http.request(start, "GET")
    self.assertEqual(response.status, 200)
    self.assertEqual(response['content-location'], final)
    self.assertEqual(content, final_body)
    self.assertEqual(response.previous.status, 301)
    self.assertEqual(response.previous.fromcache, True)
298
299
def testGet301NoRedirect(self):
    # Test that a 301 is NOT followed when follow_redirects is switched off:
    # the 301 response itself is returned to the caller.
    # (The previous comment was copied from testGet301 and described the
    # opposite behaviour; the unused `destination` local has been removed.)
    self.http.follow_redirects = False
    uri = urllib.parse.urljoin(base, "301/onestep.asis")
    (response, content) = self.http.request(uri, "GET")
    self.assertEqual(response.status, 301)
308
309
def testGet302(self):
    # 302 redirects are followed automatically, but the 302 response itself
    # must NOT be cached (only the final destination may be).
    onestep = urllib.parse.urljoin(base, "302/onestep.asis")
    twostep = urllib.parse.urljoin(base, "302/twostep.asis")
    final = urllib.parse.urljoin(base, "302/final-destination.txt")
    final_body = b"This is the final destination.\n"

    # First pass: nothing is cached yet.
    response, content = self.http.request(onestep, "GET")
    self.assertEqual(response.status, 200)
    self.assertEqual(response['content-location'], final)
    self.assertEqual(content, final_body)
    self.assertEqual(response.previous.status, 302)
    self.assertEqual(response.previous.fromcache, False)

    # Second pass: the destination is cached, the 302 hop is not.
    response, content = self.http.request(onestep, "GET")
    self.assertEqual(response.status, 200)
    self.assertEqual(response.fromcache, True)
    self.assertEqual(response['content-location'], final)
    self.assertEqual(content, final_body)
    self.assertEqual(response.previous.status, 302)
    self.assertEqual(response.previous.fromcache, False)
    self.assertEqual(response.previous['content-location'], onestep)

    # A different starting point whose final response is also expected to
    # come from the cache.
    response, content = self.http.request(twostep, "GET")
    self.assertEqual(response.status, 200)
    self.assertEqual(response.fromcache, True)
    self.assertEqual(content, final_body)
    self.assertEqual(response.previous.status, 302)
    self.assertEqual(response.previous.fromcache, False)
340
def testGet302RedirectionLimit(self):
    # Test that we can set a lower redirection limit
    # and that we raise an exception when we exceed
    # that limit.
    self.http.force_exception_to_status_code = False

    uri = urllib.parse.urljoin(base, "302/twostep.asis")
    try:
        self.http.request(uri, "GET", redirections=1)
    except httplib2.RedirectLimit:
        pass
    except Exception:
        self.fail("Threw wrong kind of exception ")
    else:
        # Kept out of the try body on purpose: fail() raises AssertionError,
        # which the generic handler above would otherwise catch and
        # misreport as "wrong kind of exception".
        self.fail("This should not happen")

    # Re-run the test with out the exceptions
    self.http.force_exception_to_status_code = True

    (response, content) = self.http.request(uri, "GET", redirections=1)
    self.assertEqual(response.status, 500)
    self.assertTrue(response.reason.startswith("Redirected more"))
    self.assertEqual("302", response['status'])
    self.assertTrue(content.startswith(b"<html>"))
    self.assertTrue(response.previous is not None)
365
def testGet302NoLocation(self):
    # Test that we throw an exception when we get
    # a 302 with no Location: header.
    self.http.force_exception_to_status_code = False
    uri = urllib.parse.urljoin(base, "302/no-location.asis")
    try:
        self.http.request(uri, "GET")
    except httplib2.RedirectMissingLocation:
        pass
    except Exception:
        self.fail("Threw wrong kind of exception ")
    else:
        # Kept out of the try body on purpose: fail() raises AssertionError,
        # which the generic handler above would otherwise catch and
        # misreport as "wrong kind of exception".
        self.fail("Should never reach here")

    # Re-run the test with out the exceptions
    self.http.force_exception_to_status_code = True

    (response, content) = self.http.request(uri, "GET")
    self.assertEqual(response.status, 500)
    self.assertTrue(response.reason.startswith("Redirected but"))
    self.assertEqual("302", response['status'])
    self.assertTrue(content.startswith(b"This is content"))
387
def testGet302ViaHttps(self):
    # Google always redirects to http://google.com
    response, _body = self.http.request("https://google.com", "GET")
    final_status = response.status
    hop_status = response.previous.status
    self.assertEqual(200, final_status)
    self.assertEqual(302, hop_status)
393
def testGetViaHttps(self):
    # A plain GET over HTTPS must succeed.
    secure_uri = "https://google.com/adsense/"
    response, _body = self.http.request(secure_uri, "GET")
    self.assertEqual(200, response.status)
398
def testGetViaHttpsSpecViolationOnLocation(self):
    # Redirects over HTTPS must be followed even when the server violates
    # the spec by sending a relative Location: header instead of an
    # absolute one.
    secure_uri = "https://google.com/adsense"
    response, _body = self.http.request(secure_uri, "GET")
    self.assertEqual(200, response.status)
    self.assertNotEqual(None, response.previous)
407
408
def testGetViaHttpsKeyCert(self):
    # At this point I can only test
    # that the key and cert files are passed in
    # correctly to httplib. It would be nice to have
    # a real https endpoint to test against.
    http = httplib2.Http(timeout=2)

    http.add_certificate("akeyfile", "acertfile", "bitworking.org")
    try:
        http.request("https://bitworking.org", "GET")
    except Exception:
        # Best effort: the outcome of the request is deliberately ignored;
        # only the connection object it leaves behind is inspected. Unlike
        # the former bare `except:`, this lets KeyboardInterrupt and
        # SystemExit through.
        pass
    self.assertEqual(http.connections["https:bitworking.org"].key_file, "akeyfile")
    self.assertEqual(http.connections["https:bitworking.org"].cert_file, "acertfile")

    # A host the certificate was not registered for must get neither file.
    try:
        http.request("https://notthere.bitworking.org", "GET")
    except Exception:
        pass
    self.assertEqual(http.connections["https:notthere.bitworking.org"].key_file, None)
    self.assertEqual(http.connections["https:notthere.bitworking.org"].cert_file, None)
430
431
432
433
def testGet303(self):
    # A POST answered with a 303 must be followed up with a request to the
    # Location: it names, ending at the final destination.
    post_uri = urllib.parse.urljoin(base, "303/303.cgi")
    response, content = self.http.request(post_uri, "POST", " ")
    self.assertEqual(response.status, 200)
    self.assertEqual(content, b"This is the final destination.\n")
    self.assertEqual(response.previous.status, 303)
442
def testGet303NoRedirect(self):
    # With follow_redirects switched off, the 303 returned for the POST is
    # handed back to the caller instead of being followed.
    self.http.follow_redirects = False
    post_uri = urllib.parse.urljoin(base, "303/303.cgi")
    response, _body = self.http.request(post_uri, "POST", " ")
    self.assertEqual(response.status, 303)
450
def test303ForDifferentMethods(self):
    # Whatever method triggers the 303, the follow-up request that reaches
    # the reflector is expected to be the method paired with it below.
    redirect_uri = urllib.parse.urljoin(base, "303/redirect-to-reflector.cgi")
    expectations = [
        ("PUT", "GET"),
        ("DELETE", "GET"),
        ("POST", "GET"),
        ("GET", "GET"),
        ("HEAD", "GET"),
    ]
    for sent, seen_after_303 in expectations:
        response, _body = self.http.request(redirect_uri, sent, body=b" ")
        self.assertEqual(response['x-method'], seen_after_303)
457
def testGet304(self):
    # Test that we use ETags properly to validate our cache
    uri = urllib.parse.urljoin(base, "304/test_etag.txt")
    (response, content) = self.http.request(uri, "GET")
    self.assertNotEqual(response['etag'], "")

    (response, content) = self.http.request(uri, "GET")
    (response, content) = self.http.request(uri, "GET", headers={'cache-control': 'must-revalidate'})
    self.assertEqual(response.status, 200)
    self.assertEqual(response.fromcache, True)

    # The cache entry on disk must begin with a status line. The context
    # manager closes the file even if readline() raises.
    cache_file_name = os.path.join(cacheDirName, httplib2.safename(httplib2.urlnorm(uri)[-1]))
    with open(cache_file_name, "r") as f:
        status_line = f.readline()

    self.assertTrue(status_line.startswith("status:"))

    (response, content) = self.http.request(uri, "HEAD")
    self.assertEqual(response.status, 200)
    self.assertEqual(response.fromcache, True)

    (response, content) = self.http.request(uri, "GET", headers={'range': 'bytes=0-0'})
    self.assertEqual(response.status, 206)
    self.assertEqual(response.fromcache, False)
483
def testGetIgnoreEtag(self):
    # With ignore_etag set, a revalidating request must no longer carry
    # If-None-Match (the reflector echoes the request headers it received).
    reflector_uri = urllib.parse.urljoin(base, "reflector/reflector.cgi")
    revalidate = {'cache-control': 'max-age=0'}

    response, content = self.http.request(reflector_uri, "GET")
    self.assertNotEqual(response['etag'], "")

    # Default behaviour: the validator is sent.
    response, content = self.http.request(reflector_uri, "GET", headers=dict(revalidate))
    echoed = self.reflector(content)
    self.assertTrue('HTTP_IF_NONE_MATCH' in echoed)

    # ETags ignored: no validator, and the response is not from the cache.
    self.http.ignore_etag = True
    response, content = self.http.request(reflector_uri, "GET", headers=dict(revalidate))
    echoed = self.reflector(content)
    self.assertEqual(response.fromcache, False)
    self.assertFalse('HTTP_IF_NONE_MATCH' in echoed)
499
def testOverrideEtag(self):
    # A caller-supplied if-none-match header must be sent in place of the
    # one derived from the cached ETag (the reflector echoes the request
    # headers it received).
    reflector_uri = urllib.parse.urljoin(base, "reflector/reflector.cgi")
    response, content = self.http.request(reflector_uri, "GET")
    self.assertNotEqual(response['etag'], "")

    # Without an override the validator is sent and is not "fred".
    response, content = self.http.request(
        reflector_uri, "GET", headers={'cache-control': 'max-age=0'})
    echoed = self.reflector(content)
    self.assertTrue('HTTP_IF_NONE_MATCH' in echoed)
    self.assertNotEqual(echoed['HTTP_IF_NONE_MATCH'], "fred")

    # With an override the server sees exactly the supplied value.
    response, content = self.http.request(
        reflector_uri, "GET",
        headers={'cache-control': 'max-age=0', 'if-none-match': 'fred'})
    echoed = self.reflector(content)
    self.assertTrue('HTTP_IF_NONE_MATCH' in echoed)
    self.assertEqual(echoed['HTTP_IF_NONE_MATCH'], "fred")
515
516#MAP-commented this out because it consistently fails
517# def testGet304EndToEnd(self):
518# # Test that end to end headers get overwritten in the cache
519# uri = urllib.parse.urljoin(base, "304/end2end.cgi")
520# (response, content) = self.http.request(uri, "GET")
521# self.assertNotEqual(response['etag'], "")
522# old_date = response['date']
523# time.sleep(2)
524#
525# (response, content) = self.http.request(uri, "GET", headers = {'Cache-Control': 'max-age=0'})
526# # The response should be from the cache, but the Date: header should be updated.
527# new_date = response['date']
528# self.assertNotEqual(new_date, old_date)
529# self.assertEqual(response.status, 200)
530# self.assertEqual(response.fromcache, True)
531
def testGet304LastModified(self):
    # A resource whose only cache validator is Last-Modified must still end
    # up being served from the cache on repeated requests.
    target = urllib.parse.urljoin(base, "304/last-modified-only/last-modified-only.txt")
    response, _body = self.http.request(target, "GET")
    self.assertNotEqual(response['last-modified'], "")

    self.http.request(target, "GET")
    response, _body = self.http.request(target, "GET")
    self.assertEqual(response.status, 200)
    self.assertEqual(response.fromcache, True)
543
def testGet307(self):
    # 307 redirects are followed, but the 307 response itself is not
    # cached; only the final destination may come from the cache.
    start = urllib.parse.urljoin(base, "307/onestep.asis")
    final_body = b"This is the final destination.\n"

    # First request: nothing is cached yet.
    response, content = self.http.request(start, "GET")
    self.assertEqual(response.status, 200)
    self.assertEqual(content, final_body)
    self.assertEqual(response.previous.status, 307)
    self.assertEqual(response.previous.fromcache, False)

    # Second request: destination from cache, the 307 hop still is not.
    response, content = self.http.request(start, "GET")
    self.assertEqual(response.status, 200)
    self.assertEqual(response.fromcache, True)
    self.assertEqual(content, final_body)
    self.assertEqual(response.previous.status, 307)
    self.assertEqual(response.previous.fromcache, False)
560
def testGet410(self):
    # A 410 from the server must be passed through to the caller unchanged.
    gone_uri = urllib.parse.urljoin(base, "410/410.asis")
    response, _body = self.http.request(gone_uri, "GET")
    self.assertEqual(response.status, 410)
566
def testVaryHeaderSimple(self):
    """
    RFC 2616 13.6
    When the cache receives a subsequent request whose Request-URI
    specifies one or more cache entries including a Vary header field,
    the cache MUST NOT use such a cache entry to construct a response
    to the new request unless all of the selecting request-headers
    present in the new request match the corresponding stored
    request-headers in the original request.
    """
    target = urllib.parse.urljoin(base, "vary/accept.asis")

    # The first response must advertise a Vary header.
    response, _body = self.http.request(target, "GET", headers={'Accept': 'text/plain'})
    self.assertEqual(response.status, 200)
    self.assertTrue('vary' in response)

    # Same Accept header as the stored request: served from the cache.
    response, _body = self.http.request(target, "GET", headers={'Accept': 'text/plain'})
    self.assertEqual(response.status, 200)
    self.assertEqual(response.fromcache, True, msg="Should be from cache")

    # Different Accept header: the cached entry must not be used.
    response, _body = self.http.request(target, "GET", headers={'Accept': 'text/html'})
    self.assertEqual(response.status, 200)
    self.assertEqual(response.fromcache, False, msg="Should not be from cache")

    # No Accept header at all: again no match, so not from the cache.
    response, _body = self.http.request(target, "GET")
    self.assertEqual(response.status, 200)
    self.assertEqual(response.fromcache, False, msg="Should not be from cache")
598
def testNoVary(self):
    # When the response carries no Vary header, a different Accept header
    # (for example) must not stop the cached entry from being used.
    target = urllib.parse.urljoin(base, "vary/no-vary.asis")

    # The first response must not advertise a Vary header.
    response, _body = self.http.request(target, "GET", headers={'Accept': 'text/plain'})
    self.assertEqual(response.status, 200)
    self.assertFalse('vary' in response)

    # Same Accept header, then a different one: both served from cache.
    for accept in ('text/plain', 'text/html'):
        response, _body = self.http.request(target, "GET", headers={'Accept': accept})
        self.assertEqual(response.status, 200)
        self.assertEqual(response.fromcache, True, msg="Should be from cache")
615
def testVaryHeaderDouble(self):
    # The cached entry may only be reused when both request headers sent
    # with the original request are repeated with the same values.
    target = urllib.parse.urljoin(base, "vary/accept-double.asis")
    accept = 'text/plain'
    languages = 'da, en-gb;q=0.8, en;q=0.7'

    response, _body = self.http.request(
        target, "GET", headers={'Accept': accept, 'Accept-Language': languages})
    self.assertEqual(response.status, 200)
    self.assertTrue('vary' in response)

    # we are from cache
    response, _body = self.http.request(
        target, "GET", headers={'Accept': accept, 'Accept-Language': languages})
    self.assertEqual(response.fromcache, True, msg="Should be from cache")

    # Accept alone is not enough to match the stored request.
    response, _body = self.http.request(target, "GET", headers={'Accept': accept})
    self.assertEqual(response.status, 200)
    self.assertEqual(response.fromcache, False)

    # get the resource again, not from cache, varied headers don't match exact
    response, _body = self.http.request(target, "GET", headers={'Accept-Language': 'da'})
    self.assertEqual(response.status, 200)
    self.assertEqual(response.fromcache, False, msg="Should not be from cache")
636
def testVaryUnusedHeader(self):
    # A header's value is not considered to vary if it's not used at all.
    target = urllib.parse.urljoin(base, "vary/unused-header.asis")

    response, _body = self.http.request(target, "GET", headers={'Accept': 'text/plain'})
    self.assertEqual(response.status, 200)
    self.assertTrue('vary' in response)

    # we are from cache
    response, _body = self.http.request(target, "GET", headers={'Accept': 'text/plain'})
    self.assertEqual(response.fromcache, True, msg="Should be from cache")
649
def testHeadGZip(self):
    # A HEAD response has no body to decompress: the content must be empty
    # even though the reported content-length is non-zero.
    gzip_uri = urllib.parse.urljoin(base, "gzip/final-destination.txt")
    response, content = self.http.request(gzip_uri, "HEAD")
    self.assertEqual(response.status, 200)
    reported_length = int(response['content-length'])
    self.assertNotEqual(reported_length, 0)
    self.assertEqual(content, b"")
657
def testGetGZip(self):
    # A gzip-compressed body must arrive decompressed, with the original
    # encoding moved to '-content-encoding' and content-length matching
    # the decompressed body.
    gzip_uri = urllib.parse.urljoin(base, "gzip/final-destination.txt")
    plain_body = b"This is the final destination.\n"
    response, content = self.http.request(gzip_uri, "GET")
    self.assertEqual(response.status, 200)
    self.assertFalse('content-encoding' in response)
    self.assertTrue('-content-encoding' in response)
    self.assertEqual(int(response['content-length']), len(plain_body))
    self.assertEqual(content, plain_body)
667
def testGetGZipFailure(self):
    # Test that we raise a good exception when the gzip fails
    self.http.force_exception_to_status_code = False
    uri = urllib.parse.urljoin(base, "gzip/failed-compression.asis")
    try:
        self.http.request(uri, "GET")
    except httplib2.FailedToDecompressContent:
        pass
    except Exception:
        self.fail("Threw wrong kind of exception")
    else:
        # Kept out of the try body on purpose: fail() raises AssertionError,
        # which the generic handler above would otherwise catch and
        # misreport as "wrong kind of exception".
        self.fail("Should never reach here")

    # Re-run the test with out the exceptions
    self.http.force_exception_to_status_code = True

    (response, content) = self.http.request(uri, "GET")
    self.assertEqual(response.status, 500)
    self.assertTrue(response.reason.startswith("Content purported"))
686
def testTimeout(self):
    # A request that outlasts the process-wide socket default timeout must
    # be reported as a 408 when exceptions are mapped to status codes.
    self.http.force_exception_to_status_code = True
    uri = urllib.parse.urljoin(base, "timeout/timeout.cgi")
    try:
        import socket
        previous_timeout = socket.getdefaulttimeout()
        socket.setdefaulttimeout(1)
    except Exception:
        # Don't run the test if we can't set the timeout
        return
    try:
        (response, content) = self.http.request(uri)
    finally:
        # The default timeout is global state: restore it so that it does
        # not leak into the tests that run after this one.
        socket.setdefaulttimeout(previous_timeout)
    self.assertEqual(response.status, 408)
    self.assertTrue(response.reason.startswith("Request Timeout"))
    self.assertTrue(content.startswith(b"Request Timeout"))
700
def testIndividualTimeout(self):
    # A timeout given to one Http instance must be honoured for that
    # instance: the slow resource is reported as a 408.
    slow_uri = urllib.parse.urljoin(base, "timeout/timeout.cgi")
    impatient = httplib2.Http(timeout=1)
    impatient.force_exception_to_status_code = True

    response, content = impatient.request(slow_uri)
    self.assertEqual(response.status, 408)
    self.assertTrue(response.reason.startswith("Request Timeout"))
    self.assertTrue(content.startswith(b"Request Timeout"))
710
711
def testGetDeflate(self):
    # A deflate-compressed body must arrive decompressed, without a
    # content-encoding header and with a matching content-length.
    deflate_uri = urllib.parse.urljoin(base, "deflate/deflated.asis")
    response, content = self.http.request(deflate_uri, "GET")
    self.assertEqual(response.status, 200)
    self.assertFalse('content-encoding' in response)
    expected_length = len("This is the final destination.")
    self.assertEqual(int(response['content-length']), expected_length)
    self.assertEqual(content, b"This is the final destination.")
720
def testGetDeflateFailure(self):
    # Test that we raise a good exception when the deflate fails
    uri = urllib.parse.urljoin(base, "deflate/failed-compression.asis")

    # With status-code conversion switched off, the decompression error has
    # to reach the caller.  assertRaises is used rather than a
    # try / self.fail() / except Exception ladder: self.fail() raises the
    # test-failure exception inside the try body, where the broad handler
    # would catch it and misreport it as "Threw wrong kind of exception".
    # Any other exception type now propagates with its own traceback.
    self.http.force_exception_to_status_code = False
    self.assertRaises(httplib2.FailedToDecompressContent,
                      self.http.request, uri, "GET")

    # Re-run the request with exceptions converted to status codes
    self.http.force_exception_to_status_code = True

    (response, content) = self.http.request(uri, "GET")
    self.assertEqual(response.status, 500)
    self.assertTrue(response.reason.startswith("Content purported"))
740
def testGetDuplicateHeaders(self):
    # Duplicate response headers are expected to be concatenated with ','
    target = urllib.parse.urljoin(base, "duplicate-headers/multilink.asis")
    resp, body = self.http.request(target, "GET")

    self.assertEqual(resp.status, 200)
    self.assertEqual(body, b"This is content\n")
    # Only the first of the joined Link values is checked.
    first_link = resp['link'].split(",")[0]
    self.assertEqual(first_link, '<http://bitworking.org>; rel="home"; title="BitWorking"')
748
def testGetCacheControlNoCache(self):
    # Cache-Control: no-cache on a request must bypass a cached entry
    target = urllib.parse.urljoin(base, "304/test_etag.txt")

    # The first response has to carry a non-empty etag.
    resp, body = self.http.request(target, "GET")
    self.assertNotEqual(resp['etag'], "")

    # A plain repeat is answered from the cache ...
    resp, body = self.http.request(target, "GET")
    self.assertEqual(resp.status, 200)
    self.assertEqual(resp.fromcache, True)

    # ... but the same request with no-cache is not.
    resp, body = self.http.request(
        target, "GET", headers={'Cache-Control': 'no-cache'})
    self.assertEqual(resp.status, 200)
    self.assertEqual(resp.fromcache, False)
761
def testGetCacheControlPragmaNoCache(self):
    # Pragma: no-cache on a request must bypass a cached entry
    target = urllib.parse.urljoin(base, "304/test_etag.txt")

    # The first response has to carry a non-empty etag.
    resp, body = self.http.request(target, "GET")
    self.assertNotEqual(resp['etag'], "")

    # A plain repeat is answered from the cache ...
    resp, body = self.http.request(target, "GET")
    self.assertEqual(resp.status, 200)
    self.assertEqual(resp.fromcache, True)

    # ... but the same request with Pragma: no-cache is not.
    resp, body = self.http.request(
        target, "GET", headers={'Pragma': 'no-cache'})
    self.assertEqual(resp.status, 200)
    self.assertEqual(resp.fromcache, False)
774
def testGetCacheControlNoStoreRequest(self):
    # A no-store request means that the response should not be stored.
    target = urllib.parse.urljoin(base, "304/test_etag.txt")

    # Send the identical no-store request twice; neither answer may come
    # from the cache.  A fresh headers dict is built for each call.
    for _ in range(2):
        resp, body = self.http.request(
            target, "GET", headers={'Cache-Control': 'no-store'})
        self.assertEqual(resp.status, 200)
        self.assertEqual(resp.fromcache, False)
786
def testGetCacheControlNoStoreResponse(self):
    # A no-store response means that the response should not be stored.
    target = urllib.parse.urljoin(base, "no-store/no-store.asis")

    # Fetch the resource twice; the second answer must not be a cache hit
    # either.
    for _ in range(2):
        resp, body = self.http.request(target, "GET")
        self.assertEqual(resp.status, 200)
        self.assertEqual(resp.fromcache, False)
798
def testGetCacheControlNoCacheNoStoreRequest(self):
    # A "no-store, no-cache" request should not be answered from the cache
    # even when the resource was cached by earlier requests.
    target = urllib.parse.urljoin(base, "304/test_etag.txt")

    # Two plain GETs: the second one is expected to be a cache hit.
    resp, body = self.http.request(target, "GET")
    resp, body = self.http.request(target, "GET")
    self.assertEqual(resp.fromcache, True)

    # Two GETs with no-store, no-cache: only the second is inspected.
    resp, body = self.http.request(
        target, "GET", headers={'Cache-Control': 'no-store, no-cache'})
    resp, body = self.http.request(
        target, "GET", headers={'Cache-Control': 'no-store, no-cache'})
    self.assertEqual(resp.status, 200)
    self.assertEqual(resp.fromcache, False)
811
def testUpdateInvalidatesCache(self):
    # Test that calling PUT or DELETE on a URI that is cached
    # invalidates the cached entry.  Only DELETE is exercised here.
    target = urllib.parse.urljoin(base, "304/test_etag.txt")

    # Two GETs so that the second is a cache hit.
    resp, body = self.http.request(target, "GET")
    resp, body = self.http.request(target, "GET")
    self.assertEqual(resp.fromcache, True)

    # The DELETE itself is expected to be refused with 405 ...
    resp, body = self.http.request(target, "DELETE")
    self.assertEqual(resp.status, 405)

    # ... yet the following GET must no longer come from the cache.
    resp, body = self.http.request(target, "GET")
    self.assertEqual(resp.fromcache, False)
825
def testUpdateUsesCachedETag(self):
    # Test that we natively support http://www.w3.org/1999/04/Editing/
    target = urllib.parse.urljoin(base, "conditional-updates/test.cgi")

    # First GET is a miss, the repeat is a cache hit.
    for expect_cached in (False, True):
        resp, body = self.http.request(target, "GET")
        self.assertEqual(resp.status, 200)
        self.assertEqual(resp.fromcache, expect_cached)

    # The first PUT is expected to succeed; an identical second PUT is
    # expected to be rejected with 412.
    for expected_status in (200, 412):
        resp, body = self.http.request(target, "PUT", body="foo")
        self.assertEqual(resp.status, expected_status)
840
def testUpdateUsesCachedETagAndOCMethod(self):
    # Test that we natively support http://www.w3.org/1999/04/Editing/
    # for a method added to optimistic_concurrency_methods.
    target = urllib.parse.urljoin(base, "conditional-updates/test.cgi")

    # First GET is a miss, the repeat is a cache hit.
    for expect_cached in (False, True):
        resp, body = self.http.request(target, "GET")
        self.assertEqual(resp.status, 200)
        self.assertEqual(resp.fromcache, expect_cached)

    # Register DELETE, then expect the DELETE request to return 200.
    self.http.optimistic_concurrency_methods.append("DELETE")
    resp, body = self.http.request(target, "DELETE")
    self.assertEqual(resp.status, 200)
854
855
def testUpdateUsesCachedETagOverridden(self):
    # Test that we natively support http://www.w3.org/1999/04/Editing/
    # while still letting the caller supply their own if-match header.
    target = urllib.parse.urljoin(base, "conditional-updates/test.cgi")

    # First GET is a miss, the repeat is a cache hit.
    for expect_cached in (False, True):
        resp, body = self.http.request(target, "GET")
        self.assertEqual(resp.status, 200)
        self.assertEqual(resp.fromcache, expect_cached)

    # A PUT carrying an explicit if-match of 'fred' is expected to be
    # rejected with 412.
    resp, body = self.http.request(
        target, "PUT", body="foo", headers={'if-match': 'fred'})
    self.assertEqual(resp.status, 412)
868
def testBasicAuth(self):
    # Test Basic Authentication
    file_uri = urllib.parse.urljoin(base, "basic/file.txt")
    dir_uri = urllib.parse.urljoin(base, "basic/")

    # Without credentials: file first, then the directory, both 401.
    for target in (file_uri, dir_uri):
        resp, body = self.http.request(target, "GET")
        self.assertEqual(resp.status, 401)

    self.http.add_credentials('joe', 'password')

    # With credentials: directory first, then the file, both 200.
    for target in (dir_uri, file_uri):
        resp, body = self.http.request(target, "GET")
        self.assertEqual(resp.status, 200)
886
def testBasicAuthWithDomain(self):
    # Test Basic Authentication with credentials bound to a domain
    file_uri = urllib.parse.urljoin(base, "basic/file.txt")
    dir_uri = urllib.parse.urljoin(base, "basic/")

    # Without credentials: file first, then the directory, both 401.
    for target in (file_uri, dir_uri):
        resp, body = self.http.request(target, "GET")
        self.assertEqual(resp.status, 401)

    # Credentials registered for a different domain must not help.
    self.http.add_credentials('joe', 'password', "example.org")
    for target in (dir_uri, file_uri):
        resp, body = self.http.request(target, "GET")
        self.assertEqual(resp.status, 401)

    # Credentials registered for the network location of `base` do.
    domain = urllib.parse.urlparse(base)[1]
    self.http.add_credentials('joe', 'password', domain)

    # NOTE(review): both authenticated requests target file.txt, whereas
    # testBasicAuth requests basic/ first -- confirm this is intended.
    for _ in range(2):
        resp, body = self.http.request(file_uri, "GET")
        self.assertEqual(resp.status, 200)
913
914
915
916
917
918
def testBasicAuthTwoDifferentCredentials(self):
    # Test Basic Authentication with multiple sets of credentials
    file_uri = urllib.parse.urljoin(base, "basic2/file.txt")
    dir_uri = urllib.parse.urljoin(base, "basic2/")

    # Without credentials: file first, then the directory, both 401.
    for target in (file_uri, dir_uri):
        resp, body = self.http.request(target, "GET")
        self.assertEqual(resp.status, 401)

    self.http.add_credentials('fred', 'barney')

    # With credentials: directory first, then the file, both 200.
    for target in (dir_uri, file_uri):
        resp, body = self.http.request(target, "GET")
        self.assertEqual(resp.status, 200)
936
def testBasicAuthNested(self):
    # Test Basic Authentication with resources
    # that are nested
    outer = urllib.parse.urljoin(base, "basic-nested/")
    inner = urllib.parse.urljoin(base, "basic-nested/subdir")

    # Each step optionally registers one more credential pair and then
    # checks the outer directory followed by the nested one.
    #   (credentials to add, expected outer status, expected inner status)
    steps = (
        (None, 401, 401),
        (('joe', 'password'), 200, 401),
        (('fred', 'barney'), 200, 200),
    )
    for credentials, outer_status, inner_status in steps:
        if credentials is not None:
            self.http.add_credentials(*credentials)
        for target, expected in ((outer, outer_status), (inner, inner_status)):
            resp, body = self.http.request(target, "GET")
            self.assertEqual(resp.status, expected)
968
def testDigestAuth(self):
    # Test that we support Digest Authentication
    dir_uri = urllib.parse.urljoin(base, "digest/")

    # Refused without credentials, accepted once they are registered.
    resp, body = self.http.request(dir_uri, "GET")
    self.assertEqual(resp.status, 401)

    self.http.add_credentials('joe', 'password')
    resp, body = self.http.request(dir_uri, "GET")
    self.assertEqual(resp.status, 200)

    # A file below the protected directory is requested as well; its
    # response is not inspected.
    file_uri = urllib.parse.urljoin(base, "digest/file.txt")
    resp, body = self.http.request(file_uri, "GET")
981
def testDigestAuthNextNonceAndNC(self):
    # Test that if the server sets nextnonce that we reset
    # the nonce count back to 1
    target = urllib.parse.urljoin(base, "digest/file.txt")
    self.http.add_credentials('joe', 'password')

    # Two uncached requests; the authentication-info header of each
    # response is parsed before its status is checked.
    resp, body = self.http.request(
        target, "GET", headers={"cache-control": "no-cache"})
    first_info = httplib2._parse_www_authenticate(resp, 'authentication-info')
    self.assertEqual(resp.status, 200)

    resp, body = self.http.request(
        target, "GET", headers={"cache-control": "no-cache"})
    second_info = httplib2._parse_www_authenticate(resp, 'authentication-info')
    self.assertEqual(resp.status, 200)

    # The nonce count is only checked when the first response announced
    # a nextnonce.
    if 'nextnonce' in first_info:
        self.assertEqual(second_info['nc'], 1)
996
def testDigestAuthStale(self):
    # Test that we can handle a nonce becoming stale
    target = urllib.parse.urljoin(base, "digest-expire/file.txt")
    self.http.add_credentials('joe', 'password')

    resp, body = self.http.request(
        target, "GET", headers={"cache-control": "no-cache"})
    # The parsed value is not inspected; the call just has to succeed.
    httplib2._parse_www_authenticate(resp, 'authentication-info')
    self.assertEqual(resp.status, 200)

    # Sleep long enough that the nonce becomes stale
    time.sleep(3)

    resp, body = self.http.request(
        target, "GET", headers={"cache-control": "no-cache"})
    self.assertFalse(resp.fromcache)
    self.assertTrue(resp._stale_digest)
    # Again parsed only for the side effect of not raising.
    httplib2._parse_www_authenticate(resp, 'authentication-info')
    self.assertEqual(resp.status, 200)
1013
def reflector(self, content):
    # Turn a UTF-8 encoded body made of "NAME=value" lines into a dict.
    # Each line is split on its first '=' only, so values may themselves
    # contain '='.  Surrounding whitespace of the whole body is stripped
    # before splitting into lines.
    text = content.decode('utf-8').strip()
    pairs = (tuple(line.split("=", 1)) for line in text.split("\n"))
    return dict(pairs)
1016
def testReflector(self):
    # The reflector CGI's output, once parsed by self.reflector(), must
    # contain an HTTP_USER_AGENT entry.
    target = urllib.parse.urljoin(base, "reflector/reflector.cgi")
    resp, body = self.http.request(target, "GET")
    reflected = self.reflector(body)
    self.assertTrue('HTTP_USER_AGENT' in reflected)
1022
Joe Gregorio84cc10a2009-09-01 13:02:49 -04001023
def testConnectionClose(self):
    # After an ordinary request every connection held in
    # self.http.connections should still have a socket; after a request
    # sent with "connection: close" none of them should.
    target = "http://www.google.com/"

    resp, body = self.http.request(target, "GET")
    for conn in self.http.connections.values():
        self.assertNotEqual(None, conn.sock)

    resp, body = self.http.request(
        target, "GET", headers={"connection": "close"})
    for conn in self.http.connections.values():
        self.assertEqual(None, conn.sock)
1032
# Re-run the whole HttpTest suite against a memcached-backed cache, but only
# when the optional `memcache` client module can be imported.
try:
    import memcache
except Exception:
    # memcache is optional: any failure to import it just means these
    # tests are not defined.  `except Exception` (rather than a bare
    # except) lets KeyboardInterrupt / SystemExit through, and keeping the
    # class definition out of the try body means a mistake in the class
    # itself is no longer silently swallowed.
    pass
else:
    class HttpTestMemCached(HttpTest):
        def setUp(self):
            self.cache = memcache.Client(['127.0.0.1:11211'], debug=0)
            self.http = httplib2.Http(self.cache)
            self.cache.flush_all()
            # Not exactly sure why the sleep is needed here, but
            # if not present then some unit tests that rely on caching
            # fail. Memcached seems to lose some sets immediately
            # after a flush_all if the set is to a value that
            # was previously cached. (Maybe the flush is handled async?)
            time.sleep(1)
            self.http.clear_credentials()
1050
1051
1052
1053# ------------------------------------------------------------------------
1054
class HttpPrivateTest(unittest.TestCase):
    """Tests for httplib2's private helpers and authentication objects.

    Every test feeds literal header dictionaries (or Response objects built
    in place) to functions such as _parse_cache_control, _entry_disposition,
    _parse_www_authenticate and _get_end2end_headers; none of them goes
    through Http.request().
    """

    def testParseCacheControl(self):
        # Test that we can parse the Cache-Control header
        self.assertEqual({}, httplib2._parse_cache_control({}))
        self.assertEqual({'no-cache': 1}, httplib2._parse_cache_control({'cache-control': ' no-cache'}))
        cc = httplib2._parse_cache_control({'cache-control': ' no-cache, max-age = 7200'})
        self.assertEqual(cc['no-cache'], 1)
        self.assertEqual(cc['max-age'], '7200')
        cc = httplib2._parse_cache_control({'cache-control': ' , '})
        self.assertEqual(cc[''], 1)

        # A header using ';' between directives must parse without raising.
        # No try/except wrapper: an exception fails the test on its own with
        # the full traceback, and a wrapper would also catch (and mislabel)
        # a failure of the assertion below.
        cc = httplib2._parse_cache_control({'cache-control': 'Max-age=3600;post-check=1800,pre-check=3600'})
        self.assertTrue("max-age" in cc)

    def testNormalizeHeaders(self):
        # Test that we normalize headers to lowercase
        h = httplib2._normalize_headers({'Cache-Control': 'no-cache', 'Other': 'Stuff'})
        self.assertTrue('cache-control' in h)
        self.assertTrue('other' in h)
        self.assertEqual('Stuff', h['other'])

    def testExpirationModelTransparent(self):
        # Test that no-cache makes our request TRANSPARENT
        response_headers = {
            'cache-control': 'max-age=7200'
        }
        request_headers = {
            'cache-control': 'no-cache'
        }
        self.assertEqual("TRANSPARENT", httplib2._entry_disposition(response_headers, request_headers))

    def testMaxAgeNonNumeric(self):
        # Non-numeric max-age / min-fresh values must yield STALE
        response_headers = {
            'cache-control': 'max-age=fred, min-fresh=barney'
        }
        request_headers = {
        }
        self.assertEqual("STALE", httplib2._entry_disposition(response_headers, request_headers))

    def testExpirationModelNoCacheResponse(self):
        # The date and expires point to an entry that should be
        # FRESH, but the no-cache over-rides that.
        now = time.time()
        response_headers = {
            'date': time.strftime("%a, %d %b %Y %H:%M:%S GMT", time.gmtime(now)),
            'expires': time.strftime("%a, %d %b %Y %H:%M:%S GMT", time.gmtime(now+4)),
            'cache-control': 'no-cache'
        }
        request_headers = {
        }
        self.assertEqual("STALE", httplib2._entry_disposition(response_headers, request_headers))

    def testExpirationModelStaleRequestMustReval(self):
        # must-revalidate on the request forces STALE
        self.assertEqual("STALE", httplib2._entry_disposition({}, {'cache-control': 'must-revalidate'}))

    def testExpirationModelStaleResponseMustReval(self):
        # must-revalidate on the response forces STALE
        self.assertEqual("STALE", httplib2._entry_disposition({'cache-control': 'must-revalidate'}, {}))

    def testExpirationModelFresh(self):
        # max-age=2 is FRESH right away and STALE after sleeping 3 seconds
        response_headers = {
            'date': time.strftime("%a, %d %b %Y %H:%M:%S GMT", time.gmtime()),
            'cache-control': 'max-age=2'
        }
        request_headers = {
        }
        self.assertEqual("FRESH", httplib2._entry_disposition(response_headers, request_headers))
        time.sleep(3)
        self.assertEqual("STALE", httplib2._entry_disposition(response_headers, request_headers))

    def testExpirationMaxAge0(self):
        # max-age=0 is STALE immediately
        response_headers = {
            'date': time.strftime("%a, %d %b %Y %H:%M:%S GMT", time.gmtime()),
            'cache-control': 'max-age=0'
        }
        request_headers = {
        }
        self.assertEqual("STALE", httplib2._entry_disposition(response_headers, request_headers))

    def testExpirationModelDateAndExpires(self):
        # expires two seconds after date: FRESH now, STALE after 3 seconds
        now = time.time()
        response_headers = {
            'date': time.strftime("%a, %d %b %Y %H:%M:%S GMT", time.gmtime(now)),
            'expires': time.strftime("%a, %d %b %Y %H:%M:%S GMT", time.gmtime(now+2)),
        }
        request_headers = {
        }
        self.assertEqual("FRESH", httplib2._entry_disposition(response_headers, request_headers))
        time.sleep(3)
        self.assertEqual("STALE", httplib2._entry_disposition(response_headers, request_headers))

    def testExpiresZero(self):
        # An expires value of "0" must yield STALE
        now = time.time()
        response_headers = {
            'date': time.strftime("%a, %d %b %Y %H:%M:%S GMT", time.gmtime(now)),
            'expires': "0",
        }
        request_headers = {
        }
        self.assertEqual("STALE", httplib2._entry_disposition(response_headers, request_headers))

    def testExpirationModelDateOnly(self):
        # A date header alone (no expires, no cache-control) is STALE
        now = time.time()
        response_headers = {
            'date': time.strftime("%a, %d %b %Y %H:%M:%S GMT", time.gmtime(now+3)),
        }
        request_headers = {
        }
        self.assertEqual("STALE", httplib2._entry_disposition(response_headers, request_headers))

    def testExpirationModelOnlyIfCached(self):
        # only-if-cached on the request yields FRESH even with no
        # response headers at all
        response_headers = {
        }
        request_headers = {
            'cache-control': 'only-if-cached',
        }
        self.assertEqual("FRESH", httplib2._entry_disposition(response_headers, request_headers))

    def testExpirationModelMaxAgeBoth(self):
        # A request max-age=0 wins over a response max-age=2
        now = time.time()
        response_headers = {
            'date': time.strftime("%a, %d %b %Y %H:%M:%S GMT", time.gmtime(now)),
            'cache-control': 'max-age=2'
        }
        request_headers = {
            'cache-control': 'max-age=0'
        }
        self.assertEqual("STALE", httplib2._entry_disposition(response_headers, request_headers))

    def testExpirationModelDateAndExpiresMinFresh1(self):
        # Expires in 2 seconds but the request wants min-fresh=2: STALE
        now = time.time()
        response_headers = {
            'date': time.strftime("%a, %d %b %Y %H:%M:%S GMT", time.gmtime(now)),
            'expires': time.strftime("%a, %d %b %Y %H:%M:%S GMT", time.gmtime(now+2)),
        }
        request_headers = {
            'cache-control': 'min-fresh=2'
        }
        self.assertEqual("STALE", httplib2._entry_disposition(response_headers, request_headers))

    def testExpirationModelDateAndExpiresMinFresh2(self):
        # Expires in 4 seconds and the request wants min-fresh=2: FRESH
        now = time.time()
        response_headers = {
            'date': time.strftime("%a, %d %b %Y %H:%M:%S GMT", time.gmtime(now)),
            'expires': time.strftime("%a, %d %b %Y %H:%M:%S GMT", time.gmtime(now+4)),
        }
        request_headers = {
            'cache-control': 'min-fresh=2'
        }
        self.assertEqual("FRESH", httplib2._entry_disposition(response_headers, request_headers))

    def testParseWWWAuthenticateEmpty(self):
        # No www-authenticate header at all gives an empty result
        res = httplib2._parse_www_authenticate({})
        self.assertEqual(len(list(res.keys())), 0)

    def testParseWWWAuthenticate(self):
        # different uses of spaces around commas
        res = httplib2._parse_www_authenticate({ 'www-authenticate': 'Test realm="test realm" , foo=foo ,bar="bar", baz=baz,qux=qux'})
        self.assertEqual(len(list(res.keys())), 1)
        self.assertEqual(len(list(res['test'].keys())), 5)

        # tokens with non-alphanum
        res = httplib2._parse_www_authenticate({ 'www-authenticate': 'T*!%#st realm=to*!%#en, to*!%#en="quoted string"'})
        self.assertEqual(len(list(res.keys())), 1)
        self.assertEqual(len(list(res['t*!%#st'].keys())), 2)

        # quoted string with quoted pairs
        res = httplib2._parse_www_authenticate({ 'www-authenticate': 'Test realm="a \\"test\\" realm"'})
        self.assertEqual(len(list(res.keys())), 1)
        self.assertEqual(res['test']['realm'], 'a "test" realm')

    def testParseWWWAuthenticateStrict(self):
        # Re-run testParseWWWAuthenticate with strict parsing switched on.
        # The flag is a module-level global, so it is put back in a finally
        # clause: a failing inner test must not leave strict parsing enabled
        # for every test that runs afterwards.
        previous = httplib2.USE_WWW_AUTH_STRICT_PARSING
        httplib2.USE_WWW_AUTH_STRICT_PARSING = 1
        try:
            self.testParseWWWAuthenticate()
        finally:
            httplib2.USE_WWW_AUTH_STRICT_PARSING = previous

    def testParseWWWAuthenticateBasic(self):
        # Basic challenges: realm alone, then with a quoted and an unquoted
        # extra parameter
        res = httplib2._parse_www_authenticate({ 'www-authenticate': 'Basic realm="me"'})
        basic = res['basic']
        self.assertEqual('me', basic['realm'])

        res = httplib2._parse_www_authenticate({ 'www-authenticate': 'Basic realm="me", algorithm="MD5"'})
        basic = res['basic']
        self.assertEqual('me', basic['realm'])
        self.assertEqual('MD5', basic['algorithm'])

        res = httplib2._parse_www_authenticate({ 'www-authenticate': 'Basic realm="me", algorithm=MD5'})
        basic = res['basic']
        self.assertEqual('me', basic['realm'])
        self.assertEqual('MD5', basic['algorithm'])

    def testParseWWWAuthenticateBasic2(self):
        # No space after the comma, trailing space at the end
        res = httplib2._parse_www_authenticate({ 'www-authenticate': 'Basic realm="me",other="fred" '})
        basic = res['basic']
        self.assertEqual('me', basic['realm'])
        self.assertEqual('fred', basic['other'])

    def testParseWWWAuthenticateBasic3(self):
        # Parameter names are matched case-insensitively ('REAlm' -> 'realm')
        res = httplib2._parse_www_authenticate({ 'www-authenticate': 'Basic REAlm="me" '})
        basic = res['basic']
        self.assertEqual('me', basic['realm'])

    def testParseWWWAuthenticateDigest(self):
        # A quoted value containing a comma (qop) must stay in one piece
        res = httplib2._parse_www_authenticate({ 'www-authenticate':
            'Digest realm="testrealm@host.com", qop="auth,auth-int", nonce="dcd98b7102dd2f0e8b11d0f600bfb0c093", opaque="5ccc069c403ebaf9f0171e9517f40e41"'})
        digest = res['digest']
        self.assertEqual('testrealm@host.com', digest['realm'])
        self.assertEqual('auth,auth-int', digest['qop'])

    def testParseWWWAuthenticateMultiple(self):
        # Two challenges in one header value, separated only by a space
        res = httplib2._parse_www_authenticate({ 'www-authenticate':
            'Digest realm="testrealm@host.com", qop="auth,auth-int", nonce="dcd98b7102dd2f0e8b11d0f600bfb0c093", opaque="5ccc069c403ebaf9f0171e9517f40e41" Basic REAlm="me" '})
        digest = res['digest']
        self.assertEqual('testrealm@host.com', digest['realm'])
        self.assertEqual('auth,auth-int', digest['qop'])
        self.assertEqual('dcd98b7102dd2f0e8b11d0f600bfb0c093', digest['nonce'])
        self.assertEqual('5ccc069c403ebaf9f0171e9517f40e41', digest['opaque'])
        basic = res['basic']
        self.assertEqual('me', basic['realm'])

    def testParseWWWAuthenticateMultiple2(self):
        # Handle an added comma between challenges, which might get thrown in if the challenges were
        # originally sent in separate www-authenticate headers.
        res = httplib2._parse_www_authenticate({ 'www-authenticate':
            'Digest realm="testrealm@host.com", qop="auth,auth-int", nonce="dcd98b7102dd2f0e8b11d0f600bfb0c093", opaque="5ccc069c403ebaf9f0171e9517f40e41", Basic REAlm="me" '})
        digest = res['digest']
        self.assertEqual('testrealm@host.com', digest['realm'])
        self.assertEqual('auth,auth-int', digest['qop'])
        self.assertEqual('dcd98b7102dd2f0e8b11d0f600bfb0c093', digest['nonce'])
        self.assertEqual('5ccc069c403ebaf9f0171e9517f40e41', digest['opaque'])
        basic = res['basic']
        self.assertEqual('me', basic['realm'])

    def testParseWWWAuthenticateMultiple3(self):
        # Handle an added comma between challenges, which might get thrown in if the challenges were
        # originally sent in separate www-authenticate headers.
        res = httplib2._parse_www_authenticate({ 'www-authenticate':
            'Digest realm="testrealm@host.com", qop="auth,auth-int", nonce="dcd98b7102dd2f0e8b11d0f600bfb0c093", opaque="5ccc069c403ebaf9f0171e9517f40e41", Basic REAlm="me", WSSE realm="foo", profile="UsernameToken"'})
        digest = res['digest']
        self.assertEqual('testrealm@host.com', digest['realm'])
        self.assertEqual('auth,auth-int', digest['qop'])
        self.assertEqual('dcd98b7102dd2f0e8b11d0f600bfb0c093', digest['nonce'])
        self.assertEqual('5ccc069c403ebaf9f0171e9517f40e41', digest['opaque'])
        basic = res['basic']
        self.assertEqual('me', basic['realm'])
        wsse = res['wsse']
        self.assertEqual('foo', wsse['realm'])
        self.assertEqual('UsernameToken', wsse['profile'])

    def testParseWWWAuthenticateMultiple4(self):
        # Tabs around '=' and inside a quoted value, punctuation in the nonce
        res = httplib2._parse_www_authenticate({ 'www-authenticate':
            'Digest realm="test-real.m@host.com", qop \t=\t"\tauth,auth-int", nonce="(*)&^&$%#",opaque="5ccc069c403ebaf9f0171e9517f40e41", Basic REAlm="me", WSSE realm="foo", profile="UsernameToken"'})
        digest = res['digest']
        self.assertEqual('test-real.m@host.com', digest['realm'])
        self.assertEqual('\tauth,auth-int', digest['qop'])
        self.assertEqual('(*)&^&$%#', digest['nonce'])

    def testParseWWWAuthenticateMoreQuoteCombos(self):
        # Mix of quoted and unquoted parameters, nonce containing '='
        res = httplib2._parse_www_authenticate({'www-authenticate':'Digest realm="myrealm", nonce="Ygk86AsKBAA=3516200d37f9a3230352fde99977bd6d472d4306", algorithm=MD5, qop="auth", stale=true'})
        digest = res['digest']
        self.assertEqual('myrealm', digest['realm'])

    def testDigestObject(self):
        # With a fixed cnonce the generated Authorization header must match
        # a known-good value exactly
        credentials = ('joe', 'password')
        host = None
        request_uri = '/projects/httplib2/test/digest/'
        headers = {}
        response = {
            'www-authenticate': 'Digest realm="myrealm", nonce="Ygk86AsKBAA=3516200d37f9a3230352fde99977bd6d472d4306", algorithm=MD5, qop="auth"'
        }
        content = b""

        d = httplib2.DigestAuthentication(credentials, host, request_uri, headers, response, content, None)
        d.request("GET", request_uri, headers, content, cnonce="33033375ec278a46")
        our_request = "Authorization: %s" % headers['Authorization']
        working_request = 'Authorization: Digest username="joe", realm="myrealm", nonce="Ygk86AsKBAA=3516200d37f9a3230352fde99977bd6d472d4306", uri="/projects/httplib2/test/digest/", algorithm=MD5, response="97ed129401f7cdc60e5db58a80f3ea8b", qop=auth, nc=00000001, cnonce="33033375ec278a46"'
        self.assertEqual(our_request, working_request)

    def testDigestObjectStale(self):
        # A 401 whose challenge carries stale=true asks for a retry
        credentials = ('joe', 'password')
        host = None
        request_uri = '/projects/httplib2/test/digest/'
        headers = {}
        response = httplib2.Response({ })
        response['www-authenticate'] = 'Digest realm="myrealm", nonce="Ygk86AsKBAA=3516200d37f9a3230352fde99977bd6d472d4306", algorithm=MD5, qop="auth", stale=true'
        response.status = 401
        content = b""
        d = httplib2.DigestAuthentication(credentials, host, request_uri, headers, response, content, None)
        # Returns true to force a retry
        self.assertTrue( d.response(response, content) )

    def testDigestObjectAuthInfo(self):
        # An authentication-info header with nextnonce updates the challenge
        credentials = ('joe', 'password')
        host = None
        request_uri = '/projects/httplib2/test/digest/'
        headers = {}
        response = httplib2.Response({ })
        response['www-authenticate'] = 'Digest realm="myrealm", nonce="Ygk86AsKBAA=3516200d37f9a3230352fde99977bd6d472d4306", algorithm=MD5, qop="auth", stale=true'
        response['authentication-info'] = 'nextnonce="fred"'
        content = b""
        d = httplib2.DigestAuthentication(credentials, host, request_uri, headers, response, content, None)
        # No retry is requested here; instead the nextnonce replaces the
        # stored nonce and the nonce count starts again at 1
        self.assertFalse( d.response(response, content) )
        self.assertEqual('fred', d.challenge['nonce'])
        self.assertEqual(1, d.challenge['nc'])

    def testWsseAlgorithm(self):
        # Known-answer test for the WSSE username token digest
        digest = httplib2._wsse_username_token("d36e316282959a9ed4c89851497a717f", "2003-12-15T14:43:07Z", "taadtaadpstcsm")
        expected = b"quR/EWLAV4xLf9Zqyw4pDmfV9OY="
        self.assertEqual(expected, digest)

    def testEnd2End(self):
        # one end to end header
        response = {'content-type': 'application/atom+xml', 'te': 'deflate'}
        end2end = httplib2._get_end2end_headers(response)
        self.assertTrue('content-type' in end2end)
        self.assertTrue('te' not in end2end)
        self.assertTrue('connection' not in end2end)

        # one end to end header that gets eliminated
        response = {'connection': 'content-type', 'content-type': 'application/atom+xml', 'te': 'deflate'}
        end2end = httplib2._get_end2end_headers(response)
        self.assertTrue('content-type' not in end2end)
        self.assertTrue('te' not in end2end)
        self.assertTrue('connection' not in end2end)

        # Degenerate case of no headers
        # (assertEqual, not the deprecated assertEquals alias, which no
        # longer exists in Python 3.12+)
        response = {}
        end2end = httplib2._get_end2end_headers(response)
        self.assertEqual(0, len(end2end))

        # Degenerate case of connection referring to a header not passed in
        response = {'connection': 'content-type'}
        end2end = httplib2._get_end2end_headers(response)
        self.assertEqual(0, len(end2end))
1403
# Run the suite only when this file is executed as a script.  Without the
# guard, merely importing the module (e.g. from another test runner) would
# start unittest.main(), which parses that process's command line and exits.
if __name__ == '__main__':
    unittest.main()