blob: 20820bec9eadc567150d6475aa6b82a07e6d64f2 [file] [log] [blame]
jcgregorio2d66d4f2006-02-07 05:34:14 +00001#!/usr/bin/env python2.4
2"""
3httplib2test
4
5A set of unit tests for httplib2.py.
6
7Requires Python 2.4 or later
8"""
9
10__author__ = "Joe Gregorio (joe@bitworking.org)"
11__copyright__ = "Copyright 2006, Joe Gregorio"
12__contributors__ = []
13__license__ = "MIT"
14__history__ = """ """
15__version__ = "0.1 ($Rev: 118 $)"
16
17
18import unittest, httplib2, os, urlparse, time, base64
19
jcgregorio8421f272006-02-14 18:19:51 +000020
# Python 2.3 support: when unittest.TestCase has no assertTrue, install
# assertTrue / assertFalse as aliases of the failUnless / failIf methods.
if not hasattr(unittest.TestCase, 'assertTrue'):
    setattr(unittest.TestCase, 'assertTrue', unittest.TestCase.failUnless)
    setattr(unittest.TestCase, 'assertFalse', unittest.TestCase.failIf)

# Base URI that every test resource path is joined onto.
base = 'http://bitworking.org/projects/httplib2/test/'
#base = 'http://localhost/projects/httplib2/test/'
29
class ParserTest(unittest.TestCase):
    """Checks httplib2.parse_uri against a table of URIs.

    parse_uri is expected to return a 5-tuple of
    (scheme, authority, path, query, fragment); query and fragment are
    None when the URI does not contain them.
    """

    def testFromStd66(self):
        # Each entry is (expected 5-tuple, URI to parse).
        cases = [
            (('http', 'example.com', '', None, None), "http://example.com"),
            (('https', 'example.com', '', None, None), "https://example.com"),
            (('https', 'example.com:8080', '', None, None), "https://example.com:8080"),
            (('http', 'example.com', '/', None, None), "http://example.com/"),
            (('http', 'example.com', '/path', None, None), "http://example.com/path"),
            (('http', 'example.com', '/path', 'a=1&b=2', None), "http://example.com/path?a=1&b=2"),
            (('http', 'example.com', '/path', 'a=1&b=2', 'fred'), "http://example.com/path?a=1&b=2#fred"),
        ]
        for expected, uri in cases:
            self.assertEqual(expected, httplib2.parse_uri(uri))
