blob: e2bdae31414e7f2b6bc6b709a8a3d696c358e10f [file] [log] [blame]
pilgrim00a352e2009-05-29 04:04:44 +00001#!/usr/bin/env python3
2"""
3httplib2test
4
5A set of unit tests for httplib2.py.
6
7Requires Python 3.0 or later
8"""
9
10__author__ = "Joe Gregorio (joe@bitworking.org)"
11__copyright__ = "Copyright 2006, Joe Gregorio"
12__contributors__ = ["Mark Pilgrim"]
13__license__ = "MIT"
14__history__ = """ """
15__version__ = "0.2 ($Rev: 118 $)"
16
17
18import sys
19import unittest
20import http.client
21import httplib2
22import os
23import urllib.parse
24import time
25import base64
26import io
27
# Root URI that every remote test resource is resolved against.
base = 'http://bitworking.org/projects/httplib2/test/'
# Alternative root for running against a local copy of the resources:
#base = 'http://localhost/projects/httplib2/test/'
# Directory handed to httplib2.Http as its cache location in the tests.
cacheDirName = ".cache"
32
33
class CredentialsTest(unittest.TestCase):
    # Exercises httplib2.Credentials: add(), iter() per domain, and clear().
    def test(self):
        creds = httplib2.Credentials()

        def matches(domain):
            # Materialise the (name, password) pairs yielded for a domain.
            return list(creds.iter(domain))

        # A credential added without a domain is expected for any domain.
        creds.add("joe", "password")
        self.assertEqual(("joe", "password"), matches("bitworking.org")[0])
        self.assertEqual(("joe", "password"), matches("")[0])

        # A domain-scoped credential is only expected for its own domain.
        creds.add("fred", "password2", "wellformedweb.org")
        self.assertEqual(("joe", "password"), matches("bitworking.org")[0])
        self.assertEqual(1, len(matches("bitworking.org")))
        self.assertEqual(2, len(matches("wellformedweb.org")))
        self.assertTrue(("fred", "password2") in matches("wellformedweb.org"))

        # After clear() only credentials added afterwards may show up.
        creds.clear()
        self.assertEqual(0, len(matches("bitworking.org")))
        creds.add("fred", "password2", "wellformedweb.org")
        self.assertTrue(("fred", "password2") in matches("wellformedweb.org"))
        self.assertEqual(0, len(matches("bitworking.org")))
        self.assertEqual(0, len(matches("")))
51
52
class ParserTest(unittest.TestCase):
    # Checks httplib2.parse_uri against example URIs.
    def testFromStd66(self):
        # Each entry pairs a URI with the 5-tuple expected from parse_uri;
        # judging by the values the slots are scheme, authority, path,
        # query and fragment, with None for an absent query/fragment.
        # (The identical assertion that used to be repeated for the
        # "#fred" URI is listed only once.)
        cases = [
            ("http://example.com",
             ('http', 'example.com', '', None, None)),
            ("https://example.com",
             ('https', 'example.com', '', None, None)),
            ("https://example.com:8080",
             ('https', 'example.com:8080', '', None, None)),
            ("http://example.com/",
             ('http', 'example.com', '/', None, None)),
            ("http://example.com/path",
             ('http', 'example.com', '/path', None, None)),
            ("http://example.com/path?a=1&b=2",
             ('http', 'example.com', '/path', 'a=1&b=2', None)),
            ("http://example.com/path?a=1&b=2#fred",
             ('http', 'example.com', '/path', 'a=1&b=2', 'fred')),
        ]
        for uri, expected in cases:
            self.assertEqual(expected, httplib2.parse_uri(uri))
63
64
class UrlNormTest(unittest.TestCase):
    # Checks httplib2.urlnorm: the last element of its result is compared
    # with the expected normalised URI.
    def test(self):
        def normalized(uri):
            return httplib2.urlnorm(uri)[-1]

        self.assertEqual("http://example.org/", normalized("http://example.org"))
        self.assertEqual("http://example.org/", normalized("http://EXAMple.org"))
        self.assertEqual("http://example.org/?=b", normalized("http://EXAMple.org?=b"))
        self.assertEqual("http://example.org/mypath?a=b", normalized("http://EXAMple.org/mypath?a=b"))
        self.assertEqual("http://localhost:80/", normalized("http://localhost:80"))

        # The complete results must agree regardless of case and trailing slash.
        lower_case = httplib2.urlnorm("http://localhost:80/")
        upper_case = httplib2.urlnorm("HTTP://LOCALHOST:80")
        self.assertEqual(lower_case, upper_case)

        # A relative reference has to be rejected.
        try:
            httplib2.urlnorm("/")
        except httplib2.RelativeURIError:
            pass
        else:
            self.fail("Non-absolute URIs should raise an exception")
78
class UrlSafenameTest(unittest.TestCase):
    # Checks httplib2.safename, which maps a URI to a cache file name.
    def test(self):
        # Test that different URIs end up generating different safe names
        self.assertEqual("example.org,fred,a=b,58489f63a7a83c3b7794a6a398ee8b1f", httplib2.safename("http://example.org/fred/?a=b"))
        self.assertEqual("example.org,fred,a=b,8c5946d56fec453071f43329ff0be46b", httplib2.safename("http://example.org/fred?/a=b"))
        self.assertEqual("www.example.org,fred,a=b,499c44b8d844a011b67ea2c015116968", httplib2.safename("http://www.example.org/fred?/a=b"))
        self.assertEqual(httplib2.safename(httplib2.urlnorm("http://www")[-1]), httplib2.safename(httplib2.urlnorm("http://WWW")[-1]))
        self.assertEqual("www.example.org,fred,a=b,692e843a333484ce0095b070497ab45d", httplib2.safename("https://www.example.org/fred?/a=b"))
        self.assertNotEqual(httplib2.safename("http://www"), httplib2.safename("https://www"))

        # Test the max length limits
        uri = "http://" + ("w" * 200) + ".org"
        uri2 = "http://" + ("w" * 201) + ".org"
        self.assertNotEqual(httplib2.safename(uri2), httplib2.safename(uri))
        # Max length should be 200 + 1 (",") + 32
        self.assertEqual(233, len(httplib2.safename(uri2)))
        self.assertEqual(233, len(httplib2.safename(uri)))

        # Unicode.  This module targets Python 3 only, so the former
        # `sys.version_info >= (2,3)` guard was always true and is dropped.
        self.assertEqual("xn--http,-4y1d.org,fred,a=b,579924c35db315e5a32e3d9963388193", httplib2.safename("http://\u2304.org/fred/?a=b"))
98
class _MyResponse(io.BytesIO):
    """In-memory response stand-in.

    The body is served through the inherited BytesIO interface and the
    keyword arguments given at construction are kept as the header mapping.
    """

    def __init__(self, body, **kwargs):
        super().__init__(body)
        self.headers = kwargs

    def items(self):
        """Return the (name, value) view of the stored headers."""
        header_map = self.headers
        return header_map.items()

    def iteritems(self):
        """Return an iterator over the stored (name, value) header pairs."""
        header_map = self.headers
        return iter(header_map.items())
109
110
class _MyHTTPConnection(object):
    """Mock connection class used for testing.

    It never touches the network: every operation is a no-op and
    getresponse() always hands back the same canned response.
    """

    def __init__(self, host, port=None, key_file=None, cert_file=None,
                 strict=None, timeout=None, proxy_info=None):
        # Only host, port and timeout are remembered; the remaining
        # arguments are accepted and ignored.
        self.host, self.port, self.timeout = host, port, timeout
        self.log = ""

    def set_debuglevel(self, level):
        """Accept and ignore a debug level."""
        return None

    def connect(self):
        """Pretend to connect; does nothing."""
        return None

    def close(self):
        """Pretend to close; does nothing."""
        return None

    def request(self, method, request_uri, body, headers):
        """Accept and discard a request."""
        return None

    def getresponse(self):
        """Return the canned response: body b"the body", status "200"."""
        canned = _MyResponse(b"the body", status="200")
        return canned
