blob: de473ac4705e432bd98788b827ce185736c7995c [file] [log] [blame]
pilgrim00a352e2009-05-29 04:04:44 +00001#!/usr/bin/env python3
2"""
3httplib2test
4
5A set of unit tests for httplib2.py.
6
7Requires Python 3.0 or later
8"""
9
10__author__ = "Joe Gregorio (joe@bitworking.org)"
11__copyright__ = "Copyright 2006, Joe Gregorio"
12__contributors__ = ["Mark Pilgrim"]
13__license__ = "MIT"
14__history__ = """ """
15__version__ = "0.2 ($Rev: 118 $)"
16
Joe Gregoriod760a1a2011-02-11 01:03:22 -050017import base64
pilgrim00a352e2009-05-29 04:04:44 +000018import http.client
19import httplib2
pilgrim00a352e2009-05-29 04:04:44 +000020import io
Joe Gregoriod760a1a2011-02-11 01:03:22 -050021import os
22import socket
23import sys
24import time
25import unittest
26import urllib.parse
pilgrim00a352e2009-05-29 04:04:44 +000027
# Base URI under which the remote test resources are published.
base = "http://bitworking.org/projects/httplib2/test/"
# Alternative base for running against a local copy of the resources:
# base = "http://localhost/projects/httplib2/test/"
# Directory name handed to httplib2.Http as its cache location.
cacheDirName = ".cache"
32
33
class CredentialsTest(unittest.TestCase):
    """Exercises httplib2.Credentials: add, per-domain iteration and clear."""

    def test(self):
        joe = ("joe", "password")
        fred = ("fred", "password2")
        creds = httplib2.Credentials()

        # A credential added without a domain is expected for any domain,
        # including the empty one.
        creds.add("joe", "password")
        for domain in ("bitworking.org", ""):
            self.assertEqual(joe, list(creds.iter(domain))[0])

        # A credential added for one domain must not show up for another.
        creds.add("fred", "password2", "wellformedweb.org")
        for_bitworking = list(creds.iter("bitworking.org"))
        self.assertEqual(joe, for_bitworking[0])
        self.assertEqual(1, len(for_bitworking))
        for_wellformed = list(creds.iter("wellformedweb.org"))
        self.assertEqual(2, len(for_wellformed))
        self.assertTrue(fred in for_wellformed)

        # clear() drops everything that was added so far.
        creds.clear()
        self.assertEqual(0, len(list(creds.iter("bitworking.org"))))

        # After the clear only the domain-specific credential exists.
        creds.add("fred", "password2", "wellformedweb.org")
        self.assertTrue(fred in list(creds.iter("wellformedweb.org")))
        for domain in ("bitworking.org", ""):
            self.assertEqual(0, len(list(creds.iter(domain))))
51
52
class ParserTest(unittest.TestCase):
    """Checks httplib2.parse_uri against a table of URIs and expected tuples."""

    def testFromStd66(self):
        # Each entry pairs a URI with the 5-tuple parse_uri must return for it.
        # Judging by the values the positions are scheme, authority, path,
        # query and fragment; parts missing from the URI are expected as None
        # (or '' for an absent path).
        cases = [
            ("http://example.com",
             ('http', 'example.com', '', None, None)),
            ("https://example.com",
             ('https', 'example.com', '', None, None)),
            ("https://example.com:8080",
             ('https', 'example.com:8080', '', None, None)),
            ("http://example.com/",
             ('http', 'example.com', '/', None, None)),
            ("http://example.com/path",
             ('http', 'example.com', '/path', None, None)),
            ("http://example.com/path?a=1&b=2",
             ('http', 'example.com', '/path', 'a=1&b=2', None)),
            ("http://example.com/path?a=1&b=2#fred",
             ('http', 'example.com', '/path', 'a=1&b=2', 'fred')),
        ]
        for uri, expected in cases:
            self.assertEqual(expected, httplib2.parse_uri(uri))
63
64
class UrlNormTest(unittest.TestCase):
    """Checks httplib2.urlnorm for absolute URIs and its rejection of relative ones."""

    def test(self):
        # The last element of urlnorm's result is compared with the expected
        # normalized URI for each input.
        expectations = (
            ("http://example.org", "http://example.org/"),
            ("http://EXAMple.org", "http://example.org/"),
            ("http://EXAMple.org?=b", "http://example.org/?=b"),
            ("http://EXAMple.org/mypath?a=b", "http://example.org/mypath?a=b"),
            ("http://localhost:80", "http://localhost:80/"),
        )
        for raw, normalized in expectations:
            self.assertEqual(normalized, httplib2.urlnorm(raw)[-1])

        # Scheme and host case must not affect any part of the result.
        lower = httplib2.urlnorm("http://localhost:80/")
        upper = httplib2.urlnorm("HTTP://LOCALHOST:80")
        self.assertEqual(lower, upper)

        # A relative URI has to be rejected.
        try:
            httplib2.urlnorm("/")
        except httplib2.RelativeURIError:
            pass
        else:
            self.fail("Non-absolute URIs should raise an exception")
78
class UrlSafenameTest(unittest.TestCase):
    """Checks the cache file names produced by httplib2.safename."""

    def test(self):
        # Test that different URIs end up generating different safe names
        self.assertEqual(
            "example.org,fred,a=b,58489f63a7a83c3b7794a6a398ee8b1f",
            httplib2.safename("http://example.org/fred/?a=b"))
        self.assertEqual(
            "example.org,fred,a=b,8c5946d56fec453071f43329ff0be46b",
            httplib2.safename("http://example.org/fred?/a=b"))
        self.assertEqual(
            "www.example.org,fred,a=b,499c44b8d844a011b67ea2c015116968",
            httplib2.safename("http://www.example.org/fred?/a=b"))
        # After normalization the host case must not matter.
        self.assertEqual(
            httplib2.safename(httplib2.urlnorm("http://www")[-1]),
            httplib2.safename(httplib2.urlnorm("http://WWW")[-1]))
        # The scheme must influence the generated name.
        self.assertEqual(
            "www.example.org,fred,a=b,692e843a333484ce0095b070497ab45d",
            httplib2.safename("https://www.example.org/fred?/a=b"))
        self.assertNotEqual(
            httplib2.safename("http://www"),
            httplib2.safename("https://www"))

        # Test the max length limits
        uri = "http://" + ("w" * 200) + ".org"
        uri2 = "http://" + ("w" * 201) + ".org"
        self.assertNotEqual(httplib2.safename(uri2), httplib2.safename(uri))
        # Max length should be 200 + 1 (",") + 32
        self.assertEqual(233, len(httplib2.safename(uri2)))
        self.assertEqual(233, len(httplib2.safename(uri)))

        # Unicode. This module requires Python 3, so the check runs
        # unconditionally instead of behind a version guard.
        self.assertEqual(
            "xn--http,-4y1d.org,fred,a=b,579924c35db315e5a32e3d9963388193",
            httplib2.safename("http://\u2304.org/fred/?a=b"))
