| #!/usr/bin/env python | 
 |  | 
 | import unittest | 
 | from test import support | 
 |  | 
 | import errno | 
 | import socket | 
 | import select | 
 | import _thread as thread | 
 | import threading | 
 | import time | 
 | import traceback | 
 | import queue | 
 | import sys | 
 | import os | 
 | import array | 
 | import contextlib | 
 | from weakref import proxy | 
 | import signal | 
 | import math | 
 |  | 
 | def try_address(host, port=0, family=socket.AF_INET): | 
 |     """Try to bind a socket on the given host:port and return True | 
 |     if that has been possible.""" | 
 |     try: | 
 |         sock = socket.socket(family, socket.SOCK_STREAM) | 
 |         sock.bind((host, port)) | 
 |     except (socket.error, socket.gaierror): | 
 |         return False | 
 |     else: | 
 |         sock.close() | 
 |         return True | 
 |  | 
# Host that the client sockets in this file connect/send to; taken from
# the test support package.
HOST = support.HOST
# Payload exchanged by the tests, encoded to UTF-8 bytes.  It deliberately
# contains a non-ASCII character and a CR/LF pair.
MSG = 'Michael Gilfix was here\u1234\r\n'.encode('utf8') ## test unicode string and carriage return
# True only when the socket module reports IPv6 support AND a socket can
# actually be bound to the IPv6 loopback address on this machine.
SUPPORTS_IPV6 = socket.has_ipv6 and try_address('::1', family=socket.AF_INET6)
 |  | 
 | class SocketTCPTest(unittest.TestCase): | 
 |  | 
 |     def setUp(self): | 
 |         self.serv = socket.socket(socket.AF_INET, socket.SOCK_STREAM) | 
 |         self.port = support.bind_port(self.serv) | 
 |         self.serv.listen(1) | 
 |  | 
 |     def tearDown(self): | 
 |         self.serv.close() | 
 |         self.serv = None | 
 |  | 
 | class SocketUDPTest(unittest.TestCase): | 
 |  | 
 |     def setUp(self): | 
 |         self.serv = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) | 
 |         self.port = support.bind_port(self.serv) | 
 |  | 
 |     def tearDown(self): | 
 |         self.serv.close() | 
 |         self.serv = None | 
 |  | 
 | class ThreadableTest: | 
 |     """Threadable Test class | 
 |  | 
 |     The ThreadableTest class makes it easy to create a threaded | 
 |     client/server pair from an existing unit test. To create a | 
 |     new threaded class from an existing unit test, use multiple | 
 |     inheritance: | 
 |  | 
 |         class NewClass (OldClass, ThreadableTest): | 
 |             pass | 
 |  | 
 |     This class defines two new fixture functions with obvious | 
 |     purposes for overriding: | 
 |  | 
 |         clientSetUp () | 
 |         clientTearDown () | 
 |  | 
 |     Any new test functions within the class must then define | 
 |     tests in pairs, where the test name is preceeded with a | 
 |     '_' to indicate the client portion of the test. Ex: | 
 |  | 
 |         def testFoo(self): | 
 |             # Server portion | 
 |  | 
 |         def _testFoo(self): | 
 |             # Client portion | 
 |  | 
 |     Any exceptions raised by the clients during their tests | 
 |     are caught and transferred to the main thread to alert | 
 |     the testing framework. | 
 |  | 
 |     Note, the server setup function cannot call any blocking | 
 |     functions that rely on the client thread during setup, | 
 |     unless serverExplicitReady() is called just before | 
 |     the blocking call (such as in setting up a client/server | 
 |     connection and performing the accept() in setUp(). | 
 |     """ | 
 |  | 
 |     def __init__(self): | 
 |         # Swap the true setup function | 
 |         self.__setUp = self.setUp | 
 |         self.__tearDown = self.tearDown | 
 |         self.setUp = self._setUp | 
 |         self.tearDown = self._tearDown | 
 |  | 
 |     def serverExplicitReady(self): | 
 |         """This method allows the server to explicitly indicate that | 
 |         it wants the client thread to proceed. This is useful if the | 
 |         server is about to execute a blocking routine that is | 
 |         dependent upon the client thread during its setup routine.""" | 
 |         self.server_ready.set() | 
 |  | 
 |     def _setUp(self): | 
 |         self.server_ready = threading.Event() | 
 |         self.client_ready = threading.Event() | 
 |         self.done = threading.Event() | 
 |         self.queue = queue.Queue(1) | 
 |  | 
 |         # Do some munging to start the client test. | 
 |         methodname = self.id() | 
 |         i = methodname.rfind('.') | 
 |         methodname = methodname[i+1:] | 
 |         test_method = getattr(self, '_' + methodname) | 
 |         self.client_thread = thread.start_new_thread( | 
 |             self.clientRun, (test_method,)) | 
 |  | 
 |         self.__setUp() | 
 |         if not self.server_ready.is_set(): | 
 |             self.server_ready.set() | 
 |         self.client_ready.wait() | 
 |  | 
 |     def _tearDown(self): | 
 |         self.__tearDown() | 
 |         self.done.wait() | 
 |  | 
 |         if self.queue.qsize(): | 
 |             msg = self.queue.get() | 
 |             self.fail(msg) | 
 |  | 
 |     def clientRun(self, test_func): | 
 |         self.server_ready.wait() | 
 |         self.client_ready.set() | 
 |         self.clientSetUp() | 
 |         if not hasattr(test_func, '__call__'): | 
 |             raise TypeError("test_func must be a callable function") | 
 |         try: | 
 |             test_func() | 
 |         except Exception as strerror: | 
 |             self.queue.put(strerror) | 
 |         self.clientTearDown() | 
 |  | 
 |     def clientSetUp(self): | 
 |         raise NotImplementedError("clientSetUp must be implemented.") | 
 |  | 
 |     def clientTearDown(self): | 
 |         self.done.set() | 
 |         thread.exit() | 
 |  | 
 | class ThreadedTCPSocketTest(SocketTCPTest, ThreadableTest): | 
 |  | 
 |     def __init__(self, methodName='runTest'): | 
 |         SocketTCPTest.__init__(self, methodName=methodName) | 
 |         ThreadableTest.__init__(self) | 
 |  | 
 |     def clientSetUp(self): | 
 |         self.cli = socket.socket(socket.AF_INET, socket.SOCK_STREAM) | 
 |  | 
 |     def clientTearDown(self): | 
 |         self.cli.close() | 
 |         self.cli = None | 
 |         ThreadableTest.clientTearDown(self) | 
 |  | 
 | class ThreadedUDPSocketTest(SocketUDPTest, ThreadableTest): | 
 |  | 
 |     def __init__(self, methodName='runTest'): | 
 |         SocketUDPTest.__init__(self, methodName=methodName) | 
 |         ThreadableTest.__init__(self) | 
 |  | 
 |     def clientSetUp(self): | 
 |         self.cli = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) | 
 |  | 
 |     def clientTearDown(self): | 
 |         self.cli.close() | 
 |         self.cli = None | 
 |         ThreadableTest.clientTearDown(self) | 
 |  | 
 | class SocketConnectedTest(ThreadedTCPSocketTest): | 
 |     """Socket tests for client-server connection. | 
 |  | 
 |     self.cli_conn is a client socket connected to the server.  The | 
 |     setUp() method guarantees that it is connected to the server. | 
 |     """ | 
 |  | 
 |     def __init__(self, methodName='runTest'): | 
 |         ThreadedTCPSocketTest.__init__(self, methodName=methodName) | 
 |  | 
 |     def setUp(self): | 
 |         ThreadedTCPSocketTest.setUp(self) | 
 |         # Indicate explicitly we're ready for the client thread to | 
 |         # proceed and then perform the blocking call to accept | 
 |         self.serverExplicitReady() | 
 |         conn, addr = self.serv.accept() | 
 |         self.cli_conn = conn | 
 |  | 
 |     def tearDown(self): | 
 |         self.cli_conn.close() | 
 |         self.cli_conn = None | 
 |         ThreadedTCPSocketTest.tearDown(self) | 
 |  | 
 |     def clientSetUp(self): | 
 |         ThreadedTCPSocketTest.clientSetUp(self) | 
 |         self.cli.connect((HOST, self.port)) | 
 |         self.serv_conn = self.cli | 
 |  | 
 |     def clientTearDown(self): | 
 |         self.serv_conn.close() | 
 |         self.serv_conn = None | 
 |         ThreadedTCPSocketTest.clientTearDown(self) | 
 |  | 
 | class SocketPairTest(unittest.TestCase, ThreadableTest): | 
 |  | 
 |     def __init__(self, methodName='runTest'): | 
 |         unittest.TestCase.__init__(self, methodName=methodName) | 
 |         ThreadableTest.__init__(self) | 
 |  | 
 |     def setUp(self): | 
 |         self.serv, self.cli = socket.socketpair() | 
 |  | 
 |     def tearDown(self): | 
 |         self.serv.close() | 
 |         self.serv = None | 
 |  | 
 |     def clientSetUp(self): | 
 |         pass | 
 |  | 
 |     def clientTearDown(self): | 
 |         self.cli.close() | 
 |         self.cli = None | 
 |         ThreadableTest.clientTearDown(self) | 
 |  | 
 |  | 
 | ####################################################################### | 
 | ## Begin Tests | 
 |  | 
 | class GeneralModuleTests(unittest.TestCase): | 
 |  | 
 |     def test_repr(self): | 
 |         s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) | 
 |         self.assertTrue(repr(s).startswith("<socket.socket object")) | 
 |  | 
 |     def test_weakref(self): | 
 |         s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) | 
 |         p = proxy(s) | 
 |         self.assertEqual(p.fileno(), s.fileno()) | 
 |         s.close() | 
 |         s = None | 
 |         try: | 
 |             p.fileno() | 
 |         except ReferenceError: | 
 |             pass | 
 |         else: | 
 |             self.fail('Socket proxy still exists') | 
 |  | 
 |     def testSocketError(self): | 
 |         # Testing socket module exceptions | 
 |         def raise_error(*args, **kwargs): | 
 |             raise socket.error | 
 |         def raise_herror(*args, **kwargs): | 
 |             raise socket.herror | 
 |         def raise_gaierror(*args, **kwargs): | 
 |             raise socket.gaierror | 
 |         self.assertRaises(socket.error, raise_error, | 
 |                               "Error raising socket exception.") | 
 |         self.assertRaises(socket.error, raise_herror, | 
 |                               "Error raising socket exception.") | 
 |         self.assertRaises(socket.error, raise_gaierror, | 
 |                               "Error raising socket exception.") | 
 |  | 
 |     def testCrucialConstants(self): | 
 |         # Testing for mission critical constants | 
 |         socket.AF_INET | 
 |         socket.SOCK_STREAM | 
 |         socket.SOCK_DGRAM | 
 |         socket.SOCK_RAW | 
 |         socket.SOCK_RDM | 
 |         socket.SOCK_SEQPACKET | 
 |         socket.SOL_SOCKET | 
 |         socket.SO_REUSEADDR | 
 |  | 
 |     def testHostnameRes(self): | 
 |         # Testing hostname resolution mechanisms | 
 |         hostname = socket.gethostname() | 
 |         try: | 
 |             ip = socket.gethostbyname(hostname) | 
 |         except socket.error: | 
 |             # Probably name lookup wasn't set up right; skip this test | 
 |             return | 
 |         self.assertTrue(ip.find('.') >= 0, "Error resolving host to ip.") | 
 |         try: | 
 |             hname, aliases, ipaddrs = socket.gethostbyaddr(ip) | 
 |         except socket.error: | 
 |             # Probably a similar problem as above; skip this test | 
 |             return | 
 |         all_host_names = [hostname, hname] + aliases | 
 |         fqhn = socket.getfqdn(ip) | 
 |         if not fqhn in all_host_names: | 
 |             self.fail("Error testing host resolution mechanisms. (fqdn: %s, all: %s)" % (fqhn, repr(all_host_names))) | 
 |  | 
 |     def testRefCountGetNameInfo(self): | 
 |         # Testing reference count for getnameinfo | 
 |         if hasattr(sys, "getrefcount"): | 
 |             try: | 
 |                 # On some versions, this loses a reference | 
 |                 orig = sys.getrefcount(__name__) | 
 |                 socket.getnameinfo(__name__,0) | 
 |             except TypeError: | 
 |                 if sys.getrefcount(__name__) != orig: | 
 |                     self.fail("socket.getnameinfo loses a reference") | 
 |  | 
 |     def testInterpreterCrash(self): | 
 |         # Making sure getnameinfo doesn't crash the interpreter | 
 |         try: | 
 |             # On some versions, this crashes the interpreter. | 
 |             socket.getnameinfo(('x', 0, 0, 0), 0) | 
 |         except socket.error: | 
 |             pass | 
 |  | 
 |     def testNtoH(self): | 
 |         # This just checks that htons etc. are their own inverse, | 
 |         # when looking at the lower 16 or 32 bits. | 
 |         sizes = {socket.htonl: 32, socket.ntohl: 32, | 
 |                  socket.htons: 16, socket.ntohs: 16} | 
 |         for func, size in sizes.items(): | 
 |             mask = (1<<size) - 1 | 
 |             for i in (0, 1, 0xffff, ~0xffff, 2, 0x01234567, 0x76543210): | 
 |                 self.assertEqual(i & mask, func(func(i&mask)) & mask) | 
 |  | 
 |             swapped = func(mask) | 
 |             self.assertEqual(swapped & mask, mask) | 
 |             self.assertRaises(OverflowError, func, 1<<34) | 
 |  | 
 |     def testNtoHErrors(self): | 
 |         good_values = [ 1, 2, 3, 1, 2, 3 ] | 
 |         bad_values = [ -1, -2, -3, -1, -2, -3 ] | 
 |         for k in good_values: | 
 |             socket.ntohl(k) | 
 |             socket.ntohs(k) | 
 |             socket.htonl(k) | 
 |             socket.htons(k) | 
 |         for k in bad_values: | 
 |             self.assertRaises(OverflowError, socket.ntohl, k) | 
 |             self.assertRaises(OverflowError, socket.ntohs, k) | 
 |             self.assertRaises(OverflowError, socket.htonl, k) | 
 |             self.assertRaises(OverflowError, socket.htons, k) | 
 |  | 
 |     def testGetServBy(self): | 
 |         eq = self.assertEqual | 
 |         # Find one service that exists, then check all the related interfaces. | 
 |         # I've ordered this by protocols that have both a tcp and udp | 
 |         # protocol, at least for modern Linuxes. | 
 |         if (sys.platform.startswith('linux') or | 
 |             sys.platform.startswith('freebsd') or | 
 |             sys.platform.startswith('netbsd') or | 
 |             sys.platform == 'darwin'): | 
 |             # avoid the 'echo' service on this platform, as there is an | 
 |             # assumption breaking non-standard port/protocol entry | 
 |             services = ('daytime', 'qotd', 'domain') | 
 |         else: | 
 |             services = ('echo', 'daytime', 'domain') | 
 |         for service in services: | 
 |             try: | 
 |                 port = socket.getservbyname(service, 'tcp') | 
 |                 break | 
 |             except socket.error: | 
 |                 pass | 
 |         else: | 
 |             raise socket.error | 
 |         # Try same call with optional protocol omitted | 
 |         port2 = socket.getservbyname(service) | 
 |         eq(port, port2) | 
 |         # Try udp, but don't barf it it doesn't exist | 
 |         try: | 
 |             udpport = socket.getservbyname(service, 'udp') | 
 |         except socket.error: | 
 |             udpport = None | 
 |         else: | 
 |             eq(udpport, port) | 
 |         # Now make sure the lookup by port returns the same service name | 
 |         eq(socket.getservbyport(port2), service) | 
 |         eq(socket.getservbyport(port, 'tcp'), service) | 
 |         if udpport is not None: | 
 |             eq(socket.getservbyport(udpport, 'udp'), service) | 
 |         # Make sure getservbyport does not accept out of range ports. | 
 |         self.assertRaises(OverflowError, socket.getservbyport, -1) | 
 |         self.assertRaises(OverflowError, socket.getservbyport, 65536) | 
 |  | 
 |     def testDefaultTimeout(self): | 
 |         # Testing default timeout | 
 |         # The default timeout should initially be None | 
 |         self.assertEqual(socket.getdefaulttimeout(), None) | 
 |         s = socket.socket() | 
 |         self.assertEqual(s.gettimeout(), None) | 
 |         s.close() | 
 |  | 
 |         # Set the default timeout to 10, and see if it propagates | 
 |         socket.setdefaulttimeout(10) | 
 |         self.assertEqual(socket.getdefaulttimeout(), 10) | 
 |         s = socket.socket() | 
 |         self.assertEqual(s.gettimeout(), 10) | 
 |         s.close() | 
 |  | 
 |         # Reset the default timeout to None, and see if it propagates | 
 |         socket.setdefaulttimeout(None) | 
 |         self.assertEqual(socket.getdefaulttimeout(), None) | 
 |         s = socket.socket() | 
 |         self.assertEqual(s.gettimeout(), None) | 
 |         s.close() | 
 |  | 
 |         # Check that setting it to an invalid value raises ValueError | 
 |         self.assertRaises(ValueError, socket.setdefaulttimeout, -1) | 
 |  | 
 |         # Check that setting it to an invalid type raises TypeError | 
 |         self.assertRaises(TypeError, socket.setdefaulttimeout, "spam") | 
 |  | 
 |     def testIPv4_inet_aton_fourbytes(self): | 
 |         if not hasattr(socket, 'inet_aton'): | 
 |             return  # No inet_aton, nothing to check | 
 |         # Test that issue1008086 and issue767150 are fixed. | 
 |         # It must return 4 bytes. | 
 |         self.assertEqual(b'\x00'*4, socket.inet_aton('0.0.0.0')) | 
 |         self.assertEqual(b'\xff'*4, socket.inet_aton('255.255.255.255')) | 
 |  | 
 |     def testIPv4toString(self): | 
 |         if not hasattr(socket, 'inet_pton'): | 
 |             return # No inet_pton() on this platform | 
 |         from socket import inet_aton as f, inet_pton, AF_INET | 
 |         g = lambda a: inet_pton(AF_INET, a) | 
 |  | 
 |         self.assertEqual(b'\x00\x00\x00\x00', f('0.0.0.0')) | 
 |         self.assertEqual(b'\xff\x00\xff\x00', f('255.0.255.0')) | 
 |         self.assertEqual(b'\xaa\xaa\xaa\xaa', f('170.170.170.170')) | 
 |         self.assertEqual(b'\x01\x02\x03\x04', f('1.2.3.4')) | 
 |         self.assertEqual(b'\xff\xff\xff\xff', f('255.255.255.255')) | 
 |  | 
 |         self.assertEqual(b'\x00\x00\x00\x00', g('0.0.0.0')) | 
 |         self.assertEqual(b'\xff\x00\xff\x00', g('255.0.255.0')) | 
 |         self.assertEqual(b'\xaa\xaa\xaa\xaa', g('170.170.170.170')) | 
 |         self.assertEqual(b'\xff\xff\xff\xff', g('255.255.255.255')) | 
 |  | 
 |     def testIPv6toString(self): | 
 |         if not hasattr(socket, 'inet_pton'): | 
 |             return # No inet_pton() on this platform | 
 |         try: | 
 |             from socket import inet_pton, AF_INET6, has_ipv6 | 
 |             if not has_ipv6: | 
 |                 return | 
 |         except ImportError: | 
 |             return | 
 |         f = lambda a: inet_pton(AF_INET6, a) | 
 |  | 
 |         self.assertEqual(b'\x00' * 16, f('::')) | 
 |         self.assertEqual(b'\x00' * 16, f('0::0')) | 
 |         self.assertEqual(b'\x00\x01' + b'\x00' * 14, f('1::')) | 
 |         self.assertEqual( | 
 |             b'\x45\xef\x76\xcb\x00\x1a\x56\xef\xaf\xeb\x0b\xac\x19\x24\xae\xae', | 
 |             f('45ef:76cb:1a:56ef:afeb:bac:1924:aeae') | 
 |         ) | 
 |  | 
 |     def testStringToIPv4(self): | 
 |         if not hasattr(socket, 'inet_ntop'): | 
 |             return # No inet_ntop() on this platform | 
 |         from socket import inet_ntoa as f, inet_ntop, AF_INET | 
 |         g = lambda a: inet_ntop(AF_INET, a) | 
 |  | 
 |         self.assertEqual('1.0.1.0', f(b'\x01\x00\x01\x00')) | 
 |         self.assertEqual('170.85.170.85', f(b'\xaa\x55\xaa\x55')) | 
 |         self.assertEqual('255.255.255.255', f(b'\xff\xff\xff\xff')) | 
 |         self.assertEqual('1.2.3.4', f(b'\x01\x02\x03\x04')) | 
 |  | 
 |         self.assertEqual('1.0.1.0', g(b'\x01\x00\x01\x00')) | 
 |         self.assertEqual('170.85.170.85', g(b'\xaa\x55\xaa\x55')) | 
 |         self.assertEqual('255.255.255.255', g(b'\xff\xff\xff\xff')) | 
 |  | 
 |     def testStringToIPv6(self): | 
 |         if not hasattr(socket, 'inet_ntop'): | 
 |             return # No inet_ntop() on this platform | 
 |         try: | 
 |             from socket import inet_ntop, AF_INET6, has_ipv6 | 
 |             if not has_ipv6: | 
 |                 return | 
 |         except ImportError: | 
 |             return | 
 |         f = lambda a: inet_ntop(AF_INET6, a) | 
 |  | 
 |         self.assertEqual('::', f(b'\x00' * 16)) | 
 |         self.assertEqual('::1', f(b'\x00' * 15 + b'\x01')) | 
 |         self.assertEqual( | 
 |             'aef:b01:506:1001:ffff:9997:55:170', | 
 |             f(b'\x0a\xef\x0b\x01\x05\x06\x10\x01\xff\xff\x99\x97\x00\x55\x01\x70') | 
 |         ) | 
 |  | 
 |     # XXX The following don't test module-level functionality... | 
 |  | 
 |     def _get_unused_port(self, bind_address='0.0.0.0'): | 
 |         """Use a temporary socket to elicit an unused ephemeral port. | 
 |  | 
 |         Args: | 
 |             bind_address: Hostname or IP address to search for a port on. | 
 |  | 
 |         Returns: A most likely to be unused port. | 
 |         """ | 
 |         tempsock = socket.socket() | 
 |         tempsock.bind((bind_address, 0)) | 
 |         host, port = tempsock.getsockname() | 
 |         tempsock.close() | 
 |         return port | 
 |  | 
 |     def testSockName(self): | 
 |         # Testing getsockname() | 
 |         port = self._get_unused_port() | 
 |         sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) | 
 |         sock.bind(("0.0.0.0", port)) | 
 |         name = sock.getsockname() | 
 |         # XXX(nnorwitz): http://tinyurl.com/os5jz seems to indicate | 
 |         # it reasonable to get the host's addr in addition to 0.0.0.0. | 
 |         # At least for eCos.  This is required for the S/390 to pass. | 
 |         try: | 
 |             my_ip_addr = socket.gethostbyname(socket.gethostname()) | 
 |         except socket.error: | 
 |             # Probably name lookup wasn't set up right; skip this test | 
 |             return | 
 |         self.assertTrue(name[0] in ("0.0.0.0", my_ip_addr), '%s invalid' % name[0]) | 
 |         self.assertEqual(name[1], port) | 
 |  | 
 |     def testGetSockOpt(self): | 
 |         # Testing getsockopt() | 
 |         # We know a socket should start without reuse==0 | 
 |         sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) | 
 |         reuse = sock.getsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR) | 
 |         self.assertFalse(reuse != 0, "initial mode is reuse") | 
 |  | 
 |     def testSetSockOpt(self): | 
 |         # Testing setsockopt() | 
 |         sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) | 
 |         sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) | 
 |         reuse = sock.getsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR) | 
 |         self.assertFalse(reuse == 0, "failed to set reuse mode") | 
 |  | 
 |     def testSendAfterClose(self): | 
 |         # testing send() after close() with timeout | 
 |         sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) | 
 |         sock.settimeout(1) | 
 |         sock.close() | 
 |         self.assertRaises(socket.error, sock.send, b"spam") | 
 |  | 
 |     def testNewAttributes(self): | 
 |         # testing .family, .type and .protocol | 
 |         sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) | 
 |         self.assertEqual(sock.family, socket.AF_INET) | 
 |         self.assertEqual(sock.type, socket.SOCK_STREAM) | 
 |         self.assertEqual(sock.proto, 0) | 
 |         sock.close() | 
 |  | 
 |     def test_getsockaddrarg(self): | 
 |         host = '0.0.0.0' | 
 |         port = self._get_unused_port(bind_address=host) | 
 |         big_port = port + 65536 | 
 |         neg_port = port - 65536 | 
 |         sock = socket.socket() | 
 |         try: | 
 |             self.assertRaises(OverflowError, sock.bind, (host, big_port)) | 
 |             self.assertRaises(OverflowError, sock.bind, (host, neg_port)) | 
 |             sock.bind((host, port)) | 
 |         finally: | 
 |             sock.close() | 
 |  | 
 |     def test_sock_ioctl(self): | 
 |         if os.name != "nt": | 
 |             return | 
 |         self.assertTrue(hasattr(socket.socket, 'ioctl')) | 
 |         self.assertTrue(hasattr(socket, 'SIO_RCVALL')) | 
 |         self.assertTrue(hasattr(socket, 'RCVALL_ON')) | 
 |         self.assertTrue(hasattr(socket, 'RCVALL_OFF')) | 
 |  | 
 |     def testGetaddrinfo(self): | 
 |         try: | 
 |             socket.getaddrinfo('localhost', 80) | 
 |         except socket.gaierror as err: | 
 |             if err.errno == socket.EAI_SERVICE: | 
 |                 # see http://bugs.python.org/issue1282647 | 
 |                 self.skipTest("buggy libc version") | 
 |             raise | 
 |         # len of every sequence is supposed to be == 5 | 
 |         for info in socket.getaddrinfo(HOST, None): | 
 |             self.assertEqual(len(info), 5) | 
 |         # host can be a domain name, a string representation of an | 
 |         # IPv4/v6 address or None | 
 |         socket.getaddrinfo('localhost', 80) | 
 |         socket.getaddrinfo('127.0.0.1', 80) | 
 |         socket.getaddrinfo(None, 80) | 
 |         if SUPPORTS_IPV6: | 
 |             socket.getaddrinfo('::1', 80) | 
 |         # port can be a string service name such as "http", a numeric | 
 |         # port number or None | 
 |         socket.getaddrinfo(HOST, "http") | 
 |         socket.getaddrinfo(HOST, 80) | 
 |         socket.getaddrinfo(HOST, None) | 
 |         # test family and socktype filters | 
 |         infos = socket.getaddrinfo(HOST, None, socket.AF_INET) | 
 |         for family, _, _, _, _ in infos: | 
 |             self.assertEqual(family, socket.AF_INET) | 
 |         infos = socket.getaddrinfo(HOST, None, 0, socket.SOCK_STREAM) | 
 |         for _, socktype, _, _, _ in infos: | 
 |             self.assertEqual(socktype, socket.SOCK_STREAM) | 
 |         # test proto and flags arguments | 
 |         socket.getaddrinfo(HOST, None, 0, 0, socket.SOL_TCP) | 
 |         socket.getaddrinfo(HOST, None, 0, 0, 0, socket.AI_PASSIVE) | 
 |         # a server willing to support both IPv4 and IPv6 will | 
 |         # usually do this | 
 |         socket.getaddrinfo(None, 0, socket.AF_UNSPEC, socket.SOCK_STREAM, 0, | 
 |                            socket.AI_PASSIVE) | 
 |  | 
 |  | 
 |     def check_sendall_interrupted(self, with_timeout): | 
 |         # socketpair() is not stricly required, but it makes things easier. | 
 |         if not hasattr(signal, 'alarm') or not hasattr(socket, 'socketpair'): | 
 |             self.skipTest("signal.alarm and socket.socketpair required for this test") | 
 |         # Our signal handlers clobber the C errno by calling a math function | 
 |         # with an invalid domain value. | 
 |         def ok_handler(*args): | 
 |             self.assertRaises(ValueError, math.acosh, 0) | 
 |         def raising_handler(*args): | 
 |             self.assertRaises(ValueError, math.acosh, 0) | 
 |             1 // 0 | 
 |         c, s = socket.socketpair() | 
 |         old_alarm = signal.signal(signal.SIGALRM, raising_handler) | 
 |         try: | 
 |             if with_timeout: | 
 |                 # Just above the one second minimum for signal.alarm | 
 |                 c.settimeout(1.5) | 
 |             with self.assertRaises(ZeroDivisionError): | 
 |                 signal.alarm(1) | 
 |                 c.sendall(b"x" * (1024**2)) | 
 |             if with_timeout: | 
 |                 signal.signal(signal.SIGALRM, ok_handler) | 
 |                 signal.alarm(1) | 
 |                 self.assertRaises(socket.timeout, c.sendall, b"x" * (1024**2)) | 
 |         finally: | 
 |             signal.signal(signal.SIGALRM, old_alarm) | 
 |             c.close() | 
 |             s.close() | 
 |  | 
 |     def test_sendall_interrupted(self): | 
 |         self.check_sendall_interrupted(False) | 
 |  | 
 |     def test_sendall_interrupted_with_timeout(self): | 
 |         self.check_sendall_interrupted(True) | 
 |  | 
 |  | 
 | class BasicTCPTest(SocketConnectedTest): | 
 |  | 
 |     def __init__(self, methodName='runTest'): | 
 |         SocketConnectedTest.__init__(self, methodName=methodName) | 
 |  | 
 |     def testRecv(self): | 
 |         # Testing large receive over TCP | 
 |         msg = self.cli_conn.recv(1024) | 
 |         self.assertEqual(msg, MSG) | 
 |  | 
 |     def _testRecv(self): | 
 |         self.serv_conn.send(MSG) | 
 |  | 
 |     def testOverFlowRecv(self): | 
 |         # Testing receive in chunks over TCP | 
 |         seg1 = self.cli_conn.recv(len(MSG) - 3) | 
 |         seg2 = self.cli_conn.recv(1024) | 
 |         msg = seg1 + seg2 | 
 |         self.assertEqual(msg, MSG) | 
 |  | 
 |     def _testOverFlowRecv(self): | 
 |         self.serv_conn.send(MSG) | 
 |  | 
 |     def testRecvFrom(self): | 
 |         # Testing large recvfrom() over TCP | 
 |         msg, addr = self.cli_conn.recvfrom(1024) | 
 |         self.assertEqual(msg, MSG) | 
 |  | 
 |     def _testRecvFrom(self): | 
 |         self.serv_conn.send(MSG) | 
 |  | 
 |     def testOverFlowRecvFrom(self): | 
 |         # Testing recvfrom() in chunks over TCP | 
 |         seg1, addr = self.cli_conn.recvfrom(len(MSG)-3) | 
 |         seg2, addr = self.cli_conn.recvfrom(1024) | 
 |         msg = seg1 + seg2 | 
 |         self.assertEqual(msg, MSG) | 
 |  | 
 |     def _testOverFlowRecvFrom(self): | 
 |         self.serv_conn.send(MSG) | 
 |  | 
 |     def testSendAll(self): | 
 |         # Testing sendall() with a 2048 byte string over TCP | 
 |         msg = b'' | 
 |         while 1: | 
 |             read = self.cli_conn.recv(1024) | 
 |             if not read: | 
 |                 break | 
 |             msg += read | 
 |         self.assertEqual(msg, b'f' * 2048) | 
 |  | 
 |     def _testSendAll(self): | 
 |         big_chunk = b'f' * 2048 | 
 |         self.serv_conn.sendall(big_chunk) | 
 |  | 
 |     def testFromFd(self): | 
 |         # Testing fromfd() | 
 |         if not hasattr(socket, "fromfd"): | 
 |             return # On Windows, this doesn't exist | 
 |         fd = self.cli_conn.fileno() | 
 |         sock = socket.fromfd(fd, socket.AF_INET, socket.SOCK_STREAM) | 
 |         msg = sock.recv(1024) | 
 |         self.assertEqual(msg, MSG) | 
 |  | 
 |     def _testFromFd(self): | 
 |         self.serv_conn.send(MSG) | 
 |  | 
 |     def testDup(self): | 
 |         # Testing dup() | 
 |         sock = self.cli_conn.dup() | 
 |         msg = sock.recv(1024) | 
 |         self.assertEqual(msg, MSG) | 
 |  | 
 |     def _testDup(self): | 
 |         self.serv_conn.send(MSG) | 
 |  | 
 |     def testShutdown(self): | 
 |         # Testing shutdown() | 
 |         msg = self.cli_conn.recv(1024) | 
 |         self.assertEqual(msg, MSG) | 
 |         # wait for _testShutdown to finish: on OS X, when the server | 
 |         # closes the connection the client also becomes disconnected, | 
 |         # and the client's shutdown call will fail. (Issue #4397.) | 
 |         self.done.wait() | 
 |  | 
 |     def _testShutdown(self): | 
 |         self.serv_conn.send(MSG) | 
 |         self.serv_conn.shutdown(2) | 
 |  | 
 | class BasicUDPTest(ThreadedUDPSocketTest): | 
 |  | 
 |     def __init__(self, methodName='runTest'): | 
 |         ThreadedUDPSocketTest.__init__(self, methodName=methodName) | 
 |  | 
 |     def testSendtoAndRecv(self): | 
 |         # Testing sendto() and Recv() over UDP | 
 |         msg = self.serv.recv(len(MSG)) | 
 |         self.assertEqual(msg, MSG) | 
 |  | 
 |     def _testSendtoAndRecv(self): | 
 |         self.cli.sendto(MSG, 0, (HOST, self.port)) | 
 |  | 
 |     def testRecvFrom(self): | 
 |         # Testing recvfrom() over UDP | 
 |         msg, addr = self.serv.recvfrom(len(MSG)) | 
 |         self.assertEqual(msg, MSG) | 
 |  | 
 |     def _testRecvFrom(self): | 
 |         self.cli.sendto(MSG, 0, (HOST, self.port)) | 
 |  | 
 |     def testRecvFromNegative(self): | 
 |         # Negative lengths passed to recvfrom should give ValueError. | 
 |         self.assertRaises(ValueError, self.serv.recvfrom, -1) | 
 |  | 
 |     def _testRecvFromNegative(self): | 
 |         self.cli.sendto(MSG, 0, (HOST, self.port)) | 
 |  | 
 | class TCPCloserTest(ThreadedTCPSocketTest): | 
 |  | 
 |     def testClose(self): | 
 |         conn, addr = self.serv.accept() | 
 |         conn.close() | 
 |  | 
 |         sd = self.cli | 
 |         read, write, err = select.select([sd], [], [], 1.0) | 
 |         self.assertEqual(read, [sd]) | 
 |         self.assertEqual(sd.recv(1), b'') | 
 |  | 
 |         # Calling close() many times should be safe. | 
 |         conn.close() | 
 |         conn.close() | 
 |  | 
 |     def _testClose(self): | 
 |         self.cli.connect((HOST, self.port)) | 
 |         time.sleep(1.0) | 
 |  | 
 | class BasicSocketPairTest(SocketPairTest): | 
 |  | 
 |     def __init__(self, methodName='runTest'): | 
 |         SocketPairTest.__init__(self, methodName=methodName) | 
 |  | 
 |     def testRecv(self): | 
 |         msg = self.serv.recv(1024) | 
 |         self.assertEqual(msg, MSG) | 
 |  | 
 |     def _testRecv(self): | 
 |         self.cli.send(MSG) | 
 |  | 
 |     def testSend(self): | 
 |         self.serv.send(MSG) | 
 |  | 
 |     def _testSend(self): | 
 |         msg = self.cli.recv(1024) | 
 |         self.assertEqual(msg, MSG) | 
 |  | 
 | class NonBlockingTCPTests(ThreadedTCPSocketTest): | 
 |  | 
 |     def __init__(self, methodName='runTest'): | 
 |         ThreadedTCPSocketTest.__init__(self, methodName=methodName) | 
 |  | 
 |     def testSetBlocking(self): | 
 |         # Testing whether set blocking works | 
 |         self.serv.setblocking(0) | 
 |         start = time.time() | 
 |         try: | 
 |             self.serv.accept() | 
 |         except socket.error: | 
 |             pass | 
 |         end = time.time() | 
 |         self.assertTrue((end - start) < 1.0, "Error setting non-blocking mode.") | 
 |  | 
 |     def _testSetBlocking(self): | 
 |         pass | 
 |  | 
 |     def testAccept(self): | 
 |         # Testing non-blocking accept | 
 |         self.serv.setblocking(0) | 
 |         try: | 
 |             conn, addr = self.serv.accept() | 
 |         except socket.error: | 
 |             pass | 
 |         else: | 
 |             self.fail("Error trying to do non-blocking accept.") | 
 |         read, write, err = select.select([self.serv], [], []) | 
 |         if self.serv in read: | 
 |             conn, addr = self.serv.accept() | 
 |         else: | 
 |             self.fail("Error trying to do accept after select.") | 
 |  | 
 |     def _testAccept(self): | 
 |         time.sleep(0.1) | 
 |         self.cli.connect((HOST, self.port)) | 
 |  | 
 |     def testConnect(self): | 
 |         # Testing non-blocking connect | 
 |         conn, addr = self.serv.accept() | 
 |  | 
 |     def _testConnect(self): | 
 |         self.cli.settimeout(10) | 
 |         self.cli.connect((HOST, self.port)) | 
 |  | 
 |     def testRecv(self): | 
 |         # Testing non-blocking recv | 
 |         conn, addr = self.serv.accept() | 
 |         conn.setblocking(0) | 
 |         try: | 
 |             msg = conn.recv(len(MSG)) | 
 |         except socket.error: | 
 |             pass | 
 |         else: | 
 |             self.fail("Error trying to do non-blocking recv.") | 
 |         read, write, err = select.select([conn], [], []) | 
 |         if conn in read: | 
 |             msg = conn.recv(len(MSG)) | 
 |             self.assertEqual(msg, MSG) | 
 |         else: | 
 |             self.fail("Error during select call to non-blocking socket.") | 
 |  | 
 |     def _testRecv(self): | 
 |         self.cli.connect((HOST, self.port)) | 
 |         time.sleep(0.1) | 
 |         self.cli.send(MSG) | 
 |  | 
 | class FileObjectClassTestCase(SocketConnectedTest): | 
 |     """Unit tests for the object returned by socket.makefile() | 
 |  | 
 |     self.read_file is the io object returned by makefile() on | 
 |     the client connection.  You can read from this file to | 
 |     get output from the server. | 
 |  | 
 |     self.write_file is the io object returned by makefile() on the | 
 |     server connection.  You can write to this file to send output | 
 |     to the client. | 
 |     """ | 
 |  | 
 |     bufsize = -1 # Use default buffer size | 
 |     encoding = 'utf8' | 
 |     errors = 'strict' | 
 |     newline = None | 
 |  | 
 |     read_mode = 'rb' | 
 |     read_msg = MSG | 
 |     write_mode = 'wb' | 
 |     write_msg = MSG | 
 |  | 
 |     def __init__(self, methodName='runTest'): | 
 |         SocketConnectedTest.__init__(self, methodName=methodName) | 
 |  | 
 |     def setUp(self): | 
 |         SocketConnectedTest.setUp(self) | 
 |         self.read_file = self.cli_conn.makefile( | 
 |             self.read_mode, self.bufsize, | 
 |             encoding = self.encoding, | 
 |             errors = self.errors, | 
 |             newline = self.newline) | 
 |  | 
 |     def tearDown(self): | 
 |         self.read_file.close() | 
 |         self.assertTrue(self.read_file.closed) | 
 |         self.read_file = None | 
 |         SocketConnectedTest.tearDown(self) | 
 |  | 
 |     def clientSetUp(self): | 
 |         SocketConnectedTest.clientSetUp(self) | 
 |         self.write_file = self.serv_conn.makefile( | 
 |             self.write_mode, self.bufsize, | 
 |             encoding = self.encoding, | 
 |             errors = self.errors, | 
 |             newline = self.newline) | 
 |  | 
 |     def clientTearDown(self): | 
 |         self.write_file.close() | 
 |         self.assertTrue(self.write_file.closed) | 
 |         self.write_file = None | 
 |         SocketConnectedTest.clientTearDown(self) | 
 |  | 
 |     def testSmallRead(self): | 
 |         # Performing small file read test | 
 |         first_seg = self.read_file.read(len(self.read_msg)-3) | 
 |         second_seg = self.read_file.read(3) | 
 |         msg = first_seg + second_seg | 
 |         self.assertEqual(msg, self.read_msg) | 
 |  | 
 |     def _testSmallRead(self): | 
 |         self.write_file.write(self.write_msg) | 
 |         self.write_file.flush() | 
 |  | 
 |     def testFullRead(self): | 
 |         # read until EOF | 
 |         msg = self.read_file.read() | 
 |         self.assertEqual(msg, self.read_msg) | 
 |  | 
 |     def _testFullRead(self): | 
 |         self.write_file.write(self.write_msg) | 
 |         self.write_file.close() | 
 |  | 
 |     def testUnbufferedRead(self): | 
 |         # Performing unbuffered file read test | 
 |         buf = type(self.read_msg)() | 
 |         while 1: | 
 |             char = self.read_file.read(1) | 
 |             if not char: | 
 |                 break | 
 |             buf += char | 
 |         self.assertEqual(buf, self.read_msg) | 
 |  | 
 |     def _testUnbufferedRead(self): | 
 |         self.write_file.write(self.write_msg) | 
 |         self.write_file.flush() | 
 |  | 
 |     def testReadline(self): | 
 |         # Performing file readline test | 
 |         line = self.read_file.readline() | 
 |         self.assertEqual(line, self.read_msg) | 
 |  | 
 |     def _testReadline(self): | 
 |         self.write_file.write(self.write_msg) | 
 |         self.write_file.flush() | 
 |  | 
 |     def testCloseAfterMakefile(self): | 
 |         # The file returned by makefile should keep the socket open. | 
 |         self.cli_conn.close() | 
 |         # read until EOF | 
 |         msg = self.read_file.read() | 
 |         self.assertEqual(msg, self.read_msg) | 
 |  | 
 |     def _testCloseAfterMakefile(self): | 
 |         self.write_file.write(self.write_msg) | 
 |         self.write_file.flush() | 
 |  | 
 |     def testMakefileAfterMakefileClose(self): | 
 |         self.read_file.close() | 
 |         msg = self.cli_conn.recv(len(MSG)) | 
 |         if isinstance(self.read_msg, str): | 
 |             msg = msg.decode() | 
 |         self.assertEqual(msg, self.read_msg) | 
 |  | 
 |     def _testMakefileAfterMakefileClose(self): | 
 |         self.write_file.write(self.write_msg) | 
 |         self.write_file.flush() | 
 |  | 
 |     def testClosedAttr(self): | 
 |         self.assertTrue(not self.read_file.closed) | 
 |  | 
 |     def _testClosedAttr(self): | 
 |         self.assertTrue(not self.write_file.closed) | 
 |  | 
 |     def testAttributes(self): | 
 |         self.assertEqual(self.read_file.mode, self.read_mode) | 
 |         self.assertEqual(self.read_file.name, self.cli_conn.fileno()) | 
 |  | 
 |     def _testAttributes(self): | 
 |         self.assertEqual(self.write_file.mode, self.write_mode) | 
 |         self.assertEqual(self.write_file.name, self.serv_conn.fileno()) | 
 |  | 
 |     def testRealClose(self): | 
 |         self.read_file.close() | 
 |         self.assertRaises(ValueError, self.read_file.fileno) | 
 |         self.cli_conn.close() | 
 |         self.assertRaises(socket.error, self.cli_conn.getsockname) | 
 |  | 
 |     def _testRealClose(self): | 
 |         pass | 
 |  | 
 |  | 
 | class UnbufferedFileObjectClassTestCase(FileObjectClassTestCase): | 
 |  | 
 |     """Repeat the tests from FileObjectClassTestCase with bufsize==0. | 
 |  | 
 |     In this case (and in this case only), it should be possible to | 
 |     create a file object, read a line from it, create another file | 
 |     object, read another line from it, without loss of data in the | 
 |     first file object's buffer.  Note that http.client relies on this | 
 |     when reading multiple requests from the same socket.""" | 
 |  | 
 |     bufsize = 0 # Use unbuffered mode | 
 |  | 
 |     def testUnbufferedReadline(self): | 
 |         # Read a line, create a new file object, read another line with it | 
 |         line = self.read_file.readline() # first line | 
 |         self.assertEqual(line, b"A. " + self.write_msg) # first line | 
 |         self.read_file = self.cli_conn.makefile('rb', 0) | 
 |         line = self.read_file.readline() # second line | 
 |         self.assertEqual(line, b"B. " + self.write_msg) # second line | 
 |  | 
 |     def _testUnbufferedReadline(self): | 
 |         self.write_file.write(b"A. " + self.write_msg) | 
 |         self.write_file.write(b"B. " + self.write_msg) | 
 |         self.write_file.flush() | 
 |  | 
 |     def testMakefileClose(self): | 
 |         # The file returned by makefile should keep the socket open... | 
 |         self.cli_conn.close() | 
 |         msg = self.cli_conn.recv(1024) | 
 |         self.assertEqual(msg, self.read_msg) | 
 |         # ...until the file is itself closed | 
 |         self.read_file.close() | 
 |         self.assertRaises(socket.error, self.cli_conn.recv, 1024) | 
 |  | 
 |     def _testMakefileClose(self): | 
 |         self.write_file.write(self.write_msg) | 
 |         self.write_file.flush() | 
 |  | 
 |     def testMakefileCloseSocketDestroy(self): | 
 |         refcount_before = sys.getrefcount(self.cli_conn) | 
 |         self.read_file.close() | 
 |         refcount_after = sys.getrefcount(self.cli_conn) | 
 |         self.assertEqual(refcount_before - 1, refcount_after) | 
 |  | 
 |     def _testMakefileCloseSocketDestroy(self): | 
 |         pass | 
 |  | 
 |  | 
