blob: 88f1bdfc9a6ded3555325f4ce73a7263e1941d0f [file] [log] [blame]
jcgregorio2d66d4f2006-02-07 05:34:14 +00001#!/usr/bin/env python2.4
2"""
3httplib2test
4
5A set of unit tests for httplib2.py.
6
7Requires Python 2.4 or later
8"""
9
10__author__ = "Joe Gregorio (joe@bitworking.org)"
11__copyright__ = "Copyright 2006, Joe Gregorio"
12__contributors__ = []
13__license__ = "MIT"
14__history__ = """ """
15__version__ = "0.1 ($Rev: 118 $)"
16
17
18import unittest, httplib2, os, urlparse, time, base64
19
jcgregorio8421f272006-02-14 18:19:51 +000020
21# Python 2.3 support
22if not hasattr(unittest.TestCase, 'assertTrue'):
23 unittest.TestCase.assertTrue = unittest.TestCase.failUnless
24 unittest.TestCase.assertFalse = unittest.TestCase.failIf
25
jcgregorio2d66d4f2006-02-07 05:34:14 +000026# The test resources base uri
27base = 'http://bitworking.org/projects/httplib2/test/'
28#base = 'http://localhost/projects/httplib2/test/'
29
30class ParserTest(unittest.TestCase):
31 def testFromStd66(self):
32 self.assertEqual( ('http', 'example.com', '', None, None ), httplib2.parse_uri("http://example.com"))
33 self.assertEqual( ('https', 'example.com', '', None, None ), httplib2.parse_uri("https://example.com"))
34 self.assertEqual( ('https', 'example.com:8080', '', None, None ), httplib2.parse_uri("https://example.com:8080"))
35 self.assertEqual( ('http', 'example.com', '/', None, None ), httplib2.parse_uri("http://example.com/"))
36 self.assertEqual( ('http', 'example.com', '/path', None, None ), httplib2.parse_uri("http://example.com/path"))
37 self.assertEqual( ('http', 'example.com', '/path', 'a=1&b=2', None ), httplib2.parse_uri("http://example.com/path?a=1&b=2"))
38 self.assertEqual( ('http', 'example.com', '/path', 'a=1&b=2', 'fred' ), httplib2.parse_uri("http://example.com/path?a=1&b=2#fred"))
39 self.assertEqual( ('http', 'example.com', '/path', 'a=1&b=2', 'fred' ), httplib2.parse_uri("http://example.com/path?a=1&b=2#fred"))
40
41http = httplib2.Http(".cache")
42
43class HttpTest(unittest.TestCase):
44 def setUp(self):
45 [os.remove(os.path.join(".cache", file)) for file in os.listdir(".cache")]
46 http.clear_credentials()
47
48 def testGetIsDefaultMethod(self):
49 # Test that GET is the default method
50 uri = urlparse.urljoin(base, "methods/method_reflector.cgi")
51 (response, content) = http.request(uri)
52 self.assertEqual(response['x-method'], "GET")
53
54 def testDifferentMethods(self):
55 # Test that all methods can be used
56 uri = urlparse.urljoin(base, "methods/method_reflector.cgi")
57 for method in ["GET", "PUT", "DELETE", "POST"]:
58 (response, content) = http.request(uri, method, body=" ")
59 self.assertEqual(response['x-method'], method)
60
61 def testGetNoCache(self):
62 # Test that can do a GET w/o the cache turned on.
63 http = httplib2.Http()
64 uri = urlparse.urljoin(base, "304/test_etag.txt")
65 (response, content) = http.request(uri, "GET")
66 self.assertEqual(response.status, 200)
67 self.assertEqual(response._previous, None)
68
69 def testUserAgent(self):
70 # Test that we provide a default user-agent
71 uri = urlparse.urljoin(base, "user-agent/test.cgi")
72 (response, content) = http.request(uri, "GET")
73 self.assertEqual(response.status, 200)
74 self.assertTrue(content.startswith("Python-httplib2/"))
75
76 def testUserAgentNonDefault(self):
77 # Test that the default user-agent can be over-ridden
78 uri = urlparse.urljoin(base, "user-agent/test.cgi")
79 (response, content) = http.request(uri, "GET", headers={'User-Agent': 'fred/1.0'})
80 self.assertEqual(response.status, 200)
81 self.assertTrue(content.startswith("fred/1.0"))
82
83 def testGet300WithLocation(self):
84 # Test the we automatically follow 300 redirects if a Location: header is provided
85 uri = urlparse.urljoin(base, "300/with-location-header.asis")
86 (response, content) = http.request(uri, "GET")
87 self.assertEqual(response.status, 200)
88 self.assertEqual(content, "This is the final destination.\n")
89 self.assertEqual(response._previous.status, 300)
90 self.assertEqual(response._previous.fromcache, False)
91
92 # Confirm that the intermediate 300 is not cached
93 (response, content) = http.request(uri, "GET")
94 self.assertEqual(response.status, 200)
95 self.assertEqual(content, "This is the final destination.\n")
96 self.assertEqual(response._previous.status, 300)
97 self.assertEqual(response._previous.fromcache, False)
98
99 def testGet300WithoutLocation(self):
100 # Not giving a Location: header in a 300 response is acceptable
101 # In which case we just return the 300 response
102 uri = urlparse.urljoin(base, "300/without-location-header.asis")
103 (response, content) = http.request(uri, "GET")
104 self.assertEqual(response.status, 300)
105 self.assertTrue(response['content-type'].startswith("text/html"))
106 self.assertEqual(response._previous, None)
107
108 def testGet301(self):
109 # Test that we automatically follow 301 redirects
110 # and that we cache the 301 response
111 uri = urlparse.urljoin(base, "301/onestep.asis")
112 (response, content) = http.request(uri, "GET")
113 self.assertEqual(response.status, 200)
114 self.assertEqual(content, "This is the final destination.\n")
115 self.assertEqual(response._previous.status, 301)
116 self.assertEqual(response._previous.fromcache, False)
117
118 (response, content) = http.request(uri, "GET")
119 self.assertEqual(response.status, 200)
120 self.assertEqual(content, "This is the final destination.\n")
121 self.assertEqual(response._previous.status, 301)
122 self.assertEqual(response._previous.fromcache, True)
123
124 def testGet302(self):
125 # Test that we automatically follow 302 redirects
126 # and that we DO NOT cache the 302 response
127 uri = urlparse.urljoin(base, "302/onestep.asis")
128 (response, content) = http.request(uri, "GET")
129 self.assertEqual(response.status, 200)
130 self.assertEqual(content, "This is the final destination.\n")
131 self.assertEqual(response._previous.status, 302)
132 self.assertEqual(response._previous.fromcache, False)
133
134 uri = urlparse.urljoin(base, "302/onestep.asis")
135 (response, content) = http.request(uri, "GET")
136 self.assertEqual(response.status, 200)
137 self.assertEqual(response.fromcache, True)
138 self.assertEqual(content, "This is the final destination.\n")
139 self.assertEqual(response._previous.status, 302)
140 self.assertEqual(response._previous.fromcache, False)
141
142 uri = urlparse.urljoin(base, "302/twostep.asis")
143
144 (response, content) = http.request(uri, "GET")
145 self.assertEqual(response.status, 200)
146 self.assertEqual(response.fromcache, True)
147 self.assertEqual(content, "This is the final destination.\n")
148 self.assertEqual(response._previous.status, 302)
149 self.assertEqual(response._previous.fromcache, False)
150
151 def testGet302RedirectionLimit(self):
152 # Test that we can set a lower redirection limit
153 # and that we raise an exception when we exceed
154 # that limit.
155 uri = urlparse.urljoin(base, "302/twostep.asis")
156 try:
157 (response, content) = http.request(uri, "GET", redirections = 1)
158 self.fail("This should not happen")
159 except httplib2.RedirectLimit:
160 pass
161 except Exception, e:
162 self.fail("Threw wrong kind of exception ")
163
164 def testGet302NoLocation(self):
165 # Test that we throw an exception when we get
166 # a 302 with no Location: header.
167 uri = urlparse.urljoin(base, "302/no-location.asis")
168 try:
169 (response, content) = http.request(uri, "GET")
170 self.fail("Should never reach here")
171 except httplib2.RedirectMissingLocation:
172 pass
173 except Exception, e:
174 self.fail("Threw wrong kind of exception ")
175
176 def testGet302ViaHttps(self):
177 # goole always redirects to http://google.com
178 (response, content) = http.request("https://google.com", "GET")
179 self.assertEqual(200, response.status)
180 self.assertEqual(302, response._previous.status)
181
182 def testGetViaHttps(self):
183 # Test that we can handle HTTPS
184 (response, content) = http.request("https://google.com/adsense/", "GET")
185 self.assertEqual(200, response.status)
186 self.assertEqual(None, response._previous)
187
188 def testGetViaHttpsSpecViolationOnLocation(self):
189 # Test that we follow redirects through HTTPS
190 # even if they violate the spec by including
191 # a relative Location: header instead of an
192 # absolute one.
193 (response, content) = http.request("https://google.com/adsense", "GET")
194 self.assertEqual(200, response.status)
195 self.assertNotEqual(None, response._previous)
196
197 def testGet303(self):
198 # Do a follow-up GET on a Location: header
199 # returned from a POST that gave a 303.
200 uri = urlparse.urljoin(base, "303/303.cgi")
201 (response, content) = http.request(uri, "POST", " ")
202 self.assertEqual(response.status, 200)
203 self.assertEqual(content, "This is the final destination.\n")
204 self.assertEqual(response._previous.status, 303)
205
206 def test303ForDifferentMethods(self):
207 # Test that all methods can be used
208 uri = urlparse.urljoin(base, "303/redirect-to-reflector.cgi")
209 # HEAD really does send a HEAD, but apparently Apache changes
210 # every HEAD into a GET, so our script returns x-method: GET.
211 for (method, method_on_303) in [("PUT", "GET"), ("DELETE", "GET"), ("POST", "GET"), ("GET", "GET"), ("HEAD", "GET")]:
212 (response, content) = http.request(uri, method, body=" ")
213 self.assertEqual(response['x-method'], method_on_303)
214
215 def testGet304(self):
216 # Test that we use ETags properly to validate our cache
217 uri = urlparse.urljoin(base, "304/test_etag.txt")
218 (response, content) = http.request(uri, "GET")
219 self.assertNotEqual(response['etag'], "")
220
221 (response, content) = http.request(uri, "GET")
222 (response, content) = http.request(uri, "GET", headers = {'cache-control': 'must-revalidate'})
223 self.assertEqual(response.status, 200)
224 self.assertEqual(response.fromcache, True)
225
226 (response, content) = http.request(uri, "HEAD")
227 self.assertEqual(response.status, 200)
228 self.assertEqual(response.fromcache, True)
229
230 (response, content) = http.request(uri, "GET", headers = {'range': 'bytes=0-0'})
231 self.assertEqual(response.status, 206)
232 self.assertEqual(response.fromcache, False)
233
234 def testGet304EndToEnd(self):
235 # Test that end to end headers get overwritten in the cache
236 uri = urlparse.urljoin(base, "304/end2end.cgi")
237 (response, content) = http.request(uri, "GET")
238 self.assertNotEqual(response['etag'], "")
239 old_date = response['date']
240 time.sleep(2)
241
242 (response, content) = http.request(uri, "GET", headers = {'Cache-Control': 'max-age=0'})
243 # The response should be from the cache, but the Date: header should be updated.
244 new_date = response['date']
245 self.assertNotEqual(new_date, old_date)
246 self.assertEqual(response.status, 200)
247 self.assertEqual(response.fromcache, True)
248
249 def testGet304LastModified(self):
250 # Test that we can still handle a 304
251 # by only using the last-modified cache validator.
252 uri = urlparse.urljoin(base, "304/last-modified-only/last-modified-only.txt")
253 (response, content) = http.request(uri, "GET")
254
255 self.assertNotEqual(response['last-modified'], "")
256 (response, content) = http.request(uri, "GET")
257 (response, content) = http.request(uri, "GET")
258 self.assertEqual(response.status, 200)
259 self.assertEqual(response.fromcache, True)
260
261 def testGet307(self):
262 # Test that we do follow 307 redirects but
263 # do not cache the 307
264 uri = urlparse.urljoin(base, "307/onestep.asis")
265 (response, content) = http.request(uri, "GET")
266 self.assertEqual(response.status, 200)
267 self.assertEqual(content, "This is the final destination.\n")
268 self.assertEqual(response._previous.status, 307)
269 self.assertEqual(response._previous.fromcache, False)
270
271 (response, content) = http.request(uri, "GET")
272 self.assertEqual(response.status, 200)
273 self.assertEqual(response.fromcache, True)
274 self.assertEqual(content, "This is the final destination.\n")
275 self.assertEqual(response._previous.status, 307)
276 self.assertEqual(response._previous.fromcache, False)
277
278 def testGet410(self):
279 # Test that we pass 410's through
280 uri = urlparse.urljoin(base, "410/410.asis")
281 (response, content) = http.request(uri, "GET")
282 self.assertEqual(response.status, 410)
283
284 def testGetGZip(self):
285 # Test that we support gzip compression
286 uri = urlparse.urljoin(base, "gzip/final-destination.txt")
287 (response, content) = http.request(uri, "GET")
288 self.assertEqual(response.status, 200)
289 self.assertEqual(response['content-encoding'], "gzip")
290 self.assertEqual(content, "This is the final destination.\n")
291
292 def testGetGZipFailure(self):
293 # Test that we raise a good exception when the gzip fails
294 uri = urlparse.urljoin(base, "gzip/failed-compression.asis")
295 try:
296 (response, content) = http.request(uri, "GET")
297 self.fail("Should never reach here")
298 except httplib2.FailedToDecompressContent:
299 pass
300 except Exception:
301 self.fail("Threw wrong kind of exception")
302
303 def testGetDeflate(self):
304 # Test that we support deflate compression
305 uri = urlparse.urljoin(base, "deflate/deflated.asis")
306 (response, content) = http.request(uri, "GET")
307 self.assertEqual(response.status, 200)
308 self.assertEqual(response['content-encoding'], "deflate")
309 self.assertEqual(content, "This is the final destination.")
310
311 def testGetDeflateFailure(self):
312 # Test that we raise a good exception when the deflate fails
313 uri = urlparse.urljoin(base, "deflate/deflated.asis")
314 uri = urlparse.urljoin(base, "deflate/failed-compression.asis")
315 try:
316 (response, content) = http.request(uri, "GET")
317 self.fail("Should never reach here")
318 except httplib2.FailedToDecompressContent:
319 pass
320 except Exception:
321 self.fail("Threw wrong kind of exception")
322
323 def testGetDuplicateHeaders(self):
324 # Test that duplicate headers get concatenated via ','
325 uri = urlparse.urljoin(base, "duplicate-headers/multilink.asis")
326 (response, content) = http.request(uri, "GET")
327 self.assertEqual(response.status, 200)
328 self.assertEqual(content, "This is content\n")
329 self.assertEqual(response['link'].split(",")[0], '<http://bitworking.org>; rel="home"; title="BitWorking"')
330
331 def testGetCacheControlNoCache(self):
332 # Test Cache-Control: no-cache on requests
333 uri = urlparse.urljoin(base, "304/test_etag.txt")
334 (response, content) = http.request(uri, "GET")
335 self.assertNotEqual(response['etag'], "")
336 (response, content) = http.request(uri, "GET")
337 self.assertEqual(response.status, 200)
338 self.assertEqual(response.fromcache, True)
339
340 (response, content) = http.request(uri, "GET", headers={'Cache-Control': 'no-cache'})
341 self.assertEqual(response.status, 200)
342 self.assertEqual(response.fromcache, False)
343
344 def testGetCacheControlPragmaNoCache(self):
345 # Test Pragma: no-cache on requests
346 uri = urlparse.urljoin(base, "304/test_etag.txt")
347 (response, content) = http.request(uri, "GET")
348 self.assertNotEqual(response['etag'], "")
349 (response, content) = http.request(uri, "GET")
350 self.assertEqual(response.status, 200)
351 self.assertEqual(response.fromcache, True)
352
353 (response, content) = http.request(uri, "GET", headers={'Pragma': 'no-cache'})
354 self.assertEqual(response.status, 200)
355 self.assertEqual(response.fromcache, False)
356
357 def testGetCacheControlNoStoreRequest(self):
358 # A no-store request means that the response should not be stored.
359 uri = urlparse.urljoin(base, "304/test_etag.txt")
360
361 (response, content) = http.request(uri, "GET", headers={'Cache-Control': 'no-store'})
362 self.assertEqual(response.status, 200)
363 self.assertEqual(response.fromcache, False)
364
365 (response, content) = http.request(uri, "GET", headers={'Cache-Control': 'no-store'})
366 self.assertEqual(response.status, 200)
367 self.assertEqual(response.fromcache, False)
368
369 def testGetCacheControlNoStoreResponse(self):
370 # A no-store response means that the response should not be stored.
371 uri = urlparse.urljoin(base, "no-store/no-store.asis")
372
373 (response, content) = http.request(uri, "GET")
374 self.assertEqual(response.status, 200)
375 self.assertEqual(response.fromcache, False)
376
377 (response, content) = http.request(uri, "GET")
378 self.assertEqual(response.status, 200)
379 self.assertEqual(response.fromcache, False)
380 self.assertEqual(0, len(os.listdir(".cache")))
381
382 def testGetCacheControlNoCacheNoStoreRequest(self):
383 # Test that a no-store, no-cache clears the entry from the cache
384 # even if it was cached previously.
385 uri = urlparse.urljoin(base, "304/test_etag.txt")
386
387 (response, content) = http.request(uri, "GET")
388 (response, content) = http.request(uri, "GET")
389 self.assertEqual(response.fromcache, True)
390 (response, content) = http.request(uri, "GET", headers={'Cache-Control': 'no-store, no-cache'})
391 (response, content) = http.request(uri, "GET", headers={'Cache-Control': 'no-store, no-cache'})
392 self.assertEqual(response.status, 200)
393 self.assertEqual(response.fromcache, False)
394 self.assertEqual(0, len(os.listdir(".cache")))
395
396 def testUpdateInvalidatesCache(self):
397 # Test that calling PUT or DELETE on a
398 # URI that is cache invalidates that cache.
399 uri = urlparse.urljoin(base, "304/test_etag.txt")
400
401 (response, content) = http.request(uri, "GET")
402 (response, content) = http.request(uri, "GET")
403 self.assertEqual(response.fromcache, True)
404 (response, content) = http.request(uri, "DELETE")
405 self.assertEqual(response.status, 405)
406
407 (response, content) = http.request(uri, "GET")
408 self.assertEqual(response.fromcache, False)
409
410 def testUpdateUsesCachedETag(self):
411 # Test that we natively support http://www.w3.org/1999/04/Editing/
412 uri = urlparse.urljoin(base, "conditional-updates/test.cgi")
413
414 (response, content) = http.request(uri, "GET")
415 self.assertEqual(response.status, 200)
416 self.assertEqual(response.fromcache, False)
417 (response, content) = http.request(uri, "GET")
418 self.assertEqual(response.status, 200)
419 self.assertEqual(response.fromcache, True)
420 (response, content) = http.request(uri, "PUT")
421 self.assertEqual(response.status, 200)
422 (response, content) = http.request(uri, "PUT")
423 self.assertEqual(response.status, 412)
424
425 def testBasicAuth(self):
426 # Test Basic Authentication
427 uri = urlparse.urljoin(base, "basic/file.txt")
428 (response, content) = http.request(uri, "GET")
429 self.assertEqual(response.status, 401)
430
431 uri = urlparse.urljoin(base, "basic/")
432 (response, content) = http.request(uri, "GET")
433 self.assertEqual(response.status, 401)
434
435 http.add_credentials('joe', 'password')
436 (response, content) = http.request(uri, "GET")
437 self.assertEqual(response.status, 200)
438
439 uri = urlparse.urljoin(base, "basic/file.txt")
440 (response, content) = http.request(uri, "GET")
441 self.assertEqual(response.status, 200)
442
443 def testBasicAuthTwoDifferentCredentials(self):
444 # Test Basic Authentication with multple sets of credentials
445 uri = urlparse.urljoin(base, "basic2/file.txt")
446 (response, content) = http.request(uri, "GET")
447 self.assertEqual(response.status, 401)
448
449 uri = urlparse.urljoin(base, "basic2/")
450 (response, content) = http.request(uri, "GET")
451 self.assertEqual(response.status, 401)
452
453 http.add_credentials('fred', 'barney')
454 (response, content) = http.request(uri, "GET")
455 self.assertEqual(response.status, 200)
456
457 uri = urlparse.urljoin(base, "basic2/file.txt")
458 (response, content) = http.request(uri, "GET")
459 self.assertEqual(response.status, 200)
460
461 def testBasicAuthNested(self):
462 # Test Basic Authentication with resources
463 # that are nested
464 uri = urlparse.urljoin(base, "basic-nested/")
465 (response, content) = http.request(uri, "GET")
466 self.assertEqual(response.status, 401)
467
468 uri = urlparse.urljoin(base, "basic-nested/subdir")
469 (response, content) = http.request(uri, "GET")
470 self.assertEqual(response.status, 401)
471
472 # Now add in creditials one at a time and test.
473 http.add_credentials('joe', 'password')
474
475 uri = urlparse.urljoin(base, "basic-nested/")
476 (response, content) = http.request(uri, "GET")
477 self.assertEqual(response.status, 200)
478
479 uri = urlparse.urljoin(base, "basic-nested/subdir")
480 (response, content) = http.request(uri, "GET")
481 self.assertEqual(response.status, 401)
482
483 http.add_credentials('fred', 'barney')
484
485 uri = urlparse.urljoin(base, "basic-nested/")
486 (response, content) = http.request(uri, "GET")
487 self.assertEqual(response.status, 200)
488
489 uri = urlparse.urljoin(base, "basic-nested/subdir")
490 (response, content) = http.request(uri, "GET")
491 self.assertEqual(response.status, 200)
492
493 def testDigestAuth(self):
494 # Test that we support Digest Authentication
495 uri = urlparse.urljoin(base, "digest/")
496 (response, content) = http.request(uri, "GET")
497 self.assertEqual(response.status, 401)
498
499 http.add_credentials('joe', 'password')
500 (response, content) = http.request(uri, "GET")
501 self.assertEqual(response.status, 200)
502
503 uri = urlparse.urljoin(base, "digest/file.txt")
504 (response, content) = http.request(uri, "GET")
505
506 def testDigestAuthNextNonceAndNC(self):
507 # Test that if the server sets nextnonce that we reset
508 # the nonce count back to 1
509 uri = urlparse.urljoin(base, "digest/file.txt")
510 http.add_credentials('joe', 'password')
511 (response, content) = http.request(uri, "GET", headers = {"cache-control":"no-cache"})
512 info = httplib2._parse_www_authenticate(response, 'authentication-info')
513 self.assertEqual(response.status, 200)
514 (response, content) = http.request(uri, "GET", headers = {"cache-control":"no-cache"})
515 info2 = httplib2._parse_www_authenticate(response, 'authentication-info')
516 self.assertEqual(response.status, 200)
517
518 if info.has_key('nextnonce'):
519 self.assertEqual(info2['nc'], 1)
520
521 def testDigestAuthStale(self):
522 # Test that we can handle a nonce becoming stale
523 uri = urlparse.urljoin(base, "digest-expire/file.txt")
524 http.add_credentials('joe', 'password')
525 (response, content) = http.request(uri, "GET", headers = {"cache-control":"no-cache"})
526 info = httplib2._parse_www_authenticate(response, 'authentication-info')
527 self.assertEqual(response.status, 200)
528
529 time.sleep(3)
530 # Sleep long enough that the nonce becomes stale
531
532 (response, content) = http.request(uri, "GET", headers = {"cache-control":"no-cache"})
533 self.assertFalse(response.fromcache)
534 self.assertTrue(response._stale_digest)
535 info3 = httplib2._parse_www_authenticate(response, 'authentication-info')
536 self.assertEqual(response.status, 200)
537
538 def reflector(self, content):
539 return dict( [tuple(x.split("=")) for x in content.strip().split("\n")] )
540
541 def testReflector(self):
542 uri = urlparse.urljoin(base, "reflector/reflector.cgi")
543 (response, content) = http.request(uri, "GET")
544 d = self.reflector(content)
545 self.assertTrue(d.has_key('HTTP_USER_AGENT'))
546
547
548class HttpPrivateTest(unittest.TestCase):
549
550 def testParseCacheControl(self):
551 # Test that we can parse the Cache-Control header
552 self.assertEqual({}, httplib2._parse_cache_control({}))
553 self.assertEqual({'no-cache': 1}, httplib2._parse_cache_control({'cache-control': ' no-cache'}))
554 cc = httplib2._parse_cache_control({'cache-control': ' no-cache, max-age = 7200'})
555 self.assertEqual(cc['no-cache'], 1)
556 self.assertEqual(cc['max-age'], '7200')
557 cc = httplib2._parse_cache_control({'cache-control': ' , '})
558 self.assertEqual(cc[''], 1)
559
560 def testNormalizeHeaders(self):
561 # Test that we normalize headers to lowercase
562 h = httplib2._normalize_headers({'Cache-Control': 'no-cache', 'Other': 'Stuff'})
563 self.assertTrue(h.has_key('cache-control'))
564 self.assertTrue(h.has_key('other'))
565 self.assertEqual('Stuff', h['other'])
566
567 def testExpirationModelTransparent(self):
568 # Test that no-cache makes our request TRANSPARENT
569 response_headers = {
570 'cache-control': 'max-age=7200'
571 }
572 request_headers = {
573 'cache-control': 'no-cache'
574 }
575 self.assertEqual("TRANSPARENT", httplib2._entry_disposition(response_headers, request_headers))
576
577 def testExpirationModelNoCacheResponse(self):
578 # The date and expires point to an entry that should be
579 # FRESH, but the no-cache over-rides that.
580 now = time.time()
581 response_headers = {
582 'date': time.strftime("%a, %d %b %Y %H:%M:%S GMT", time.gmtime(now)),
583 'expires': time.strftime("%a, %d %b %Y %H:%M:%S GMT", time.gmtime(now+4)),
584 'cache-control': 'no-cache'
585 }
586 request_headers = {
587 }
588 self.assertEqual("STALE", httplib2._entry_disposition(response_headers, request_headers))
589
590 def testExpirationModelStaleRequestMustReval(self):
591 # must-revalidate forces STALE
592 self.assertEqual("STALE", httplib2._entry_disposition({}, {'cache-control': 'must-revalidate'}))
593
594 def testExpirationModelStaleResponseMustReval(self):
595 # must-revalidate forces STALE
596 self.assertEqual("STALE", httplib2._entry_disposition({'cache-control': 'must-revalidate'}, {}))
597
598 def testExpirationModelFresh(self):
599 response_headers = {
600 'date': time.strftime("%a, %d %b %Y %H:%M:%S GMT", time.gmtime()),
601 'cache-control': 'max-age=2'
602 }
603 request_headers = {
604 }
605 self.assertEqual("FRESH", httplib2._entry_disposition(response_headers, request_headers))
606 time.sleep(3)
607 self.assertEqual("STALE", httplib2._entry_disposition(response_headers, request_headers))
608
609 def testExpirationMaxAge0(self):
610 response_headers = {
611 'date': time.strftime("%a, %d %b %Y %H:%M:%S GMT", time.gmtime()),
612 'cache-control': 'max-age=0'
613 }
614 request_headers = {
615 }
616 self.assertEqual("STALE", httplib2._entry_disposition(response_headers, request_headers))
617
618 def testExpirationModelDateAndExpires(self):
619 now = time.time()
620 response_headers = {
621 'date': time.strftime("%a, %d %b %Y %H:%M:%S GMT", time.gmtime(now)),
622 'expires': time.strftime("%a, %d %b %Y %H:%M:%S GMT", time.gmtime(now+2)),
623 }
624 request_headers = {
625 }
626 self.assertEqual("FRESH", httplib2._entry_disposition(response_headers, request_headers))
627 time.sleep(3)
628 self.assertEqual("STALE", httplib2._entry_disposition(response_headers, request_headers))
629
630 def testExpirationModelDateOnly(self):
631 now = time.time()
632 response_headers = {
633 'date': time.strftime("%a, %d %b %Y %H:%M:%S GMT", time.gmtime(now+3)),
634 }
635 request_headers = {
636 }
637 self.assertEqual("STALE", httplib2._entry_disposition(response_headers, request_headers))
638
639 def testExpirationModelOnlyIfCached(self):
640 response_headers = {
641 }
642 request_headers = {
643 'cache-control': 'only-if-cached',
644 }
645 self.assertEqual("FRESH", httplib2._entry_disposition(response_headers, request_headers))
646
647 def testExpirationModelMaxAgeBoth(self):
648 now = time.time()
649 response_headers = {
650 'date': time.strftime("%a, %d %b %Y %H:%M:%S GMT", time.gmtime(now)),
651 'cache-control': 'max-age=2'
652 }
653 request_headers = {
654 'cache-control': 'max-age=0'
655 }
656 self.assertEqual("STALE", httplib2._entry_disposition(response_headers, request_headers))
657
658 def testExpirationModelDateAndExpiresMinFresh1(self):
659 now = time.time()
660 response_headers = {
661 'date': time.strftime("%a, %d %b %Y %H:%M:%S GMT", time.gmtime(now)),
662 'expires': time.strftime("%a, %d %b %Y %H:%M:%S GMT", time.gmtime(now+2)),
663 }
664 request_headers = {
665 'cache-control': 'min-fresh=2'
666 }
667 self.assertEqual("STALE", httplib2._entry_disposition(response_headers, request_headers))
668
669 def testExpirationModelDateAndExpiresMinFresh2(self):
670 now = time.time()
671 response_headers = {
672 'date': time.strftime("%a, %d %b %Y %H:%M:%S GMT", time.gmtime(now)),
673 'expires': time.strftime("%a, %d %b %Y %H:%M:%S GMT", time.gmtime(now+4)),
674 }
675 request_headers = {
676 'cache-control': 'min-fresh=2'
677 }
678 self.assertEqual("FRESH", httplib2._entry_disposition(response_headers, request_headers))
679
680 def testParseWWWAuthenticateEmpty(self):
681 res = httplib2._parse_www_authenticate({})
682 self.assertEqual(len(res.keys()), 0)
683
684 def testParseWWWAuthenticateBasic(self):
685 res = httplib2._parse_www_authenticate({ 'www-authenticate': 'Basic realm="me"'})
686 basic = res['basic']
687 self.assertEqual('me', basic['realm'])
688
689 res = httplib2._parse_www_authenticate({ 'www-authenticate': 'Basic realm="me", algorithm="MD5"'})
690 basic = res['basic']
691 self.assertEqual('me', basic['realm'])
692 self.assertEqual('MD5', basic['algorithm'])
693
694 res = httplib2._parse_www_authenticate({ 'www-authenticate': 'Basic realm="me", algorithm=MD5'})
695 basic = res['basic']
696 self.assertEqual('me', basic['realm'])
697 self.assertEqual('MD5', basic['algorithm'])
698
699 def testParseWWWAuthenticateBasic2(self):
700 res = httplib2._parse_www_authenticate({ 'www-authenticate': 'Basic realm="me",other="fred" '})
701 basic = res['basic']
702 self.assertEqual('me', basic['realm'])
703 self.assertEqual('fred', basic['other'])
704
705 def testParseWWWAuthenticateBasic3(self):
706 res = httplib2._parse_www_authenticate({ 'www-authenticate': 'Basic REAlm="me" '})
707 basic = res['basic']
708 self.assertEqual('me', basic['realm'])
709
710
711 def testParseWWWAuthenticateDigest(self):
712 res = httplib2._parse_www_authenticate({ 'www-authenticate':
713 'Digest realm="testrealm@host.com", qop="auth,auth-int", nonce="dcd98b7102dd2f0e8b11d0f600bfb0c093", opaque="5ccc069c403ebaf9f0171e9517f40e41"'})
714 digest = res['digest']
715 self.assertEqual('testrealm@host.com', digest['realm'])
716 self.assertEqual('auth,auth-int', digest['qop'])
717
718
719 def testParseWWWAuthenticateMultiple(self):
720 res = httplib2._parse_www_authenticate({ 'www-authenticate':
721 'Digest realm="testrealm@host.com", qop="auth,auth-int", nonce="dcd98b7102dd2f0e8b11d0f600bfb0c093", opaque="5ccc069c403ebaf9f0171e9517f40e41" Basic REAlm="me" '})
722 digest = res['digest']
723 self.assertEqual('testrealm@host.com', digest['realm'])
724 self.assertEqual('auth,auth-int', digest['qop'])
725 self.assertEqual('dcd98b7102dd2f0e8b11d0f600bfb0c093', digest['nonce'])
726 self.assertEqual('5ccc069c403ebaf9f0171e9517f40e41', digest['opaque'])
727 basic = res['basic']
728 self.assertEqual('me', basic['realm'])
729
730 def testParseWWWAuthenticateMultiple2(self):
731 # Handle an added comma between challenges, which might get thrown in if the challenges were
732 # originally sent in separate www-authenticate headers.
733 res = httplib2._parse_www_authenticate({ 'www-authenticate':
734 'Digest realm="testrealm@host.com", qop="auth,auth-int", nonce="dcd98b7102dd2f0e8b11d0f600bfb0c093", opaque="5ccc069c403ebaf9f0171e9517f40e41", Basic REAlm="me" '})
735 digest = res['digest']
736 self.assertEqual('testrealm@host.com', digest['realm'])
737 self.assertEqual('auth,auth-int', digest['qop'])
738 self.assertEqual('dcd98b7102dd2f0e8b11d0f600bfb0c093', digest['nonce'])
739 self.assertEqual('5ccc069c403ebaf9f0171e9517f40e41', digest['opaque'])
740 basic = res['basic']
741 self.assertEqual('me', basic['realm'])
742
743 def testParseWWWAuthenticateMultiple3(self):
744 # Handle an added comma between challenges, which might get thrown in if the challenges were
745 # originally sent in separate www-authenticate headers.
746 res = httplib2._parse_www_authenticate({ 'www-authenticate':
747 'Digest realm="testrealm@host.com", qop="auth,auth-int", nonce="dcd98b7102dd2f0e8b11d0f600bfb0c093", opaque="5ccc069c403ebaf9f0171e9517f40e41", Basic REAlm="me", WSSE realm="foo", profile="UsernameToken"'})
748 digest = res['digest']
749 self.assertEqual('testrealm@host.com', digest['realm'])
750 self.assertEqual('auth,auth-int', digest['qop'])
751 self.assertEqual('dcd98b7102dd2f0e8b11d0f600bfb0c093', digest['nonce'])
752 self.assertEqual('5ccc069c403ebaf9f0171e9517f40e41', digest['opaque'])
753 basic = res['basic']
754 self.assertEqual('me', basic['realm'])
755 wsse = res['wsse']
756 self.assertEqual('foo', wsse['realm'])
757 self.assertEqual('UsernameToken', wsse['profile'])
758
759 def testParseWWWAuthenticateMultiple4(self):
760 res = httplib2._parse_www_authenticate({ 'www-authenticate':
761 'Digest realm="test-real.m@host.com", qop \t=\t"\tauth,auth-int", nonce="(*)&^&$%#",opaque="5ccc069c403ebaf9f0171e9517f40e41", Basic REAlm="me", WSSE realm="foo", profile="UsernameToken"'})
762 digest = res['digest']
763 self.assertEqual('test-real.m@host.com', digest['realm'])
764 self.assertEqual('\tauth,auth-int', digest['qop'])
765 self.assertEqual('(*)&^&$%#', digest['nonce'])
766
767 def testParseWWWAuthenticateMoreQuoteCombos(self):
768 res = httplib2._parse_www_authenticate({'www-authenticate':'Digest realm="myrealm", nonce="Ygk86AsKBAA=3516200d37f9a3230352fde99977bd6d472d4306", algorithm=MD5, qop="auth", stale=true'})
769 digest = res['digest']
770 self.assertEqual('myrealm', digest['realm'])
771
772 def testDigestObject(self):
773 credentials = ('joe', 'password')
774 host = None
775 request_uri = '/projects/httplib2/test/digest/'
776 headers = {}
777 response = {
778 'www-authenticate': 'Digest realm="myrealm", nonce="Ygk86AsKBAA=3516200d37f9a3230352fde99977bd6d472d4306", algorithm=MD5, qop="auth"'
779 }
780 content = ""
781
782 d = httplib2.DigestAuthentication(credentials, host, request_uri, headers, response, content)
783 d.request("GET", request_uri, headers, content, cnonce="33033375ec278a46")
784 our_request = "Authorization: %s" % headers['Authorization']
785 working_request = 'Authorization: Digest username="joe", realm="myrealm", nonce="Ygk86AsKBAA=3516200d37f9a3230352fde99977bd6d472d4306", uri="/projects/httplib2/test/digest/", algorithm=MD5, response="97ed129401f7cdc60e5db58a80f3ea8b", qop=auth, nc=00000001, cnonce="33033375ec278a46"'
786 self.assertEqual(our_request, working_request)
787
788
789 def testDigestObjectStale(self):
790 credentials = ('joe', 'password')
791 host = None
792 request_uri = '/projects/httplib2/test/digest/'
793 headers = {}
794 response = httplib2.Response({ })
795 response['www-authenticate'] = 'Digest realm="myrealm", nonce="Ygk86AsKBAA=3516200d37f9a3230352fde99977bd6d472d4306", algorithm=MD5, qop="auth", stale=true'
796 response.status = 401
797 content = ""
798 d = httplib2.DigestAuthentication(credentials, host, request_uri, headers, response, content)
799 # Returns true to force a retry
800 self.assertTrue( d.response(response, content) )
801
802 def testDigestObjectAuthInfo(self):
803 credentials = ('joe', 'password')
804 host = None
805 request_uri = '/projects/httplib2/test/digest/'
806 headers = {}
807 response = httplib2.Response({ })
808 response['www-authenticate'] = 'Digest realm="myrealm", nonce="Ygk86AsKBAA=3516200d37f9a3230352fde99977bd6d472d4306", algorithm=MD5, qop="auth", stale=true'
809 response['authentication-info'] = 'nextnonce="fred"'
810 content = ""
811 d = httplib2.DigestAuthentication(credentials, host, request_uri, headers, response, content)
812 # Returns true to force a retry
813 self.assertFalse( d.response(response, content) )
814 self.assertEqual('fred', d.challenge['nonce'])
815 self.assertEqual(1, d.challenge['nc'])
816
817 def testWsseAlgorithm(self):
818 digest = httplib2._wsse_username_token("d36e316282959a9ed4c89851497a717f", "2003-12-15T14:43:07Z", "taadtaadpstcsm")
819 expected = "quR/EWLAV4xLf9Zqyw4pDmfV9OY="
820 self.assertEqual(expected, digest)
821
822
823unittest.main()
824