136
137
138class HttpTest(unittest.TestCase):
def setUp(self):
    # Start every test with an empty cache directory and no credentials.
    if os.path.exists(cacheDirName):
        # Plain loop: the removals are side effects, so no throwaway list
        # is built, and the loop variable no longer reuses the name `file`.
        for entry in os.listdir(cacheDirName):
            os.remove(os.path.join(cacheDirName, entry))
    self.http = httplib2.Http(cacheDirName)
    self.http.clear_credentials()
144
def testConnectionType(self):
    # A caller-supplied connection_type must be used for the request:
    # the body asserted below is the one produced by the mock connection.
    self.http.force_exception_to_status_code = False
    target = "http://bitworking.org"
    resp, body = self.http.request(target, connection_type=_MyHTTPConnection)
    self.assertEqual(resp['content-location'], "http://bitworking.org")
    self.assertEqual(body, b"the body")
150
def testGetUnknownServer(self):
    unresolvable = "http://fred.bitworking.org/"

    # With status-code conversion off, the failure must surface as an
    # exception.
    self.http.force_exception_to_status_code = False
    try:
        self.http.request(unresolvable)
    except httplib2.ServerNotFoundError:
        pass
    else:
        self.fail("An httplib2.ServerNotFoundError Exception must be thrown on an unresolvable server.")

    # Now test with exceptions turned off
    self.http.force_exception_to_status_code = True

    resp, body = self.http.request(unresolvable)
    self.assertEqual(resp['content-type'], 'text/plain')
    self.assertTrue(body.startswith(b"Unable to find"))
    self.assertEqual(resp.status, 400)
166
def testGetIRI(self):
    # A non-ASCII character in the query must reach the server
    # percent-encoded (%D0%82 is CYRILLIC CAPITAL LETTER DJE).
    # The former `sys.version_info >= (2,3)` guard was always true for this
    # Python 3 module and has been removed.
    uri = urllib.parse.urljoin(base, "reflector/reflector.cgi?d=\N{CYRILLIC CAPITAL LETTER DJE}")
    (response, content) = self.http.request(uri, "GET")
    d = self.reflector(content)
    self.assertTrue('QUERY_STRING' in d)
    self.assertTrue(d['QUERY_STRING'].find('%D0%82') > 0)
174
def testGetIsDefaultMethod(self):
    # When no method is passed, the request must go out as a GET.
    reflector_uri = urllib.parse.urljoin(base, "methods/method_reflector.cgi")
    resp, _body = self.http.request(reflector_uri)
    self.assertEqual(resp['x-method'], "GET")
180
def testDifferentMethods(self):
    # Each method sent must be the one the server reports back.
    reflector_uri = urllib.parse.urljoin(base, "methods/method_reflector.cgi")
    for verb in ("GET", "PUT", "DELETE", "POST"):
        resp, _body = self.http.request(reflector_uri, verb, body=b" ")
        self.assertEqual(resp['x-method'], verb)
187
def testGetNoCache(self):
    # A client created without a cache directory can still do a GET.
    uncached_client = httplib2.Http()
    target = urllib.parse.urljoin(base, "304/test_etag.txt")
    resp, _body = uncached_client.request(target, "GET")
    self.assertEqual(resp.status, 200)
    self.assertEqual(resp.previous, None)
195
def testGetOnlyIfCachedCacheMiss(self):
    # 'only-if-cached' on a client without a cache: the response must not
    # be marked as coming from the cache.
    uncached_client = httplib2.Http()
    target = urllib.parse.urljoin(base, "304/test_etag.txt")
    request_headers = {'cache-control': 'only-if-cached'}
    resp, _body = uncached_client.request(target, "GET", headers=request_headers)
    self.assertEqual(resp.fromcache, False)
    self.assertEqual(resp.status, 200)
203
def testGetOnlyIfCachedNoCacheAtAll(self):
    # 'only-if-cached' on a client that has no cache at all.
    # An intermediary between us and the server could itself honour
    # 'only-if-cached', so this test cannot be guaranteed to pass.
    uncached_client = httplib2.Http()
    target = urllib.parse.urljoin(base, "304/test_etag.txt")
    request_headers = {'cache-control': 'only-if-cached'}
    resp, _body = uncached_client.request(target, "GET", headers=request_headers)
    self.assertEqual(resp.fromcache, False)
    self.assertEqual(resp.status, 200)
214
def testUserAgent(self):
    # Without an explicit header a default user-agent must be sent; the
    # test resource's body is expected to start with it.
    echo_uri = urllib.parse.urljoin(base, "user-agent/test.cgi")
    resp, body = self.http.request(echo_uri, "GET")
    self.assertEqual(resp.status, 200)
    self.assertTrue(body.startswith(b"Python-httplib2/"))
221
def testUserAgentNonDefault(self):
    # A caller-supplied User-Agent header must replace the default one.
    echo_uri = urllib.parse.urljoin(base, "user-agent/test.cgi")
    custom_headers = {'User-Agent': 'fred/1.0'}
    resp, body = self.http.request(echo_uri, "GET", headers=custom_headers)
    self.assertEqual(resp.status, 200)
    self.assertTrue(body.startswith(b"fred/1.0"))
229
def testGet300WithLocation(self):
    # A 300 response that carries a Location: header is followed
    # automatically.
    start = urllib.parse.urljoin(base, "300/with-location-header.asis")

    # The second pass confirms that the intermediate 300 is not cached:
    # both passes must see exactly the same, non-cached, redirect.
    for _attempt in range(2):
        resp, body = self.http.request(start, "GET")
        self.assertEqual(resp.status, 200)
        self.assertEqual(body, b"This is the final destination.\n")
        self.assertEqual(resp.previous.status, 300)
        self.assertEqual(resp.previous.fromcache, False)
245
def testGet300WithLocationNoRedirect(self):
    # With follow_redirects switched off, the 300 itself is returned even
    # though a Location: header is present.
    self.http.follow_redirects = False
    start = urllib.parse.urljoin(base, "300/with-location-header.asis")
    resp, _body = self.http.request(start, "GET")
    self.assertEqual(resp.status, 300)
252
def testGet300WithoutLocation(self):
    # A 300 response may legitimately omit the Location: header; in that
    # case the 300 response is simply handed back to the caller.
    start = urllib.parse.urljoin(base, "300/without-location-header.asis")
    resp, _body = self.http.request(start, "GET")
    self.assertEqual(resp.status, 300)
    self.assertTrue(resp['content-type'].startswith("text/html"))
    self.assertEqual(resp.previous, None)
261
def testGet301(self):
    # 301 redirects are followed automatically and the 301 response
    # itself is cached.
    start = urllib.parse.urljoin(base, "301/onestep.asis")
    target = urllib.parse.urljoin(base, "302/final-destination.txt")
    final_body = b"This is the final destination.\n"

    # First request: the redirect comes from the network.
    resp, body = self.http.request(start, "GET")
    self.assertEqual(resp.status, 200)
    self.assertTrue('content-location' in resp)
    self.assertEqual(resp['content-location'], target)
    self.assertEqual(body, final_body)
    self.assertEqual(resp.previous.status, 301)
    self.assertEqual(resp.previous.fromcache, False)

    # Second request: the 301 must now be served from the cache.
    resp, body = self.http.request(start, "GET")
    self.assertEqual(resp.status, 200)
    self.assertEqual(resp['content-location'], target)
    self.assertEqual(body, final_body)
    self.assertEqual(resp.previous.status, 301)
    self.assertEqual(resp.previous.fromcache, True)
281
282
def testGet301NoRedirect(self):
    # With follow_redirects switched off, a 301 must be returned to the
    # caller as-is instead of being followed.
    self.http.follow_redirects = False
    uri = urllib.parse.urljoin(base, "301/onestep.asis")
    (response, content) = self.http.request(uri, "GET")
    self.assertEqual(response.status, 301)
