blob: ebaf4163791009f2d993b6079142f1a814ea9c7c [file] [log] [blame]
pilgrim00a352e2009-05-29 04:04:44 +00001#!/usr/bin/env python3
2"""
3httplib2test
4
5A set of unit tests for httplib2.py.
6
7Requires Python 3.0 or later
8"""
9
10__author__ = "Joe Gregorio (joe@bitworking.org)"
11__copyright__ = "Copyright 2006, Joe Gregorio"
12__contributors__ = ["Mark Pilgrim"]
13__license__ = "MIT"
14__history__ = """ """
15__version__ = "0.2 ($Rev: 118 $)"
16
Joe Gregoriob6c90c42011-02-11 01:03:22 -050017import base64
pilgrim00a352e2009-05-29 04:04:44 +000018import http.client
19import httplib2
pilgrim00a352e2009-05-29 04:04:44 +000020import io
Joe Gregoriob6c90c42011-02-11 01:03:22 -050021import os
22import socket
23import sys
24import time
25import unittest
26import urllib.parse
pilgrim00a352e2009-05-29 04:04:44 +000027
28# The test resources base uri
29base = 'http://bitworking.org/projects/httplib2/test/'
30#base = 'http://localhost/projects/httplib2/test/'
31cacheDirName = ".cache"
32
33
class CredentialsTest(unittest.TestCase):
    """Exercise adding, iterating and clearing httplib2.Credentials."""

    def test(self):
        """Walk a Credentials store through add / iter / clear."""
        joe = ("joe", "password")
        fred = ("fred", "password2")
        creds = httplib2.Credentials()

        # A credential added without a domain is yielded for every domain,
        # including the empty one.
        creds.add("joe", "password")
        for domain in ("bitworking.org", ""):
            self.assertEqual(joe, list(creds.iter(domain))[0])

        # A domain-scoped credential only shows up for its own domain.
        creds.add("fred", "password2", "wellformedweb.org")
        bitworking = list(creds.iter("bitworking.org"))
        self.assertEqual(joe, bitworking[0])
        self.assertEqual(1, len(list(creds.iter("bitworking.org"))))
        wellformed = list(creds.iter("wellformedweb.org"))
        self.assertEqual(2, len(wellformed))
        self.assertTrue(fred in list(creds.iter("wellformedweb.org")))

        # clear() drops everything that was stored.
        creds.clear()
        self.assertEqual(0, len(list(creds.iter("bitworking.org"))))

        # After clearing, only the newly added scoped credential remains.
        creds.add("fred", "password2", "wellformedweb.org")
        self.assertTrue(fred in list(creds.iter("wellformedweb.org")))
        for domain in ("bitworking.org", ""):
            self.assertEqual(0, len(list(creds.iter(domain))))
51
52
class ParserTest(unittest.TestCase):
    """Check httplib2.parse_uri against a table of example URIs."""

    def testFromStd66(self):
        """Each URI must parse into the expected 5-tuple.

        Judging by the expected values, the tuple is
        (scheme, authority, path, query, fragment), with None for an absent
        query or fragment and '' for an absent path.
        """
        # (input URI, expected parse result); the original list repeated the
        # final case twice, which added no coverage.
        cases = [
            ("http://example.com",
             ('http', 'example.com', '', None, None)),
            ("https://example.com",
             ('https', 'example.com', '', None, None)),
            ("https://example.com:8080",
             ('https', 'example.com:8080', '', None, None)),
            ("http://example.com/",
             ('http', 'example.com', '/', None, None)),
            ("http://example.com/path",
             ('http', 'example.com', '/path', None, None)),
            ("http://example.com/path?a=1&b=2",
             ('http', 'example.com', '/path', 'a=1&b=2', None)),
            ("http://example.com/path?a=1&b=2#fred",
             ('http', 'example.com', '/path', 'a=1&b=2', 'fred')),
        ]
        for uri, expected in cases:
            # Pass the URI as the message so a failure names the input.
            self.assertEqual(expected, httplib2.parse_uri(uri), uri)
63
64
class UrlNormTest(unittest.TestCase):
    """Check httplib2.urlnorm normalisation and its relative-URI rejection."""

    def test(self):
        """The last element of urlnorm()'s result is the normalised URI."""
        def normalized(uri):
            return httplib2.urlnorm(uri)[-1]

        self.assertEqual("http://example.org/", normalized("http://example.org"))
        self.assertEqual("http://example.org/", normalized("http://EXAMple.org"))
        self.assertEqual("http://example.org/?=b", normalized("http://EXAMple.org?=b"))
        self.assertEqual("http://example.org/mypath?a=b", normalized("http://EXAMple.org/mypath?a=b"))
        self.assertEqual("http://localhost:80/", normalized("http://localhost:80"))

        # The complete result tuples must agree regardless of input case.
        lower = httplib2.urlnorm("http://localhost:80/")
        upper = httplib2.urlnorm("HTTP://LOCALHOST:80")
        self.assertEqual(lower, upper)

        # A relative reference has to be rejected.
        try:
            httplib2.urlnorm("/")
        except httplib2.RelativeURIError:
            pass
        else:
            self.fail("Non-absolute URIs should raise an exception")
78
class UrlSafenameTest(unittest.TestCase):
    """Check httplib2.safename, which maps a URI to a cache file name."""

    def test(self):
        """Safe names must be distinct per URI, length-bounded and Unicode-safe."""
        # Test that different URIs end up generating different safe names
        self.assertEqual("example.org,fred,a=b,58489f63a7a83c3b7794a6a398ee8b1f", httplib2.safename("http://example.org/fred/?a=b"))
        self.assertEqual("example.org,fred,a=b,8c5946d56fec453071f43329ff0be46b", httplib2.safename("http://example.org/fred?/a=b"))
        self.assertEqual("www.example.org,fred,a=b,499c44b8d844a011b67ea2c015116968", httplib2.safename("http://www.example.org/fred?/a=b"))
        self.assertEqual(httplib2.safename(httplib2.urlnorm("http://www")[-1]), httplib2.safename(httplib2.urlnorm("http://WWW")[-1]))
        self.assertEqual("www.example.org,fred,a=b,692e843a333484ce0095b070497ab45d", httplib2.safename("https://www.example.org/fred?/a=b"))
        self.assertNotEqual(httplib2.safename("http://www"), httplib2.safename("https://www"))

        # Test the max length limits
        uri = "http://" + ("w" * 200) + ".org"
        uri2 = "http://" + ("w" * 201) + ".org"
        self.assertNotEqual(httplib2.safename(uri2), httplib2.safename(uri))
        # Max length should be 200 + 1 (",") + 32
        self.assertEqual(233, len(httplib2.safename(uri2)))
        self.assertEqual(233, len(httplib2.safename(uri)))

        # Unicode. The former `sys.version_info >= (2,3)` guard was always
        # true for this Python 3 suite, so the check now runs unconditionally.
        self.assertEqual("xn--http,-4y1d.org,fred,a=b,579924c35db315e5a32e3d9963388193", httplib2.safename("http://\u2304.org/fred/?a=b"))
98
99class _MyResponse(io.BytesIO):
100 def __init__(self, body, **kwargs):
101 io.BytesIO.__init__(self, body)
102 self.headers = kwargs
103
104 def items(self):
105 return self.headers.items()
106
107 def iteritems(self):
108 return iter(self.headers.items())
109
110
111class _MyHTTPConnection(object):
112 "This class is just a mock of httplib.HTTPConnection used for testing"
113
114 def __init__(self, host, port=None, key_file=None, cert_file=None,
115 strict=None, timeout=None, proxy_info=None):
116 self.host = host
117 self.port = port
118 self.timeout = timeout
119 self.log = ""
120
121 def set_debuglevel(self, level):
122 pass
123
124 def connect(self):
125 "Connect to a host on a given port."
126 pass
127
128 def close(self):
129 pass
130
131 def request(self, method, request_uri, body, headers):
132 pass
133
134 def getresponse(self):
135 return _MyResponse(b"the body", status="200")
136
137
138class HttpTest(unittest.TestCase):
def setUp(self):
    """Empty the cache directory and create a fresh Http client for the test."""
    if os.path.exists(cacheDirName):
        # Plain loop: the removals are side effects, so building a throwaway
        # list with a comprehension would only obscure the intent.
        for entry in os.listdir(cacheDirName):
            os.remove(os.path.join(cacheDirName, entry))
    self.http = httplib2.Http(cacheDirName)
    self.http.clear_credentials()
144
def testIPv6NoSSL(self):
    """A plain-HTTP request to an IPv6 literal must not fail address lookup."""
    ipv6_uri = "http://[::1]/"
    try:
        self.http.request(ipv6_uri)
    except socket.gaierror:
        self.fail("should get the address family right for IPv6")
    except socket.error:
        # Tolerated: a machine without IPv6 installed should still only
        # raise a plain socket.error.
        pass
153
def testIPv6SSL(self):
    """An HTTPS request to an IPv6 literal must not fail address lookup."""
    ipv6_uri = "https://[::1]/"
    try:
        self.http.request(ipv6_uri)
    except socket.gaierror:
        self.fail("should get the address family right for IPv6")
    except socket.error:
        # Tolerated: a machine without IPv6 installed should still only
        # raise a plain socket.error.
        pass
