| pilgrim | 00a352e | 2009-05-29 04:04:44 +0000 | [diff] [blame] | 1 | #!/usr/bin/env python3
 | 
 | 2 | """
 | 
 | 3 | httplib2test
 | 
 | 4 | 
 | 
 | 5 | A set of unit tests for httplib2.py.
 | 
 | 6 | 
 | 
 | 7 | Requires Python 3.0 or later
 | 
 | 8 | """
 | 
 | 9 | 
 | 
 | 10 | __author__ = "Joe Gregorio (joe@bitworking.org)"
 | 
 | 11 | __copyright__ = "Copyright 2006, Joe Gregorio"
 | 
 | 12 | __contributors__ = ["Mark Pilgrim"]
 | 
 | 13 | __license__ = "MIT"
 | 
 | 14 | __history__ = """ """
 | 
 | 15 | __version__ = "0.2 ($Rev: 118 $)"
 | 
 | 16 | 
 | 
| Joe Gregorio | b6c90c4 | 2011-02-11 01:03:22 -0500 | [diff] [blame] | 17 | import base64
 | 
| pilgrim | 00a352e | 2009-05-29 04:04:44 +0000 | [diff] [blame] | 18 | import http.client
 | 
 | 19 | import httplib2
 | 
| pilgrim | 00a352e | 2009-05-29 04:04:44 +0000 | [diff] [blame] | 20 | import io
 | 
| Joe Gregorio | b6c90c4 | 2011-02-11 01:03:22 -0500 | [diff] [blame] | 21 | import os
 | 
 | 22 | import socket
 | 
 | 23 | import sys
 | 
 | 24 | import time
 | 
 | 25 | import unittest
 | 
 | 26 | import urllib.parse
 | 
| pilgrim | 00a352e | 2009-05-29 04:04:44 +0000 | [diff] [blame] | 27 | 
 | 
 | 28 | # The test resources base uri
 | 
 | 29 | base = 'http://bitworking.org/projects/httplib2/test/'
 | 
 | 30 | #base = 'http://localhost/projects/httplib2/test/'
 | 
 | 31 | cacheDirName = ".cache"
 | 
 | 32 | 
 | 
 | 33 | 
 | 
 | 34 | class CredentialsTest(unittest.TestCase):
 | 
 | 35 |     def test(self):
 | 
 | 36 |         c = httplib2.Credentials()
 | 
 | 37 |         c.add("joe", "password")
 | 
 | 38 |         self.assertEqual(("joe", "password"), list(c.iter("bitworking.org"))[0])
 | 
 | 39 |         self.assertEqual(("joe", "password"), list(c.iter(""))[0])
 | 
 | 40 |         c.add("fred", "password2", "wellformedweb.org")
 | 
 | 41 |         self.assertEqual(("joe", "password"), list(c.iter("bitworking.org"))[0])
 | 
 | 42 |         self.assertEqual(1, len(list(c.iter("bitworking.org"))))
 | 
 | 43 |         self.assertEqual(2, len(list(c.iter("wellformedweb.org"))))
 | 
 | 44 |         self.assertTrue(("fred", "password2") in list(c.iter("wellformedweb.org")))
 | 
 | 45 |         c.clear()
 | 
 | 46 |         self.assertEqual(0, len(list(c.iter("bitworking.org"))))
 | 
 | 47 |         c.add("fred", "password2", "wellformedweb.org")
 | 
 | 48 |         self.assertTrue(("fred", "password2") in list(c.iter("wellformedweb.org")))
 | 
 | 49 |         self.assertEqual(0, len(list(c.iter("bitworking.org"))))
 | 
 | 50 |         self.assertEqual(0, len(list(c.iter(""))))
 | 
 | 51 | 
 | 
 | 52 | 
 | 
 | 53 | class ParserTest(unittest.TestCase):
 | 
 | 54 |     def testFromStd66(self):
 | 
 | 55 |         self.assertEqual( ('http', 'example.com', '', None, None ), httplib2.parse_uri("http://example.com"))
 | 
 | 56 |         self.assertEqual( ('https', 'example.com', '', None, None ), httplib2.parse_uri("https://example.com"))
 | 
 | 57 |         self.assertEqual( ('https', 'example.com:8080', '', None, None ), httplib2.parse_uri("https://example.com:8080"))
 | 
 | 58 |         self.assertEqual( ('http', 'example.com', '/', None, None ), httplib2.parse_uri("http://example.com/"))
 | 
 | 59 |         self.assertEqual( ('http', 'example.com', '/path', None, None ), httplib2.parse_uri("http://example.com/path"))
 | 
 | 60 |         self.assertEqual( ('http', 'example.com', '/path', 'a=1&b=2', None ), httplib2.parse_uri("http://example.com/path?a=1&b=2"))
 | 
 | 61 |         self.assertEqual( ('http', 'example.com', '/path', 'a=1&b=2', 'fred' ), httplib2.parse_uri("http://example.com/path?a=1&b=2#fred"))
 | 
 | 62 |         self.assertEqual( ('http', 'example.com', '/path', 'a=1&b=2', 'fred' ), httplib2.parse_uri("http://example.com/path?a=1&b=2#fred"))
 | 
 | 63 | 
 | 
 | 64 | 
 | 
 | 65 | class UrlNormTest(unittest.TestCase):
 | 
 | 66 |     def test(self):
 | 
 | 67 |         self.assertEqual( "http://example.org/", httplib2.urlnorm("http://example.org")[-1])
 | 
 | 68 |         self.assertEqual( "http://example.org/", httplib2.urlnorm("http://EXAMple.org")[-1])
 | 
 | 69 |         self.assertEqual( "http://example.org/?=b", httplib2.urlnorm("http://EXAMple.org?=b")[-1])
 | 
 | 70 |         self.assertEqual( "http://example.org/mypath?a=b", httplib2.urlnorm("http://EXAMple.org/mypath?a=b")[-1])
 | 
 | 71 |         self.assertEqual( "http://localhost:80/", httplib2.urlnorm("http://localhost:80")[-1])
 | 
 | 72 |         self.assertEqual( httplib2.urlnorm("http://localhost:80/"), httplib2.urlnorm("HTTP://LOCALHOST:80"))
 | 
 | 73 |         try:
 | 
 | 74 |             httplib2.urlnorm("/")
 | 
 | 75 |             self.fail("Non-absolute URIs should raise an exception")
 | 
 | 76 |         except httplib2.RelativeURIError:
 | 
 | 77 |             pass
 | 
 | 78 | 
 | 
 | 79 | class UrlSafenameTest(unittest.TestCase):
 | 
 | 80 |     def test(self):
 | 
 | 81 |         # Test that different URIs end up generating different safe names
 | 
 | 82 |         self.assertEqual( "example.org,fred,a=b,58489f63a7a83c3b7794a6a398ee8b1f", httplib2.safename("http://example.org/fred/?a=b"))
 | 
 | 83 |         self.assertEqual( "example.org,fred,a=b,8c5946d56fec453071f43329ff0be46b", httplib2.safename("http://example.org/fred?/a=b"))
 | 
 | 84 |         self.assertEqual( "www.example.org,fred,a=b,499c44b8d844a011b67ea2c015116968", httplib2.safename("http://www.example.org/fred?/a=b"))
 | 
 | 85 |         self.assertEqual( httplib2.safename(httplib2.urlnorm("http://www")[-1]), httplib2.safename(httplib2.urlnorm("http://WWW")[-1]))
 | 
 | 86 |         self.assertEqual( "www.example.org,fred,a=b,692e843a333484ce0095b070497ab45d", httplib2.safename("https://www.example.org/fred?/a=b"))
 | 
 | 87 |         self.assertNotEqual( httplib2.safename("http://www"), httplib2.safename("https://www"))
 | 
 | 88 |         # Test the max length limits
 | 
 | 89 |         uri = "http://" + ("w" * 200) + ".org"
 | 
 | 90 |         uri2 = "http://" + ("w" * 201) + ".org"
 | 
 | 91 |         self.assertNotEqual( httplib2.safename(uri2), httplib2.safename(uri))
 | 
 | 92 |         # Max length should be 200 + 1 (",") + 32
 | 
 | 93 |         self.assertEqual(233, len(httplib2.safename(uri2)))
 | 
 | 94 |         self.assertEqual(233, len(httplib2.safename(uri)))
 | 
 | 95 |         # Unicode
 | 
 | 96 |         if sys.version_info >= (2,3):
 | 
 | 97 |             self.assertEqual( "xn--http,-4y1d.org,fred,a=b,579924c35db315e5a32e3d9963388193", httplib2.safename("http://\u2304.org/fred/?a=b"))
 | 
 | 98 | 
 | 
 | 99 | class _MyResponse(io.BytesIO):
 | 
 | 100 |     def __init__(self, body, **kwargs):
 | 
 | 101 |         io.BytesIO.__init__(self, body)
 | 
 | 102 |         self.headers = kwargs
 | 
 | 103 | 
 | 
 | 104 |     def items(self):
 | 
 | 105 |         return self.headers.items()
 | 
 | 106 | 
 | 
 | 107 |     def iteritems(self):
 | 
 | 108 |         return iter(self.headers.items())
 | 
 | 109 | 
 | 
 | 110 | 
 | 
 | 111 | class _MyHTTPConnection(object):
 | 
 | 112 |     "This class is just a mock of httplib.HTTPConnection used for testing"
 | 
 | 113 | 
 | 
 | 114 |     def __init__(self, host, port=None, key_file=None, cert_file=None,
 | 
 | 115 |                  strict=None, timeout=None, proxy_info=None):
 | 
 | 116 |         self.host = host
 | 
 | 117 |         self.port = port
 | 
 | 118 |         self.timeout = timeout
 | 
 | 119 |         self.log = ""
 | 
 | 120 | 
 | 
 | 121 |     def set_debuglevel(self, level):
 | 
 | 122 |         pass
 | 
 | 123 | 
 | 
 | 124 |     def connect(self):
 | 
 | 125 |         "Connect to a host on a given port."
 | 
 | 126 |         pass
 | 
 | 127 | 
 | 
 | 128 |     def close(self):
 | 
 | 129 |         pass
 | 
 | 130 | 
 | 
 | 131 |     def request(self, method, request_uri, body, headers):
 | 
 | 132 |         pass
 | 
 | 133 | 
 | 
 | 134 |     def getresponse(self):
 | 
 | 135 |         return _MyResponse(b"the body", status="200")
 | 
 | 136 | 
 | 
 | 137 | 
 | 
 | 138 | class HttpTest(unittest.TestCase):
 | 
 | 139 |     def setUp(self):
 | 
 | 140 |         if os.path.exists(cacheDirName): 
 | 
 | 141 |             [os.remove(os.path.join(cacheDirName, file)) for file in os.listdir(cacheDirName)]
 | 
 | 142 |         self.http = httplib2.Http(cacheDirName)
 | 
 | 143 |         self.http.clear_credentials()
 | 
 | 144 | 
 | 
| Joe Gregorio | f3ee17b | 2011-02-13 11:59:51 -0500 | [diff] [blame] | 145 |     def testIPv6NoSSL(self):
 | 
 | 146 |         try:
 | 
 | 147 |           self.http.request("http://[::1]/")
 | 
 | 148 |         except socket.gaierror:
 | 
 | 149 |           self.fail("should get the address family right for IPv6")
 | 
 | 150 |         except socket.error:
 | 
 | 151 |           # Even if IPv6 isn't installed on a machine it should just raise socket.error
 | 
 | 152 |           pass
 | 
 | 153 | 
 | 
 | 154 |     def testIPv6SSL(self):
 | 
 | 155 |         try:
 | 
 | 156 |           self.http.request("https://[::1]/")
 | 
 | 157 |         except socket.gaierror:
 | 
 | 158 |           self.fail("should get the address family right for IPv6")
 | 
 | 159 |         except socket.error:
 | 
 | 160 |           # Even if IPv6 isn't installed on a machine it should just raise socket.error
 | 
 | 161 |           pass
 | 
 | 162 | 
 | 
| pilgrim | 00a352e | 2009-05-29 04:04:44 +0000 | [diff] [blame] | 163 |     def testConnectionType(self):
 | 
 | 164 |         self.http.force_exception_to_status_code = False 
 | 
 | 165 |         response, content = self.http.request("http://bitworking.org", connection_type=_MyHTTPConnection)
 | 
 | 166 |         self.assertEqual(response['content-location'], "http://bitworking.org")
 | 
 | 167 |         self.assertEqual(content, b"the body")
 | 
 | 168 | 
 | 
 | 169 |     def testGetUnknownServer(self):
 | 
 | 170 |         self.http.force_exception_to_status_code = False 
 | 
 | 171 |         try:
 | 
 | 172 |             self.http.request("http://fred.bitworking.org/")
 | 
 | 173 |             self.fail("An httplib2.ServerNotFoundError Exception must be thrown on an unresolvable server.")
 | 
 | 174 |         except httplib2.ServerNotFoundError:
 | 
 | 175 |             pass
 | 
 | 176 | 
 | 
 | 177 |         # Now test with exceptions turned off
 | 
 | 178 |         self.http.force_exception_to_status_code = True
 | 
 | 179 | 
 | 
 | 180 |         (response, content) = self.http.request("http://fred.bitworking.org/")
 | 
 | 181 |         self.assertEqual(response['content-type'], 'text/plain')
 | 
 | 182 |         self.assertTrue(content.startswith(b"Unable to find"))
 | 
 | 183 |         self.assertEqual(response.status, 400)
 | 
 | 184 | 
 | 
