#!/usr/bin/env python3
"""
httplib2test

A set of unit tests for httplib2.py.

Requires Python 3.0 or later
"""

__author__ = "Joe Gregorio (joe@bitworking.org)"
__copyright__ = "Copyright 2006, Joe Gregorio"
__contributors__ = ["Mark Pilgrim"]
__license__ = "MIT"
__history__ = """ """
__version__ = "0.2 ($Rev: 118 $)"

import base64
import http.client
import httplib2
import io
import os
import pickle
import socket
import ssl
import sys
import time
import unittest
import urllib.parse

# The test resources base uri
base = 'http://bitworking.org/projects/httplib2/test/'
#base = 'http://localhost/projects/httplib2/test/'
cacheDirName = ".cache"

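# A minimal usage sketch (comments only, not executed) of the pattern these tests
# exercise: an httplib2.Http object, optionally backed by a cache directory, whose
# request() returns a (response, content) tuple. The URI below assumes the test
# server layout used throughout this file.
#
#     h = httplib2.Http(cacheDirName)
#     response, content = h.request(urllib.parse.urljoin(base, "304/test_etag.txt"), "GET")
#     response.status      # e.g. 200
#     response.fromcache   # True when the response was served from the local cache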

class CredentialsTest(unittest.TestCase):
    def test(self):
        c = httplib2.Credentials()
        c.add("joe", "password")
        self.assertEqual(("joe", "password"), list(c.iter("bitworking.org"))[0])
        self.assertEqual(("joe", "password"), list(c.iter(""))[0])
        c.add("fred", "password2", "wellformedweb.org")
        self.assertEqual(("joe", "password"), list(c.iter("bitworking.org"))[0])
        self.assertEqual(1, len(list(c.iter("bitworking.org"))))
        self.assertEqual(2, len(list(c.iter("wellformedweb.org"))))
        self.assertTrue(("fred", "password2") in list(c.iter("wellformedweb.org")))
        c.clear()
        self.assertEqual(0, len(list(c.iter("bitworking.org"))))
        c.add("fred", "password2", "wellformedweb.org")
        self.assertTrue(("fred", "password2") in list(c.iter("wellformedweb.org")))
        self.assertEqual(0, len(list(c.iter("bitworking.org"))))
        self.assertEqual(0, len(list(c.iter(""))))


class ParserTest(unittest.TestCase):
    def testFromStd66(self):
        self.assertEqual(('http', 'example.com', '', None, None), httplib2.parse_uri("http://example.com"))
        self.assertEqual(('https', 'example.com', '', None, None), httplib2.parse_uri("https://example.com"))
        self.assertEqual(('https', 'example.com:8080', '', None, None), httplib2.parse_uri("https://example.com:8080"))
        self.assertEqual(('http', 'example.com', '/', None, None), httplib2.parse_uri("http://example.com/"))
        self.assertEqual(('http', 'example.com', '/path', None, None), httplib2.parse_uri("http://example.com/path"))
        self.assertEqual(('http', 'example.com', '/path', 'a=1&b=2', None), httplib2.parse_uri("http://example.com/path?a=1&b=2"))
        self.assertEqual(('http', 'example.com', '/path', 'a=1&b=2', 'fred'), httplib2.parse_uri("http://example.com/path?a=1&b=2#fred"))
        self.assertEqual(('http', 'example.com', '/path', 'a=1&b=2', 'fred'), httplib2.parse_uri("http://example.com/path?a=1&b=2#fred"))


class UrlNormTest(unittest.TestCase):
    def test(self):
        self.assertEqual("http://example.org/", httplib2.urlnorm("http://example.org")[-1])
        self.assertEqual("http://example.org/", httplib2.urlnorm("http://EXAMple.org")[-1])
        self.assertEqual("http://example.org/?=b", httplib2.urlnorm("http://EXAMple.org?=b")[-1])
        self.assertEqual("http://example.org/mypath?a=b", httplib2.urlnorm("http://EXAMple.org/mypath?a=b")[-1])
        self.assertEqual("http://localhost:80/", httplib2.urlnorm("http://localhost:80")[-1])
        self.assertEqual(httplib2.urlnorm("http://localhost:80/"), httplib2.urlnorm("HTTP://LOCALHOST:80"))
        try:
            httplib2.urlnorm("/")
            self.fail("Non-absolute URIs should raise an exception")
        except httplib2.RelativeURIError:
            pass


class UrlSafenameTest(unittest.TestCase):
    def test(self):
        # Test that different URIs end up generating different safe names
        self.assertEqual("example.org,fred,a=b,58489f63a7a83c3b7794a6a398ee8b1f", httplib2.safename("http://example.org/fred/?a=b"))
        self.assertEqual("example.org,fred,a=b,8c5946d56fec453071f43329ff0be46b", httplib2.safename("http://example.org/fred?/a=b"))
        self.assertEqual("www.example.org,fred,a=b,499c44b8d844a011b67ea2c015116968", httplib2.safename("http://www.example.org/fred?/a=b"))
        self.assertEqual(httplib2.safename(httplib2.urlnorm("http://www")[-1]), httplib2.safename(httplib2.urlnorm("http://WWW")[-1]))
        self.assertEqual("www.example.org,fred,a=b,692e843a333484ce0095b070497ab45d", httplib2.safename("https://www.example.org/fred?/a=b"))
        self.assertNotEqual(httplib2.safename("http://www"), httplib2.safename("https://www"))
        # Test the max length limits
        uri = "http://" + ("w" * 200) + ".org"
        uri2 = "http://" + ("w" * 201) + ".org"
        self.assertNotEqual(httplib2.safename(uri2), httplib2.safename(uri))
        # Max length should be 200 + 1 (",") + 32
        self.assertEqual(233, len(httplib2.safename(uri2)))
        self.assertEqual(233, len(httplib2.safename(uri)))
        # Unicode
        if sys.version_info >= (2,3):
            self.assertEqual("xn--http,-4y1d.org,fred,a=b,579924c35db315e5a32e3d9963388193", httplib2.safename("http://\u2304.org/fred/?a=b"))

class _MyResponse(io.BytesIO):
    def __init__(self, body, **kwargs):
        io.BytesIO.__init__(self, body)
        self.headers = kwargs

    def items(self):
        return self.headers.items()

    def iteritems(self):
        return iter(self.headers.items())


class _MyHTTPConnection(object):
    "This class is just a mock of httplib.HTTPConnection used for testing"

    def __init__(self, host, port=None, key_file=None, cert_file=None,
                 strict=None, timeout=None, proxy_info=None):
        self.host = host
        self.port = port
        self.timeout = timeout
        self.log = ""
        self.sock = None

    def set_debuglevel(self, level):
        pass

    def connect(self):
        "Connect to a host on a given port."
        pass

    def close(self):
        pass

    def request(self, method, request_uri, body, headers):
        pass

    def getresponse(self):
        return _MyResponse(b"the body", status="200")


class _MyHTTPBadStatusConnection(object):
    "Mock of httplib.HTTPConnection that raises BadStatusLine."

    num_calls = 0

    def __init__(self, host, port=None, key_file=None, cert_file=None,
                 strict=None, timeout=None, proxy_info=None):
        self.host = host
        self.port = port
        self.timeout = timeout
        self.log = ""
        self.sock = None
        _MyHTTPBadStatusConnection.num_calls = 0

    def set_debuglevel(self, level):
        pass

    def connect(self):
        pass

    def close(self):
        pass

    def request(self, method, request_uri, body, headers):
        pass

    def getresponse(self):
        _MyHTTPBadStatusConnection.num_calls += 1
        raise http.client.BadStatusLine("")

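# A short sketch of how the mock connection classes above are used: Http.request()
# accepts a connection_type argument, so the tests below (e.g. testConnectionType and
# testBadStatusLineRetry) can exercise httplib2's request path without real sockets:
#
#     http = httplib2.Http()
#     response, content = http.request("http://bitworking.org",
#                                      connection_type=_MyHTTPConnection)
#     # content == b"the body", served by _MyHTTPConnection.getresponse()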

class HttpTest(unittest.TestCase):
    def setUp(self):
        if os.path.exists(cacheDirName):
            [os.remove(os.path.join(cacheDirName, file)) for file in os.listdir(cacheDirName)]
        self.http = httplib2.Http(cacheDirName)
        self.http.clear_credentials()

    def testIPv6NoSSL(self):
        try:
            self.http.request("http://[::1]/")
        except socket.gaierror:
            self.fail("should get the address family right for IPv6")
        except socket.error:
            # Even if IPv6 isn't installed on a machine it should just raise socket.error
            pass

    def testIPv6SSL(self):
        try:
            self.http.request("https://[::1]/")
        except socket.gaierror:
            self.fail("should get the address family right for IPv6")
        except socket.error:
            # Even if IPv6 isn't installed on a machine it should just raise socket.error
            pass

    def testConnectionType(self):
        self.http.force_exception_to_status_code = False
        response, content = self.http.request("http://bitworking.org", connection_type=_MyHTTPConnection)
        self.assertEqual(response['content-location'], "http://bitworking.org")
        self.assertEqual(content, b"the body")

    def testBadStatusLineRetry(self):
        old_retries = httplib2.RETRIES
        httplib2.RETRIES = 1
        self.http.force_exception_to_status_code = False
        try:
            response, content = self.http.request("http://bitworking.org",
                                                  connection_type=_MyHTTPBadStatusConnection)
        except http.client.BadStatusLine:
            self.assertEqual(2, _MyHTTPBadStatusConnection.num_calls)
        httplib2.RETRIES = old_retries

    def testGetUnknownServer(self):
        self.http.force_exception_to_status_code = False
        try:
            self.http.request("http://fred.bitworking.org/")
            self.fail("An httplib2.ServerNotFoundError Exception must be thrown on an unresolvable server.")
        except httplib2.ServerNotFoundError:
            pass

        # Now test with exceptions turned off
        self.http.force_exception_to_status_code = True

        (response, content) = self.http.request("http://fred.bitworking.org/")
        self.assertEqual(response['content-type'], 'text/plain')
        self.assertTrue(content.startswith(b"Unable to find"))
        self.assertEqual(response.status, 400)

    def testGetConnectionRefused(self):
        self.http.force_exception_to_status_code = False
        try:
            self.http.request("http://localhost:7777/")
            self.fail("A socket.error exception must be thrown on Connection Refused.")
        except socket.error:
            pass

        # Now test with exceptions turned off
        self.http.force_exception_to_status_code = True

        (response, content) = self.http.request("http://localhost:7777/")
        self.assertEqual(response['content-type'], 'text/plain')
        self.assertTrue(b"Connection refused" in content)
        self.assertEqual(response.status, 400)

    def testGetIRI(self):
        if sys.version_info >= (2,3):
            uri = urllib.parse.urljoin(base, "reflector/reflector.cgi?d=\N{CYRILLIC CAPITAL LETTER DJE}")
            (response, content) = self.http.request(uri, "GET")
            d = self.reflector(content)
            self.assertTrue('QUERY_STRING' in d)
            self.assertTrue(d['QUERY_STRING'].find('%D0%82') > 0)

    def testGetIsDefaultMethod(self):
        # Test that GET is the default method
        uri = urllib.parse.urljoin(base, "methods/method_reflector.cgi")
        (response, content) = self.http.request(uri)
        self.assertEqual(response['x-method'], "GET")

    def testDifferentMethods(self):
        # Test that all methods can be used
        uri = urllib.parse.urljoin(base, "methods/method_reflector.cgi")
        for method in ["GET", "PUT", "DELETE", "POST"]:
            (response, content) = self.http.request(uri, method, body=b" ")
            self.assertEqual(response['x-method'], method)

    def testHeadRead(self):
        # Test that we don't try to read the response of a HEAD request
        # since httplib blocks response.read() for HEAD requests.
        # Oddly enough this doesn't appear as a problem when doing HEAD requests
        # against Apache servers.
        uri = "http://www.google.com/"
        (response, content) = self.http.request(uri, "HEAD")
        self.assertEqual(response.status, 200)
        self.assertEqual(content, b"")

    def testGetNoCache(self):
        # Test that we can do a GET w/o the cache turned on.
        http = httplib2.Http()
        uri = urllib.parse.urljoin(base, "304/test_etag.txt")
        (response, content) = http.request(uri, "GET")
        self.assertEqual(response.status, 200)
        self.assertEqual(response.previous, None)

    def testGetOnlyIfCachedCacheHit(self):
        # Test that we can do a GET with the cache and 'only-if-cached'
        uri = urllib.parse.urljoin(base, "304/test_etag.txt")
        (response, content) = self.http.request(uri, "GET")
        (response, content) = self.http.request(uri, "GET", headers={'cache-control': 'only-if-cached'})
        self.assertEqual(response.fromcache, True)
        self.assertEqual(response.status, 200)

    def testGetOnlyIfCachedCacheMiss(self):
        # Test a GET with 'only-if-cached' when the response is not yet in the cache
        uri = urllib.parse.urljoin(base, "304/test_etag.txt")
        (response, content) = self.http.request(uri, "GET", headers={'cache-control': 'only-if-cached'})
        self.assertEqual(response.fromcache, False)
        self.assertEqual(response.status, 504)

    def testGetOnlyIfCachedNoCacheAtAll(self):
        # Test that we can do a GET with no cache at all with 'only-if-cached'
        # Of course, there might be an intermediary beyond us
        # that responds to the 'only-if-cached', so this
        # test can't really be guaranteed to pass.
        http = httplib2.Http()
        uri = urllib.parse.urljoin(base, "304/test_etag.txt")
        (response, content) = http.request(uri, "GET", headers={'cache-control': 'only-if-cached'})
        self.assertEqual(response.fromcache, False)
        self.assertEqual(response.status, 504)

    def testUserAgent(self):
        # Test that we provide a default user-agent
        uri = urllib.parse.urljoin(base, "user-agent/test.cgi")
        (response, content) = self.http.request(uri, "GET")
        self.assertEqual(response.status, 200)
        self.assertTrue(content.startswith(b"Python-httplib2/"))

    def testUserAgentNonDefault(self):
        # Test that the default user-agent can be over-ridden

        uri = urllib.parse.urljoin(base, "user-agent/test.cgi")
        (response, content) = self.http.request(uri, "GET", headers={'User-Agent': 'fred/1.0'})
        self.assertEqual(response.status, 200)
        self.assertTrue(content.startswith(b"fred/1.0"))

    def testGet300WithLocation(self):
        # Test that we automatically follow 300 redirects if a Location: header is provided
        uri = urllib.parse.urljoin(base, "300/with-location-header.asis")
        (response, content) = self.http.request(uri, "GET")
        self.assertEqual(response.status, 200)
        self.assertEqual(content, b"This is the final destination.\n")
        self.assertEqual(response.previous.status, 300)
        self.assertEqual(response.previous.fromcache, False)

        # Confirm that the intermediate 300 is not cached
        (response, content) = self.http.request(uri, "GET")
        self.assertEqual(response.status, 200)
        self.assertEqual(content, b"This is the final destination.\n")
        self.assertEqual(response.previous.status, 300)
        self.assertEqual(response.previous.fromcache, False)

    def testGet300WithLocationNoRedirect(self):
        # Test that we do not follow 300 redirects when follow_redirects is False
        self.http.follow_redirects = False
        uri = urllib.parse.urljoin(base, "300/with-location-header.asis")
        (response, content) = self.http.request(uri, "GET")
        self.assertEqual(response.status, 300)

    def testGet300WithoutLocation(self):
        # Not giving a Location: header in a 300 response is acceptable
        # In which case we just return the 300 response
        uri = urllib.parse.urljoin(base, "300/without-location-header.asis")
        (response, content) = self.http.request(uri, "GET")
        self.assertEqual(response.status, 300)
        self.assertTrue(response['content-type'].startswith("text/html"))
        self.assertEqual(response.previous, None)

    def testGet301(self):
        # Test that we automatically follow 301 redirects
        # and that we cache the 301 response
        uri = urllib.parse.urljoin(base, "301/onestep.asis")
        destination = urllib.parse.urljoin(base, "302/final-destination.txt")
        (response, content) = self.http.request(uri, "GET")
        self.assertEqual(response.status, 200)
        self.assertTrue('content-location' in response)
        self.assertEqual(response['content-location'], destination)
        self.assertEqual(content, b"This is the final destination.\n")
        self.assertEqual(response.previous.status, 301)
        self.assertEqual(response.previous.fromcache, False)

        (response, content) = self.http.request(uri, "GET")
        self.assertEqual(response.status, 200)
        self.assertEqual(response['content-location'], destination)
        self.assertEqual(content, b"This is the final destination.\n")
        self.assertEqual(response.previous.status, 301)
        self.assertEqual(response.previous.fromcache, True)

    def testHead301(self):
        # Test that we automatically follow 301 redirects
        uri = urllib.parse.urljoin(base, "301/onestep.asis")
        (response, content) = self.http.request(uri, "HEAD")
        self.assertEqual(response.status, 200)
        self.assertEqual(response.previous.status, 301)
        self.assertEqual(response.previous.fromcache, False)

    def testGet301NoRedirect(self):
        # Test that we do not follow 301 redirects
        # when follow_redirects is False
        self.http.follow_redirects = False
        uri = urllib.parse.urljoin(base, "301/onestep.asis")
        destination = urllib.parse.urljoin(base, "302/final-destination.txt")
        (response, content) = self.http.request(uri, "GET")
        self.assertEqual(response.status, 301)

    def testGet302(self):
        # Test that we automatically follow 302 redirects
        # and that we DO NOT cache the 302 response
        uri = urllib.parse.urljoin(base, "302/onestep.asis")
        destination = urllib.parse.urljoin(base, "302/final-destination.txt")
        (response, content) = self.http.request(uri, "GET")
        self.assertEqual(response.status, 200)
        self.assertEqual(response['content-location'], destination)
        self.assertEqual(content, b"This is the final destination.\n")
        self.assertEqual(response.previous.status, 302)
        self.assertEqual(response.previous.fromcache, False)

        uri = urllib.parse.urljoin(base, "302/onestep.asis")
        (response, content) = self.http.request(uri, "GET")
        self.assertEqual(response.status, 200)
        self.assertEqual(response.fromcache, True)
        self.assertEqual(response['content-location'], destination)
        self.assertEqual(content, b"This is the final destination.\n")
        self.assertEqual(response.previous.status, 302)
        self.assertEqual(response.previous.fromcache, False)
        self.assertEqual(response.previous['content-location'], uri)

        uri = urllib.parse.urljoin(base, "302/twostep.asis")

        (response, content) = self.http.request(uri, "GET")
        self.assertEqual(response.status, 200)
        self.assertEqual(response.fromcache, True)
        self.assertEqual(content, b"This is the final destination.\n")
        self.assertEqual(response.previous.status, 302)
        self.assertEqual(response.previous.fromcache, False)

    def testGet302RedirectionLimit(self):
        # Test that we can set a lower redirection limit
        # and that we raise an exception when we exceed
        # that limit.
        self.http.force_exception_to_status_code = False

        uri = urllib.parse.urljoin(base, "302/twostep.asis")
        try:
            (response, content) = self.http.request(uri, "GET", redirections=1)
            self.fail("This should not happen")
        except httplib2.RedirectLimit:
            pass
        except Exception as e:
            self.fail("Threw wrong kind of exception ")

        # Re-run the test without the exceptions
        self.http.force_exception_to_status_code = True

        (response, content) = self.http.request(uri, "GET", redirections=1)
        self.assertEqual(response.status, 500)
        self.assertTrue(response.reason.startswith("Redirected more"))
        self.assertEqual("302", response['status'])
        self.assertTrue(content.startswith(b"<html>"))
        self.assertTrue(response.previous != None)

    def testGet302NoLocation(self):
        # Test that we throw an exception when we get
        # a 302 with no Location: header.
        self.http.force_exception_to_status_code = False
        uri = urllib.parse.urljoin(base, "302/no-location.asis")
        try:
            (response, content) = self.http.request(uri, "GET")
            self.fail("Should never reach here")
        except httplib2.RedirectMissingLocation:
            pass
        except Exception as e:
            self.fail("Threw wrong kind of exception ")

        # Re-run the test without the exceptions
        self.http.force_exception_to_status_code = True

        (response, content) = self.http.request(uri, "GET")
        self.assertEqual(response.status, 500)
        self.assertTrue(response.reason.startswith("Redirected but"))
        self.assertEqual("302", response['status'])
        self.assertTrue(content.startswith(b"This is content"))

    def testGet301ViaHttps(self):
        # Google always redirects to http://google.com
        (response, content) = self.http.request("https://code.google.com/apis/", "GET")
        self.assertEqual(200, response.status)
        self.assertEqual(301, response.previous.status)

    def testGetViaHttps(self):
        # Test that we can handle HTTPS
        (response, content) = self.http.request("https://google.com/adsense/", "GET")
        self.assertEqual(200, response.status)

    def testGetViaHttpsSpecViolationOnLocation(self):
        # Test that we follow redirects through HTTPS
        # even if they violate the spec by including
        # a relative Location: header instead of an
        # absolute one.
        (response, content) = self.http.request("https://google.com/adsense", "GET")
        self.assertEqual(200, response.status)
        self.assertNotEqual(None, response.previous)

    def testGetViaHttpsKeyCert(self):
        # At this point I can only test
        # that the key and cert files are passed in
        # correctly to httplib. It would be nice to have
        # a real https endpoint to test against.
        http = httplib2.Http(timeout=2)

        http.add_certificate("akeyfile", "acertfile", "bitworking.org")
        try:
            (response, content) = http.request("https://bitworking.org", "GET")
        except AttributeError:
            self.assertEqual(http.connections["https:bitworking.org"].key_file, "akeyfile")
            self.assertEqual(http.connections["https:bitworking.org"].cert_file, "acertfile")
        except IOError:
            # Skip on 3.2
            pass

        try:
            (response, content) = http.request("https://notthere.bitworking.org", "GET")
        except httplib2.ServerNotFoundError:
            self.assertEqual(http.connections["https:notthere.bitworking.org"].key_file, None)
            self.assertEqual(http.connections["https:notthere.bitworking.org"].cert_file, None)
        except IOError:
            # Skip on 3.2
            pass

    def testSslCertValidation(self):
        # Test that we get an IOError when specifying a non-existent CA
        # certs file.
        http = httplib2.Http(ca_certs='/nosuchfile')
        self.assertRaises(IOError,
                          http.request, "https://www.google.com/", "GET")

        # Test that we get an ssl.SSLError if we try to access
        # https://www.google.com, using a CA cert file that doesn't contain
        # the CA Google uses (i.e., simulating a cert that's not signed by a
        # trusted CA).
        other_ca_certs = os.path.join(
            os.path.dirname(os.path.abspath(httplib2.__file__)),
            "test", "other_cacerts.txt")
        http = httplib2.Http(ca_certs=other_ca_certs)
        self.assertRaises(ssl.SSLError,
                          http.request, "https://www.google.com/", "GET")

    def testSniHostnameValidation(self):
        self.http.request("https://google.com/", method="GET")

    def testGet303(self):
        # Do a follow-up GET on a Location: header
        # returned from a POST that gave a 303.
        uri = urllib.parse.urljoin(base, "303/303.cgi")
        (response, content) = self.http.request(uri, "POST", " ")
        self.assertEqual(response.status, 200)
        self.assertEqual(content, b"This is the final destination.\n")
        self.assertEqual(response.previous.status, 303)

    def testGet303NoRedirect(self):
        # Do not follow the Location: header returned from a POST
        # that gave a 303 when follow_redirects is False.
        self.http.follow_redirects = False
        uri = urllib.parse.urljoin(base, "303/303.cgi")
        (response, content) = self.http.request(uri, "POST", " ")
        self.assertEqual(response.status, 303)

    def test303ForDifferentMethods(self):
        # Test that all methods can be used
        uri = urllib.parse.urljoin(base, "303/redirect-to-reflector.cgi")
        for (method, method_on_303) in [("PUT", "GET"), ("DELETE", "GET"), ("POST", "GET"), ("GET", "GET"), ("HEAD", "GET")]:
            (response, content) = self.http.request(uri, method, body=b" ")
            self.assertEqual(response['x-method'], method_on_303)

    def testGet304(self):
        # Test that we use ETags properly to validate our cache
        uri = urllib.parse.urljoin(base, "304/test_etag.txt")
        (response, content) = self.http.request(uri, "GET", headers={'accept-encoding': 'identity'})
        self.assertNotEqual(response['etag'], "")

        (response, content) = self.http.request(uri, "GET", headers={'accept-encoding': 'identity'})
        (response, content) = self.http.request(uri, "GET", headers={'accept-encoding': 'identity', 'cache-control': 'must-revalidate'})
        self.assertEqual(response.status, 200)
        self.assertEqual(response.fromcache, True)

        cache_file_name = os.path.join(cacheDirName, httplib2.safename(httplib2.urlnorm(uri)[-1]))
        f = open(cache_file_name, "r")
        status_line = f.readline()
        f.close()

        self.assertTrue(status_line.startswith("status:"))

        (response, content) = self.http.request(uri, "HEAD", headers={'accept-encoding': 'identity'})
        self.assertEqual(response.status, 200)
        self.assertEqual(response.fromcache, True)

        (response, content) = self.http.request(uri, "GET", headers={'accept-encoding': 'identity', 'range': 'bytes=0-0'})
        self.assertEqual(response.status, 206)
        self.assertEqual(response.fromcache, False)

    def testGetIgnoreEtag(self):
        # Test that we can forcibly ignore ETags
        uri = urllib.parse.urljoin(base, "reflector/reflector.cgi")
        (response, content) = self.http.request(uri, "GET", headers={'accept-encoding': 'identity'})
        self.assertNotEqual(response['etag'], "")

        (response, content) = self.http.request(uri, "GET", headers={'accept-encoding': 'identity', 'cache-control': 'max-age=0'})
        d = self.reflector(content)
        self.assertTrue('HTTP_IF_NONE_MATCH' in d)

        self.http.ignore_etag = True
        (response, content) = self.http.request(uri, "GET", headers={'accept-encoding': 'identity', 'cache-control': 'max-age=0'})
        d = self.reflector(content)
        self.assertEqual(response.fromcache, False)
        self.assertFalse('HTTP_IF_NONE_MATCH' in d)

    def testOverrideEtag(self):
        # Test that we can override the ETag sent in the If-None-Match header
        uri = urllib.parse.urljoin(base, "reflector/reflector.cgi")
        (response, content) = self.http.request(uri, "GET", headers={'accept-encoding': 'identity'})
        self.assertNotEqual(response['etag'], "")

        (response, content) = self.http.request(uri, "GET", headers={'accept-encoding': 'identity', 'cache-control': 'max-age=0'})
        d = self.reflector(content)
        self.assertTrue('HTTP_IF_NONE_MATCH' in d)
        self.assertNotEqual(d['HTTP_IF_NONE_MATCH'], "fred")

        (response, content) = self.http.request(uri, "GET", headers={'accept-encoding': 'identity', 'cache-control': 'max-age=0', 'if-none-match': 'fred'})
        d = self.reflector(content)
        self.assertTrue('HTTP_IF_NONE_MATCH' in d)
        self.assertEqual(d['HTTP_IF_NONE_MATCH'], "fred")

#MAP-commented this out because it consistently fails
#    def testGet304EndToEnd(self):
#       # Test that end to end headers get overwritten in the cache
#       uri = urllib.parse.urljoin(base, "304/end2end.cgi")
#       (response, content) = self.http.request(uri, "GET")
#       self.assertNotEqual(response['etag'], "")
#       old_date = response['date']
#       time.sleep(2)
#
#       (response, content) = self.http.request(uri, "GET", headers = {'Cache-Control': 'max-age=0'})
#       # The response should be from the cache, but the Date: header should be updated.
#       new_date = response['date']
#       self.assertNotEqual(new_date, old_date)
#       self.assertEqual(response.status, 200)
#       self.assertEqual(response.fromcache, True)

    def testGet304LastModified(self):
        # Test that we can still handle a 304
        # by only using the last-modified cache validator.
        uri = urllib.parse.urljoin(base, "304/last-modified-only/last-modified-only.txt")
        (response, content) = self.http.request(uri, "GET")

        self.assertNotEqual(response['last-modified'], "")
        (response, content) = self.http.request(uri, "GET")
        (response, content) = self.http.request(uri, "GET")
        self.assertEqual(response.status, 200)
        self.assertEqual(response.fromcache, True)

    def testGet307(self):
        # Test that we do follow 307 redirects but
        # do not cache the 307
        uri = urllib.parse.urljoin(base, "307/onestep.asis")
        (response, content) = self.http.request(uri, "GET")
        self.assertEqual(response.status, 200)
        self.assertEqual(content, b"This is the final destination.\n")
        self.assertEqual(response.previous.status, 307)
        self.assertEqual(response.previous.fromcache, False)

        (response, content) = self.http.request(uri, "GET")
        self.assertEqual(response.status, 200)
        self.assertEqual(response.fromcache, True)
        self.assertEqual(content, b"This is the final destination.\n")
        self.assertEqual(response.previous.status, 307)
        self.assertEqual(response.previous.fromcache, False)

    def testGet410(self):
        # Test that we pass 410's through
        uri = urllib.parse.urljoin(base, "410/410.asis")
        (response, content) = self.http.request(uri, "GET")
        self.assertEqual(response.status, 410)

    def testVaryHeaderSimple(self):
        """
        RFC 2616 13.6
        When the cache receives a subsequent request whose Request-URI
        specifies one or more cache entries including a Vary header field,
        the cache MUST NOT use such a cache entry to construct a response
        to the new request unless all of the selecting request-headers
        present in the new request match the corresponding stored
        request-headers in the original request.
        """
        # test that the vary header is sent
        uri = urllib.parse.urljoin(base, "vary/accept.asis")
        (response, content) = self.http.request(uri, "GET", headers={'Accept': 'text/plain'})
        self.assertEqual(response.status, 200)
        self.assertTrue('vary' in response)

        # get the resource again, from the cache since accept header in this
        # request is the same as the request
        (response, content) = self.http.request(uri, "GET", headers={'Accept': 'text/plain'})
        self.assertEqual(response.status, 200)
        self.assertEqual(response.fromcache, True, msg="Should be from cache")

        # get the resource again, not from cache since the Accept header does not match
        (response, content) = self.http.request(uri, "GET", headers={'Accept': 'text/html'})
        self.assertEqual(response.status, 200)
        self.assertEqual(response.fromcache, False, msg="Should not be from cache")

        # get the resource again, without any Accept header, so again no match
        (response, content) = self.http.request(uri, "GET")
        self.assertEqual(response.status, 200)
        self.assertEqual(response.fromcache, False, msg="Should not be from cache")

    def testNoVary(self):
        pass
        # when there is no vary, a different Accept header (e.g.) should not
        # impact if the cache is used
        # test that the vary header is not sent
        # uri = urllib.parse.urljoin(base, "vary/no-vary.asis")
        # (response, content) = self.http.request(uri, "GET", headers={'Accept': 'text/plain'})
        # self.assertEqual(response.status, 200)
        # self.assertFalse('vary' in response)
        #
        # (response, content) = self.http.request(uri, "GET", headers={'Accept': 'text/plain'})
        # self.assertEqual(response.status, 200)
        # self.assertEqual(response.fromcache, True, msg="Should be from cache")
        #
        # (response, content) = self.http.request(uri, "GET", headers={'Accept': 'text/html'})
        # self.assertEqual(response.status, 200)
        # self.assertEqual(response.fromcache, True, msg="Should be from cache")

    def testVaryHeaderDouble(self):
        uri = urllib.parse.urljoin(base, "vary/accept-double.asis")
        (response, content) = self.http.request(uri, "GET", headers={
            'Accept': 'text/plain', 'Accept-Language': 'da, en-gb;q=0.8, en;q=0.7'})
        self.assertEqual(response.status, 200)
        self.assertTrue('vary' in response)

        # we are from cache
        (response, content) = self.http.request(uri, "GET", headers={
            'Accept': 'text/plain', 'Accept-Language': 'da, en-gb;q=0.8, en;q=0.7'})
        self.assertEqual(response.fromcache, True, msg="Should be from cache")

        (response, content) = self.http.request(uri, "GET", headers={'Accept': 'text/plain'})
        self.assertEqual(response.status, 200)
        self.assertEqual(response.fromcache, False)

        # get the resource again, not from cache, varied headers don't match exactly
        (response, content) = self.http.request(uri, "GET", headers={'Accept-Language': 'da'})
        self.assertEqual(response.status, 200)
        self.assertEqual(response.fromcache, False, msg="Should not be from cache")

    def testVaryUnusedHeader(self):
        # A header's value is not considered to vary if it's not used at all.
        uri = urllib.parse.urljoin(base, "vary/unused-header.asis")
        (response, content) = self.http.request(uri, "GET", headers={
            'Accept': 'text/plain'})
        self.assertEqual(response.status, 200)
        self.assertTrue('vary' in response)

        # we are from cache
        (response, content) = self.http.request(uri, "GET", headers={
            'Accept': 'text/plain',})
        self.assertEqual(response.fromcache, True, msg="Should be from cache")

    def testHeadGZip(self):
        # Test that we don't try to decompress a HEAD response
        uri = urllib.parse.urljoin(base, "gzip/final-destination.txt")
        (response, content) = self.http.request(uri, "HEAD")
        self.assertEqual(response.status, 200)
        self.assertNotEqual(int(response['content-length']), 0)
        self.assertEqual(content, b"")

    def testGetGZip(self):
        # Test that we support gzip compression
        uri = urllib.parse.urljoin(base, "gzip/final-destination.txt")
        (response, content) = self.http.request(uri, "GET")
        self.assertEqual(response.status, 200)
        self.assertFalse('content-encoding' in response)
        self.assertTrue('-content-encoding' in response)
        self.assertEqual(int(response['content-length']), len(b"This is the final destination.\n"))
        self.assertEqual(content, b"This is the final destination.\n")

    def testPostAndGZipResponse(self):
        uri = urllib.parse.urljoin(base, "gzip/post.cgi")
        (response, content) = self.http.request(uri, "POST", body=" ")
        self.assertEqual(response.status, 200)
        self.assertFalse('content-encoding' in response)
        self.assertTrue('-content-encoding' in response)

    def testGetGZipFailure(self):
        # Test that we raise a good exception when the gzip fails
        self.http.force_exception_to_status_code = False
        uri = urllib.parse.urljoin(base, "gzip/failed-compression.asis")
        try:
            (response, content) = self.http.request(uri, "GET")
            self.fail("Should never reach here")
        except httplib2.FailedToDecompressContent:
            pass
        except Exception:
            self.fail("Threw wrong kind of exception")

        # Re-run the test without the exceptions
        self.http.force_exception_to_status_code = True

        (response, content) = self.http.request(uri, "GET")
        self.assertEqual(response.status, 500)
        self.assertTrue(response.reason.startswith("Content purported"))

    def testIndividualTimeout(self):
        uri = urllib.parse.urljoin(base, "timeout/timeout.cgi")
        http = httplib2.Http(timeout=1)
        http.force_exception_to_status_code = True

        (response, content) = http.request(uri)
        self.assertEqual(response.status, 408)
        self.assertTrue(response.reason.startswith("Request Timeout"))
        self.assertTrue(content.startswith(b"Request Timeout"))

    def testGetDeflate(self):
        # Test that we support deflate compression
        uri = urllib.parse.urljoin(base, "deflate/deflated.asis")
        (response, content) = self.http.request(uri, "GET")
        self.assertEqual(response.status, 200)
        self.assertFalse('content-encoding' in response)
        self.assertEqual(int(response['content-length']), len("This is the final destination."))
        self.assertEqual(content, b"This is the final destination.")

    def testGetDeflateFailure(self):
        # Test that we raise a good exception when the deflate fails
        self.http.force_exception_to_status_code = False

        uri = urllib.parse.urljoin(base, "deflate/failed-compression.asis")
        try:
            (response, content) = self.http.request(uri, "GET")
            self.fail("Should never reach here")
        except httplib2.FailedToDecompressContent:
            pass
        except Exception:
            self.fail("Threw wrong kind of exception")

        # Re-run the test without the exceptions
        self.http.force_exception_to_status_code = True

        (response, content) = self.http.request(uri, "GET")
        self.assertEqual(response.status, 500)
        self.assertTrue(response.reason.startswith("Content purported"))

    def testGetDuplicateHeaders(self):
        # Test that duplicate headers get concatenated via ','
        uri = urllib.parse.urljoin(base, "duplicate-headers/multilink.asis")
        (response, content) = self.http.request(uri, "GET")
        self.assertEqual(response.status, 200)
        self.assertEqual(content, b"This is content\n")
        self.assertEqual(response['link'].split(",")[0], '<http://bitworking.org>; rel="home"; title="BitWorking"')

    def testGetCacheControlNoCache(self):
        # Test Cache-Control: no-cache on requests
        uri = urllib.parse.urljoin(base, "304/test_etag.txt")
        (response, content) = self.http.request(uri, "GET", headers={'accept-encoding': 'identity'})
        self.assertNotEqual(response['etag'], "")
        (response, content) = self.http.request(uri, "GET", headers={'accept-encoding': 'identity'})
        self.assertEqual(response.status, 200)
        self.assertEqual(response.fromcache, True)

        (response, content) = self.http.request(uri, "GET", headers={'accept-encoding': 'identity', 'Cache-Control': 'no-cache'})
        self.assertEqual(response.status, 200)
        self.assertEqual(response.fromcache, False)

    def testGetCacheControlPragmaNoCache(self):
        # Test Pragma: no-cache on requests
        uri = urllib.parse.urljoin(base, "304/test_etag.txt")
        (response, content) = self.http.request(uri, "GET", headers={'accept-encoding': 'identity'})
        self.assertNotEqual(response['etag'], "")
        (response, content) = self.http.request(uri, "GET", headers={'accept-encoding': 'identity'})
        self.assertEqual(response.status, 200)
        self.assertEqual(response.fromcache, True)

        (response, content) = self.http.request(uri, "GET", headers={'accept-encoding': 'identity', 'Pragma': 'no-cache'})
        self.assertEqual(response.status, 200)
        self.assertEqual(response.fromcache, False)

    def testGetCacheControlNoStoreRequest(self):
        # A no-store request means that the response should not be stored.
        uri = urllib.parse.urljoin(base, "304/test_etag.txt")

        (response, content) = self.http.request(uri, "GET", headers={'Cache-Control': 'no-store'})
        self.assertEqual(response.status, 200)
        self.assertEqual(response.fromcache, False)

        (response, content) = self.http.request(uri, "GET", headers={'Cache-Control': 'no-store'})
        self.assertEqual(response.status, 200)
        self.assertEqual(response.fromcache, False)

    def testGetCacheControlNoStoreResponse(self):
        # A no-store response means that the response should not be stored.
        uri = urllib.parse.urljoin(base, "no-store/no-store.asis")

        (response, content) = self.http.request(uri, "GET")
        self.assertEqual(response.status, 200)
        self.assertEqual(response.fromcache, False)

        (response, content) = self.http.request(uri, "GET")
        self.assertEqual(response.status, 200)
        self.assertEqual(response.fromcache, False)

    def testGetCacheControlNoCacheNoStoreRequest(self):
        # Test that a no-store, no-cache clears the entry from the cache
        # even if it was cached previously.
        uri = urllib.parse.urljoin(base, "304/test_etag.txt")

        (response, content) = self.http.request(uri, "GET")
        (response, content) = self.http.request(uri, "GET")
        self.assertEqual(response.fromcache, True)
        (response, content) = self.http.request(uri, "GET", headers={'Cache-Control': 'no-store, no-cache'})
        (response, content) = self.http.request(uri, "GET", headers={'Cache-Control': 'no-store, no-cache'})
        self.assertEqual(response.status, 200)
        self.assertEqual(response.fromcache, False)

    def testUpdateInvalidatesCache(self):
        # Test that calling PUT or DELETE on a
        # URI that is cached invalidates that cache.
        uri = urllib.parse.urljoin(base, "304/test_etag.txt")

        (response, content) = self.http.request(uri, "GET")
        (response, content) = self.http.request(uri, "GET")
        self.assertEqual(response.fromcache, True)
        (response, content) = self.http.request(uri, "DELETE")
        self.assertEqual(response.status, 405)

        (response, content) = self.http.request(uri, "GET")
        self.assertEqual(response.fromcache, False)

    def testUpdateUsesCachedETag(self):
        # Test that we natively support http://www.w3.org/1999/04/Editing/
        uri = urllib.parse.urljoin(base, "conditional-updates/test.cgi")

        (response, content) = self.http.request(uri, "GET")
        self.assertEqual(response.status, 200)
        self.assertEqual(response.fromcache, False)
        (response, content) = self.http.request(uri, "GET")
        self.assertEqual(response.status, 200)
        self.assertEqual(response.fromcache, True)
        (response, content) = self.http.request(uri, "PUT", body="foo")
        self.assertEqual(response.status, 200)
        (response, content) = self.http.request(uri, "PUT", body="foo")
        self.assertEqual(response.status, 412)

    def testUpdatePatchUsesCachedETag(self):
        # Test that we natively support http://www.w3.org/1999/04/Editing/
        uri = urllib.parse.urljoin(base, "conditional-updates/test.cgi")

        (response, content) = self.http.request(uri, "GET")
        self.assertEqual(response.status, 200)
        self.assertEqual(response.fromcache, False)
        (response, content) = self.http.request(uri, "GET")
        self.assertEqual(response.status, 200)
        self.assertEqual(response.fromcache, True)
        (response, content) = self.http.request(uri, "PATCH", body="foo")
        self.assertEqual(response.status, 200)
        (response, content) = self.http.request(uri, "PATCH", body="foo")
        self.assertEqual(response.status, 412)

    def testUpdateUsesCachedETagAndOCMethod(self):
        # Test that we natively support http://www.w3.org/1999/04/Editing/
        uri = urllib.parse.urljoin(base, "conditional-updates/test.cgi")

        (response, content) = self.http.request(uri, "GET")
        self.assertEqual(response.status, 200)
        self.assertEqual(response.fromcache, False)
        (response, content) = self.http.request(uri, "GET")
        self.assertEqual(response.status, 200)
        self.assertEqual(response.fromcache, True)
        self.http.optimistic_concurrency_methods.append("DELETE")
        (response, content) = self.http.request(uri, "DELETE")
        self.assertEqual(response.status, 200)

    def testUpdateUsesCachedETagOverridden(self):
        # Test that we natively support http://www.w3.org/1999/04/Editing/
        uri = urllib.parse.urljoin(base, "conditional-updates/test.cgi")

        (response, content) = self.http.request(uri, "GET")
        self.assertEqual(response.status, 200)
        self.assertEqual(response.fromcache, False)
        (response, content) = self.http.request(uri, "GET")
        self.assertEqual(response.status, 200)
        self.assertEqual(response.fromcache, True)
        (response, content) = self.http.request(uri, "PUT", body="foo", headers={'if-match': 'fred'})
        self.assertEqual(response.status, 412)

    def testBasicAuth(self):
        # Test Basic Authentication
        uri = urllib.parse.urljoin(base, "basic/file.txt")
        (response, content) = self.http.request(uri, "GET")
        self.assertEqual(response.status, 401)

        uri = urllib.parse.urljoin(base, "basic/")
        (response, content) = self.http.request(uri, "GET")
        self.assertEqual(response.status, 401)

        self.http.add_credentials('joe', 'password')
        (response, content) = self.http.request(uri, "GET")
        self.assertEqual(response.status, 200)

        uri = urllib.parse.urljoin(base, "basic/file.txt")
        (response, content) = self.http.request(uri, "GET")
        self.assertEqual(response.status, 200)

    def testBasicAuthWithDomain(self):
        # Test Basic Authentication
        uri = urllib.parse.urljoin(base, "basic/file.txt")
        (response, content) = self.http.request(uri, "GET")
        self.assertEqual(response.status, 401)

        uri = urllib.parse.urljoin(base, "basic/")
        (response, content) = self.http.request(uri, "GET")
        self.assertEqual(response.status, 401)

        self.http.add_credentials('joe', 'password', "example.org")
        (response, content) = self.http.request(uri, "GET")
        self.assertEqual(response.status, 401)

        uri = urllib.parse.urljoin(base, "basic/file.txt")
        (response, content) = self.http.request(uri, "GET")
        self.assertEqual(response.status, 401)

        domain = urllib.parse.urlparse(base)[1]
        self.http.add_credentials('joe', 'password', domain)
        (response, content) = self.http.request(uri, "GET")
        self.assertEqual(response.status, 200)

        uri = urllib.parse.urljoin(base, "basic/file.txt")
        (response, content) = self.http.request(uri, "GET")
        self.assertEqual(response.status, 200)

    def testBasicAuthTwoDifferentCredentials(self):
        # Test Basic Authentication with multiple sets of credentials
        uri = urllib.parse.urljoin(base, "basic2/file.txt")
        (response, content) = self.http.request(uri, "GET")
        self.assertEqual(response.status, 401)

        uri = urllib.parse.urljoin(base, "basic2/")
        (response, content) = self.http.request(uri, "GET")
        self.assertEqual(response.status, 401)

        self.http.add_credentials('fred', 'barney')
        (response, content) = self.http.request(uri, "GET")
        self.assertEqual(response.status, 200)

        uri = urllib.parse.urljoin(base, "basic2/file.txt")
        (response, content) = self.http.request(uri, "GET")
        self.assertEqual(response.status, 200)

    def testBasicAuthNested(self):
        # Test Basic Authentication with resources
        # that are nested
        uri = urllib.parse.urljoin(base, "basic-nested/")
        (response, content) = self.http.request(uri, "GET")
        self.assertEqual(response.status, 401)

        uri = urllib.parse.urljoin(base, "basic-nested/subdir")
        (response, content) = self.http.request(uri, "GET")
        self.assertEqual(response.status, 401)

        # Now add in credentials one at a time and test.
        self.http.add_credentials('joe', 'password')

        uri = urllib.parse.urljoin(base, "basic-nested/")
        (response, content) = self.http.request(uri, "GET")
        self.assertEqual(response.status, 200)

        uri = urllib.parse.urljoin(base, "basic-nested/subdir")
        (response, content) = self.http.request(uri, "GET")
        self.assertEqual(response.status, 401)

        self.http.add_credentials('fred', 'barney')

        uri = urllib.parse.urljoin(base, "basic-nested/")
        (response, content) = self.http.request(uri, "GET")
        self.assertEqual(response.status, 200)

        uri = urllib.parse.urljoin(base, "basic-nested/subdir")
        (response, content) = self.http.request(uri, "GET")
        self.assertEqual(response.status, 200)

    def testDigestAuth(self):
        # Test that we support Digest Authentication
        uri = urllib.parse.urljoin(base, "digest/")
        (response, content) = self.http.request(uri, "GET")
        self.assertEqual(response.status, 401)

        self.http.add_credentials('joe', 'password')
        (response, content) = self.http.request(uri, "GET")
        self.assertEqual(response.status, 200)

        uri = urllib.parse.urljoin(base, "digest/file.txt")
        (response, content) = self.http.request(uri, "GET")

    def testDigestAuthNextNonceAndNC(self):
        # Test that if the server sets nextnonce that we reset
        # the nonce count back to 1
        uri = urllib.parse.urljoin(base, "digest/file.txt")
        self.http.add_credentials('joe', 'password')
        (response, content) = self.http.request(uri, "GET", headers={"cache-control": "no-cache"})
        info = httplib2._parse_www_authenticate(response, 'authentication-info')
        self.assertEqual(response.status, 200)
        (response, content) = self.http.request(uri, "GET", headers={"cache-control": "no-cache"})
        info2 = httplib2._parse_www_authenticate(response, 'authentication-info')
        self.assertEqual(response.status, 200)

        if 'nextnonce' in info:
            self.assertEqual(info2['nc'], 1)

    def testDigestAuthStale(self):
        # Test that we can handle a nonce becoming stale
        uri = urllib.parse.urljoin(base, "digest-expire/file.txt")
        self.http.add_credentials('joe', 'password')
        (response, content) = self.http.request(uri, "GET", headers={"cache-control": "no-cache"})
        info = httplib2._parse_www_authenticate(response, 'authentication-info')
        self.assertEqual(response.status, 200)

        time.sleep(3)
        # Sleep long enough that the nonce becomes stale

        (response, content) = self.http.request(uri, "GET", headers={"cache-control": "no-cache"})
        self.assertFalse(response.fromcache)
        self.assertTrue(response._stale_digest)
        info3 = httplib2._parse_www_authenticate(response, 'authentication-info')
        self.assertEqual(response.status, 200)

    def reflector(self, content):
        return dict([tuple(x.split("=", 1)) for x in content.decode('utf-8').strip().split("\n")])

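    # A hedged sketch of the reflector format assumed by reflector() above: the test
    # server's reflector.cgi is expected to emit one "NAME=value" line per CGI
    # environment variable, so content like b"REQUEST_METHOD=GET\nQUERY_STRING=a=1"
    # would come back as {'REQUEST_METHOD': 'GET', 'QUERY_STRING': 'a=1'}.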
1137 def testReflector(self):
1138 uri = urllib.parse.urljoin(base, "reflector/reflector.cgi")
1139 (response, content) = self.http.request(uri, "GET")
1140 d = self.reflector(content)
Joe Gregorioffc3d542013-02-19 15:57:37 -05001141 self.assertTrue('HTTP_USER_AGENT' in d)
pilgrim00a352e2009-05-29 04:04:44 +00001142
Joe Gregorio84cc10a2009-09-01 13:02:49 -04001143
1144 def testConnectionClose(self):
1145 uri = "http://www.google.com/"
1146 (response, content) = self.http.request(uri, "GET")
1147 for c in self.http.connections.values():
1148 self.assertNotEqual(None, c.sock)
1149 (response, content) = self.http.request(uri, "GET", headers={"connection": "close"})
1150 for c in self.http.connections.values():
1151 self.assertEqual(None, c.sock)
1152
Joe Gregorio46546a62012-10-03 14:31:10 -04001153 def testPickleHttp(self):
1154 pickled_http = pickle.dumps(self.http)
1155 new_http = pickle.loads(pickled_http)
1156
1157 self.assertEqual(sorted(new_http.__dict__.keys()),
1158 sorted(self.http.__dict__.keys()))
1159 for key in new_http.__dict__:
1160 if key in ('certificates', 'credentials'):
1161 self.assertEqual(new_http.__dict__[key].credentials,
1162 self.http.__dict__[key].credentials)
1163 elif key == 'cache':
1164 self.assertEqual(new_http.__dict__[key].cache,
1165 self.http.__dict__[key].cache)
1166 else:
1167 self.assertEqual(new_http.__dict__[key],
1168 self.http.__dict__[key])
1169
1170 def testPickleHttpWithConnection(self):
1171 self.http.request('http://bitworking.org',
1172 connection_type=_MyHTTPConnection)
1173 pickled_http = pickle.dumps(self.http)
1174 new_http = pickle.loads(pickled_http)
1175
1176 self.assertEqual(list(self.http.connections.keys()),
1177 ['http:bitworking.org'])
1178 self.assertEqual(new_http.connections, {})
1179
1180 def testPickleCustomRequestHttp(self):
1181 def dummy_request(*args, **kwargs):
1182 return new_request(*args, **kwargs)
1183 dummy_request.dummy_attr = 'dummy_value'
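        # Note: dummy_request is never actually called (new_request is not
        # defined here); the test only needs a plain function assigned to
        # self.http.request so it can check that the attribute is not pickled.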
1184
1185 self.http.request = dummy_request
1186 pickled_http = pickle.dumps(self.http)
1187 self.assertFalse(b"S'request'" in pickled_http)
1188
pilgrim00a352e2009-05-29 04:04:44 +00001189try:
1190 import memcache
1191 class HttpTestMemCached(HttpTest):
1192 def setUp(self):
1193 self.cache = memcache.Client(['127.0.0.1:11211'], debug=0)
1194 #self.cache = memcache.Client(['10.0.0.4:11211'], debug=1)
1195 self.http = httplib2.Http(self.cache)
1196 self.cache.flush_all()
1197 # Not exactly sure why the sleep is needed here, but
1198 # without it some unit tests that rely on caching fail.
1199 # Memcached seems to lose some sets issued immediately
1200 # after a flush_all if the set is to a value that
1201 # was previously cached. (Maybe the flush is handled asynchronously?)
1202 time.sleep(1)
1203 self.http.clear_credentials()
1204except ImportError:
1205 pass
1206
1207
1208
1209# ------------------------------------------------------------------------
1210
1211class HttpPrivateTest(unittest.TestCase):
1212
1213 def testParseCacheControl(self):
1214 # Test that we can parse the Cache-Control header
1215 self.assertEqual({}, httplib2._parse_cache_control({}))
1216 self.assertEqual({'no-cache': 1}, httplib2._parse_cache_control({'cache-control': ' no-cache'}))
1217 cc = httplib2._parse_cache_control({'cache-control': ' no-cache, max-age = 7200'})
1218 self.assertEqual(cc['no-cache'], 1)
1219 self.assertEqual(cc['max-age'], '7200')
1220 cc = httplib2._parse_cache_control({'cache-control': ' , '})
1221 self.assertEqual(cc[''], 1)
1222
Joe Gregorioe314e8b2009-07-16 20:11:28 -04001223 try:
1224 cc = httplib2._parse_cache_control({'cache-control': 'Max-age=3600;post-check=1800,pre-check=3600'})
1225 self.assertTrue("max-age" in cc)
1226 except Exception:
1227 self.fail("Should not throw exception")
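        # Shape of the parsed results above, for reference: bare directives
        # map to 1 and valued directives keep their (string) value, e.g.
        # ' no-cache, max-age = 7200' -> {'no-cache': 1, 'max-age': '7200'}.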
1228
1229
1230
1231
pilgrim00a352e2009-05-29 04:04:44 +00001232 def testNormalizeHeaders(self):
Joe Gregorioffc3d542013-02-19 15:57:37 -05001233 # Test that we normalize headers to lowercase
pilgrim00a352e2009-05-29 04:04:44 +00001234 h = httplib2._normalize_headers({'Cache-Control': 'no-cache', 'Other': 'Stuff'})
1235 self.assertTrue('cache-control' in h)
1236 self.assertTrue('other' in h)
1237 self.assertEqual('Stuff', h['other'])
Cristobal1cf37bd2015-03-02 21:00:03 -03001238
1239 def testConvertByteStr(self):
1240 with self.assertRaises(TypeError):
1241 httplib2._convert_byte_str(4)
1242 self.assertEqual('Hello World', httplib2._convert_byte_str(b'Hello World'))
1243 self.assertEqual('Bye World', httplib2._convert_byte_str('Bye World'))
pilgrim00a352e2009-05-29 04:04:44 +00001244
1245 def testExpirationModelTransparent(self):
1246 # Test that no-cache makes our request TRANSPARENT
1247 response_headers = {
1248 'cache-control': 'max-age=7200'
1249 }
1250 request_headers = {
1251 'cache-control': 'no-cache'
1252 }
1253 self.assertEqual("TRANSPARENT", httplib2._entry_disposition(response_headers, request_headers))
1254
1255 def testMaxAgeNonNumeric(self):
1256 # Test that a non-numeric max-age makes the entry STALE
1257 response_headers = {
1258 'cache-control': 'max-age=fred, min-fresh=barney'
1259 }
1260 request_headers = {
1261 }
1262 self.assertEqual("STALE", httplib2._entry_disposition(response_headers, request_headers))
1263
1264
1265 def testExpirationModelNoCacheResponse(self):
1266 # The date and expires point to an entry that should be
1267 # FRESH, but the no-cache overrides that.
1268 now = time.time()
1269 response_headers = {
1270 'date': time.strftime("%a, %d %b %Y %H:%M:%S GMT", time.gmtime(now)),
1271 'expires': time.strftime("%a, %d %b %Y %H:%M:%S GMT", time.gmtime(now+4)),
1272 'cache-control': 'no-cache'
1273 }
1274 request_headers = {
1275 }
1276 self.assertEqual("STALE", httplib2._entry_disposition(response_headers, request_headers))
1277
1278 def testExpirationModelStaleRequestMustReval(self):
1279 # must-revalidate forces STALE
1280 self.assertEqual("STALE", httplib2._entry_disposition({}, {'cache-control': 'must-revalidate'}))
1281
1282 def testExpirationModelStaleResponseMustReval(self):
1283 # must-revalidate forces STALE
1284 self.assertEqual("STALE", httplib2._entry_disposition({'cache-control': 'must-revalidate'}, {}))
1285
1286 def testExpirationModelFresh(self):
1287 response_headers = {
1288 'date': time.strftime("%a, %d %b %Y %H:%M:%S GMT", time.gmtime()),
1289 'cache-control': 'max-age=2'
1290 }
1291 request_headers = {
1292 }
1293 self.assertEqual("FRESH", httplib2._entry_disposition(response_headers, request_headers))
1294 time.sleep(3)
1295 self.assertEqual("STALE", httplib2._entry_disposition(response_headers, request_headers))
1296
1297 def testExpirationMaxAge0(self):
1298 response_headers = {
1299 'date': time.strftime("%a, %d %b %Y %H:%M:%S GMT", time.gmtime()),
1300 'cache-control': 'max-age=0'
1301 }
1302 request_headers = {
1303 }
1304 self.assertEqual("STALE", httplib2._entry_disposition(response_headers, request_headers))
1305
1306 def testExpirationModelDateAndExpires(self):
1307 now = time.time()
1308 response_headers = {
1309 'date': time.strftime("%a, %d %b %Y %H:%M:%S GMT", time.gmtime(now)),
1310 'expires': time.strftime("%a, %d %b %Y %H:%M:%S GMT", time.gmtime(now+2)),
1311 }
1312 request_headers = {
1313 }
1314 self.assertEqual("FRESH", httplib2._entry_disposition(response_headers, request_headers))
1315 time.sleep(3)
1316 self.assertEqual("STALE", httplib2._entry_disposition(response_headers, request_headers))
1317
1318 def testExpiresZero(self):
1319 now = time.time()
1320 response_headers = {
1321 'date': time.strftime("%a, %d %b %Y %H:%M:%S GMT", time.gmtime(now)),
1322 'expires': "0",
1323 }
1324 request_headers = {
1325 }
1326 self.assertEqual("STALE", httplib2._entry_disposition(response_headers, request_headers))
1327
1328 def testExpirationModelDateOnly(self):
1329 now = time.time()
1330 response_headers = {
1331 'date': time.strftime("%a, %d %b %Y %H:%M:%S GMT", time.gmtime(now+3)),
1332 }
1333 request_headers = {
1334 }
1335 self.assertEqual("STALE", httplib2._entry_disposition(response_headers, request_headers))
1336
1337 def testExpirationModelOnlyIfCached(self):
1338 response_headers = {
1339 }
1340 request_headers = {
1341 'cache-control': 'only-if-cached',
1342 }
1343 self.assertEqual("FRESH", httplib2._entry_disposition(response_headers, request_headers))
1344
1345 def testExpirationModelMaxAgeBoth(self):
1346 now = time.time()
1347 response_headers = {
1348 'date': time.strftime("%a, %d %b %Y %H:%M:%S GMT", time.gmtime(now)),
1349 'cache-control': 'max-age=2'
1350 }
1351 request_headers = {
1352 'cache-control': 'max-age=0'
1353 }
1354 self.assertEqual("STALE", httplib2._entry_disposition(response_headers, request_headers))
1355
1356 def testExpirationModelDateAndExpiresMinFresh1(self):
1357 now = time.time()
1358 response_headers = {
1359 'date': time.strftime("%a, %d %b %Y %H:%M:%S GMT", time.gmtime(now)),
1360 'expires': time.strftime("%a, %d %b %Y %H:%M:%S GMT", time.gmtime(now+2)),
1361 }
1362 request_headers = {
1363 'cache-control': 'min-fresh=2'
1364 }
1365 self.assertEqual("STALE", httplib2._entry_disposition(response_headers, request_headers))
1366
1367 def testExpirationModelDateAndExpiresMinFresh2(self):
1368 now = time.time()
1369 response_headers = {
1370 'date': time.strftime("%a, %d %b %Y %H:%M:%S GMT", time.gmtime(now)),
1371 'expires': time.strftime("%a, %d %b %Y %H:%M:%S GMT", time.gmtime(now+4)),
1372 }
1373 request_headers = {
1374 'cache-control': 'min-fresh=2'
1375 }
1376 self.assertEqual("FRESH", httplib2._entry_disposition(response_headers, request_headers))
1377
1378 def testParseWWWAuthenticateEmpty(self):
1379 res = httplib2._parse_www_authenticate({})
Joe Gregorioffc3d542013-02-19 15:57:37 -05001380 self.assertEqual(len(list(res.keys())), 0)
pilgrim00a352e2009-05-29 04:04:44 +00001381
1382 def testParseWWWAuthenticate(self):
1383 # different uses of spaces around commas
1384 res = httplib2._parse_www_authenticate({ 'www-authenticate': 'Test realm="test realm" , foo=foo ,bar="bar", baz=baz,qux=qux'})
1385 self.assertEqual(len(list(res.keys())), 1)
1386 self.assertEqual(len(list(res['test'].keys())), 5)
Joe Gregorioffc3d542013-02-19 15:57:37 -05001387
pilgrim00a352e2009-05-29 04:04:44 +00001388 # tokens with non-alphanum
1389 res = httplib2._parse_www_authenticate({ 'www-authenticate': 'T*!%#st realm=to*!%#en, to*!%#en="quoted string"'})
1390 self.assertEqual(len(list(res.keys())), 1)
1391 self.assertEqual(len(list(res['t*!%#st'].keys())), 2)
Joe Gregorioffc3d542013-02-19 15:57:37 -05001392
pilgrim00a352e2009-05-29 04:04:44 +00001393 # quoted string with quoted pairs
1394 res = httplib2._parse_www_authenticate({ 'www-authenticate': 'Test realm="a \\"test\\" realm"'})
1395 self.assertEqual(len(list(res.keys())), 1)
1396 self.assertEqual(res['test']['realm'], 'a "test" realm')
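        # Parsed shape, for reference: a dict keyed by the lowercased auth
        # scheme, each mapping to its parameter dict, e.g.
        # {'test': {'realm': 'a "test" realm'}}.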
1397
1398 def testParseWWWAuthenticateStrict(self):
1399 httplib2.USE_WWW_AUTH_STRICT_PARSING = 1
1400 self.testParseWWWAuthenticate()
1401 httplib2.USE_WWW_AUTH_STRICT_PARSING = 0
1402
1403 def testParseWWWAuthenticateBasic(self):
1404 res = httplib2._parse_www_authenticate({ 'www-authenticate': 'Basic realm="me"'})
1405 basic = res['basic']
1406 self.assertEqual('me', basic['realm'])
1407
1408 res = httplib2._parse_www_authenticate({ 'www-authenticate': 'Basic realm="me", algorithm="MD5"'})
1409 basic = res['basic']
1410 self.assertEqual('me', basic['realm'])
1411 self.assertEqual('MD5', basic['algorithm'])
1412
1413 res = httplib2._parse_www_authenticate({ 'www-authenticate': 'Basic realm="me", algorithm=MD5'})
1414 basic = res['basic']
1415 self.assertEqual('me', basic['realm'])
1416 self.assertEqual('MD5', basic['algorithm'])
1417
1418 def testParseWWWAuthenticateBasic2(self):
1419 res = httplib2._parse_www_authenticate({ 'www-authenticate': 'Basic realm="me",other="fred" '})
1420 basic = res['basic']
1421 self.assertEqual('me', basic['realm'])
1422 self.assertEqual('fred', basic['other'])
1423
1424 def testParseWWWAuthenticateBasic3(self):
1425 res = httplib2._parse_www_authenticate({ 'www-authenticate': 'Basic REAlm="me" '})
1426 basic = res['basic']
1427 self.assertEqual('me', basic['realm'])
1428
1429
1430 def testParseWWWAuthenticateDigest(self):
Joe Gregorioffc3d542013-02-19 15:57:37 -05001431 res = httplib2._parse_www_authenticate({ 'www-authenticate':
pilgrim00a352e2009-05-29 04:04:44 +00001432 'Digest realm="testrealm@host.com", qop="auth,auth-int", nonce="dcd98b7102dd2f0e8b11d0f600bfb0c093", opaque="5ccc069c403ebaf9f0171e9517f40e41"'})
1433 digest = res['digest']
1434 self.assertEqual('testrealm@host.com', digest['realm'])
1435 self.assertEqual('auth,auth-int', digest['qop'])
1436
1437
1438 def testParseWWWAuthenticateMultiple(self):
Joe Gregorioffc3d542013-02-19 15:57:37 -05001439 res = httplib2._parse_www_authenticate({ 'www-authenticate':
pilgrim00a352e2009-05-29 04:04:44 +00001440 'Digest realm="testrealm@host.com", qop="auth,auth-int", nonce="dcd98b7102dd2f0e8b11d0f600bfb0c093", opaque="5ccc069c403ebaf9f0171e9517f40e41" Basic REAlm="me" '})
1441 digest = res['digest']
1442 self.assertEqual('testrealm@host.com', digest['realm'])
1443 self.assertEqual('auth,auth-int', digest['qop'])
1444 self.assertEqual('dcd98b7102dd2f0e8b11d0f600bfb0c093', digest['nonce'])
1445 self.assertEqual('5ccc069c403ebaf9f0171e9517f40e41', digest['opaque'])
1446 basic = res['basic']
1447 self.assertEqual('me', basic['realm'])
1448
1449 def testParseWWWAuthenticateMultiple2(self):
1450 # Handle an added comma between challenges, which might get thrown in if the challenges were
1451 # originally sent in separate www-authenticate headers.
Joe Gregorioffc3d542013-02-19 15:57:37 -05001452 res = httplib2._parse_www_authenticate({ 'www-authenticate':
pilgrim00a352e2009-05-29 04:04:44 +00001453 'Digest realm="testrealm@host.com", qop="auth,auth-int", nonce="dcd98b7102dd2f0e8b11d0f600bfb0c093", opaque="5ccc069c403ebaf9f0171e9517f40e41", Basic REAlm="me" '})
1454 digest = res['digest']
1455 self.assertEqual('testrealm@host.com', digest['realm'])
1456 self.assertEqual('auth,auth-int', digest['qop'])
1457 self.assertEqual('dcd98b7102dd2f0e8b11d0f600bfb0c093', digest['nonce'])
1458 self.assertEqual('5ccc069c403ebaf9f0171e9517f40e41', digest['opaque'])
1459 basic = res['basic']
1460 self.assertEqual('me', basic['realm'])
1461
1462 def testParseWWWAuthenticateMultiple3(self):
1463 # Handle an added comma between challenges, which might get thrown in if the challenges were
1464 # originally sent in separate www-authenticate headers.
Joe Gregorioffc3d542013-02-19 15:57:37 -05001465 res = httplib2._parse_www_authenticate({ 'www-authenticate':
pilgrim00a352e2009-05-29 04:04:44 +00001466 'Digest realm="testrealm@host.com", qop="auth,auth-int", nonce="dcd98b7102dd2f0e8b11d0f600bfb0c093", opaque="5ccc069c403ebaf9f0171e9517f40e41", Basic REAlm="me", WSSE realm="foo", profile="UsernameToken"'})
1467 digest = res['digest']
1468 self.assertEqual('testrealm@host.com', digest['realm'])
1469 self.assertEqual('auth,auth-int', digest['qop'])
1470 self.assertEqual('dcd98b7102dd2f0e8b11d0f600bfb0c093', digest['nonce'])
1471 self.assertEqual('5ccc069c403ebaf9f0171e9517f40e41', digest['opaque'])
1472 basic = res['basic']
1473 self.assertEqual('me', basic['realm'])
1474 wsse = res['wsse']
1475 self.assertEqual('foo', wsse['realm'])
1476 self.assertEqual('UsernameToken', wsse['profile'])
1477
1478 def testParseWWWAuthenticateMultiple4(self):
Joe Gregorioffc3d542013-02-19 15:57:37 -05001479 res = httplib2._parse_www_authenticate({ 'www-authenticate':
1480 'Digest realm="test-real.m@host.com", qop \t=\t"\tauth,auth-int", nonce="(*)&^&$%#",opaque="5ccc069c403ebaf9f0171e9517f40e41", Basic REAlm="me", WSSE realm="foo", profile="UsernameToken"'})
pilgrim00a352e2009-05-29 04:04:44 +00001481 digest = res['digest']
1482 self.assertEqual('test-real.m@host.com', digest['realm'])
1483 self.assertEqual('\tauth,auth-int', digest['qop'])
1484 self.assertEqual('(*)&^&$%#', digest['nonce'])
1485
1486 def testParseWWWAuthenticateMoreQuoteCombos(self):
1487 res = httplib2._parse_www_authenticate({'www-authenticate':'Digest realm="myrealm", nonce="Ygk86AsKBAA=3516200d37f9a3230352fde99977bd6d472d4306", algorithm=MD5, qop="auth", stale=true'})
1488 digest = res['digest']
1489 self.assertEqual('myrealm', digest['realm'])
1490
Joe Gregorio6fa3cf22011-02-13 22:45:06 -05001491 def testParseWWWAuthenticateMalformed(self):
1492 with self.assertRaises(httplib2.MalformedHeader):
1493 httplib2._parse_www_authenticate({'www-authenticate':'OAuth "Facebook Platform" "invalid_token" "Invalid OAuth access token."'})
1497
pilgrim00a352e2009-05-29 04:04:44 +00001498 def testDigestObject(self):
1499 credentials = ('joe', 'password')
1500 host = None
Joe Gregorioffc3d542013-02-19 15:57:37 -05001501 request_uri = '/projects/httplib2/test/digest/'
pilgrim00a352e2009-05-29 04:04:44 +00001502 headers = {}
1503 response = {
1504 'www-authenticate': 'Digest realm="myrealm", nonce="Ygk86AsKBAA=3516200d37f9a3230352fde99977bd6d472d4306", algorithm=MD5, qop="auth"'
1505 }
1506 content = b""
Joe Gregorio875a8b52011-06-13 14:06:23 -04001507
pilgrim00a352e2009-05-29 04:04:44 +00001508 d = httplib2.DigestAuthentication(credentials, host, request_uri, headers, response, content, None)
Joe Gregorioffc3d542013-02-19 15:57:37 -05001509 d.request("GET", request_uri, headers, content, cnonce="33033375ec278a46")
Joe Gregorio875a8b52011-06-13 14:06:23 -04001510 our_request = "authorization: %s" % headers['authorization']
1511 working_request = 'authorization: Digest username="joe", realm="myrealm", nonce="Ygk86AsKBAA=3516200d37f9a3230352fde99977bd6d472d4306", uri="/projects/httplib2/test/digest/", algorithm=MD5, response="97ed129401f7cdc60e5db58a80f3ea8b", qop=auth, nc=00000001, cnonce="33033375ec278a46"'
pilgrim00a352e2009-05-29 04:04:44 +00001512 self.assertEqual(our_request, working_request)
1513
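    # Illustrative sketch added for reference (not part of the original
    # suite): the response="..." value asserted in testDigestObject follows
    # the RFC 2617 qop="auth" recipe and can be reproduced with hashlib
    # alone. The method name is ours; httplib2 performs the equivalent
    # computation inside DigestAuthentication.request().
    def testDigestResponseRecipeSketch(self):
        import hashlib
        def H(s):
            # MD5 hex digest of a UTF-8 string, per the "MD5" algorithm
            return hashlib.md5(s.encode('utf-8')).hexdigest()
        nonce = "Ygk86AsKBAA=3516200d37f9a3230352fde99977bd6d472d4306"
        ha1 = H("joe:myrealm:password")                  # H(A1) = H(user:realm:password)
        ha2 = H("GET:/projects/httplib2/test/digest/")   # H(A2) = H(method:request-uri)
        # response = H(H(A1):nonce:nc:cnonce:qop:H(A2))
        response = H(":".join([ha1, nonce, "00000001", "33033375ec278a46", "auth", ha2]))
        self.assertEqual("97ed129401f7cdc60e5db58a80f3ea8b", response)
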
Joe Gregorio03d99102011-06-22 16:55:52 -04001514 def testDigestObjectWithOpaque(self):
1515 credentials = ('joe', 'password')
1516 host = None
1517 request_uri = '/projects/httplib2/test/digest/'
1518 headers = {}
1519 response = {
1520 'www-authenticate': 'Digest realm="myrealm", nonce="Ygk86AsKBAA=3516200d37f9a3230352fde99977bd6d472d4306", algorithm=MD5, qop="auth", opaque="atestopaque"'
1521 }
1522 content = b""
1523
1524 d = httplib2.DigestAuthentication(credentials, host, request_uri, headers, response, content, None)
1525 d.request("GET", request_uri, headers, content, cnonce="33033375ec278a46")
1526 our_request = "authorization: %s" % headers['authorization']
1527 working_request = 'authorization: Digest username="joe", realm="myrealm", nonce="Ygk86AsKBAA=3516200d37f9a3230352fde99977bd6d472d4306", uri="/projects/httplib2/test/digest/", algorithm=MD5, response="97ed129401f7cdc60e5db58a80f3ea8b", qop=auth, nc=00000001, cnonce="33033375ec278a46", opaque="atestopaque"'
1528 self.assertEqual(our_request, working_request)
pilgrim00a352e2009-05-29 04:04:44 +00001529
1530 def testDigestObjectStale(self):
1531 credentials = ('joe', 'password')
1532 host = None
Joe Gregorioffc3d542013-02-19 15:57:37 -05001533 request_uri = '/projects/httplib2/test/digest/'
pilgrim00a352e2009-05-29 04:04:44 +00001534 headers = {}
1535 response = httplib2.Response({ })
1536 response['www-authenticate'] = 'Digest realm="myrealm", nonce="Ygk86AsKBAA=3516200d37f9a3230352fde99977bd6d472d4306", algorithm=MD5, qop="auth", stale=true'
1537 response.status = 401
1538 content = b""
1539 d = httplib2.DigestAuthentication(credentials, host, request_uri, headers, response, content, None)
1540 # Returns true to force a retry
1541 self.assertTrue( d.response(response, content) )
1542
1543 def testDigestObjectAuthInfo(self):
1544 credentials = ('joe', 'password')
1545 host = None
Joe Gregorioffc3d542013-02-19 15:57:37 -05001546 request_uri = '/projects/httplib2/test/digest/'
pilgrim00a352e2009-05-29 04:04:44 +00001547 headers = {}
1548 response = httplib2.Response({ })
1549 response['www-authenticate'] = 'Digest realm="myrealm", nonce="Ygk86AsKBAA=3516200d37f9a3230352fde99977bd6d472d4306", algorithm=MD5, qop="auth", stale=true'
1550 response['authentication-info'] = 'nextnonce="fred"'
1551 content = b""
1552 d = httplib2.DigestAuthentication(credentials, host, request_uri, headers, response, content, None)
1553 # Returns False: no retry is needed; the authentication-info nextnonce just updates the challenge
1554 self.assertFalse( d.response(response, content) )
1555 self.assertEqual('fred', d.challenge['nonce'])
1556 self.assertEqual(1, d.challenge['nc'])
1557
1558 def testWsseAlgorithm(self):
1559 digest = httplib2._wsse_username_token("d36e316282959a9ed4c89851497a717f", "2003-12-15T14:43:07Z", "taadtaadpstcsm")
1560 expected = b"quR/EWLAV4xLf9Zqyw4pDmfV9OY="
1561 self.assertEqual(expected, digest)
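        # For reference, _wsse_username_token computes
        # base64(sha1(nonce + created + password)), the WSSE UsernameToken
        # password-digest recipe.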
1562
1563 def testEnd2End(self):
1564 # one end to end header
1565 response = {'content-type': 'application/atom+xml', 'te': 'deflate'}
1566 end2end = httplib2._get_end2end_headers(response)
1567 self.assertTrue('content-type' in end2end)
1568 self.assertTrue('te' not in end2end)
1569 self.assertTrue('connection' not in end2end)
1570
1571 # one end to end header that gets eliminated
1572 response = {'connection': 'content-type', 'content-type': 'application/atom+xml', 'te': 'deflate'}
1573 end2end = httplib2._get_end2end_headers(response)
1574 self.assertTrue('content-type' not in end2end)
1575 self.assertTrue('te' not in end2end)
1576 self.assertTrue('connection' not in end2end)
1577
1578 # Degenerate case of no headers
1579 response = {}
1580 end2end = httplib2._get_end2end_headers(response)
Joe Gregoriob53de9b2011-06-07 15:44:51 -04001581 self.assertEqual(0, len(end2end))
pilgrim00a352e2009-05-29 04:04:44 +00001582
Joe Gregorioffc3d542013-02-19 15:57:37 -05001583 # Degenerate case of connection referring to a header not passed in
pilgrim00a352e2009-05-29 04:04:44 +00001584 response = {'connection': 'content-type'}
1585 end2end = httplib2._get_end2end_headers(response)
Joe Gregoriob53de9b2011-06-07 15:44:51 -04001586 self.assertEqual(0, len(end2end))
pilgrim00a352e2009-05-29 04:04:44 +00001587
Joe Gregorio74b1d4a2012-10-25 14:05:49 -04001588
1589class TestProxyInfo(unittest.TestCase):
1590 def setUp(self):
1591 self.orig_env = dict(os.environ)
1592
1593 def tearDown(self):
1594 os.environ.clear()
1595 os.environ.update(self.orig_env)
1596
1597 def test_from_url(self):
1598 pi = httplib2.proxy_info_from_url('http://myproxy.example.com')
1599 self.assertEqual(pi.proxy_host, 'myproxy.example.com')
1600 self.assertEqual(pi.proxy_port, 80)
1601 self.assertEqual(pi.proxy_user, None)
1602
1603 def test_from_url_ident(self):
1604 pi = httplib2.proxy_info_from_url('http://zoidberg:fish@someproxy:99')
1605 self.assertEqual(pi.proxy_host, 'someproxy')
1606 self.assertEqual(pi.proxy_port, 99)
1607 self.assertEqual(pi.proxy_user, 'zoidberg')
1608 self.assertEqual(pi.proxy_pass, 'fish')
1609
1610 def test_from_env(self):
1611 os.environ['http_proxy'] = 'http://myproxy.example.com:8080'
1612 pi = httplib2.proxy_info_from_environment()
1613 self.assertEqual(pi.proxy_host, 'myproxy.example.com')
1614 self.assertEqual(pi.proxy_port, 8080)
1615
1616 def test_from_env_no_proxy(self):
1617 os.environ['http_proxy'] = 'http://myproxy.example.com:80'
1618 os.environ['https_proxy'] = 'http://myproxy.example.com:81'
1619 pi = httplib2.proxy_info_from_environment('https')
1620 self.assertEqual(pi.proxy_host, 'myproxy.example.com')
1621 self.assertEqual(pi.proxy_port, 81)
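        # Despite the name, this exercises scheme selection: passing 'https'
        # to proxy_info_from_environment makes it read https_proxy rather
        # than http_proxy, hence port 81 above.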
1622
1623 def test_from_env_none(self):
1624 os.environ.clear()
1625 pi = httplib2.proxy_info_from_environment()
1626 self.assertEqual(pi, None)
1627
1628
1629if __name__ == '__main__':
1630 unittest.main()