| import httplib | 
 | import StringIO | 
 | import sys | 
 |  | 
 | from unittest import TestCase | 
 |  | 
 | from test import test_support | 
 |  | 
 | class FakeSocket: | 
 |     def __init__(self, text, fileclass=StringIO.StringIO): | 
 |         self.text = text | 
 |         self.fileclass = fileclass | 
 |  | 
 |     def sendall(self, data): | 
 |         self.data = data | 
 |  | 
 |     def makefile(self, mode, bufsize=None): | 
 |         if mode != 'r' and mode != 'rb': | 
 |             raise httplib.UnimplementedFileMode() | 
 |         return self.fileclass(self.text) | 
 |  | 
 | class NoEOFStringIO(StringIO.StringIO): | 
 |     """Like StringIO, but raises AssertionError on EOF. | 
 |  | 
 |     This is used below to test that httplib doesn't try to read | 
 |     more from the underlying file than it should. | 
 |     """ | 
 |     def read(self, n=-1): | 
 |         data = StringIO.StringIO.read(self, n) | 
 |         if data == '': | 
 |             raise AssertionError('caller tried to read past EOF') | 
 |         return data | 
 |  | 
 |     def readline(self, length=None): | 
 |         data = StringIO.StringIO.readline(self, length) | 
 |         if data == '': | 
 |             raise AssertionError('caller tried to read past EOF') | 
 |         return data | 
 |  | 
 |  | 
 | class HeaderTests(TestCase): | 
 |     def test_auto_headers(self): | 
 |         # Some headers are added automatically, but should not be added by | 
 |         # .request() if they are explicitly set. | 
 |  | 
 |         import httplib | 
 |  | 
 |         class HeaderCountingBuffer(list): | 
 |             def __init__(self): | 
 |                 self.count = {} | 
 |             def append(self, item): | 
 |                 kv = item.split(':') | 
 |                 if len(kv) > 1: | 
 |                     # item is a 'Key: Value' header string | 
 |                     lcKey = kv[0].lower() | 
 |                     self.count.setdefault(lcKey, 0) | 
 |                     self.count[lcKey] += 1 | 
 |                 list.append(self, item) | 
 |  | 
 |         for explicit_header in True, False: | 
 |             for header in 'Content-length', 'Host', 'Accept-encoding': | 
 |                 conn = httplib.HTTPConnection('example.com') | 
 |                 conn.sock = FakeSocket('blahblahblah') | 
 |                 conn._buffer = HeaderCountingBuffer() | 
 |  | 
 |                 body = 'spamspamspam' | 
 |                 headers = {} | 
 |                 if explicit_header: | 
 |                     headers[header] = str(len(body)) | 
 |                 conn.request('POST', '/', body, headers) | 
 |                 self.assertEqual(conn._buffer.count[header.lower()], 1) | 
 |  | 
 | # Collect output to a buffer so that we don't have to cope with line-ending | 
 | # issues across platforms.  Specifically, the headers will have \r\n pairs | 
 | # and some platforms will strip them from the output file. | 
 |  | 
 | def test(): | 
 |     buf = StringIO.StringIO() | 
 |     _stdout = sys.stdout | 
 |     try: | 
 |         sys.stdout = buf | 
 |         _test() | 
 |     finally: | 
 |         sys.stdout = _stdout | 
 |  | 
 |     # print individual lines with endings stripped | 
 |     s = buf.getvalue() | 
 |     for line in s.split("\n"): | 
 |         print line.strip() | 
 |  | 
 | def _test(): | 
 |     # Test HTTP status lines | 
 |  | 
 |     body = "HTTP/1.1 200 Ok\r\n\r\nText" | 
 |     sock = FakeSocket(body) | 
 |     resp = httplib.HTTPResponse(sock, 1) | 
 |     resp.begin() | 
 |     print resp.read() | 
 |     resp.close() | 
 |  | 
 |     body = "HTTP/1.1 400.100 Not Ok\r\n\r\nText" | 
 |     sock = FakeSocket(body) | 
 |     resp = httplib.HTTPResponse(sock, 1) | 
 |     try: | 
 |         resp.begin() | 
 |     except httplib.BadStatusLine: | 
 |         print "BadStatusLine raised as expected" | 
 |     else: | 
 |         print "Expect BadStatusLine" | 
 |  | 
 |     # Check invalid host_port | 
 |  | 
 |     for hp in ("www.python.org:abc", "www.python.org:"): | 
 |         try: | 
 |             h = httplib.HTTP(hp) | 
 |         except httplib.InvalidURL: | 
 |             print "InvalidURL raised as expected" | 
 |         else: | 
 |             print "Expect InvalidURL" | 
 |  | 
 |     for hp,h,p in (("[fe80::207:e9ff:fe9b]:8000", "fe80::207:e9ff:fe9b", 8000), | 
 |                    ("www.python.org:80", "www.python.org", 80), | 
 |                    ("www.python.org", "www.python.org", 80), | 
 |                    ("[fe80::207:e9ff:fe9b]", "fe80::207:e9ff:fe9b", 80)): | 
 |         try: | 
 |             http = httplib.HTTP(hp) | 
 |         except httplib.InvalidURL: | 
 |             print "InvalidURL raised erroneously" | 
 |         c = http._conn | 
 |         if h != c.host: raise AssertionError, ("Host incorrectly parsed", h, c.host) | 
 |         if p != c.port: raise AssertionError, ("Port incorrectly parsed", p, c.host) | 
 |  | 
 |     # test response with multiple message headers with the same field name. | 
 |     text = ('HTTP/1.1 200 OK\r\n' | 
 |             'Set-Cookie: Customer="WILE_E_COYOTE"; Version="1"; Path="/acme"\r\n' | 
 |             'Set-Cookie: Part_Number="Rocket_Launcher_0001"; Version="1";' | 
 |             ' Path="/acme"\r\n' | 
 |             '\r\n' | 
 |             'No body\r\n') | 
 |     hdr = ('Customer="WILE_E_COYOTE"; Version="1"; Path="/acme"' | 
 |            ', ' | 
 |            'Part_Number="Rocket_Launcher_0001"; Version="1"; Path="/acme"') | 
 |     s = FakeSocket(text) | 
 |     r = httplib.HTTPResponse(s, 1) | 
 |     r.begin() | 
 |     cookies = r.getheader("Set-Cookie") | 
 |     if cookies != hdr: | 
 |         raise AssertionError, "multiple headers not combined properly" | 
 |  | 
 |     # Test that the library doesn't attempt to read any data | 
 |     # from a HEAD request.  (Tickles SF bug #622042.) | 
 |     sock = FakeSocket( | 
 |         'HTTP/1.1 200 OK\r\n' | 
 |         'Content-Length: 14432\r\n' | 
 |         '\r\n', | 
 |         NoEOFStringIO) | 
 |     resp = httplib.HTTPResponse(sock, 1, method="HEAD") | 
 |     resp.begin() | 
 |     if resp.read() != "": | 
 |         raise AssertionError, "Did not expect response from HEAD request" | 
 |     resp.close() | 
 |  | 
 |  | 
 | def test_main(verbose=None): | 
 |     tests = [HeaderTests,] | 
 |     test_support.run_unittest(*tests) | 
 |  | 
 | test() |