| Joe Gregorio | b6c90c4 | 2011-02-11 01:03:22 -0500 | [diff] [blame] | 185 |     def testGetConnectionRefused(self):
 | 
 | 186 |         self.http.force_exception_to_status_code = False
 | 
 | 187 |         try:
 | 
 | 188 |             self.http.request("http://localhost:7777/")
 | 
 | 189 |             self.fail("An socket.error exception must be thrown on Connection Refused.")
 | 
 | 190 |         except socket.error:
 | 
 | 191 |             pass
 | 
 | 192 | 
 | 
 | 193 |         # Now test with exceptions turned off
 | 
 | 194 |         self.http.force_exception_to_status_code = True
 | 
 | 195 | 
 | 
 | 196 |         (response, content) = self.http.request("http://localhost:7777/")
 | 
 | 197 |         self.assertEqual(response['content-type'], 'text/plain')
 | 
 | 198 |         self.assertTrue(b"Connection refused" in content)
 | 
 | 199 |         self.assertEqual(response.status, 400)
 | 
 | 200 | 
 | 
| pilgrim | 00a352e | 2009-05-29 04:04:44 +0000 | [diff] [blame] | 201 |     def testGetIRI(self):
 | 
 | 202 |         if sys.version_info >= (2,3):
 | 
 | 203 |             uri = urllib.parse.urljoin(base, "reflector/reflector.cgi?d=\N{CYRILLIC CAPITAL LETTER DJE}")
 | 
 | 204 |             (response, content) = self.http.request(uri, "GET")
 | 
 | 205 |             d = self.reflector(content)
 | 
 | 206 |             self.assertTrue('QUERY_STRING' in d) 
 | 
 | 207 |             self.assertTrue(d['QUERY_STRING'].find('%D0%82') > 0) 
 | 
 | 208 |     
 | 
 | 209 |     def testGetIsDefaultMethod(self):
 | 
 | 210 |         # Test that GET is the default method
 | 
 | 211 |         uri = urllib.parse.urljoin(base, "methods/method_reflector.cgi")
 | 
 | 212 |         (response, content) = self.http.request(uri)
 | 
 | 213 |         self.assertEqual(response['x-method'], "GET")
 | 
 | 214 | 
 | 
 | 215 |     def testDifferentMethods(self):
 | 
 | 216 |         # Test that all methods can be used
 | 
 | 217 |         uri = urllib.parse.urljoin(base, "methods/method_reflector.cgi")
 | 
 | 218 |         for method in ["GET", "PUT", "DELETE", "POST"]:
 | 
 | 219 |             (response, content) = self.http.request(uri, method, body=b" ")
 | 
 | 220 |             self.assertEqual(response['x-method'], method)
 | 
 | 221 | 
 | 
| Joe Gregorio | b628c0b | 2009-07-16 12:28:04 -0400 | [diff] [blame] | 222 |     def testHeadRead(self):
 | 
 | 223 |         # Test that we don't try to read the response of a HEAD request
 | 
 | 224 |         # since httplib blocks response.read() for HEAD requests.
 | 
 | 225 |         # Oddly enough this doesn't appear as a problem when doing HEAD requests
 | 
 | 226 |         # against Apache servers.
 | 
 | 227 |         uri = "http://www.google.com/"
 | 
 | 228 |         (response, content) = self.http.request(uri, "HEAD")
 | 
 | 229 |         self.assertEqual(response.status, 200)
 | 
 | 230 |         self.assertEqual(content, b"")
 | 
 | 231 | 
 | 
| pilgrim | 00a352e | 2009-05-29 04:04:44 +0000 | [diff] [blame] | 232 |     def testGetNoCache(self):
 | 
 | 233 |         # Test that can do a GET w/o the cache turned on.
 | 
 | 234 |         http = httplib2.Http()
 | 
 | 235 |         uri = urllib.parse.urljoin(base, "304/test_etag.txt")
 | 
 | 236 |         (response, content) = http.request(uri, "GET")
 | 
 | 237 |         self.assertEqual(response.status, 200)
 | 
 | 238 |         self.assertEqual(response.previous, None)
 | 
 | 239 | 
 | 
| Joe Gregorio | e202d21 | 2009-07-16 14:57:52 -0400 | [diff] [blame] | 240 |     def testGetOnlyIfCachedCacheHit(self):
 | 
 | 241 |         # Test that can do a GET with cache and 'only-if-cached'
 | 
 | 242 |         uri = urllib.parse.urljoin(base, "304/test_etag.txt")
 | 
 | 243 |         (response, content) = self.http.request(uri, "GET")
 | 
 | 244 |         (response, content) = self.http.request(uri, "GET", headers={'cache-control': 'only-if-cached'})
 | 
 | 245 |         self.assertEqual(response.fromcache, True)
 | 
 | 246 |         self.assertEqual(response.status, 200)
 | 
 | 247 | 
 | 
| pilgrim | 00a352e | 2009-05-29 04:04:44 +0000 | [diff] [blame] | 248 |     def testGetOnlyIfCachedCacheMiss(self):
 | 
 | 249 |         # Test that can do a GET with no cache with 'only-if-cached'
 | 
| pilgrim | 00a352e | 2009-05-29 04:04:44 +0000 | [diff] [blame] | 250 |         uri = urllib.parse.urljoin(base, "304/test_etag.txt")
 | 
| Joe Gregorio | e202d21 | 2009-07-16 14:57:52 -0400 | [diff] [blame] | 251 |         (response, content) = self.http.request(uri, "GET", headers={'cache-control': 'only-if-cached'})
 | 
| pilgrim | 00a352e | 2009-05-29 04:04:44 +0000 | [diff] [blame] | 252 |         self.assertEqual(response.fromcache, False)
 | 
| Joe Gregorio | e202d21 | 2009-07-16 14:57:52 -0400 | [diff] [blame] | 253 |         self.assertEqual(response.status, 504)
 | 
| pilgrim | 00a352e | 2009-05-29 04:04:44 +0000 | [diff] [blame] | 254 | 
 | 
 | 255 |     def testGetOnlyIfCachedNoCacheAtAll(self):
 | 
 | 256 |         # Test that can do a GET with no cache with 'only-if-cached'
 | 
 | 257 |         # Of course, there might be an intermediary beyond us
 | 
 | 258 |         # that responds to the 'only-if-cached', so this
 | 
 | 259 |         # test can't really be guaranteed to pass.
 | 
 | 260 |         http = httplib2.Http()
 | 
 | 261 |         uri = urllib.parse.urljoin(base, "304/test_etag.txt")
 | 
 | 262 |         (response, content) = http.request(uri, "GET", headers={'cache-control': 'only-if-cached'})
 | 
 | 263 |         self.assertEqual(response.fromcache, False)
 | 
| Joe Gregorio | e202d21 | 2009-07-16 14:57:52 -0400 | [diff] [blame] | 264 |         self.assertEqual(response.status, 504)
 | 