40
# Client shared by the HttpTest cases. It is constructed with ".cache", the
# same directory that HttpTest.setUp empties before every test.
http = httplib2.Http(".cache")
42
43class HttpTest(unittest.TestCase):
44 def setUp(self):
45 [os.remove(os.path.join(".cache", file)) for file in os.listdir(".cache")]
46 http.clear_credentials()
47
48 def testGetIsDefaultMethod(self):
49 # Test that GET is the default method
50 uri = urlparse.urljoin(base, "methods/method_reflector.cgi")
51 (response, content) = http.request(uri)
52 self.assertEqual(response['x-method'], "GET")
53
54 def testDifferentMethods(self):
55 # Test that all methods can be used
56 uri = urlparse.urljoin(base, "methods/method_reflector.cgi")
57 for method in ["GET", "PUT", "DELETE", "POST"]:
58 (response, content) = http.request(uri, method, body=" ")
59 self.assertEqual(response['x-method'], method)
60
61 def testGetNoCache(self):
62 # Test that can do a GET w/o the cache turned on.
63 http = httplib2.Http()
64 uri = urlparse.urljoin(base, "304/test_etag.txt")
65 (response, content) = http.request(uri, "GET")
66 self.assertEqual(response.status, 200)
67 self.assertEqual(response._previous, None)
68
jcgregorioe4ce13e2006-04-02 03:05:08 +000069 def testGetOnlyIfCachedCacheMiss(self):
70 # Test that can do a GET with no cache with 'only-if-cached'
71 http = httplib2.Http(".cache")
72 uri = urlparse.urljoin(base, "304/test_etag.txt")
73 (response, content) = http.request(uri, "GET", headers={'cache-control': 'only-if-cached'})
74 self.assertEqual(response.fromcache, False)
75 self.assertEqual(response.status, 504)
76
77 def testGetOnlyIfCachedNoCacheAtAll(self):
78 # Test that can do a GET with no cache with 'only-if-cached'
79 # Of course, there might be an intermediary beyond us
80 # that responds to the 'only-if-cached', so this
81 # test can't really be guaranteed to pass.
82 http = httplib2.Http()
83 uri = urlparse.urljoin(base, "304/test_etag.txt")
84 (response, content) = http.request(uri, "GET", headers={'cache-control': 'only-if-cached'})
85 self.assertEqual(response.fromcache, False)
86 self.assertEqual(response.status, 200)
87
jcgregorio2d66d4f2006-02-07 05:34:14 +000088 def testUserAgent(self):
89 # Test that we provide a default user-agent
90 uri = urlparse.urljoin(base, "user-agent/test.cgi")
91 (response, content) = http.request(uri, "GET")
92 self.assertEqual(response.status, 200)
93 self.assertTrue(content.startswith("Python-httplib2/"))
94
95 def testUserAgentNonDefault(self):
96 # Test that the default user-agent can be over-ridden
97 uri = urlparse.urljoin(base, "user-agent/test.cgi")
98 (response, content) = http.request(uri, "GET", headers={'User-Agent': 'fred/1.0'})
99 self.assertEqual(response.status, 200)
100 self.assertTrue(content.startswith("fred/1.0"))
101
102 def testGet300WithLocation(self):
103 # Test the we automatically follow 300 redirects if a Location: header is provided
104 uri = urlparse.urljoin(base, "300/with-location-header.asis")
105 (response, content) = http.request(uri, "GET")
106 self.assertEqual(response.status, 200)
107 self.assertEqual(content, "This is the final destination.\n")
108 self.assertEqual(response._previous.status, 300)
109 self.assertEqual(response._previous.fromcache, False)
110
111 # Confirm that the intermediate 300 is not cached
112 (response, content) = http.request(uri, "GET")
113 self.assertEqual(response.status, 200)
114 self.assertEqual(content, "This is the final destination.\n")
115 self.assertEqual(response._previous.status, 300)
116 self.assertEqual(response._previous.fromcache, False)
117
118 def testGet300WithoutLocation(self):
119 # Not giving a Location: header in a 300 response is acceptable
120 # In which case we just return the 300 response
121 uri = urlparse.urljoin(base, "300/without-location-header.asis")
122 (response, content) = http.request(uri, "GET")
123 self.assertEqual(response.status, 300)
124 self.assertTrue(response['content-type'].startswith("text/html"))
125 self.assertEqual(response._previous, None)
126
127 def testGet301(self):
128 # Test that we automatically follow 301 redirects
129 # and that we cache the 301 response
130 uri = urlparse.urljoin(base, "301/onestep.asis")
131 (response, content) = http.request(uri, "GET")
132 self.assertEqual(response.status, 200)
133 self.assertEqual(content, "This is the final destination.\n")
134 self.assertEqual(response._previous.status, 301)
135 self.assertEqual(response._previous.fromcache, False)
136
137 (response, content) = http.request(uri, "GET")
138 self.assertEqual(response.status, 200)
139 self.assertEqual(content, "This is the final destination.\n")
140 self.assertEqual(response._previous.status, 301)
141 self.assertEqual(response._previous.fromcache, True)
142
143 def testGet302(self):
144 # Test that we automatically follow 302 redirects
145 # and that we DO NOT cache the 302 response
146 uri = urlparse.urljoin(base, "302/onestep.asis")
147 (response, content) = http.request(uri, "GET")
148 self.assertEqual(response.status, 200)
149 self.assertEqual(content, "This is the final destination.\n")
150 self.assertEqual(response._previous.status, 302)
151 self.assertEqual(response._previous.fromcache, False)
152
153 uri = urlparse.urljoin(base, "302/onestep.asis")
154 (response, content) = http.request(uri, "GET")
155 self.assertEqual(response.status, 200)
156 self.assertEqual(response.fromcache, True)
157 self.assertEqual(content, "This is the final destination.\n")
158 self.assertEqual(response._previous.status, 302)
159 self.assertEqual(response._previous.fromcache, False)
160
161 uri = urlparse.urljoin(base, "302/twostep.asis")
162
163 (response, content) = http.request(uri, "GET")
164 self.assertEqual(response.status, 200)
165 self.assertEqual(response.fromcache, True)
166 self.assertEqual(content, "This is the final destination.\n")
167 self.assertEqual(response._previous.status, 302)
168 self.assertEqual(response._previous.fromcache, False)
169
170 def testGet302RedirectionLimit(self):
171 # Test that we can set a lower redirection limit
172 # and that we raise an exception when we exceed
173 # that limit.
174 uri = urlparse.urljoin(base, "302/twostep.asis")
175 try:
176 (response, content) = http.request(uri, "GET", redirections = 1)
177 self.fail("This should not happen")
178 except httplib2.RedirectLimit:
179 pass
180 except Exception, e:
181 self.fail("Threw wrong kind of exception ")
182
183 def testGet302NoLocation(self):
184 # Test that we throw an exception when we get
185 # a 302 with no Location: header.
186 uri = urlparse.urljoin(base, "302/no-location.asis")
187 try:
188 (response, content) = http.request(uri, "GET")
189 self.fail("Should never reach here")
190 except httplib2.RedirectMissingLocation:
191 pass
192 except Exception, e:
193 self.fail("Threw wrong kind of exception ")
194
195 def testGet302ViaHttps(self):
196 # goole always redirects to http://google.com
197 (response, content) = http.request("https://google.com", "GET")
198 self.assertEqual(200, response.status)
199 self.assertEqual(302, response._previous.status)
200
201 def testGetViaHttps(self):
202 # Test that we can handle HTTPS
203 (response, content) = http.request("https://google.com/adsense/", "GET")
204 self.assertEqual(200, response.status)
205 self.assertEqual(None, response._previous)
206
207 def testGetViaHttpsSpecViolationOnLocation(self):
208 # Test that we follow redirects through HTTPS
209 # even if they violate the spec by including
210 # a relative Location: header instead of an
211 # absolute one.
212 (response, content) = http.request("https://google.com/adsense", "GET")
213 self.assertEqual(200, response.status)
214 self.assertNotEqual(None, response._previous)
215
216 def testGet303(self):
217 # Do a follow-up GET on a Location: header
218 # returned from a POST that gave a 303.
219 uri = urlparse.urljoin(base, "303/303.cgi")
220 (response, content) = http.request(uri, "POST", " ")
221 self.assertEqual(response.status, 200)
222 self.assertEqual(content, "This is the final destination.\n")
223 self.assertEqual(response._previous.status, 303)
224
225 def test303ForDifferentMethods(self):
226 # Test that all methods can be used
227 uri = urlparse.urljoin(base, "303/redirect-to-reflector.cgi")
228 # HEAD really does send a HEAD, but apparently Apache changes
229 # every HEAD into a GET, so our script returns x-method: GET.
230 for (method, method_on_303) in [("PUT", "GET"), ("DELETE", "GET"), ("POST", "GET"), ("GET", "GET"), ("HEAD", "GET")]:
231 (response, content) = http.request(uri, method, body=" ")
232 self.assertEqual(response['x-method'], method_on_303)
233
234 def testGet304(self):
235 # Test that we use ETags properly to validate our cache
236 uri = urlparse.urljoin(base, "304/test_etag.txt")
237 (response, content) = http.request(uri, "GET")
238 self.assertNotEqual(response['etag'], "")
239
240 (response, content) = http.request(uri, "GET")
241 (response, content) = http.request(uri, "GET", headers = {'cache-control': 'must-revalidate'})
242 self.assertEqual(response.status, 200)
243 self.assertEqual(response.fromcache, True)
244
245 (response, content) = http.request(uri, "HEAD")
246 self.assertEqual(response.status, 200)
247 self.assertEqual(response.fromcache, True)
248
249 (response, content) = http.request(uri, "GET", headers = {'range': 'bytes=0-0'})
250 self.assertEqual(response.status, 206)
251 self.assertEqual(response.fromcache, False)
252
253 def testGet304EndToEnd(self):
254 # Test that end to end headers get overwritten in the cache
255 uri = urlparse.urljoin(base, "304/end2end.cgi")
256 (response, content) = http.request(uri, "GET")
257 self.assertNotEqual(response['etag'], "")
258 old_date = response['date']
259 time.sleep(2)
260
261 (response, content) = http.request(uri, "GET", headers = {'Cache-Control': 'max-age=0'})
262 # The response should be from the cache, but the Date: header should be updated.
263 new_date = response['date']
264 self.assertNotEqual(new_date, old_date)
265 self.assertEqual(response.status, 200)
266 self.assertEqual(response.fromcache, True)
267
268 def testGet304LastModified(self):
269 # Test that we can still handle a 304
270 # by only using the last-modified cache validator.
271 uri = urlparse.urljoin(base, "304/last-modified-only/last-modified-only.txt")
272 (response, content) = http.request(uri, "GET")
273
274 self.assertNotEqual(response['last-modified'], "")
275 (response, content) = http.request(uri, "GET")
276 (response, content) = http.request(uri, "GET")
277 self.assertEqual(response.status, 200)
278 self.assertEqual(response.fromcache, True)
279
280 def testGet307(self):
281 # Test that we do follow 307 redirects but
282 # do not cache the 307
283 uri = urlparse.urljoin(base, "307/onestep.asis")
284 (response, content) = http.request(uri, "GET")
285 self.assertEqual(response.status, 200)
286 self.assertEqual(content, "This is the final destination.\n")
287 self.assertEqual(response._previous.status, 307)
288 self.assertEqual(response._previous.fromcache, False)
289
290 (response, content) = http.request(uri, "GET")
291 self.assertEqual(response.status, 200)
292 self.assertEqual(response.fromcache, True)
293 self.assertEqual(content, "This is the final destination.\n")
294 self.assertEqual(response._previous.status, 307)
295 self.assertEqual(response._previous.fromcache, False)
296
297 def testGet410(self):
298 # Test that we pass 410's through
299 uri = urlparse.urljoin(base, "410/410.asis")
300 (response, content) = http.request(uri, "GET")
301 self.assertEqual(response.status, 410)
302
303 def testGetGZip(self):
304 # Test that we support gzip compression
305 uri = urlparse.urljoin(base, "gzip/final-destination.txt")
306 (response, content) = http.request(uri, "GET")
307 self.assertEqual(response.status, 200)
308 self.assertEqual(response['content-encoding'], "gzip")
309 self.assertEqual(content, "This is the final destination.\n")
310
311 def testGetGZipFailure(self):
312 # Test that we raise a good exception when the gzip fails
313 uri = urlparse.urljoin(base, "gzip/failed-compression.asis")
314 try:
315 (response, content) = http.request(uri, "GET")
316 self.fail("Should never reach here")
317 except httplib2.FailedToDecompressContent:
318 pass
319 except Exception:
320 self.fail("Threw wrong kind of exception")
321
322 def testGetDeflate(self):
323 # Test that we support deflate compression
324 uri = urlparse.urljoin(base, "deflate/deflated.asis")
325 (response, content) = http.request(uri, "GET")
326 self.assertEqual(response.status, 200)
327 self.assertEqual(response['content-encoding'], "deflate")
328 self.assertEqual(content, "This is the final destination.")
329
330 def testGetDeflateFailure(self):
331 # Test that we raise a good exception when the deflate fails
332 uri = urlparse.urljoin(base, "deflate/deflated.asis")
333 uri = urlparse.urljoin(base, "deflate/failed-compression.asis")
334 try:
335 (response, content) = http.request(uri, "GET")
336 self.fail("Should never reach here")
337 except httplib2.FailedToDecompressContent:
338 pass
339 except Exception:
340 self.fail("Threw wrong kind of exception")
341
342 def testGetDuplicateHeaders(self):
343 # Test that duplicate headers get concatenated via ','
344 uri = urlparse.urljoin(base, "duplicate-headers/multilink.asis")
345 (response, content) = http.request(uri, "GET")
346 self.assertEqual(response.status, 200)
347 self.assertEqual(content, "This is content\n")
348 self.assertEqual(response['link'].split(",")[0], '<http://bitworking.org>; rel="home"; title="BitWorking"')
349
350 def testGetCacheControlNoCache(self):
351 # Test Cache-Control: no-cache on requests
352 uri = urlparse.urljoin(base, "304/test_etag.txt")
353 (response, content) = http.request(uri, "GET")
354 self.assertNotEqual(response['etag'], "")
355 (response, content) = http.request(uri, "GET")
356 self.assertEqual(response.status, 200)
357 self.assertEqual(response.fromcache, True)
358
359 (response, content) = http.request(uri, "GET", headers={'Cache-Control': 'no-cache'})
360 self.assertEqual(response.status, 200)
361 self.assertEqual(response.fromcache, False)
362
363 def testGetCacheControlPragmaNoCache(self):
364 # Test Pragma: no-cache on requests
365 uri = urlparse.urljoin(base, "304/test_etag.txt")
366 (response, content) = http.request(uri, "GET")
367 self.assertNotEqual(response['etag'], "")
368 (response, content) = http.request(uri, "GET")
369 self.assertEqual(response.status, 200)
370 self.assertEqual(response.fromcache, True)
371
372 (response, content) = http.request(uri, "GET", headers={'Pragma': 'no-cache'})
373 self.assertEqual(response.status, 200)
374 self.assertEqual(response.fromcache, False)
375
376 def testGetCacheControlNoStoreRequest(self):
377 # A no-store request means that the response should not be stored.
378 uri = urlparse.urljoin(base, "304/test_etag.txt")
379
380 (response, content) = http.request(uri, "GET", headers={'Cache-Control': 'no-store'})
381 self.assertEqual(response.status, 200)
382 self.assertEqual(response.fromcache, False)
383
384 (response, content) = http.request(uri, "GET", headers={'Cache-Control': 'no-store'})
385 self.assertEqual(response.status, 200)
386 self.assertEqual(response.fromcache, False)
387
388 def testGetCacheControlNoStoreResponse(self):
389 # A no-store response means that the response should not be stored.
390 uri = urlparse.urljoin(base, "no-store/no-store.asis")
391
392 (response, content) = http.request(uri, "GET")
393 self.assertEqual(response.status, 200)
394 self.assertEqual(response.fromcache, False)
395
396 (response, content) = http.request(uri, "GET")
397 self.assertEqual(response.status, 200)
398 self.assertEqual(response.fromcache, False)
399 self.assertEqual(0, len(os.listdir(".cache")))
400
401 def testGetCacheControlNoCacheNoStoreRequest(self):
402 # Test that a no-store, no-cache clears the entry from the cache
403 # even if it was cached previously.
404 uri = urlparse.urljoin(base, "304/test_etag.txt")
405
406 (response, content) = http.request(uri, "GET")
407 (response, content) = http.request(uri, "GET")
408 self.assertEqual(response.fromcache, True)
409 (response, content) = http.request(uri, "GET", headers={'Cache-Control': 'no-store, no-cache'})
410 (response, content) = http.request(uri, "GET", headers={'Cache-Control': 'no-store, no-cache'})
411 self.assertEqual(response.status, 200)
412 self.assertEqual(response.fromcache, False)
413 self.assertEqual(0, len(os.listdir(".cache")))
414
415 def testUpdateInvalidatesCache(self):
416 # Test that calling PUT or DELETE on a
417 # URI that is cache invalidates that cache.
418 uri = urlparse.urljoin(base, "304/test_etag.txt")
419
420 (response, content) = http.request(uri, "GET")
421 (response, content) = http.request(uri, "GET")
422 self.assertEqual(response.fromcache, True)
423 (response, content) = http.request(uri, "DELETE")
424 self.assertEqual(response.status, 405)
425
426 (response, content) = http.request(uri, "GET")
427 self.assertEqual(response.fromcache, False)
428
429 def testUpdateUsesCachedETag(self):
430 # Test that we natively support http://www.w3.org/1999/04/Editing/
431 uri = urlparse.urljoin(base, "conditional-updates/test.cgi")
432
433 (response, content) = http.request(uri, "GET")
434 self.assertEqual(response.status, 200)
435 self.assertEqual(response.fromcache, False)
436 (response, content) = http.request(uri, "GET")
437 self.assertEqual(response.status, 200)
438 self.assertEqual(response.fromcache, True)
439 (response, content) = http.request(uri, "PUT")
440 self.assertEqual(response.status, 200)
441 (response, content) = http.request(uri, "PUT")
442 self.assertEqual(response.status, 412)
443
444 def testBasicAuth(self):
445 # Test Basic Authentication
446 uri = urlparse.urljoin(base, "basic/file.txt")
447 (response, content) = http.request(uri, "GET")
448 self.assertEqual(response.status, 401)
449
450 uri = urlparse.urljoin(base, "basic/")
451 (response, content) = http.request(uri, "GET")
452 self.assertEqual(response.status, 401)
453
454 http.add_credentials('joe', 'password')
455 (response, content) = http.request(uri, "GET")
456 self.assertEqual(response.status, 200)
457
458 uri = urlparse.urljoin(base, "basic/file.txt")
459 (response, content) = http.request(uri, "GET")
460 self.assertEqual(response.status, 200)
461
462 def testBasicAuthTwoDifferentCredentials(self):
463 # Test Basic Authentication with multple sets of credentials
464 uri = urlparse.urljoin(base, "basic2/file.txt")
465 (response, content) = http.request(uri, "GET")
466 self.assertEqual(response.status, 401)
467
468 uri = urlparse.urljoin(base, "basic2/")
469 (response, content) = http.request(uri, "GET")
470 self.assertEqual(response.status, 401)
471
472 http.add_credentials('fred', 'barney')
473 (response, content) = http.request(uri, "GET")
474 self.assertEqual(response.status, 200)
475
476 uri = urlparse.urljoin(base, "basic2/file.txt")
477 (response, content) = http.request(uri, "GET")
478 self.assertEqual(response.status, 200)
479
480 def testBasicAuthNested(self):
481 # Test Basic Authentication with resources
482 # that are nested
483 uri = urlparse.urljoin(base, "basic-nested/")
484 (response, content) = http.request(uri, "GET")
485 self.assertEqual(response.status, 401)
486
487 uri = urlparse.urljoin(base, "basic-nested/subdir")
488 (response, content) = http.request(uri, "GET")
489 self.assertEqual(response.status, 401)
490
491 # Now add in creditials one at a time and test.
492 http.add_credentials('joe', 'password')
493
494 uri = urlparse.urljoin(base, "basic-nested/")
495 (response, content) = http.request(uri, "GET")
496 self.assertEqual(response.status, 200)
497
498 uri = urlparse.urljoin(base, "basic-nested/subdir")
499 (response, content) = http.request(uri, "GET")
500 self.assertEqual(response.status, 401)
501
502 http.add_credentials('fred', 'barney')
503
504 uri = urlparse.urljoin(base, "basic-nested/")
505 (response, content) = http.request(uri, "GET")
506 self.assertEqual(response.status, 200)
507
508 uri = urlparse.urljoin(base, "basic-nested/subdir")
509 (response, content) = http.request(uri, "GET")
510 self.assertEqual(response.status, 200)
511
512 def testDigestAuth(self):
513 # Test that we support Digest Authentication
514 uri = urlparse.urljoin(base, "digest/")
515 (response, content) = http.request(uri, "GET")
516 self.assertEqual(response.status, 401)
517
518 http.add_credentials('joe', 'password')
519 (response, content) = http.request(uri, "GET")
520 self.assertEqual(response.status, 200)
521
522 uri = urlparse.urljoin(base, "digest/file.txt")
523 (response, content) = http.request(uri, "GET")
524
525 def testDigestAuthNextNonceAndNC(self):
526 # Test that if the server sets nextnonce that we reset
527 # the nonce count back to 1
528 uri = urlparse.urljoin(base, "digest/file.txt")
529 http.add_credentials('joe', 'password')
530 (response, content) = http.request(uri, "GET", headers = {"cache-control":"no-cache"})
531 info = httplib2._parse_www_authenticate(response, 'authentication-info')
532 self.assertEqual(response.status, 200)
533 (response, content) = http.request(uri, "GET", headers = {"cache-control":"no-cache"})
534 info2 = httplib2._parse_www_authenticate(response, 'authentication-info')
535 self.assertEqual(response.status, 200)
536
537 if info.has_key('nextnonce'):
538 self.assertEqual(info2['nc'], 1)
539
540 def testDigestAuthStale(self):
541 # Test that we can handle a nonce becoming stale
542 uri = urlparse.urljoin(base, "digest-expire/file.txt")
543 http.add_credentials('joe', 'password')
544 (response, content) = http.request(uri, "GET", headers = {"cache-control":"no-cache"})
545 info = httplib2._parse_www_authenticate(response, 'authentication-info')
546 self.assertEqual(response.status, 200)
547
548 time.sleep(3)
549 # Sleep long enough that the nonce becomes stale
550
551 (response, content) = http.request(uri, "GET", headers = {"cache-control":"no-cache"})
552 self.assertFalse(response.fromcache)
553 self.assertTrue(response._stale_digest)
554 info3 = httplib2._parse_www_authenticate(response, 'authentication-info')
555 self.assertEqual(response.status, 200)
556
557 def reflector(self, content):
558 return dict( [tuple(x.split("=")) for x in content.strip().split("\n")] )
559
560 def testReflector(self):
561 uri = urlparse.urljoin(base, "reflector/reflector.cgi")
562 (response, content) = http.request(uri, "GET")
563 d = self.reflector(content)
564 self.assertTrue(d.has_key('HTTP_USER_AGENT'))
565
jcgregoriodb8dfc82006-03-31 14:59:46 +0000566# ------------------------------------------------------------------------
jcgregorio2d66d4f2006-02-07 05:34:14 +0000567
568class HttpPrivateTest(unittest.TestCase):
569
570 def testParseCacheControl(self):
571 # Test that we can parse the Cache-Control header
572 self.assertEqual({}, httplib2._parse_cache_control({}))
573 self.assertEqual({'no-cache': 1}, httplib2._parse_cache_control({'cache-control': ' no-cache'}))
574 cc = httplib2._parse_cache_control({'cache-control': ' no-cache, max-age = 7200'})
575 self.assertEqual(cc['no-cache'], 1)
576 self.assertEqual(cc['max-age'], '7200')
577 cc = httplib2._parse_cache_control({'cache-control': ' , '})
578 self.assertEqual(cc[''], 1)
579
580 def testNormalizeHeaders(self):
581 # Test that we normalize headers to lowercase
582 h = httplib2._normalize_headers({'Cache-Control': 'no-cache', 'Other': 'Stuff'})
583 self.assertTrue(h.has_key('cache-control'))
584 self.assertTrue(h.has_key('other'))
585 self.assertEqual('Stuff', h['other'])
586
587 def testExpirationModelTransparent(self):
588 # Test that no-cache makes our request TRANSPARENT
589 response_headers = {
590 'cache-control': 'max-age=7200'
591 }
592 request_headers = {
593 'cache-control': 'no-cache'
594 }
595 self.assertEqual("TRANSPARENT", httplib2._entry_disposition(response_headers, request_headers))
596
597 def testExpirationModelNoCacheResponse(self):
598 # The date and expires point to an entry that should be
599 # FRESH, but the no-cache over-rides that.
600 now = time.time()
601 response_headers = {
602 'date': time.strftime("%a, %d %b %Y %H:%M:%S GMT", time.gmtime(now)),
603 'expires': time.strftime("%a, %d %b %Y %H:%M:%S GMT", time.gmtime(now+4)),
604 'cache-control': 'no-cache'
605 }
606 request_headers = {
607 }
608 self.assertEqual("STALE", httplib2._entry_disposition(response_headers, request_headers))
609
610 def testExpirationModelStaleRequestMustReval(self):
611 # must-revalidate forces STALE
612 self.assertEqual("STALE", httplib2._entry_disposition({}, {'cache-control': 'must-revalidate'}))
613
614 def testExpirationModelStaleResponseMustReval(self):
615 # must-revalidate forces STALE
616 self.assertEqual("STALE", httplib2._entry_disposition({'cache-control': 'must-revalidate'}, {}))
617
618 def testExpirationModelFresh(self):
619 response_headers = {
620 'date': time.strftime("%a, %d %b %Y %H:%M:%S GMT", time.gmtime()),
621 'cache-control': 'max-age=2'
622 }
623 request_headers = {
624 }
625 self.assertEqual("FRESH", httplib2._entry_disposition(response_headers, request_headers))
626 time.sleep(3)
627 self.assertEqual("STALE", httplib2._entry_disposition(response_headers, request_headers))
628
629 def testExpirationMaxAge0(self):
630 response_headers = {
631 'date': time.strftime("%a, %d %b %Y %H:%M:%S GMT", time.gmtime()),
632 'cache-control': 'max-age=0'
633 }
634 request_headers = {
635 }
636 self.assertEqual("STALE", httplib2._entry_disposition(response_headers, request_headers))
637
638 def testExpirationModelDateAndExpires(self):
639 now = time.time()
640 response_headers = {
641 'date': time.strftime("%a, %d %b %Y %H:%M:%S GMT", time.gmtime(now)),
642 'expires': time.strftime("%a, %d %b %Y %H:%M:%S GMT", time.gmtime(now+2)),
643 }
644 request_headers = {
645 }
646 self.assertEqual("FRESH", httplib2._entry_disposition(response_headers, request_headers))
647 time.sleep(3)
648 self.assertEqual("STALE", httplib2._entry_disposition(response_headers, request_headers))
649
650 def testExpirationModelDateOnly(self):
651 now = time.time()
652 response_headers = {
653 'date': time.strftime("%a, %d %b %Y %H:%M:%S GMT", time.gmtime(now+3)),
654 }
655 request_headers = {
656 }
657 self.assertEqual("STALE", httplib2._entry_disposition(response_headers, request_headers))
658
659 def testExpirationModelOnlyIfCached(self):
660 response_headers = {
661 }
662 request_headers = {
663 'cache-control': 'only-if-cached',
664 }
665 self.assertEqual("FRESH", httplib2._entry_disposition(response_headers, request_headers))
666
667 def testExpirationModelMaxAgeBoth(self):
668 now = time.time()
669 response_headers = {
670 'date': time.strftime("%a, %d %b %Y %H:%M:%S GMT", time.gmtime(now)),
671 'cache-control': 'max-age=2'
672 }
673 request_headers = {
674 'cache-control': 'max-age=0'
675 }
676 self.assertEqual("STALE", httplib2._entry_disposition(response_headers, request_headers))
677
678 def testExpirationModelDateAndExpiresMinFresh1(self):
679 now = time.time()
680 response_headers = {
681 'date': time.strftime("%a, %d %b %Y %H:%M:%S GMT", time.gmtime(now)),
682 'expires': time.strftime("%a, %d %b %Y %H:%M:%S GMT", time.gmtime(now+2)),
683 }
684 request_headers = {
685 'cache-control': 'min-fresh=2'
686 }
687 self.assertEqual("STALE", httplib2._entry_disposition(response_headers, request_headers))
688
689 def testExpirationModelDateAndExpiresMinFresh2(self):
690 now = time.time()
691 response_headers = {
692 'date': time.strftime("%a, %d %b %Y %H:%M:%S GMT", time.gmtime(now)),
693 'expires': time.strftime("%a, %d %b %Y %H:%M:%S GMT", time.gmtime(now+4)),
694 }
695 request_headers = {
696 'cache-control': 'min-fresh=2'
697 }
698 self.assertEqual("FRESH", httplib2._entry_disposition(response_headers, request_headers))
699
700 def testParseWWWAuthenticateEmpty(self):
701 res = httplib2._parse_www_authenticate({})
702 self.assertEqual(len(res.keys()), 0)
703
704 def testParseWWWAuthenticateBasic(self):
705 res = httplib2._parse_www_authenticate({ 'www-authenticate': 'Basic realm="me"'})
706 basic = res['basic']
707 self.assertEqual('me', basic['realm'])
708
709 res = httplib2._parse_www_authenticate({ 'www-authenticate': 'Basic realm="me", algorithm="MD5"'})
710 basic = res['basic']
711 self.assertEqual('me', basic['realm'])
712 self.assertEqual('MD5', basic['algorithm'])
713
714 res = httplib2._parse_www_authenticate({ 'www-authenticate': 'Basic realm="me", algorithm=MD5'})
715 basic = res['basic']
716 self.assertEqual('me', basic['realm'])
717 self.assertEqual('MD5', basic['algorithm'])
718
719 def testParseWWWAuthenticateBasic2(self):
720 res = httplib2._parse_www_authenticate({ 'www-authenticate': 'Basic realm="me",other="fred" '})
721 basic = res['basic']
722 self.assertEqual('me', basic['realm'])
723 self.assertEqual('fred', basic['other'])
724
725 def testParseWWWAuthenticateBasic3(self):
726 res = httplib2._parse_www_authenticate({ 'www-authenticate': 'Basic REAlm="me" '})
727 basic = res['basic']
728 self.assertEqual('me', basic['realm'])
729
730
731 def testParseWWWAuthenticateDigest(self):
732 res = httplib2._parse_www_authenticate({ 'www-authenticate':
733 'Digest realm="testrealm@host.com", qop="auth,auth-int", nonce="dcd98b7102dd2f0e8b11d0f600bfb0c093", opaque="5ccc069c403ebaf9f0171e9517f40e41"'})
734 digest = res['digest']
735 self.assertEqual('testrealm@host.com', digest['realm'])
736 self.assertEqual('auth,auth-int', digest['qop'])
737
738
739 def testParseWWWAuthenticateMultiple(self):
740 res = httplib2._parse_www_authenticate({ 'www-authenticate':
741 'Digest realm="testrealm@host.com", qop="auth,auth-int", nonce="dcd98b7102dd2f0e8b11d0f600bfb0c093", opaque="5ccc069c403ebaf9f0171e9517f40e41" Basic REAlm="me" '})
742 digest = res['digest']
743 self.assertEqual('testrealm@host.com', digest['realm'])
744 self.assertEqual('auth,auth-int', digest['qop'])
745 self.assertEqual('dcd98b7102dd2f0e8b11d0f600bfb0c093', digest['nonce'])
746 self.assertEqual('5ccc069c403ebaf9f0171e9517f40e41', digest['opaque'])
747 basic = res['basic']
748 self.assertEqual('me', basic['realm'])
749
def testParseWWWAuthenticateMultiple2(self):
    # Same as the two-challenge case, but with a comma between the Digest and
    # Basic challenges. Such a comma may appear when challenges that were sent
    # in separate www-authenticate headers are joined into one value.
    header_value = 'Digest realm="testrealm@host.com", qop="auth,auth-int", nonce="dcd98b7102dd2f0e8b11d0f600bfb0c093", opaque="5ccc069c403ebaf9f0171e9517f40e41", Basic REAlm="me" '
    challenges = httplib2._parse_www_authenticate({'www-authenticate': header_value})

    expected_digest = (
        ('realm', 'testrealm@host.com'),
        ('qop', 'auth,auth-int'),
        ('nonce', 'dcd98b7102dd2f0e8b11d0f600bfb0c093'),
        ('opaque', '5ccc069c403ebaf9f0171e9517f40e41'),
    )
    digest_params = challenges['digest']
    for name, value in expected_digest:
        self.assertEqual(value, digest_params[name])

    basic_params = challenges['basic']
    self.assertEqual('me', basic_params['realm'])
