blob: 86569ce6ffb84a1ec8b0554d2892f071e10d9fbf [file] [log] [blame]
pilgrim00a352e2009-05-29 04:04:44 +00001#!/usr/bin/env python3
2"""
3httplib2test
4
5A set of unit tests for httplib2.py.
6
7Requires Python 3.0 or later
8"""
9
10__author__ = "Joe Gregorio (joe@bitworking.org)"
11__copyright__ = "Copyright 2006, Joe Gregorio"
12__contributors__ = ["Mark Pilgrim"]
13__license__ = "MIT"
14__history__ = """ """
15__version__ = "0.2 ($Rev: 118 $)"
16
Joe Gregoriod760a1a2011-02-11 01:03:22 -050017import base64
pilgrim00a352e2009-05-29 04:04:44 +000018import http.client
19import httplib2
pilgrim00a352e2009-05-29 04:04:44 +000020import io
Joe Gregoriod760a1a2011-02-11 01:03:22 -050021import os
22import socket
23import sys
24import time
25import unittest
26import urllib.parse
pilgrim00a352e2009-05-29 04:04:44 +000027
28# The test resources base uri
29base = 'http://bitworking.org/projects/httplib2/test/'
30#base = 'http://localhost/projects/httplib2/test/'
31cacheDirName = ".cache"
32
33
34class CredentialsTest(unittest.TestCase):
35 def test(self):
36 c = httplib2.Credentials()
37 c.add("joe", "password")
38 self.assertEqual(("joe", "password"), list(c.iter("bitworking.org"))[0])
39 self.assertEqual(("joe", "password"), list(c.iter(""))[0])
40 c.add("fred", "password2", "wellformedweb.org")
41 self.assertEqual(("joe", "password"), list(c.iter("bitworking.org"))[0])
42 self.assertEqual(1, len(list(c.iter("bitworking.org"))))
43 self.assertEqual(2, len(list(c.iter("wellformedweb.org"))))
44 self.assertTrue(("fred", "password2") in list(c.iter("wellformedweb.org")))
45 c.clear()
46 self.assertEqual(0, len(list(c.iter("bitworking.org"))))
47 c.add("fred", "password2", "wellformedweb.org")
48 self.assertTrue(("fred", "password2") in list(c.iter("wellformedweb.org")))
49 self.assertEqual(0, len(list(c.iter("bitworking.org"))))
50 self.assertEqual(0, len(list(c.iter(""))))
51
52
53class ParserTest(unittest.TestCase):
54 def testFromStd66(self):
55 self.assertEqual( ('http', 'example.com', '', None, None ), httplib2.parse_uri("http://example.com"))
56 self.assertEqual( ('https', 'example.com', '', None, None ), httplib2.parse_uri("https://example.com"))
57 self.assertEqual( ('https', 'example.com:8080', '', None, None ), httplib2.parse_uri("https://example.com:8080"))
58 self.assertEqual( ('http', 'example.com', '/', None, None ), httplib2.parse_uri("http://example.com/"))
59 self.assertEqual( ('http', 'example.com', '/path', None, None ), httplib2.parse_uri("http://example.com/path"))
60 self.assertEqual( ('http', 'example.com', '/path', 'a=1&b=2', None ), httplib2.parse_uri("http://example.com/path?a=1&b=2"))
61 self.assertEqual( ('http', 'example.com', '/path', 'a=1&b=2', 'fred' ), httplib2.parse_uri("http://example.com/path?a=1&b=2#fred"))
62 self.assertEqual( ('http', 'example.com', '/path', 'a=1&b=2', 'fred' ), httplib2.parse_uri("http://example.com/path?a=1&b=2#fred"))
63
64
65class UrlNormTest(unittest.TestCase):
66 def test(self):
67 self.assertEqual( "http://example.org/", httplib2.urlnorm("http://example.org")[-1])
68 self.assertEqual( "http://example.org/", httplib2.urlnorm("http://EXAMple.org")[-1])
69 self.assertEqual( "http://example.org/?=b", httplib2.urlnorm("http://EXAMple.org?=b")[-1])
70 self.assertEqual( "http://example.org/mypath?a=b", httplib2.urlnorm("http://EXAMple.org/mypath?a=b")[-1])
71 self.assertEqual( "http://localhost:80/", httplib2.urlnorm("http://localhost:80")[-1])
72 self.assertEqual( httplib2.urlnorm("http://localhost:80/"), httplib2.urlnorm("HTTP://LOCALHOST:80"))
73 try:
74 httplib2.urlnorm("/")
75 self.fail("Non-absolute URIs should raise an exception")
76 except httplib2.RelativeURIError:
77 pass
78
79class UrlSafenameTest(unittest.TestCase):
80 def test(self):
81 # Test that different URIs end up generating different safe names
82 self.assertEqual( "example.org,fred,a=b,58489f63a7a83c3b7794a6a398ee8b1f", httplib2.safename("http://example.org/fred/?a=b"))
83 self.assertEqual( "example.org,fred,a=b,8c5946d56fec453071f43329ff0be46b", httplib2.safename("http://example.org/fred?/a=b"))
84 self.assertEqual( "www.example.org,fred,a=b,499c44b8d844a011b67ea2c015116968", httplib2.safename("http://www.example.org/fred?/a=b"))
85 self.assertEqual( httplib2.safename(httplib2.urlnorm("http://www")[-1]), httplib2.safename(httplib2.urlnorm("http://WWW")[-1]))
86 self.assertEqual( "www.example.org,fred,a=b,692e843a333484ce0095b070497ab45d", httplib2.safename("https://www.example.org/fred?/a=b"))
87 self.assertNotEqual( httplib2.safename("http://www"), httplib2.safename("https://www"))
88 # Test the max length limits
89 uri = "http://" + ("w" * 200) + ".org"
90 uri2 = "http://" + ("w" * 201) + ".org"
91 self.assertNotEqual( httplib2.safename(uri2), httplib2.safename(uri))
92 # Max length should be 200 + 1 (",") + 32
93 self.assertEqual(233, len(httplib2.safename(uri2)))
94 self.assertEqual(233, len(httplib2.safename(uri)))
95 # Unicode
96 if sys.version_info >= (2,3):
97 self.assertEqual( "xn--http,-4y1d.org,fred,a=b,579924c35db315e5a32e3d9963388193", httplib2.safename("http://\u2304.org/fred/?a=b"))
98
99class _MyResponse(io.BytesIO):
100 def __init__(self, body, **kwargs):
101 io.BytesIO.__init__(self, body)
102 self.headers = kwargs
103
104 def items(self):
105 return self.headers.items()
106
107 def iteritems(self):
108 return iter(self.headers.items())
109
110
111class _MyHTTPConnection(object):
112 "This class is just a mock of httplib.HTTPConnection used for testing"
113
114 def __init__(self, host, port=None, key_file=None, cert_file=None,
115 strict=None, timeout=None, proxy_info=None):
116 self.host = host
117 self.port = port
118 self.timeout = timeout
119 self.log = ""
120
121 def set_debuglevel(self, level):
122 pass
123
124 def connect(self):
125 "Connect to a host on a given port."
126 pass
127
128 def close(self):
129 pass
130
131 def request(self, method, request_uri, body, headers):
132 pass
133
134 def getresponse(self):
135 return _MyResponse(b"the body", status="200")
136
137
138class HttpTest(unittest.TestCase):
139 def setUp(self):
140 if os.path.exists(cacheDirName):
141 [os.remove(os.path.join(cacheDirName, file)) for file in os.listdir(cacheDirName)]
142 self.http = httplib2.Http(cacheDirName)
143 self.http.clear_credentials()
144
Joe Gregorio756d3b32011-02-13 11:59:51 -0500145 def testIPv6NoSSL(self):
146 try:
147 self.http.request("http://[::1]/")
148 except socket.gaierror:
149 self.fail("should get the address family right for IPv6")
150 except socket.error:
151 # Even if IPv6 isn't installed on a machine it should just raise socket.error
152 pass
153
154 def testIPv6SSL(self):
155 try:
156 self.http.request("https://[::1]/")
157 except socket.gaierror:
158 self.fail("should get the address family right for IPv6")
159 except socket.error:
160 # Even if IPv6 isn't installed on a machine it should just raise socket.error
161 pass
162
pilgrim00a352e2009-05-29 04:04:44 +0000163 def testConnectionType(self):
164 self.http.force_exception_to_status_code = False
165 response, content = self.http.request("http://bitworking.org", connection_type=_MyHTTPConnection)
166 self.assertEqual(response['content-location'], "http://bitworking.org")
167 self.assertEqual(content, b"the body")
168
169 def testGetUnknownServer(self):
170 self.http.force_exception_to_status_code = False
171 try:
172 self.http.request("http://fred.bitworking.org/")
173 self.fail("An httplib2.ServerNotFoundError Exception must be thrown on an unresolvable server.")
174 except httplib2.ServerNotFoundError:
175 pass
176
177 # Now test with exceptions turned off
178 self.http.force_exception_to_status_code = True
179
180 (response, content) = self.http.request("http://fred.bitworking.org/")
181 self.assertEqual(response['content-type'], 'text/plain')
182 self.assertTrue(content.startswith(b"Unable to find"))
183 self.assertEqual(response.status, 400)
184
Joe Gregoriod760a1a2011-02-11 01:03:22 -0500185 def testGetConnectionRefused(self):
186 self.http.force_exception_to_status_code = False
187 try:
188 self.http.request("http://localhost:7777/")
189 self.fail("An socket.error exception must be thrown on Connection Refused.")
190 except socket.error:
191 pass
192
193 # Now test with exceptions turned off
194 self.http.force_exception_to_status_code = True
195
196 (response, content) = self.http.request("http://localhost:7777/")
197 self.assertEqual(response['content-type'], 'text/plain')
198 self.assertTrue(b"Connection refused" in content)
199 self.assertEqual(response.status, 400)
200
pilgrim00a352e2009-05-29 04:04:44 +0000201 def testGetIRI(self):
202 if sys.version_info >= (2,3):
203 uri = urllib.parse.urljoin(base, "reflector/reflector.cgi?d=\N{CYRILLIC CAPITAL LETTER DJE}")
204 (response, content) = self.http.request(uri, "GET")
205 d = self.reflector(content)
206 self.assertTrue('QUERY_STRING' in d)
207 self.assertTrue(d['QUERY_STRING'].find('%D0%82') > 0)
208
209 def testGetIsDefaultMethod(self):
210 # Test that GET is the default method
211 uri = urllib.parse.urljoin(base, "methods/method_reflector.cgi")
212 (response, content) = self.http.request(uri)
213 self.assertEqual(response['x-method'], "GET")
214
215 def testDifferentMethods(self):
216 # Test that all methods can be used
217 uri = urllib.parse.urljoin(base, "methods/method_reflector.cgi")
218 for method in ["GET", "PUT", "DELETE", "POST"]:
219 (response, content) = self.http.request(uri, method, body=b" ")
220 self.assertEqual(response['x-method'], method)
221
Joe Gregoriob628c0b2009-07-16 12:28:04 -0400222 def testHeadRead(self):
223 # Test that we don't try to read the response of a HEAD request
224 # since httplib blocks response.read() for HEAD requests.
225 # Oddly enough this doesn't appear as a problem when doing HEAD requests
226 # against Apache servers.
227 uri = "http://www.google.com/"
228 (response, content) = self.http.request(uri, "HEAD")
229 self.assertEqual(response.status, 200)
230 self.assertEqual(content, b"")
231
pilgrim00a352e2009-05-29 04:04:44 +0000232 def testGetNoCache(self):
233 # Test that can do a GET w/o the cache turned on.
234 http = httplib2.Http()
235 uri = urllib.parse.urljoin(base, "304/test_etag.txt")
236 (response, content) = http.request(uri, "GET")
237 self.assertEqual(response.status, 200)
238 self.assertEqual(response.previous, None)
239
Joe Gregorioe202d212009-07-16 14:57:52 -0400240 def testGetOnlyIfCachedCacheHit(self):
241 # Test that can do a GET with cache and 'only-if-cached'
242 uri = urllib.parse.urljoin(base, "304/test_etag.txt")
243 (response, content) = self.http.request(uri, "GET")
244 (response, content) = self.http.request(uri, "GET", headers={'cache-control': 'only-if-cached'})
245 self.assertEqual(response.fromcache, True)
246 self.assertEqual(response.status, 200)
247
pilgrim00a352e2009-05-29 04:04:44 +0000248 def testGetOnlyIfCachedCacheMiss(self):
249 # Test that can do a GET with no cache with 'only-if-cached'
pilgrim00a352e2009-05-29 04:04:44 +0000250 uri = urllib.parse.urljoin(base, "304/test_etag.txt")
Joe Gregorioe202d212009-07-16 14:57:52 -0400251 (response, content) = self.http.request(uri, "GET", headers={'cache-control': 'only-if-cached'})
pilgrim00a352e2009-05-29 04:04:44 +0000252 self.assertEqual(response.fromcache, False)
Joe Gregorioe202d212009-07-16 14:57:52 -0400253 self.assertEqual(response.status, 504)
pilgrim00a352e2009-05-29 04:04:44 +0000254
255 def testGetOnlyIfCachedNoCacheAtAll(self):
256 # Test that can do a GET with no cache with 'only-if-cached'
257 # Of course, there might be an intermediary beyond us
258 # that responds to the 'only-if-cached', so this
259 # test can't really be guaranteed to pass.
260 http = httplib2.Http()
261 uri = urllib.parse.urljoin(base, "304/test_etag.txt")
262 (response, content) = http.request(uri, "GET", headers={'cache-control': 'only-if-cached'})
263 self.assertEqual(response.fromcache, False)
Joe Gregorioe202d212009-07-16 14:57:52 -0400264 self.assertEqual(response.status, 504)
pilgrim00a352e2009-05-29 04:04:44 +0000265
266 def testUserAgent(self):
267 # Test that we provide a default user-agent
268 uri = urllib.parse.urljoin(base, "user-agent/test.cgi")
269 (response, content) = self.http.request(uri, "GET")
270 self.assertEqual(response.status, 200)
271 self.assertTrue(content.startswith(b"Python-httplib2/"))
272
273 def testUserAgentNonDefault(self):
274 # Test that the default user-agent can be over-ridden
275
276 uri = urllib.parse.urljoin(base, "user-agent/test.cgi")
277 (response, content) = self.http.request(uri, "GET", headers={'User-Agent': 'fred/1.0'})
278 self.assertEqual(response.status, 200)
279 self.assertTrue(content.startswith(b"fred/1.0"))
280
281 def testGet300WithLocation(self):
282 # Test the we automatically follow 300 redirects if a Location: header is provided
283 uri = urllib.parse.urljoin(base, "300/with-location-header.asis")
284 (response, content) = self.http.request(uri, "GET")
285 self.assertEqual(response.status, 200)
286 self.assertEqual(content, b"This is the final destination.\n")
287 self.assertEqual(response.previous.status, 300)
288 self.assertEqual(response.previous.fromcache, False)
289
290 # Confirm that the intermediate 300 is not cached
291 (response, content) = self.http.request(uri, "GET")
292 self.assertEqual(response.status, 200)
293 self.assertEqual(content, b"This is the final destination.\n")
294 self.assertEqual(response.previous.status, 300)
295 self.assertEqual(response.previous.fromcache, False)
296
297 def testGet300WithLocationNoRedirect(self):
298 # Test the we automatically follow 300 redirects if a Location: header is provided
299 self.http.follow_redirects = False
300 uri = urllib.parse.urljoin(base, "300/with-location-header.asis")
301 (response, content) = self.http.request(uri, "GET")
302 self.assertEqual(response.status, 300)
303
304 def testGet300WithoutLocation(self):
305 # Not giving a Location: header in a 300 response is acceptable
306 # In which case we just return the 300 response
307 uri = urllib.parse.urljoin(base, "300/without-location-header.asis")
308 (response, content) = self.http.request(uri, "GET")
309 self.assertEqual(response.status, 300)
310 self.assertTrue(response['content-type'].startswith("text/html"))
311 self.assertEqual(response.previous, None)
312
313 def testGet301(self):
314 # Test that we automatically follow 301 redirects
315 # and that we cache the 301 response
316 uri = urllib.parse.urljoin(base, "301/onestep.asis")
317 destination = urllib.parse.urljoin(base, "302/final-destination.txt")
318 (response, content) = self.http.request(uri, "GET")
319 self.assertEqual(response.status, 200)
320 self.assertTrue('content-location' in response)
321 self.assertEqual(response['content-location'], destination)
322 self.assertEqual(content, b"This is the final destination.\n")
323 self.assertEqual(response.previous.status, 301)
324 self.assertEqual(response.previous.fromcache, False)
325
326 (response, content) = self.http.request(uri, "GET")
327 self.assertEqual(response.status, 200)
328 self.assertEqual(response['content-location'], destination)
329 self.assertEqual(content, b"This is the final destination.\n")
330 self.assertEqual(response.previous.status, 301)
331 self.assertEqual(response.previous.fromcache, True)
332
333
334 def testGet301NoRedirect(self):
335 # Test that we automatically follow 301 redirects
336 # and that we cache the 301 response
337 self.http.follow_redirects = False
338 uri = urllib.parse.urljoin(base, "301/onestep.asis")
339 destination = urllib.parse.urljoin(base, "302/final-destination.txt")
340 (response, content) = self.http.request(uri, "GET")
341 self.assertEqual(response.status, 301)
342
343
344 def testGet302(self):
345 # Test that we automatically follow 302 redirects
346 # and that we DO NOT cache the 302 response
347 uri = urllib.parse.urljoin(base, "302/onestep.asis")
348 destination = urllib.parse.urljoin(base, "302/final-destination.txt")
349 (response, content) = self.http.request(uri, "GET")
350 self.assertEqual(response.status, 200)
351 self.assertEqual(response['content-location'], destination)
352 self.assertEqual(content, b"This is the final destination.\n")
353 self.assertEqual(response.previous.status, 302)
354 self.assertEqual(response.previous.fromcache, False)
355
356 uri = urllib.parse.urljoin(base, "302/onestep.asis")
357 (response, content) = self.http.request(uri, "GET")
358 self.assertEqual(response.status, 200)
359 self.assertEqual(response.fromcache, True)
360 self.assertEqual(response['content-location'], destination)
361 self.assertEqual(content, b"This is the final destination.\n")
362 self.assertEqual(response.previous.status, 302)
363 self.assertEqual(response.previous.fromcache, False)
364 self.assertEqual(response.previous['content-location'], uri)
365
366 uri = urllib.parse.urljoin(base, "302/twostep.asis")
367
368 (response, content) = self.http.request(uri, "GET")
369 self.assertEqual(response.status, 200)
370 self.assertEqual(response.fromcache, True)
371 self.assertEqual(content, b"This is the final destination.\n")
372 self.assertEqual(response.previous.status, 302)
373 self.assertEqual(response.previous.fromcache, False)
374
375 def testGet302RedirectionLimit(self):
376 # Test that we can set a lower redirection limit
377 # and that we raise an exception when we exceed
378 # that limit.
379 self.http.force_exception_to_status_code = False
380
381 uri = urllib.parse.urljoin(base, "302/twostep.asis")
382 try:
383 (response, content) = self.http.request(uri, "GET", redirections = 1)
384 self.fail("This should not happen")
385 except httplib2.RedirectLimit:
386 pass
387 except Exception as e:
388 self.fail("Threw wrong kind of exception ")
389
390 # Re-run the test with out the exceptions
391 self.http.force_exception_to_status_code = True
392
393 (response, content) = self.http.request(uri, "GET", redirections = 1)
394 self.assertEqual(response.status, 500)
395 self.assertTrue(response.reason.startswith("Redirected more"))
396 self.assertEqual("302", response['status'])
397 self.assertTrue(content.startswith(b"<html>"))
398 self.assertTrue(response.previous != None)
399
400 def testGet302NoLocation(self):
401 # Test that we throw an exception when we get
402 # a 302 with no Location: header.
403 self.http.force_exception_to_status_code = False
404 uri = urllib.parse.urljoin(base, "302/no-location.asis")
405 try:
406 (response, content) = self.http.request(uri, "GET")
407 self.fail("Should never reach here")
408 except httplib2.RedirectMissingLocation:
409 pass
410 except Exception as e:
411 self.fail("Threw wrong kind of exception ")
412
413 # Re-run the test with out the exceptions
414 self.http.force_exception_to_status_code = True
415
416 (response, content) = self.http.request(uri, "GET")
417 self.assertEqual(response.status, 500)
418 self.assertTrue(response.reason.startswith("Redirected but"))
419 self.assertEqual("302", response['status'])
420 self.assertTrue(content.startswith(b"This is content"))
421
422 def testGet302ViaHttps(self):
423 # Google always redirects to http://google.com
424 (response, content) = self.http.request("https://google.com", "GET")
425 self.assertEqual(200, response.status)
426 self.assertEqual(302, response.previous.status)
427
428 def testGetViaHttps(self):
429 # Test that we can handle HTTPS
430 (response, content) = self.http.request("https://google.com/adsense/", "GET")
431 self.assertEqual(200, response.status)
432
433 def testGetViaHttpsSpecViolationOnLocation(self):
434 # Test that we follow redirects through HTTPS
435 # even if they violate the spec by including
436 # a relative Location: header instead of an
437 # absolute one.
438 (response, content) = self.http.request("https://google.com/adsense", "GET")
439 self.assertEqual(200, response.status)
440 self.assertNotEqual(None, response.previous)
441
442
443 def testGetViaHttpsKeyCert(self):
444 # At this point I can only test
445 # that the key and cert files are passed in
446 # correctly to httplib. It would be nice to have
447 # a real https endpoint to test against.
448 http = httplib2.Http(timeout=2)
449
450 http.add_certificate("akeyfile", "acertfile", "bitworking.org")
451 try:
452 (response, content) = http.request("https://bitworking.org", "GET")
453 except:
454 pass
455 self.assertEqual(http.connections["https:bitworking.org"].key_file, "akeyfile")
456 self.assertEqual(http.connections["https:bitworking.org"].cert_file, "acertfile")
457
458 try:
459 (response, content) = http.request("https://notthere.bitworking.org", "GET")
460 except:
461 pass
462 self.assertEqual(http.connections["https:notthere.bitworking.org"].key_file, None)
463 self.assertEqual(http.connections["https:notthere.bitworking.org"].cert_file, None)
464
465
466
467
468 def testGet303(self):
469 # Do a follow-up GET on a Location: header
470 # returned from a POST that gave a 303.
471 uri = urllib.parse.urljoin(base, "303/303.cgi")
472 (response, content) = self.http.request(uri, "POST", " ")
473 self.assertEqual(response.status, 200)
474 self.assertEqual(content, b"This is the final destination.\n")
475 self.assertEqual(response.previous.status, 303)
476
477 def testGet303NoRedirect(self):
478 # Do a follow-up GET on a Location: header
479 # returned from a POST that gave a 303.
480 self.http.follow_redirects = False
481 uri = urllib.parse.urljoin(base, "303/303.cgi")
482 (response, content) = self.http.request(uri, "POST", " ")
483 self.assertEqual(response.status, 303)
484
485 def test303ForDifferentMethods(self):
486 # Test that all methods can be used
487 uri = urllib.parse.urljoin(base, "303/redirect-to-reflector.cgi")
488 for (method, method_on_303) in [("PUT", "GET"), ("DELETE", "GET"), ("POST", "GET"), ("GET", "GET"), ("HEAD", "GET")]:
489 (response, content) = self.http.request(uri, method, body=b" ")
490 self.assertEqual(response['x-method'], method_on_303)
491
492 def testGet304(self):
493 # Test that we use ETags properly to validate our cache
494 uri = urllib.parse.urljoin(base, "304/test_etag.txt")
495 (response, content) = self.http.request(uri, "GET")
496 self.assertNotEqual(response['etag'], "")
497
498 (response, content) = self.http.request(uri, "GET")
499 (response, content) = self.http.request(uri, "GET", headers = {'cache-control': 'must-revalidate'})
500 self.assertEqual(response.status, 200)
501 self.assertEqual(response.fromcache, True)
502
503 cache_file_name = os.path.join(cacheDirName, httplib2.safename(httplib2.urlnorm(uri)[-1]))
504 f = open(cache_file_name, "r")
505 status_line = f.readline()
506 f.close()
507
508 self.assertTrue(status_line.startswith("status:"))
509
510 (response, content) = self.http.request(uri, "HEAD")
511 self.assertEqual(response.status, 200)
512 self.assertEqual(response.fromcache, True)
513
514 (response, content) = self.http.request(uri, "GET", headers = {'range': 'bytes=0-0'})
515 self.assertEqual(response.status, 206)
516 self.assertEqual(response.fromcache, False)
517
518 def testGetIgnoreEtag(self):
519 # Test that we can forcibly ignore ETags
520 uri = urllib.parse.urljoin(base, "reflector/reflector.cgi")
521 (response, content) = self.http.request(uri, "GET")
522 self.assertNotEqual(response['etag'], "")
523
524 (response, content) = self.http.request(uri, "GET", headers = {'cache-control': 'max-age=0'})
525 d = self.reflector(content)
526 self.assertTrue('HTTP_IF_NONE_MATCH' in d)
527
528 self.http.ignore_etag = True
529 (response, content) = self.http.request(uri, "GET", headers = {'cache-control': 'max-age=0'})
530 d = self.reflector(content)
531 self.assertEqual(response.fromcache, False)
532 self.assertFalse('HTTP_IF_NONE_MATCH' in d)
533
534 def testOverrideEtag(self):
535 # Test that we can forcibly ignore ETags
536 uri = urllib.parse.urljoin(base, "reflector/reflector.cgi")
537 (response, content) = self.http.request(uri, "GET")
538 self.assertNotEqual(response['etag'], "")
539
540 (response, content) = self.http.request(uri, "GET", headers = {'cache-control': 'max-age=0'})
541 d = self.reflector(content)
542 self.assertTrue('HTTP_IF_NONE_MATCH' in d)
543 self.assertNotEqual(d['HTTP_IF_NONE_MATCH'], "fred")
544
545 (response, content) = self.http.request(uri, "GET", headers = {'cache-control': 'max-age=0', 'if-none-match': 'fred'})
546 d = self.reflector(content)
547 self.assertTrue('HTTP_IF_NONE_MATCH' in d)
548 self.assertEqual(d['HTTP_IF_NONE_MATCH'], "fred")
549
550#MAP-commented this out because it consistently fails
551# def testGet304EndToEnd(self):
552# # Test that end to end headers get overwritten in the cache
553# uri = urllib.parse.urljoin(base, "304/end2end.cgi")
554# (response, content) = self.http.request(uri, "GET")
555# self.assertNotEqual(response['etag'], "")
556# old_date = response['date']
557# time.sleep(2)
558#
559# (response, content) = self.http.request(uri, "GET", headers = {'Cache-Control': 'max-age=0'})
560# # The response should be from the cache, but the Date: header should be updated.
561# new_date = response['date']
562# self.assertNotEqual(new_date, old_date)
563# self.assertEqual(response.status, 200)
564# self.assertEqual(response.fromcache, True)
565
566 def testGet304LastModified(self):
567 # Test that we can still handle a 304
568 # by only using the last-modified cache validator.
569 uri = urllib.parse.urljoin(base, "304/last-modified-only/last-modified-only.txt")
570 (response, content) = self.http.request(uri, "GET")
571
572 self.assertNotEqual(response['last-modified'], "")
573 (response, content) = self.http.request(uri, "GET")
574 (response, content) = self.http.request(uri, "GET")
575 self.assertEqual(response.status, 200)
576 self.assertEqual(response.fromcache, True)
577
578 def testGet307(self):
579 # Test that we do follow 307 redirects but
580 # do not cache the 307
581 uri = urllib.parse.urljoin(base, "307/onestep.asis")
582 (response, content) = self.http.request(uri, "GET")
583 self.assertEqual(response.status, 200)
584 self.assertEqual(content, b"This is the final destination.\n")
585 self.assertEqual(response.previous.status, 307)
586 self.assertEqual(response.previous.fromcache, False)
587
588 (response, content) = self.http.request(uri, "GET")
589 self.assertEqual(response.status, 200)
590 self.assertEqual(response.fromcache, True)
591 self.assertEqual(content, b"This is the final destination.\n")
592 self.assertEqual(response.previous.status, 307)
593 self.assertEqual(response.previous.fromcache, False)
594
595 def testGet410(self):
596 # Test that we pass 410's through
597 uri = urllib.parse.urljoin(base, "410/410.asis")
598 (response, content) = self.http.request(uri, "GET")
599 self.assertEqual(response.status, 410)
600
chris.dent@gmail.comae846ca2009-12-24 14:02:57 -0600601 def testVaryHeaderSimple(self):
602 """
603 RFC 2616 13.6
604 When the cache receives a subsequent request whose Request-URI
605 specifies one or more cache entries including a Vary header field,
606 the cache MUST NOT use such a cache entry to construct a response
607 to the new request unless all of the selecting request-headers
608 present in the new request match the corresponding stored
609 request-headers in the original request.
610 """
611 # test that the vary header is sent
612 uri = urllib.parse.urljoin(base, "vary/accept.asis")
613 (response, content) = self.http.request(uri, "GET", headers={'Accept': 'text/plain'})
614 self.assertEqual(response.status, 200)
615 self.assertTrue('vary' in response)
616
617 # get the resource again, from the cache since accept header in this
618 # request is the same as the request
619 (response, content) = self.http.request(uri, "GET", headers={'Accept': 'text/plain'})
620 self.assertEqual(response.status, 200)
621 self.assertEqual(response.fromcache, True, msg="Should be from cache")
622
623 # get the resource again, not from cache since Accept headers does not match
624 (response, content) = self.http.request(uri, "GET", headers={'Accept': 'text/html'})
625 self.assertEqual(response.status, 200)
626 self.assertEqual(response.fromcache, False, msg="Should not be from cache")
627
628 # get the resource again, without any Accept header, so again no match
629 (response, content) = self.http.request(uri, "GET")
630 self.assertEqual(response.status, 200)
631 self.assertEqual(response.fromcache, False, msg="Should not be from cache")
632
633 def testNoVary(self):
634 # when there is no vary, a different Accept header (e.g.) should not
635 # impact if the cache is used
636 # test that the vary header is not sent
637 uri = urllib.parse.urljoin(base, "vary/no-vary.asis")
638 (response, content) = self.http.request(uri, "GET", headers={'Accept': 'text/plain'})
639 self.assertEqual(response.status, 200)
640 self.assertFalse('vary' in response)
641
642 (response, content) = self.http.request(uri, "GET", headers={'Accept': 'text/plain'})
643 self.assertEqual(response.status, 200)
644 self.assertEqual(response.fromcache, True, msg="Should be from cache")
645
646 (response, content) = self.http.request(uri, "GET", headers={'Accept': 'text/html'})
647 self.assertEqual(response.status, 200)
648 self.assertEqual(response.fromcache, True, msg="Should be from cache")
649
650 def testVaryHeaderDouble(self):
651 uri = urllib.parse.urljoin(base, "vary/accept-double.asis")
652 (response, content) = self.http.request(uri, "GET", headers={
653 'Accept': 'text/plain', 'Accept-Language': 'da, en-gb;q=0.8, en;q=0.7'})
654 self.assertEqual(response.status, 200)
655 self.assertTrue('vary' in response)
656
657 # we are from cache
658 (response, content) = self.http.request(uri, "GET", headers={
659 'Accept': 'text/plain', 'Accept-Language': 'da, en-gb;q=0.8, en;q=0.7'})
660 self.assertEqual(response.fromcache, True, msg="Should be from cache")
661
662 (response, content) = self.http.request(uri, "GET", headers={'Accept': 'text/plain'})
663 self.assertEqual(response.status, 200)
664 self.assertEqual(response.fromcache, False)
665
666 # get the resource again, not from cache, varied headers don't match exact
667 (response, content) = self.http.request(uri, "GET", headers={'Accept-Language': 'da'})
668 self.assertEqual(response.status, 200)
669 self.assertEqual(response.fromcache, False, msg="Should not be from cache")
670
jcgregorio@localhost9e603da2010-05-13 23:42:11 -0400671 def testVaryUnusedHeader(self):
672 # A header's value is not considered to vary if it's not used at all.
673 uri = urllib.parse.urljoin(base, "vary/unused-header.asis")
674 (response, content) = self.http.request(uri, "GET", headers={
675 'Accept': 'text/plain'})
676 self.assertEqual(response.status, 200)
677 self.assertTrue('vary' in response)
678
679 # we are from cache
680 (response, content) = self.http.request(uri, "GET", headers={
681 'Accept': 'text/plain',})
682 self.assertEqual(response.fromcache, True, msg="Should be from cache")
683
pilgrim00a352e2009-05-29 04:04:44 +0000684 def testHeadGZip(self):
685 # Test that we don't try to decompress a HEAD response
686 uri = urllib.parse.urljoin(base, "gzip/final-destination.txt")
687 (response, content) = self.http.request(uri, "HEAD")
688 self.assertEqual(response.status, 200)
689 self.assertNotEqual(int(response['content-length']), 0)
690 self.assertEqual(content, b"")
691
692 def testGetGZip(self):
693 # Test that we support gzip compression
694 uri = urllib.parse.urljoin(base, "gzip/final-destination.txt")
695 (response, content) = self.http.request(uri, "GET")
696 self.assertEqual(response.status, 200)
697 self.assertFalse('content-encoding' in response)
698 self.assertTrue('-content-encoding' in response)
699 self.assertEqual(int(response['content-length']), len(b"This is the final destination.\n"))
700 self.assertEqual(content, b"This is the final destination.\n")
701
702 def testGetGZipFailure(self):
703 # Test that we raise a good exception when the gzip fails
704 self.http.force_exception_to_status_code = False
705 uri = urllib.parse.urljoin(base, "gzip/failed-compression.asis")
706 try:
707 (response, content) = self.http.request(uri, "GET")
708 self.fail("Should never reach here")
709 except httplib2.FailedToDecompressContent:
710 pass
711 except Exception:
712 self.fail("Threw wrong kind of exception")
713
714 # Re-run the test with out the exceptions
715 self.http.force_exception_to_status_code = True
716
717 (response, content) = self.http.request(uri, "GET")
718 self.assertEqual(response.status, 500)
719 self.assertTrue(response.reason.startswith("Content purported"))
720
721 def testTimeout(self):
722 self.http.force_exception_to_status_code = True
723 uri = urllib.parse.urljoin(base, "timeout/timeout.cgi")
724 try:
725 import socket
726 socket.setdefaulttimeout(1)
727 except:
728 # Don't run the test if we can't set the timeout
729 return
730 (response, content) = self.http.request(uri)
731 self.assertEqual(response.status, 408)
732 self.assertTrue(response.reason.startswith("Request Timeout"))
733 self.assertTrue(content.startswith(b"Request Timeout"))
734
735 def testIndividualTimeout(self):
736 uri = urllib.parse.urljoin(base, "timeout/timeout.cgi")
737 http = httplib2.Http(timeout=1)
738 http.force_exception_to_status_code = True
739
740 (response, content) = http.request(uri)
741 self.assertEqual(response.status, 408)
742 self.assertTrue(response.reason.startswith("Request Timeout"))
743 self.assertTrue(content.startswith(b"Request Timeout"))
744
745
746 def testGetDeflate(self):
747 # Test that we support deflate compression
748 uri = urllib.parse.urljoin(base, "deflate/deflated.asis")
749 (response, content) = self.http.request(uri, "GET")
750 self.assertEqual(response.status, 200)
751 self.assertFalse('content-encoding' in response)
752 self.assertEqual(int(response['content-length']), len("This is the final destination."))
753 self.assertEqual(content, b"This is the final destination.")
754
755 def testGetDeflateFailure(self):
756 # Test that we raise a good exception when the deflate fails
757 self.http.force_exception_to_status_code = False
758
759 uri = urllib.parse.urljoin(base, "deflate/failed-compression.asis")
760 try:
761 (response, content) = self.http.request(uri, "GET")
762 self.fail("Should never reach here")
763 except httplib2.FailedToDecompressContent:
764 pass
765 except Exception:
766 self.fail("Threw wrong kind of exception")
767
768 # Re-run the test with out the exceptions
769 self.http.force_exception_to_status_code = True
770
771 (response, content) = self.http.request(uri, "GET")
772 self.assertEqual(response.status, 500)
773 self.assertTrue(response.reason.startswith("Content purported"))
774
775 def testGetDuplicateHeaders(self):
776 # Test that duplicate headers get concatenated via ','
777 uri = urllib.parse.urljoin(base, "duplicate-headers/multilink.asis")
778 (response, content) = self.http.request(uri, "GET")
779 self.assertEqual(response.status, 200)
780 self.assertEqual(content, b"This is content\n")
781 self.assertEqual(response['link'].split(",")[0], '<http://bitworking.org>; rel="home"; title="BitWorking"')
782
783 def testGetCacheControlNoCache(self):
784 # Test Cache-Control: no-cache on requests
785 uri = urllib.parse.urljoin(base, "304/test_etag.txt")
786 (response, content) = self.http.request(uri, "GET")
787 self.assertNotEqual(response['etag'], "")
788 (response, content) = self.http.request(uri, "GET")
789 self.assertEqual(response.status, 200)
790 self.assertEqual(response.fromcache, True)
791
792 (response, content) = self.http.request(uri, "GET", headers={'Cache-Control': 'no-cache'})
793 self.assertEqual(response.status, 200)
794 self.assertEqual(response.fromcache, False)
795
796 def testGetCacheControlPragmaNoCache(self):
797 # Test Pragma: no-cache on requests
798 uri = urllib.parse.urljoin(base, "304/test_etag.txt")
799 (response, content) = self.http.request(uri, "GET")
800 self.assertNotEqual(response['etag'], "")
801 (response, content) = self.http.request(uri, "GET")
802 self.assertEqual(response.status, 200)
803 self.assertEqual(response.fromcache, True)
804
805 (response, content) = self.http.request(uri, "GET", headers={'Pragma': 'no-cache'})
806 self.assertEqual(response.status, 200)
807 self.assertEqual(response.fromcache, False)
808
809 def testGetCacheControlNoStoreRequest(self):
810 # A no-store request means that the response should not be stored.
811 uri = urllib.parse.urljoin(base, "304/test_etag.txt")
812
813 (response, content) = self.http.request(uri, "GET", headers={'Cache-Control': 'no-store'})
814 self.assertEqual(response.status, 200)
815 self.assertEqual(response.fromcache, False)
816
817 (response, content) = self.http.request(uri, "GET", headers={'Cache-Control': 'no-store'})
818 self.assertEqual(response.status, 200)
819 self.assertEqual(response.fromcache, False)
820
821 def testGetCacheControlNoStoreResponse(self):
822 # A no-store response means that the response should not be stored.
823 uri = urllib.parse.urljoin(base, "no-store/no-store.asis")
824
825 (response, content) = self.http.request(uri, "GET")
826 self.assertEqual(response.status, 200)
827 self.assertEqual(response.fromcache, False)
828
829 (response, content) = self.http.request(uri, "GET")
830 self.assertEqual(response.status, 200)
831 self.assertEqual(response.fromcache, False)
832
833 def testGetCacheControlNoCacheNoStoreRequest(self):
834 # Test that a no-store, no-cache clears the entry from the cache
835 # even if it was cached previously.
836 uri = urllib.parse.urljoin(base, "304/test_etag.txt")
837
838 (response, content) = self.http.request(uri, "GET")
839 (response, content) = self.http.request(uri, "GET")
840 self.assertEqual(response.fromcache, True)
841 (response, content) = self.http.request(uri, "GET", headers={'Cache-Control': 'no-store, no-cache'})
842 (response, content) = self.http.request(uri, "GET", headers={'Cache-Control': 'no-store, no-cache'})
843 self.assertEqual(response.status, 200)
844 self.assertEqual(response.fromcache, False)
845
846 def testUpdateInvalidatesCache(self):
847 # Test that calling PUT or DELETE on a
848 # URI that is cache invalidates that cache.
849 uri = urllib.parse.urljoin(base, "304/test_etag.txt")
850
851 (response, content) = self.http.request(uri, "GET")
852 (response, content) = self.http.request(uri, "GET")
853 self.assertEqual(response.fromcache, True)
854 (response, content) = self.http.request(uri, "DELETE")
855 self.assertEqual(response.status, 405)
856
857 (response, content) = self.http.request(uri, "GET")
858 self.assertEqual(response.fromcache, False)
859
860 def testUpdateUsesCachedETag(self):
861 # Test that we natively support http://www.w3.org/1999/04/Editing/
862 uri = urllib.parse.urljoin(base, "conditional-updates/test.cgi")
863
864 (response, content) = self.http.request(uri, "GET")
865 self.assertEqual(response.status, 200)
866 self.assertEqual(response.fromcache, False)
867 (response, content) = self.http.request(uri, "GET")
868 self.assertEqual(response.status, 200)
869 self.assertEqual(response.fromcache, True)
Joe Gregorio799b2072009-09-29 17:21:19 -0400870 (response, content) = self.http.request(uri, "PUT", body="foo")
pilgrim00a352e2009-05-29 04:04:44 +0000871 self.assertEqual(response.status, 200)
Joe Gregorio799b2072009-09-29 17:21:19 -0400872 (response, content) = self.http.request(uri, "PUT", body="foo")
pilgrim00a352e2009-05-29 04:04:44 +0000873 self.assertEqual(response.status, 412)
874
875 def testUpdateUsesCachedETagAndOCMethod(self):
876 # Test that we natively support http://www.w3.org/1999/04/Editing/
877 uri = urllib.parse.urljoin(base, "conditional-updates/test.cgi")
878
879 (response, content) = self.http.request(uri, "GET")
880 self.assertEqual(response.status, 200)
881 self.assertEqual(response.fromcache, False)
882 (response, content) = self.http.request(uri, "GET")
883 self.assertEqual(response.status, 200)
884 self.assertEqual(response.fromcache, True)
885 self.http.optimistic_concurrency_methods.append("DELETE")
886 (response, content) = self.http.request(uri, "DELETE")
887 self.assertEqual(response.status, 200)
888
889
890 def testUpdateUsesCachedETagOverridden(self):
891 # Test that we natively support http://www.w3.org/1999/04/Editing/
892 uri = urllib.parse.urljoin(base, "conditional-updates/test.cgi")
893
894 (response, content) = self.http.request(uri, "GET")
895 self.assertEqual(response.status, 200)
896 self.assertEqual(response.fromcache, False)
897 (response, content) = self.http.request(uri, "GET")
898 self.assertEqual(response.status, 200)
899 self.assertEqual(response.fromcache, True)
Joe Gregorio799b2072009-09-29 17:21:19 -0400900 (response, content) = self.http.request(uri, "PUT", body="foo", headers={'if-match': 'fred'})
pilgrim00a352e2009-05-29 04:04:44 +0000901 self.assertEqual(response.status, 412)
902
903 def testBasicAuth(self):
904 # Test Basic Authentication
905 uri = urllib.parse.urljoin(base, "basic/file.txt")
906 (response, content) = self.http.request(uri, "GET")
907 self.assertEqual(response.status, 401)
908
909 uri = urllib.parse.urljoin(base, "basic/")
910 (response, content) = self.http.request(uri, "GET")
911 self.assertEqual(response.status, 401)
912
913 self.http.add_credentials('joe', 'password')
914 (response, content) = self.http.request(uri, "GET")
915 self.assertEqual(response.status, 200)
916
917 uri = urllib.parse.urljoin(base, "basic/file.txt")
918 (response, content) = self.http.request(uri, "GET")
919 self.assertEqual(response.status, 200)
920
921 def testBasicAuthWithDomain(self):
922 # Test Basic Authentication
923 uri = urllib.parse.urljoin(base, "basic/file.txt")
924 (response, content) = self.http.request(uri, "GET")
925 self.assertEqual(response.status, 401)
926
927 uri = urllib.parse.urljoin(base, "basic/")
928 (response, content) = self.http.request(uri, "GET")
929 self.assertEqual(response.status, 401)
930
931 self.http.add_credentials('joe', 'password', "example.org")
932 (response, content) = self.http.request(uri, "GET")
933 self.assertEqual(response.status, 401)
934
935 uri = urllib.parse.urljoin(base, "basic/file.txt")
936 (response, content) = self.http.request(uri, "GET")
937 self.assertEqual(response.status, 401)
938
939 domain = urllib.parse.urlparse(base)[1]
940 self.http.add_credentials('joe', 'password', domain)
941 (response, content) = self.http.request(uri, "GET")
942 self.assertEqual(response.status, 200)
943
944 uri = urllib.parse.urljoin(base, "basic/file.txt")
945 (response, content) = self.http.request(uri, "GET")
946 self.assertEqual(response.status, 200)
947
948
949
950
951
952
953 def testBasicAuthTwoDifferentCredentials(self):
954 # Test Basic Authentication with multiple sets of credentials
955 uri = urllib.parse.urljoin(base, "basic2/file.txt")
956 (response, content) = self.http.request(uri, "GET")
957 self.assertEqual(response.status, 401)
958
959 uri = urllib.parse.urljoin(base, "basic2/")
960 (response, content) = self.http.request(uri, "GET")
961 self.assertEqual(response.status, 401)
962
963 self.http.add_credentials('fred', 'barney')
964 (response, content) = self.http.request(uri, "GET")
965 self.assertEqual(response.status, 200)
966
967 uri = urllib.parse.urljoin(base, "basic2/file.txt")
968 (response, content) = self.http.request(uri, "GET")
969 self.assertEqual(response.status, 200)
970
971 def testBasicAuthNested(self):
972 # Test Basic Authentication with resources
973 # that are nested
974 uri = urllib.parse.urljoin(base, "basic-nested/")
975 (response, content) = self.http.request(uri, "GET")
976 self.assertEqual(response.status, 401)
977
978 uri = urllib.parse.urljoin(base, "basic-nested/subdir")
979 (response, content) = self.http.request(uri, "GET")
980 self.assertEqual(response.status, 401)
981
982 # Now add in credentials one at a time and test.
983 self.http.add_credentials('joe', 'password')
984
985 uri = urllib.parse.urljoin(base, "basic-nested/")
986 (response, content) = self.http.request(uri, "GET")
987 self.assertEqual(response.status, 200)
988
989 uri = urllib.parse.urljoin(base, "basic-nested/subdir")
990 (response, content) = self.http.request(uri, "GET")
991 self.assertEqual(response.status, 401)
992
993 self.http.add_credentials('fred', 'barney')
994
995 uri = urllib.parse.urljoin(base, "basic-nested/")
996 (response, content) = self.http.request(uri, "GET")
997 self.assertEqual(response.status, 200)
998
999 uri = urllib.parse.urljoin(base, "basic-nested/subdir")
1000 (response, content) = self.http.request(uri, "GET")
1001 self.assertEqual(response.status, 200)
1002
1003 def testDigestAuth(self):
1004 # Test that we support Digest Authentication
1005 uri = urllib.parse.urljoin(base, "digest/")
1006 (response, content) = self.http.request(uri, "GET")
1007 self.assertEqual(response.status, 401)
1008
1009 self.http.add_credentials('joe', 'password')
1010 (response, content) = self.http.request(uri, "GET")
1011 self.assertEqual(response.status, 200)
1012
1013 uri = urllib.parse.urljoin(base, "digest/file.txt")
1014 (response, content) = self.http.request(uri, "GET")
1015
1016 def testDigestAuthNextNonceAndNC(self):
1017 # Test that if the server sets nextnonce that we reset
1018 # the nonce count back to 1
1019 uri = urllib.parse.urljoin(base, "digest/file.txt")
1020 self.http.add_credentials('joe', 'password')
1021 (response, content) = self.http.request(uri, "GET", headers = {"cache-control":"no-cache"})
1022 info = httplib2._parse_www_authenticate(response, 'authentication-info')
1023 self.assertEqual(response.status, 200)
1024 (response, content) = self.http.request(uri, "GET", headers = {"cache-control":"no-cache"})
1025 info2 = httplib2._parse_www_authenticate(response, 'authentication-info')
1026 self.assertEqual(response.status, 200)
1027
1028 if 'nextnonce' in info:
1029 self.assertEqual(info2['nc'], 1)
1030
1031 def testDigestAuthStale(self):
1032 # Test that we can handle a nonce becoming stale
1033 uri = urllib.parse.urljoin(base, "digest-expire/file.txt")
1034 self.http.add_credentials('joe', 'password')
1035 (response, content) = self.http.request(uri, "GET", headers = {"cache-control":"no-cache"})
1036 info = httplib2._parse_www_authenticate(response, 'authentication-info')
1037 self.assertEqual(response.status, 200)
1038
1039 time.sleep(3)
1040 # Sleep long enough that the nonce becomes stale
1041
1042 (response, content) = self.http.request(uri, "GET", headers = {"cache-control":"no-cache"})
1043 self.assertFalse(response.fromcache)
1044 self.assertTrue(response._stale_digest)
1045 info3 = httplib2._parse_www_authenticate(response, 'authentication-info')
1046 self.assertEqual(response.status, 200)
1047
1048 def reflector(self, content):
1049 return dict( [tuple(x.split("=", 1)) for x in content.decode('utf-8').strip().split("\n")] )
1050
1051 def testReflector(self):
1052 uri = urllib.parse.urljoin(base, "reflector/reflector.cgi")
1053 (response, content) = self.http.request(uri, "GET")
1054 d = self.reflector(content)
1055 self.assertTrue('HTTP_USER_AGENT' in d)
1056
Joe Gregorio84cc10a2009-09-01 13:02:49 -04001057
1058 def testConnectionClose(self):
1059 uri = "http://www.google.com/"
1060 (response, content) = self.http.request(uri, "GET")
1061 for c in self.http.connections.values():
1062 self.assertNotEqual(None, c.sock)
1063 (response, content) = self.http.request(uri, "GET", headers={"connection": "close"})
1064 for c in self.http.connections.values():
1065 self.assertEqual(None, c.sock)
1066
pilgrim00a352e2009-05-29 04:04:44 +00001067try:
1068 import memcache
1069 class HttpTestMemCached(HttpTest):
1070 def setUp(self):
1071 self.cache = memcache.Client(['127.0.0.1:11211'], debug=0)
1072 #self.cache = memcache.Client(['10.0.0.4:11211'], debug=1)
1073 self.http = httplib2.Http(self.cache)
1074 self.cache.flush_all()
1075 # Not exactly sure why the sleep is needed here, but
1076 # if not present then some unit tests that rely on caching
1077 # fail. Memcached seems to lose some sets immediately
1078 # after a flush_all if the set is to a value that
1079 # was previously cached. (Maybe the flush is handled async?)
1080 time.sleep(1)
1081 self.http.clear_credentials()
1082except:
1083 pass
1084
1085
1086
1087# ------------------------------------------------------------------------
1088
1089class HttpPrivateTest(unittest.TestCase):
1090
1091 def testParseCacheControl(self):
1092 # Test that we can parse the Cache-Control header
1093 self.assertEqual({}, httplib2._parse_cache_control({}))
1094 self.assertEqual({'no-cache': 1}, httplib2._parse_cache_control({'cache-control': ' no-cache'}))
1095 cc = httplib2._parse_cache_control({'cache-control': ' no-cache, max-age = 7200'})
1096 self.assertEqual(cc['no-cache'], 1)
1097 self.assertEqual(cc['max-age'], '7200')
1098 cc = httplib2._parse_cache_control({'cache-control': ' , '})
1099 self.assertEqual(cc[''], 1)
1100
Joe Gregorioe314e8b2009-07-16 20:11:28 -04001101 try:
1102 cc = httplib2._parse_cache_control({'cache-control': 'Max-age=3600;post-check=1800,pre-check=3600'})
1103 self.assertTrue("max-age" in cc)
1104 except:
1105 self.fail("Should not throw exception")
1106
1107
1108
1109
pilgrim00a352e2009-05-29 04:04:44 +00001110 def testNormalizeHeaders(self):
1111 # Test that we normalize headers to lowercase
1112 h = httplib2._normalize_headers({'Cache-Control': 'no-cache', 'Other': 'Stuff'})
1113 self.assertTrue('cache-control' in h)
1114 self.assertTrue('other' in h)
1115 self.assertEqual('Stuff', h['other'])
1116
1117 def testExpirationModelTransparent(self):
1118 # Test that no-cache makes our request TRANSPARENT
1119 response_headers = {
1120 'cache-control': 'max-age=7200'
1121 }
1122 request_headers = {
1123 'cache-control': 'no-cache'
1124 }
1125 self.assertEqual("TRANSPARENT", httplib2._entry_disposition(response_headers, request_headers))
1126
1127 def testMaxAgeNonNumeric(self):
1128 # Test that no-cache makes our request TRANSPARENT
1129 response_headers = {
1130 'cache-control': 'max-age=fred, min-fresh=barney'
1131 }
1132 request_headers = {
1133 }
1134 self.assertEqual("STALE", httplib2._entry_disposition(response_headers, request_headers))
1135
1136
1137 def testExpirationModelNoCacheResponse(self):
1138 # The date and expires point to an entry that should be
1139 # FRESH, but the no-cache over-rides that.
1140 now = time.time()
1141 response_headers = {
1142 'date': time.strftime("%a, %d %b %Y %H:%M:%S GMT", time.gmtime(now)),
1143 'expires': time.strftime("%a, %d %b %Y %H:%M:%S GMT", time.gmtime(now+4)),
1144 'cache-control': 'no-cache'
1145 }
1146 request_headers = {
1147 }
1148 self.assertEqual("STALE", httplib2._entry_disposition(response_headers, request_headers))
1149
1150 def testExpirationModelStaleRequestMustReval(self):
1151 # must-revalidate forces STALE
1152 self.assertEqual("STALE", httplib2._entry_disposition({}, {'cache-control': 'must-revalidate'}))
1153
1154 def testExpirationModelStaleResponseMustReval(self):
1155 # must-revalidate forces STALE
1156 self.assertEqual("STALE", httplib2._entry_disposition({'cache-control': 'must-revalidate'}, {}))
1157
1158 def testExpirationModelFresh(self):
1159 response_headers = {
1160 'date': time.strftime("%a, %d %b %Y %H:%M:%S GMT", time.gmtime()),
1161 'cache-control': 'max-age=2'
1162 }
1163 request_headers = {
1164 }
1165 self.assertEqual("FRESH", httplib2._entry_disposition(response_headers, request_headers))
1166 time.sleep(3)
1167 self.assertEqual("STALE", httplib2._entry_disposition(response_headers, request_headers))
1168
1169 def testExpirationMaxAge0(self):
1170 response_headers = {
1171 'date': time.strftime("%a, %d %b %Y %H:%M:%S GMT", time.gmtime()),
1172 'cache-control': 'max-age=0'
1173 }
1174 request_headers = {
1175 }
1176 self.assertEqual("STALE", httplib2._entry_disposition(response_headers, request_headers))
1177
1178 def testExpirationModelDateAndExpires(self):
1179 now = time.time()
1180 response_headers = {
1181 'date': time.strftime("%a, %d %b %Y %H:%M:%S GMT", time.gmtime(now)),
1182 'expires': time.strftime("%a, %d %b %Y %H:%M:%S GMT", time.gmtime(now+2)),
1183 }
1184 request_headers = {
1185 }
1186 self.assertEqual("FRESH", httplib2._entry_disposition(response_headers, request_headers))
1187 time.sleep(3)
1188 self.assertEqual("STALE", httplib2._entry_disposition(response_headers, request_headers))
1189
1190 def testExpiresZero(self):
1191 now = time.time()
1192 response_headers = {
1193 'date': time.strftime("%a, %d %b %Y %H:%M:%S GMT", time.gmtime(now)),
1194 'expires': "0",
1195 }
1196 request_headers = {
1197 }
1198 self.assertEqual("STALE", httplib2._entry_disposition(response_headers, request_headers))
1199
1200 def testExpirationModelDateOnly(self):
1201 now = time.time()
1202 response_headers = {
1203 'date': time.strftime("%a, %d %b %Y %H:%M:%S GMT", time.gmtime(now+3)),
1204 }
1205 request_headers = {
1206 }
1207 self.assertEqual("STALE", httplib2._entry_disposition(response_headers, request_headers))
1208
1209 def testExpirationModelOnlyIfCached(self):
1210 response_headers = {
1211 }
1212 request_headers = {
1213 'cache-control': 'only-if-cached',
1214 }
1215 self.assertEqual("FRESH", httplib2._entry_disposition(response_headers, request_headers))
1216
1217 def testExpirationModelMaxAgeBoth(self):
1218 now = time.time()
1219 response_headers = {
1220 'date': time.strftime("%a, %d %b %Y %H:%M:%S GMT", time.gmtime(now)),
1221 'cache-control': 'max-age=2'
1222 }
1223 request_headers = {
1224 'cache-control': 'max-age=0'
1225 }
1226 self.assertEqual("STALE", httplib2._entry_disposition(response_headers, request_headers))
1227
1228 def testExpirationModelDateAndExpiresMinFresh1(self):
1229 now = time.time()
1230 response_headers = {
1231 'date': time.strftime("%a, %d %b %Y %H:%M:%S GMT", time.gmtime(now)),
1232 'expires': time.strftime("%a, %d %b %Y %H:%M:%S GMT", time.gmtime(now+2)),
1233 }
1234 request_headers = {
1235 'cache-control': 'min-fresh=2'
1236 }
1237 self.assertEqual("STALE", httplib2._entry_disposition(response_headers, request_headers))
1238
1239 def testExpirationModelDateAndExpiresMinFresh2(self):
1240 now = time.time()
1241 response_headers = {
1242 'date': time.strftime("%a, %d %b %Y %H:%M:%S GMT", time.gmtime(now)),
1243 'expires': time.strftime("%a, %d %b %Y %H:%M:%S GMT", time.gmtime(now+4)),
1244 }
1245 request_headers = {
1246 'cache-control': 'min-fresh=2'
1247 }
1248 self.assertEqual("FRESH", httplib2._entry_disposition(response_headers, request_headers))
1249
1250 def testParseWWWAuthenticateEmpty(self):
1251 res = httplib2._parse_www_authenticate({})
1252 self.assertEqual(len(list(res.keys())), 0)
1253
1254 def testParseWWWAuthenticate(self):
1255 # different uses of spaces around commas
1256 res = httplib2._parse_www_authenticate({ 'www-authenticate': 'Test realm="test realm" , foo=foo ,bar="bar", baz=baz,qux=qux'})
1257 self.assertEqual(len(list(res.keys())), 1)
1258 self.assertEqual(len(list(res['test'].keys())), 5)
1259
1260 # tokens with non-alphanum
1261 res = httplib2._parse_www_authenticate({ 'www-authenticate': 'T*!%#st realm=to*!%#en, to*!%#en="quoted string"'})
1262 self.assertEqual(len(list(res.keys())), 1)
1263 self.assertEqual(len(list(res['t*!%#st'].keys())), 2)
1264
1265 # quoted string with quoted pairs
1266 res = httplib2._parse_www_authenticate({ 'www-authenticate': 'Test realm="a \\"test\\" realm"'})
1267 self.assertEqual(len(list(res.keys())), 1)
1268 self.assertEqual(res['test']['realm'], 'a "test" realm')
1269
1270 def testParseWWWAuthenticateStrict(self):
1271 httplib2.USE_WWW_AUTH_STRICT_PARSING = 1;
1272 self.testParseWWWAuthenticate();
1273 httplib2.USE_WWW_AUTH_STRICT_PARSING = 0;
1274
1275 def testParseWWWAuthenticateBasic(self):
1276 res = httplib2._parse_www_authenticate({ 'www-authenticate': 'Basic realm="me"'})
1277 basic = res['basic']
1278 self.assertEqual('me', basic['realm'])
1279
1280 res = httplib2._parse_www_authenticate({ 'www-authenticate': 'Basic realm="me", algorithm="MD5"'})
1281 basic = res['basic']
1282 self.assertEqual('me', basic['realm'])
1283 self.assertEqual('MD5', basic['algorithm'])
1284
1285 res = httplib2._parse_www_authenticate({ 'www-authenticate': 'Basic realm="me", algorithm=MD5'})
1286 basic = res['basic']
1287 self.assertEqual('me', basic['realm'])
1288 self.assertEqual('MD5', basic['algorithm'])
1289
1290 def testParseWWWAuthenticateBasic2(self):
1291 res = httplib2._parse_www_authenticate({ 'www-authenticate': 'Basic realm="me",other="fred" '})
1292 basic = res['basic']
1293 self.assertEqual('me', basic['realm'])
1294 self.assertEqual('fred', basic['other'])
1295
1296 def testParseWWWAuthenticateBasic3(self):
1297 res = httplib2._parse_www_authenticate({ 'www-authenticate': 'Basic REAlm="me" '})
1298 basic = res['basic']
1299 self.assertEqual('me', basic['realm'])
1300
1301
1302 def testParseWWWAuthenticateDigest(self):
1303 res = httplib2._parse_www_authenticate({ 'www-authenticate':
1304 'Digest realm="testrealm@host.com", qop="auth,auth-int", nonce="dcd98b7102dd2f0e8b11d0f600bfb0c093", opaque="5ccc069c403ebaf9f0171e9517f40e41"'})
1305 digest = res['digest']
1306 self.assertEqual('testrealm@host.com', digest['realm'])
1307 self.assertEqual('auth,auth-int', digest['qop'])
1308
1309
1310 def testParseWWWAuthenticateMultiple(self):
1311 res = httplib2._parse_www_authenticate({ 'www-authenticate':
1312 'Digest realm="testrealm@host.com", qop="auth,auth-int", nonce="dcd98b7102dd2f0e8b11d0f600bfb0c093", opaque="5ccc069c403ebaf9f0171e9517f40e41" Basic REAlm="me" '})
1313 digest = res['digest']
1314 self.assertEqual('testrealm@host.com', digest['realm'])
1315 self.assertEqual('auth,auth-int', digest['qop'])
1316 self.assertEqual('dcd98b7102dd2f0e8b11d0f600bfb0c093', digest['nonce'])
1317 self.assertEqual('5ccc069c403ebaf9f0171e9517f40e41', digest['opaque'])
1318 basic = res['basic']
1319 self.assertEqual('me', basic['realm'])
1320
1321 def testParseWWWAuthenticateMultiple2(self):
1322 # Handle an added comma between challenges, which might get thrown in if the challenges were
1323 # originally sent in separate www-authenticate headers.
1324 res = httplib2._parse_www_authenticate({ 'www-authenticate':
1325 'Digest realm="testrealm@host.com", qop="auth,auth-int", nonce="dcd98b7102dd2f0e8b11d0f600bfb0c093", opaque="5ccc069c403ebaf9f0171e9517f40e41", Basic REAlm="me" '})
1326 digest = res['digest']
1327 self.assertEqual('testrealm@host.com', digest['realm'])
1328 self.assertEqual('auth,auth-int', digest['qop'])
1329 self.assertEqual('dcd98b7102dd2f0e8b11d0f600bfb0c093', digest['nonce'])
1330 self.assertEqual('5ccc069c403ebaf9f0171e9517f40e41', digest['opaque'])
1331 basic = res['basic']
1332 self.assertEqual('me', basic['realm'])
1333
1334 def testParseWWWAuthenticateMultiple3(self):
1335 # Handle an added comma between challenges, which might get thrown in if the challenges were
1336 # originally sent in separate www-authenticate headers.
1337 res = httplib2._parse_www_authenticate({ 'www-authenticate':
1338 'Digest realm="testrealm@host.com", qop="auth,auth-int", nonce="dcd98b7102dd2f0e8b11d0f600bfb0c093", opaque="5ccc069c403ebaf9f0171e9517f40e41", Basic REAlm="me", WSSE realm="foo", profile="UsernameToken"'})
1339 digest = res['digest']
1340 self.assertEqual('testrealm@host.com', digest['realm'])
1341 self.assertEqual('auth,auth-int', digest['qop'])
1342 self.assertEqual('dcd98b7102dd2f0e8b11d0f600bfb0c093', digest['nonce'])
1343 self.assertEqual('5ccc069c403ebaf9f0171e9517f40e41', digest['opaque'])
1344 basic = res['basic']
1345 self.assertEqual('me', basic['realm'])
1346 wsse = res['wsse']
1347 self.assertEqual('foo', wsse['realm'])
1348 self.assertEqual('UsernameToken', wsse['profile'])
1349
1350 def testParseWWWAuthenticateMultiple4(self):
1351 res = httplib2._parse_www_authenticate({ 'www-authenticate':
1352 'Digest realm="test-real.m@host.com", qop \t=\t"\tauth,auth-int", nonce="(*)&^&$%#",opaque="5ccc069c403ebaf9f0171e9517f40e41", Basic REAlm="me", WSSE realm="foo", profile="UsernameToken"'})
1353 digest = res['digest']
1354 self.assertEqual('test-real.m@host.com', digest['realm'])
1355 self.assertEqual('\tauth,auth-int', digest['qop'])
1356 self.assertEqual('(*)&^&$%#', digest['nonce'])
1357
1358 def testParseWWWAuthenticateMoreQuoteCombos(self):
1359 res = httplib2._parse_www_authenticate({'www-authenticate':'Digest realm="myrealm", nonce="Ygk86AsKBAA=3516200d37f9a3230352fde99977bd6d472d4306", algorithm=MD5, qop="auth", stale=true'})
1360 digest = res['digest']
1361 self.assertEqual('myrealm', digest['realm'])
1362
1363 def testDigestObject(self):
1364 credentials = ('joe', 'password')
1365 host = None
1366 request_uri = '/projects/httplib2/test/digest/'
1367 headers = {}
1368 response = {
1369 'www-authenticate': 'Digest realm="myrealm", nonce="Ygk86AsKBAA=3516200d37f9a3230352fde99977bd6d472d4306", algorithm=MD5, qop="auth"'
1370 }
1371 content = b""
1372
1373 d = httplib2.DigestAuthentication(credentials, host, request_uri, headers, response, content, None)
1374 d.request("GET", request_uri, headers, content, cnonce="33033375ec278a46")
1375 our_request = "Authorization: %s" % headers['Authorization']
1376 working_request = 'Authorization: Digest username="joe", realm="myrealm", nonce="Ygk86AsKBAA=3516200d37f9a3230352fde99977bd6d472d4306", uri="/projects/httplib2/test/digest/", algorithm=MD5, response="97ed129401f7cdc60e5db58a80f3ea8b", qop=auth, nc=00000001, cnonce="33033375ec278a46"'
1377 self.assertEqual(our_request, working_request)
1378
1379
1380 def testDigestObjectStale(self):
1381 credentials = ('joe', 'password')
1382 host = None
1383 request_uri = '/projects/httplib2/test/digest/'
1384 headers = {}
1385 response = httplib2.Response({ })
1386 response['www-authenticate'] = 'Digest realm="myrealm", nonce="Ygk86AsKBAA=3516200d37f9a3230352fde99977bd6d472d4306", algorithm=MD5, qop="auth", stale=true'
1387 response.status = 401
1388 content = b""
1389 d = httplib2.DigestAuthentication(credentials, host, request_uri, headers, response, content, None)
1390 # Returns true to force a retry
1391 self.assertTrue( d.response(response, content) )
1392
1393 def testDigestObjectAuthInfo(self):
1394 credentials = ('joe', 'password')
1395 host = None
1396 request_uri = '/projects/httplib2/test/digest/'
1397 headers = {}
1398 response = httplib2.Response({ })
1399 response['www-authenticate'] = 'Digest realm="myrealm", nonce="Ygk86AsKBAA=3516200d37f9a3230352fde99977bd6d472d4306", algorithm=MD5, qop="auth", stale=true'
1400 response['authentication-info'] = 'nextnonce="fred"'
1401 content = b""
1402 d = httplib2.DigestAuthentication(credentials, host, request_uri, headers, response, content, None)
1403 # Returns true to force a retry
1404 self.assertFalse( d.response(response, content) )
1405 self.assertEqual('fred', d.challenge['nonce'])
1406 self.assertEqual(1, d.challenge['nc'])
1407
1408 def testWsseAlgorithm(self):
1409 digest = httplib2._wsse_username_token("d36e316282959a9ed4c89851497a717f", "2003-12-15T14:43:07Z", "taadtaadpstcsm")
1410 expected = b"quR/EWLAV4xLf9Zqyw4pDmfV9OY="
1411 self.assertEqual(expected, digest)
1412
1413 def testEnd2End(self):
1414 # one end to end header
1415 response = {'content-type': 'application/atom+xml', 'te': 'deflate'}
1416 end2end = httplib2._get_end2end_headers(response)
1417 self.assertTrue('content-type' in end2end)
1418 self.assertTrue('te' not in end2end)
1419 self.assertTrue('connection' not in end2end)
1420
1421 # one end to end header that gets eliminated
1422 response = {'connection': 'content-type', 'content-type': 'application/atom+xml', 'te': 'deflate'}
1423 end2end = httplib2._get_end2end_headers(response)
1424 self.assertTrue('content-type' not in end2end)
1425 self.assertTrue('te' not in end2end)
1426 self.assertTrue('connection' not in end2end)
1427
1428 # Degenerate case of no headers
1429 response = {}
1430 end2end = httplib2._get_end2end_headers(response)
1431 self.assertEquals(0, len(end2end))
1432
1433 # Degenerate case of connection referrring to a header not passed in
1434 response = {'connection': 'content-type'}
1435 end2end = httplib2._get_end2end_headers(response)
1436 self.assertEquals(0, len(end2end))
1437
1438unittest.main()