| pilgrim | 00a352e | 2009-05-29 04:04:44 +0000 | [diff] [blame] | 265 | 
 | 
 | 266 |     def testUserAgent(self):
 | 
 | 267 |         # Test that we provide a default user-agent
 | 
 | 268 |         uri = urllib.parse.urljoin(base, "user-agent/test.cgi")
 | 
 | 269 |         (response, content) = self.http.request(uri, "GET")
 | 
 | 270 |         self.assertEqual(response.status, 200)
 | 
 | 271 |         self.assertTrue(content.startswith(b"Python-httplib2/"))
 | 
 | 272 | 
 | 
 | 273 |     def testUserAgentNonDefault(self):
 | 
 | 274 |         # Test that the default user-agent can be over-ridden
 | 
 | 275 | 
 | 
 | 276 |         uri = urllib.parse.urljoin(base, "user-agent/test.cgi")
 | 
 | 277 |         (response, content) = self.http.request(uri, "GET", headers={'User-Agent': 'fred/1.0'})
 | 
 | 278 |         self.assertEqual(response.status, 200)
 | 
 | 279 |         self.assertTrue(content.startswith(b"fred/1.0"))
 | 
 | 280 | 
 | 
 | 281 |     def testGet300WithLocation(self):
 | 
 | 282 |         # Test the we automatically follow 300 redirects if a Location: header is provided
 | 
 | 283 |         uri = urllib.parse.urljoin(base, "300/with-location-header.asis")
 | 
 | 284 |         (response, content) = self.http.request(uri, "GET")
 | 
 | 285 |         self.assertEqual(response.status, 200)
 | 
 | 286 |         self.assertEqual(content, b"This is the final destination.\n")
 | 
 | 287 |         self.assertEqual(response.previous.status, 300)
 | 
 | 288 |         self.assertEqual(response.previous.fromcache, False)
 | 
 | 289 | 
 | 
 | 290 |         # Confirm that the intermediate 300 is not cached
 | 
 | 291 |         (response, content) = self.http.request(uri, "GET")
 | 
 | 292 |         self.assertEqual(response.status, 200)
 | 
 | 293 |         self.assertEqual(content, b"This is the final destination.\n")
 | 
 | 294 |         self.assertEqual(response.previous.status, 300)
 | 
 | 295 |         self.assertEqual(response.previous.fromcache, False)
 | 
 | 296 | 
 | 
 | 297 |     def testGet300WithLocationNoRedirect(self):
 | 
 | 298 |         # Test the we automatically follow 300 redirects if a Location: header is provided
 | 
 | 299 |         self.http.follow_redirects = False
 | 
 | 300 |         uri = urllib.parse.urljoin(base, "300/with-location-header.asis")
 | 
 | 301 |         (response, content) = self.http.request(uri, "GET")
 | 
 | 302 |         self.assertEqual(response.status, 300)
 | 
 | 303 | 
 | 
 | 304 |     def testGet300WithoutLocation(self):
 | 
 | 305 |         # Not giving a Location: header in a 300 response is acceptable
 | 
 | 306 |         # In which case we just return the 300 response
 | 
 | 307 |         uri = urllib.parse.urljoin(base, "300/without-location-header.asis")
 | 
 | 308 |         (response, content) = self.http.request(uri, "GET")
 | 
 | 309 |         self.assertEqual(response.status, 300)
 | 
 | 310 |         self.assertTrue(response['content-type'].startswith("text/html"))
 | 
 | 311 |         self.assertEqual(response.previous, None)
 | 
 | 312 | 
 | 
 | 313 |     def testGet301(self):
 | 
 | 314 |         # Test that we automatically follow 301 redirects
 | 
 | 315 |         # and that we cache the 301 response
 | 
 | 316 |         uri = urllib.parse.urljoin(base, "301/onestep.asis")
 | 
 | 317 |         destination = urllib.parse.urljoin(base, "302/final-destination.txt")
 | 
 | 318 |         (response, content) = self.http.request(uri, "GET")
 | 
 | 319 |         self.assertEqual(response.status, 200)
 | 
 | 320 |         self.assertTrue('content-location' in response)
 | 
 | 321 |         self.assertEqual(response['content-location'], destination)
 | 
 | 322 |         self.assertEqual(content, b"This is the final destination.\n")
 | 
 | 323 |         self.assertEqual(response.previous.status, 301)
 | 
 | 324 |         self.assertEqual(response.previous.fromcache, False)
 | 
 | 325 | 
 | 
 | 326 |         (response, content) = self.http.request(uri, "GET")
 | 
 | 327 |         self.assertEqual(response.status, 200)
 | 
 | 328 |         self.assertEqual(response['content-location'], destination)
 | 
 | 329 |         self.assertEqual(content, b"This is the final destination.\n")
 | 
 | 330 |         self.assertEqual(response.previous.status, 301)
 | 
 | 331 |         self.assertEqual(response.previous.fromcache, True)
 | 
 | 332 | 
 | 
 | 333 | 
 | 
 | 334 |     def testGet301NoRedirect(self):
 | 
 | 335 |         # Test that we automatically follow 301 redirects
 | 
 | 336 |         # and that we cache the 301 response
 | 
 | 337 |         self.http.follow_redirects = False
 | 
 | 338 |         uri = urllib.parse.urljoin(base, "301/onestep.asis")
 | 
 | 339 |         destination = urllib.parse.urljoin(base, "302/final-destination.txt")
 | 
 | 340 |         (response, content) = self.http.request(uri, "GET")
 | 
 | 341 |         self.assertEqual(response.status, 301)
 | 
 | 342 | 
 | 
 | 343 | 
 | 
 | 344 |     def testGet302(self):
 | 
 | 345 |         # Test that we automatically follow 302 redirects
 | 
 | 346 |         # and that we DO NOT cache the 302 response
 | 
 | 347 |         uri = urllib.parse.urljoin(base, "302/onestep.asis")
 | 
 | 348 |         destination = urllib.parse.urljoin(base, "302/final-destination.txt")
 | 
 | 349 |         (response, content) = self.http.request(uri, "GET")
 | 
 | 350 |         self.assertEqual(response.status, 200)
 | 
 | 351 |         self.assertEqual(response['content-location'], destination)
 | 
 | 352 |         self.assertEqual(content, b"This is the final destination.\n")
 | 
 | 353 |         self.assertEqual(response.previous.status, 302)
 | 
 | 354 |         self.assertEqual(response.previous.fromcache, False)
 | 
 | 355 | 
 | 
 | 356 |         uri = urllib.parse.urljoin(base, "302/onestep.asis")
 | 
 | 357 |         (response, content) = self.http.request(uri, "GET")
 | 
 | 358 |         self.assertEqual(response.status, 200)
 | 
 | 359 |         self.assertEqual(response.fromcache, True)
 | 
 | 360 |         self.assertEqual(response['content-location'], destination)
 | 
 | 361 |         self.assertEqual(content, b"This is the final destination.\n")
 | 
 | 362 |         self.assertEqual(response.previous.status, 302)
 | 
 | 363 |         self.assertEqual(response.previous.fromcache, False)
 | 
 | 364 |         self.assertEqual(response.previous['content-location'], uri)
 | 
 | 365 | 
 | 
 | 366 |         uri = urllib.parse.urljoin(base, "302/twostep.asis")
 | 
 | 367 | 
 | 
 | 368 |         (response, content) = self.http.request(uri, "GET")
 | 
 | 369 |         self.assertEqual(response.status, 200)
 | 
 | 370 |         self.assertEqual(response.fromcache, True)
 | 
 | 371 |         self.assertEqual(content, b"This is the final destination.\n")
 | 
 | 372 |         self.assertEqual(response.previous.status, 302)
 | 
 | 373 |         self.assertEqual(response.previous.fromcache, False)
 | 
 | 374 | 
 | 
 | 375 |     def testGet302RedirectionLimit(self):
 | 
 | 376 |         # Test that we can set a lower redirection limit
 | 
 | 377 |         # and that we raise an exception when we exceed
 | 
 | 378 |         # that limit.
 | 
 | 379 |         self.http.force_exception_to_status_code = False 
 | 
 | 380 | 
 | 
 | 381 |         uri = urllib.parse.urljoin(base, "302/twostep.asis")
 | 
 | 382 |         try:
 | 
 | 383 |             (response, content) = self.http.request(uri, "GET", redirections = 1)
 | 
 | 384 |             self.fail("This should not happen")
 | 
 | 385 |         except httplib2.RedirectLimit:
 | 
 | 386 |             pass
 | 
 | 387 |         except Exception as e:
 | 
 | 388 |             self.fail("Threw wrong kind of exception ")
 | 
 | 389 | 
 | 
 | 390 |         # Re-run the test with out the exceptions
 | 
 | 391 |         self.http.force_exception_to_status_code = True 
 | 
 | 392 | 
 | 
 | 393 |         (response, content) = self.http.request(uri, "GET", redirections = 1)
 | 
 | 394 |         self.assertEqual(response.status, 500)
 | 
 | 395 |         self.assertTrue(response.reason.startswith("Redirected more"))
 | 
 | 396 |         self.assertEqual("302", response['status'])
 | 
 | 397 |         self.assertTrue(content.startswith(b"<html>"))
 | 
 | 398 |         self.assertTrue(response.previous != None)
 | 
 | 399 | 
 | 
 | 400 |     def testGet302NoLocation(self):
 | 
 | 401 |         # Test that we throw an exception when we get
 | 
 | 402 |         # a 302 with no Location: header.
 | 
 | 403 |         self.http.force_exception_to_status_code = False 
 | 
 | 404 |         uri = urllib.parse.urljoin(base, "302/no-location.asis")
 | 
 | 405 |         try:
 | 
 | 406 |             (response, content) = self.http.request(uri, "GET")
 | 
 | 407 |             self.fail("Should never reach here")
 | 
 | 408 |         except httplib2.RedirectMissingLocation:
 | 
 | 409 |             pass
 | 
 | 410 |         except Exception as e:
 | 
 | 411 |             self.fail("Threw wrong kind of exception ")
 | 
 | 412 | 
 | 
 | 413 |         # Re-run the test with out the exceptions
 | 
 | 414 |         self.http.force_exception_to_status_code = True 
 | 
 | 415 | 
 | 
 | 416 |         (response, content) = self.http.request(uri, "GET")
 | 
 | 417 |         self.assertEqual(response.status, 500)
 | 
 | 418 |         self.assertTrue(response.reason.startswith("Redirected but"))
 | 
 | 419 |         self.assertEqual("302", response['status'])
 | 
 | 420 |         self.assertTrue(content.startswith(b"This is content"))
 | 
 | 421 |  
 | 
 | 422 |     def testGet302ViaHttps(self):
 | 
 | 423 |         # Google always redirects to http://google.com
 | 
 | 424 |         (response, content) = self.http.request("https://google.com", "GET")
 | 
 | 425 |         self.assertEqual(200, response.status)
 | 
 | 426 |         self.assertEqual(302, response.previous.status)
 | 
 | 427 | 
 | 
 | 428 |     def testGetViaHttps(self):
 | 
 | 429 |         # Test that we can handle HTTPS
 | 
 | 430 |         (response, content) = self.http.request("https://google.com/adsense/", "GET")
 | 
 | 431 |         self.assertEqual(200, response.status)
 | 
 | 432 | 
 | 
 | 433 |     def testGetViaHttpsSpecViolationOnLocation(self):
 | 
 | 434 |         # Test that we follow redirects through HTTPS
 | 
 | 435 |         # even if they violate the spec by including
 | 
 | 436 |         # a relative Location: header instead of an 
 | 
 | 437 |         # absolute one.
 | 
 | 438 |         (response, content) = self.http.request("https://google.com/adsense", "GET")
 | 
 | 439 |         self.assertEqual(200, response.status)
 | 
 | 440 |         self.assertNotEqual(None, response.previous)
 | 
 | 441 | 
 | 
 | 442 | 
 | 
 | 443 |     def testGetViaHttpsKeyCert(self):
 | 
 | 444 |         #  At this point I can only test
 | 
 | 445 |         #  that the key and cert files are passed in 
 | 
 | 446 |         #  correctly to httplib. It would be nice to have 
 | 
 | 447 |         #  a real https endpoint to test against.
 | 
 | 448 |         http = httplib2.Http(timeout=2)
 | 
 | 449 | 
 | 
 | 450 |         http.add_certificate("akeyfile", "acertfile", "bitworking.org")
 | 
 | 451 |         try:
 | 
 | 452 |             (response, content) = http.request("https://bitworking.org", "GET")
 | 
 | 453 |         except:
 | 
 | 454 |             pass
 | 
 | 455 |         self.assertEqual(http.connections["https:bitworking.org"].key_file, "akeyfile")
 | 
 | 456 |         self.assertEqual(http.connections["https:bitworking.org"].cert_file, "acertfile")
 | 
 | 457 | 
 | 
 | 458 |         try:
 | 
 | 459 |             (response, content) = http.request("https://notthere.bitworking.org", "GET")
 | 
 | 460 |         except:
 | 
 | 461 |             pass
 | 
 | 462 |         self.assertEqual(http.connections["https:notthere.bitworking.org"].key_file, None)
 | 
 | 463 |         self.assertEqual(http.connections["https:notthere.bitworking.org"].cert_file, None)
 | 
 | 464 | 
 | 
 | 465 | 
 | 
 | 466 | 
 | 
 | 467 | 
 | 
 | 468 |     def testGet303(self):
 | 
 | 469 |         # Do a follow-up GET on a Location: header
 | 
 | 470 |         # returned from a POST that gave a 303.
 | 
 | 471 |         uri = urllib.parse.urljoin(base, "303/303.cgi")
 | 
 | 472 |         (response, content) = self.http.request(uri, "POST", " ")
 | 
 | 473 |         self.assertEqual(response.status, 200)
 | 
 | 474 |         self.assertEqual(content, b"This is the final destination.\n")
 | 
 | 475 |         self.assertEqual(response.previous.status, 303)
 | 
 | 476 | 
 | 
 | 477 |     def testGet303NoRedirect(self):
 | 
 | 478 |         # Do a follow-up GET on a Location: header
 | 
 | 479 |         # returned from a POST that gave a 303.
 | 
 | 480 |         self.http.follow_redirects = False
 | 
 | 481 |         uri = urllib.parse.urljoin(base, "303/303.cgi")
 | 
 | 482 |         (response, content) = self.http.request(uri, "POST", " ")
 | 
 | 483 |         self.assertEqual(response.status, 303)
 | 
 | 484 | 
 | 
 | 485 |     def test303ForDifferentMethods(self):
 | 
 | 486 |         # Test that all methods can be used
 | 
 | 487 |         uri = urllib.parse.urljoin(base, "303/redirect-to-reflector.cgi")
 | 
 | 488 |         for (method, method_on_303) in [("PUT", "GET"), ("DELETE", "GET"), ("POST", "GET"), ("GET", "GET"), ("HEAD", "GET")]: 
 | 
 | 489 |             (response, content) = self.http.request(uri, method, body=b" ")
 | 
 | 490 |             self.assertEqual(response['x-method'], method_on_303)
 | 
 | 491 | 
 | 
 | 492 |     def testGet304(self):
 | 
 | 493 |         # Test that we use ETags properly to validate our cache
 | 
 | 494 |         uri = urllib.parse.urljoin(base, "304/test_etag.txt")
 | 
 | 495 |         (response, content) = self.http.request(uri, "GET")
 | 
 | 496 |         self.assertNotEqual(response['etag'], "")
 | 
 | 497 | 
 | 
 | 498 |         (response, content) = self.http.request(uri, "GET")
 | 
 | 499 |         (response, content) = self.http.request(uri, "GET", headers = {'cache-control': 'must-revalidate'})
 | 
 | 500 |         self.assertEqual(response.status, 200)
 | 
 | 501 |         self.assertEqual(response.fromcache, True)
 | 
 | 502 | 
 | 
 | 503 |         cache_file_name = os.path.join(cacheDirName, httplib2.safename(httplib2.urlnorm(uri)[-1]))
 | 
 | 504 |         f = open(cache_file_name, "r")
 | 
 | 505 |         status_line = f.readline()
 | 
 | 506 |         f.close()
 | 
 | 507 | 
 | 
 | 508 |         self.assertTrue(status_line.startswith("status:"))
 | 
 | 509 | 
 | 
 | 510 |         (response, content) = self.http.request(uri, "HEAD")
 | 
 | 511 |         self.assertEqual(response.status, 200)
 | 
 | 512 |         self.assertEqual(response.fromcache, True)
 | 
 | 513 | 
 | 
 | 514 |         (response, content) = self.http.request(uri, "GET", headers = {'range': 'bytes=0-0'})
 | 
 | 515 |         self.assertEqual(response.status, 206)
 | 
 | 516 |         self.assertEqual(response.fromcache, False)
 | 
 | 517 | 
 | 
 | 518 |     def testGetIgnoreEtag(self):
 | 
 | 519 |         # Test that we can forcibly ignore ETags 
 | 
 | 520 |         uri = urllib.parse.urljoin(base, "reflector/reflector.cgi")
 | 
 | 521 |         (response, content) = self.http.request(uri, "GET")
 | 
 | 522 |         self.assertNotEqual(response['etag'], "")
 | 
 | 523 | 
 | 
 | 524 |         (response, content) = self.http.request(uri, "GET", headers = {'cache-control': 'max-age=0'})
 | 
 | 525 |         d = self.reflector(content)
 | 
 | 526 |         self.assertTrue('HTTP_IF_NONE_MATCH' in d) 
 | 
 | 527 | 
 | 
 | 528 |         self.http.ignore_etag = True
 | 
 | 529 |         (response, content) = self.http.request(uri, "GET", headers = {'cache-control': 'max-age=0'})
 | 
 | 530 |         d = self.reflector(content)
 | 
 | 531 |         self.assertEqual(response.fromcache, False)
 | 
 | 532 |         self.assertFalse('HTTP_IF_NONE_MATCH' in d) 
 | 
 | 533 | 
 | 
 | 534 |     def testOverrideEtag(self):
 | 
 | 535 |         # Test that we can forcibly ignore ETags 
 | 
 | 536 |         uri = urllib.parse.urljoin(base, "reflector/reflector.cgi")
 | 
 | 537 |         (response, content) = self.http.request(uri, "GET")
 | 
 | 538 |         self.assertNotEqual(response['etag'], "")
 | 
 | 539 | 
 | 
 | 540 |         (response, content) = self.http.request(uri, "GET", headers = {'cache-control': 'max-age=0'})
 | 
 | 541 |         d = self.reflector(content)
 | 
 | 542 |         self.assertTrue('HTTP_IF_NONE_MATCH' in d) 
 | 
 | 543 |         self.assertNotEqual(d['HTTP_IF_NONE_MATCH'], "fred") 
 | 
 | 544 | 
 | 
 | 545 |         (response, content) = self.http.request(uri, "GET", headers = {'cache-control': 'max-age=0', 'if-none-match': 'fred'})
 | 
 | 546 |         d = self.reflector(content)
 | 
 | 547 |         self.assertTrue('HTTP_IF_NONE_MATCH' in d) 
 | 
 | 548 |         self.assertEqual(d['HTTP_IF_NONE_MATCH'], "fred") 
 | 
 | 549 | 
 | 
 | 550 | #MAP-commented this out because it consistently fails
 | 
 | 551 | #    def testGet304EndToEnd(self):
 | 
 | 552 | #       # Test that end to end headers get overwritten in the cache
 | 
 | 553 | #        uri = urllib.parse.urljoin(base, "304/end2end.cgi")
 | 
 | 554 | #        (response, content) = self.http.request(uri, "GET")
 | 
 | 555 | #        self.assertNotEqual(response['etag'], "")
 | 
 | 556 | #        old_date = response['date']
 | 
 | 557 | #        time.sleep(2)
 | 
 | 558 | #
 | 
 | 559 | #        (response, content) = self.http.request(uri, "GET", headers = {'Cache-Control': 'max-age=0'})
 | 
 | 560 | #        # The response should be from the cache, but the Date: header should be updated.
 | 
 | 561 | #        new_date = response['date']
 | 
 | 562 | #        self.assertNotEqual(new_date, old_date)
 | 
 | 563 | #        self.assertEqual(response.status, 200)
 | 
 | 564 | #        self.assertEqual(response.fromcache, True)
 | 
 | 565 | 
 | 
 | 566 |     def testGet304LastModified(self):
 | 
 | 567 |         # Test that we can still handle a 304 
 | 
 | 568 |         # by only using the last-modified cache validator.
 | 
 | 569 |         uri = urllib.parse.urljoin(base, "304/last-modified-only/last-modified-only.txt")
 | 
 | 570 |         (response, content) = self.http.request(uri, "GET")
 | 
 | 571 | 
 | 
 | 572 |         self.assertNotEqual(response['last-modified'], "")
 | 
 | 573 |         (response, content) = self.http.request(uri, "GET")
 | 
 | 574 |         (response, content) = self.http.request(uri, "GET")
 | 
 | 575 |         self.assertEqual(response.status, 200)
 | 
 | 576 |         self.assertEqual(response.fromcache, True)
 | 
 | 577 | 
 | 
 | 578 |     def testGet307(self):
 | 
 | 579 |         # Test that we do follow 307 redirects but
 | 
 | 580 |         # do not cache the 307
 | 
 | 581 |         uri = urllib.parse.urljoin(base, "307/onestep.asis")
 | 
 | 582 |         (response, content) = self.http.request(uri, "GET")
 | 
 | 583 |         self.assertEqual(response.status, 200)
 | 
 | 584 |         self.assertEqual(content, b"This is the final destination.\n")
 | 
 | 585 |         self.assertEqual(response.previous.status, 307)
 | 
 | 586 |         self.assertEqual(response.previous.fromcache, False)
 | 
 | 587 | 
 | 
 | 588 |         (response, content) = self.http.request(uri, "GET")
 | 
 | 589 |         self.assertEqual(response.status, 200)
 | 
 | 590 |         self.assertEqual(response.fromcache, True)
 | 
 | 591 |         self.assertEqual(content, b"This is the final destination.\n")
 | 
 | 592 |         self.assertEqual(response.previous.status, 307)
 | 
 | 593 |         self.assertEqual(response.previous.fromcache, False)
 | 
 | 594 | 
 | 
 | 595 |     def testGet410(self):
 | 
 | 596 |         # Test that we pass 410's through
 | 
 | 597 |         uri = urllib.parse.urljoin(base, "410/410.asis")
 | 
 | 598 |         (response, content) = self.http.request(uri, "GET")
 | 
 | 599 |         self.assertEqual(response.status, 410)
 | 
 | 600 | 
 | 