class LineBufferedFileObjectClassTestCase(FileObjectClassTestCase):
    """Repeat the tests from FileObjectClassTestCase with bufsize==1."""

    bufsize = 1 # Default-buffered for reading; line-buffered for writing
 |  | 
 |  | 
class SmallBufferedFileObjectClassTestCase(FileObjectClassTestCase):
    """Repeat the tests from FileObjectClassTestCase with bufsize==2."""

    bufsize = 2 # Exercise the buffering code
 |  | 
 |  | 
class UnicodeReadFileObjectClassTestCase(FileObjectClassTestCase):
    """Tests for socket.makefile() in text mode (rather than binary).

    Here only the reading file is opened in text mode ('r'); the writing
    file stays binary ('wb'), so the expected message is MSG decoded to str.
    """

    read_mode = 'r'
    read_msg = MSG.decode('utf8')
    write_mode = 'wb'
    write_msg = MSG
    # Passed to makefile() as its newline argument.
    newline = ''
 |  | 
 |  | 
class UnicodeWriteFileObjectClassTestCase(FileObjectClassTestCase):
    """Tests for socket.makefile() in text mode (rather than binary).

    Here only the writing file is opened in text mode ('w'); the reading
    file stays binary ('rb'), so the message written is MSG decoded to str.
    """

    read_mode = 'rb'
    read_msg = MSG
    write_mode = 'w'
    write_msg = MSG.decode('utf8')
    # Passed to makefile() as its newline argument.
    newline = ''
 |  | 
 |  | 