291
292
def testGet302(self):
    # 302 redirects are followed automatically, but the 302 response
    # itself must NOT be cached (only the final response may be).
    final_body = b"This is the final destination.\n"
    target = urllib.parse.urljoin(base, "302/final-destination.txt")
    one_step = urllib.parse.urljoin(base, "302/onestep.asis")

    # First request: nothing is cached yet.
    resp, body = self.http.request(one_step, "GET")
    self.assertEqual(resp.status, 200)
    self.assertEqual(resp['content-location'], target)
    self.assertEqual(body, final_body)
    self.assertEqual(resp.previous.status, 302)
    self.assertEqual(resp.previous.fromcache, False)

    # Second request: final response from cache, the 302 fetched again.
    resp, body = self.http.request(one_step, "GET")
    self.assertEqual(resp.status, 200)
    self.assertEqual(resp.fromcache, True)
    self.assertEqual(resp['content-location'], target)
    self.assertEqual(body, final_body)
    self.assertEqual(resp.previous.status, 302)
    self.assertEqual(resp.previous.fromcache, False)
    self.assertEqual(resp.previous['content-location'], one_step)

    # A two-step chain ends at the same, already cached, final response.
    two_step = urllib.parse.urljoin(base, "302/twostep.asis")
    resp, body = self.http.request(two_step, "GET")
    self.assertEqual(resp.status, 200)
    self.assertEqual(resp.fromcache, True)
    self.assertEqual(body, final_body)
    self.assertEqual(resp.previous.status, 302)
    self.assertEqual(resp.previous.fromcache, False)
323
def testGet302RedirectionLimit(self):
    # Test that we can set a lower redirection limit
    # and that we raise an exception when we exceed
    # that limit.
    self.http.force_exception_to_status_code = False

    uri = urllib.parse.urljoin(base, "302/twostep.asis")
    # assertRaises is used instead of try / self.fail() / except Exception:
    # a broad `except Exception` also catches the AssertionError raised by
    # self.fail() and reports it as a "wrong kind of exception".
    self.assertRaises(httplib2.RedirectLimit,
                      self.http.request, uri, "GET", redirections=1)

    # Re-run the test without the exceptions
    self.http.force_exception_to_status_code = True

    (response, content) = self.http.request(uri, "GET", redirections=1)
    self.assertEqual(response.status, 500)
    self.assertTrue(response.reason.startswith("Redirected more"))
    self.assertEqual("302", response['status'])
    self.assertTrue(content.startswith(b"<html>"))
    self.assertTrue(response.previous is not None)
348
def testGet302NoLocation(self):
    # Test that we throw an exception when we get
    # a 302 with no Location: header.
    self.http.force_exception_to_status_code = False
    uri = urllib.parse.urljoin(base, "302/no-location.asis")
    # assertRaises is used instead of try / self.fail() / except Exception:
    # a broad `except Exception` also catches the AssertionError raised by
    # self.fail() and reports it as a "wrong kind of exception".
    self.assertRaises(httplib2.RedirectMissingLocation,
                      self.http.request, uri, "GET")

    # Re-run the test without the exceptions
    self.http.force_exception_to_status_code = True

    (response, content) = self.http.request(uri, "GET")
    self.assertEqual(response.status, 500)
    self.assertTrue(response.reason.startswith("Redirected but"))
    self.assertEqual("302", response['status'])
    self.assertTrue(content.startswith(b"This is content"))
370
def testGet302ViaHttps(self):
    # Relies on Google always answering https://google.com with a
    # redirect to http://google.com.
    resp, _body = self.http.request("https://google.com", "GET")
    self.assertEqual(200, resp.status)
    self.assertEqual(302, resp.previous.status)
376
def testGetViaHttps(self):
    # A plain GET over HTTPS must succeed.
    secure_uri = "https://google.com/adsense/"
    resp, _body = self.http.request(secure_uri, "GET")
    self.assertEqual(200, resp.status)
381
def testGetViaHttpsSpecViolationOnLocation(self):
    # Redirects over HTTPS must be followed even when the server breaks
    # the spec by sending a relative Location: header rather than an
    # absolute one.
    secure_uri = "https://google.com/adsense"
    resp, _body = self.http.request(secure_uri, "GET")
    self.assertEqual(200, resp.status)
    self.assertNotEqual(None, resp.previous)
390
391
def testGetViaHttpsKeyCert(self):
    # At this point I can only test
    # that the key and cert files are passed in
    # correctly to the connection. It would be nice to have
    # a real https endpoint to test against.
    http = httplib2.Http(timeout=2)

    http.add_certificate("akeyfile", "acertfile", "bitworking.org")
    try:
        http.request("https://bitworking.org", "GET")
    except Exception:
        # The request outcome is irrelevant (best effort); only the
        # connection object it leaves behind is inspected.  Catching
        # Exception rather than using a bare except keeps Ctrl-C and
        # SystemExit working.
        pass
    self.assertEqual(http.connections["https:bitworking.org"].key_file, "akeyfile")
    self.assertEqual(http.connections["https:bitworking.org"].cert_file, "acertfile")

    # A host for which no certificate was registered must get none.
    try:
        http.request("https://notthere.bitworking.org", "GET")
    except Exception:
        pass
    self.assertEqual(http.connections["https:notthere.bitworking.org"].key_file, None)
    self.assertEqual(http.connections["https:notthere.bitworking.org"].cert_file, None)
413
414
415
416
def testGet303(self):
    # A POST answered with a 303 is followed up by a GET on the
    # Location: header.
    start = urllib.parse.urljoin(base, "303/303.cgi")
    resp, body = self.http.request(start, "POST", " ")
    self.assertEqual(resp.status, 200)
    self.assertEqual(body, b"This is the final destination.\n")
    self.assertEqual(resp.previous.status, 303)
425
def testGet303NoRedirect(self):
    # With follow_redirects switched off, the 303 returned for the POST
    # is handed back instead of being followed.
    self.http.follow_redirects = False
    start = urllib.parse.urljoin(base, "303/303.cgi")
    resp, _body = self.http.request(start, "POST", " ")
    self.assertEqual(resp.status, 303)
433
def test303ForDifferentMethods(self):
    # Whatever method triggers the 303, the follow-up request that
    # reaches the reflector must be a GET.
    start = urllib.parse.urljoin(base, "303/redirect-to-reflector.cgi")
    method_after_303 = "GET"
    for verb in ("PUT", "DELETE", "POST", "GET", "HEAD"):
        resp, _body = self.http.request(start, verb, body=b" ")
        self.assertEqual(resp['x-method'], method_after_303)
440
def testGet304(self):
    # Test that we use ETags properly to validate our cache
    uri = urllib.parse.urljoin(base, "304/test_etag.txt")
    (response, content) = self.http.request(uri, "GET")
    self.assertNotEqual(response['etag'], "")

    (response, content) = self.http.request(uri, "GET")
    (response, content) = self.http.request(uri, "GET", headers = {'cache-control': 'must-revalidate'})
    self.assertEqual(response.status, 200)
    self.assertEqual(response.fromcache, True)

    # The cache entry on disk is expected to begin with a status line.
    cache_file_name = os.path.join(cacheDirName, httplib2.safename(httplib2.urlnorm(uri)[-1]))
    # `with` closes the file even if reading it raises.
    with open(cache_file_name, "r") as f:
        status_line = f.readline()

    self.assertTrue(status_line.startswith("status:"))

    # A HEAD request can be answered from the cached entry.
    (response, content) = self.http.request(uri, "HEAD")
    self.assertEqual(response.status, 200)
    self.assertEqual(response.fromcache, True)

    # A range request must not be answered from the cache.
    (response, content) = self.http.request(uri, "GET", headers = {'range': 'bytes=0-0'})
    self.assertEqual(response.status, 206)
    self.assertEqual(response.fromcache, False)
466
def testGetIgnoreEtag(self):
    # ETags can be forcibly ignored through the ignore_etag switch.
    reflector_uri = urllib.parse.urljoin(base, "reflector/reflector.cgi")
    revalidate = {'cache-control': 'max-age=0'}

    resp, body = self.http.request(reflector_uri, "GET")
    self.assertNotEqual(resp['etag'], "")

    # By default the revalidation carries an If-None-Match header.
    resp, body = self.http.request(reflector_uri, "GET", headers=revalidate)
    seen = self.reflector(body)
    self.assertTrue('HTTP_IF_NONE_MATCH' in seen)

    # With ignore_etag set, the header must no longer be sent.
    self.http.ignore_etag = True
    resp, body = self.http.request(reflector_uri, "GET", headers=revalidate)
    seen = self.reflector(body)
    self.assertEqual(resp.fromcache, False)
    self.assertFalse('HTTP_IF_NONE_MATCH' in seen)