| chris dent | 89f1514 | 2009-12-24 14:02:57 -0600 | [diff] [blame] | 601 |     def testVaryHeaderSimple(self):
 | 
 | 602 |         """
 | 
 | 603 |         RFC 2616 13.6
 | 
 | 604 |         When the cache receives a subsequent request whose Request-URI
 | 
 | 605 |         specifies one or more cache entries including a Vary header field,
 | 
 | 606 |         the cache MUST NOT use such a cache entry to construct a response
 | 
 | 607 |         to the new request unless all of the selecting request-headers
 | 
 | 608 |         present in the new request match the corresponding stored
 | 
 | 609 |         request-headers in the original request.
 | 
 | 610 |         """
 | 
 | 611 |         # test that the vary header is sent
 | 
 | 612 |         uri = urllib.parse.urljoin(base, "vary/accept.asis")
 | 
 | 613 |         (response, content) = self.http.request(uri, "GET", headers={'Accept': 'text/plain'})
 | 
 | 614 |         self.assertEqual(response.status, 200)
 | 
 | 615 |         self.assertTrue('vary' in response)
 | 
 | 616 | 
 | 
 | 617 |         # get the resource again, from the cache since accept header in this
 | 
 | 618 |         # request is the same as the request
 | 
 | 619 |         (response, content) = self.http.request(uri, "GET", headers={'Accept': 'text/plain'})
 | 
 | 620 |         self.assertEqual(response.status, 200)
 | 
 | 621 |         self.assertEqual(response.fromcache, True, msg="Should be from cache")
 | 
 | 622 | 
 | 
 | 623 |         # get the resource again, not from cache since Accept headers does not match
 | 
 | 624 |         (response, content) = self.http.request(uri, "GET", headers={'Accept': 'text/html'})
 | 
 | 625 |         self.assertEqual(response.status, 200)
 | 
 | 626 |         self.assertEqual(response.fromcache, False, msg="Should not be from cache")
 | 
 | 627 | 
 | 
 | 628 |         # get the resource again, without any Accept header, so again no match
 | 
 | 629 |         (response, content) = self.http.request(uri, "GET")
 | 
 | 630 |         self.assertEqual(response.status, 200)
 | 
 | 631 |         self.assertEqual(response.fromcache, False, msg="Should not be from cache")
 | 
 | 632 | 
 | 
 | 633 |     def testNoVary(self):
 | 
 | 634 |         # when there is no vary, a different Accept header (e.g.) should not
 | 
 | 635 |         # impact if the cache is used
 | 
 | 636 |         # test that the vary header is not sent
 | 
 | 637 |         uri = urllib.parse.urljoin(base, "vary/no-vary.asis")
 | 
 | 638 |         (response, content) = self.http.request(uri, "GET", headers={'Accept': 'text/plain'})
 | 
 | 639 |         self.assertEqual(response.status, 200)
 | 
 | 640 |         self.assertFalse('vary' in response)
 | 
 | 641 | 
 | 
 | 642 |         (response, content) = self.http.request(uri, "GET", headers={'Accept': 'text/plain'})
 | 
 | 643 |         self.assertEqual(response.status, 200)
 | 
 | 644 |         self.assertEqual(response.fromcache, True, msg="Should be from cache")
 | 
 | 645 |         
 | 
 | 646 |         (response, content) = self.http.request(uri, "GET", headers={'Accept': 'text/html'})
 | 
 | 647 |         self.assertEqual(response.status, 200)
 | 
 | 648 |         self.assertEqual(response.fromcache, True, msg="Should be from cache")
 | 
 | 649 | 
 | 
 | 650 |     def testVaryHeaderDouble(self):
 | 
 | 651 |         uri = urllib.parse.urljoin(base, "vary/accept-double.asis")
 | 
 | 652 |         (response, content) = self.http.request(uri, "GET", headers={
 | 
 | 653 |             'Accept': 'text/plain', 'Accept-Language': 'da, en-gb;q=0.8, en;q=0.7'})
 | 
 | 654 |         self.assertEqual(response.status, 200)
 | 
 | 655 |         self.assertTrue('vary' in response)
 | 
 | 656 | 
 | 
 | 657 |         # we are from cache
 | 
 | 658 |         (response, content) = self.http.request(uri, "GET", headers={
 | 
 | 659 |             'Accept': 'text/plain', 'Accept-Language': 'da, en-gb;q=0.8, en;q=0.7'})
 | 
 | 660 |         self.assertEqual(response.fromcache, True, msg="Should be from cache")
 | 
 | 661 | 
 | 
 | 662 |         (response, content) = self.http.request(uri, "GET", headers={'Accept': 'text/plain'})
 | 
 | 663 |         self.assertEqual(response.status, 200)
 | 
 | 664 |         self.assertEqual(response.fromcache, False)
 | 
 | 665 | 
 | 
 | 666 |         # get the resource again, not from cache, varied headers don't match exact
 | 
 | 667 |         (response, content) = self.http.request(uri, "GET", headers={'Accept-Language': 'da'})
 | 
 | 668 |         self.assertEqual(response.status, 200)
 | 
 | 669 |         self.assertEqual(response.fromcache, False, msg="Should not be from cache")
 | 
 | 670 | 
 | 
| jcgregorio | 88ef89b | 2010-05-13 23:42:11 -0400 | [diff] [blame] | 671 |     def testVaryUnusedHeader(self):
 | 
 | 672 |         # A header's value is not considered to vary if it's not used at all.
 | 
 | 673 |         uri = urllib.parse.urljoin(base, "vary/unused-header.asis")
 | 
 | 674 |         (response, content) = self.http.request(uri, "GET", headers={
 | 
 | 675 |             'Accept': 'text/plain'})
 | 
 | 676 |         self.assertEqual(response.status, 200)
 | 
 | 677 |         self.assertTrue('vary' in response)
 | 
 | 678 | 
 | 
 | 679 |         # we are from cache
 | 
 | 680 |         (response, content) = self.http.request(uri, "GET", headers={
 | 
 | 681 |             'Accept': 'text/plain',})
 | 
 | 682 |         self.assertEqual(response.fromcache, True, msg="Should be from cache")
 | 
 | 683 | 
 | 
| pilgrim | 00a352e | 2009-05-29 04:04:44 +0000 | [diff] [blame] | 684 |     def testHeadGZip(self):
 | 
 | 685 |         # Test that we don't try to decompress a HEAD response 
 | 
 | 686 |         uri = urllib.parse.urljoin(base, "gzip/final-destination.txt")
 | 
 | 687 |         (response, content) = self.http.request(uri, "HEAD")
 | 
 | 688 |         self.assertEqual(response.status, 200)
 | 
 | 689 |         self.assertNotEqual(int(response['content-length']), 0)
 | 
 | 690 |         self.assertEqual(content, b"")
 | 
 | 691 | 
 | 
 | 692 |     def testGetGZip(self):
 | 
 | 693 |         # Test that we support gzip compression
 | 
 | 694 |         uri = urllib.parse.urljoin(base, "gzip/final-destination.txt")
 | 
 | 695 |         (response, content) = self.http.request(uri, "GET")
 | 
 | 696 |         self.assertEqual(response.status, 200)
 | 
 | 697 |         self.assertFalse('content-encoding' in response)
 | 
 | 698 |         self.assertTrue('-content-encoding' in response)
 | 
 | 699 |         self.assertEqual(int(response['content-length']), len(b"This is the final destination.\n"))
 | 
 | 700 |         self.assertEqual(content, b"This is the final destination.\n")
 | 
 | 701 | 
 | 
| Joe Gregorio | d1137c5 | 2011-02-13 19:27:35 -0500 | [diff] [blame^] | 702 |     def testPostAndGZipResponse(self):
 | 
 | 703 |         uri = urllib.parse.urljoin(base, "gzip/post.cgi")
 | 
 | 704 |         (response, content) = self.http.request(uri, "POST", body=" ")
 | 
 | 705 |         self.assertEqual(response.status, 200)
 | 
 | 706 |         self.assertFalse('content-encoding' in response)
 | 
 | 707 |         self.assertTrue('-content-encoding' in response)
 | 
 | 708 | 
 | 