class UnicodeReadWriteFileObjectClassTestCase(FileObjectClassTestCase):
    """Tests for socket.makefile() in text mode (rather than binary).

    Here both the reading ('r') and the writing ('w') file are opened in
    text mode, so both messages are MSG decoded to str.
    """

    read_mode = 'r'
    read_msg = MSG.decode('utf8')
    write_mode = 'w'
    write_msg = MSG.decode('utf8')
    # Passed to makefile() as its newline argument.
    newline = ''
 |  | 
 |  | 
 | class NetworkConnectionTest(object): | 
 |     """Prove network connection.""" | 
 |  | 
 |     def clientSetUp(self): | 
 |         # We're inherited below by BasicTCPTest2, which also inherits | 
 |         # BasicTCPTest, which defines self.port referenced below. | 
 |         self.cli = socket.create_connection((HOST, self.port)) | 
 |         self.serv_conn = self.cli | 
 |  | 
class BasicTCPTest2(NetworkConnectionTest, BasicTCPTest):
    """Tests that NetworkConnection does not break existing TCP functionality.

    The BasicTCPTest tests are inherited unchanged; NetworkConnectionTest
    comes first in the bases, so its clientSetUp() (which connects with
    socket.create_connection()) is the one that runs.
    """
 |  | 
 | class NetworkConnectionNoServer(unittest.TestCase): | 
 |  | 
 |     class MockSocket(socket.socket): | 
 |         def connect(self, *args): | 
 |             raise socket.timeout('timed out') | 
 |  | 
 |     @contextlib.contextmanager | 
 |     def mocked_socket_module(self): | 
 |         """Return a socket which times out on connect""" | 
 |         old_socket = socket.socket | 
 |         socket.socket = self.MockSocket | 
 |         try: | 
 |             yield | 
 |         finally: | 
 |             socket.socket = old_socket | 
 |  | 
 |     def test_connect(self): | 
 |         port = support.find_unused_port() | 
 |         cli = socket.socket(socket.AF_INET, socket.SOCK_STREAM) | 
 |         try: | 
 |             cli.connect((HOST, port)) | 
 |         except socket.error as err: | 
 |             self.assertEqual(err.errno, errno.ECONNREFUSED) | 
 |         else: | 
 |             self.fail("socket.error not raised") | 
 |  | 
 |     def test_create_connection(self): | 
 |         # Issue #9792: errors raised by create_connection() should have | 
 |         # a proper errno attribute. | 
 |         port = support.find_unused_port() | 
 |         try: | 
 |             socket.create_connection((HOST, port)) | 
 |         except socket.error as err: | 
 |             self.assertEqual(err.errno, errno.ECONNREFUSED) | 
 |         else: | 
 |             self.fail("socket.error not raised") | 
 |  | 
 |     def test_create_connection_timeout(self): | 
 |         # Issue #9792: create_connection() should not recast timeout errors | 
 |         # as generic socket errors. | 
 |         with self.mocked_socket_module(): | 
 |             with self.assertRaises(socket.timeout): | 
 |                 socket.create_connection((HOST, 1234)) | 
 |  | 
 |  | 
 | class NetworkConnectionAttributesTest(SocketTCPTest, ThreadableTest): | 
 |  | 
 |     def __init__(self, methodName='runTest'): | 
 |         SocketTCPTest.__init__(self, methodName=methodName) | 
 |         ThreadableTest.__init__(self) | 
 |  | 
 |     def clientSetUp(self): | 
 |         pass | 
 |  | 
 |     def clientTearDown(self): | 
 |         self.cli.close() | 
 |         self.cli = None | 
 |         ThreadableTest.clientTearDown(self) | 
 |  | 
 |     def _justAccept(self): | 
 |         conn, addr = self.serv.accept() | 
 |  | 
 |     testFamily = _justAccept | 
 |     def _testFamily(self): | 
 |         self.cli = socket.create_connection((HOST, self.port), timeout=30) | 
 |         self.assertEqual(self.cli.family, 2) | 
 |  | 
 |     testTimeoutDefault = _justAccept | 
 |     def _testTimeoutDefault(self): | 
 |         # passing no explicit timeout uses socket's global default | 
 |         self.assertTrue(socket.getdefaulttimeout() is None) | 
 |         socket.setdefaulttimeout(42) | 
 |         try: | 
 |             self.cli = socket.create_connection((HOST, self.port)) | 
 |         finally: | 
 |             socket.setdefaulttimeout(None) | 
 |         self.assertEqual(self.cli.gettimeout(), 42) | 
 |  | 
 |     testTimeoutNone = _justAccept | 
 |     def _testTimeoutNone(self): | 
 |         # None timeout means the same as sock.settimeout(None) | 
 |         self.assertTrue(socket.getdefaulttimeout() is None) | 
 |         socket.setdefaulttimeout(30) | 
 |         try: | 
 |             self.cli = socket.create_connection((HOST, self.port), timeout=None) | 
 |         finally: | 
 |             socket.setdefaulttimeout(None) | 
 |         self.assertEqual(self.cli.gettimeout(), None) | 
 |  | 
 |     testTimeoutValueNamed = _justAccept | 
 |     def _testTimeoutValueNamed(self): | 
 |         self.cli = socket.create_connection((HOST, self.port), timeout=30) | 
 |         self.assertEqual(self.cli.gettimeout(), 30) | 
 |  | 
 |     testTimeoutValueNonamed = _justAccept | 
 |     def _testTimeoutValueNonamed(self): | 
 |         self.cli = socket.create_connection((HOST, self.port), 30) | 
 |         self.assertEqual(self.cli.gettimeout(), 30) | 
 |  | 
 | class NetworkConnectionBehaviourTest(SocketTCPTest, ThreadableTest): | 
 |  | 
 |     def __init__(self, methodName='runTest'): | 
 |         SocketTCPTest.__init__(self, methodName=methodName) | 
 |         ThreadableTest.__init__(self) | 
 |  | 
 |     def clientSetUp(self): | 
 |         pass | 
 |  | 
 |     def clientTearDown(self): | 
 |         self.cli.close() | 
 |         self.cli = None | 
 |         ThreadableTest.clientTearDown(self) | 
 |  | 
 |     def testInsideTimeout(self): | 
 |         conn, addr = self.serv.accept() | 
 |         time.sleep(3) | 
 |         conn.send(b"done!") | 
 |     testOutsideTimeout = testInsideTimeout | 
 |  | 
 |     def _testInsideTimeout(self): | 
 |         self.cli = sock = socket.create_connection((HOST, self.port)) | 
 |         data = sock.recv(5) | 
 |         self.assertEqual(data, b"done!") | 
 |  | 
 |     def _testOutsideTimeout(self): | 
 |         self.cli = sock = socket.create_connection((HOST, self.port), timeout=1) | 
 |         self.assertRaises(socket.timeout, lambda: sock.recv(5)) | 
 |  | 
 |  | 
 | class TCPTimeoutTest(SocketTCPTest): | 
 |  | 
 |     def testTCPTimeout(self): | 
 |         def raise_timeout(*args, **kwargs): | 
 |             self.serv.settimeout(1.0) | 
 |             self.serv.accept() | 
 |         self.assertRaises(socket.timeout, raise_timeout, | 
 |                               "Error generating a timeout exception (TCP)") | 
 |  | 
 |     def testTimeoutZero(self): | 
 |         ok = False | 
 |         try: | 
 |             self.serv.settimeout(0.0) | 
 |             foo = self.serv.accept() | 
 |         except socket.timeout: | 
 |             self.fail("caught timeout instead of error (TCP)") | 
 |         except socket.error: | 
 |             ok = True | 
 |         except: | 
 |             self.fail("caught unexpected exception (TCP)") | 
 |         if not ok: | 
 |             self.fail("accept() returned success when we did not expect it") | 
 |  | 
 |     def testInterruptedTimeout(self): | 
 |         # XXX I don't know how to do this test on MSWindows or any other | 
 |         # plaform that doesn't support signal.alarm() or os.kill(), though | 
 |         # the bug should have existed on all platforms. | 
 |         if not hasattr(signal, "alarm"): | 
 |             return                  # can only test on *nix | 
 |         self.serv.settimeout(5.0)   # must be longer than alarm | 
 |         class Alarm(Exception): | 
 |             pass | 
 |         def alarm_handler(signal, frame): | 
 |             raise Alarm | 
 |         old_alarm = signal.signal(signal.SIGALRM, alarm_handler) | 
 |         try: | 
 |             signal.alarm(2)    # POSIX allows alarm to be up to 1 second early | 
 |             try: | 
 |                 foo = self.serv.accept() | 
 |             except socket.timeout: | 
 |                 self.fail("caught timeout instead of Alarm") | 
 |             except Alarm: | 
 |                 pass | 
 |             except: | 
 |                 self.fail("caught other exception instead of Alarm:" | 
 |                           " %s(%s):\n%s" % | 
 |                           (sys.exc_info()[:2] + (traceback.format_exc(),))) | 
 |             else: | 
 |                 self.fail("nothing caught") | 
 |             finally: | 
 |                 signal.alarm(0)         # shut off alarm | 
 |         except Alarm: | 
 |             self.fail("got Alarm in wrong place") | 
 |         finally: | 
 |             # no alarm can be pending.  Safe to restore old handler. | 
 |             signal.signal(signal.SIGALRM, old_alarm) | 
 |  | 
 | class UDPTimeoutTest(SocketTCPTest): | 
 |  | 
 |     def testUDPTimeout(self): | 
 |         def raise_timeout(*args, **kwargs): | 
 |             self.serv.settimeout(1.0) | 
 |             self.serv.recv(1024) | 
 |         self.assertRaises(socket.timeout, raise_timeout, | 
 |                               "Error generating a timeout exception (UDP)") | 
 |  | 
 |     def testTimeoutZero(self): | 
 |         ok = False | 
 |         try: | 
 |             self.serv.settimeout(0.0) | 
 |             foo = self.serv.recv(1024) | 
 |         except socket.timeout: | 
 |             self.fail("caught timeout instead of error (UDP)") | 
 |         except socket.error: | 
 |             ok = True | 
 |         except: | 
 |             self.fail("caught unexpected exception (UDP)") | 
 |         if not ok: | 
 |             self.fail("recv() returned success when we did not expect it") | 
 |  | 
 | class TestExceptions(unittest.TestCase): | 
 |  | 
 |     def testExceptionTree(self): | 
 |         self.assertTrue(issubclass(socket.error, Exception)) | 
 |         self.assertTrue(issubclass(socket.herror, socket.error)) | 
 |         self.assertTrue(issubclass(socket.gaierror, socket.error)) | 
 |         self.assertTrue(issubclass(socket.timeout, socket.error)) | 
 |  | 
 | class TestLinuxAbstractNamespace(unittest.TestCase): | 
 |  | 
 |     UNIX_PATH_MAX = 108 | 
 |  | 
 |     def testLinuxAbstractNamespace(self): | 
 |         address = b"\x00python-test-hello\x00\xff" | 
 |         s1 = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) | 
 |         s1.bind(address) | 
 |         s1.listen(1) | 
 |         s2 = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) | 
 |         s2.connect(s1.getsockname()) | 
 |         s1.accept() | 
 |         self.assertEqual(s1.getsockname(), address) | 
 |         self.assertEqual(s2.getpeername(), address) | 
 |  | 
 |     def testMaxName(self): | 
 |         address = b"\x00" + b"h" * (self.UNIX_PATH_MAX - 1) | 
 |         s = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) | 
 |         s.bind(address) | 
 |         self.assertEqual(s.getsockname(), address) | 
 |  | 
 |     def testNameOverflow(self): | 
 |         address = "\x00" + "h" * self.UNIX_PATH_MAX | 
 |         s = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) | 
 |         self.assertRaises(socket.error, s.bind, address) | 
 |  | 
 |  | 
 | class BufferIOTest(SocketConnectedTest): | 
 |     """ | 
 |     Test the buffer versions of socket.recv() and socket.send(). | 
 |     """ | 
 |     def __init__(self, methodName='runTest'): | 
 |         SocketConnectedTest.__init__(self, methodName=methodName) | 
 |  | 
 |     def testRecvInto(self): | 
 |         buf = bytearray(1024) | 
 |         nbytes = self.cli_conn.recv_into(buf) | 
 |         self.assertEqual(nbytes, len(MSG)) | 
 |         msg = buf[:len(MSG)] | 
 |         self.assertEqual(msg, MSG) | 
 |  | 
 |     def _testRecvInto(self): | 
 |         buf = bytes(MSG) | 
 |         self.serv_conn.send(buf) | 
 |  | 
 |     def testRecvFromInto(self): | 
 |         buf = bytearray(1024) | 
 |         nbytes, addr = self.cli_conn.recvfrom_into(buf) | 
 |         self.assertEqual(nbytes, len(MSG)) | 
 |         msg = buf[:len(MSG)] | 
 |         self.assertEqual(msg, MSG) | 
 |  | 
 |     def _testRecvFromInto(self): | 
 |         buf = bytes(MSG) | 
 |         self.serv_conn.send(buf) | 
 |  | 
 |  | 