482
def testOverrideEtag(self):
    # A caller-supplied if-none-match header must override the cached ETag.
    reflector_uri = urllib.parse.urljoin(base, "reflector/reflector.cgi")

    resp, body = self.http.request(reflector_uri, "GET")
    self.assertNotEqual(resp['etag'], "")

    # Plain revalidation: the header is sent, and it is not "fred".
    resp, body = self.http.request(reflector_uri, "GET", headers={'cache-control': 'max-age=0'})
    seen = self.reflector(body)
    self.assertTrue('HTTP_IF_NONE_MATCH' in seen)
    self.assertNotEqual(seen['HTTP_IF_NONE_MATCH'], "fred")

    # Explicit header: exactly the supplied value must be sent.
    resp, body = self.http.request(reflector_uri, "GET", headers={'cache-control': 'max-age=0', 'if-none-match': 'fred'})
    seen = self.reflector(body)
    self.assertTrue('HTTP_IF_NONE_MATCH' in seen)
    self.assertEqual(seen['HTTP_IF_NONE_MATCH'], "fred")
498
499#MAP-commented this out because it consistently fails
500# def testGet304EndToEnd(self):
501# # Test that end to end headers get overwritten in the cache
502# uri = urllib.parse.urljoin(base, "304/end2end.cgi")
503# (response, content) = self.http.request(uri, "GET")
504# self.assertNotEqual(response['etag'], "")
505# old_date = response['date']
506# time.sleep(2)
507#
508# (response, content) = self.http.request(uri, "GET", headers = {'Cache-Control': 'max-age=0'})
509# # The response should be from the cache, but the Date: header should be updated.
510# new_date = response['date']
511# self.assertNotEqual(new_date, old_date)
512# self.assertEqual(response.status, 200)
513# self.assertEqual(response.fromcache, True)
514
def testGet304LastModified(self):
    # A 304 can still be handled when last-modified is the only
    # cache validator available.
    target = urllib.parse.urljoin(base, "304/last-modified-only/last-modified-only.txt")
    resp, _body = self.http.request(target, "GET")
    self.assertNotEqual(resp['last-modified'], "")

    # Two further requests; the result of the last one is checked.
    for _attempt in range(2):
        resp, _body = self.http.request(target, "GET")
    self.assertEqual(resp.status, 200)
    self.assertEqual(resp.fromcache, True)
526
def testGet307(self):
    # 307 redirects are followed, but the 307 itself is not cached.
    start = urllib.parse.urljoin(base, "307/onestep.asis")
    final_body = b"This is the final destination.\n"

    # First request: nothing cached yet.
    resp, body = self.http.request(start, "GET")
    self.assertEqual(resp.status, 200)
    self.assertEqual(body, final_body)
    self.assertEqual(resp.previous.status, 307)
    self.assertEqual(resp.previous.fromcache, False)

    # Second request: final response from cache, the 307 fetched again.
    resp, body = self.http.request(start, "GET")
    self.assertEqual(resp.status, 200)
    self.assertEqual(resp.fromcache, True)
    self.assertEqual(body, final_body)
    self.assertEqual(resp.previous.status, 307)
    self.assertEqual(resp.previous.fromcache, False)
543
def testGet410(self):
    # A 410 response is passed through to the caller unchanged.
    gone_uri = urllib.parse.urljoin(base, "410/410.asis")
    resp, _body = self.http.request(gone_uri, "GET")
    self.assertEqual(resp.status, 410)
549
def testHeadGZip(self):
    # No decompression must be attempted on a HEAD response: it reports
    # a non-zero content-length while having an empty body.
    gzipped_uri = urllib.parse.urljoin(base, "gzip/final-destination.txt")
    resp, body = self.http.request(gzipped_uri, "HEAD")
    self.assertEqual(resp.status, 200)
    self.assertNotEqual(int(resp['content-length']), 0)
    self.assertEqual(body, b"")
557
def testGetGZip(self):
    # gzip-compressed responses are decompressed: the content-encoding
    # header is expected under the key '-content-encoding' and
    # content-length must match the decompressed body.
    gzipped_uri = urllib.parse.urljoin(base, "gzip/final-destination.txt")
    resp, body = self.http.request(gzipped_uri, "GET")
    self.assertEqual(resp.status, 200)
    self.assertFalse('content-encoding' in resp)
    self.assertTrue('-content-encoding' in resp)
    self.assertEqual(int(resp['content-length']), len(b"This is the final destination.\n"))
    self.assertEqual(body, b"This is the final destination.\n")
567
def testGetGZipFailure(self):
    # Test that we raise a good exception when the gzip fails
    self.http.force_exception_to_status_code = False
    uri = urllib.parse.urljoin(base, "gzip/failed-compression.asis")
    # assertRaises is used instead of try / self.fail() / except Exception:
    # a broad `except Exception` also catches the AssertionError raised by
    # self.fail() and reports it as a "wrong kind of exception".
    self.assertRaises(httplib2.FailedToDecompressContent,
                      self.http.request, uri, "GET")

    # Re-run the test without the exceptions
    self.http.force_exception_to_status_code = True

    (response, content) = self.http.request(uri, "GET")
    self.assertEqual(response.status, 500)
    self.assertTrue(response.reason.startswith("Content purported"))
586
def testTimeout(self):
    # A request that exceeds the socket default timeout must be reported
    # as a 408 when exceptions are converted to status codes.
    self.http.force_exception_to_status_code = True
    uri = urllib.parse.urljoin(base, "timeout/timeout.cgi")
    try:
        import socket
        previous_timeout = socket.getdefaulttimeout()
        socket.setdefaulttimeout(1)
    except Exception:
        # Don't run the test if we can't set the timeout
        return
    try:
        (response, content) = self.http.request(uri)
    finally:
        # The default timeout is a process-wide setting; put the old
        # value back so it does not leak into tests that run afterwards.
        socket.setdefaulttimeout(previous_timeout)
    self.assertEqual(response.status, 408)
    self.assertTrue(response.reason.startswith("Request Timeout"))
    self.assertTrue(content.startswith(b"Request Timeout"))
600
def testIndividualTimeout(self):
    # A timeout given to one Http instance must produce a 408 once
    # exceptions are converted to status codes.
    slow_uri = urllib.parse.urljoin(base, "timeout/timeout.cgi")
    impatient_client = httplib2.Http(timeout=1)
    impatient_client.force_exception_to_status_code = True

    resp, body = impatient_client.request(slow_uri)
    self.assertEqual(resp.status, 408)
    self.assertTrue(resp.reason.startswith("Request Timeout"))
    self.assertTrue(body.startswith(b"Request Timeout"))
610
611
def testGetDeflate(self):
    # deflate-compressed responses are decompressed: content-encoding is
    # removed and content-length matches the decompressed body.
    deflated_uri = urllib.parse.urljoin(base, "deflate/deflated.asis")
    resp, body = self.http.request(deflated_uri, "GET")
    self.assertEqual(resp.status, 200)
    self.assertFalse('content-encoding' in resp)
    reported_length = int(resp['content-length'])
    self.assertEqual(reported_length, len("This is the final destination."))
    self.assertEqual(body, b"This is the final destination.")
620
def testGetDeflateFailure(self):
    # Test that we raise a good exception when the deflate fails
    self.http.force_exception_to_status_code = False

    uri = urllib.parse.urljoin(base, "deflate/failed-compression.asis")
    # assertRaises is used instead of try / self.fail() / except Exception:
    # a broad `except Exception` also catches the AssertionError raised by
    # self.fail() and reports it as a "wrong kind of exception".
    self.assertRaises(httplib2.FailedToDecompressContent,
                      self.http.request, uri, "GET")

    # Re-run the test without the exceptions
    self.http.force_exception_to_status_code = True

    (response, content) = self.http.request(uri, "GET")
    self.assertEqual(response.status, 500)
    self.assertTrue(response.reason.startswith("Content purported"))