| pilgrim | 00a352e | 2009-05-29 04:04:44 +0000 | [diff] [blame] | 709 |     def testGetGZipFailure(self):
 | 
 | 710 |         # Test that we raise a good exception when the gzip fails
 | 
 | 711 |         self.http.force_exception_to_status_code = False 
 | 
 | 712 |         uri = urllib.parse.urljoin(base, "gzip/failed-compression.asis")
 | 
 | 713 |         try:
 | 
 | 714 |             (response, content) = self.http.request(uri, "GET")
 | 
 | 715 |             self.fail("Should never reach here")
 | 
 | 716 |         except httplib2.FailedToDecompressContent:
 | 
 | 717 |             pass
 | 
 | 718 |         except Exception:
 | 
 | 719 |             self.fail("Threw wrong kind of exception")
 | 
 | 720 | 
 | 
 | 721 |         # Re-run the test with out the exceptions
 | 
 | 722 |         self.http.force_exception_to_status_code = True 
 | 
 | 723 | 
 | 
 | 724 |         (response, content) = self.http.request(uri, "GET")
 | 
 | 725 |         self.assertEqual(response.status, 500)
 | 
 | 726 |         self.assertTrue(response.reason.startswith("Content purported"))
 | 
 | 727 | 
 | 
 | 728 |     def testTimeout(self):
 | 
 | 729 |         self.http.force_exception_to_status_code = True 
 | 
 | 730 |         uri = urllib.parse.urljoin(base, "timeout/timeout.cgi")
 | 
 | 731 |         try:
 | 
 | 732 |             import socket
 | 
 | 733 |             socket.setdefaulttimeout(1) 
 | 
 | 734 |         except:
 | 
 | 735 |             # Don't run the test if we can't set the timeout
 | 
 | 736 |             return 
 | 
 | 737 |         (response, content) = self.http.request(uri)
 | 
 | 738 |         self.assertEqual(response.status, 408)
 | 
 | 739 |         self.assertTrue(response.reason.startswith("Request Timeout"))
 | 
 | 740 |         self.assertTrue(content.startswith(b"Request Timeout"))
 | 
 | 741 | 
 | 
 | 742 |     def testIndividualTimeout(self):
 | 
 | 743 |         uri = urllib.parse.urljoin(base, "timeout/timeout.cgi")
 | 
 | 744 |         http = httplib2.Http(timeout=1)
 | 
 | 745 |         http.force_exception_to_status_code = True 
 | 
 | 746 | 
 | 
 | 747 |         (response, content) = http.request(uri)
 | 
 | 748 |         self.assertEqual(response.status, 408)
 | 
 | 749 |         self.assertTrue(response.reason.startswith("Request Timeout"))
 | 
 | 750 |         self.assertTrue(content.startswith(b"Request Timeout"))
 | 
 | 751 | 
 | 
 | 752 | 
 | 
 | 753 |     def testGetDeflate(self):
 | 
 | 754 |         # Test that we support deflate compression
 | 
 | 755 |         uri = urllib.parse.urljoin(base, "deflate/deflated.asis")
 | 
 | 756 |         (response, content) = self.http.request(uri, "GET")
 | 
 | 757 |         self.assertEqual(response.status, 200)
 | 
 | 758 |         self.assertFalse('content-encoding' in response)
 | 
 | 759 |         self.assertEqual(int(response['content-length']), len("This is the final destination."))
 | 
 | 760 |         self.assertEqual(content, b"This is the final destination.")
 | 
 | 761 | 
 | 
 | 762 |     def testGetDeflateFailure(self):
 | 
 | 763 |         # Test that we raise a good exception when the deflate fails
 | 
 | 764 |         self.http.force_exception_to_status_code = False 
 | 
 | 765 | 
 | 
 | 766 |         uri = urllib.parse.urljoin(base, "deflate/failed-compression.asis")
 | 
 | 767 |         try:
 | 
 | 768 |             (response, content) = self.http.request(uri, "GET")
 | 
 | 769 |             self.fail("Should never reach here")
 | 
 | 770 |         except httplib2.FailedToDecompressContent:
 | 
 | 771 |             pass
 | 
 | 772 |         except Exception:
 | 
 | 773 |             self.fail("Threw wrong kind of exception")
 | 
 | 774 | 
 | 
 | 775 |         # Re-run the test with out the exceptions
 | 
 | 776 |         self.http.force_exception_to_status_code = True 
 | 
 | 777 | 
 | 
 | 778 |         (response, content) = self.http.request(uri, "GET")
 | 
 | 779 |         self.assertEqual(response.status, 500)
 | 
 | 780 |         self.assertTrue(response.reason.startswith("Content purported"))
 | 
 | 781 | 
 | 
 | 782 |     def testGetDuplicateHeaders(self):
 | 
 | 783 |         # Test that duplicate headers get concatenated via ','
 | 
 | 784 |         uri = urllib.parse.urljoin(base, "duplicate-headers/multilink.asis")
 | 
 | 785 |         (response, content) = self.http.request(uri, "GET")
 | 
 | 786 |         self.assertEqual(response.status, 200)
 | 
 | 787 |         self.assertEqual(content, b"This is content\n")
 | 
 | 788 |         self.assertEqual(response['link'].split(",")[0], '<http://bitworking.org>; rel="home"; title="BitWorking"')
 | 
 | 789 | 
 | 
 | 790 |     def testGetCacheControlNoCache(self):
 | 
 | 791 |         # Test Cache-Control: no-cache on requests
 | 
 | 792 |         uri = urllib.parse.urljoin(base, "304/test_etag.txt")
 | 
 | 793 |         (response, content) = self.http.request(uri, "GET")
 | 
 | 794 |         self.assertNotEqual(response['etag'], "")
 | 
 | 795 |         (response, content) = self.http.request(uri, "GET")
 | 
 | 796 |         self.assertEqual(response.status, 200)
 | 
 | 797 |         self.assertEqual(response.fromcache, True)
 | 
 | 798 | 
 | 
 | 799 |         (response, content) = self.http.request(uri, "GET", headers={'Cache-Control': 'no-cache'})
 | 
 | 800 |         self.assertEqual(response.status, 200)
 | 
 | 801 |         self.assertEqual(response.fromcache, False)
 | 
 | 802 | 
 | 
 | 803 |     def testGetCacheControlPragmaNoCache(self):
 | 
 | 804 |         # Test Pragma: no-cache on requests
 | 
 | 805 |         uri = urllib.parse.urljoin(base, "304/test_etag.txt")
 | 
 | 806 |         (response, content) = self.http.request(uri, "GET")
 | 
 | 807 |         self.assertNotEqual(response['etag'], "")
 | 
 | 808 |         (response, content) = self.http.request(uri, "GET")
 | 
 | 809 |         self.assertEqual(response.status, 200)
 | 
 | 810 |         self.assertEqual(response.fromcache, True)
 | 
 | 811 | 
 | 
 | 812 |         (response, content) = self.http.request(uri, "GET", headers={'Pragma': 'no-cache'})
 | 
 | 813 |         self.assertEqual(response.status, 200)
 | 
 | 814 |         self.assertEqual(response.fromcache, False)
 | 
 | 815 | 
 | 
 | 816 |     def testGetCacheControlNoStoreRequest(self):
 | 
 | 817 |         # A no-store request means that the response should not be stored.
 | 
 | 818 |         uri = urllib.parse.urljoin(base, "304/test_etag.txt")
 | 
 | 819 | 
 | 
 | 820 |         (response, content) = self.http.request(uri, "GET", headers={'Cache-Control': 'no-store'})
 | 
 | 821 |         self.assertEqual(response.status, 200)
 | 
 | 822 |         self.assertEqual(response.fromcache, False)
 | 
 | 823 | 
 | 
 | 824 |         (response, content) = self.http.request(uri, "GET", headers={'Cache-Control': 'no-store'})
 | 
 | 825 |         self.assertEqual(response.status, 200)
 | 
 | 826 |         self.assertEqual(response.fromcache, False)
 | 
 | 827 | 
 | 
 | 828 |     def testGetCacheControlNoStoreResponse(self):
 | 
 | 829 |         # A no-store response means that the response should not be stored.
 | 
 | 830 |         uri = urllib.parse.urljoin(base, "no-store/no-store.asis")
 | 
 | 831 | 
 | 
 | 832 |         (response, content) = self.http.request(uri, "GET")
 | 
 | 833 |         self.assertEqual(response.status, 200)
 | 
 | 834 |         self.assertEqual(response.fromcache, False)
 | 
 | 835 | 
 | 
 | 836 |         (response, content) = self.http.request(uri, "GET")
 | 
 | 837 |         self.assertEqual(response.status, 200)
 | 
 | 838 |         self.assertEqual(response.fromcache, False)
 | 
 | 839 | 
 | 
 | 840 |     def testGetCacheControlNoCacheNoStoreRequest(self):
 | 
 | 841 |         # Test that a no-store, no-cache clears the entry from the cache
 | 
 | 842 |         # even if it was cached previously.
 | 
 | 843 |         uri = urllib.parse.urljoin(base, "304/test_etag.txt")
 | 
 | 844 | 
 | 
 | 845 |         (response, content) = self.http.request(uri, "GET")
 | 
 | 846 |         (response, content) = self.http.request(uri, "GET")
 | 
 | 847 |         self.assertEqual(response.fromcache, True)
 | 
 | 848 |         (response, content) = self.http.request(uri, "GET", headers={'Cache-Control': 'no-store, no-cache'})
 | 
 | 849 |         (response, content) = self.http.request(uri, "GET", headers={'Cache-Control': 'no-store, no-cache'})
 | 
 | 850 |         self.assertEqual(response.status, 200)
 | 
 | 851 |         self.assertEqual(response.fromcache, False)
 | 
 | 852 | 
 | 
 | 853 |     def testUpdateInvalidatesCache(self):
 | 
 | 854 |         # Test that calling PUT or DELETE on a 
 | 
 | 855 |         # URI that is cache invalidates that cache.
 | 
 | 856 |         uri = urllib.parse.urljoin(base, "304/test_etag.txt")
 | 
 | 857 | 
 | 
 | 858 |         (response, content) = self.http.request(uri, "GET")
 | 
 | 859 |         (response, content) = self.http.request(uri, "GET")
 | 
 | 860 |         self.assertEqual(response.fromcache, True)
 | 
 | 861 |         (response, content) = self.http.request(uri, "DELETE")
 | 
 | 862 |         self.assertEqual(response.status, 405)
 | 
 | 863 | 
 | 
 | 864 |         (response, content) = self.http.request(uri, "GET")
 | 
 | 865 |         self.assertEqual(response.fromcache, False)
 | 
 | 866 | 
 | 
 | 867 |     def testUpdateUsesCachedETag(self):
 | 
 | 868 |         # Test that we natively support http://www.w3.org/1999/04/Editing/ 
 | 
 | 869 |         uri = urllib.parse.urljoin(base, "conditional-updates/test.cgi")
 | 
 | 870 | 
 | 
 | 871 |         (response, content) = self.http.request(uri, "GET")
 | 
 | 872 |         self.assertEqual(response.status, 200)
 | 
 | 873 |         self.assertEqual(response.fromcache, False)
 | 
 | 874 |         (response, content) = self.http.request(uri, "GET")
 | 
 | 875 |         self.assertEqual(response.status, 200)
 | 
 | 876 |         self.assertEqual(response.fromcache, True)
 | 
| Joe Gregorio | 799b207 | 2009-09-29 17:21:19 -0400 | [diff] [blame] | 877 |         (response, content) = self.http.request(uri, "PUT", body="foo")
 | 
| pilgrim | 00a352e | 2009-05-29 04:04:44 +0000 | [diff] [blame] | 878 |         self.assertEqual(response.status, 200)
 | 
| Joe Gregorio | 799b207 | 2009-09-29 17:21:19 -0400 | [diff] [blame] | 879 |         (response, content) = self.http.request(uri, "PUT", body="foo")
 | 
| pilgrim | 00a352e | 2009-05-29 04:04:44 +0000 | [diff] [blame] | 880 |         self.assertEqual(response.status, 412)
 | 
 | 881 | 
 | 
 | 882 |     def testUpdateUsesCachedETagAndOCMethod(self):
 | 
 | 883 |         # Test that we natively support http://www.w3.org/1999/04/Editing/ 
 | 
 | 884 |         uri = urllib.parse.urljoin(base, "conditional-updates/test.cgi")
 | 
 | 885 | 
 | 
 | 886 |         (response, content) = self.http.request(uri, "GET")
 | 
 | 887 |         self.assertEqual(response.status, 200)
 | 
 | 888 |         self.assertEqual(response.fromcache, False)
 | 
 | 889 |         (response, content) = self.http.request(uri, "GET")
 | 
 | 890 |         self.assertEqual(response.status, 200)
 | 
 | 891 |         self.assertEqual(response.fromcache, True)
 | 
 | 892 |         self.http.optimistic_concurrency_methods.append("DELETE")
 | 
 | 893 |         (response, content) = self.http.request(uri, "DELETE")
 | 
 | 894 |         self.assertEqual(response.status, 200)
 | 
 | 895 | 
 | 
 | 896 | 
 | 
 | 897 |     def testUpdateUsesCachedETagOverridden(self):
 | 
 | 898 |         # Test that we natively support http://www.w3.org/1999/04/Editing/ 
 | 
 | 899 |         uri = urllib.parse.urljoin(base, "conditional-updates/test.cgi")
 | 
 | 900 | 
 | 
 | 901 |         (response, content) = self.http.request(uri, "GET")
 | 
 | 902 |         self.assertEqual(response.status, 200)
 | 
 | 903 |         self.assertEqual(response.fromcache, False)
 | 
 | 904 |         (response, content) = self.http.request(uri, "GET")
 | 
 | 905 |         self.assertEqual(response.status, 200)
 | 
 | 906 |         self.assertEqual(response.fromcache, True)
 | 
| Joe Gregorio | 799b207 | 2009-09-29 17:21:19 -0400 | [diff] [blame] | 907 |         (response, content) = self.http.request(uri, "PUT", body="foo", headers={'if-match': 'fred'})
 | 