98
99class _MyResponse(io.BytesIO):
100 def __init__(self, body, **kwargs):
101 io.BytesIO.__init__(self, body)
102 self.headers = kwargs
103
104 def items(self):
105 return self.headers.items()
106
107 def iteritems(self):
108 return iter(self.headers.items())
109
110
111class _MyHTTPConnection(object):
112 "This class is just a mock of httplib.HTTPConnection used for testing"
113
114 def __init__(self, host, port=None, key_file=None, cert_file=None,
115 strict=None, timeout=None, proxy_info=None):
116 self.host = host
117 self.port = port
118 self.timeout = timeout
119 self.log = ""
120
121 def set_debuglevel(self, level):
122 pass
123
124 def connect(self):
125 "Connect to a host on a given port."
126 pass
127
128 def close(self):
129 pass
130
131 def request(self, method, request_uri, body, headers):
132 pass
133
134 def getresponse(self):
135 return _MyResponse(b"the body", status="200")
136
137
138class HttpTest(unittest.TestCase):
139 def setUp(self):
140 if os.path.exists(cacheDirName):
141 [os.remove(os.path.join(cacheDirName, file)) for file in os.listdir(cacheDirName)]
142 self.http = httplib2.Http(cacheDirName)
143 self.http.clear_credentials()
144
145 def testConnectionType(self):
146 self.http.force_exception_to_status_code = False
147 response, content = self.http.request("http://bitworking.org", connection_type=_MyHTTPConnection)
148 self.assertEqual(response['content-location'], "http://bitworking.org")
149 self.assertEqual(content, b"the body")
150
151 def testGetUnknownServer(self):
152 self.http.force_exception_to_status_code = False
153 try:
154 self.http.request("http://fred.bitworking.org/")
155 self.fail("An httplib2.ServerNotFoundError Exception must be thrown on an unresolvable server.")
156 except httplib2.ServerNotFoundError:
157 pass
158
159 # Now test with exceptions turned off
160 self.http.force_exception_to_status_code = True
161
162 (response, content) = self.http.request("http://fred.bitworking.org/")
163 self.assertEqual(response['content-type'], 'text/plain')
164 self.assertTrue(content.startswith(b"Unable to find"))
165 self.assertEqual(response.status, 400)
166
Joe Gregoriod760a1a2011-02-11 01:03:22 -0500167 def testGetConnectionRefused(self):
168 self.http.force_exception_to_status_code = False
169 try:
170 self.http.request("http://localhost:7777/")
171 self.fail("An socket.error exception must be thrown on Connection Refused.")
172 except socket.error:
173 pass
174
175 # Now test with exceptions turned off
176 self.http.force_exception_to_status_code = True
177
178 (response, content) = self.http.request("http://localhost:7777/")
179 self.assertEqual(response['content-type'], 'text/plain')
180 self.assertTrue(b"Connection refused" in content)
181 self.assertEqual(response.status, 400)
182
pilgrim00a352e2009-05-29 04:04:44 +0000183 def testGetIRI(self):
184 if sys.version_info >= (2,3):
185 uri = urllib.parse.urljoin(base, "reflector/reflector.cgi?d=\N{CYRILLIC CAPITAL LETTER DJE}")
186 (response, content) = self.http.request(uri, "GET")
187 d = self.reflector(content)
188 self.assertTrue('QUERY_STRING' in d)
189 self.assertTrue(d['QUERY_STRING'].find('%D0%82') > 0)
190
191 def testGetIsDefaultMethod(self):
192 # Test that GET is the default method
193 uri = urllib.parse.urljoin(base, "methods/method_reflector.cgi")
194 (response, content) = self.http.request(uri)
195 self.assertEqual(response['x-method'], "GET")
196
197 def testDifferentMethods(self):
198 # Test that all methods can be used
199 uri = urllib.parse.urljoin(base, "methods/method_reflector.cgi")
200 for method in ["GET", "PUT", "DELETE", "POST"]:
201 (response, content) = self.http.request(uri, method, body=b" ")
202 self.assertEqual(response['x-method'], method)
203
Joe Gregoriob628c0b2009-07-16 12:28:04 -0400204 def testHeadRead(self):
205 # Test that we don't try to read the response of a HEAD request
206 # since httplib blocks response.read() for HEAD requests.
207 # Oddly enough this doesn't appear as a problem when doing HEAD requests
208 # against Apache servers.
209 uri = "http://www.google.com/"
210 (response, content) = self.http.request(uri, "HEAD")
211 self.assertEqual(response.status, 200)
212 self.assertEqual(content, b"")
213
pilgrim00a352e2009-05-29 04:04:44 +0000214 def testGetNoCache(self):
215 # Test that can do a GET w/o the cache turned on.
216 http = httplib2.Http()
217 uri = urllib.parse.urljoin(base, "304/test_etag.txt")
218 (response, content) = http.request(uri, "GET")
219 self.assertEqual(response.status, 200)
220 self.assertEqual(response.previous, None)
221
Joe Gregorioe202d212009-07-16 14:57:52 -0400222 def testGetOnlyIfCachedCacheHit(self):
223 # Test that can do a GET with cache and 'only-if-cached'
224 uri = urllib.parse.urljoin(base, "304/test_etag.txt")
225 (response, content) = self.http.request(uri, "GET")
226 (response, content) = self.http.request(uri, "GET", headers={'cache-control': 'only-if-cached'})
227 self.assertEqual(response.fromcache, True)
228 self.assertEqual(response.status, 200)
229
pilgrim00a352e2009-05-29 04:04:44 +0000230 def testGetOnlyIfCachedCacheMiss(self):
231 # Test that can do a GET with no cache with 'only-if-cached'
pilgrim00a352e2009-05-29 04:04:44 +0000232 uri = urllib.parse.urljoin(base, "304/test_etag.txt")
Joe Gregorioe202d212009-07-16 14:57:52 -0400233 (response, content) = self.http.request(uri, "GET", headers={'cache-control': 'only-if-cached'})
pilgrim00a352e2009-05-29 04:04:44 +0000234 self.assertEqual(response.fromcache, False)
Joe Gregorioe202d212009-07-16 14:57:52 -0400235 self.assertEqual(response.status, 504)
pilgrim00a352e2009-05-29 04:04:44 +0000236
237 def testGetOnlyIfCachedNoCacheAtAll(self):
238 # Test that can do a GET with no cache with 'only-if-cached'
239 # Of course, there might be an intermediary beyond us
240 # that responds to the 'only-if-cached', so this
241 # test can't really be guaranteed to pass.
242 http = httplib2.Http()
243 uri = urllib.parse.urljoin(base, "304/test_etag.txt")
244 (response, content) = http.request(uri, "GET", headers={'cache-control': 'only-if-cached'})
245 self.assertEqual(response.fromcache, False)
Joe Gregorioe202d212009-07-16 14:57:52 -0400246 self.assertEqual(response.status, 504)
pilgrim00a352e2009-05-29 04:04:44 +0000247
248 def testUserAgent(self):
249 # Test that we provide a default user-agent
250 uri = urllib.parse.urljoin(base, "user-agent/test.cgi")
251 (response, content) = self.http.request(uri, "GET")
252 self.assertEqual(response.status, 200)
253 self.assertTrue(content.startswith(b"Python-httplib2/"))
254
255 def testUserAgentNonDefault(self):
256 # Test that the default user-agent can be over-ridden
257
258 uri = urllib.parse.urljoin(base, "user-agent/test.cgi")
259 (response, content) = self.http.request(uri, "GET", headers={'User-Agent': 'fred/1.0'})
260 self.assertEqual(response.status, 200)
261 self.assertTrue(content.startswith(b"fred/1.0"))
262
263 def testGet300WithLocation(self):
264 # Test the we automatically follow 300 redirects if a Location: header is provided
265 uri = urllib.parse.urljoin(base, "300/with-location-header.asis")
266 (response, content) = self.http.request(uri, "GET")
267 self.assertEqual(response.status, 200)
268 self.assertEqual(content, b"This is the final destination.\n")
269 self.assertEqual(response.previous.status, 300)
270 self.assertEqual(response.previous.fromcache, False)
271
272 # Confirm that the intermediate 300 is not cached
273 (response, content) = self.http.request(uri, "GET")
274 self.assertEqual(response.status, 200)
275 self.assertEqual(content, b"This is the final destination.\n")
276 self.assertEqual(response.previous.status, 300)
277 self.assertEqual(response.previous.fromcache, False)
278
279 def testGet300WithLocationNoRedirect(self):
280 # Test the we automatically follow 300 redirects if a Location: header is provided
281 self.http.follow_redirects = False
282 uri = urllib.parse.urljoin(base, "300/with-location-header.asis")
283 (response, content) = self.http.request(uri, "GET")
284 self.assertEqual(response.status, 300)
285
286 def testGet300WithoutLocation(self):
287 # Not giving a Location: header in a 300 response is acceptable
288 # In which case we just return the 300 response
289 uri = urllib.parse.urljoin(base, "300/without-location-header.asis")
290 (response, content) = self.http.request(uri, "GET")
291 self.assertEqual(response.status, 300)
292 self.assertTrue(response['content-type'].startswith("text/html"))
293 self.assertEqual(response.previous, None)
294
295 def testGet301(self):
296 # Test that we automatically follow 301 redirects
297 # and that we cache the 301 response
298 uri = urllib.parse.urljoin(base, "301/onestep.asis")
299 destination = urllib.parse.urljoin(base, "302/final-destination.txt")
300 (response, content) = self.http.request(uri, "GET")
301 self.assertEqual(response.status, 200)
302 self.assertTrue('content-location' in response)
303 self.assertEqual(response['content-location'], destination)
304 self.assertEqual(content, b"This is the final destination.\n")
305 self.assertEqual(response.previous.status, 301)
306 self.assertEqual(response.previous.fromcache, False)
307
308 (response, content) = self.http.request(uri, "GET")
309 self.assertEqual(response.status, 200)
310 self.assertEqual(response['content-location'], destination)
311 self.assertEqual(content, b"This is the final destination.\n")
312 self.assertEqual(response.previous.status, 301)
313 self.assertEqual(response.previous.fromcache, True)
314
315
316 def testGet301NoRedirect(self):
317 # Test that we automatically follow 301 redirects
318 # and that we cache the 301 response
319 self.http.follow_redirects = False
320 uri = urllib.parse.urljoin(base, "301/onestep.asis")
321 destination = urllib.parse.urljoin(base, "302/final-destination.txt")
322 (response, content) = self.http.request(uri, "GET")
323 self.assertEqual(response.status, 301)
324
325
326 def testGet302(self):
327 # Test that we automatically follow 302 redirects
328 # and that we DO NOT cache the 302 response
329 uri = urllib.parse.urljoin(base, "302/onestep.asis")
330 destination = urllib.parse.urljoin(base, "302/final-destination.txt")
331 (response, content) = self.http.request(uri, "GET")
332 self.assertEqual(response.status, 200)
333 self.assertEqual(response['content-location'], destination)
334 self.assertEqual(content, b"This is the final destination.\n")
335 self.assertEqual(response.previous.status, 302)
336 self.assertEqual(response.previous.fromcache, False)
337
338 uri = urllib.parse.urljoin(base, "302/onestep.asis")
339 (response, content) = self.http.request(uri, "GET")
340 self.assertEqual(response.status, 200)
341 self.assertEqual(response.fromcache, True)
342 self.assertEqual(response['content-location'], destination)
343 self.assertEqual(content, b"This is the final destination.\n")
344 self.assertEqual(response.previous.status, 302)
345 self.assertEqual(response.previous.fromcache, False)
346 self.assertEqual(response.previous['content-location'], uri)
347
348 uri = urllib.parse.urljoin(base, "302/twostep.asis")
349
350 (response, content) = self.http.request(uri, "GET")
351 self.assertEqual(response.status, 200)
352 self.assertEqual(response.fromcache, True)
353 self.assertEqual(content, b"This is the final destination.\n")
354 self.assertEqual(response.previous.status, 302)
355 self.assertEqual(response.previous.fromcache, False)
356
357 def testGet302RedirectionLimit(self):
358 # Test that we can set a lower redirection limit
359 # and that we raise an exception when we exceed
360 # that limit.
361 self.http.force_exception_to_status_code = False
362
363 uri = urllib.parse.urljoin(base, "302/twostep.asis")
364 try:
365 (response, content) = self.http.request(uri, "GET", redirections = 1)
366 self.fail("This should not happen")
367 except httplib2.RedirectLimit:
368 pass
369 except Exception as e:
370 self.fail("Threw wrong kind of exception ")
371
372 # Re-run the test with out the exceptions
373 self.http.force_exception_to_status_code = True
374
375 (response, content) = self.http.request(uri, "GET", redirections = 1)
376 self.assertEqual(response.status, 500)
377 self.assertTrue(response.reason.startswith("Redirected more"))
378 self.assertEqual("302", response['status'])
379 self.assertTrue(content.startswith(b"<html>"))
380 self.assertTrue(response.previous != None)
381
382 def testGet302NoLocation(self):
383 # Test that we throw an exception when we get
384 # a 302 with no Location: header.
385 self.http.force_exception_to_status_code = False
386 uri = urllib.parse.urljoin(base, "302/no-location.asis")
387 try:
388 (response, content) = self.http.request(uri, "GET")
389 self.fail("Should never reach here")
390 except httplib2.RedirectMissingLocation:
391 pass
392 except Exception as e:
393 self.fail("Threw wrong kind of exception ")
394
395 # Re-run the test with out the exceptions
396 self.http.force_exception_to_status_code = True
397
398 (response, content) = self.http.request(uri, "GET")
399 self.assertEqual(response.status, 500)
400 self.assertTrue(response.reason.startswith("Redirected but"))
401 self.assertEqual("302", response['status'])
402 self.assertTrue(content.startswith(b"This is content"))
403
404 def testGet302ViaHttps(self):
405 # Google always redirects to http://google.com
406 (response, content) = self.http.request("https://google.com", "GET")
407 self.assertEqual(200, response.status)
408 self.assertEqual(302, response.previous.status)
409
410 def testGetViaHttps(self):
411 # Test that we can handle HTTPS
412 (response, content) = self.http.request("https://google.com/adsense/", "GET")
413 self.assertEqual(200, response.status)
414
415 def testGetViaHttpsSpecViolationOnLocation(self):
416 # Test that we follow redirects through HTTPS
417 # even if they violate the spec by including
418 # a relative Location: header instead of an
419 # absolute one.
420 (response, content) = self.http.request("https://google.com/adsense", "GET")
421 self.assertEqual(200, response.status)
422 self.assertNotEqual(None, response.previous)
423
424
425 def testGetViaHttpsKeyCert(self):
426 # At this point I can only test
427 # that the key and cert files are passed in
428 # correctly to httplib. It would be nice to have
429 # a real https endpoint to test against.
430 http = httplib2.Http(timeout=2)
431
432 http.add_certificate("akeyfile", "acertfile", "bitworking.org")
433 try:
434 (response, content) = http.request("https://bitworking.org", "GET")
435 except:
436 pass
437 self.assertEqual(http.connections["https:bitworking.org"].key_file, "akeyfile")
438 self.assertEqual(http.connections["https:bitworking.org"].cert_file, "acertfile")
439
440 try:
441 (response, content) = http.request("https://notthere.bitworking.org", "GET")
442 except:
443 pass
444 self.assertEqual(http.connections["https:notthere.bitworking.org"].key_file, None)
445 self.assertEqual(http.connections["https:notthere.bitworking.org"].cert_file, None)
446
447
448
449
450 def testGet303(self):
451 # Do a follow-up GET on a Location: header
452 # returned from a POST that gave a 303.
453 uri = urllib.parse.urljoin(base, "303/303.cgi")
454 (response, content) = self.http.request(uri, "POST", " ")
455 self.assertEqual(response.status, 200)
456 self.assertEqual(content, b"This is the final destination.\n")
457 self.assertEqual(response.previous.status, 303)
458
459 def testGet303NoRedirect(self):
460 # Do a follow-up GET on a Location: header
461 # returned from a POST that gave a 303.
462 self.http.follow_redirects = False
463 uri = urllib.parse.urljoin(base, "303/303.cgi")
464 (response, content) = self.http.request(uri, "POST", " ")
465 self.assertEqual(response.status, 303)
466
467 def test303ForDifferentMethods(self):
468 # Test that all methods can be used
469 uri = urllib.parse.urljoin(base, "303/redirect-to-reflector.cgi")
470 for (method, method_on_303) in [("PUT", "GET"), ("DELETE", "GET"), ("POST", "GET"), ("GET", "GET"), ("HEAD", "GET")]:
471 (response, content) = self.http.request(uri, method, body=b" ")
472 self.assertEqual(response['x-method'], method_on_303)
473
474 def testGet304(self):
475 # Test that we use ETags properly to validate our cache
476 uri = urllib.parse.urljoin(base, "304/test_etag.txt")
477 (response, content) = self.http.request(uri, "GET")
478 self.assertNotEqual(response['etag'], "")
479
480 (response, content) = self.http.request(uri, "GET")
481 (response, content) = self.http.request(uri, "GET", headers = {'cache-control': 'must-revalidate'})
482 self.assertEqual(response.status, 200)
483 self.assertEqual(response.fromcache, True)
484
485 cache_file_name = os.path.join(cacheDirName, httplib2.safename(httplib2.urlnorm(uri)[-1]))
486 f = open(cache_file_name, "r")
487 status_line = f.readline()
488 f.close()
489
490 self.assertTrue(status_line.startswith("status:"))
491
492 (response, content) = self.http.request(uri, "HEAD")
493 self.assertEqual(response.status, 200)
494 self.assertEqual(response.fromcache, True)
495
496 (response, content) = self.http.request(uri, "GET", headers = {'range': 'bytes=0-0'})
497 self.assertEqual(response.status, 206)
498 self.assertEqual(response.fromcache, False)
499
500 def testGetIgnoreEtag(self):
501 # Test that we can forcibly ignore ETags
502 uri = urllib.parse.urljoin(base, "reflector/reflector.cgi")
503 (response, content) = self.http.request(uri, "GET")
504 self.assertNotEqual(response['etag'], "")
505
506 (response, content) = self.http.request(uri, "GET", headers = {'cache-control': 'max-age=0'})
507 d = self.reflector(content)
508 self.assertTrue('HTTP_IF_NONE_MATCH' in d)
509
510 self.http.ignore_etag = True
511 (response, content) = self.http.request(uri, "GET", headers = {'cache-control': 'max-age=0'})
512 d = self.reflector(content)
513 self.assertEqual(response.fromcache, False)
514 self.assertFalse('HTTP_IF_NONE_MATCH' in d)
515
516 def testOverrideEtag(self):
517 # Test that we can forcibly ignore ETags
518 uri = urllib.parse.urljoin(base, "reflector/reflector.cgi")
519 (response, content) = self.http.request(uri, "GET")
520 self.assertNotEqual(response['etag'], "")
521
522 (response, content) = self.http.request(uri, "GET", headers = {'cache-control': 'max-age=0'})
523 d = self.reflector(content)
524 self.assertTrue('HTTP_IF_NONE_MATCH' in d)
525 self.assertNotEqual(d['HTTP_IF_NONE_MATCH'], "fred")
526
527 (response, content) = self.http.request(uri, "GET", headers = {'cache-control': 'max-age=0', 'if-none-match': 'fred'})
528 d = self.reflector(content)
529 self.assertTrue('HTTP_IF_NONE_MATCH' in d)
530 self.assertEqual(d['HTTP_IF_NONE_MATCH'], "fred")
531
532#MAP-commented this out because it consistently fails
533# def testGet304EndToEnd(self):
534# # Test that end to end headers get overwritten in the cache
535# uri = urllib.parse.urljoin(base, "304/end2end.cgi")
536# (response, content) = self.http.request(uri, "GET")
537# self.assertNotEqual(response['etag'], "")
538# old_date = response['date']
539# time.sleep(2)
540#
541# (response, content) = self.http.request(uri, "GET", headers = {'Cache-Control': 'max-age=0'})
542# # The response should be from the cache, but the Date: header should be updated.
543# new_date = response['date']
544# self.assertNotEqual(new_date, old_date)
545# self.assertEqual(response.status, 200)
546# self.assertEqual(response.fromcache, True)
547
548 def testGet304LastModified(self):
549 # Test that we can still handle a 304
550 # by only using the last-modified cache validator.
551 uri = urllib.parse.urljoin(base, "304/last-modified-only/last-modified-only.txt")
552 (response, content) = self.http.request(uri, "GET")
553
554 self.assertNotEqual(response['last-modified'], "")
555 (response, content) = self.http.request(uri, "GET")
556 (response, content) = self.http.request(uri, "GET")
557 self.assertEqual(response.status, 200)
558 self.assertEqual(response.fromcache, True)
559
560 def testGet307(self):
561 # Test that we do follow 307 redirects but
562 # do not cache the 307
563 uri = urllib.parse.urljoin(base, "307/onestep.asis")
564 (response, content) = self.http.request(uri, "GET")
565 self.assertEqual(response.status, 200)
566 self.assertEqual(content, b"This is the final destination.\n")
567 self.assertEqual(response.previous.status, 307)
568 self.assertEqual(response.previous.fromcache, False)
569
570 (response, content) = self.http.request(uri, "GET")
571 self.assertEqual(response.status, 200)
572 self.assertEqual(response.fromcache, True)
573 self.assertEqual(content, b"This is the final destination.\n")
574 self.assertEqual(response.previous.status, 307)
575 self.assertEqual(response.previous.fromcache, False)
576
577 def testGet410(self):
578 # Test that we pass 410's through
579 uri = urllib.parse.urljoin(base, "410/410.asis")
580 (response, content) = self.http.request(uri, "GET")
581 self.assertEqual(response.status, 410)
582
chris.dent@gmail.comae846ca2009-12-24 14:02:57 -0600583 def testVaryHeaderSimple(self):
584 """
585 RFC 2616 13.6
586 When the cache receives a subsequent request whose Request-URI
587 specifies one or more cache entries including a Vary header field,
588 the cache MUST NOT use such a cache entry to construct a response
589 to the new request unless all of the selecting request-headers
590 present in the new request match the corresponding stored
591 request-headers in the original request.
592 """
593 # test that the vary header is sent
594 uri = urllib.parse.urljoin(base, "vary/accept.asis")
595 (response, content) = self.http.request(uri, "GET", headers={'Accept': 'text/plain'})
596 self.assertEqual(response.status, 200)
597 self.assertTrue('vary' in response)
598
599 # get the resource again, from the cache since accept header in this
600 # request is the same as the request
601 (response, content) = self.http.request(uri, "GET", headers={'Accept': 'text/plain'})
602 self.assertEqual(response.status, 200)
603 self.assertEqual(response.fromcache, True, msg="Should be from cache")
604
605 # get the resource again, not from cache since Accept headers does not match
606 (response, content) = self.http.request(uri, "GET", headers={'Accept': 'text/html'})
607 self.assertEqual(response.status, 200)
608 self.assertEqual(response.fromcache, False, msg="Should not be from cache")
609
610 # get the resource again, without any Accept header, so again no match
611 (response, content) = self.http.request(uri, "GET")
612 self.assertEqual(response.status, 200)
613 self.assertEqual(response.fromcache, False, msg="Should not be from cache")
614
615 def testNoVary(self):
616 # when there is no vary, a different Accept header (e.g.) should not
617 # impact if the cache is used
618 # test that the vary header is not sent
619 uri = urllib.parse.urljoin(base, "vary/no-vary.asis")
620 (response, content) = self.http.request(uri, "GET", headers={'Accept': 'text/plain'})
621 self.assertEqual(response.status, 200)
622 self.assertFalse('vary' in response)
623
624 (response, content) = self.http.request(uri, "GET", headers={'Accept': 'text/plain'})
625 self.assertEqual(response.status, 200)
626 self.assertEqual(response.fromcache, True, msg="Should be from cache")
627
628 (response, content) = self.http.request(uri, "GET", headers={'Accept': 'text/html'})
629 self.assertEqual(response.status, 200)
630 self.assertEqual(response.fromcache, True, msg="Should be from cache")
631
632 def testVaryHeaderDouble(self):
633 uri = urllib.parse.urljoin(base, "vary/accept-double.asis")
634 (response, content) = self.http.request(uri, "GET", headers={
635 'Accept': 'text/plain', 'Accept-Language': 'da, en-gb;q=0.8, en;q=0.7'})
636 self.assertEqual(response.status, 200)
637 self.assertTrue('vary' in response)
638
639 # we are from cache
640 (response, content) = self.http.request(uri, "GET", headers={
641 'Accept': 'text/plain', 'Accept-Language': 'da, en-gb;q=0.8, en;q=0.7'})
642 self.assertEqual(response.fromcache, True, msg="Should be from cache")
643
644 (response, content) = self.http.request(uri, "GET", headers={'Accept': 'text/plain'})
645 self.assertEqual(response.status, 200)
646 self.assertEqual(response.fromcache, False)
647
648 # get the resource again, not from cache, varied headers don't match exact
649 (response, content) = self.http.request(uri, "GET", headers={'Accept-Language': 'da'})
650 self.assertEqual(response.status, 200)
651 self.assertEqual(response.fromcache, False, msg="Should not be from cache")
652
jcgregorio@localhost9e603da2010-05-13 23:42:11 -0400653 def testVaryUnusedHeader(self):
654 # A header's value is not considered to vary if it's not used at all.
655 uri = urllib.parse.urljoin(base, "vary/unused-header.asis")
656 (response, content) = self.http.request(uri, "GET", headers={
657 'Accept': 'text/plain'})
658 self.assertEqual(response.status, 200)
659 self.assertTrue('vary' in response)
660
661 # we are from cache
662 (response, content) = self.http.request(uri, "GET", headers={
663 'Accept': 'text/plain',})
664 self.assertEqual(response.fromcache, True, msg="Should be from cache")
665
pilgrim00a352e2009-05-29 04:04:44 +0000666 def testHeadGZip(self):
667 # Test that we don't try to decompress a HEAD response
668 uri = urllib.parse.urljoin(base, "gzip/final-destination.txt")
669 (response, content) = self.http.request(uri, "HEAD")
670 self.assertEqual(response.status, 200)
671 self.assertNotEqual(int(response['content-length']), 0)
672 self.assertEqual(content, b"")
673
674 def testGetGZip(self):
675 # Test that we support gzip compression
676 uri = urllib.parse.urljoin(base, "gzip/final-destination.txt")
677 (response, content) = self.http.request(uri, "GET")
678 self.assertEqual(response.status, 200)
679 self.assertFalse('content-encoding' in response)
680 self.assertTrue('-content-encoding' in response)
681 self.assertEqual(int(response['content-length']), len(b"This is the final destination.\n"))
682 self.assertEqual(content, b"This is the final destination.\n")
683
684 def testGetGZipFailure(self):
685 # Test that we raise a good exception when the gzip fails
686 self.http.force_exception_to_status_code = False
687 uri = urllib.parse.urljoin(base, "gzip/failed-compression.asis")
688 try:
689 (response, content) = self.http.request(uri, "GET")
690 self.fail("Should never reach here")
691 except httplib2.FailedToDecompressContent:
692 pass
693 except Exception:
694 self.fail("Threw wrong kind of exception")
695
696 # Re-run the test with out the exceptions
697 self.http.force_exception_to_status_code = True
698
699 (response, content) = self.http.request(uri, "GET")
700 self.assertEqual(response.status, 500)
701 self.assertTrue(response.reason.startswith("Content purported"))
702
703 def testTimeout(self):
704 self.http.force_exception_to_status_code = True
705 uri = urllib.parse.urljoin(base, "timeout/timeout.cgi")
706 try:
707 import socket
708 socket.setdefaulttimeout(1)
709 except:
710 # Don't run the test if we can't set the timeout
711 return
712 (response, content) = self.http.request(uri)
713 self.assertEqual(response.status, 408)
714 self.assertTrue(response.reason.startswith("Request Timeout"))
715 self.assertTrue(content.startswith(b"Request Timeout"))
716
717 def testIndividualTimeout(self):
718 uri = urllib.parse.urljoin(base, "timeout/timeout.cgi")
719 http = httplib2.Http(timeout=1)
720 http.force_exception_to_status_code = True
721
722 (response, content) = http.request(uri)
723 self.assertEqual(response.status, 408)
724 self.assertTrue(response.reason.startswith("Request Timeout"))
725 self.assertTrue(content.startswith(b"Request Timeout"))
726
727
728 def testGetDeflate(self):
729 # Test that we support deflate compression
730 uri = urllib.parse.urljoin(base, "deflate/deflated.asis")
731 (response, content) = self.http.request(uri, "GET")
732 self.assertEqual(response.status, 200)
733 self.assertFalse('content-encoding' in response)
734 self.assertEqual(int(response['content-length']), len("This is the final destination."))
735 self.assertEqual(content, b"This is the final destination.")
736
737 def testGetDeflateFailure(self):
738 # Test that we raise a good exception when the deflate fails
739 self.http.force_exception_to_status_code = False
740
741 uri = urllib.parse.urljoin(base, "deflate/failed-compression.asis")
742 try:
743 (response, content) = self.http.request(uri, "GET")
744 self.fail("Should never reach here")
745 except httplib2.FailedToDecompressContent:
746 pass
747 except Exception:
748 self.fail("Threw wrong kind of exception")
749
750 # Re-run the test with out the exceptions
751 self.http.force_exception_to_status_code = True
752
753 (response, content) = self.http.request(uri, "GET")
754 self.assertEqual(response.status, 500)
755 self.assertTrue(response.reason.startswith("Content purported"))
756
757 def testGetDuplicateHeaders(self):
758 # Test that duplicate headers get concatenated via ','
759 uri = urllib.parse.urljoin(base, "duplicate-headers/multilink.asis")
760 (response, content) = self.http.request(uri, "GET")
761 self.assertEqual(response.status, 200)
762 self.assertEqual(content, b"This is content\n")
763 self.assertEqual(response['link'].split(",")[0], '<http://bitworking.org>; rel="home"; title="BitWorking"')
764
765 def testGetCacheControlNoCache(self):
766 # Test Cache-Control: no-cache on requests
767 uri = urllib.parse.urljoin(base, "304/test_etag.txt")
768 (response, content) = self.http.request(uri, "GET")
769 self.assertNotEqual(response['etag'], "")
770 (response, content) = self.http.request(uri, "GET")
771 self.assertEqual(response.status, 200)
772 self.assertEqual(response.fromcache, True)
773
774 (response, content) = self.http.request(uri, "GET", headers={'Cache-Control': 'no-cache'})
775 self.assertEqual(response.status, 200)
776 self.assertEqual(response.fromcache, False)
777
778 def testGetCacheControlPragmaNoCache(self):
779 # Test Pragma: no-cache on requests
780 uri = urllib.parse.urljoin(base, "304/test_etag.txt")
781 (response, content) = self.http.request(uri, "GET")
782 self.assertNotEqual(response['etag'], "")
783 (response, content) = self.http.request(uri, "GET")
784 self.assertEqual(response.status, 200)
785 self.assertEqual(response.fromcache, True)
786
787 (response, content) = self.http.request(uri, "GET", headers={'Pragma': 'no-cache'})
788 self.assertEqual(response.status, 200)
789 self.assertEqual(response.fromcache, False)
790
791 def testGetCacheControlNoStoreRequest(self):
792 # A no-store request means that the response should not be stored.
793 uri = urllib.parse.urljoin(base, "304/test_etag.txt")
794
795 (response, content) = self.http.request(uri, "GET", headers={'Cache-Control': 'no-store'})
796 self.assertEqual(response.status, 200)
797 self.assertEqual(response.fromcache, False)
798
799 (response, content) = self.http.request(uri, "GET", headers={'Cache-Control': 'no-store'})
800 self.assertEqual(response.status, 200)
801 self.assertEqual(response.fromcache, False)
802
803 def testGetCacheControlNoStoreResponse(self):
804 # A no-store response means that the response should not be stored.
805 uri = urllib.parse.urljoin(base, "no-store/no-store.asis")
806
807 (response, content) = self.http.request(uri, "GET")
808 self.assertEqual(response.status, 200)
809 self.assertEqual(response.fromcache, False)
810
811 (response, content) = self.http.request(uri, "GET")
812 self.assertEqual(response.status, 200)
813 self.assertEqual(response.fromcache, False)
814
815 def testGetCacheControlNoCacheNoStoreRequest(self):
816 # Test that a no-store, no-cache clears the entry from the cache
817 # even if it was cached previously.
818 uri = urllib.parse.urljoin(base, "304/test_etag.txt")
819
820 (response, content) = self.http.request(uri, "GET")
821 (response, content) = self.http.request(uri, "GET")
822 self.assertEqual(response.fromcache, True)
823 (response, content) = self.http.request(uri, "GET", headers={'Cache-Control': 'no-store, no-cache'})
824 (response, content) = self.http.request(uri, "GET", headers={'Cache-Control': 'no-store, no-cache'})
825 self.assertEqual(response.status, 200)
826 self.assertEqual(response.fromcache, False)
827
828 def testUpdateInvalidatesCache(self):
829 # Test that calling PUT or DELETE on a
830 # URI that is cache invalidates that cache.
831 uri = urllib.parse.urljoin(base, "304/test_etag.txt")
832
833 (response, content) = self.http.request(uri, "GET")
834 (response, content) = self.http.request(uri, "GET")
835 self.assertEqual(response.fromcache, True)
836 (response, content) = self.http.request(uri, "DELETE")
837 self.assertEqual(response.status, 405)
838
839 (response, content) = self.http.request(uri, "GET")
840 self.assertEqual(response.fromcache, False)
841
842 def testUpdateUsesCachedETag(self):
843 # Test that we natively support http://www.w3.org/1999/04/Editing/
844 uri = urllib.parse.urljoin(base, "conditional-updates/test.cgi")
845
846 (response, content) = self.http.request(uri, "GET")
847 self.assertEqual(response.status, 200)
848 self.assertEqual(response.fromcache, False)
849 (response, content) = self.http.request(uri, "GET")
850 self.assertEqual(response.status, 200)
851 self.assertEqual(response.fromcache, True)
Joe Gregorio799b2072009-09-29 17:21:19 -0400852 (response, content) = self.http.request(uri, "PUT", body="foo")
pilgrim00a352e2009-05-29 04:04:44 +0000853 self.assertEqual(response.status, 200)
Joe Gregorio799b2072009-09-29 17:21:19 -0400854 (response, content) = self.http.request(uri, "PUT", body="foo")
pilgrim00a352e2009-05-29 04:04:44 +0000855 self.assertEqual(response.status, 412)
856
857 def testUpdateUsesCachedETagAndOCMethod(self):
858 # Test that we natively support http://www.w3.org/1999/04/Editing/
859 uri = urllib.parse.urljoin(base, "conditional-updates/test.cgi")
860
861 (response, content) = self.http.request(uri, "GET")
862 self.assertEqual(response.status, 200)
863 self.assertEqual(response.fromcache, False)
864 (response, content) = self.http.request(uri, "GET")
865 self.assertEqual(response.status, 200)
866 self.assertEqual(response.fromcache, True)
867 self.http.optimistic_concurrency_methods.append("DELETE")
868 (response, content) = self.http.request(uri, "DELETE")
869 self.assertEqual(response.status, 200)
870
871
872 def testUpdateUsesCachedETagOverridden(self):
873 # Test that we natively support http://www.w3.org/1999/04/Editing/
874 uri = urllib.parse.urljoin(base, "conditional-updates/test.cgi")
875
876 (response, content) = self.http.request(uri, "GET")
877 self.assertEqual(response.status, 200)
878 self.assertEqual(response.fromcache, False)
879 (response, content) = self.http.request(uri, "GET")
880 self.assertEqual(response.status, 200)
881 self.assertEqual(response.fromcache, True)
Joe Gregorio799b2072009-09-29 17:21:19 -0400882 (response, content) = self.http.request(uri, "PUT", body="foo", headers={'if-match': 'fred'})
pilgrim00a352e2009-05-29 04:04:44 +0000883 self.assertEqual(response.status, 412)
884
885 def testBasicAuth(self):
886 # Test Basic Authentication
887 uri = urllib.parse.urljoin(base, "basic/file.txt")
888 (response, content) = self.http.request(uri, "GET")
889 self.assertEqual(response.status, 401)
890
891 uri = urllib.parse.urljoin(base, "basic/")
892 (response, content) = self.http.request(uri, "GET")
893 self.assertEqual(response.status, 401)
894
895 self.http.add_credentials('joe', 'password')
896 (response, content) = self.http.request(uri, "GET")
897 self.assertEqual(response.status, 200)
898
899 uri = urllib.parse.urljoin(base, "basic/file.txt")
900 (response, content) = self.http.request(uri, "GET")
901 self.assertEqual(response.status, 200)
902
903 def testBasicAuthWithDomain(self):
904 # Test Basic Authentication
905 uri = urllib.parse.urljoin(base, "basic/file.txt")
906 (response, content) = self.http.request(uri, "GET")
907 self.assertEqual(response.status, 401)
908
909 uri = urllib.parse.urljoin(base, "basic/")
910 (response, content) = self.http.request(uri, "GET")
911 self.assertEqual(response.status, 401)
912
913 self.http.add_credentials('joe', 'password', "example.org")
914 (response, content) = self.http.request(uri, "GET")
915 self.assertEqual(response.status, 401)
916
917 uri = urllib.parse.urljoin(base, "basic/file.txt")
918 (response, content) = self.http.request(uri, "GET")
919 self.assertEqual(response.status, 401)
920
921 domain = urllib.parse.urlparse(base)[1]
922 self.http.add_credentials('joe', 'password', domain)
923 (response, content) = self.http.request(uri, "GET")
924 self.assertEqual(response.status, 200)
925
926 uri = urllib.parse.urljoin(base, "basic/file.txt")
927 (response, content) = self.http.request(uri, "GET")
928 self.assertEqual(response.status, 200)
929
930
931
932
933
934
935 def testBasicAuthTwoDifferentCredentials(self):
936 # Test Basic Authentication with multiple sets of credentials
937 uri = urllib.parse.urljoin(base, "basic2/file.txt")
938 (response, content) = self.http.request(uri, "GET")
939 self.assertEqual(response.status, 401)
940
941 uri = urllib.parse.urljoin(base, "basic2/")
942 (response, content) = self.http.request(uri, "GET")
943 self.assertEqual(response.status, 401)
944
945 self.http.add_credentials('fred', 'barney')
946 (response, content) = self.http.request(uri, "GET")
947 self.assertEqual(response.status, 200)
948
949 uri = urllib.parse.urljoin(base, "basic2/file.txt")
950 (response, content) = self.http.request(uri, "GET")
951 self.assertEqual(response.status, 200)
952
953 def testBasicAuthNested(self):
954 # Test Basic Authentication with resources
955 # that are nested
956 uri = urllib.parse.urljoin(base, "basic-nested/")
957 (response, content) = self.http.request(uri, "GET")
958 self.assertEqual(response.status, 401)
959
960 uri = urllib.parse.urljoin(base, "basic-nested/subdir")
961 (response, content) = self.http.request(uri, "GET")
962 self.assertEqual(response.status, 401)
963
964 # Now add in credentials one at a time and test.
965 self.http.add_credentials('joe', 'password')
966
967 uri = urllib.parse.urljoin(base, "basic-nested/")
968 (response, content) = self.http.request(uri, "GET")
969 self.assertEqual(response.status, 200)
970
971 uri = urllib.parse.urljoin(base, "basic-nested/subdir")
972 (response, content) = self.http.request(uri, "GET")
973 self.assertEqual(response.status, 401)
974
975 self.http.add_credentials('fred', 'barney')
976
977 uri = urllib.parse.urljoin(base, "basic-nested/")
978 (response, content) = self.http.request(uri, "GET")
979 self.assertEqual(response.status, 200)
980
981 uri = urllib.parse.urljoin(base, "basic-nested/subdir")
982 (response, content) = self.http.request(uri, "GET")
983 self.assertEqual(response.status, 200)
984
985 def testDigestAuth(self):
986 # Test that we support Digest Authentication
987 uri = urllib.parse.urljoin(base, "digest/")
988 (response, content) = self.http.request(uri, "GET")
989 self.assertEqual(response.status, 401)
990
991 self.http.add_credentials('joe', 'password')
992 (response, content) = self.http.request(uri, "GET")
993 self.assertEqual(response.status, 200)
994
995 uri = urllib.parse.urljoin(base, "digest/file.txt")
996 (response, content) = self.http.request(uri, "GET")
997
998 def testDigestAuthNextNonceAndNC(self):
999 # Test that if the server sets nextnonce that we reset
1000 # the nonce count back to 1
1001 uri = urllib.parse.urljoin(base, "digest/file.txt")
1002 self.http.add_credentials('joe', 'password')
1003 (response, content) = self.http.request(uri, "GET", headers = {"cache-control":"no-cache"})
1004 info = httplib2._parse_www_authenticate(response, 'authentication-info')
1005 self.assertEqual(response.status, 200)
1006 (response, content) = self.http.request(uri, "GET", headers = {"cache-control":"no-cache"})
1007 info2 = httplib2._parse_www_authenticate(response, 'authentication-info')
1008 self.assertEqual(response.status, 200)
1009
1010 if 'nextnonce' in info:
1011 self.assertEqual(info2['nc'], 1)
1012
1013 def testDigestAuthStale(self):
1014 # Test that we can handle a nonce becoming stale
1015 uri = urllib.parse.urljoin(base, "digest-expire/file.txt")
1016 self.http.add_credentials('joe', 'password')
1017 (response, content) = self.http.request(uri, "GET", headers = {"cache-control":"no-cache"})
1018 info = httplib2._parse_www_authenticate(response, 'authentication-info')
1019 self.assertEqual(response.status, 200)
1020
1021 time.sleep(3)
1022 # Sleep long enough that the nonce becomes stale
1023
1024 (response, content) = self.http.request(uri, "GET", headers = {"cache-control":"no-cache"})
1025 self.assertFalse(response.fromcache)
1026 self.assertTrue(response._stale_digest)
1027 info3 = httplib2._parse_www_authenticate(response, 'authentication-info')
1028 self.assertEqual(response.status, 200)
1029
1030 def reflector(self, content):
1031 return dict( [tuple(x.split("=", 1)) for x in content.decode('utf-8').strip().split("\n")] )
1032
1033 def testReflector(self):
1034 uri = urllib.parse.urljoin(base, "reflector/reflector.cgi")
1035 (response, content) = self.http.request(uri, "GET")
1036 d = self.reflector(content)
1037 self.assertTrue('HTTP_USER_AGENT' in d)
1038
Joe Gregorio84cc10a2009-09-01 13:02:49 -04001039
1040 def testConnectionClose(self):
1041 uri = "http://www.google.com/"
1042 (response, content) = self.http.request(uri, "GET")
1043 for c in self.http.connections.values():
1044 self.assertNotEqual(None, c.sock)
1045 (response, content) = self.http.request(uri, "GET", headers={"connection": "close"})
1046 for c in self.http.connections.values():
1047 self.assertEqual(None, c.sock)
1048
try:
    import memcache