640
def testGetDuplicateHeaders(self):
    # Headers that occur more than once are joined with ','.
    multilink_uri = urllib.parse.urljoin(base, "duplicate-headers/multilink.asis")
    resp, body = self.http.request(multilink_uri, "GET")
    self.assertEqual(resp.status, 200)
    self.assertEqual(body, b"This is content\n")
    first_link = resp['link'].split(",")[0]
    self.assertEqual(first_link, '<http://bitworking.org>; rel="home"; title="BitWorking"')
648
def testGetCacheControlNoCache(self):
    # 'Cache-Control: no-cache' on a request must bypass a cached entry.
    target = urllib.parse.urljoin(base, "304/test_etag.txt")
    resp, _body = self.http.request(target, "GET")
    self.assertNotEqual(resp['etag'], "")

    # An ordinary repeat request is served from the cache ...
    resp, _body = self.http.request(target, "GET")
    self.assertEqual(resp.status, 200)
    self.assertEqual(resp.fromcache, True)

    # ... whereas a no-cache request is not.
    resp, _body = self.http.request(target, "GET", headers={'Cache-Control': 'no-cache'})
    self.assertEqual(resp.status, 200)
    self.assertEqual(resp.fromcache, False)
661
def testGetCacheControlPragmaNoCache(self):
    # 'Pragma: no-cache' on a request must bypass a cached entry.
    target = urllib.parse.urljoin(base, "304/test_etag.txt")
    resp, _body = self.http.request(target, "GET")
    self.assertNotEqual(resp['etag'], "")

    # An ordinary repeat request is served from the cache ...
    resp, _body = self.http.request(target, "GET")
    self.assertEqual(resp.status, 200)
    self.assertEqual(resp.fromcache, True)

    # ... whereas a Pragma: no-cache request is not.
    resp, _body = self.http.request(target, "GET", headers={'Pragma': 'no-cache'})
    self.assertEqual(resp.status, 200)
    self.assertEqual(resp.fromcache, False)
674
def testGetCacheControlNoStoreRequest(self):
    # A no-store request means the response must not be stored, so a
    # second identical request cannot come from the cache either.
    target = urllib.parse.urljoin(base, "304/test_etag.txt")
    no_store = {'Cache-Control': 'no-store'}

    for _attempt in range(2):
        resp, _body = self.http.request(target, "GET", headers=no_store)
        self.assertEqual(resp.status, 200)
        self.assertEqual(resp.fromcache, False)
686
def testGetCacheControlNoStoreResponse(self):
    # A no-store response must not be stored, so a second request for
    # the same resource cannot come from the cache.
    target = urllib.parse.urljoin(base, "no-store/no-store.asis")

    for _attempt in range(2):
        resp, _body = self.http.request(target, "GET")
        self.assertEqual(resp.status, 200)
        self.assertEqual(resp.fromcache, False)
698
def testGetCacheControlNoCacheNoStoreRequest(self):
    # A 'no-store, no-cache' request must clear the entry from the cache
    # even if it had been cached before.
    target = urllib.parse.urljoin(base, "304/test_etag.txt")
    bypass = {'Cache-Control': 'no-store, no-cache'}

    # Prime the cache and confirm the entry is in use.
    for _attempt in range(2):
        resp, _body = self.http.request(target, "GET")
    self.assertEqual(resp.fromcache, True)

    # Two bypassing requests; the second must still miss the cache.
    for _attempt in range(2):
        resp, _body = self.http.request(target, "GET", headers=bypass)
    self.assertEqual(resp.status, 200)
    self.assertEqual(resp.fromcache, False)
711
def testUpdateInvalidatesCache(self):
    # Issuing a DELETE against a cached URI invalidates its cache entry
    # (here even though the server answers the DELETE with a 405).
    target = urllib.parse.urljoin(base, "304/test_etag.txt")

    # Prime the cache and confirm the entry is in use.
    for _attempt in range(2):
        resp, _body = self.http.request(target, "GET")
    self.assertEqual(resp.fromcache, True)

    resp, _body = self.http.request(target, "DELETE")
    self.assertEqual(resp.status, 405)

    # The following GET must go back to the server.
    resp, _body = self.http.request(target, "GET")
    self.assertEqual(resp.fromcache, False)
725
def testUpdateUsesCachedETag(self):
    # Native support for http://www.w3.org/1999/04/Editing/ :
    # the first PUT after a cached GET succeeds, a repeated PUT is
    # rejected with 412.
    target = urllib.parse.urljoin(base, "conditional-updates/test.cgi")

    # Fetch twice: once from the network, once from the cache.
    for expect_cached in (False, True):
        resp, _body = self.http.request(target, "GET")
        self.assertEqual(resp.status, 200)
        self.assertEqual(resp.fromcache, expect_cached)

    resp, _body = self.http.request(target, "PUT")
    self.assertEqual(resp.status, 200)
    resp, _body = self.http.request(target, "PUT")
    self.assertEqual(resp.status, 412)
740
def testUpdateUsesCachedETagAndOCMethod(self):
    # Conditional updates (http://www.w3.org/1999/04/Editing/) for a
    # method that is added to optimistic_concurrency_methods by the test
    # itself: DELETE on the cached resource is expected to return 200.
    target = urllib.parse.urljoin(base, "conditional-updates/test.cgi")

    # First GET goes to the server, second one is a cache hit.
    for expect_cached in (False, True):
        resp, _ = self.http.request(target, "GET")
        self.assertEqual(resp.status, 200)
        self.assertEqual(resp.fromcache, expect_cached)

    self.http.optimistic_concurrency_methods.append("DELETE")
    resp, _ = self.http.request(target, "DELETE")
    self.assertEqual(resp.status, 200)
754
755
def testUpdateUsesCachedETagOverridden(self):
    # Conditional updates (http://www.w3.org/1999/04/Editing/) when the
    # caller supplies its own if-match header: a PUT sent with the bogus
    # value 'fred' is expected to be refused with 412.
    target = urllib.parse.urljoin(base, "conditional-updates/test.cgi")

    # First GET goes to the server, second one is a cache hit.
    for expect_cached in (False, True):
        resp, _ = self.http.request(target, "GET")
        self.assertEqual(resp.status, 200)
        self.assertEqual(resp.fromcache, expect_cached)

    resp, _ = self.http.request(target, "PUT", headers={'if-match': 'fred'})
    self.assertEqual(resp.status, 412)
768
def testBasicAuth(self):
    # Basic Authentication: both the protected directory and a file in
    # it answer 401 until credentials are added, and 200 afterwards.
    file_uri = urllib.parse.urljoin(base, "basic/file.txt")
    dir_uri = urllib.parse.urljoin(base, "basic/")

    # Without credentials: file first, then the directory.
    for target in (file_uri, dir_uri):
        resp, _ = self.http.request(target, "GET")
        self.assertEqual(resp.status, 401)

    self.http.add_credentials('joe', 'password')

    # With credentials: directory first, then the file.
    for target in (dir_uri, file_uri):
        resp, _ = self.http.request(target, "GET")
        self.assertEqual(resp.status, 200)
786
def testBasicAuthWithDomain(self):
    # Basic Authentication with domain-scoped credentials: credentials
    # registered for another domain must not unlock the resource, while
    # credentials registered for the domain of `base` must.
    file_uri = urllib.parse.urljoin(base, "basic/file.txt")
    dir_uri = urllib.parse.urljoin(base, "basic/")

    # No credentials at all.
    for target in (file_uri, dir_uri):
        resp, _ = self.http.request(target, "GET")
        self.assertEqual(resp.status, 401)

    # Credentials scoped to a different domain change nothing.
    self.http.add_credentials('joe', 'password', "example.org")
    for target in (dir_uri, file_uri):
        resp, _ = self.http.request(target, "GET")
        self.assertEqual(resp.status, 401)

    # Credentials scoped to the test server's own domain are accepted;
    # the file is requested twice, as in the original sequence.
    own_domain = urllib.parse.urlparse(base).netloc
    self.http.add_credentials('joe', 'password', own_domain)
    for _attempt in range(2):
        resp, _ = self.http.request(file_uri, "GET")
        self.assertEqual(resp.status, 200)