| pilgrim | 00a352e | 2009-05-29 04:04:44 +0000 | [diff] [blame] | 908 |         self.assertEqual(response.status, 412)
 | 
 | 909 | 
 | 
 | 910 |     def testBasicAuth(self):
 | 
 | 911 |         # Test Basic Authentication
 | 
 | 912 |         uri = urllib.parse.urljoin(base, "basic/file.txt")
 | 
 | 913 |         (response, content) = self.http.request(uri, "GET")
 | 
 | 914 |         self.assertEqual(response.status, 401)
 | 
 | 915 | 
 | 
 | 916 |         uri = urllib.parse.urljoin(base, "basic/")
 | 
 | 917 |         (response, content) = self.http.request(uri, "GET")
 | 
 | 918 |         self.assertEqual(response.status, 401)
 | 
 | 919 | 
 | 
 | 920 |         self.http.add_credentials('joe', 'password')
 | 
 | 921 |         (response, content) = self.http.request(uri, "GET")
 | 
 | 922 |         self.assertEqual(response.status, 200)
 | 
 | 923 | 
 | 
 | 924 |         uri = urllib.parse.urljoin(base, "basic/file.txt")
 | 
 | 925 |         (response, content) = self.http.request(uri, "GET")
 | 
 | 926 |         self.assertEqual(response.status, 200)
 | 
 | 927 | 
 | 
 | 928 |     def testBasicAuthWithDomain(self):
 | 
 | 929 |         # Test Basic Authentication
 | 
 | 930 |         uri = urllib.parse.urljoin(base, "basic/file.txt")
 | 
 | 931 |         (response, content) = self.http.request(uri, "GET")
 | 
 | 932 |         self.assertEqual(response.status, 401)
 | 
 | 933 | 
 | 
 | 934 |         uri = urllib.parse.urljoin(base, "basic/")
 | 
 | 935 |         (response, content) = self.http.request(uri, "GET")
 | 
 | 936 |         self.assertEqual(response.status, 401)
 | 
 | 937 | 
 | 
 | 938 |         self.http.add_credentials('joe', 'password', "example.org")
 | 
 | 939 |         (response, content) = self.http.request(uri, "GET")
 | 
 | 940 |         self.assertEqual(response.status, 401)
 | 
 | 941 | 
 | 
 | 942 |         uri = urllib.parse.urljoin(base, "basic/file.txt")
 | 
 | 943 |         (response, content) = self.http.request(uri, "GET")
 | 
 | 944 |         self.assertEqual(response.status, 401)
 | 
 | 945 | 
 | 
 | 946 |         domain = urllib.parse.urlparse(base)[1] 
 | 
 | 947 |         self.http.add_credentials('joe', 'password', domain)
 | 
 | 948 |         (response, content) = self.http.request(uri, "GET")
 | 
 | 949 |         self.assertEqual(response.status, 200)
 | 
 | 950 | 
 | 
 | 951 |         uri = urllib.parse.urljoin(base, "basic/file.txt")
 | 
 | 952 |         (response, content) = self.http.request(uri, "GET")
 | 
 | 953 |         self.assertEqual(response.status, 200)
 | 
 | 954 | 
 | 
 | 955 | 
 | 
 | 956 | 
 | 
 | 957 | 
 | 
 | 958 | 
 | 
 | 959 | 
 | 
 | 960 |     def testBasicAuthTwoDifferentCredentials(self):
 | 
 | 961 |         # Test Basic Authentication with multiple sets of credentials
 | 
 | 962 |         uri = urllib.parse.urljoin(base, "basic2/file.txt")
 | 
 | 963 |         (response, content) = self.http.request(uri, "GET")
 | 
 | 964 |         self.assertEqual(response.status, 401)
 | 
 | 965 | 
 | 
 | 966 |         uri = urllib.parse.urljoin(base, "basic2/")
 | 
 | 967 |         (response, content) = self.http.request(uri, "GET")
 | 
 | 968 |         self.assertEqual(response.status, 401)
 | 
 | 969 | 
 | 
 | 970 |         self.http.add_credentials('fred', 'barney')
 | 
 | 971 |         (response, content) = self.http.request(uri, "GET")
 | 
 | 972 |         self.assertEqual(response.status, 200)
 | 
 | 973 | 
 | 
 | 974 |         uri = urllib.parse.urljoin(base, "basic2/file.txt")
 | 
 | 975 |         (response, content) = self.http.request(uri, "GET")
 | 
 | 976 |         self.assertEqual(response.status, 200)
 | 
 | 977 | 
 | 
 | 978 |     def testBasicAuthNested(self):
 | 
 | 979 |         # Test Basic Authentication with resources
 | 
 | 980 |         # that are nested
 | 
 | 981 |         uri = urllib.parse.urljoin(base, "basic-nested/")
 | 
 | 982 |         (response, content) = self.http.request(uri, "GET")
 | 
 | 983 |         self.assertEqual(response.status, 401)
 | 
 | 984 | 
 | 
 | 985 |         uri = urllib.parse.urljoin(base, "basic-nested/subdir")
 | 
 | 986 |         (response, content) = self.http.request(uri, "GET")
 | 
 | 987 |         self.assertEqual(response.status, 401)
 | 
 | 988 | 
 | 
 | 989 |         # Now add in credentials one at a time and test.
 | 
 | 990 |         self.http.add_credentials('joe', 'password')
 | 
 | 991 | 
 | 
 | 992 |         uri = urllib.parse.urljoin(base, "basic-nested/")
 | 
 | 993 |         (response, content) = self.http.request(uri, "GET")
 | 
 | 994 |         self.assertEqual(response.status, 200)
 | 
 | 995 | 
 | 
 | 996 |         uri = urllib.parse.urljoin(base, "basic-nested/subdir")
 | 
 | 997 |         (response, content) = self.http.request(uri, "GET")
 | 
 | 998 |         self.assertEqual(response.status, 401)
 | 
 | 999 | 
 | 
 | 1000 |         self.http.add_credentials('fred', 'barney')
 | 
 | 1001 | 
 | 
 | 1002 |         uri = urllib.parse.urljoin(base, "basic-nested/")
 | 
 | 1003 |         (response, content) = self.http.request(uri, "GET")
 | 
 | 1004 |         self.assertEqual(response.status, 200)
 | 
 | 1005 | 
 | 
 | 1006 |         uri = urllib.parse.urljoin(base, "basic-nested/subdir")
 | 
 | 1007 |         (response, content) = self.http.request(uri, "GET")
 | 
 | 1008 |         self.assertEqual(response.status, 200)
 | 
 | 1009 | 
 | 
 | 1010 |     def testDigestAuth(self):
 | 
 | 1011 |         # Test that we support Digest Authentication
 | 
 | 1012 |         uri = urllib.parse.urljoin(base, "digest/")
 | 
 | 1013 |         (response, content) = self.http.request(uri, "GET")
 | 
 | 1014 |         self.assertEqual(response.status, 401)
 | 
 | 1015 | 
 | 
 | 1016 |         self.http.add_credentials('joe', 'password')
 | 
 | 1017 |         (response, content) = self.http.request(uri, "GET")
 | 
 | 1018 |         self.assertEqual(response.status, 200)
 | 
 | 1019 | 
 | 
 | 1020 |         uri = urllib.parse.urljoin(base, "digest/file.txt")
 | 
 | 1021 |         (response, content) = self.http.request(uri, "GET")
 | 
 | 1022 | 
 | 
 | 1023 |     def testDigestAuthNextNonceAndNC(self):
 | 
 | 1024 |         # Test that if the server sets nextnonce that we reset
 | 
 | 1025 |         # the nonce count back to 1
 | 
 | 1026 |         uri = urllib.parse.urljoin(base, "digest/file.txt")
 | 
 | 1027 |         self.http.add_credentials('joe', 'password')
 | 
 | 1028 |         (response, content) = self.http.request(uri, "GET", headers = {"cache-control":"no-cache"})
 | 
 | 1029 |         info = httplib2._parse_www_authenticate(response, 'authentication-info')
 | 
 | 1030 |         self.assertEqual(response.status, 200)
 | 
 | 1031 |         (response, content) = self.http.request(uri, "GET", headers = {"cache-control":"no-cache"})
 | 
 | 1032 |         info2 = httplib2._parse_www_authenticate(response, 'authentication-info')
 | 
 | 1033 |         self.assertEqual(response.status, 200)
 | 
 | 1034 | 
 | 
 | 1035 |         if 'nextnonce' in info:
 | 
 | 1036 |             self.assertEqual(info2['nc'], 1)
 | 
 | 1037 | 
 | 
 | 1038 |     def testDigestAuthStale(self):
 | 
 | 1039 |         # Test that we can handle a nonce becoming stale
 | 
 | 1040 |         uri = urllib.parse.urljoin(base, "digest-expire/file.txt")
 | 
 | 1041 |         self.http.add_credentials('joe', 'password')
 | 
 | 1042 |         (response, content) = self.http.request(uri, "GET", headers = {"cache-control":"no-cache"})
 | 
 | 1043 |         info = httplib2._parse_www_authenticate(response, 'authentication-info')
 | 
 | 1044 |         self.assertEqual(response.status, 200)
 | 
 | 1045 | 
 | 
 | 1046 |         time.sleep(3)
 | 
 | 1047 |         # Sleep long enough that the nonce becomes stale
 | 
 | 1048 | 
 | 
 | 1049 |         (response, content) = self.http.request(uri, "GET", headers = {"cache-control":"no-cache"})
 | 
 | 1050 |         self.assertFalse(response.fromcache)
 | 
 | 1051 |         self.assertTrue(response._stale_digest)
 | 
 | 1052 |         info3 = httplib2._parse_www_authenticate(response, 'authentication-info')
 | 
 | 1053 |         self.assertEqual(response.status, 200)
 | 
 | 1054 | 
 | 
 | 1055 |     def reflector(self, content):
 | 
 | 1056 |         return  dict( [tuple(x.split("=", 1)) for x in content.decode('utf-8').strip().split("\n")] )
 | 
 | 1057 | 
 | 
 | 1058 |     def testReflector(self):
 | 
 | 1059 |         uri = urllib.parse.urljoin(base, "reflector/reflector.cgi")
 | 
 | 1060 |         (response, content) = self.http.request(uri, "GET")
 | 
 | 1061 |         d = self.reflector(content)
 | 
 | 1062 |         self.assertTrue('HTTP_USER_AGENT' in d) 
 | 
 | 1063 | 
 | 
| Joe Gregorio | 84cc10a | 2009-09-01 13:02:49 -0400 | [diff] [blame] | 1064 | 
 | 
 | 1065 |     def testConnectionClose(self):
 | 
 | 1066 |         uri = "http://www.google.com/"
 | 
 | 1067 |         (response, content) = self.http.request(uri, "GET")
 | 
 | 1068 |         for c in self.http.connections.values():
 | 
 | 1069 |             self.assertNotEqual(None, c.sock)
 | 
 | 1070 |         (response, content) = self.http.request(uri, "GET", headers={"connection": "close"})
 | 
 | 1071 |         for c in self.http.connections.values():
 | 
 | 1072 |             self.assertEqual(None, c.sock)
 | 
 | 1073 | 
 | 
| pilgrim | 00a352e | 2009-05-29 04:04:44 +0000 | [diff] [blame] | 1074 | try:
 | 
 | 1075 |     import memcache
 | 
 | 1076 |     class HttpTestMemCached(HttpTest):
 | 
 | 1077 |         def setUp(self):
 | 
 | 1078 |             self.cache = memcache.Client(['127.0.0.1:11211'], debug=0)
 | 
 | 1079 |             #self.cache = memcache.Client(['10.0.0.4:11211'], debug=1)
 | 
 | 1080 |             self.http = httplib2.Http(self.cache)
 | 
 | 1081 |             self.cache.flush_all()
 | 
 | 1082 |             # Not exactly sure why the sleep is needed here, but
 | 
 | 1083 |             # if not present then some unit tests that rely on caching
 | 
 | 1084 |             # fail. Memcached seems to lose some sets immediately
 | 
 | 1085 |             # after a flush_all if the set is to a value that
 | 
 | 1086 |             # was previously cached. (Maybe the flush is handled async?)
 | 
 | 1087 |             time.sleep(1)
 | 
 | 1088 |             self.http.clear_credentials()
 | 
 | 1089 | except:
 | 
 | 1090 |     pass
 | 
 | 1091 | 
 | 
 | 1092 | 
 | 
 | 1093 | 
 | 
 | 1094 | # ------------------------------------------------------------------------
 | 
 | 1095 | 
 | 
 | 1096 | class HttpPrivateTest(unittest.TestCase):
 | 
 | 1097 | 
 | 
 | 1098 |     def testParseCacheControl(self):
 | 
 | 1099 |         # Test that we can parse the Cache-Control header
 | 
 | 1100 |         self.assertEqual({}, httplib2._parse_cache_control({}))
 | 
 | 1101 |         self.assertEqual({'no-cache': 1}, httplib2._parse_cache_control({'cache-control': ' no-cache'}))
 | 
 | 1102 |         cc = httplib2._parse_cache_control({'cache-control': ' no-cache, max-age = 7200'})
 | 
 | 1103 |         self.assertEqual(cc['no-cache'], 1)
 | 
 | 1104 |         self.assertEqual(cc['max-age'], '7200')
 | 
 | 1105 |         cc = httplib2._parse_cache_control({'cache-control': ' , '})
 | 
 | 1106 |         self.assertEqual(cc[''], 1)
 | 
 | 1107 | 
 | 