162
def testConnectionType(self):
    """A request made with connection_type=_MyHTTPConnection returns the mock's body."""
    self.http.force_exception_to_status_code = False
    target = "http://bitworking.org"
    resp, body = self.http.request(target, connection_type=_MyHTTPConnection)
    self.assertEqual(resp['content-location'], "http://bitworking.org")
    self.assertEqual(body, b"the body")
168
def testGetUnknownServer(self):
    """An unresolvable host raises ServerNotFoundError, or yields a 400 response."""
    unknown = "http://fred.bitworking.org/"

    # With exceptions enabled the lookup failure must propagate.
    self.http.force_exception_to_status_code = False
    try:
        self.http.request(unknown)
    except httplib2.ServerNotFoundError:
        pass
    else:
        self.fail("An httplib2.ServerNotFoundError Exception must be thrown on an unresolvable server.")

    # Now test with exceptions turned off
    self.http.force_exception_to_status_code = True
    resp, body = self.http.request(unknown)
    self.assertEqual(resp['content-type'], 'text/plain')
    self.assertTrue(body.startswith(b"Unable to find"))
    self.assertEqual(resp.status, 400)
184
def testGetConnectionRefused(self):
    """A refused connection raises socket.error, or yields a 400 response."""
    refused = "http://localhost:7777/"

    # With exceptions enabled the socket error must propagate.
    self.http.force_exception_to_status_code = False
    try:
        self.http.request(refused)
    except socket.error:
        pass
    else:
        self.fail("An socket.error exception must be thrown on Connection Refused.")

    # Now test with exceptions turned off
    self.http.force_exception_to_status_code = True
    resp, body = self.http.request(refused)
    self.assertEqual(resp['content-type'], 'text/plain')
    self.assertTrue(b"Connection refused" in body)
    self.assertEqual(resp.status, 400)
200
def testGetIRI(self):
    """A non-ASCII character in the query must reach the server percent-encoded."""
    # The old `sys.version_info >= (2,3)` guard was always true on Python 3,
    # so the body now runs unconditionally.
    uri = urllib.parse.urljoin(base, "reflector/reflector.cgi?d=\N{CYRILLIC CAPITAL LETTER DJE}")
    (response, content) = self.http.request(uri, "GET")
    # self.reflector is defined elsewhere in this class; it is used here as
    # a mapping of the values echoed back by the reflector script.
    d = self.reflector(content)
    self.assertTrue('QUERY_STRING' in d)
    # Containment states the intent directly; `.find(...) > 0` would also
    # have rejected a match at index 0.
    self.assertTrue('%D0%82' in d['QUERY_STRING'])
208
def testGetIsDefaultMethod(self):
    """A request issued without an explicit method is reflected back as GET."""
    reflector_uri = urllib.parse.urljoin(base, "methods/method_reflector.cgi")
    resp, _body = self.http.request(reflector_uri)
    self.assertEqual(resp['x-method'], "GET")
214
def testDifferentMethods(self):
    """Each of GET, PUT, DELETE and POST is reflected back unchanged."""
    reflector_uri = urllib.parse.urljoin(base, "methods/method_reflector.cgi")
    for verb in ("GET", "PUT", "DELETE", "POST"):
        resp, _body = self.http.request(reflector_uri, verb, body=b" ")
        self.assertEqual(resp['x-method'], verb)
221
def testHeadRead(self):
    """A HEAD request returns status 200 and an empty body.

    The response of a HEAD request must not be read, since httplib blocks
    response.read() for HEAD requests. Oddly enough this doesn't appear as a
    problem when doing HEAD requests against Apache servers.
    """
    resp, body = self.http.request("http://www.google.com/", "HEAD")
    self.assertEqual(resp.status, 200)
    self.assertEqual(body, b"")
231
def testGetNoCache(self):
    """A GET works on an Http object created without a cache directory."""
    uncached_client = httplib2.Http()
    etag_uri = urllib.parse.urljoin(base, "304/test_etag.txt")
    resp, _body = uncached_client.request(etag_uri, "GET")
    self.assertEqual(resp.status, 200)
    self.assertEqual(resp.previous, None)
239
def testGetOnlyIfCachedCacheHit(self):
    """After a priming GET, an 'only-if-cached' GET is answered from the cache."""
    etag_uri = urllib.parse.urljoin(base, "304/test_etag.txt")
    only_cached = {'cache-control': 'only-if-cached'}

    # First request populates the cache; its result is not inspected.
    self.http.request(etag_uri, "GET")
    resp, _body = self.http.request(etag_uri, "GET", headers=only_cached)
    self.assertEqual(resp.fromcache, True)
    self.assertEqual(resp.status, 200)
247
def testGetOnlyIfCachedCacheMiss(self):
    """An 'only-if-cached' GET with nothing cached yet gives a 504."""
    etag_uri = urllib.parse.urljoin(base, "304/test_etag.txt")
    only_cached = {'cache-control': 'only-if-cached'}
    resp, _body = self.http.request(etag_uri, "GET", headers=only_cached)
    self.assertEqual(resp.fromcache, False)
    self.assertEqual(resp.status, 504)
pilgrim00a352e2009-05-29 04:04:44 +0000254
def testGetOnlyIfCachedNoCacheAtAll(self):
    """An 'only-if-cached' GET on a cache-less Http object gives a 504.

    Of course, there might be an intermediary beyond us that responds to
    the 'only-if-cached', so this test can't really be guaranteed to pass.
    """
    uncached_client = httplib2.Http()
    etag_uri = urllib.parse.urljoin(base, "304/test_etag.txt")
    only_cached = {'cache-control': 'only-if-cached'}
    resp, _body = uncached_client.request(etag_uri, "GET", headers=only_cached)
    self.assertEqual(resp.fromcache, False)
    self.assertEqual(resp.status, 504)
pilgrim00a352e2009-05-29 04:04:44 +0000265
def testUserAgent(self):
    """A default user-agent beginning with "Python-httplib2/" is sent."""
    agent_uri = urllib.parse.urljoin(base, "user-agent/test.cgi")
    resp, body = self.http.request(agent_uri, "GET")
    self.assertEqual(resp.status, 200)
    self.assertTrue(body.startswith(b"Python-httplib2/"))
272
def testUserAgentNonDefault(self):
    """A caller-supplied User-Agent header replaces the default one."""
    agent_uri = urllib.parse.urljoin(base, "user-agent/test.cgi")
    custom_headers = {'User-Agent': 'fred/1.0'}
    resp, body = self.http.request(agent_uri, "GET", headers=custom_headers)
    self.assertEqual(resp.status, 200)
    self.assertTrue(body.startswith(b"fred/1.0"))
280
def testGet300WithLocation(self):
    """A 300 carrying a Location: header is followed and is never cached."""
    redirect_uri = urllib.parse.urljoin(base, "300/with-location-header.asis")

    # The second pass confirms that the intermediate 300 is not cached:
    # both passes must observe exactly the same outcome.
    for _attempt in range(2):
        resp, body = self.http.request(redirect_uri, "GET")
        self.assertEqual(resp.status, 200)
        self.assertEqual(body, b"This is the final destination.\n")
        self.assertEqual(resp.previous.status, 300)
        self.assertEqual(resp.previous.fromcache, False)
296
def testGet300WithLocationNoRedirect(self):
    """With follow_redirects off, a 300 with a Location: header is returned as is."""
    self.http.follow_redirects = False
    redirect_uri = urllib.parse.urljoin(base, "300/with-location-header.asis")
    resp, _body = self.http.request(redirect_uri, "GET")
    self.assertEqual(resp.status, 300)
303
def testGet300WithoutLocation(self):
    """A 300 without a Location: header is acceptable and is returned directly."""
    plain_300_uri = urllib.parse.urljoin(base, "300/without-location-header.asis")
    resp, _body = self.http.request(plain_300_uri, "GET")
    self.assertEqual(resp.status, 300)
    self.assertTrue(resp['content-type'].startswith("text/html"))
    self.assertEqual(resp.previous, None)
312
def testGet301(self):
    """301 redirects are followed automatically and the 301 itself is cached."""
    redirect_uri = urllib.parse.urljoin(base, "301/onestep.asis")
    target_uri = urllib.parse.urljoin(base, "302/final-destination.txt")

    # First fetch: the 301 comes from the network.
    resp, body = self.http.request(redirect_uri, "GET")
    self.assertEqual(resp.status, 200)
    self.assertTrue('content-location' in resp)
    self.assertEqual(resp['content-location'], target_uri)
    self.assertEqual(body, b"This is the final destination.\n")
    self.assertEqual(resp.previous.status, 301)
    self.assertEqual(resp.previous.fromcache, False)

    # Second fetch: the 301 must now be served from the cache.
    resp, body = self.http.request(redirect_uri, "GET")
    self.assertEqual(resp.status, 200)
    self.assertEqual(resp['content-location'], target_uri)
    self.assertEqual(body, b"This is the final destination.\n")
    self.assertEqual(resp.previous.status, 301)
    self.assertEqual(resp.previous.fromcache, True)
332
333
def testGet301NoRedirect(self):
    """With follow_redirects off, the 301 response itself is returned.

    The earlier comment claimed the redirect was followed and cached, which
    contradicts what the test sets up and asserts.
    """
    self.http.follow_redirects = False
    uri = urllib.parse.urljoin(base, "301/onestep.asis")
    # The unused `destination` local was dropped: nothing is compared
    # against it because the redirect is never followed here.
    (response, content) = self.http.request(uri, "GET")
    self.assertEqual(response.status, 301)