813
814
815
816
817
818
def testBasicAuthTwoDifferentCredentials(self):
    # Basic Authentication against the "basic2" area, which is unlocked
    # by the fred/barney credentials.
    file_uri = urllib.parse.urljoin(base, "basic2/file.txt")
    dir_uri = urllib.parse.urljoin(base, "basic2/")

    # Without credentials: file first, then the directory.
    for target in (file_uri, dir_uri):
        resp, _ = self.http.request(target, "GET")
        self.assertEqual(resp.status, 401)

    self.http.add_credentials('fred', 'barney')

    # With credentials: directory first, then the file.
    for target in (dir_uri, file_uri):
        resp, _ = self.http.request(target, "GET")
        self.assertEqual(resp.status, 200)
836
def testBasicAuthNested(self):
    # Basic Authentication with nested protected resources: the outer
    # directory opens with joe/password, the subdirectory only once
    # fred/barney has been added as well.
    outer_uri = urllib.parse.urljoin(base, "basic-nested/")
    inner_uri = urllib.parse.urljoin(base, "basic-nested/subdir")

    # Each step: credentials to add first (None = none), then the status
    # expected for the outer and the inner resource, in that order.
    steps = (
        (None, 401, 401),
        (('joe', 'password'), 200, 401),
        (('fred', 'barney'), 200, 200),
    )
    for credentials, outer_status, inner_status in steps:
        if credentials is not None:
            self.http.add_credentials(*credentials)

        resp, _ = self.http.request(outer_uri, "GET")
        self.assertEqual(resp.status, outer_status)

        resp, _ = self.http.request(inner_uri, "GET")
        self.assertEqual(resp.status, inner_status)
868
def testDigestAuth(self):
    # Test that we support Digest Authentication: the protected
    # directory answers 401 without credentials and 200 once they have
    # been added.
    uri = urllib.parse.urljoin(base, "digest/")
    (response, content) = self.http.request(uri, "GET")
    self.assertEqual(response.status, 401)

    self.http.add_credentials('joe', 'password')
    (response, content) = self.http.request(uri, "GET")
    self.assertEqual(response.status, 200)

    # A file below the protected directory must be reachable with the
    # same credentials.  This request used to be issued without checking
    # its outcome; testDigestAuthNextNonceAndNC expects a 200 for the
    # same URI and credentials.
    uri = urllib.parse.urljoin(base, "digest/file.txt")
    (response, content) = self.http.request(uri, "GET")
    self.assertEqual(response.status, 200)
881
def testDigestAuthNextNonceAndNC(self):
    # If the server hands out a nextnonce, the nonce count reported for
    # the following request is expected to be back at 1.
    target = urllib.parse.urljoin(base, "digest/file.txt")
    self.http.add_credentials('joe', 'password')

    # Both requests carry no-cache (presumably so that each one actually
    # reaches the server).
    first_resp, _ = self.http.request(target, "GET", headers={"cache-control": "no-cache"})
    first_info = httplib2._parse_www_authenticate(first_resp, 'authentication-info')
    self.assertEqual(first_resp.status, 200)

    second_resp, _ = self.http.request(target, "GET", headers={"cache-control": "no-cache"})
    second_info = httplib2._parse_www_authenticate(second_resp, 'authentication-info')
    self.assertEqual(second_resp.status, 200)

    # The nc check only applies when the first response carried a
    # nextnonce entry.
    if 'nextnonce' in first_info:
        self.assertEqual(second_info['nc'], 1)
896
def testDigestAuthStale(self):
    # A request made after the nonce has gone stale must still end in a
    # 200, must not come from the cache, and must be flagged through
    # the response's _stale_digest attribute.
    target = urllib.parse.urljoin(base, "digest-expire/file.txt")
    self.http.add_credentials('joe', 'password')

    resp, _ = self.http.request(target, "GET", headers={"cache-control": "no-cache"})
    # The parsed header is not inspected; the call only has to succeed.
    httplib2._parse_www_authenticate(resp, 'authentication-info')
    self.assertEqual(resp.status, 200)

    # Sleep long enough that the nonce becomes stale.
    time.sleep(3)

    resp, _ = self.http.request(target, "GET", headers={"cache-control": "no-cache"})
    self.assertFalse(resp.fromcache)
    self.assertTrue(resp._stale_digest)
    httplib2._parse_www_authenticate(resp, 'authentication-info')
    self.assertEqual(resp.status, 200)
913
def reflector(self, content):
    # Turn the body returned by the reflector CGI (UTF-8 bytes made of
    # NAME=value lines) into a dict mapping NAME to value.
    #
    # Only the first "=" on a line separates name from value, so values
    # may themselves contain "=".  Lines without any "=" (including the
    # single empty line produced by an empty body) are skipped; they
    # used to make dict() raise ValueError.
    pairs = {}
    for line in content.decode('utf-8').strip().split("\n"):
        name, separator, value = line.partition("=")
        if separator:
            pairs[name] = value
    return pairs
916
def testReflector(self):
    # The reflector CGI's output, once parsed by self.reflector(), must
    # contain an HTTP_USER_AGENT entry.
    target = urllib.parse.urljoin(base, "reflector/reflector.cgi")
    _, body = self.http.request(target, "GET")
    reflected = self.reflector(body)
    self.assertTrue('HTTP_USER_AGENT' in reflected)
922
# The memcache module is optional: the memcached-backed variant of HttpTest
# is only defined when it can be imported.  Only ImportError is tolerated,
# so that a genuine error while defining the class is no longer swallowed.
try:
    import memcache
except ImportError:
    pass
else:
    class HttpTestMemCached(HttpTest):
        # Re-runs the HttpTest cases with a memcached client as the cache.

        def setUp(self):
            self.cache = memcache.Client(['127.0.0.1:11211'], debug=0)
            self.http = httplib2.Http(self.cache)
            self.cache.flush_all()
            # Not exactly sure why the sleep is needed here, but
            # if not present then some unit tests that rely on caching
            # fail. Memcached seems to lose some sets immediately
            # after a flush_all if the set is to a value that
            # was previously cached. (Maybe the flush is handled async?)
            time.sleep(1)
            self.http.clear_credentials()