| Joe Gregorio | e314e8b | 2009-07-16 20:11:28 -0400 | [diff] [blame] | 1108 |         try:
 | 
 | 1109 |             cc = httplib2._parse_cache_control({'cache-control': 'Max-age=3600;post-check=1800,pre-check=3600'})
 | 
 | 1110 |             self.assertTrue("max-age" in cc)
 | 
 | 1111 |         except:
 | 
 | 1112 |             self.fail("Should not throw exception")
 | 
 | 1113 | 
 | 
 | 1114 | 
 | 
 | 1115 | 
 | 
 | 1116 | 
 | 
| pilgrim | 00a352e | 2009-05-29 04:04:44 +0000 | [diff] [blame] | 1117 |     def testNormalizeHeaders(self):
 | 
 | 1118 |         # Test that we normalize headers to lowercase 
 | 
 | 1119 |         h = httplib2._normalize_headers({'Cache-Control': 'no-cache', 'Other': 'Stuff'})
 | 
 | 1120 |         self.assertTrue('cache-control' in h)
 | 
 | 1121 |         self.assertTrue('other' in h)
 | 
 | 1122 |         self.assertEqual('Stuff', h['other'])
 | 
 | 1123 | 
 | 
 | 1124 |     def testExpirationModelTransparent(self):
 | 
 | 1125 |         # Test that no-cache makes our request TRANSPARENT
 | 
 | 1126 |         response_headers = {
 | 
 | 1127 |             'cache-control': 'max-age=7200'
 | 
 | 1128 |         }
 | 
 | 1129 |         request_headers = {
 | 
 | 1130 |             'cache-control': 'no-cache'
 | 
 | 1131 |         }
 | 
 | 1132 |         self.assertEqual("TRANSPARENT", httplib2._entry_disposition(response_headers, request_headers))
 | 
 | 1133 | 
 | 
 | 1134 |     def testMaxAgeNonNumeric(self):
 | 
 | 1135 |         # Test that no-cache makes our request TRANSPARENT
 | 
 | 1136 |         response_headers = {
 | 
 | 1137 |             'cache-control': 'max-age=fred, min-fresh=barney'
 | 
 | 1138 |         }
 | 
 | 1139 |         request_headers = {
 | 
 | 1140 |         }
 | 
 | 1141 |         self.assertEqual("STALE", httplib2._entry_disposition(response_headers, request_headers))
 | 
 | 1142 | 
 | 
 | 1143 | 
 | 
 | 1144 |     def testExpirationModelNoCacheResponse(self):
 | 
 | 1145 |         # The date and expires point to an entry that should be
 | 
 | 1146 |         # FRESH, but the no-cache over-rides that.
 | 
 | 1147 |         now = time.time()
 | 
 | 1148 |         response_headers = {
 | 
 | 1149 |             'date': time.strftime("%a, %d %b %Y %H:%M:%S GMT", time.gmtime(now)),
 | 
 | 1150 |             'expires': time.strftime("%a, %d %b %Y %H:%M:%S GMT", time.gmtime(now+4)),
 | 
 | 1151 |             'cache-control': 'no-cache'
 | 
 | 1152 |         }
 | 
 | 1153 |         request_headers = {
 | 
 | 1154 |         }
 | 
 | 1155 |         self.assertEqual("STALE", httplib2._entry_disposition(response_headers, request_headers))
 | 
 | 1156 | 
 | 
 | 1157 |     def testExpirationModelStaleRequestMustReval(self):
 | 
 | 1158 |         # must-revalidate forces STALE
 | 
 | 1159 |         self.assertEqual("STALE", httplib2._entry_disposition({}, {'cache-control': 'must-revalidate'}))
 | 
 | 1160 | 
 | 
 | 1161 |     def testExpirationModelStaleResponseMustReval(self):
 | 
 | 1162 |         # must-revalidate forces STALE
 | 
 | 1163 |         self.assertEqual("STALE", httplib2._entry_disposition({'cache-control': 'must-revalidate'}, {}))
 | 
 | 1164 | 
 | 
 | 1165 |     def testExpirationModelFresh(self):
 | 
 | 1166 |         response_headers = {
 | 
 | 1167 |             'date': time.strftime("%a, %d %b %Y %H:%M:%S GMT", time.gmtime()),
 | 
 | 1168 |             'cache-control': 'max-age=2'
 | 
 | 1169 |         }
 | 
 | 1170 |         request_headers = {
 | 
 | 1171 |         }
 | 
 | 1172 |         self.assertEqual("FRESH", httplib2._entry_disposition(response_headers, request_headers))
 | 
 | 1173 |         time.sleep(3)
 | 
 | 1174 |         self.assertEqual("STALE", httplib2._entry_disposition(response_headers, request_headers))
 | 
 | 1175 | 
 | 
 | 1176 |     def testExpirationMaxAge0(self):
 | 
 | 1177 |         response_headers = {
 | 
 | 1178 |             'date': time.strftime("%a, %d %b %Y %H:%M:%S GMT", time.gmtime()),
 | 
 | 1179 |             'cache-control': 'max-age=0'
 | 
 | 1180 |         }
 | 
 | 1181 |         request_headers = {
 | 
 | 1182 |         }
 | 
 | 1183 |         self.assertEqual("STALE", httplib2._entry_disposition(response_headers, request_headers))
 | 
 | 1184 | 
 | 
 | 1185 |     def testExpirationModelDateAndExpires(self):
 | 
 | 1186 |         now = time.time()
 | 
 | 1187 |         response_headers = {
 | 
 | 1188 |             'date': time.strftime("%a, %d %b %Y %H:%M:%S GMT", time.gmtime(now)),
 | 
 | 1189 |             'expires': time.strftime("%a, %d %b %Y %H:%M:%S GMT", time.gmtime(now+2)),
 | 
 | 1190 |         }
 | 
 | 1191 |         request_headers = {
 | 
 | 1192 |         }
 | 
 | 1193 |         self.assertEqual("FRESH", httplib2._entry_disposition(response_headers, request_headers))
 | 
 | 1194 |         time.sleep(3)
 | 
 | 1195 |         self.assertEqual("STALE", httplib2._entry_disposition(response_headers, request_headers))
 | 
 | 1196 | 
 | 
 | 1197 |     def testExpiresZero(self):
 | 
 | 1198 |         now = time.time()
 | 
 | 1199 |         response_headers = {
 | 
 | 1200 |             'date': time.strftime("%a, %d %b %Y %H:%M:%S GMT", time.gmtime(now)),
 | 
 | 1201 |             'expires': "0",
 | 
 | 1202 |         }
 | 
 | 1203 |         request_headers = {
 | 
 | 1204 |         }
 | 
 | 1205 |         self.assertEqual("STALE", httplib2._entry_disposition(response_headers, request_headers))
 | 
 | 1206 | 
 | 
 | 1207 |     def testExpirationModelDateOnly(self):
 | 
 | 1208 |         now = time.time()
 | 
 | 1209 |         response_headers = {
 | 
 | 1210 |             'date': time.strftime("%a, %d %b %Y %H:%M:%S GMT", time.gmtime(now+3)),
 | 
 | 1211 |         }
 | 
 | 1212 |         request_headers = {
 | 
 | 1213 |         }
 | 
 | 1214 |         self.assertEqual("STALE", httplib2._entry_disposition(response_headers, request_headers))
 | 
 | 1215 | 
 | 
 | 1216 |     def testExpirationModelOnlyIfCached(self):
 | 
 | 1217 |         response_headers = {
 | 
 | 1218 |         }
 | 
 | 1219 |         request_headers = {
 | 
 | 1220 |             'cache-control': 'only-if-cached',
 | 
 | 1221 |         }
 | 
 | 1222 |         self.assertEqual("FRESH", httplib2._entry_disposition(response_headers, request_headers))
 | 
 | 1223 | 
 | 
 | 1224 |     def testExpirationModelMaxAgeBoth(self):
 | 
 | 1225 |         now = time.time()
 | 
 | 1226 |         response_headers = {
 | 
 | 1227 |             'date': time.strftime("%a, %d %b %Y %H:%M:%S GMT", time.gmtime(now)),
 | 
 | 1228 |             'cache-control': 'max-age=2'
 | 
 | 1229 |         }
 | 
 | 1230 |         request_headers = {
 | 
 | 1231 |             'cache-control': 'max-age=0'
 | 
 | 1232 |         }
 | 
 | 1233 |         self.assertEqual("STALE", httplib2._entry_disposition(response_headers, request_headers))
 | 
 | 1234 | 
 | 
 | 1235 |     def testExpirationModelDateAndExpiresMinFresh1(self):
 | 
 | 1236 |         now = time.time()
 | 
 | 1237 |         response_headers = {
 | 
 | 1238 |             'date': time.strftime("%a, %d %b %Y %H:%M:%S GMT", time.gmtime(now)),
 | 
 | 1239 |             'expires': time.strftime("%a, %d %b %Y %H:%M:%S GMT", time.gmtime(now+2)),
 | 
 | 1240 |         }
 | 
 | 1241 |         request_headers = {
 | 
 | 1242 |             'cache-control': 'min-fresh=2'
 | 
 | 1243 |         }
 | 
 | 1244 |         self.assertEqual("STALE", httplib2._entry_disposition(response_headers, request_headers))
 | 
 | 1245 | 
 | 
 | 1246 |     def testExpirationModelDateAndExpiresMinFresh2(self):
 | 
 | 1247 |         now = time.time()
 | 
 | 1248 |         response_headers = {
 | 
 | 1249 |             'date': time.strftime("%a, %d %b %Y %H:%M:%S GMT", time.gmtime(now)),
 | 
 | 1250 |             'expires': time.strftime("%a, %d %b %Y %H:%M:%S GMT", time.gmtime(now+4)),
 | 
 | 1251 |         }
 | 
 | 1252 |         request_headers = {
 | 
 | 1253 |             'cache-control': 'min-fresh=2'
 | 
 | 1254 |         }
 | 
 | 1255 |         self.assertEqual("FRESH", httplib2._entry_disposition(response_headers, request_headers))
 | 
 | 1256 | 
 | 
 | 1257 |     def testParseWWWAuthenticateEmpty(self):
 | 
 | 1258 |         res = httplib2._parse_www_authenticate({})
 | 
 | 1259 |         self.assertEqual(len(list(res.keys())), 0) 
 | 
 | 1260 | 
 | 
 | 1261 |     def testParseWWWAuthenticate(self):
 | 
 | 1262 |         # different uses of spaces around commas
 | 
 | 1263 |         res = httplib2._parse_www_authenticate({ 'www-authenticate': 'Test realm="test realm" , foo=foo ,bar="bar", baz=baz,qux=qux'})
 | 
 | 1264 |         self.assertEqual(len(list(res.keys())), 1)
 | 
 | 1265 |         self.assertEqual(len(list(res['test'].keys())), 5)
 | 
 | 1266 |         
 | 
 | 1267 |         # tokens with non-alphanum
 | 
 | 1268 |         res = httplib2._parse_www_authenticate({ 'www-authenticate': 'T*!%#st realm=to*!%#en, to*!%#en="quoted string"'})
 | 
 | 1269 |         self.assertEqual(len(list(res.keys())), 1)
 | 
 | 1270 |         self.assertEqual(len(list(res['t*!%#st'].keys())), 2)
 | 
 | 1271 |         
 | 
 | 1272 |         # quoted string with quoted pairs
 | 
 | 1273 |         res = httplib2._parse_www_authenticate({ 'www-authenticate': 'Test realm="a \\"test\\" realm"'})
 | 
 | 1274 |         self.assertEqual(len(list(res.keys())), 1)
 | 
 | 1275 |         self.assertEqual(res['test']['realm'], 'a "test" realm')
 | 
 | 1276 | 
 | 
 | 1277 |     def testParseWWWAuthenticateStrict(self):
 | 
 | 1278 |         httplib2.USE_WWW_AUTH_STRICT_PARSING = 1;
 | 
 | 1279 |         self.testParseWWWAuthenticate();
 | 
 | 1280 |         httplib2.USE_WWW_AUTH_STRICT_PARSING = 0;
 | 
 | 1281 | 
 | 
 | 1282 |     def testParseWWWAuthenticateBasic(self):
 | 
 | 1283 |         res = httplib2._parse_www_authenticate({ 'www-authenticate': 'Basic realm="me"'})
 | 
 | 1284 |         basic = res['basic']
 | 
 | 1285 |         self.assertEqual('me', basic['realm'])
 | 
 | 1286 | 
 | 
 | 1287 |         res = httplib2._parse_www_authenticate({ 'www-authenticate': 'Basic realm="me", algorithm="MD5"'})
 | 
 | 1288 |         basic = res['basic']
 | 
 | 1289 |         self.assertEqual('me', basic['realm'])
 | 
 | 1290 |         self.assertEqual('MD5', basic['algorithm'])
 | 
 | 1291 | 
 | 
 | 1292 |         res = httplib2._parse_www_authenticate({ 'www-authenticate': 'Basic realm="me", algorithm=MD5'})
 | 
 | 1293 |         basic = res['basic']
 | 
 | 1294 |         self.assertEqual('me', basic['realm'])
 | 
 | 1295 |         self.assertEqual('MD5', basic['algorithm'])
 | 
 | 1296 | 
 | 
 | 1297 |     def testParseWWWAuthenticateBasic2(self):
 | 
 | 1298 |         res = httplib2._parse_www_authenticate({ 'www-authenticate': 'Basic realm="me",other="fred" '})
 | 
 | 1299 |         basic = res['basic']
 | 
 | 1300 |         self.assertEqual('me', basic['realm'])
 | 
 | 1301 |         self.assertEqual('fred', basic['other'])
 | 
 | 1302 | 
 | 
 | 1303 |     def testParseWWWAuthenticateBasic3(self):
 | 
 | 1304 |         res = httplib2._parse_www_authenticate({ 'www-authenticate': 'Basic REAlm="me" '})
 | 
 | 1305 |         basic = res['basic']
 | 
 | 1306 |         self.assertEqual('me', basic['realm'])
 | 
 | 1307 | 
 | 
 | 1308 | 
 | 
 | 1309 |     def testParseWWWAuthenticateDigest(self):
 | 
 | 1310 |         res = httplib2._parse_www_authenticate({ 'www-authenticate': 
 | 
 | 1311 |                 'Digest realm="testrealm@host.com", qop="auth,auth-int", nonce="dcd98b7102dd2f0e8b11d0f600bfb0c093", opaque="5ccc069c403ebaf9f0171e9517f40e41"'})
 | 
 | 1312 |         digest = res['digest']
 | 
 | 1313 |         self.assertEqual('testrealm@host.com', digest['realm'])
 | 
 | 1314 |         self.assertEqual('auth,auth-int', digest['qop'])
 | 
 | 1315 | 
 | 
 | 1316 | 
 | 
 | 1317 |     def testParseWWWAuthenticateMultiple(self):
 | 
 | 1318 |         res = httplib2._parse_www_authenticate({ 'www-authenticate': 
 | 
 | 1319 |                 'Digest realm="testrealm@host.com", qop="auth,auth-int", nonce="dcd98b7102dd2f0e8b11d0f600bfb0c093", opaque="5ccc069c403ebaf9f0171e9517f40e41" Basic REAlm="me" '})
 | 
 | 1320 |         digest = res['digest']
 | 
 | 1321 |         self.assertEqual('testrealm@host.com', digest['realm'])
 | 
 | 1322 |         self.assertEqual('auth,auth-int', digest['qop'])
 | 
 | 1323 |         self.assertEqual('dcd98b7102dd2f0e8b11d0f600bfb0c093', digest['nonce'])
 | 
 | 1324 |         self.assertEqual('5ccc069c403ebaf9f0171e9517f40e41', digest['opaque'])
 | 
 | 1325 |         basic = res['basic']
 | 
 | 1326 |         self.assertEqual('me', basic['realm'])
 | 
 | 1327 | 
 | 
 | 1328 |     def testParseWWWAuthenticateMultiple2(self):
 | 
 | 1329 |         # Handle an added comma between challenges, which might get thrown in if the challenges were
 | 
 | 1330 |         # originally sent in separate www-authenticate headers.
 | 
 | 1331 |         res = httplib2._parse_www_authenticate({ 'www-authenticate': 
 | 
 | 1332 |                 'Digest realm="testrealm@host.com", qop="auth,auth-int", nonce="dcd98b7102dd2f0e8b11d0f600bfb0c093", opaque="5ccc069c403ebaf9f0171e9517f40e41", Basic REAlm="me" '})
 | 
 | 1333 |         digest = res['digest']
 | 
 | 1334 |         self.assertEqual('testrealm@host.com', digest['realm'])
 | 
 | 1335 |         self.assertEqual('auth,auth-int', digest['qop'])
 | 
 | 1336 |         self.assertEqual('dcd98b7102dd2f0e8b11d0f600bfb0c093', digest['nonce'])
 | 
 | 1337 |         self.assertEqual('5ccc069c403ebaf9f0171e9517f40e41', digest['opaque'])
 | 
 | 1338 |         basic = res['basic']
 | 
 | 1339 |         self.assertEqual('me', basic['realm'])
 | 
 | 1340 | 
 | 
 | 1341 |     def testParseWWWAuthenticateMultiple3(self):
 | 
 | 1342 |         # Handle an added comma between challenges, which might get thrown in if the challenges were
 | 
 | 1343 |         # originally sent in separate www-authenticate headers.
 | 
 | 1344 |         res = httplib2._parse_www_authenticate({ 'www-authenticate': 
 | 
 | 1345 |                 'Digest realm="testrealm@host.com", qop="auth,auth-int", nonce="dcd98b7102dd2f0e8b11d0f600bfb0c093", opaque="5ccc069c403ebaf9f0171e9517f40e41", Basic REAlm="me", WSSE realm="foo", profile="UsernameToken"'})
 | 
 | 1346 |         digest = res['digest']
 | 
 | 1347 |         self.assertEqual('testrealm@host.com', digest['realm'])
 | 
 | 1348 |         self.assertEqual('auth,auth-int', digest['qop'])
 | 
 | 1349 |         self.assertEqual('dcd98b7102dd2f0e8b11d0f600bfb0c093', digest['nonce'])
 | 
 | 1350 |         self.assertEqual('5ccc069c403ebaf9f0171e9517f40e41', digest['opaque'])
 | 
 | 1351 |         basic = res['basic']
 | 
 | 1352 |         self.assertEqual('me', basic['realm'])
 | 
 | 1353 |         wsse = res['wsse']
 | 
 | 1354 |         self.assertEqual('foo', wsse['realm'])
 | 
 | 1355 |         self.assertEqual('UsernameToken', wsse['profile'])
 | 
 | 1356 | 
 | 
 | 1357 |     def testParseWWWAuthenticateMultiple4(self):
 | 
 | 1358 |         res = httplib2._parse_www_authenticate({ 'www-authenticate': 
 | 
 | 1359 |                 'Digest realm="test-real.m@host.com", qop \t=\t"\tauth,auth-int", nonce="(*)&^&$%#",opaque="5ccc069c403ebaf9f0171e9517f40e41", Basic REAlm="me", WSSE realm="foo", profile="UsernameToken"'}) 
 | 
 | 1360 |         digest = res['digest']
 | 
 | 1361 |         self.assertEqual('test-real.m@host.com', digest['realm'])
 | 
 | 1362 |         self.assertEqual('\tauth,auth-int', digest['qop'])
 | 
 | 1363 |         self.assertEqual('(*)&^&$%#', digest['nonce'])
 | 
 | 1364 | 
 | 
 | 1365 |     def testParseWWWAuthenticateMoreQuoteCombos(self):
 | 
 | 1366 |         res = httplib2._parse_www_authenticate({'www-authenticate':'Digest realm="myrealm", nonce="Ygk86AsKBAA=3516200d37f9a3230352fde99977bd6d472d4306", algorithm=MD5, qop="auth", stale=true'})
 | 
 | 1367 |         digest = res['digest']
 | 
 | 1368 |         self.assertEqual('myrealm', digest['realm'])
 | 
 | 1369 | 
 | 
 | 1370 |     def testDigestObject(self):
 | 
 | 1371 |         credentials = ('joe', 'password')
 | 
 | 1372 |         host = None
 | 
 | 1373 |         request_uri = '/projects/httplib2/test/digest/' 
 | 
 | 1374 |         headers = {}
 | 
 | 1375 |         response = {
 | 
 | 1376 |             'www-authenticate': 'Digest realm="myrealm", nonce="Ygk86AsKBAA=3516200d37f9a3230352fde99977bd6d472d4306", algorithm=MD5, qop="auth"'
 | 
 | 1377 |         }
 | 
 | 1378 |         content = b""
 | 
 | 1379 |         
 | 
 | 1380 |         d = httplib2.DigestAuthentication(credentials, host, request_uri, headers, response, content, None)
 | 
 | 1381 |         d.request("GET", request_uri, headers, content, cnonce="33033375ec278a46") 
 | 
 | 1382 |         our_request = "Authorization: %s" % headers['Authorization']
 | 
 | 1383 |         working_request = 'Authorization: Digest username="joe", realm="myrealm", nonce="Ygk86AsKBAA=3516200d37f9a3230352fde99977bd6d472d4306", uri="/projects/httplib2/test/digest/", algorithm=MD5, response="97ed129401f7cdc60e5db58a80f3ea8b", qop=auth, nc=00000001, cnonce="33033375ec278a46"'
 | 
 | 1384 |         self.assertEqual(our_request, working_request)
 | 
 | 1385 | 
 | 
 | 1386 | 
 | 
 | 1387 |     def testDigestObjectStale(self):
 | 
 | 1388 |         credentials = ('joe', 'password')
 | 
 | 1389 |         host = None
 | 
 | 1390 |         request_uri = '/projects/httplib2/test/digest/' 
 | 
 | 1391 |         headers = {}
 | 
 | 1392 |         response = httplib2.Response({ })
 | 
 | 1393 |         response['www-authenticate'] = 'Digest realm="myrealm", nonce="Ygk86AsKBAA=3516200d37f9a3230352fde99977bd6d472d4306", algorithm=MD5, qop="auth", stale=true'
 | 
 | 1394 |         response.status = 401
 | 
 | 1395 |         content = b""
 | 
 | 1396 |         d = httplib2.DigestAuthentication(credentials, host, request_uri, headers, response, content, None)
 | 
 | 1397 |         # Returns true to force a retry
 | 
 | 1398 |         self.assertTrue( d.response(response, content) )
 | 
 | 1399 | 
 | 
 | 1400 |     def testDigestObjectAuthInfo(self):
 | 
 | 1401 |         credentials = ('joe', 'password')
 | 
 | 1402 |         host = None
 | 
 | 1403 |         request_uri = '/projects/httplib2/test/digest/' 
 | 
 | 1404 |         headers = {}
 | 
 | 1405 |         response = httplib2.Response({ })
 | 
 | 1406 |         response['www-authenticate'] = 'Digest realm="myrealm", nonce="Ygk86AsKBAA=3516200d37f9a3230352fde99977bd6d472d4306", algorithm=MD5, qop="auth", stale=true'
 | 
 | 1407 |         response['authentication-info'] = 'nextnonce="fred"'
 | 
 | 1408 |         content = b""
 | 
 | 1409 |         d = httplib2.DigestAuthentication(credentials, host, request_uri, headers, response, content, None)
 | 
 | 1410 |         # Returns true to force a retry
 | 
 | 1411 |         self.assertFalse( d.response(response, content) )
 | 
 | 1412 |         self.assertEqual('fred', d.challenge['nonce'])
 | 
 | 1413 |         self.assertEqual(1, d.challenge['nc'])
 | 
 | 1414 | 
 | 
 | 1415 |     def testWsseAlgorithm(self):
 | 
 | 1416 |         digest = httplib2._wsse_username_token("d36e316282959a9ed4c89851497a717f", "2003-12-15T14:43:07Z", "taadtaadpstcsm")
 | 
 | 1417 |         expected = b"quR/EWLAV4xLf9Zqyw4pDmfV9OY="
 | 
 | 1418 |         self.assertEqual(expected, digest)
 | 
 | 1419 | 
 | 
 | 1420 |     def testEnd2End(self):
 | 
 | 1421 |         # one end to end header
 | 
 | 1422 |         response = {'content-type': 'application/atom+xml', 'te': 'deflate'}
 | 
 | 1423 |         end2end = httplib2._get_end2end_headers(response)
 | 
 | 1424 |         self.assertTrue('content-type' in end2end)
 | 
 | 1425 |         self.assertTrue('te' not in end2end)
 | 
 | 1426 |         self.assertTrue('connection' not in end2end)
 | 
 | 1427 | 
 | 
 | 1428 |         # one end to end header that gets eliminated
 | 
 | 1429 |         response = {'connection': 'content-type', 'content-type': 'application/atom+xml', 'te': 'deflate'}
 | 
 | 1430 |         end2end = httplib2._get_end2end_headers(response)
 | 
 | 1431 |         self.assertTrue('content-type' not in end2end)
 | 
 | 1432 |         self.assertTrue('te' not in end2end)
 | 
 | 1433 |         self.assertTrue('connection' not in end2end)
 | 
 | 1434 | 
 | 
 | 1435 |         # Degenerate case of no headers
 | 
 | 1436 |         response = {}
 | 
 | 1437 |         end2end = httplib2._get_end2end_headers(response)
 | 
 | 1438 |         self.assertEquals(0, len(end2end))
 | 
 | 1439 | 
 | 
 | 1440 |         # Degenerate case of connection referrring to a header not passed in 
 | 
 | 1441 |         response = {'connection': 'content-type'}
 | 
 | 1442 |         end2end = httplib2._get_end2end_headers(response)
 | 
 | 1443 |         self.assertEquals(0, len(end2end))
 | 
 | 1444 | 
 | 
 | 1445 | unittest.main()
 |