342
343
def testGet302(self):
    """302 redirects are followed automatically but the 302 is NOT cached."""
    final_body = b"This is the final destination.\n"
    one_step = urllib.parse.urljoin(base, "302/onestep.asis")
    target_uri = urllib.parse.urljoin(base, "302/final-destination.txt")

    # First fetch: redirect and destination both come from the network.
    resp, body = self.http.request(one_step, "GET")
    self.assertEqual(resp.status, 200)
    self.assertEqual(resp['content-location'], target_uri)
    self.assertEqual(body, final_body)
    self.assertEqual(resp.previous.status, 302)
    self.assertEqual(resp.previous.fromcache, False)

    # Second fetch: the destination is cached, the 302 still is not.
    resp, body = self.http.request(one_step, "GET")
    self.assertEqual(resp.status, 200)
    self.assertEqual(resp.fromcache, True)
    self.assertEqual(resp['content-location'], target_uri)
    self.assertEqual(body, final_body)
    self.assertEqual(resp.previous.status, 302)
    self.assertEqual(resp.previous.fromcache, False)
    self.assertEqual(resp.previous['content-location'], one_step)

    # A two-step chain ends at the same, already cached, destination.
    two_step = urllib.parse.urljoin(base, "302/twostep.asis")
    resp, body = self.http.request(two_step, "GET")
    self.assertEqual(resp.status, 200)
    self.assertEqual(resp.fromcache, True)
    self.assertEqual(body, final_body)
    self.assertEqual(resp.previous.status, 302)
    self.assertEqual(resp.previous.fromcache, False)
374
def testGet302RedirectionLimit(self):
    """Exceeding a lowered redirection limit raises RedirectLimit, or yields a 500.

    The limit is set to 1 for a two-step redirect chain. With
    force_exception_to_status_code off the request must raise
    httplib2.RedirectLimit; with it on the failure is reported through the
    response object instead.
    """
    self.http.force_exception_to_status_code = False

    uri = urllib.parse.urljoin(base, "302/twostep.asis")
    # assertRaises replaces the try/fail/except chain: there, the
    # AssertionError from self.fail() was itself caught by the trailing
    # `except Exception` and misreported as the wrong kind of exception.
    self.assertRaises(httplib2.RedirectLimit,
                      self.http.request, uri, "GET", redirections=1)

    # Re-run the test with out the exceptions
    self.http.force_exception_to_status_code = True

    (response, content) = self.http.request(uri, "GET", redirections=1)
    self.assertEqual(response.status, 500)
    self.assertTrue(response.reason.startswith("Redirected more"))
    self.assertEqual("302", response['status'])
    self.assertTrue(content.startswith(b"<html>"))
    self.assertTrue(response.previous is not None)
399
def testGet302NoLocation(self):
    """A 302 without a Location: header raises RedirectMissingLocation, or yields a 500.

    With force_exception_to_status_code off the request must raise
    httplib2.RedirectMissingLocation; with it on the failure is reported
    through the response object instead.
    """
    self.http.force_exception_to_status_code = False
    uri = urllib.parse.urljoin(base, "302/no-location.asis")
    # assertRaises replaces the try/fail/except chain: there, the
    # AssertionError from self.fail() was itself caught by the trailing
    # `except Exception` and misreported as the wrong kind of exception.
    self.assertRaises(httplib2.RedirectMissingLocation,
                      self.http.request, uri, "GET")

    # Re-run the test with out the exceptions
    self.http.force_exception_to_status_code = True

    (response, content) = self.http.request(uri, "GET")
    self.assertEqual(response.status, 500)
    self.assertTrue(response.reason.startswith("Redirected but"))
    self.assertEqual("302", response['status'])
    self.assertTrue(content.startswith(b"This is content"))
421
def testGet302ViaHttps(self):
    """A 302 received over HTTPS is followed (Google always redirects to http://google.com)."""
    resp, _body = self.http.request("https://google.com", "GET")
    self.assertEqual(200, resp.status)
    self.assertEqual(302, resp.previous.status)
427
def testGetViaHttps(self):
    """A GET over HTTPS completes with status 200."""
    resp, _body = self.http.request("https://google.com/adsense/", "GET")
    self.assertEqual(200, resp.status)
432
def testGetViaHttpsSpecViolationOnLocation(self):
    """Redirects over HTTPS are followed even with a relative Location: header.

    A relative Location: header violates the spec, which asks for an
    absolute one, but the redirect should be followed all the same.
    """
    resp, _body = self.http.request("https://google.com/adsense", "GET")
    self.assertEqual(200, resp.status)
    self.assertNotEqual(None, resp.previous)
441
442
def testGetViaHttpsKeyCert(self):
    """Key and cert files registered for a domain reach that domain's connection.

    At this point only the hand-off of the key and cert file names can be
    tested; it would be nice to have a real https endpoint to test against.
    """
    http = httplib2.Http(timeout=2)

    http.add_certificate("akeyfile", "acertfile", "bitworking.org")
    try:
        http.request("https://bitworking.org", "GET")
    except Exception:
        # The request is expected to fail (the key/cert files are dummies);
        # only the connection object it leaves behind matters. Catching
        # Exception instead of using a bare except lets KeyboardInterrupt
        # and SystemExit through.
        pass
    self.assertEqual(http.connections["https:bitworking.org"].key_file, "akeyfile")
    self.assertEqual(http.connections["https:bitworking.org"].cert_file, "acertfile")

    # A host with no registered certificate must get neither file.
    try:
        http.request("https://notthere.bitworking.org", "GET")
    except Exception:
        pass
    self.assertEqual(http.connections["https:notthere.bitworking.org"].key_file, None)
    self.assertEqual(http.connections["https:notthere.bitworking.org"].cert_file, None)
464
465
466
467
def testGet303(self):
    """A POST answered with a 303 is followed up by a GET on the Location: header."""
    see_other_uri = urllib.parse.urljoin(base, "303/303.cgi")
    resp, body = self.http.request(see_other_uri, "POST", " ")
    self.assertEqual(resp.status, 200)
    self.assertEqual(body, b"This is the final destination.\n")
    self.assertEqual(resp.previous.status, 303)
476
def testGet303NoRedirect(self):
    """With follow_redirects off, the 303 answering a POST is returned as is."""
    self.http.follow_redirects = False
    see_other_uri = urllib.parse.urljoin(base, "303/303.cgi")
    resp, _body = self.http.request(see_other_uri, "POST", " ")
    self.assertEqual(resp.status, 303)
484
def test303ForDifferentMethods(self):
    """Whatever the original method, the request following a 303 is a GET."""
    redirect_uri = urllib.parse.urljoin(base, "303/redirect-to-reflector.cgi")
    method_after_303 = "GET"
    for verb in ("PUT", "DELETE", "POST", "GET", "HEAD"):
        resp, _body = self.http.request(redirect_uri, verb, body=b" ")
        self.assertEqual(resp['x-method'], method_after_303)
491
def testGet304(self):
    """ETags are used to validate the cache; a range request bypasses it."""
    uri = urllib.parse.urljoin(base, "304/test_etag.txt")
    (response, content) = self.http.request(uri, "GET")
    self.assertNotEqual(response['etag'], "")

    (response, content) = self.http.request(uri, "GET")
    (response, content) = self.http.request(uri, "GET", headers={'cache-control': 'must-revalidate'})
    self.assertEqual(response.status, 200)
    self.assertEqual(response.fromcache, True)

    # The cache entry written to disk must start with a status line.
    cache_file_name = os.path.join(cacheDirName, httplib2.safename(httplib2.urlnorm(uri)[-1]))
    # Context manager: the handle is closed even if readline() raises.
    with open(cache_file_name, "r") as f:
        status_line = f.readline()

    self.assertTrue(status_line.startswith("status:"))

    (response, content) = self.http.request(uri, "HEAD")
    self.assertEqual(response.status, 200)
    self.assertEqual(response.fromcache, True)

    (response, content) = self.http.request(uri, "GET", headers={'range': 'bytes=0-0'})
    self.assertEqual(response.status, 206)
    self.assertEqual(response.fromcache, False)
517
def testGetIgnoreEtag(self):
    """Setting ignore_etag stops If-None-Match from being sent."""
    reflector_uri = urllib.parse.urljoin(base, "reflector/reflector.cgi")
    revalidate = {'cache-control': 'max-age=0'}

    resp, body = self.http.request(reflector_uri, "GET")
    self.assertNotEqual(resp['etag'], "")

    # By default the revalidating request carries If-None-Match.
    resp, body = self.http.request(reflector_uri, "GET", headers=revalidate)
    echoed = self.reflector(body)
    self.assertTrue('HTTP_IF_NONE_MATCH' in echoed)

    # With ETags ignored the header must be absent.
    self.http.ignore_etag = True
    resp, body = self.http.request(reflector_uri, "GET", headers=revalidate)
    echoed = self.reflector(body)
    self.assertEqual(resp.fromcache, False)
    self.assertFalse('HTTP_IF_NONE_MATCH' in echoed)