762
def testParseWWWAuthenticateMultiple3(self):
    # Three comma-separated challenges (Digest, Basic, WSSE) in one header
    # value. Commas between challenges may appear when they were originally
    # sent in separate www-authenticate headers.
    header_value = 'Digest realm="testrealm@host.com", qop="auth,auth-int", nonce="dcd98b7102dd2f0e8b11d0f600bfb0c093", opaque="5ccc069c403ebaf9f0171e9517f40e41", Basic REAlm="me", WSSE realm="foo", profile="UsernameToken"'
    challenges = httplib2._parse_www_authenticate({'www-authenticate': header_value})

    expected_digest = (
        ('realm', 'testrealm@host.com'),
        ('qop', 'auth,auth-int'),
        ('nonce', 'dcd98b7102dd2f0e8b11d0f600bfb0c093'),
        ('opaque', '5ccc069c403ebaf9f0171e9517f40e41'),
    )
    digest_params = challenges['digest']
    for name, value in expected_digest:
        self.assertEqual(value, digest_params[name])

    basic_params = challenges['basic']
    self.assertEqual('me', basic_params['realm'])

    wsse_params = challenges['wsse']
    self.assertEqual('foo', wsse_params['realm'])
    self.assertEqual('UsernameToken', wsse_params['profile'])