# Components of the TIPC address tuples built by the TIPC tests below:
# TIPC_STYPE is the second field of every address; the servers bind with
# TIPC_LOWER and TIPC_UPPER as third and fourth field, and the clients
# address the midpoint between the two.
TIPC_STYPE = 2000
TIPC_LOWER = 200
TIPC_UPPER = 210
 |  | 
 | def isTipcAvailable(): | 
 |     """Check if the TIPC module is loaded | 
 |  | 
 |     The TIPC module is not loaded automatically on Ubuntu and probably | 
 |     other Linux distros. | 
 |     """ | 
 |     if not hasattr(socket, "AF_TIPC"): | 
 |         return False | 
 |     if not os.path.isfile("/proc/modules"): | 
 |         return False | 
 |     with open("/proc/modules") as f: | 
 |         for line in f: | 
 |             if line.startswith("tipc "): | 
 |                 return True | 
 |     if support.verbose: | 
 |         print("TIPC module is not loaded, please 'sudo modprobe tipc'") | 
 |     return False | 
 |  | 
 | class TIPCTest (unittest.TestCase): | 
 |     def testRDM(self): | 
 |         srv = socket.socket(socket.AF_TIPC, socket.SOCK_RDM) | 
 |         cli = socket.socket(socket.AF_TIPC, socket.SOCK_RDM) | 
 |  | 
 |         srv.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) | 
 |         srvaddr = (socket.TIPC_ADDR_NAMESEQ, TIPC_STYPE, | 
 |                 TIPC_LOWER, TIPC_UPPER) | 
 |         srv.bind(srvaddr) | 
 |  | 
 |         sendaddr = (socket.TIPC_ADDR_NAME, TIPC_STYPE, | 
 |                 TIPC_LOWER + int((TIPC_UPPER - TIPC_LOWER) / 2), 0) | 
 |         cli.sendto(MSG, sendaddr) | 
 |  | 
 |         msg, recvaddr = srv.recvfrom(1024) | 
 |  | 
 |         self.assertEqual(cli.getsockname(), recvaddr) | 
 |         self.assertEqual(msg, MSG) | 
 |  | 
 |  | 
 | class TIPCThreadableTest (unittest.TestCase, ThreadableTest): | 
 |     def __init__(self, methodName = 'runTest'): | 
 |         unittest.TestCase.__init__(self, methodName = methodName) | 
 |         ThreadableTest.__init__(self) | 
 |  | 
 |     def setUp(self): | 
 |         self.srv = socket.socket(socket.AF_TIPC, socket.SOCK_STREAM) | 
 |         self.srv.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) | 
 |         srvaddr = (socket.TIPC_ADDR_NAMESEQ, TIPC_STYPE, | 
 |                 TIPC_LOWER, TIPC_UPPER) | 
 |         self.srv.bind(srvaddr) | 
 |         self.srv.listen(5) | 
 |         self.serverExplicitReady() | 
 |         self.conn, self.connaddr = self.srv.accept() | 
 |  | 
 |     def clientSetUp(self): | 
 |         # The is a hittable race between serverExplicitReady() and the | 
 |         # accept() call; sleep a little while to avoid it, otherwise | 
 |         # we could get an exception | 
 |         time.sleep(0.1) | 
 |         self.cli = socket.socket(socket.AF_TIPC, socket.SOCK_STREAM) | 
 |         addr = (socket.TIPC_ADDR_NAME, TIPC_STYPE, | 
 |                 TIPC_LOWER + int((TIPC_UPPER - TIPC_LOWER) / 2), 0) | 
 |         self.cli.connect(addr) | 
 |         self.cliaddr = self.cli.getsockname() | 
 |  | 
 |     def testStream(self): | 
 |         msg = self.conn.recv(1024) | 
 |         self.assertEqual(msg, MSG) | 
 |         self.assertEqual(self.cliaddr, self.connaddr) | 
 |  | 
 |     def _testStream(self): | 
 |         self.cli.send(MSG) | 
 |         self.cli.close() | 
 |  | 
 |  | 
 | def test_main(): | 
 |     tests = [GeneralModuleTests, BasicTCPTest, TCPCloserTest, TCPTimeoutTest, | 
 |              TestExceptions, BufferIOTest, BasicTCPTest2] | 
 |     if sys.platform != 'mac': | 
 |         tests.extend([ BasicUDPTest, UDPTimeoutTest ]) | 
 |  | 
 |     tests.extend([ | 
 |         NonBlockingTCPTests, | 
 |         FileObjectClassTestCase, | 
 |         UnbufferedFileObjectClassTestCase, | 
 |         LineBufferedFileObjectClassTestCase, | 
 |         SmallBufferedFileObjectClassTestCase, | 
 |         UnicodeReadFileObjectClassTestCase, | 
 |         UnicodeWriteFileObjectClassTestCase, | 
 |         UnicodeReadWriteFileObjectClassTestCase, | 
 |         NetworkConnectionNoServer, | 
 |         NetworkConnectionAttributesTest, | 
 |         NetworkConnectionBehaviourTest, | 
 |     ]) | 
 |     if hasattr(socket, "socketpair"): | 
 |         tests.append(BasicSocketPairTest) | 
 |     if sys.platform == 'linux2': | 
 |         tests.append(TestLinuxAbstractNamespace) | 
 |     if isTipcAvailable(): | 
 |         tests.append(TIPCTest) | 
 |         tests.append(TIPCThreadableTest) | 
 |  | 
 |     thread_info = support.threading_setup() | 
 |     support.run_unittest(*tests) | 
 |     support.threading_cleanup(*thread_info) | 
 |  | 
# Run the whole suite when this file is executed as a script.
if __name__ == "__main__":
    test_main()