533
def testOverrideEtag(self):
    """An explicit if-none-match request header replaces the cached ETag value."""
    reflector_uri = urllib.parse.urljoin(base, "reflector/reflector.cgi")

    resp, body = self.http.request(reflector_uri, "GET")
    self.assertNotEqual(resp['etag'], "")

    # Without an override the validator sent is not "fred".
    resp, body = self.http.request(reflector_uri, "GET", headers={'cache-control': 'max-age=0'})
    echoed = self.reflector(body)
    self.assertTrue('HTTP_IF_NONE_MATCH' in echoed)
    self.assertNotEqual(echoed['HTTP_IF_NONE_MATCH'], "fred")

    # With an override the supplied value is what the server sees.
    override = {'cache-control': 'max-age=0', 'if-none-match': 'fred'}
    resp, body = self.http.request(reflector_uri, "GET", headers=override)
    echoed = self.reflector(body)
    self.assertTrue('HTTP_IF_NONE_MATCH' in echoed)
    self.assertEqual(echoed['HTTP_IF_NONE_MATCH'], "fred")
549
550#MAP-commented this out because it consistently fails
551# def testGet304EndToEnd(self):
552# # Test that end to end headers get overwritten in the cache
553# uri = urllib.parse.urljoin(base, "304/end2end.cgi")
554# (response, content) = self.http.request(uri, "GET")
555# self.assertNotEqual(response['etag'], "")
556# old_date = response['date']
557# time.sleep(2)
558#
559# (response, content) = self.http.request(uri, "GET", headers = {'Cache-Control': 'max-age=0'})
560# # The response should be from the cache, but the Date: header should be updated.
561# new_date = response['date']
562# self.assertNotEqual(new_date, old_date)
563# self.assertEqual(response.status, 200)
564# self.assertEqual(response.fromcache, True)
565
def testGet304LastModified(self):
    """A 304 is still handled when last-modified is the only cache validator."""
    lm_uri = urllib.parse.urljoin(base, "304/last-modified-only/last-modified-only.txt")
    resp, _body = self.http.request(lm_uri, "GET")
    self.assertNotEqual(resp['last-modified'], "")

    # Two further fetches; only the outcome of the last one is checked.
    for _attempt in range(2):
        resp, _body = self.http.request(lm_uri, "GET")
    self.assertEqual(resp.status, 200)
    self.assertEqual(resp.fromcache, True)
577
def testGet307(self):
    """307 redirects are followed, but the 307 itself is not cached."""
    final_body = b"This is the final destination.\n"
    redirect_uri = urllib.parse.urljoin(base, "307/onestep.asis")

    # First fetch: everything comes from the network.
    resp, body = self.http.request(redirect_uri, "GET")
    self.assertEqual(resp.status, 200)
    self.assertEqual(body, final_body)
    self.assertEqual(resp.previous.status, 307)
    self.assertEqual(resp.previous.fromcache, False)

    # Second fetch: the destination is cached, the 307 still is not.
    resp, body = self.http.request(redirect_uri, "GET")
    self.assertEqual(resp.status, 200)
    self.assertEqual(resp.fromcache, True)
    self.assertEqual(body, final_body)
    self.assertEqual(resp.previous.status, 307)
    self.assertEqual(resp.previous.fromcache, False)
594
def testGet410(self):
    """A 410 response is passed through to the caller."""
    gone_uri = urllib.parse.urljoin(base, "410/410.asis")
    resp, _body = self.http.request(gone_uri, "GET")
    self.assertEqual(resp.status, 410)
600
def testVaryHeaderSimple(self):
    """
    RFC 2616 13.6
    When the cache receives a subsequent request whose Request-URI
    specifies one or more cache entries including a Vary header field,
    the cache MUST NOT use such a cache entry to construct a response
    to the new request unless all of the selecting request-headers
    present in the new request match the corresponding stored
    request-headers in the original request.
    """
    vary_uri = urllib.parse.urljoin(base, "vary/accept.asis")
    plain = {'Accept': 'text/plain'}

    # The first response must carry the vary header.
    resp, _body = self.http.request(vary_uri, "GET", headers=plain)
    self.assertEqual(resp.status, 200)
    self.assertTrue('vary' in resp)

    # Same Accept header as the stored request: served from the cache.
    resp, _body = self.http.request(vary_uri, "GET", headers=plain)
    self.assertEqual(resp.status, 200)
    self.assertEqual(resp.fromcache, True, msg="Should be from cache")

    # Different Accept header: the cached entry must not be used.
    resp, _body = self.http.request(vary_uri, "GET", headers={'Accept': 'text/html'})
    self.assertEqual(resp.status, 200)
    self.assertEqual(resp.fromcache, False, msg="Should not be from cache")

    # No Accept header at all: again no match.
    resp, _body = self.http.request(vary_uri, "GET")
    self.assertEqual(resp.status, 200)
    self.assertEqual(resp.fromcache, False, msg="Should not be from cache")
632
def testNoVary(self):
    """Without a vary header, a different Accept header still hits the cache."""
    no_vary_uri = urllib.parse.urljoin(base, "vary/no-vary.asis")

    # The first response must not carry a vary header.
    resp, _body = self.http.request(no_vary_uri, "GET", headers={'Accept': 'text/plain'})
    self.assertEqual(resp.status, 200)
    self.assertFalse('vary' in resp)

    # Both the same and a different Accept value are served from the cache.
    for accept in ('text/plain', 'text/html'):
        resp, _body = self.http.request(no_vary_uri, "GET", headers={'Accept': accept})
        self.assertEqual(resp.status, 200)
        self.assertEqual(resp.fromcache, True, msg="Should be from cache")
649
def testVaryHeaderDouble(self):
    """With two varied request headers, both must match for a cache hit."""
    double_uri = urllib.parse.urljoin(base, "vary/accept-double.asis")
    accept = 'text/plain'
    language = 'da, en-gb;q=0.8, en;q=0.7'

    resp, _body = self.http.request(double_uri, "GET", headers={
        'Accept': accept, 'Accept-Language': language})
    self.assertEqual(resp.status, 200)
    self.assertTrue('vary' in resp)

    # we are from cache
    resp, _body = self.http.request(double_uri, "GET", headers={
        'Accept': accept, 'Accept-Language': language})
    self.assertEqual(resp.fromcache, True, msg="Should be from cache")

    # Only one of the two headers is supplied: no cache hit.
    resp, _body = self.http.request(double_uri, "GET", headers={'Accept': 'text/plain'})
    self.assertEqual(resp.status, 200)
    self.assertEqual(resp.fromcache, False)

    # get the resource again, not from cache, varied headers don't match exact
    resp, _body = self.http.request(double_uri, "GET", headers={'Accept-Language': 'da'})
    self.assertEqual(resp.status, 200)
    self.assertEqual(resp.fromcache, False, msg="Should not be from cache")
670
def testVaryUnusedHeader(self):
    """A header's value is not considered to vary if it's not used at all."""
    unused_uri = urllib.parse.urljoin(base, "vary/unused-header.asis")
    plain = {'Accept': 'text/plain'}

    resp, _body = self.http.request(unused_uri, "GET", headers=plain)
    self.assertEqual(resp.status, 200)
    self.assertTrue('vary' in resp)

    # we are from cache
    resp, _body = self.http.request(unused_uri, "GET", headers=plain)
    self.assertEqual(resp.fromcache, True, msg="Should be from cache")
683
def testHeadGZip(self):
    """A HEAD response is not decompressed: non-zero content-length, empty body."""
    gzip_uri = urllib.parse.urljoin(base, "gzip/final-destination.txt")
    resp, body = self.http.request(gzip_uri, "HEAD")
    self.assertEqual(resp.status, 200)
    self.assertNotEqual(int(resp['content-length']), 0)
    self.assertEqual(body, b"")
691
def testGetGZip(self):
    """A gzip-compressed response is decoded and its headers adjusted."""
    expected_body = b"This is the final destination.\n"
    gzip_uri = urllib.parse.urljoin(base, "gzip/final-destination.txt")
    resp, body = self.http.request(gzip_uri, "GET")
    self.assertEqual(resp.status, 200)
    # The original header is expected under a '-'-prefixed key only.
    self.assertFalse('content-encoding' in resp)
    self.assertTrue('-content-encoding' in resp)
    self.assertEqual(int(resp['content-length']), len(expected_body))
    self.assertEqual(body, expected_body)
701
def testPostAndGZipResponse(self):
    """A gzip response to a POST has its content-encoding header renamed too."""
    post_uri = urllib.parse.urljoin(base, "gzip/post.cgi")
    resp, _body = self.http.request(post_uri, "POST", body=" ")
    self.assertEqual(resp.status, 200)
    self.assertFalse('content-encoding' in resp)
    self.assertTrue('-content-encoding' in resp)
708
def testGetGZipFailure(self):
    # Test that we raise a good exception when the gzip fails
    uri = urllib.parse.urljoin(base, "gzip/failed-compression.asis")

    # With status-code conversion off the failure must surface as
    # FailedToDecompressContent. assertRaises replaces the previous
    # try / self.fail() / except Exception construct, where the
    # "Should never reach here" AssertionError was itself caught by the
    # generic handler and misreported as "Threw wrong kind of exception".
    self.http.force_exception_to_status_code = False
    self.assertRaises(httplib2.FailedToDecompressContent,
                      self.http.request, uri, "GET")

    # Re-run the test without the exceptions: the same failure is now
    # expected to be reported as a 500 response.
    self.http.force_exception_to_status_code = True

    (response, content) = self.http.request(uri, "GET")
    self.assertEqual(response.status, 500)
    self.assertTrue(response.reason.startswith("Content purported"))