778
def testParseWWWAuthenticateMultiple4(self):
    # Awkward Digest values: dots and dashes in the realm, tabs around the
    # '=' and inside the quoted qop value, punctuation in the nonce, and no
    # space after the comma that precedes opaque.
    header_value = 'Digest realm="test-real.m@host.com", qop \t=\t"\tauth,auth-int", nonce="(*)&^&$%#",opaque="5ccc069c403ebaf9f0171e9517f40e41", Basic REAlm="me", WSSE realm="foo", profile="UsernameToken"'
    challenges = httplib2._parse_www_authenticate({'www-authenticate': header_value})
    digest_params = challenges['digest']
    self.assertEqual('test-real.m@host.com', digest_params['realm'])
    # The tab inside the quotes is part of the value and must be kept.
    self.assertEqual('\tauth,auth-int', digest_params['qop'])
    self.assertEqual('(*)&^&$%#', digest_params['nonce'])
786
def testParseWWWAuthenticateMoreQuoteCombos(self):
    # A Digest challenge mixing quoted values (realm, nonce, qop) with
    # unquoted ones (algorithm, stale); the nonce also contains an '='.
    header_value = 'Digest realm="myrealm", nonce="Ygk86AsKBAA=3516200d37f9a3230352fde99977bd6d472d4306", algorithm=MD5, qop="auth", stale=true'
    parsed = httplib2._parse_www_authenticate({'www-authenticate': header_value})
    digest_params = parsed['digest']
    self.assertEqual('myrealm', digest_params['realm'])