except ImportError:
    # memcache is an optional dependency: without it the memcached-backed
    # variant of the suite is simply not defined.  Only the import is
    # guarded, so mistakes in the class below are no longer silently
    # swallowed the way the former bare "except: pass" did.
    pass
else:
    class HttpTestMemCached(HttpTest):
        # Runs the inherited HttpTest cases with a memcached client as the
        # cache object handed to httplib2.Http.
        def setUp(self):
            self.cache = memcache.Client(['127.0.0.1:11211'], debug=0)
            #self.cache = memcache.Client(['10.0.0.4:11211'], debug=1)
            self.http = httplib2.Http(self.cache)
            self.cache.flush_all()
            # Not exactly sure why the sleep is needed here, but
            # if not present then some unit tests that rely on caching
            # fail. Memcached seems to lose some sets immediately
            # after a flush_all if the set is to a value that
            # was previously cached. (Maybe the flush is handled async?)
            time.sleep(1)
            self.http.clear_credentials()
1066
1067
1068
1069# ------------------------------------------------------------------------
1070
class HttpPrivateTest(unittest.TestCase):
    """Tests for httplib2's private helpers.

    The tests call module-level helper functions and authentication objects
    directly with literal header dictionaries and values.
    """

    def testParseCacheControl(self):
        # Test that we can parse the Cache-Control header
        self.assertEqual({}, httplib2._parse_cache_control({}))
        self.assertEqual({'no-cache': 1}, httplib2._parse_cache_control({'cache-control': ' no-cache'}))
        cc = httplib2._parse_cache_control({'cache-control': ' no-cache, max-age = 7200'})
        self.assertEqual(cc['no-cache'], 1)
        self.assertEqual(cc['max-age'], '7200')
        cc = httplib2._parse_cache_control({'cache-control': ' , '})
        self.assertEqual(cc[''], 1)

        # A header that uses ';' as a separator must not make the parser
        # raise.  Only the parse call is guarded so that a failing assertion
        # below is reported as itself, not as "Should not throw exception".
        try:
            cc = httplib2._parse_cache_control({'cache-control': 'Max-age=3600;post-check=1800,pre-check=3600'})
        except Exception:
            self.fail("Should not throw exception")
        self.assertTrue("max-age" in cc)

    def testNormalizeHeaders(self):
        # Test that we normalize headers to lowercase
        h = httplib2._normalize_headers({'Cache-Control': 'no-cache', 'Other': 'Stuff'})
        self.assertTrue('cache-control' in h)
        self.assertTrue('other' in h)
        self.assertEqual('Stuff', h['other'])

    def testExpirationModelTransparent(self):
        # Test that no-cache makes our request TRANSPARENT
        response_headers = {
            'cache-control': 'max-age=7200'
        }
        request_headers = {
            'cache-control': 'no-cache'
        }
        self.assertEqual("TRANSPARENT", httplib2._entry_disposition(response_headers, request_headers))

    def testMaxAgeNonNumeric(self):
        # Test that non-numeric max-age / min-fresh values in the response
        # make the entry STALE
        response_headers = {
            'cache-control': 'max-age=fred, min-fresh=barney'
        }
        request_headers = {
        }
        self.assertEqual("STALE", httplib2._entry_disposition(response_headers, request_headers))

    def testExpirationModelNoCacheResponse(self):
        # The date and expires point to an entry that should be
        # FRESH, but the no-cache over-rides that.
        now = time.time()
        response_headers = {
            'date': time.strftime("%a, %d %b %Y %H:%M:%S GMT", time.gmtime(now)),
            'expires': time.strftime("%a, %d %b %Y %H:%M:%S GMT", time.gmtime(now+4)),
            'cache-control': 'no-cache'
        }
        request_headers = {
        }
        self.assertEqual("STALE", httplib2._entry_disposition(response_headers, request_headers))

    def testExpirationModelStaleRequestMustReval(self):
        # must-revalidate forces STALE
        self.assertEqual("STALE", httplib2._entry_disposition({}, {'cache-control': 'must-revalidate'}))

    def testExpirationModelStaleResponseMustReval(self):
        # must-revalidate forces STALE
        self.assertEqual("STALE", httplib2._entry_disposition({'cache-control': 'must-revalidate'}, {}))

    def testExpirationModelFresh(self):
        # max-age=2: FRESH right away, STALE after sleeping three seconds
        response_headers = {
            'date': time.strftime("%a, %d %b %Y %H:%M:%S GMT", time.gmtime()),
            'cache-control': 'max-age=2'
        }
        request_headers = {
        }
        self.assertEqual("FRESH", httplib2._entry_disposition(response_headers, request_headers))
        time.sleep(3)
        self.assertEqual("STALE", httplib2._entry_disposition(response_headers, request_headers))

    def testExpirationMaxAge0(self):
        # max-age=0 in the response is STALE immediately
        response_headers = {
            'date': time.strftime("%a, %d %b %Y %H:%M:%S GMT", time.gmtime()),
            'cache-control': 'max-age=0'
        }
        request_headers = {
        }
        self.assertEqual("STALE", httplib2._entry_disposition(response_headers, request_headers))

    def testExpirationModelDateAndExpires(self):
        # Expires two seconds after Date: FRESH right away, STALE after
        # sleeping three seconds
        now = time.time()
        response_headers = {
            'date': time.strftime("%a, %d %b %Y %H:%M:%S GMT", time.gmtime(now)),
            'expires': time.strftime("%a, %d %b %Y %H:%M:%S GMT", time.gmtime(now+2)),
        }
        request_headers = {
        }
        self.assertEqual("FRESH", httplib2._entry_disposition(response_headers, request_headers))
        time.sleep(3)
        self.assertEqual("STALE", httplib2._entry_disposition(response_headers, request_headers))

    def testExpiresZero(self):
        # An Expires value of "0" is STALE
        now = time.time()
        response_headers = {
            'date': time.strftime("%a, %d %b %Y %H:%M:%S GMT", time.gmtime(now)),
            'expires': "0",
        }
        request_headers = {
        }
        self.assertEqual("STALE", httplib2._entry_disposition(response_headers, request_headers))

    def testExpirationModelDateOnly(self):
        # A response with nothing but a Date header is STALE
        now = time.time()
        response_headers = {
            'date': time.strftime("%a, %d %b %Y %H:%M:%S GMT", time.gmtime(now+3)),
        }
        request_headers = {
        }
        self.assertEqual("STALE", httplib2._entry_disposition(response_headers, request_headers))

    def testExpirationModelOnlyIfCached(self):
        # only-if-cached on the request yields FRESH even with empty
        # response headers
        response_headers = {
        }
        request_headers = {
            'cache-control': 'only-if-cached',
        }
        self.assertEqual("FRESH", httplib2._entry_disposition(response_headers, request_headers))

    def testExpirationModelMaxAgeBoth(self):
        # max-age=0 on the request wins over max-age=2 on the response
        now = time.time()
        response_headers = {
            'date': time.strftime("%a, %d %b %Y %H:%M:%S GMT", time.gmtime(now)),
            'cache-control': 'max-age=2'
        }
        request_headers = {
            'cache-control': 'max-age=0'
        }
        self.assertEqual("STALE", httplib2._entry_disposition(response_headers, request_headers))

    def testExpirationModelDateAndExpiresMinFresh1(self):
        # Expires two seconds after Date with min-fresh=2 requested: STALE
        now = time.time()
        response_headers = {
            'date': time.strftime("%a, %d %b %Y %H:%M:%S GMT", time.gmtime(now)),
            'expires': time.strftime("%a, %d %b %Y %H:%M:%S GMT", time.gmtime(now+2)),
        }
        request_headers = {
            'cache-control': 'min-fresh=2'
        }
        self.assertEqual("STALE", httplib2._entry_disposition(response_headers, request_headers))

    def testExpirationModelDateAndExpiresMinFresh2(self):
        # Expires four seconds after Date with min-fresh=2 requested: FRESH
        now = time.time()
        response_headers = {
            'date': time.strftime("%a, %d %b %Y %H:%M:%S GMT", time.gmtime(now)),
            'expires': time.strftime("%a, %d %b %Y %H:%M:%S GMT", time.gmtime(now+4)),
        }
        request_headers = {
            'cache-control': 'min-fresh=2'
        }
        self.assertEqual("FRESH", httplib2._entry_disposition(response_headers, request_headers))

    def testParseWWWAuthenticateEmpty(self):
        # No www-authenticate header at all gives an empty result
        res = httplib2._parse_www_authenticate({})
        self.assertEqual(len(res), 0)

    def testParseWWWAuthenticate(self):
        # different uses of spaces around commas
        res = httplib2._parse_www_authenticate({'www-authenticate': 'Test realm="test realm" , foo=foo ,bar="bar", baz=baz,qux=qux'})
        self.assertEqual(len(res), 1)
        self.assertEqual(len(res['test']), 5)

        # tokens with non-alphanum
        res = httplib2._parse_www_authenticate({'www-authenticate': 'T*!%#st realm=to*!%#en, to*!%#en="quoted string"'})
        self.assertEqual(len(res), 1)
        self.assertEqual(len(res['t*!%#st']), 2)

        # quoted string with quoted pairs
        res = httplib2._parse_www_authenticate({'www-authenticate': 'Test realm="a \\"test\\" realm"'})
        self.assertEqual(len(res), 1)
        self.assertEqual(res['test']['realm'], 'a "test" realm')

    def testParseWWWAuthenticateStrict(self):
        # Re-run the parsing test with strict parsing switched on.  The
        # flag is module-level state, so it is put back in a finally block:
        # otherwise a failure here would leave strict parsing enabled for
        # every test that runs afterwards.
        previous = httplib2.USE_WWW_AUTH_STRICT_PARSING
        httplib2.USE_WWW_AUTH_STRICT_PARSING = 1
        try:
            self.testParseWWWAuthenticate()
        finally:
            httplib2.USE_WWW_AUTH_STRICT_PARSING = previous

    def testParseWWWAuthenticateBasic(self):
        # Basic challenges with a quoted and an unquoted extra parameter
        res = httplib2._parse_www_authenticate({'www-authenticate': 'Basic realm="me"'})
        basic = res['basic']
        self.assertEqual('me', basic['realm'])

        res = httplib2._parse_www_authenticate({'www-authenticate': 'Basic realm="me", algorithm="MD5"'})
        basic = res['basic']
        self.assertEqual('me', basic['realm'])
        self.assertEqual('MD5', basic['algorithm'])

        res = httplib2._parse_www_authenticate({'www-authenticate': 'Basic realm="me", algorithm=MD5'})
        basic = res['basic']
        self.assertEqual('me', basic['realm'])
        self.assertEqual('MD5', basic['algorithm'])

    def testParseWWWAuthenticateBasic2(self):
        # No space after the comma, trailing space at the end
        res = httplib2._parse_www_authenticate({'www-authenticate': 'Basic realm="me",other="fred" '})
        basic = res['basic']
        self.assertEqual('me', basic['realm'])
        self.assertEqual('fred', basic['other'])

    def testParseWWWAuthenticateBasic3(self):
        # Mixed-case parameter name is available under its lowercase form
        res = httplib2._parse_www_authenticate({'www-authenticate': 'Basic REAlm="me" '})
        basic = res['basic']
        self.assertEqual('me', basic['realm'])

    def testParseWWWAuthenticateDigest(self):
        # A quoted value may itself contain a comma (qop)
        res = httplib2._parse_www_authenticate({'www-authenticate':
            'Digest realm="testrealm@host.com", qop="auth,auth-int", nonce="dcd98b7102dd2f0e8b11d0f600bfb0c093", opaque="5ccc069c403ebaf9f0171e9517f40e41"'})
        digest = res['digest']
        self.assertEqual('testrealm@host.com', digest['realm'])
        self.assertEqual('auth,auth-int', digest['qop'])

    def testParseWWWAuthenticateMultiple(self):
        # Two challenges in one header value, separated only by a space
        res = httplib2._parse_www_authenticate({'www-authenticate':
            'Digest realm="testrealm@host.com", qop="auth,auth-int", nonce="dcd98b7102dd2f0e8b11d0f600bfb0c093", opaque="5ccc069c403ebaf9f0171e9517f40e41" Basic REAlm="me" '})
        digest = res['digest']
        self.assertEqual('testrealm@host.com', digest['realm'])
        self.assertEqual('auth,auth-int', digest['qop'])
        self.assertEqual('dcd98b7102dd2f0e8b11d0f600bfb0c093', digest['nonce'])
        self.assertEqual('5ccc069c403ebaf9f0171e9517f40e41', digest['opaque'])
        basic = res['basic']
        self.assertEqual('me', basic['realm'])

    def testParseWWWAuthenticateMultiple2(self):
        # Handle an added comma between challenges, which might get thrown in if the challenges were
        # originally sent in separate www-authenticate headers.
        res = httplib2._parse_www_authenticate({'www-authenticate':
            'Digest realm="testrealm@host.com", qop="auth,auth-int", nonce="dcd98b7102dd2f0e8b11d0f600bfb0c093", opaque="5ccc069c403ebaf9f0171e9517f40e41", Basic REAlm="me" '})
        digest = res['digest']
        self.assertEqual('testrealm@host.com', digest['realm'])
        self.assertEqual('auth,auth-int', digest['qop'])
        self.assertEqual('dcd98b7102dd2f0e8b11d0f600bfb0c093', digest['nonce'])
        self.assertEqual('5ccc069c403ebaf9f0171e9517f40e41', digest['opaque'])
        basic = res['basic']
        self.assertEqual('me', basic['realm'])

    def testParseWWWAuthenticateMultiple3(self):
        # Handle an added comma between challenges, which might get thrown in if the challenges were
        # originally sent in separate www-authenticate headers.
        res = httplib2._parse_www_authenticate({'www-authenticate':
            'Digest realm="testrealm@host.com", qop="auth,auth-int", nonce="dcd98b7102dd2f0e8b11d0f600bfb0c093", opaque="5ccc069c403ebaf9f0171e9517f40e41", Basic REAlm="me", WSSE realm="foo", profile="UsernameToken"'})
        digest = res['digest']
        self.assertEqual('testrealm@host.com', digest['realm'])
        self.assertEqual('auth,auth-int', digest['qop'])
        self.assertEqual('dcd98b7102dd2f0e8b11d0f600bfb0c093', digest['nonce'])
        self.assertEqual('5ccc069c403ebaf9f0171e9517f40e41', digest['opaque'])
        basic = res['basic']
        self.assertEqual('me', basic['realm'])
        wsse = res['wsse']
        self.assertEqual('foo', wsse['realm'])
        self.assertEqual('UsernameToken', wsse['profile'])

    def testParseWWWAuthenticateMultiple4(self):
        # Tabs around '=' and punctuation inside quoted values
        res = httplib2._parse_www_authenticate({'www-authenticate':
            'Digest realm="test-real.m@host.com", qop \t=\t"\tauth,auth-int", nonce="(*)&^&$%#",opaque="5ccc069c403ebaf9f0171e9517f40e41", Basic REAlm="me", WSSE realm="foo", profile="UsernameToken"'})
        digest = res['digest']
        self.assertEqual('test-real.m@host.com', digest['realm'])
        self.assertEqual('\tauth,auth-int', digest['qop'])
        self.assertEqual('(*)&^&$%#', digest['nonce'])

    def testParseWWWAuthenticateMoreQuoteCombos(self):
        # Mix of quoted and unquoted values, including an '=' inside a
        # quoted nonce
        res = httplib2._parse_www_authenticate({'www-authenticate': 'Digest realm="myrealm", nonce="Ygk86AsKBAA=3516200d37f9a3230352fde99977bd6d472d4306", algorithm=MD5, qop="auth", stale=true'})
        digest = res['digest']
        self.assertEqual('myrealm', digest['realm'])

    def testDigestObject(self):
        # With a fixed cnonce the generated Authorization header must match
        # a known-good value exactly
        credentials = ('joe', 'password')
        host = None
        request_uri = '/projects/httplib2/test/digest/'
        headers = {}
        response = {
            'www-authenticate': 'Digest realm="myrealm", nonce="Ygk86AsKBAA=3516200d37f9a3230352fde99977bd6d472d4306", algorithm=MD5, qop="auth"'
        }
        content = b""

        d = httplib2.DigestAuthentication(credentials, host, request_uri, headers, response, content, None)
        d.request("GET", request_uri, headers, content, cnonce="33033375ec278a46")
        our_request = "Authorization: %s" % headers['Authorization']
        working_request = 'Authorization: Digest username="joe", realm="myrealm", nonce="Ygk86AsKBAA=3516200d37f9a3230352fde99977bd6d472d4306", uri="/projects/httplib2/test/digest/", algorithm=MD5, response="97ed129401f7cdc60e5db58a80f3ea8b", qop=auth, nc=00000001, cnonce="33033375ec278a46"'
        self.assertEqual(our_request, working_request)

    def testDigestObjectStale(self):
        # A 401 response whose challenge says stale=true
        credentials = ('joe', 'password')
        host = None
        request_uri = '/projects/httplib2/test/digest/'
        headers = {}
        response = httplib2.Response({})
        response['www-authenticate'] = 'Digest realm="myrealm", nonce="Ygk86AsKBAA=3516200d37f9a3230352fde99977bd6d472d4306", algorithm=MD5, qop="auth", stale=true'
        response.status = 401
        content = b""
        d = httplib2.DigestAuthentication(credentials, host, request_uri, headers, response, content, None)
        # Returns true to force a retry
        self.assertTrue(d.response(response, content))

    def testDigestObjectAuthInfo(self):
        # A response that carries authentication-info with a nextnonce
        credentials = ('joe', 'password')
        host = None
        request_uri = '/projects/httplib2/test/digest/'
        headers = {}
        response = httplib2.Response({})
        response['www-authenticate'] = 'Digest realm="myrealm", nonce="Ygk86AsKBAA=3516200d37f9a3230352fde99977bd6d472d4306", algorithm=MD5, qop="auth", stale=true'
        response['authentication-info'] = 'nextnonce="fred"'
        content = b""
        d = httplib2.DigestAuthentication(credentials, host, request_uri, headers, response, content, None)
        # No retry is requested here; instead the challenge picks up the
        # nextnonce value and the nonce count is 1
        self.assertFalse(d.response(response, content))
        self.assertEqual('fred', d.challenge['nonce'])
        self.assertEqual(1, d.challenge['nc'])

    def testWsseAlgorithm(self):
        # Known input / known digest pair for the WSSE username token
        digest = httplib2._wsse_username_token("d36e316282959a9ed4c89851497a717f", "2003-12-15T14:43:07Z", "taadtaadpstcsm")
        expected = b"quR/EWLAV4xLf9Zqyw4pDmfV9OY="
        self.assertEqual(expected, digest)

    def testEnd2End(self):
        # one end to end header
        response = {'content-type': 'application/atom+xml', 'te': 'deflate'}
        end2end = httplib2._get_end2end_headers(response)
        self.assertTrue('content-type' in end2end)
        self.assertTrue('te' not in end2end)
        self.assertTrue('connection' not in end2end)

        # one end to end header that gets eliminated
        response = {'connection': 'content-type', 'content-type': 'application/atom+xml', 'te': 'deflate'}
        end2end = httplib2._get_end2end_headers(response)
        self.assertTrue('content-type' not in end2end)
        self.assertTrue('te' not in end2end)
        self.assertTrue('connection' not in end2end)

        # Degenerate case of no headers
        # (assertEqual, not the deprecated assertEquals alias, which newer
        # Python versions no longer provide)
        response = {}
        end2end = httplib2._get_end2end_headers(response)
        self.assertEqual(0, len(end2end))

        # Degenerate case of connection referring to a header not passed in
        response = {'connection': 'content-type'}
        end2end = httplib2._get_end2end_headers(response)
        self.assertEqual(0, len(end2end))
1419
if __name__ == "__main__":
    # Run the suite only when this file is executed as a script.  Calling
    # unittest.main() unconditionally would start (and exit after) a test
    # run whenever the module is merely imported, e.g. by a test loader.
    unittest.main()