727
def testTimeout(self):
    # With a 1 second process-wide socket timeout, the request to the
    # timeout resource is expected to come back as a 408 response.
    self.http.force_exception_to_status_code = True
    uri = urllib.parse.urljoin(base, "timeout/timeout.cgi")

    # Remember the current global default so it can be put back; the
    # previous version left the 1 second timeout in place for every
    # test that ran afterwards.
    previous_timeout = socket.getdefaulttimeout()
    try:
        socket.setdefaulttimeout(1)
    except Exception:
        # Don't run the test if we can't set the timeout
        return

    try:
        (response, content) = self.http.request(uri)
    finally:
        socket.setdefaulttimeout(previous_timeout)

    self.assertEqual(response.status, 408)
    self.assertTrue(response.reason.startswith("Request Timeout"))
    self.assertTrue(content.startswith(b"Request Timeout"))
741
def testIndividualTimeout(self):
    # Same expectation as testTimeout, but the 1 second timeout is given
    # to a dedicated Http instance instead of the socket module default.
    client = httplib2.Http(timeout=1)
    client.force_exception_to_status_code = True
    target = urllib.parse.urljoin(base, "timeout/timeout.cgi")

    resp, body = client.request(target)

    self.assertEqual(resp.status, 408)
    self.assertTrue(resp.reason.startswith("Request Timeout"))
    self.assertTrue(body.startswith(b"Request Timeout"))
751
752
def testGetDeflate(self):
    # Test that we support deflate compression
    expected_body = b"This is the final destination."
    target = urllib.parse.urljoin(base, "deflate/deflated.asis")

    resp, body = self.http.request(target, "GET")

    self.assertEqual(resp.status, 200)
    self.assertFalse('content-encoding' in resp)
    # content-length must describe the decoded payload.
    self.assertEqual(int(resp['content-length']), len(expected_body))
    self.assertEqual(body, expected_body)
761
def testGetDeflateFailure(self):
    # Test that we raise a good exception when the deflate fails
    uri = urllib.parse.urljoin(base, "deflate/failed-compression.asis")

    # With status-code conversion off the failure must surface as
    # FailedToDecompressContent. assertRaises replaces the previous
    # try / self.fail() / except Exception construct, where the
    # "Should never reach here" AssertionError was itself caught by the
    # generic handler and misreported as "Threw wrong kind of exception".
    self.http.force_exception_to_status_code = False
    self.assertRaises(httplib2.FailedToDecompressContent,
                      self.http.request, uri, "GET")

    # Re-run the test without the exceptions: the same failure is now
    # expected to be reported as a 500 response.
    self.http.force_exception_to_status_code = True

    (response, content) = self.http.request(uri, "GET")
    self.assertEqual(response.status, 500)
    self.assertTrue(response.reason.startswith("Content purported"))
781
def testGetDuplicateHeaders(self):
    # Test that duplicate headers get concatenated via ','
    target = urllib.parse.urljoin(base, "duplicate-headers/multilink.asis")
    resp, body = self.http.request(target, "GET")

    self.assertEqual(resp.status, 200)
    self.assertEqual(body, b"This is content\n")

    # Only the first comma-separated 'link' entry is checked.
    first_link = resp['link'].split(",")[0]
    self.assertEqual(first_link, '<http://bitworking.org>; rel="home"; title="BitWorking"')
789
def testGetCacheControlNoCache(self):
    # Test Cache-Control: no-cache on requests
    target = urllib.parse.urljoin(base, "304/test_etag.txt")

    # Prime the cache; the resource must carry a non-empty etag.
    resp, _body = self.http.request(target, "GET")
    self.assertNotEqual(resp['etag'], "")

    # A plain repeat is answered from the cache.
    resp, _body = self.http.request(target, "GET")
    self.assertEqual(resp.status, 200)
    self.assertEqual(resp.fromcache, True)

    # The same request with no-cache must not be answered from the cache.
    bypass = {'Cache-Control': 'no-cache'}
    resp, _body = self.http.request(target, "GET", headers=bypass)
    self.assertEqual(resp.status, 200)
    self.assertEqual(resp.fromcache, False)
802
def testGetCacheControlPragmaNoCache(self):
    # Test Pragma: no-cache on requests
    target = urllib.parse.urljoin(base, "304/test_etag.txt")

    # Prime the cache; the resource must carry a non-empty etag.
    resp, _body = self.http.request(target, "GET")
    self.assertNotEqual(resp['etag'], "")

    # A plain repeat is answered from the cache.
    resp, _body = self.http.request(target, "GET")
    self.assertEqual(resp.status, 200)
    self.assertEqual(resp.fromcache, True)

    # The same request with Pragma: no-cache must not come from the cache.
    bypass = {'Pragma': 'no-cache'}
    resp, _body = self.http.request(target, "GET", headers=bypass)
    self.assertEqual(resp.status, 200)
    self.assertEqual(resp.fromcache, False)
815
def testGetCacheControlNoStoreRequest(self):
    # A no-store request means that the response should not be stored.
    target = urllib.parse.urljoin(base, "304/test_etag.txt")

    # Issue the identical no-store request twice; if the first response
    # had been stored, the second could have been served from the cache.
    for _attempt in range(2):
        resp, _body = self.http.request(target, "GET", headers={'Cache-Control': 'no-store'})
        self.assertEqual(resp.status, 200)
        self.assertEqual(resp.fromcache, False)
827
def testGetCacheControlNoStoreResponse(self):
    # A no-store response means that the response should not be stored.
    target = urllib.parse.urljoin(base, "no-store/no-store.asis")

    # Fetch twice with no special request headers; neither answer may be
    # reported as coming from the cache.
    for _attempt in range(2):
        resp, _body = self.http.request(target, "GET")
        self.assertEqual(resp.status, 200)
        self.assertEqual(resp.fromcache, False)
839
def testGetCacheControlNoCacheNoStoreRequest(self):
    # Test that a no-store, no-cache clears the entry from the cache
    # even if it was cached previously.
    target = urllib.parse.urljoin(base, "304/test_etag.txt")

    # Two plain GETs: the second one is served from the cache.
    self.http.request(target, "GET")
    resp, _body = self.http.request(target, "GET")
    self.assertEqual(resp.fromcache, True)

    # Two GETs with 'no-store, no-cache': only the second is inspected
    # and it must be a fresh 200.
    self.http.request(target, "GET", headers={'Cache-Control': 'no-store, no-cache'})
    resp, _body = self.http.request(target, "GET", headers={'Cache-Control': 'no-store, no-cache'})
    self.assertEqual(resp.status, 200)
    self.assertEqual(resp.fromcache, False)
852
def testUpdateInvalidatesCache(self):
    # Test that calling PUT or DELETE on a
    # URI that is cached invalidates that cache entry.
    target = urllib.parse.urljoin(base, "304/test_etag.txt")

    # Two GETs: the second one is served from the cache.
    self.http.request(target, "GET")
    resp, _body = self.http.request(target, "GET")
    self.assertEqual(resp.fromcache, True)

    # The DELETE is answered with 405 by this resource ...
    resp, _body = self.http.request(target, "DELETE")
    self.assertEqual(resp.status, 405)

    # ... yet the next GET must no longer come from the cache.
    resp, _body = self.http.request(target, "GET")
    self.assertEqual(resp.fromcache, False)
866
def testUpdateUsesCachedETag(self):
    # Test that we natively support http://www.w3.org/1999/04/Editing/
    target = urllib.parse.urljoin(base, "conditional-updates/test.cgi")

    # Fetch twice: first from the network, then from the cache.
    for expect_cached in (False, True):
        resp, _body = self.http.request(target, "GET")
        self.assertEqual(resp.status, 200)
        self.assertEqual(resp.fromcache, expect_cached)

    # The first PUT is accepted; an identical second PUT gets a 412.
    for expected_status in (200, 412):
        resp, _body = self.http.request(target, "PUT", body="foo")
        self.assertEqual(resp.status, expected_status)
881
def testUpdateUsesCachedETagAndOCMethod(self):
    # Test that we natively support http://www.w3.org/1999/04/Editing/
    target = urllib.parse.urljoin(base, "conditional-updates/test.cgi")

    # Fetch twice: first from the network, then from the cache.
    for expect_cached in (False, True):
        resp, _body = self.http.request(target, "GET")
        self.assertEqual(resp.status, 200)
        self.assertEqual(resp.fromcache, expect_cached)

    # Register DELETE as an optimistic-concurrency method before using it.
    self.http.optimistic_concurrency_methods.append("DELETE")
    resp, _body = self.http.request(target, "DELETE")
    self.assertEqual(resp.status, 200)