791
def testDigestObject(self):
    # Build a Digest Authorization header from a fixed challenge and a fixed
    # cnonce, then compare it with the expected header line.
    uri = '/projects/httplib2/test/digest/'
    request_headers = {}
    challenge_response = {
        'www-authenticate': 'Digest realm="myrealm", nonce="Ygk86AsKBAA=3516200d37f9a3230352fde99977bd6d472d4306", algorithm=MD5, qop="auth"'
    }
    body = ""

    auth = httplib2.DigestAuthentication(('joe', 'password'), None, uri, request_headers, challenge_response, body, None)
    # request() fills in request_headers['Authorization'], read just below.
    auth.request("GET", uri, request_headers, body, cnonce="33033375ec278a46")

    actual_line = "Authorization: %s" % request_headers['Authorization']
    expected_line = 'Authorization: Digest username="joe", realm="myrealm", nonce="Ygk86AsKBAA=3516200d37f9a3230352fde99977bd6d472d4306", uri="/projects/httplib2/test/digest/", algorithm=MD5, response="97ed129401f7cdc60e5db58a80f3ea8b", qop=auth, nc=00000001, cnonce="33033375ec278a46"'
    self.assertEqual(actual_line, expected_line)
807
808
def testDigestObjectStale(self):
    # A 401 response whose Digest challenge carries stale=true: response()
    # is expected to return a true value, which forces a retry.
    stale_response = httplib2.Response({ })
    stale_response['www-authenticate'] = 'Digest realm="myrealm", nonce="Ygk86AsKBAA=3516200d37f9a3230352fde99977bd6d472d4306", algorithm=MD5, qop="auth", stale=true'
    stale_response.status = 401

    uri = '/projects/httplib2/test/digest/'
    body = ""
    auth = httplib2.DigestAuthentication(('joe', 'password'), None, uri, {}, stale_response, body, None)

    retry_needed = auth.response(stale_response, body)
    self.assertTrue( retry_needed )
