blob: 46a850d9860d20bcd8a12569a6ad3f3d67e069fc [file] [log] [blame]
pilgrim00a352e2009-05-29 04:04:44 +00001#!/usr/bin/env python3
2"""
3httplib2test
4
5A set of unit tests for httplib2.py.
6
7Requires Python 3.0 or later
8"""
9
10__author__ = "Joe Gregorio (joe@bitworking.org)"
11__copyright__ = "Copyright 2006, Joe Gregorio"
12__contributors__ = ["Mark Pilgrim"]
13__license__ = "MIT"
14__history__ = """ """
15__version__ = "0.2 ($Rev: 118 $)"
16
17
18import sys
19import unittest
20import http.client
21import httplib2
22import os
23import urllib.parse
24import time
25import base64
26import io
27
28# The test resources base uri
29base = 'http://bitworking.org/projects/httplib2/test/'
30#base = 'http://localhost/projects/httplib2/test/'
31cacheDirName = ".cache"
32
33
34class CredentialsTest(unittest.TestCase):
35 def test(self):
36 c = httplib2.Credentials()
37 c.add("joe", "password")
38 self.assertEqual(("joe", "password"), list(c.iter("bitworking.org"))[0])
39 self.assertEqual(("joe", "password"), list(c.iter(""))[0])
40 c.add("fred", "password2", "wellformedweb.org")
41 self.assertEqual(("joe", "password"), list(c.iter("bitworking.org"))[0])
42 self.assertEqual(1, len(list(c.iter("bitworking.org"))))
43 self.assertEqual(2, len(list(c.iter("wellformedweb.org"))))
44 self.assertTrue(("fred", "password2") in list(c.iter("wellformedweb.org")))
45 c.clear()
46 self.assertEqual(0, len(list(c.iter("bitworking.org"))))
47 c.add("fred", "password2", "wellformedweb.org")
48 self.assertTrue(("fred", "password2") in list(c.iter("wellformedweb.org")))
49 self.assertEqual(0, len(list(c.iter("bitworking.org"))))
50 self.assertEqual(0, len(list(c.iter(""))))
51
52
53class ParserTest(unittest.TestCase):
54 def testFromStd66(self):
55 self.assertEqual( ('http', 'example.com', '', None, None ), httplib2.parse_uri("http://example.com"))
56 self.assertEqual( ('https', 'example.com', '', None, None ), httplib2.parse_uri("https://example.com"))
57 self.assertEqual( ('https', 'example.com:8080', '', None, None ), httplib2.parse_uri("https://example.com:8080"))
58 self.assertEqual( ('http', 'example.com', '/', None, None ), httplib2.parse_uri("http://example.com/"))
59 self.assertEqual( ('http', 'example.com', '/path', None, None ), httplib2.parse_uri("http://example.com/path"))
60 self.assertEqual( ('http', 'example.com', '/path', 'a=1&b=2', None ), httplib2.parse_uri("http://example.com/path?a=1&b=2"))
61 self.assertEqual( ('http', 'example.com', '/path', 'a=1&b=2', 'fred' ), httplib2.parse_uri("http://example.com/path?a=1&b=2#fred"))
62 self.assertEqual( ('http', 'example.com', '/path', 'a=1&b=2', 'fred' ), httplib2.parse_uri("http://example.com/path?a=1&b=2#fred"))
63
64
65class UrlNormTest(unittest.TestCase):
66 def test(self):
67 self.assertEqual( "http://example.org/", httplib2.urlnorm("http://example.org")[-1])
68 self.assertEqual( "http://example.org/", httplib2.urlnorm("http://EXAMple.org")[-1])
69 self.assertEqual( "http://example.org/?=b", httplib2.urlnorm("http://EXAMple.org?=b")[-1])
70 self.assertEqual( "http://example.org/mypath?a=b", httplib2.urlnorm("http://EXAMple.org/mypath?a=b")[-1])
71 self.assertEqual( "http://localhost:80/", httplib2.urlnorm("http://localhost:80")[-1])
72 self.assertEqual( httplib2.urlnorm("http://localhost:80/"), httplib2.urlnorm("HTTP://LOCALHOST:80"))
73 try:
74 httplib2.urlnorm("/")
75 self.fail("Non-absolute URIs should raise an exception")
76 except httplib2.RelativeURIError:
77 pass
78
79class UrlSafenameTest(unittest.TestCase):
80 def test(self):
81 # Test that different URIs end up generating different safe names
82 self.assertEqual( "example.org,fred,a=b,58489f63a7a83c3b7794a6a398ee8b1f", httplib2.safename("http://example.org/fred/?a=b"))
83 self.assertEqual( "example.org,fred,a=b,8c5946d56fec453071f43329ff0be46b", httplib2.safename("http://example.org/fred?/a=b"))
84 self.assertEqual( "www.example.org,fred,a=b,499c44b8d844a011b67ea2c015116968", httplib2.safename("http://www.example.org/fred?/a=b"))
85 self.assertEqual( httplib2.safename(httplib2.urlnorm("http://www")[-1]), httplib2.safename(httplib2.urlnorm("http://WWW")[-1]))
86 self.assertEqual( "www.example.org,fred,a=b,692e843a333484ce0095b070497ab45d", httplib2.safename("https://www.example.org/fred?/a=b"))
87 self.assertNotEqual( httplib2.safename("http://www"), httplib2.safename("https://www"))
88 # Test the max length limits
89 uri = "http://" + ("w" * 200) + ".org"
90 uri2 = "http://" + ("w" * 201) + ".org"
91 self.assertNotEqual( httplib2.safename(uri2), httplib2.safename(uri))
92 # Max length should be 200 + 1 (",") + 32
93 self.assertEqual(233, len(httplib2.safename(uri2)))
94 self.assertEqual(233, len(httplib2.safename(uri)))
95 # Unicode
96 if sys.version_info >= (2,3):
97 self.assertEqual( "xn--http,-4y1d.org,fred,a=b,579924c35db315e5a32e3d9963388193", httplib2.safename("http://\u2304.org/fred/?a=b"))
98
99class _MyResponse(io.BytesIO):
100 def __init__(self, body, **kwargs):
101 io.BytesIO.__init__(self, body)
102 self.headers = kwargs
103
104 def items(self):
105 return self.headers.items()
106
107 def iteritems(self):
108 return iter(self.headers.items())
109
110
111class _MyHTTPConnection(object):
112 "This class is just a mock of httplib.HTTPConnection used for testing"
113
114 def __init__(self, host, port=None, key_file=None, cert_file=None,
115 strict=None, timeout=None, proxy_info=None):
116 self.host = host
117 self.port = port
118 self.timeout = timeout
119 self.log = ""
120
121 def set_debuglevel(self, level):
122 pass
123
124 def connect(self):
125 "Connect to a host on a given port."
126 pass
127
128 def close(self):
129 pass
130
131 def request(self, method, request_uri, body, headers):
132 pass
133
134 def getresponse(self):
135 return _MyResponse(b"the body", status="200")
136
137
138class HttpTest(unittest.TestCase):
139 def setUp(self):
140 if os.path.exists(cacheDirName):
141 [os.remove(os.path.join(cacheDirName, file)) for file in os.listdir(cacheDirName)]
142 self.http = httplib2.Http(cacheDirName)
143 self.http.clear_credentials()
144
145 def testConnectionType(self):
146 self.http.force_exception_to_status_code = False
147 response, content = self.http.request("http://bitworking.org", connection_type=_MyHTTPConnection)
148 self.assertEqual(response['content-location'], "http://bitworking.org")
149 self.assertEqual(content, b"the body")
150
151 def testGetUnknownServer(self):
152 self.http.force_exception_to_status_code = False
153 try:
154 self.http.request("http://fred.bitworking.org/")
155 self.fail("An httplib2.ServerNotFoundError Exception must be thrown on an unresolvable server.")
156 except httplib2.ServerNotFoundError:
157 pass
158
159 # Now test with exceptions turned off
160 self.http.force_exception_to_status_code = True
161
162 (response, content) = self.http.request("http://fred.bitworking.org/")
163 self.assertEqual(response['content-type'], 'text/plain')
164 self.assertTrue(content.startswith(b"Unable to find"))
165 self.assertEqual(response.status, 400)
166
167 def testGetIRI(self):
168 if sys.version_info >= (2,3):
169 uri = urllib.parse.urljoin(base, "reflector/reflector.cgi?d=\N{CYRILLIC CAPITAL LETTER DJE}")
170 (response, content) = self.http.request(uri, "GET")
171 d = self.reflector(content)
172 self.assertTrue('QUERY_STRING' in d)
173 self.assertTrue(d['QUERY_STRING'].find('%D0%82') > 0)
174
175 def testGetIsDefaultMethod(self):
176 # Test that GET is the default method
177 uri = urllib.parse.urljoin(base, "methods/method_reflector.cgi")
178 (response, content) = self.http.request(uri)
179 self.assertEqual(response['x-method'], "GET")
180
181 def testDifferentMethods(self):
182 # Test that all methods can be used
183 uri = urllib.parse.urljoin(base, "methods/method_reflector.cgi")
184 for method in ["GET", "PUT", "DELETE", "POST"]:
185 (response, content) = self.http.request(uri, method, body=b" ")
186 self.assertEqual(response['x-method'], method)
187
Joe Gregoriob628c0b2009-07-16 12:28:04 -0400188 def testHeadRead(self):
189 # Test that we don't try to read the response of a HEAD request
190 # since httplib blocks response.read() for HEAD requests.
191 # Oddly enough this doesn't appear as a problem when doing HEAD requests
192 # against Apache servers.
193 uri = "http://www.google.com/"
194 (response, content) = self.http.request(uri, "HEAD")
195 self.assertEqual(response.status, 200)
196 self.assertEqual(content, b"")
197
pilgrim00a352e2009-05-29 04:04:44 +0000198 def testGetNoCache(self):
199 # Test that can do a GET w/o the cache turned on.
200 http = httplib2.Http()
201 uri = urllib.parse.urljoin(base, "304/test_etag.txt")
202 (response, content) = http.request(uri, "GET")
203 self.assertEqual(response.status, 200)
204 self.assertEqual(response.previous, None)
205
Joe Gregorioe202d212009-07-16 14:57:52 -0400206 def testGetOnlyIfCachedCacheHit(self):
207 # Test that can do a GET with cache and 'only-if-cached'
208 uri = urllib.parse.urljoin(base, "304/test_etag.txt")
209 (response, content) = self.http.request(uri, "GET")
210 (response, content) = self.http.request(uri, "GET", headers={'cache-control': 'only-if-cached'})
211 self.assertEqual(response.fromcache, True)
212 self.assertEqual(response.status, 200)
213
pilgrim00a352e2009-05-29 04:04:44 +0000214 def testGetOnlyIfCachedCacheMiss(self):
215 # Test that can do a GET with no cache with 'only-if-cached'
pilgrim00a352e2009-05-29 04:04:44 +0000216 uri = urllib.parse.urljoin(base, "304/test_etag.txt")
Joe Gregorioe202d212009-07-16 14:57:52 -0400217 (response, content) = self.http.request(uri, "GET", headers={'cache-control': 'only-if-cached'})
pilgrim00a352e2009-05-29 04:04:44 +0000218 self.assertEqual(response.fromcache, False)
Joe Gregorioe202d212009-07-16 14:57:52 -0400219 self.assertEqual(response.status, 504)
pilgrim00a352e2009-05-29 04:04:44 +0000220
221 def testGetOnlyIfCachedNoCacheAtAll(self):
222 # Test that can do a GET with no cache with 'only-if-cached'
223 # Of course, there might be an intermediary beyond us
224 # that responds to the 'only-if-cached', so this
225 # test can't really be guaranteed to pass.
226 http = httplib2.Http()
227 uri = urllib.parse.urljoin(base, "304/test_etag.txt")
228 (response, content) = http.request(uri, "GET", headers={'cache-control': 'only-if-cached'})
229 self.assertEqual(response.fromcache, False)
Joe Gregorioe202d212009-07-16 14:57:52 -0400230 self.assertEqual(response.status, 504)
pilgrim00a352e2009-05-29 04:04:44 +0000231
232 def testUserAgent(self):
233 # Test that we provide a default user-agent
234 uri = urllib.parse.urljoin(base, "user-agent/test.cgi")
235 (response, content) = self.http.request(uri, "GET")
236 self.assertEqual(response.status, 200)
237 self.assertTrue(content.startswith(b"Python-httplib2/"))
238
239 def testUserAgentNonDefault(self):
240 # Test that the default user-agent can be over-ridden
241
242 uri = urllib.parse.urljoin(base, "user-agent/test.cgi")
243 (response, content) = self.http.request(uri, "GET", headers={'User-Agent': 'fred/1.0'})
244 self.assertEqual(response.status, 200)
245 self.assertTrue(content.startswith(b"fred/1.0"))
246
247 def testGet300WithLocation(self):
248 # Test the we automatically follow 300 redirects if a Location: header is provided
249 uri = urllib.parse.urljoin(base, "300/with-location-header.asis")
250 (response, content) = self.http.request(uri, "GET")
251 self.assertEqual(response.status, 200)
252 self.assertEqual(content, b"This is the final destination.\n")
253 self.assertEqual(response.previous.status, 300)
254 self.assertEqual(response.previous.fromcache, False)
255
256 # Confirm that the intermediate 300 is not cached
257 (response, content) = self.http.request(uri, "GET")
258 self.assertEqual(response.status, 200)
259 self.assertEqual(content, b"This is the final destination.\n")
260 self.assertEqual(response.previous.status, 300)
261 self.assertEqual(response.previous.fromcache, False)
262
263 def testGet300WithLocationNoRedirect(self):
264 # Test the we automatically follow 300 redirects if a Location: header is provided
265 self.http.follow_redirects = False
266 uri = urllib.parse.urljoin(base, "300/with-location-header.asis")
267 (response, content) = self.http.request(uri, "GET")
268 self.assertEqual(response.status, 300)
269
270 def testGet300WithoutLocation(self):
271 # Not giving a Location: header in a 300 response is acceptable
272 # In which case we just return the 300 response
273 uri = urllib.parse.urljoin(base, "300/without-location-header.asis")
274 (response, content) = self.http.request(uri, "GET")
275 self.assertEqual(response.status, 300)
276 self.assertTrue(response['content-type'].startswith("text/html"))
277 self.assertEqual(response.previous, None)
278
279 def testGet301(self):
280 # Test that we automatically follow 301 redirects
281 # and that we cache the 301 response
282 uri = urllib.parse.urljoin(base, "301/onestep.asis")
283 destination = urllib.parse.urljoin(base, "302/final-destination.txt")
284 (response, content) = self.http.request(uri, "GET")
285 self.assertEqual(response.status, 200)
286 self.assertTrue('content-location' in response)
287 self.assertEqual(response['content-location'], destination)
288 self.assertEqual(content, b"This is the final destination.\n")
289 self.assertEqual(response.previous.status, 301)
290 self.assertEqual(response.previous.fromcache, False)
291
292 (response, content) = self.http.request(uri, "GET")
293 self.assertEqual(response.status, 200)
294 self.assertEqual(response['content-location'], destination)
295 self.assertEqual(content, b"This is the final destination.\n")
296 self.assertEqual(response.previous.status, 301)
297 self.assertEqual(response.previous.fromcache, True)
298
299
300 def testGet301NoRedirect(self):
301 # Test that we automatically follow 301 redirects
302 # and that we cache the 301 response
303 self.http.follow_redirects = False
304 uri = urllib.parse.urljoin(base, "301/onestep.asis")
305 destination = urllib.parse.urljoin(base, "302/final-destination.txt")
306 (response, content) = self.http.request(uri, "GET")
307 self.assertEqual(response.status, 301)
308
309
310 def testGet302(self):
311 # Test that we automatically follow 302 redirects
312 # and that we DO NOT cache the 302 response
313 uri = urllib.parse.urljoin(base, "302/onestep.asis")
314 destination = urllib.parse.urljoin(base, "302/final-destination.txt")
315 (response, content) = self.http.request(uri, "GET")
316 self.assertEqual(response.status, 200)
317 self.assertEqual(response['content-location'], destination)
318 self.assertEqual(content, b"This is the final destination.\n")
319 self.assertEqual(response.previous.status, 302)
320 self.assertEqual(response.previous.fromcache, False)
321
322 uri = urllib.parse.urljoin(base, "302/onestep.asis")
323 (response, content) = self.http.request(uri, "GET")
324 self.assertEqual(response.status, 200)
325 self.assertEqual(response.fromcache, True)
326 self.assertEqual(response['content-location'], destination)
327 self.assertEqual(content, b"This is the final destination.\n")
328 self.assertEqual(response.previous.status, 302)
329 self.assertEqual(response.previous.fromcache, False)
330 self.assertEqual(response.previous['content-location'], uri)
331
332 uri = urllib.parse.urljoin(base, "302/twostep.asis")
333
334 (response, content) = self.http.request(uri, "GET")
335 self.assertEqual(response.status, 200)
336 self.assertEqual(response.fromcache, True)
337 self.assertEqual(content, b"This is the final destination.\n")
338 self.assertEqual(response.previous.status, 302)
339 self.assertEqual(response.previous.fromcache, False)
340
341 def testGet302RedirectionLimit(self):
342 # Test that we can set a lower redirection limit
343 # and that we raise an exception when we exceed
344 # that limit.
345 self.http.force_exception_to_status_code = False
346
347 uri = urllib.parse.urljoin(base, "302/twostep.asis")
348 try:
349 (response, content) = self.http.request(uri, "GET", redirections = 1)
350 self.fail("This should not happen")
351 except httplib2.RedirectLimit:
352 pass
353 except Exception as e:
354 self.fail("Threw wrong kind of exception ")
355
356 # Re-run the test with out the exceptions
357 self.http.force_exception_to_status_code = True
358
359 (response, content) = self.http.request(uri, "GET", redirections = 1)
360 self.assertEqual(response.status, 500)
361 self.assertTrue(response.reason.startswith("Redirected more"))
362 self.assertEqual("302", response['status'])
363 self.assertTrue(content.startswith(b"<html>"))
364 self.assertTrue(response.previous != None)
365
366 def testGet302NoLocation(self):
367 # Test that we throw an exception when we get
368 # a 302 with no Location: header.
369 self.http.force_exception_to_status_code = False
370 uri = urllib.parse.urljoin(base, "302/no-location.asis")
371 try:
372 (response, content) = self.http.request(uri, "GET")
373 self.fail("Should never reach here")
374 except httplib2.RedirectMissingLocation:
375 pass
376 except Exception as e:
377 self.fail("Threw wrong kind of exception ")
378
379 # Re-run the test with out the exceptions
380 self.http.force_exception_to_status_code = True
381
382 (response, content) = self.http.request(uri, "GET")
383 self.assertEqual(response.status, 500)
384 self.assertTrue(response.reason.startswith("Redirected but"))
385 self.assertEqual("302", response['status'])
386 self.assertTrue(content.startswith(b"This is content"))
387
388 def testGet302ViaHttps(self):
389 # Google always redirects to http://google.com
390 (response, content) = self.http.request("https://google.com", "GET")
391 self.assertEqual(200, response.status)
392 self.assertEqual(302, response.previous.status)
393
394 def testGetViaHttps(self):
395 # Test that we can handle HTTPS
396 (response, content) = self.http.request("https://google.com/adsense/", "GET")
397 self.assertEqual(200, response.status)
398
399 def testGetViaHttpsSpecViolationOnLocation(self):
400 # Test that we follow redirects through HTTPS
401 # even if they violate the spec by including
402 # a relative Location: header instead of an
403 # absolute one.
404 (response, content) = self.http.request("https://google.com/adsense", "GET")
405 self.assertEqual(200, response.status)
406 self.assertNotEqual(None, response.previous)
407
408
409 def testGetViaHttpsKeyCert(self):
410 # At this point I can only test
411 # that the key and cert files are passed in
412 # correctly to httplib. It would be nice to have
413 # a real https endpoint to test against.
414 http = httplib2.Http(timeout=2)
415
416 http.add_certificate("akeyfile", "acertfile", "bitworking.org")
417 try:
418 (response, content) = http.request("https://bitworking.org", "GET")
419 except:
420 pass
421 self.assertEqual(http.connections["https:bitworking.org"].key_file, "akeyfile")
422 self.assertEqual(http.connections["https:bitworking.org"].cert_file, "acertfile")
423
424 try:
425 (response, content) = http.request("https://notthere.bitworking.org", "GET")
426 except:
427 pass
428 self.assertEqual(http.connections["https:notthere.bitworking.org"].key_file, None)
429 self.assertEqual(http.connections["https:notthere.bitworking.org"].cert_file, None)
430
431
432
433
434 def testGet303(self):
435 # Do a follow-up GET on a Location: header
436 # returned from a POST that gave a 303.
437 uri = urllib.parse.urljoin(base, "303/303.cgi")
438 (response, content) = self.http.request(uri, "POST", " ")
439 self.assertEqual(response.status, 200)
440 self.assertEqual(content, b"This is the final destination.\n")
441 self.assertEqual(response.previous.status, 303)
442
443 def testGet303NoRedirect(self):
444 # Do a follow-up GET on a Location: header
445 # returned from a POST that gave a 303.
446 self.http.follow_redirects = False
447 uri = urllib.parse.urljoin(base, "303/303.cgi")
448 (response, content) = self.http.request(uri, "POST", " ")
449 self.assertEqual(response.status, 303)
450
451 def test303ForDifferentMethods(self):
452 # Test that all methods can be used
453 uri = urllib.parse.urljoin(base, "303/redirect-to-reflector.cgi")
454 for (method, method_on_303) in [("PUT", "GET"), ("DELETE", "GET"), ("POST", "GET"), ("GET", "GET"), ("HEAD", "GET")]:
455 (response, content) = self.http.request(uri, method, body=b" ")
456 self.assertEqual(response['x-method'], method_on_303)
457
458 def testGet304(self):
459 # Test that we use ETags properly to validate our cache
460 uri = urllib.parse.urljoin(base, "304/test_etag.txt")
461 (response, content) = self.http.request(uri, "GET")
462 self.assertNotEqual(response['etag'], "")
463
464 (response, content) = self.http.request(uri, "GET")
465 (response, content) = self.http.request(uri, "GET", headers = {'cache-control': 'must-revalidate'})
466 self.assertEqual(response.status, 200)
467 self.assertEqual(response.fromcache, True)
468
469 cache_file_name = os.path.join(cacheDirName, httplib2.safename(httplib2.urlnorm(uri)[-1]))
470 f = open(cache_file_name, "r")
471 status_line = f.readline()
472 f.close()
473
474 self.assertTrue(status_line.startswith("status:"))
475
476 (response, content) = self.http.request(uri, "HEAD")
477 self.assertEqual(response.status, 200)
478 self.assertEqual(response.fromcache, True)
479
480 (response, content) = self.http.request(uri, "GET", headers = {'range': 'bytes=0-0'})
481 self.assertEqual(response.status, 206)
482 self.assertEqual(response.fromcache, False)
483
484 def testGetIgnoreEtag(self):
485 # Test that we can forcibly ignore ETags
486 uri = urllib.parse.urljoin(base, "reflector/reflector.cgi")
487 (response, content) = self.http.request(uri, "GET")
488 self.assertNotEqual(response['etag'], "")
489
490 (response, content) = self.http.request(uri, "GET", headers = {'cache-control': 'max-age=0'})
491 d = self.reflector(content)
492 self.assertTrue('HTTP_IF_NONE_MATCH' in d)
493
494 self.http.ignore_etag = True
495 (response, content) = self.http.request(uri, "GET", headers = {'cache-control': 'max-age=0'})
496 d = self.reflector(content)
497 self.assertEqual(response.fromcache, False)
498 self.assertFalse('HTTP_IF_NONE_MATCH' in d)
499
500 def testOverrideEtag(self):
501 # Test that we can forcibly ignore ETags
502 uri = urllib.parse.urljoin(base, "reflector/reflector.cgi")
503 (response, content) = self.http.request(uri, "GET")
504 self.assertNotEqual(response['etag'], "")
505
506 (response, content) = self.http.request(uri, "GET", headers = {'cache-control': 'max-age=0'})
507 d = self.reflector(content)
508 self.assertTrue('HTTP_IF_NONE_MATCH' in d)
509 self.assertNotEqual(d['HTTP_IF_NONE_MATCH'], "fred")
510
511 (response, content) = self.http.request(uri, "GET", headers = {'cache-control': 'max-age=0', 'if-none-match': 'fred'})
512 d = self.reflector(content)
513 self.assertTrue('HTTP_IF_NONE_MATCH' in d)
514 self.assertEqual(d['HTTP_IF_NONE_MATCH'], "fred")
515
516#MAP-commented this out because it consistently fails
517# def testGet304EndToEnd(self):
518# # Test that end to end headers get overwritten in the cache
519# uri = urllib.parse.urljoin(base, "304/end2end.cgi")
520# (response, content) = self.http.request(uri, "GET")
521# self.assertNotEqual(response['etag'], "")
522# old_date = response['date']
523# time.sleep(2)
524#
525# (response, content) = self.http.request(uri, "GET", headers = {'Cache-Control': 'max-age=0'})
526# # The response should be from the cache, but the Date: header should be updated.
527# new_date = response['date']
528# self.assertNotEqual(new_date, old_date)
529# self.assertEqual(response.status, 200)
530# self.assertEqual(response.fromcache, True)
531
532 def testGet304LastModified(self):
533 # Test that we can still handle a 304
534 # by only using the last-modified cache validator.
535 uri = urllib.parse.urljoin(base, "304/last-modified-only/last-modified-only.txt")
536 (response, content) = self.http.request(uri, "GET")
537
538 self.assertNotEqual(response['last-modified'], "")
539 (response, content) = self.http.request(uri, "GET")
540 (response, content) = self.http.request(uri, "GET")
541 self.assertEqual(response.status, 200)
542 self.assertEqual(response.fromcache, True)
543
544 def testGet307(self):
545 # Test that we do follow 307 redirects but
546 # do not cache the 307
547 uri = urllib.parse.urljoin(base, "307/onestep.asis")
548 (response, content) = self.http.request(uri, "GET")
549 self.assertEqual(response.status, 200)
550 self.assertEqual(content, b"This is the final destination.\n")
551 self.assertEqual(response.previous.status, 307)
552 self.assertEqual(response.previous.fromcache, False)
553
554 (response, content) = self.http.request(uri, "GET")
555 self.assertEqual(response.status, 200)
556 self.assertEqual(response.fromcache, True)
557 self.assertEqual(content, b"This is the final destination.\n")
558 self.assertEqual(response.previous.status, 307)
559 self.assertEqual(response.previous.fromcache, False)
560
561 def testGet410(self):
562 # Test that we pass 410's through
563 uri = urllib.parse.urljoin(base, "410/410.asis")
564 (response, content) = self.http.request(uri, "GET")
565 self.assertEqual(response.status, 410)
566
chris.dent@gmail.comae846ca2009-12-24 14:02:57 -0600567 def testVaryHeaderSimple(self):
568 """
569 RFC 2616 13.6
570 When the cache receives a subsequent request whose Request-URI
571 specifies one or more cache entries including a Vary header field,
572 the cache MUST NOT use such a cache entry to construct a response
573 to the new request unless all of the selecting request-headers
574 present in the new request match the corresponding stored
575 request-headers in the original request.
576 """
577 # test that the vary header is sent
578 uri = urllib.parse.urljoin(base, "vary/accept.asis")
579 (response, content) = self.http.request(uri, "GET", headers={'Accept': 'text/plain'})
580 self.assertEqual(response.status, 200)
581 self.assertTrue('vary' in response)
582
583 # get the resource again, from the cache since accept header in this
584 # request is the same as the request
585 (response, content) = self.http.request(uri, "GET", headers={'Accept': 'text/plain'})
586 self.assertEqual(response.status, 200)
587 self.assertEqual(response.fromcache, True, msg="Should be from cache")
588
589 # get the resource again, not from cache since Accept headers does not match
590 (response, content) = self.http.request(uri, "GET", headers={'Accept': 'text/html'})
591 self.assertEqual(response.status, 200)
592 self.assertEqual(response.fromcache, False, msg="Should not be from cache")
593
594 # get the resource again, without any Accept header, so again no match
595 (response, content) = self.http.request(uri, "GET")
596 self.assertEqual(response.status, 200)
597 self.assertEqual(response.fromcache, False, msg="Should not be from cache")
598
599 def testNoVary(self):
600 # when there is no vary, a different Accept header (e.g.) should not
601 # impact if the cache is used
602 # test that the vary header is not sent
603 uri = urllib.parse.urljoin(base, "vary/no-vary.asis")
604 (response, content) = self.http.request(uri, "GET", headers={'Accept': 'text/plain'})
605 self.assertEqual(response.status, 200)
606 self.assertFalse('vary' in response)
607
608 (response, content) = self.http.request(uri, "GET", headers={'Accept': 'text/plain'})
609 self.assertEqual(response.status, 200)
610 self.assertEqual(response.fromcache, True, msg="Should be from cache")
611
612 (response, content) = self.http.request(uri, "GET", headers={'Accept': 'text/html'})
613 self.assertEqual(response.status, 200)
614 self.assertEqual(response.fromcache, True, msg="Should be from cache")
615
616 def testVaryHeaderDouble(self):
617 uri = urllib.parse.urljoin(base, "vary/accept-double.asis")
618 (response, content) = self.http.request(uri, "GET", headers={
619 'Accept': 'text/plain', 'Accept-Language': 'da, en-gb;q=0.8, en;q=0.7'})
620 self.assertEqual(response.status, 200)
621 self.assertTrue('vary' in response)
622
623 # we are from cache
624 (response, content) = self.http.request(uri, "GET", headers={
625 'Accept': 'text/plain', 'Accept-Language': 'da, en-gb;q=0.8, en;q=0.7'})
626 self.assertEqual(response.fromcache, True, msg="Should be from cache")
627
628 (response, content) = self.http.request(uri, "GET", headers={'Accept': 'text/plain'})
629 self.assertEqual(response.status, 200)
630 self.assertEqual(response.fromcache, False)
631
632 # get the resource again, not from cache, varied headers don't match exact
633 (response, content) = self.http.request(uri, "GET", headers={'Accept-Language': 'da'})
634 self.assertEqual(response.status, 200)
635 self.assertEqual(response.fromcache, False, msg="Should not be from cache")
636
pilgrim00a352e2009-05-29 04:04:44 +0000637 def testHeadGZip(self):
638 # Test that we don't try to decompress a HEAD response
639 uri = urllib.parse.urljoin(base, "gzip/final-destination.txt")
640 (response, content) = self.http.request(uri, "HEAD")
641 self.assertEqual(response.status, 200)
642 self.assertNotEqual(int(response['content-length']), 0)
643 self.assertEqual(content, b"")
644
645 def testGetGZip(self):
646 # Test that we support gzip compression
647 uri = urllib.parse.urljoin(base, "gzip/final-destination.txt")
648 (response, content) = self.http.request(uri, "GET")
649 self.assertEqual(response.status, 200)
650 self.assertFalse('content-encoding' in response)
651 self.assertTrue('-content-encoding' in response)
652 self.assertEqual(int(response['content-length']), len(b"This is the final destination.\n"))
653 self.assertEqual(content, b"This is the final destination.\n")
654
655 def testGetGZipFailure(self):
656 # Test that we raise a good exception when the gzip fails
657 self.http.force_exception_to_status_code = False
658 uri = urllib.parse.urljoin(base, "gzip/failed-compression.asis")
659 try:
660 (response, content) = self.http.request(uri, "GET")
661 self.fail("Should never reach here")
662 except httplib2.FailedToDecompressContent:
663 pass
664 except Exception:
665 self.fail("Threw wrong kind of exception")
666
667 # Re-run the test with out the exceptions
668 self.http.force_exception_to_status_code = True
669
670 (response, content) = self.http.request(uri, "GET")
671 self.assertEqual(response.status, 500)
672 self.assertTrue(response.reason.startswith("Content purported"))
673
674 def testTimeout(self):
675 self.http.force_exception_to_status_code = True
676 uri = urllib.parse.urljoin(base, "timeout/timeout.cgi")
677 try:
678 import socket
679 socket.setdefaulttimeout(1)
680 except:
681 # Don't run the test if we can't set the timeout
682 return
683 (response, content) = self.http.request(uri)
684 self.assertEqual(response.status, 408)
685 self.assertTrue(response.reason.startswith("Request Timeout"))
686 self.assertTrue(content.startswith(b"Request Timeout"))
687
688 def testIndividualTimeout(self):
689 uri = urllib.parse.urljoin(base, "timeout/timeout.cgi")
690 http = httplib2.Http(timeout=1)
691 http.force_exception_to_status_code = True
692
693 (response, content) = http.request(uri)
694 self.assertEqual(response.status, 408)
695 self.assertTrue(response.reason.startswith("Request Timeout"))
696 self.assertTrue(content.startswith(b"Request Timeout"))
697
698
699 def testGetDeflate(self):
700 # Test that we support deflate compression
701 uri = urllib.parse.urljoin(base, "deflate/deflated.asis")
702 (response, content) = self.http.request(uri, "GET")
703 self.assertEqual(response.status, 200)
704 self.assertFalse('content-encoding' in response)
705 self.assertEqual(int(response['content-length']), len("This is the final destination."))
706 self.assertEqual(content, b"This is the final destination.")
707
708 def testGetDeflateFailure(self):
709 # Test that we raise a good exception when the deflate fails
710 self.http.force_exception_to_status_code = False
711
712 uri = urllib.parse.urljoin(base, "deflate/failed-compression.asis")
713 try:
714 (response, content) = self.http.request(uri, "GET")
715 self.fail("Should never reach here")
716 except httplib2.FailedToDecompressContent:
717 pass
718 except Exception:
719 self.fail("Threw wrong kind of exception")
720
721 # Re-run the test with out the exceptions
722 self.http.force_exception_to_status_code = True
723
724 (response, content) = self.http.request(uri, "GET")
725 self.assertEqual(response.status, 500)
726 self.assertTrue(response.reason.startswith("Content purported"))
727
728 def testGetDuplicateHeaders(self):
729 # Test that duplicate headers get concatenated via ','
730 uri = urllib.parse.urljoin(base, "duplicate-headers/multilink.asis")
731 (response, content) = self.http.request(uri, "GET")
732 self.assertEqual(response.status, 200)
733 self.assertEqual(content, b"This is content\n")
734 self.assertEqual(response['link'].split(",")[0], '<http://bitworking.org>; rel="home"; title="BitWorking"')
735
736 def testGetCacheControlNoCache(self):
737 # Test Cache-Control: no-cache on requests
738 uri = urllib.parse.urljoin(base, "304/test_etag.txt")
739 (response, content) = self.http.request(uri, "GET")
740 self.assertNotEqual(response['etag'], "")
741 (response, content) = self.http.request(uri, "GET")
742 self.assertEqual(response.status, 200)
743 self.assertEqual(response.fromcache, True)
744
745 (response, content) = self.http.request(uri, "GET", headers={'Cache-Control': 'no-cache'})
746 self.assertEqual(response.status, 200)
747 self.assertEqual(response.fromcache, False)
748
749 def testGetCacheControlPragmaNoCache(self):
750 # Test Pragma: no-cache on requests
751 uri = urllib.parse.urljoin(base, "304/test_etag.txt")
752 (response, content) = self.http.request(uri, "GET")
753 self.assertNotEqual(response['etag'], "")
754 (response, content) = self.http.request(uri, "GET")
755 self.assertEqual(response.status, 200)
756 self.assertEqual(response.fromcache, True)
757
758 (response, content) = self.http.request(uri, "GET", headers={'Pragma': 'no-cache'})
759 self.assertEqual(response.status, 200)
760 self.assertEqual(response.fromcache, False)
761
762 def testGetCacheControlNoStoreRequest(self):
763 # A no-store request means that the response should not be stored.
764 uri = urllib.parse.urljoin(base, "304/test_etag.txt")
765
766 (response, content) = self.http.request(uri, "GET", headers={'Cache-Control': 'no-store'})
767 self.assertEqual(response.status, 200)
768 self.assertEqual(response.fromcache, False)
769
770 (response, content) = self.http.request(uri, "GET", headers={'Cache-Control': 'no-store'})
771 self.assertEqual(response.status, 200)
772 self.assertEqual(response.fromcache, False)
773
774 def testGetCacheControlNoStoreResponse(self):
775 # A no-store response means that the response should not be stored.
776 uri = urllib.parse.urljoin(base, "no-store/no-store.asis")
777
778 (response, content) = self.http.request(uri, "GET")
779 self.assertEqual(response.status, 200)
780 self.assertEqual(response.fromcache, False)
781
782 (response, content) = self.http.request(uri, "GET")
783 self.assertEqual(response.status, 200)
784 self.assertEqual(response.fromcache, False)
785
786 def testGetCacheControlNoCacheNoStoreRequest(self):
787 # Test that a no-store, no-cache clears the entry from the cache
788 # even if it was cached previously.
789 uri = urllib.parse.urljoin(base, "304/test_etag.txt")
790
791 (response, content) = self.http.request(uri, "GET")
792 (response, content) = self.http.request(uri, "GET")
793 self.assertEqual(response.fromcache, True)
794 (response, content) = self.http.request(uri, "GET", headers={'Cache-Control': 'no-store, no-cache'})
795 (response, content) = self.http.request(uri, "GET", headers={'Cache-Control': 'no-store, no-cache'})
796 self.assertEqual(response.status, 200)
797 self.assertEqual(response.fromcache, False)
798
799 def testUpdateInvalidatesCache(self):
800 # Test that calling PUT or DELETE on a
801 # URI that is cache invalidates that cache.
802 uri = urllib.parse.urljoin(base, "304/test_etag.txt")
803
804 (response, content) = self.http.request(uri, "GET")
805 (response, content) = self.http.request(uri, "GET")
806 self.assertEqual(response.fromcache, True)
807 (response, content) = self.http.request(uri, "DELETE")
808 self.assertEqual(response.status, 405)
809
810 (response, content) = self.http.request(uri, "GET")
811 self.assertEqual(response.fromcache, False)
812
813 def testUpdateUsesCachedETag(self):
814 # Test that we natively support http://www.w3.org/1999/04/Editing/
815 uri = urllib.parse.urljoin(base, "conditional-updates/test.cgi")
816
817 (response, content) = self.http.request(uri, "GET")
818 self.assertEqual(response.status, 200)
819 self.assertEqual(response.fromcache, False)
820 (response, content) = self.http.request(uri, "GET")
821 self.assertEqual(response.status, 200)
822 self.assertEqual(response.fromcache, True)
Joe Gregorio799b2072009-09-29 17:21:19 -0400823 (response, content) = self.http.request(uri, "PUT", body="foo")
pilgrim00a352e2009-05-29 04:04:44 +0000824 self.assertEqual(response.status, 200)
Joe Gregorio799b2072009-09-29 17:21:19 -0400825 (response, content) = self.http.request(uri, "PUT", body="foo")
pilgrim00a352e2009-05-29 04:04:44 +0000826 self.assertEqual(response.status, 412)
827
828 def testUpdateUsesCachedETagAndOCMethod(self):
829 # Test that we natively support http://www.w3.org/1999/04/Editing/
830 uri = urllib.parse.urljoin(base, "conditional-updates/test.cgi")
831
832 (response, content) = self.http.request(uri, "GET")
833 self.assertEqual(response.status, 200)
834 self.assertEqual(response.fromcache, False)
835 (response, content) = self.http.request(uri, "GET")
836 self.assertEqual(response.status, 200)
837 self.assertEqual(response.fromcache, True)
838 self.http.optimistic_concurrency_methods.append("DELETE")
839 (response, content) = self.http.request(uri, "DELETE")
840 self.assertEqual(response.status, 200)
841
842
843 def testUpdateUsesCachedETagOverridden(self):
844 # Test that we natively support http://www.w3.org/1999/04/Editing/
845 uri = urllib.parse.urljoin(base, "conditional-updates/test.cgi")
846
847 (response, content) = self.http.request(uri, "GET")
848 self.assertEqual(response.status, 200)
849 self.assertEqual(response.fromcache, False)
850 (response, content) = self.http.request(uri, "GET")
851 self.assertEqual(response.status, 200)
852 self.assertEqual(response.fromcache, True)
Joe Gregorio799b2072009-09-29 17:21:19 -0400853 (response, content) = self.http.request(uri, "PUT", body="foo", headers={'if-match': 'fred'})
pilgrim00a352e2009-05-29 04:04:44 +0000854 self.assertEqual(response.status, 412)
855
856 def testBasicAuth(self):
857 # Test Basic Authentication
858 uri = urllib.parse.urljoin(base, "basic/file.txt")
859 (response, content) = self.http.request(uri, "GET")
860 self.assertEqual(response.status, 401)
861
862 uri = urllib.parse.urljoin(base, "basic/")
863 (response, content) = self.http.request(uri, "GET")
864 self.assertEqual(response.status, 401)
865
866 self.http.add_credentials('joe', 'password')
867 (response, content) = self.http.request(uri, "GET")
868 self.assertEqual(response.status, 200)
869
870 uri = urllib.parse.urljoin(base, "basic/file.txt")
871 (response, content) = self.http.request(uri, "GET")
872 self.assertEqual(response.status, 200)
873
874 def testBasicAuthWithDomain(self):
875 # Test Basic Authentication
876 uri = urllib.parse.urljoin(base, "basic/file.txt")
877 (response, content) = self.http.request(uri, "GET")
878 self.assertEqual(response.status, 401)
879
880 uri = urllib.parse.urljoin(base, "basic/")
881 (response, content) = self.http.request(uri, "GET")
882 self.assertEqual(response.status, 401)
883
884 self.http.add_credentials('joe', 'password', "example.org")
885 (response, content) = self.http.request(uri, "GET")
886 self.assertEqual(response.status, 401)
887
888 uri = urllib.parse.urljoin(base, "basic/file.txt")
889 (response, content) = self.http.request(uri, "GET")
890 self.assertEqual(response.status, 401)
891
892 domain = urllib.parse.urlparse(base)[1]
893 self.http.add_credentials('joe', 'password', domain)
894 (response, content) = self.http.request(uri, "GET")
895 self.assertEqual(response.status, 200)
896
897 uri = urllib.parse.urljoin(base, "basic/file.txt")
898 (response, content) = self.http.request(uri, "GET")
899 self.assertEqual(response.status, 200)
900
901
902
903
904
905
906 def testBasicAuthTwoDifferentCredentials(self):
907 # Test Basic Authentication with multiple sets of credentials
908 uri = urllib.parse.urljoin(base, "basic2/file.txt")
909 (response, content) = self.http.request(uri, "GET")
910 self.assertEqual(response.status, 401)
911
912 uri = urllib.parse.urljoin(base, "basic2/")
913 (response, content) = self.http.request(uri, "GET")
914 self.assertEqual(response.status, 401)
915
916 self.http.add_credentials('fred', 'barney')
917 (response, content) = self.http.request(uri, "GET")
918 self.assertEqual(response.status, 200)
919
920 uri = urllib.parse.urljoin(base, "basic2/file.txt")
921 (response, content) = self.http.request(uri, "GET")
922 self.assertEqual(response.status, 200)
923
924 def testBasicAuthNested(self):
925 # Test Basic Authentication with resources
926 # that are nested
927 uri = urllib.parse.urljoin(base, "basic-nested/")
928 (response, content) = self.http.request(uri, "GET")
929 self.assertEqual(response.status, 401)
930
931 uri = urllib.parse.urljoin(base, "basic-nested/subdir")
932 (response, content) = self.http.request(uri, "GET")
933 self.assertEqual(response.status, 401)
934
935 # Now add in credentials one at a time and test.
936 self.http.add_credentials('joe', 'password')
937
938 uri = urllib.parse.urljoin(base, "basic-nested/")
939 (response, content) = self.http.request(uri, "GET")
940 self.assertEqual(response.status, 200)
941
942 uri = urllib.parse.urljoin(base, "basic-nested/subdir")
943 (response, content) = self.http.request(uri, "GET")
944 self.assertEqual(response.status, 401)
945
946 self.http.add_credentials('fred', 'barney')
947
948 uri = urllib.parse.urljoin(base, "basic-nested/")
949 (response, content) = self.http.request(uri, "GET")
950 self.assertEqual(response.status, 200)
951
952 uri = urllib.parse.urljoin(base, "basic-nested/subdir")
953 (response, content) = self.http.request(uri, "GET")
954 self.assertEqual(response.status, 200)
955
956 def testDigestAuth(self):
957 # Test that we support Digest Authentication
958 uri = urllib.parse.urljoin(base, "digest/")
959 (response, content) = self.http.request(uri, "GET")
960 self.assertEqual(response.status, 401)
961
962 self.http.add_credentials('joe', 'password')
963 (response, content) = self.http.request(uri, "GET")
964 self.assertEqual(response.status, 200)
965
966 uri = urllib.parse.urljoin(base, "digest/file.txt")
967 (response, content) = self.http.request(uri, "GET")
968
969 def testDigestAuthNextNonceAndNC(self):
970 # Test that if the server sets nextnonce that we reset
971 # the nonce count back to 1
972 uri = urllib.parse.urljoin(base, "digest/file.txt")
973 self.http.add_credentials('joe', 'password')
974 (response, content) = self.http.request(uri, "GET", headers = {"cache-control":"no-cache"})
975 info = httplib2._parse_www_authenticate(response, 'authentication-info')
976 self.assertEqual(response.status, 200)
977 (response, content) = self.http.request(uri, "GET", headers = {"cache-control":"no-cache"})
978 info2 = httplib2._parse_www_authenticate(response, 'authentication-info')
979 self.assertEqual(response.status, 200)
980
981 if 'nextnonce' in info:
982 self.assertEqual(info2['nc'], 1)
983
984 def testDigestAuthStale(self):
985 # Test that we can handle a nonce becoming stale
986 uri = urllib.parse.urljoin(base, "digest-expire/file.txt")
987 self.http.add_credentials('joe', 'password')
988 (response, content) = self.http.request(uri, "GET", headers = {"cache-control":"no-cache"})
989 info = httplib2._parse_www_authenticate(response, 'authentication-info')
990 self.assertEqual(response.status, 200)
991
992 time.sleep(3)
993 # Sleep long enough that the nonce becomes stale
994
995 (response, content) = self.http.request(uri, "GET", headers = {"cache-control":"no-cache"})
996 self.assertFalse(response.fromcache)
997 self.assertTrue(response._stale_digest)
998 info3 = httplib2._parse_www_authenticate(response, 'authentication-info')
999 self.assertEqual(response.status, 200)
1000
1001 def reflector(self, content):
1002 return dict( [tuple(x.split("=", 1)) for x in content.decode('utf-8').strip().split("\n")] )
1003
1004 def testReflector(self):
1005 uri = urllib.parse.urljoin(base, "reflector/reflector.cgi")
1006 (response, content) = self.http.request(uri, "GET")
1007 d = self.reflector(content)
1008 self.assertTrue('HTTP_USER_AGENT' in d)
1009
Joe Gregorio84cc10a2009-09-01 13:02:49 -04001010
1011 def testConnectionClose(self):
1012 uri = "http://www.google.com/"
1013 (response, content) = self.http.request(uri, "GET")
1014 for c in self.http.connections.values():
1015 self.assertNotEqual(None, c.sock)
1016 (response, content) = self.http.request(uri, "GET", headers={"connection": "close"})
1017 for c in self.http.connections.values():
1018 self.assertEqual(None, c.sock)
1019
pilgrim00a352e2009-05-29 04:04:44 +00001020try:
1021 import memcache
1022 class HttpTestMemCached(HttpTest):
1023 def setUp(self):
1024 self.cache = memcache.Client(['127.0.0.1:11211'], debug=0)
1025 #self.cache = memcache.Client(['10.0.0.4:11211'], debug=1)
1026 self.http = httplib2.Http(self.cache)
1027 self.cache.flush_all()
1028 # Not exactly sure why the sleep is needed here, but
1029 # if not present then some unit tests that rely on caching
1030 # fail. Memcached seems to lose some sets immediately
1031 # after a flush_all if the set is to a value that
1032 # was previously cached. (Maybe the flush is handled async?)
1033 time.sleep(1)
1034 self.http.clear_credentials()
1035except:
1036 pass
1037
1038
1039
1040# ------------------------------------------------------------------------
1041
1042class HttpPrivateTest(unittest.TestCase):
1043
1044 def testParseCacheControl(self):
1045 # Test that we can parse the Cache-Control header
1046 self.assertEqual({}, httplib2._parse_cache_control({}))
1047 self.assertEqual({'no-cache': 1}, httplib2._parse_cache_control({'cache-control': ' no-cache'}))
1048 cc = httplib2._parse_cache_control({'cache-control': ' no-cache, max-age = 7200'})
1049 self.assertEqual(cc['no-cache'], 1)
1050 self.assertEqual(cc['max-age'], '7200')
1051 cc = httplib2._parse_cache_control({'cache-control': ' , '})
1052 self.assertEqual(cc[''], 1)
1053
Joe Gregorioe314e8b2009-07-16 20:11:28 -04001054 try:
1055 cc = httplib2._parse_cache_control({'cache-control': 'Max-age=3600;post-check=1800,pre-check=3600'})
1056 self.assertTrue("max-age" in cc)
1057 except:
1058 self.fail("Should not throw exception")
1059
1060
1061
1062
pilgrim00a352e2009-05-29 04:04:44 +00001063 def testNormalizeHeaders(self):
1064 # Test that we normalize headers to lowercase
1065 h = httplib2._normalize_headers({'Cache-Control': 'no-cache', 'Other': 'Stuff'})
1066 self.assertTrue('cache-control' in h)
1067 self.assertTrue('other' in h)
1068 self.assertEqual('Stuff', h['other'])
1069
1070 def testExpirationModelTransparent(self):
1071 # Test that no-cache makes our request TRANSPARENT
1072 response_headers = {
1073 'cache-control': 'max-age=7200'
1074 }
1075 request_headers = {
1076 'cache-control': 'no-cache'
1077 }
1078 self.assertEqual("TRANSPARENT", httplib2._entry_disposition(response_headers, request_headers))
1079
1080 def testMaxAgeNonNumeric(self):
1081 # Test that no-cache makes our request TRANSPARENT
1082 response_headers = {
1083 'cache-control': 'max-age=fred, min-fresh=barney'
1084 }
1085 request_headers = {
1086 }
1087 self.assertEqual("STALE", httplib2._entry_disposition(response_headers, request_headers))
1088
1089
1090 def testExpirationModelNoCacheResponse(self):
1091 # The date and expires point to an entry that should be
1092 # FRESH, but the no-cache over-rides that.
1093 now = time.time()
1094 response_headers = {
1095 'date': time.strftime("%a, %d %b %Y %H:%M:%S GMT", time.gmtime(now)),
1096 'expires': time.strftime("%a, %d %b %Y %H:%M:%S GMT", time.gmtime(now+4)),
1097 'cache-control': 'no-cache'
1098 }
1099 request_headers = {
1100 }
1101 self.assertEqual("STALE", httplib2._entry_disposition(response_headers, request_headers))
1102
1103 def testExpirationModelStaleRequestMustReval(self):
1104 # must-revalidate forces STALE
1105 self.assertEqual("STALE", httplib2._entry_disposition({}, {'cache-control': 'must-revalidate'}))
1106
1107 def testExpirationModelStaleResponseMustReval(self):
1108 # must-revalidate forces STALE
1109 self.assertEqual("STALE", httplib2._entry_disposition({'cache-control': 'must-revalidate'}, {}))
1110
1111 def testExpirationModelFresh(self):
1112 response_headers = {
1113 'date': time.strftime("%a, %d %b %Y %H:%M:%S GMT", time.gmtime()),
1114 'cache-control': 'max-age=2'
1115 }
1116 request_headers = {
1117 }
1118 self.assertEqual("FRESH", httplib2._entry_disposition(response_headers, request_headers))
1119 time.sleep(3)
1120 self.assertEqual("STALE", httplib2._entry_disposition(response_headers, request_headers))
1121
1122 def testExpirationMaxAge0(self):
1123 response_headers = {
1124 'date': time.strftime("%a, %d %b %Y %H:%M:%S GMT", time.gmtime()),
1125 'cache-control': 'max-age=0'
1126 }
1127 request_headers = {
1128 }
1129 self.assertEqual("STALE", httplib2._entry_disposition(response_headers, request_headers))
1130
1131 def testExpirationModelDateAndExpires(self):
1132 now = time.time()
1133 response_headers = {
1134 'date': time.strftime("%a, %d %b %Y %H:%M:%S GMT", time.gmtime(now)),
1135 'expires': time.strftime("%a, %d %b %Y %H:%M:%S GMT", time.gmtime(now+2)),
1136 }
1137 request_headers = {
1138 }
1139 self.assertEqual("FRESH", httplib2._entry_disposition(response_headers, request_headers))
1140 time.sleep(3)
1141 self.assertEqual("STALE", httplib2._entry_disposition(response_headers, request_headers))
1142
1143 def testExpiresZero(self):
1144 now = time.time()
1145 response_headers = {
1146 'date': time.strftime("%a, %d %b %Y %H:%M:%S GMT", time.gmtime(now)),
1147 'expires': "0",
1148 }
1149 request_headers = {
1150 }
1151 self.assertEqual("STALE", httplib2._entry_disposition(response_headers, request_headers))
1152
1153 def testExpirationModelDateOnly(self):
1154 now = time.time()
1155 response_headers = {
1156 'date': time.strftime("%a, %d %b %Y %H:%M:%S GMT", time.gmtime(now+3)),
1157 }
1158 request_headers = {
1159 }
1160 self.assertEqual("STALE", httplib2._entry_disposition(response_headers, request_headers))
1161
1162 def testExpirationModelOnlyIfCached(self):
1163 response_headers = {
1164 }
1165 request_headers = {
1166 'cache-control': 'only-if-cached',
1167 }
1168 self.assertEqual("FRESH", httplib2._entry_disposition(response_headers, request_headers))
1169
1170 def testExpirationModelMaxAgeBoth(self):
1171 now = time.time()
1172 response_headers = {
1173 'date': time.strftime("%a, %d %b %Y %H:%M:%S GMT", time.gmtime(now)),
1174 'cache-control': 'max-age=2'
1175 }
1176 request_headers = {
1177 'cache-control': 'max-age=0'
1178 }
1179 self.assertEqual("STALE", httplib2._entry_disposition(response_headers, request_headers))
1180
1181 def testExpirationModelDateAndExpiresMinFresh1(self):
1182 now = time.time()
1183 response_headers = {
1184 'date': time.strftime("%a, %d %b %Y %H:%M:%S GMT", time.gmtime(now)),
1185 'expires': time.strftime("%a, %d %b %Y %H:%M:%S GMT", time.gmtime(now+2)),
1186 }
1187 request_headers = {
1188 'cache-control': 'min-fresh=2'
1189 }
1190 self.assertEqual("STALE", httplib2._entry_disposition(response_headers, request_headers))
1191
1192 def testExpirationModelDateAndExpiresMinFresh2(self):
1193 now = time.time()
1194 response_headers = {
1195 'date': time.strftime("%a, %d %b %Y %H:%M:%S GMT", time.gmtime(now)),
1196 'expires': time.strftime("%a, %d %b %Y %H:%M:%S GMT", time.gmtime(now+4)),
1197 }
1198 request_headers = {
1199 'cache-control': 'min-fresh=2'
1200 }
1201 self.assertEqual("FRESH", httplib2._entry_disposition(response_headers, request_headers))
1202
1203 def testParseWWWAuthenticateEmpty(self):
1204 res = httplib2._parse_www_authenticate({})
1205 self.assertEqual(len(list(res.keys())), 0)
1206
1207 def testParseWWWAuthenticate(self):
1208 # different uses of spaces around commas
1209 res = httplib2._parse_www_authenticate({ 'www-authenticate': 'Test realm="test realm" , foo=foo ,bar="bar", baz=baz,qux=qux'})
1210 self.assertEqual(len(list(res.keys())), 1)
1211 self.assertEqual(len(list(res['test'].keys())), 5)
1212
1213 # tokens with non-alphanum
1214 res = httplib2._parse_www_authenticate({ 'www-authenticate': 'T*!%#st realm=to*!%#en, to*!%#en="quoted string"'})
1215 self.assertEqual(len(list(res.keys())), 1)
1216 self.assertEqual(len(list(res['t*!%#st'].keys())), 2)
1217
1218 # quoted string with quoted pairs
1219 res = httplib2._parse_www_authenticate({ 'www-authenticate': 'Test realm="a \\"test\\" realm"'})
1220 self.assertEqual(len(list(res.keys())), 1)
1221 self.assertEqual(res['test']['realm'], 'a "test" realm')
1222
1223 def testParseWWWAuthenticateStrict(self):
1224 httplib2.USE_WWW_AUTH_STRICT_PARSING = 1;
1225 self.testParseWWWAuthenticate();
1226 httplib2.USE_WWW_AUTH_STRICT_PARSING = 0;
1227
1228 def testParseWWWAuthenticateBasic(self):
1229 res = httplib2._parse_www_authenticate({ 'www-authenticate': 'Basic realm="me"'})
1230 basic = res['basic']
1231 self.assertEqual('me', basic['realm'])
1232
1233 res = httplib2._parse_www_authenticate({ 'www-authenticate': 'Basic realm="me", algorithm="MD5"'})
1234 basic = res['basic']
1235 self.assertEqual('me', basic['realm'])
1236 self.assertEqual('MD5', basic['algorithm'])
1237
1238 res = httplib2._parse_www_authenticate({ 'www-authenticate': 'Basic realm="me", algorithm=MD5'})
1239 basic = res['basic']
1240 self.assertEqual('me', basic['realm'])
1241 self.assertEqual('MD5', basic['algorithm'])
1242
1243 def testParseWWWAuthenticateBasic2(self):
1244 res = httplib2._parse_www_authenticate({ 'www-authenticate': 'Basic realm="me",other="fred" '})
1245 basic = res['basic']
1246 self.assertEqual('me', basic['realm'])
1247 self.assertEqual('fred', basic['other'])
1248
1249 def testParseWWWAuthenticateBasic3(self):
1250 res = httplib2._parse_www_authenticate({ 'www-authenticate': 'Basic REAlm="me" '})
1251 basic = res['basic']
1252 self.assertEqual('me', basic['realm'])
1253
1254
1255 def testParseWWWAuthenticateDigest(self):
1256 res = httplib2._parse_www_authenticate({ 'www-authenticate':
1257 'Digest realm="testrealm@host.com", qop="auth,auth-int", nonce="dcd98b7102dd2f0e8b11d0f600bfb0c093", opaque="5ccc069c403ebaf9f0171e9517f40e41"'})
1258 digest = res['digest']
1259 self.assertEqual('testrealm@host.com', digest['realm'])
1260 self.assertEqual('auth,auth-int', digest['qop'])
1261
1262
1263 def testParseWWWAuthenticateMultiple(self):
1264 res = httplib2._parse_www_authenticate({ 'www-authenticate':
1265 'Digest realm="testrealm@host.com", qop="auth,auth-int", nonce="dcd98b7102dd2f0e8b11d0f600bfb0c093", opaque="5ccc069c403ebaf9f0171e9517f40e41" Basic REAlm="me" '})
1266 digest = res['digest']
1267 self.assertEqual('testrealm@host.com', digest['realm'])
1268 self.assertEqual('auth,auth-int', digest['qop'])
1269 self.assertEqual('dcd98b7102dd2f0e8b11d0f600bfb0c093', digest['nonce'])
1270 self.assertEqual('5ccc069c403ebaf9f0171e9517f40e41', digest['opaque'])
1271 basic = res['basic']
1272 self.assertEqual('me', basic['realm'])
1273
1274 def testParseWWWAuthenticateMultiple2(self):
1275 # Handle an added comma between challenges, which might get thrown in if the challenges were
1276 # originally sent in separate www-authenticate headers.
1277 res = httplib2._parse_www_authenticate({ 'www-authenticate':
1278 'Digest realm="testrealm@host.com", qop="auth,auth-int", nonce="dcd98b7102dd2f0e8b11d0f600bfb0c093", opaque="5ccc069c403ebaf9f0171e9517f40e41", Basic REAlm="me" '})
1279 digest = res['digest']
1280 self.assertEqual('testrealm@host.com', digest['realm'])
1281 self.assertEqual('auth,auth-int', digest['qop'])
1282 self.assertEqual('dcd98b7102dd2f0e8b11d0f600bfb0c093', digest['nonce'])
1283 self.assertEqual('5ccc069c403ebaf9f0171e9517f40e41', digest['opaque'])
1284 basic = res['basic']
1285 self.assertEqual('me', basic['realm'])
1286
1287 def testParseWWWAuthenticateMultiple3(self):
1288 # Handle an added comma between challenges, which might get thrown in if the challenges were
1289 # originally sent in separate www-authenticate headers.
1290 res = httplib2._parse_www_authenticate({ 'www-authenticate':
1291 'Digest realm="testrealm@host.com", qop="auth,auth-int", nonce="dcd98b7102dd2f0e8b11d0f600bfb0c093", opaque="5ccc069c403ebaf9f0171e9517f40e41", Basic REAlm="me", WSSE realm="foo", profile="UsernameToken"'})
1292 digest = res['digest']
1293 self.assertEqual('testrealm@host.com', digest['realm'])
1294 self.assertEqual('auth,auth-int', digest['qop'])
1295 self.assertEqual('dcd98b7102dd2f0e8b11d0f600bfb0c093', digest['nonce'])
1296 self.assertEqual('5ccc069c403ebaf9f0171e9517f40e41', digest['opaque'])
1297 basic = res['basic']
1298 self.assertEqual('me', basic['realm'])
1299 wsse = res['wsse']
1300 self.assertEqual('foo', wsse['realm'])
1301 self.assertEqual('UsernameToken', wsse['profile'])
1302
1303 def testParseWWWAuthenticateMultiple4(self):
1304 res = httplib2._parse_www_authenticate({ 'www-authenticate':
1305 'Digest realm="test-real.m@host.com", qop \t=\t"\tauth,auth-int", nonce="(*)&^&$%#",opaque="5ccc069c403ebaf9f0171e9517f40e41", Basic REAlm="me", WSSE realm="foo", profile="UsernameToken"'})
1306 digest = res['digest']
1307 self.assertEqual('test-real.m@host.com', digest['realm'])
1308 self.assertEqual('\tauth,auth-int', digest['qop'])
1309 self.assertEqual('(*)&^&$%#', digest['nonce'])
1310
1311 def testParseWWWAuthenticateMoreQuoteCombos(self):
1312 res = httplib2._parse_www_authenticate({'www-authenticate':'Digest realm="myrealm", nonce="Ygk86AsKBAA=3516200d37f9a3230352fde99977bd6d472d4306", algorithm=MD5, qop="auth", stale=true'})
1313 digest = res['digest']
1314 self.assertEqual('myrealm', digest['realm'])
1315
1316 def testDigestObject(self):
1317 credentials = ('joe', 'password')
1318 host = None
1319 request_uri = '/projects/httplib2/test/digest/'
1320 headers = {}
1321 response = {
1322 'www-authenticate': 'Digest realm="myrealm", nonce="Ygk86AsKBAA=3516200d37f9a3230352fde99977bd6d472d4306", algorithm=MD5, qop="auth"'
1323 }
1324 content = b""
1325
1326 d = httplib2.DigestAuthentication(credentials, host, request_uri, headers, response, content, None)
1327 d.request("GET", request_uri, headers, content, cnonce="33033375ec278a46")
1328 our_request = "Authorization: %s" % headers['Authorization']
1329 working_request = 'Authorization: Digest username="joe", realm="myrealm", nonce="Ygk86AsKBAA=3516200d37f9a3230352fde99977bd6d472d4306", uri="/projects/httplib2/test/digest/", algorithm=MD5, response="97ed129401f7cdc60e5db58a80f3ea8b", qop=auth, nc=00000001, cnonce="33033375ec278a46"'
1330 self.assertEqual(our_request, working_request)
1331
1332
1333 def testDigestObjectStale(self):
1334 credentials = ('joe', 'password')
1335 host = None
1336 request_uri = '/projects/httplib2/test/digest/'
1337 headers = {}
1338 response = httplib2.Response({ })
1339 response['www-authenticate'] = 'Digest realm="myrealm", nonce="Ygk86AsKBAA=3516200d37f9a3230352fde99977bd6d472d4306", algorithm=MD5, qop="auth", stale=true'
1340 response.status = 401
1341 content = b""
1342 d = httplib2.DigestAuthentication(credentials, host, request_uri, headers, response, content, None)
1343 # Returns true to force a retry
1344 self.assertTrue( d.response(response, content) )
1345
1346 def testDigestObjectAuthInfo(self):
1347 credentials = ('joe', 'password')
1348 host = None
1349 request_uri = '/projects/httplib2/test/digest/'
1350 headers = {}
1351 response = httplib2.Response({ })
1352 response['www-authenticate'] = 'Digest realm="myrealm", nonce="Ygk86AsKBAA=3516200d37f9a3230352fde99977bd6d472d4306", algorithm=MD5, qop="auth", stale=true'
1353 response['authentication-info'] = 'nextnonce="fred"'
1354 content = b""
1355 d = httplib2.DigestAuthentication(credentials, host, request_uri, headers, response, content, None)
1356 # Returns true to force a retry
1357 self.assertFalse( d.response(response, content) )
1358 self.assertEqual('fred', d.challenge['nonce'])
1359 self.assertEqual(1, d.challenge['nc'])
1360
1361 def testWsseAlgorithm(self):
1362 digest = httplib2._wsse_username_token("d36e316282959a9ed4c89851497a717f", "2003-12-15T14:43:07Z", "taadtaadpstcsm")
1363 expected = b"quR/EWLAV4xLf9Zqyw4pDmfV9OY="
1364 self.assertEqual(expected, digest)
1365
1366 def testEnd2End(self):
1367 # one end to end header
1368 response = {'content-type': 'application/atom+xml', 'te': 'deflate'}
1369 end2end = httplib2._get_end2end_headers(response)
1370 self.assertTrue('content-type' in end2end)
1371 self.assertTrue('te' not in end2end)
1372 self.assertTrue('connection' not in end2end)
1373
1374 # one end to end header that gets eliminated
1375 response = {'connection': 'content-type', 'content-type': 'application/atom+xml', 'te': 'deflate'}
1376 end2end = httplib2._get_end2end_headers(response)
1377 self.assertTrue('content-type' not in end2end)
1378 self.assertTrue('te' not in end2end)
1379 self.assertTrue('connection' not in end2end)
1380
1381 # Degenerate case of no headers
1382 response = {}
1383 end2end = httplib2._get_end2end_headers(response)
1384 self.assertEquals(0, len(end2end))
1385
1386 # Degenerate case of connection referrring to a header not passed in
1387 response = {'connection': 'content-type'}
1388 end2end = httplib2._get_end2end_headers(response)
1389 self.assertEquals(0, len(end2end))
1390
1391unittest.main()