895
896
def testUpdateUsesCachedETagOverridden(self):
    # Test that we natively support http://www.w3.org/1999/04/Editing/
    target = urllib.parse.urljoin(base, "conditional-updates/test.cgi")

    # Fetch twice: first from the network, then from the cache.
    for expect_cached in (False, True):
        resp, _body = self.http.request(target, "GET")
        self.assertEqual(resp.status, 200)
        self.assertEqual(resp.fromcache, expect_cached)

    # A PUT carrying an explicit if-match of 'fred' is rejected with 412.
    explicit_match = {'if-match': 'fred'}
    resp, _body = self.http.request(target, "PUT", body="foo", headers=explicit_match)
    self.assertEqual(resp.status, 412)
909
def testBasicAuth(self):
    # Test Basic Authentication
    file_uri = urllib.parse.urljoin(base, "basic/file.txt")
    dir_uri = urllib.parse.urljoin(base, "basic/")

    def status_of(target):
        # GET the target and return only the status code.
        resp, _body = self.http.request(target, "GET")
        return resp.status

    # Without credentials both resources answer 401.
    self.assertEqual(status_of(file_uri), 401)
    self.assertEqual(status_of(dir_uri), 401)

    # After adding credentials both resources answer 200.
    self.http.add_credentials('joe', 'password')
    self.assertEqual(status_of(dir_uri), 200)
    self.assertEqual(status_of(file_uri), 200)
927
def testBasicAuthWithDomain(self):
    # Test Basic Authentication with credentials bound to a domain
    file_uri = urllib.parse.urljoin(base, "basic/file.txt")
    dir_uri = urllib.parse.urljoin(base, "basic/")

    def status_of(target):
        # GET the target and return only the status code.
        resp, _body = self.http.request(target, "GET")
        return resp.status

    # Without credentials both resources answer 401.
    self.assertEqual(status_of(file_uri), 401)
    self.assertEqual(status_of(dir_uri), 401)

    # Credentials registered for a different domain change nothing.
    self.http.add_credentials('joe', 'password', "example.org")
    self.assertEqual(status_of(dir_uri), 401)
    self.assertEqual(status_of(file_uri), 401)

    # Credentials registered for the domain of the test server are used.
    domain = urllib.parse.urlparse(base).netloc
    self.http.add_credentials('joe', 'password', domain)
    self.assertEqual(status_of(file_uri), 200)
    self.assertEqual(status_of(file_uri), 200)
954
955
956
957
958
959
def testBasicAuthTwoDifferentCredentials(self):
    # Test Basic Authentication with multiple sets of credentials
    file_uri = urllib.parse.urljoin(base, "basic2/file.txt")
    dir_uri = urllib.parse.urljoin(base, "basic2/")

    def status_of(target):
        # GET the target and return only the status code.
        resp, _body = self.http.request(target, "GET")
        return resp.status

    # Without credentials both resources answer 401.
    self.assertEqual(status_of(file_uri), 401)
    self.assertEqual(status_of(dir_uri), 401)

    # After adding the fred/barney credentials both answer 200.
    self.http.add_credentials('fred', 'barney')
    self.assertEqual(status_of(dir_uri), 200)
    self.assertEqual(status_of(file_uri), 200)
977
def testBasicAuthNested(self):
    # Test Basic Authentication with resources
    # that are nested
    outer_uri = urllib.parse.urljoin(base, "basic-nested/")
    inner_uri = urllib.parse.urljoin(base, "basic-nested/subdir")

    def status_of(target):
        # GET the target and return only the status code.
        resp, _body = self.http.request(target, "GET")
        return resp.status

    # No credentials yet: both levels answer 401.
    self.assertEqual(status_of(outer_uri), 401)
    self.assertEqual(status_of(inner_uri), 401)

    # Now add in credentials one at a time and test.
    # joe/password opens the outer level only.
    self.http.add_credentials('joe', 'password')
    self.assertEqual(status_of(outer_uri), 200)
    self.assertEqual(status_of(inner_uri), 401)

    # With fred/barney added as well, both levels answer 200.
    self.http.add_credentials('fred', 'barney')
    self.assertEqual(status_of(outer_uri), 200)
    self.assertEqual(status_of(inner_uri), 200)
1009
def testDigestAuth(self):
    # Test that we support Digest Authentication
    dir_uri = urllib.parse.urljoin(base, "digest/")

    # Without credentials the resource answers 401.
    resp, _body = self.http.request(dir_uri, "GET")
    self.assertEqual(resp.status, 401)

    # With credentials it answers 200.
    self.http.add_credentials('joe', 'password')
    resp, _body = self.http.request(dir_uri, "GET")
    self.assertEqual(resp.status, 200)

    # A file below the same directory is requested as well; its response
    # is not asserted on.
    file_uri = urllib.parse.urljoin(base, "digest/file.txt")
    self.http.request(file_uri, "GET")
1022
def testDigestAuthNextNonceAndNC(self):
    # Test that if the server sets nextnonce that we reset
    # the nonce count back to 1
    target = urllib.parse.urljoin(base, "digest/file.txt")
    self.http.add_credentials('joe', 'password')

    # Two uncached requests; keep the parsed authentication-info of each.
    resp, _body = self.http.request(target, "GET", headers={"cache-control": "no-cache"})
    first_info = httplib2._parse_www_authenticate(resp, 'authentication-info')
    self.assertEqual(resp.status, 200)

    resp, _body = self.http.request(target, "GET", headers={"cache-control": "no-cache"})
    second_info = httplib2._parse_www_authenticate(resp, 'authentication-info')
    self.assertEqual(resp.status, 200)

    # The nonce count is only checked when the first response
    # actually carried a nextnonce.
    if 'nextnonce' in first_info:
        self.assertEqual(second_info['nc'], 1)
1037
def testDigestAuthStale(self):
    # Test that we can handle a nonce becoming stale
    target = urllib.parse.urljoin(base, "digest-expire/file.txt")
    self.http.add_credentials('joe', 'password')

    resp, _body = self.http.request(target, "GET", headers={"cache-control": "no-cache"})
    # The authentication-info header is parsed but the result is not inspected.
    httplib2._parse_www_authenticate(resp, 'authentication-info')
    self.assertEqual(resp.status, 200)

    # Sleep long enough that the nonce becomes stale
    time.sleep(3)

    resp, _body = self.http.request(target, "GET", headers={"cache-control": "no-cache"})
    self.assertFalse(resp.fromcache)
    self.assertTrue(resp._stale_digest)
    # Parsed again; the result is not inspected here either.
    httplib2._parse_www_authenticate(resp, 'authentication-info')
    self.assertEqual(resp.status, 200)
1054
def reflector(self, content):
    # Decode the UTF-8 body, trim surrounding whitespace and turn each
    # "name=value" line into a dict entry. Lines are split on the first
    # "=" only, so values may themselves contain "=".
    text = content.decode('utf-8').strip()
    return dict(line.split("=", 1) for line in text.split("\n"))
1057
def testReflector(self):
    # Fetch the reflector resource and check that the parsed body
    # contains an HTTP_USER_AGENT entry.
    target = urllib.parse.urljoin(base, "reflector/reflector.cgi")
    _resp, body = self.http.request(target, "GET")
    reflected = self.reflector(body)
    self.assertTrue('HTTP_USER_AGENT' in reflected)
1063
Joe Gregorio84cc10a2009-09-01 13:02:49 -04001064
def testConnectionClose(self):
    target = "http://www.google.com/"

    # After an ordinary GET every pooled connection still has a socket.
    self.http.request(target, "GET")
    for conn in self.http.connections.values():
        self.assertNotEqual(None, conn.sock)

    # After a GET sent with "connection: close" none of them does.
    self.http.request(target, "GET", headers={"connection": "close"})
    for conn in self.http.connections.values():
        self.assertEqual(None, conn.sock)
1073
# Define a memcached-backed variant of HttpTest only when the memcache
# module can be imported. Only ImportError is tolerated: the previous bare
# "except: pass" also wrapped the class definition and silently hid any
# other error raised while defining it.
try:
    import memcache
except ImportError:
    pass
else:
    class HttpTestMemCached(HttpTest):
        # Inherits the HttpTest cases; only the fixture differs.
        def setUp(self):
            self.cache = memcache.Client(['127.0.0.1:11211'], debug=0)
            #self.cache = memcache.Client(['10.0.0.4:11211'], debug=1)
            self.http = httplib2.Http(self.cache)
            self.cache.flush_all()
            # Not exactly sure why the sleep is needed here, but
            # if not present then some unit tests that rely on caching
            # fail. Memcached seems to lose some sets immediately
            # after a flush_all if the set is to a value that
            # was previously cached. (Maybe the flush is handled async?)
            time.sleep(1)
            self.http.clear_credentials()