821
def testDigestObjectAuthInfo(self):
    # A response that carries an authentication-info header with a nextnonce
    # value (and no 401 status is set on it).
    credentials = ('joe', 'password')
    host = None
    request_uri = '/projects/httplib2/test/digest/'
    headers = {}
    response = httplib2.Response({ })
    response['www-authenticate'] = 'Digest realm="myrealm", nonce="Ygk86AsKBAA=3516200d37f9a3230352fde99977bd6d472d4306", algorithm=MD5, qop="auth", stale=true'
    response['authentication-info'] = 'nextnonce="fred"'
    content = ""
    d = httplib2.DigestAuthentication(credentials, host, request_uri, headers, response, content, None)
    # Returns false here: no retry is requested when authentication-info is
    # present, even though the www-authenticate value says stale=true.
    self.assertFalse( d.response(response, content) )
    # The stored challenge now holds the nextnonce value, with nc equal to 1.
    self.assertEqual('fred', d.challenge['nonce'])
    self.assertEqual(1, d.challenge['nc'])
836
def testWsseAlgorithm(self):
    # Check the WSSE username-token digest computed for one fixed set of
    # inputs against its expected value.
    token_inputs = ("d36e316282959a9ed4c89851497a717f", "2003-12-15T14:43:07Z", "taadtaadpstcsm")
    actual = httplib2._wsse_username_token(*token_inputs)
    self.assertEqual("quR/EWLAV4xLf9Zqyw4pDmfV9OY=", actual)