940
941
942
943# ------------------------------------------------------------------------
944
class HttpPrivateTest(unittest.TestCase):
    """Tests for httplib2's private helper functions and auth objects.

    Every test here calls the helper directly with hand-built header
    dictionaries; none of them goes through Http.request().
    """

    @staticmethod
    def _http_date(when=None):
        # Format *when* (seconds since the epoch; None means "now") the way
        # the Date and Expires headers in these tests are written.
        return time.strftime("%a, %d %b %Y %H:%M:%S GMT", time.gmtime(when))

    def testParseCacheControl(self):
        # Test that we can parse the Cache-Control header
        self.assertEqual({}, httplib2._parse_cache_control({}))
        self.assertEqual({'no-cache': 1}, httplib2._parse_cache_control({'cache-control': ' no-cache'}))
        cc = httplib2._parse_cache_control({'cache-control': ' no-cache, max-age = 7200'})
        self.assertEqual(cc['no-cache'], 1)
        self.assertEqual(cc['max-age'], '7200')
        # A header made of nothing but separators yields an empty directive.
        cc = httplib2._parse_cache_control({'cache-control': ' , '})
        self.assertEqual(cc[''], 1)

    def testNormalizeHeaders(self):
        # Test that we normalize header names to lowercase while leaving
        # the values untouched
        h = httplib2._normalize_headers({'Cache-Control': 'no-cache', 'Other': 'Stuff'})
        self.assertIn('cache-control', h)
        self.assertIn('other', h)
        self.assertEqual('Stuff', h['other'])

    def testExpirationModelTransparent(self):
        # Test that no-cache on the request makes the entry TRANSPARENT
        response_headers = {
            'cache-control': 'max-age=7200'
        }
        request_headers = {
            'cache-control': 'no-cache'
        }
        self.assertEqual("TRANSPARENT", httplib2._entry_disposition(response_headers, request_headers))

    def testMaxAgeNonNumeric(self):
        # Test that non-numeric max-age / min-fresh values in the response
        # leave the entry STALE
        response_headers = {
            'cache-control': 'max-age=fred, min-fresh=barney'
        }
        request_headers = {
        }
        self.assertEqual("STALE", httplib2._entry_disposition(response_headers, request_headers))

    def testExpirationModelNoCacheResponse(self):
        # The date and expires point to an entry that should be
        # FRESH, but the no-cache over-rides that.
        now = time.time()
        response_headers = {
            'date': self._http_date(now),
            'expires': self._http_date(now + 4),
            'cache-control': 'no-cache'
        }
        request_headers = {
        }
        self.assertEqual("STALE", httplib2._entry_disposition(response_headers, request_headers))

    def testExpirationModelStaleRequestMustReval(self):
        # must-revalidate on the request forces STALE
        self.assertEqual("STALE", httplib2._entry_disposition({}, {'cache-control': 'must-revalidate'}))

    def testExpirationModelStaleResponseMustReval(self):
        # must-revalidate on the response forces STALE
        self.assertEqual("STALE", httplib2._entry_disposition({'cache-control': 'must-revalidate'}, {}))

    def testExpirationModelFresh(self):
        # max-age=2 is FRESH right away and STALE once 3 seconds have passed
        response_headers = {
            'date': self._http_date(),
            'cache-control': 'max-age=2'
        }
        request_headers = {
        }
        self.assertEqual("FRESH", httplib2._entry_disposition(response_headers, request_headers))
        time.sleep(3)
        self.assertEqual("STALE", httplib2._entry_disposition(response_headers, request_headers))

    def testExpirationMaxAge0(self):
        # max-age=0 on the response is STALE immediately
        response_headers = {
            'date': self._http_date(),
            'cache-control': 'max-age=0'
        }
        request_headers = {
        }
        self.assertEqual("STALE", httplib2._entry_disposition(response_headers, request_headers))

    def testExpirationModelDateAndExpires(self):
        # Expires two seconds after Date: FRESH now, STALE after 3 seconds
        now = time.time()
        response_headers = {
            'date': self._http_date(now),
            'expires': self._http_date(now + 2),
        }
        request_headers = {
        }
        self.assertEqual("FRESH", httplib2._entry_disposition(response_headers, request_headers))
        time.sleep(3)
        self.assertEqual("STALE", httplib2._entry_disposition(response_headers, request_headers))

    def testExpiresZero(self):
        # An Expires value of "0" (not a date) leaves the entry STALE
        now = time.time()
        response_headers = {
            'date': self._http_date(now),
            'expires': "0",
        }
        request_headers = {
        }
        self.assertEqual("STALE", httplib2._entry_disposition(response_headers, request_headers))

    def testExpirationModelDateOnly(self):
        # A Date header alone, with no freshness information, is STALE
        now = time.time()
        response_headers = {
            'date': self._http_date(now + 3),
        }
        request_headers = {
        }
        self.assertEqual("STALE", httplib2._entry_disposition(response_headers, request_headers))

    def testExpirationModelOnlyIfCached(self):
        # only-if-cached on the request makes the entry FRESH
        response_headers = {
        }
        request_headers = {
            'cache-control': 'only-if-cached',
        }
        self.assertEqual("FRESH", httplib2._entry_disposition(response_headers, request_headers))

    def testExpirationModelMaxAgeBoth(self):
        # max-age=0 on the request wins over max-age=2 on the response
        now = time.time()
        response_headers = {
            'date': self._http_date(now),
            'cache-control': 'max-age=2'
        }
        request_headers = {
            'cache-control': 'max-age=0'
        }
        self.assertEqual("STALE", httplib2._entry_disposition(response_headers, request_headers))

    def testExpirationModelDateAndExpiresMinFresh1(self):
        # Two seconds of remaining lifetime do not satisfy min-fresh=2
        now = time.time()
        response_headers = {
            'date': self._http_date(now),
            'expires': self._http_date(now + 2),
        }
        request_headers = {
            'cache-control': 'min-fresh=2'
        }
        self.assertEqual("STALE", httplib2._entry_disposition(response_headers, request_headers))

    def testExpirationModelDateAndExpiresMinFresh2(self):
        # Four seconds of remaining lifetime satisfy min-fresh=2
        now = time.time()
        response_headers = {
            'date': self._http_date(now),
            'expires': self._http_date(now + 4),
        }
        request_headers = {
            'cache-control': 'min-fresh=2'
        }
        self.assertEqual("FRESH", httplib2._entry_disposition(response_headers, request_headers))

    def testParseWWWAuthenticateEmpty(self):
        # No www-authenticate header at all yields no challenges
        res = httplib2._parse_www_authenticate({})
        self.assertEqual(len(res), 0)

    def testParseWWWAuthenticate(self):
        # different uses of spaces around commas
        res = httplib2._parse_www_authenticate({ 'www-authenticate': 'Test realm="test realm" , foo=foo ,bar="bar", baz=baz,qux=qux'})
        self.assertEqual(len(res), 1)
        self.assertEqual(len(res['test']), 5)

        # tokens with non-alphanum
        res = httplib2._parse_www_authenticate({ 'www-authenticate': 'T*!%#st realm=to*!%#en, to*!%#en="quoted string"'})
        self.assertEqual(len(res), 1)
        self.assertEqual(len(res['t*!%#st']), 2)

        # quoted string with quoted pairs
        res = httplib2._parse_www_authenticate({ 'www-authenticate': 'Test realm="a \\"test\\" realm"'})
        self.assertEqual(len(res), 1)
        self.assertEqual(res['test']['realm'], 'a "test" realm')

    def testParseWWWAuthenticateStrict(self):
        # Re-run testParseWWWAuthenticate with strict parsing switched on.
        # The module-level flag is put back in a finally block so that a
        # failure here cannot leak strict parsing into later tests.
        previous = httplib2.USE_WWW_AUTH_STRICT_PARSING
        httplib2.USE_WWW_AUTH_STRICT_PARSING = 1
        try:
            self.testParseWWWAuthenticate()
        finally:
            httplib2.USE_WWW_AUTH_STRICT_PARSING = previous

    def testParseWWWAuthenticateBasic(self):
        # A Basic challenge, alone and with a quoted / unquoted extra
        # parameter
        res = httplib2._parse_www_authenticate({ 'www-authenticate': 'Basic realm="me"'})
        basic = res['basic']
        self.assertEqual('me', basic['realm'])

        res = httplib2._parse_www_authenticate({ 'www-authenticate': 'Basic realm="me", algorithm="MD5"'})
        basic = res['basic']
        self.assertEqual('me', basic['realm'])
        self.assertEqual('MD5', basic['algorithm'])

        res = httplib2._parse_www_authenticate({ 'www-authenticate': 'Basic realm="me", algorithm=MD5'})
        basic = res['basic']
        self.assertEqual('me', basic['realm'])
        self.assertEqual('MD5', basic['algorithm'])

    def testParseWWWAuthenticateBasic2(self):
        # No space after the comma, trailing space at the end
        res = httplib2._parse_www_authenticate({ 'www-authenticate': 'Basic realm="me",other="fred" '})
        basic = res['basic']
        self.assertEqual('me', basic['realm'])
        self.assertEqual('fred', basic['other'])

    def testParseWWWAuthenticateBasic3(self):
        # Parameter names are looked up in lowercase whatever their case
        # in the header
        res = httplib2._parse_www_authenticate({ 'www-authenticate': 'Basic REAlm="me" '})
        basic = res['basic']
        self.assertEqual('me', basic['realm'])

    def testParseWWWAuthenticateDigest(self):
        # A Digest challenge whose qop value itself contains a comma
        res = httplib2._parse_www_authenticate({ 'www-authenticate':
            'Digest realm="testrealm@host.com", qop="auth,auth-int", nonce="dcd98b7102dd2f0e8b11d0f600bfb0c093", opaque="5ccc069c403ebaf9f0171e9517f40e41"'})
        digest = res['digest']
        self.assertEqual('testrealm@host.com', digest['realm'])
        self.assertEqual('auth,auth-int', digest['qop'])

    def testParseWWWAuthenticateMultiple(self):
        # Two challenges in one header, separated by whitespace only
        res = httplib2._parse_www_authenticate({ 'www-authenticate':
            'Digest realm="testrealm@host.com", qop="auth,auth-int", nonce="dcd98b7102dd2f0e8b11d0f600bfb0c093", opaque="5ccc069c403ebaf9f0171e9517f40e41" Basic REAlm="me" '})
        digest = res['digest']
        self.assertEqual('testrealm@host.com', digest['realm'])
        self.assertEqual('auth,auth-int', digest['qop'])
        self.assertEqual('dcd98b7102dd2f0e8b11d0f600bfb0c093', digest['nonce'])
        self.assertEqual('5ccc069c403ebaf9f0171e9517f40e41', digest['opaque'])
        basic = res['basic']
        self.assertEqual('me', basic['realm'])

    def testParseWWWAuthenticateMultiple2(self):
        # Handle an added comma between challenges, which might get thrown in if the challenges were
        # originally sent in separate www-authenticate headers.
        res = httplib2._parse_www_authenticate({ 'www-authenticate':
            'Digest realm="testrealm@host.com", qop="auth,auth-int", nonce="dcd98b7102dd2f0e8b11d0f600bfb0c093", opaque="5ccc069c403ebaf9f0171e9517f40e41", Basic REAlm="me" '})
        digest = res['digest']
        self.assertEqual('testrealm@host.com', digest['realm'])
        self.assertEqual('auth,auth-int', digest['qop'])
        self.assertEqual('dcd98b7102dd2f0e8b11d0f600bfb0c093', digest['nonce'])
        self.assertEqual('5ccc069c403ebaf9f0171e9517f40e41', digest['opaque'])
        basic = res['basic']
        self.assertEqual('me', basic['realm'])

    def testParseWWWAuthenticateMultiple3(self):
        # Same as above with three comma-separated challenges
        # (Digest, Basic and WSSE) in a single header.
        res = httplib2._parse_www_authenticate({ 'www-authenticate':
            'Digest realm="testrealm@host.com", qop="auth,auth-int", nonce="dcd98b7102dd2f0e8b11d0f600bfb0c093", opaque="5ccc069c403ebaf9f0171e9517f40e41", Basic REAlm="me", WSSE realm="foo", profile="UsernameToken"'})
        digest = res['digest']
        self.assertEqual('testrealm@host.com', digest['realm'])
        self.assertEqual('auth,auth-int', digest['qop'])
        self.assertEqual('dcd98b7102dd2f0e8b11d0f600bfb0c093', digest['nonce'])
        self.assertEqual('5ccc069c403ebaf9f0171e9517f40e41', digest['opaque'])
        basic = res['basic']
        self.assertEqual('me', basic['realm'])
        wsse = res['wsse']
        self.assertEqual('foo', wsse['realm'])
        self.assertEqual('UsernameToken', wsse['profile'])

    def testParseWWWAuthenticateMultiple4(self):
        # Tabs around "=", a tab inside a quoted value and punctuation in
        # the nonce
        res = httplib2._parse_www_authenticate({ 'www-authenticate':
            'Digest realm="test-real.m@host.com", qop \t=\t"\tauth,auth-int", nonce="(*)&^&$%#",opaque="5ccc069c403ebaf9f0171e9517f40e41", Basic REAlm="me", WSSE realm="foo", profile="UsernameToken"'})
        digest = res['digest']
        self.assertEqual('test-real.m@host.com', digest['realm'])
        self.assertEqual('\tauth,auth-int', digest['qop'])
        self.assertEqual('(*)&^&$%#', digest['nonce'])

    def testParseWWWAuthenticateMoreQuoteCombos(self):
        # Quoted and unquoted parameter values mixed in one challenge
        res = httplib2._parse_www_authenticate({'www-authenticate':'Digest realm="myrealm", nonce="Ygk86AsKBAA=3516200d37f9a3230352fde99977bd6d472d4306", algorithm=MD5, qop="auth", stale=true'})
        digest = res['digest']
        self.assertEqual('myrealm', digest['realm'])

    def testDigestObject(self):
        # With a fixed cnonce, the Authorization header produced for a
        # known challenge must match a known-good value exactly
        credentials = ('joe', 'password')
        host = None
        request_uri = '/projects/httplib2/test/digest/'
        headers = {}
        response = {
            'www-authenticate': 'Digest realm="myrealm", nonce="Ygk86AsKBAA=3516200d37f9a3230352fde99977bd6d472d4306", algorithm=MD5, qop="auth"'
        }
        content = b""

        d = httplib2.DigestAuthentication(credentials, host, request_uri, headers, response, content, None)
        # request() is expected to fill in headers['Authorization']
        d.request("GET", request_uri, headers, content, cnonce="33033375ec278a46")
        our_request = "Authorization: %s" % headers['Authorization']
        working_request = 'Authorization: Digest username="joe", realm="myrealm", nonce="Ygk86AsKBAA=3516200d37f9a3230352fde99977bd6d472d4306", uri="/projects/httplib2/test/digest/", algorithm=MD5, response="97ed129401f7cdc60e5db58a80f3ea8b", qop=auth, nc=00000001, cnonce="33033375ec278a46"'
        self.assertEqual(our_request, working_request)

    def testDigestObjectStale(self):
        # A 401 whose challenge says stale=true must ask for a retry
        credentials = ('joe', 'password')
        host = None
        request_uri = '/projects/httplib2/test/digest/'
        headers = {}
        response = httplib2.Response({})
        response['www-authenticate'] = 'Digest realm="myrealm", nonce="Ygk86AsKBAA=3516200d37f9a3230352fde99977bd6d472d4306", algorithm=MD5, qop="auth", stale=true'
        response.status = 401
        content = b""
        d = httplib2.DigestAuthentication(credentials, host, request_uri, headers, response, content, None)
        # Returns true to force a retry
        self.assertTrue(d.response(response, content))

    def testDigestObjectAuthInfo(self):
        # An authentication-info header carrying nextnonce must replace
        # the stored nonce and leave the nonce count at 1
        credentials = ('joe', 'password')
        host = None
        request_uri = '/projects/httplib2/test/digest/'
        headers = {}
        response = httplib2.Response({})
        response['www-authenticate'] = 'Digest realm="myrealm", nonce="Ygk86AsKBAA=3516200d37f9a3230352fde99977bd6d472d4306", algorithm=MD5, qop="auth", stale=true'
        response['authentication-info'] = 'nextnonce="fred"'
        content = b""
        d = httplib2.DigestAuthentication(credentials, host, request_uri, headers, response, content, None)
        # No status is set on this response and no retry is expected
        # (response() returns false)
        self.assertFalse(d.response(response, content))
        self.assertEqual('fred', d.challenge['nonce'])
        self.assertEqual(1, d.challenge['nc'])

    def testWsseAlgorithm(self):
        # Known-answer test for the WSSE UsernameToken digest
        digest = httplib2._wsse_username_token("d36e316282959a9ed4c89851497a717f", "2003-12-15T14:43:07Z", "taadtaadpstcsm")
        expected = b"quR/EWLAV4xLf9Zqyw4pDmfV9OY="
        self.assertEqual(expected, digest)

    def testEnd2End(self):
        # one end to end header
        response = {'content-type': 'application/atom+xml', 'te': 'deflate'}
        end2end = httplib2._get_end2end_headers(response)
        self.assertIn('content-type', end2end)
        self.assertNotIn('te', end2end)
        self.assertNotIn('connection', end2end)

        # one end to end header that gets eliminated
        response = {'connection': 'content-type', 'content-type': 'application/atom+xml', 'te': 'deflate'}
        end2end = httplib2._get_end2end_headers(response)
        self.assertNotIn('content-type', end2end)
        self.assertNotIn('te', end2end)
        self.assertNotIn('connection', end2end)

        # Degenerate case of no headers
        response = {}
        end2end = httplib2._get_end2end_headers(response)
        self.assertEqual(0, len(end2end))

        # Degenerate case of connection referring to a header not passed in
        response = {'connection': 'content-type'}
        end2end = httplib2._get_end2end_headers(response)
        self.assertEqual(0, len(end2end))
1284
# Run the suite only when this file is executed as a script, so that
# importing the module (e.g. from another test runner) has no side effects.
if __name__ == '__main__':
    unittest.main()