1091
1092
1093
1094# ------------------------------------------------------------------------
1095
class HttpPrivateTest(unittest.TestCase):
    """Tests for httplib2's module-level helpers and auth objects.

    Every test here feeds literal header dictionaries or strings to the
    helper under test and checks the returned value.
    """

    def testParseCacheControl(self):
        # Test that we can parse the Cache-Control header
        self.assertEqual({}, httplib2._parse_cache_control({}))
        self.assertEqual({'no-cache': 1}, httplib2._parse_cache_control({'cache-control': ' no-cache'}))
        cc = httplib2._parse_cache_control({'cache-control': ' no-cache, max-age = 7200'})
        self.assertEqual(cc['no-cache'], 1)
        self.assertEqual(cc['max-age'], '7200')
        cc = httplib2._parse_cache_control({'cache-control': ' , '})
        self.assertEqual(cc[''], 1)

        # A header mixing ';' and ',' separators must parse without raising.
        # No try/except wrapper is needed: an exception fails the test on
        # its own, and the old bare except also swallowed the assertion
        # failure below and reported it as "Should not throw exception".
        cc = httplib2._parse_cache_control({'cache-control': 'Max-age=3600;post-check=1800,pre-check=3600'})
        self.assertTrue("max-age" in cc)

    def testNormalizeHeaders(self):
        # Test that we normalize headers to lowercase
        h = httplib2._normalize_headers({'Cache-Control': 'no-cache', 'Other': 'Stuff'})
        self.assertTrue('cache-control' in h)
        self.assertTrue('other' in h)
        self.assertEqual('Stuff', h['other'])

    def testExpirationModelTransparent(self):
        # Test that no-cache makes our request TRANSPARENT
        response_headers = {
            'cache-control': 'max-age=7200'
        }
        request_headers = {
            'cache-control': 'no-cache'
        }
        self.assertEqual("TRANSPARENT", httplib2._entry_disposition(response_headers, request_headers))

    def testMaxAgeNonNumeric(self):
        # Test that non-numeric max-age / min-fresh values in the response
        # make the entry STALE
        response_headers = {
            'cache-control': 'max-age=fred, min-fresh=barney'
        }
        request_headers = {
        }
        self.assertEqual("STALE", httplib2._entry_disposition(response_headers, request_headers))

    def testExpirationModelNoCacheResponse(self):
        # The date and expires point to an entry that should be
        # FRESH, but the no-cache over-rides that.
        now = time.time()
        response_headers = {
            'date': time.strftime("%a, %d %b %Y %H:%M:%S GMT", time.gmtime(now)),
            'expires': time.strftime("%a, %d %b %Y %H:%M:%S GMT", time.gmtime(now+4)),
            'cache-control': 'no-cache'
        }
        request_headers = {
        }
        self.assertEqual("STALE", httplib2._entry_disposition(response_headers, request_headers))

    def testExpirationModelStaleRequestMustReval(self):
        # must-revalidate forces STALE
        self.assertEqual("STALE", httplib2._entry_disposition({}, {'cache-control': 'must-revalidate'}))

    def testExpirationModelStaleResponseMustReval(self):
        # must-revalidate forces STALE
        self.assertEqual("STALE", httplib2._entry_disposition({'cache-control': 'must-revalidate'}, {}))

    def testExpirationModelFresh(self):
        # max-age=2 relative to "now": FRESH immediately, STALE after 3 seconds
        response_headers = {
            'date': time.strftime("%a, %d %b %Y %H:%M:%S GMT", time.gmtime()),
            'cache-control': 'max-age=2'
        }
        request_headers = {
        }
        self.assertEqual("FRESH", httplib2._entry_disposition(response_headers, request_headers))
        time.sleep(3)
        self.assertEqual("STALE", httplib2._entry_disposition(response_headers, request_headers))

    def testExpirationMaxAge0(self):
        # max-age=0 is STALE straight away
        response_headers = {
            'date': time.strftime("%a, %d %b %Y %H:%M:%S GMT", time.gmtime()),
            'cache-control': 'max-age=0'
        }
        request_headers = {
        }
        self.assertEqual("STALE", httplib2._entry_disposition(response_headers, request_headers))

    def testExpirationModelDateAndExpires(self):
        # expires two seconds after date: FRESH immediately, STALE after 3 seconds
        now = time.time()
        response_headers = {
            'date': time.strftime("%a, %d %b %Y %H:%M:%S GMT", time.gmtime(now)),
            'expires': time.strftime("%a, %d %b %Y %H:%M:%S GMT", time.gmtime(now+2)),
        }
        request_headers = {
        }
        self.assertEqual("FRESH", httplib2._entry_disposition(response_headers, request_headers))
        time.sleep(3)
        self.assertEqual("STALE", httplib2._entry_disposition(response_headers, request_headers))

    def testExpiresZero(self):
        # An expires value of "0" is treated as STALE
        now = time.time()
        response_headers = {
            'date': time.strftime("%a, %d %b %Y %H:%M:%S GMT", time.gmtime(now)),
            'expires': "0",
        }
        request_headers = {
        }
        self.assertEqual("STALE", httplib2._entry_disposition(response_headers, request_headers))

    def testExpirationModelDateOnly(self):
        # A date header alone (no expires, no cache-control) is STALE
        now = time.time()
        response_headers = {
            'date': time.strftime("%a, %d %b %Y %H:%M:%S GMT", time.gmtime(now+3)),
        }
        request_headers = {
        }
        self.assertEqual("STALE", httplib2._entry_disposition(response_headers, request_headers))

    def testExpirationModelOnlyIfCached(self):
        # only-if-cached on the request yields FRESH even with no response headers
        response_headers = {
        }
        request_headers = {
            'cache-control': 'only-if-cached',
        }
        self.assertEqual("FRESH", httplib2._entry_disposition(response_headers, request_headers))

    def testExpirationModelMaxAgeBoth(self):
        # A request max-age=0 wins over a response max-age=2
        now = time.time()
        response_headers = {
            'date': time.strftime("%a, %d %b %Y %H:%M:%S GMT", time.gmtime(now)),
            'cache-control': 'max-age=2'
        }
        request_headers = {
            'cache-control': 'max-age=0'
        }
        self.assertEqual("STALE", httplib2._entry_disposition(response_headers, request_headers))

    def testExpirationModelDateAndExpiresMinFresh1(self):
        # Two seconds of lifetime with min-fresh=2 on the request: STALE
        now = time.time()
        response_headers = {
            'date': time.strftime("%a, %d %b %Y %H:%M:%S GMT", time.gmtime(now)),
            'expires': time.strftime("%a, %d %b %Y %H:%M:%S GMT", time.gmtime(now+2)),
        }
        request_headers = {
            'cache-control': 'min-fresh=2'
        }
        self.assertEqual("STALE", httplib2._entry_disposition(response_headers, request_headers))

    def testExpirationModelDateAndExpiresMinFresh2(self):
        # Four seconds of lifetime with min-fresh=2 on the request: FRESH
        now = time.time()
        response_headers = {
            'date': time.strftime("%a, %d %b %Y %H:%M:%S GMT", time.gmtime(now)),
            'expires': time.strftime("%a, %d %b %Y %H:%M:%S GMT", time.gmtime(now+4)),
        }
        request_headers = {
            'cache-control': 'min-fresh=2'
        }
        self.assertEqual("FRESH", httplib2._entry_disposition(response_headers, request_headers))

    def testParseWWWAuthenticateEmpty(self):
        # No www-authenticate header at all gives an empty result
        res = httplib2._parse_www_authenticate({})
        self.assertEqual(len(list(res.keys())), 0)

    def testParseWWWAuthenticate(self):
        # different uses of spaces around commas
        res = httplib2._parse_www_authenticate({ 'www-authenticate': 'Test realm="test realm" , foo=foo ,bar="bar", baz=baz,qux=qux'})
        self.assertEqual(len(list(res.keys())), 1)
        self.assertEqual(len(list(res['test'].keys())), 5)

        # tokens with non-alphanum
        res = httplib2._parse_www_authenticate({ 'www-authenticate': 'T*!%#st realm=to*!%#en, to*!%#en="quoted string"'})
        self.assertEqual(len(list(res.keys())), 1)
        self.assertEqual(len(list(res['t*!%#st'].keys())), 2)

        # quoted string with quoted pairs
        res = httplib2._parse_www_authenticate({ 'www-authenticate': 'Test realm="a \\"test\\" realm"'})
        self.assertEqual(len(list(res.keys())), 1)
        self.assertEqual(res['test']['realm'], 'a "test" realm')

    def testParseWWWAuthenticateStrict(self):
        # Re-run testParseWWWAuthenticate with strict parsing switched on.
        # The module-level flag is restored in a finally block so that a
        # failure here cannot leave strict parsing enabled for later tests.
        previous = httplib2.USE_WWW_AUTH_STRICT_PARSING
        httplib2.USE_WWW_AUTH_STRICT_PARSING = 1
        try:
            self.testParseWWWAuthenticate()
        finally:
            httplib2.USE_WWW_AUTH_STRICT_PARSING = previous

    def testParseWWWAuthenticateBasic(self):
        # Basic challenge: realm alone, then with a quoted and an unquoted algorithm
        res = httplib2._parse_www_authenticate({ 'www-authenticate': 'Basic realm="me"'})
        basic = res['basic']
        self.assertEqual('me', basic['realm'])

        res = httplib2._parse_www_authenticate({ 'www-authenticate': 'Basic realm="me", algorithm="MD5"'})
        basic = res['basic']
        self.assertEqual('me', basic['realm'])
        self.assertEqual('MD5', basic['algorithm'])

        res = httplib2._parse_www_authenticate({ 'www-authenticate': 'Basic realm="me", algorithm=MD5'})
        basic = res['basic']
        self.assertEqual('me', basic['realm'])
        self.assertEqual('MD5', basic['algorithm'])

    def testParseWWWAuthenticateBasic2(self):
        # No space after the comma, trailing space at the end
        res = httplib2._parse_www_authenticate({ 'www-authenticate': 'Basic realm="me",other="fred" '})
        basic = res['basic']
        self.assertEqual('me', basic['realm'])
        self.assertEqual('fred', basic['other'])

    def testParseWWWAuthenticateBasic3(self):
        # Mixed-case parameter name is available under its lowercase key
        res = httplib2._parse_www_authenticate({ 'www-authenticate': 'Basic REAlm="me" '})
        basic = res['basic']
        self.assertEqual('me', basic['realm'])

    def testParseWWWAuthenticateDigest(self):
        # Digest challenge whose qop value itself contains a comma
        res = httplib2._parse_www_authenticate({ 'www-authenticate':
            'Digest realm="testrealm@host.com", qop="auth,auth-int", nonce="dcd98b7102dd2f0e8b11d0f600bfb0c093", opaque="5ccc069c403ebaf9f0171e9517f40e41"'})
        digest = res['digest']
        self.assertEqual('testrealm@host.com', digest['realm'])
        self.assertEqual('auth,auth-int', digest['qop'])

    def testParseWWWAuthenticateMultiple(self):
        # Two challenges in one header value, separated only by a space
        res = httplib2._parse_www_authenticate({ 'www-authenticate':
            'Digest realm="testrealm@host.com", qop="auth,auth-int", nonce="dcd98b7102dd2f0e8b11d0f600bfb0c093", opaque="5ccc069c403ebaf9f0171e9517f40e41" Basic REAlm="me" '})
        digest = res['digest']
        self.assertEqual('testrealm@host.com', digest['realm'])
        self.assertEqual('auth,auth-int', digest['qop'])
        self.assertEqual('dcd98b7102dd2f0e8b11d0f600bfb0c093', digest['nonce'])
        self.assertEqual('5ccc069c403ebaf9f0171e9517f40e41', digest['opaque'])
        basic = res['basic']
        self.assertEqual('me', basic['realm'])

    def testParseWWWAuthenticateMultiple2(self):
        # Handle an added comma between challenges, which might get thrown in if the challenges were
        # originally sent in separate www-authenticate headers.
        res = httplib2._parse_www_authenticate({ 'www-authenticate':
            'Digest realm="testrealm@host.com", qop="auth,auth-int", nonce="dcd98b7102dd2f0e8b11d0f600bfb0c093", opaque="5ccc069c403ebaf9f0171e9517f40e41", Basic REAlm="me" '})
        digest = res['digest']
        self.assertEqual('testrealm@host.com', digest['realm'])
        self.assertEqual('auth,auth-int', digest['qop'])
        self.assertEqual('dcd98b7102dd2f0e8b11d0f600bfb0c093', digest['nonce'])
        self.assertEqual('5ccc069c403ebaf9f0171e9517f40e41', digest['opaque'])
        basic = res['basic']
        self.assertEqual('me', basic['realm'])

    def testParseWWWAuthenticateMultiple3(self):
        # Handle an added comma between challenges, which might get thrown in if the challenges were
        # originally sent in separate www-authenticate headers.
        res = httplib2._parse_www_authenticate({ 'www-authenticate':
            'Digest realm="testrealm@host.com", qop="auth,auth-int", nonce="dcd98b7102dd2f0e8b11d0f600bfb0c093", opaque="5ccc069c403ebaf9f0171e9517f40e41", Basic REAlm="me", WSSE realm="foo", profile="UsernameToken"'})
        digest = res['digest']
        self.assertEqual('testrealm@host.com', digest['realm'])
        self.assertEqual('auth,auth-int', digest['qop'])
        self.assertEqual('dcd98b7102dd2f0e8b11d0f600bfb0c093', digest['nonce'])
        self.assertEqual('5ccc069c403ebaf9f0171e9517f40e41', digest['opaque'])
        basic = res['basic']
        self.assertEqual('me', basic['realm'])
        wsse = res['wsse']
        self.assertEqual('foo', wsse['realm'])
        self.assertEqual('UsernameToken', wsse['profile'])

    def testParseWWWAuthenticateMultiple4(self):
        # Tabs around '=' and inside a quoted value, punctuation in the nonce
        res = httplib2._parse_www_authenticate({ 'www-authenticate':
            'Digest realm="test-real.m@host.com", qop \t=\t"\tauth,auth-int", nonce="(*)&^&$%#",opaque="5ccc069c403ebaf9f0171e9517f40e41", Basic REAlm="me", WSSE realm="foo", profile="UsernameToken"'})
        digest = res['digest']
        self.assertEqual('test-real.m@host.com', digest['realm'])
        self.assertEqual('\tauth,auth-int', digest['qop'])
        self.assertEqual('(*)&^&$%#', digest['nonce'])

    def testParseWWWAuthenticateMoreQuoteCombos(self):
        # Mix of quoted and unquoted parameter values, nonce containing '='
        res = httplib2._parse_www_authenticate({'www-authenticate':'Digest realm="myrealm", nonce="Ygk86AsKBAA=3516200d37f9a3230352fde99977bd6d472d4306", algorithm=MD5, qop="auth", stale=true'})
        digest = res['digest']
        self.assertEqual('myrealm', digest['realm'])

    def testDigestObject(self):
        # Build the Authorization header for a fixed challenge and a fixed
        # cnonce and compare it with a known-good header string.
        credentials = ('joe', 'password')
        host = None
        request_uri = '/projects/httplib2/test/digest/'
        headers = {}
        response = {
            'www-authenticate': 'Digest realm="myrealm", nonce="Ygk86AsKBAA=3516200d37f9a3230352fde99977bd6d472d4306", algorithm=MD5, qop="auth"'
        }
        content = b""

        d = httplib2.DigestAuthentication(credentials, host, request_uri, headers, response, content, None)
        d.request("GET", request_uri, headers, content, cnonce="33033375ec278a46")
        our_request = "Authorization: %s" % headers['Authorization']
        working_request = 'Authorization: Digest username="joe", realm="myrealm", nonce="Ygk86AsKBAA=3516200d37f9a3230352fde99977bd6d472d4306", uri="/projects/httplib2/test/digest/", algorithm=MD5, response="97ed129401f7cdc60e5db58a80f3ea8b", qop=auth, nc=00000001, cnonce="33033375ec278a46"'
        self.assertEqual(our_request, working_request)

    def testDigestObjectStale(self):
        # A 401 response whose challenge carries stale=true
        credentials = ('joe', 'password')
        host = None
        request_uri = '/projects/httplib2/test/digest/'
        headers = {}
        response = httplib2.Response({ })
        response['www-authenticate'] = 'Digest realm="myrealm", nonce="Ygk86AsKBAA=3516200d37f9a3230352fde99977bd6d472d4306", algorithm=MD5, qop="auth", stale=true'
        response.status = 401
        content = b""
        d = httplib2.DigestAuthentication(credentials, host, request_uri, headers, response, content, None)
        # Returns true to force a retry
        self.assertTrue( d.response(response, content) )

    def testDigestObjectAuthInfo(self):
        # A response that carries an authentication-info header with nextnonce
        credentials = ('joe', 'password')
        host = None
        request_uri = '/projects/httplib2/test/digest/'
        headers = {}
        response = httplib2.Response({ })
        response['www-authenticate'] = 'Digest realm="myrealm", nonce="Ygk86AsKBAA=3516200d37f9a3230352fde99977bd6d472d4306", algorithm=MD5, qop="auth", stale=true'
        response['authentication-info'] = 'nextnonce="fred"'
        content = b""
        d = httplib2.DigestAuthentication(credentials, host, request_uri, headers, response, content, None)
        # No retry is requested here; instead the nextnonce is adopted as
        # the challenge nonce and the nonce count is back at 1.
        self.assertFalse( d.response(response, content) )
        self.assertEqual('fred', d.challenge['nonce'])
        self.assertEqual(1, d.challenge['nc'])

    def testWsseAlgorithm(self):
        # Known-answer check of the WSSE username token digest
        digest = httplib2._wsse_username_token("d36e316282959a9ed4c89851497a717f", "2003-12-15T14:43:07Z", "taadtaadpstcsm")
        expected = b"quR/EWLAV4xLf9Zqyw4pDmfV9OY="
        self.assertEqual(expected, digest)

    def testEnd2End(self):
        # one end to end header
        response = {'content-type': 'application/atom+xml', 'te': 'deflate'}
        end2end = httplib2._get_end2end_headers(response)
        self.assertTrue('content-type' in end2end)
        self.assertTrue('te' not in end2end)
        self.assertTrue('connection' not in end2end)

        # one end to end header that gets eliminated
        response = {'connection': 'content-type', 'content-type': 'application/atom+xml', 'te': 'deflate'}
        end2end = httplib2._get_end2end_headers(response)
        self.assertTrue('content-type' not in end2end)
        self.assertTrue('te' not in end2end)
        self.assertTrue('connection' not in end2end)

        # Degenerate case of no headers
        # (assertEqual: the assertEquals alias was removed in Python 3.12)
        response = {}
        end2end = httplib2._get_end2end_headers(response)
        self.assertEqual(0, len(end2end))

        # Degenerate case of connection referring to a header not passed in
        response = {'connection': 'content-type'}
        end2end = httplib2._get_end2end_headers(response)
        self.assertEqual(0, len(end2end))
1444
# Run the suite only when this file is executed as a script; an unguarded
# unittest.main() would run the tests and exit as soon as the module is
# imported.
if __name__ == "__main__":
    unittest.main()