841
def testEnd2End(self):
    # Exercises httplib2._get_end2end_headers() on plain header dicts.

    # One end-to-end header: 'content-type' is kept, 'te' is filtered out.
    response = {'content-type': 'application/atom+xml', 'te': 'deflate'}
    end2end = httplib2._get_end2end_headers(response)
    self.assertTrue('content-type' in end2end)
    self.assertTrue('te' not in end2end)
    self.assertTrue('connection' not in end2end)

    # One end-to-end header that gets eliminated because the 'connection'
    # header names it.
    response = {'connection': 'content-type', 'content-type': 'application/atom+xml', 'te': 'deflate'}
    end2end = httplib2._get_end2end_headers(response)
    self.assertTrue('content-type' not in end2end)
    self.assertTrue('te' not in end2end)
    self.assertTrue('connection' not in end2end)

    # Degenerate case of no headers
    response = {}
    end2end = httplib2._get_end2end_headers(response)
    self.assertEqual(0, len(end2end))

    # Degenerate case of connection referring to a header not passed in
    response = {'connection': 'content-type'}
    end2end = httplib2._get_end2end_headers(response)
    self.assertEqual(0, len(end2end))
jcgregorio2d66d4f2006-02-07 05:34:14 +0000866
# Run the suite only when this file is executed as a script, so that merely
# importing the module does not start the tests and exit the interpreter.
if __name__ == '__main__':
    unittest.main()
868