| from test import test_support as support | 
 | # If we end up with a significant number of tests that don't require | 
 | # threading, this test module should be split.  Right now we skip | 
 | # them all if we don't have threading. | 
 | threading = support.import_module('threading') | 
 |  | 
 | from contextlib import contextmanager | 
 | import imaplib | 
 | import os.path | 
 | import SocketServer | 
 | import time | 
 |  | 
 | from test_support import reap_threads, verbose, transient_internet | 
 | import unittest | 
 |  | 
 | try: | 
 |     import ssl | 
 | except ImportError: | 
 |     ssl = None | 
 |  | 
 | CERTFILE = None | 
 |  | 
 |  | 
 | class TestImaplib(unittest.TestCase): | 
 |  | 
 |     def test_that_Time2Internaldate_returns_a_result(self): | 
 |         # We can check only that it successfully produces a result, | 
 |         # not the correctness of the result itself, since the result | 
 |         # depends on the timezone the machine is in. | 
 |         timevalues = [2000000000, 2000000000.0, time.localtime(2000000000), | 
 |                       '"18-May-2033 05:33:20 +0200"'] | 
 |  | 
 |         for t in timevalues: | 
 |             imaplib.Time2Internaldate(t) | 
 |  | 
 |  | 
 | if ssl: | 
 |  | 
 |     class SecureTCPServer(SocketServer.TCPServer): | 
 |  | 
 |         def get_request(self): | 
 |             newsocket, fromaddr = self.socket.accept() | 
 |             connstream = ssl.wrap_socket(newsocket, | 
 |                                          server_side=True, | 
 |                                          certfile=CERTFILE) | 
 |             return connstream, fromaddr | 
 |  | 
 |     IMAP4_SSL = imaplib.IMAP4_SSL | 
 |  | 
 | else: | 
 |  | 
 |     class SecureTCPServer: | 
 |         pass | 
 |  | 
 |     IMAP4_SSL = None | 
 |  | 
 |  | 
 | class SimpleIMAPHandler(SocketServer.StreamRequestHandler): | 
 |  | 
 |     timeout = 1 | 
 |  | 
 |     def _send(self, message): | 
 |         if verbose: print "SENT:", message.strip() | 
 |         self.wfile.write(message) | 
 |  | 
 |     def handle(self): | 
 |         # Send a welcome message. | 
 |         self._send('* OK IMAP4rev1\r\n') | 
 |         while 1: | 
 |             # Gather up input until we receive a line terminator or we timeout. | 
 |             # Accumulate read(1) because it's simpler to handle the differences | 
 |             # between naked sockets and SSL sockets. | 
 |             line = '' | 
 |             while 1: | 
 |                 try: | 
 |                     part = self.rfile.read(1) | 
 |                     if part == '': | 
 |                         # Naked sockets return empty strings.. | 
 |                         return | 
 |                     line += part | 
 |                 except IOError: | 
 |                     # ..but SSLSockets raise exceptions. | 
 |                     return | 
 |                 if line.endswith('\r\n'): | 
 |                     break | 
 |  | 
 |             if verbose: print 'GOT:', line.strip() | 
 |             splitline = line.split() | 
 |             tag = splitline[0] | 
 |             cmd = splitline[1] | 
 |             args = splitline[2:] | 
 |  | 
 |             if hasattr(self, 'cmd_%s' % (cmd,)): | 
 |                 getattr(self, 'cmd_%s' % (cmd,))(tag, args) | 
 |             else: | 
 |                 self._send('%s BAD %s unknown\r\n' % (tag, cmd)) | 
 |  | 
 |     def cmd_CAPABILITY(self, tag, args): | 
 |         self._send('* CAPABILITY IMAP4rev1\r\n') | 
 |         self._send('%s OK CAPABILITY completed\r\n' % (tag,)) | 
 |  | 
 |  | 
 | class BaseThreadedNetworkedTests(unittest.TestCase): | 
 |  | 
 |     def make_server(self, addr, hdlr): | 
 |  | 
 |         class MyServer(self.server_class): | 
 |             def handle_error(self, request, client_address): | 
 |                 self.close_request(request) | 
 |                 self.server_close() | 
 |                 raise | 
 |  | 
 |         if verbose: print "creating server" | 
 |         server = MyServer(addr, hdlr) | 
 |         self.assertEqual(server.server_address, server.socket.getsockname()) | 
 |  | 
 |         if verbose: | 
 |             print "server created" | 
 |             print "ADDR =", addr | 
 |             print "CLASS =", self.server_class | 
 |             print "HDLR =", server.RequestHandlerClass | 
 |  | 
 |         t = threading.Thread( | 
 |             name='%s serving' % self.server_class, | 
 |             target=server.serve_forever, | 
 |             # Short poll interval to make the test finish quickly. | 
 |             # Time between requests is short enough that we won't wake | 
 |             # up spuriously too many times. | 
 |             kwargs={'poll_interval':0.01}) | 
 |         t.daemon = True  # In case this function raises. | 
 |         t.start() | 
 |         if verbose: print "server running" | 
 |         return server, t | 
 |  | 
 |     def reap_server(self, server, thread): | 
 |         if verbose: print "waiting for server" | 
 |         server.shutdown() | 
 |         thread.join() | 
 |         if verbose: print "done" | 
 |  | 
 |     @contextmanager | 
 |     def reaped_server(self, hdlr): | 
 |         server, thread = self.make_server((support.HOST, 0), hdlr) | 
 |         try: | 
 |             yield server | 
 |         finally: | 
 |             self.reap_server(server, thread) | 
 |  | 
 |     @reap_threads | 
 |     def test_connect(self): | 
 |         with self.reaped_server(SimpleIMAPHandler) as server: | 
 |             client = self.imap_class(*server.server_address) | 
 |             client.shutdown() | 
 |  | 
 |     @reap_threads | 
 |     def test_issue5949(self): | 
 |  | 
 |         class EOFHandler(SocketServer.StreamRequestHandler): | 
 |             def handle(self): | 
 |                 # EOF without sending a complete welcome message. | 
 |                 self.wfile.write('* OK') | 
 |  | 
 |         with self.reaped_server(EOFHandler) as server: | 
 |             self.assertRaises(imaplib.IMAP4.abort, | 
 |                               self.imap_class, *server.server_address) | 
 |  | 
 |  | 
 |     def test_linetoolong(self): | 
 |         class TooLongHandler(SimpleIMAPHandler): | 
 |             def handle(self): | 
 |                 # Send a very long response line | 
 |                 self.wfile.write('* OK ' + imaplib._MAXLINE*'x' + '\r\n') | 
 |  | 
 |         with self.reaped_server(TooLongHandler) as server: | 
 |             self.assertRaises(imaplib.IMAP4.error, | 
 |                               self.imap_class, *server.server_address) | 
 |  | 
 | class ThreadedNetworkedTests(BaseThreadedNetworkedTests): | 
 |  | 
 |     server_class = SocketServer.TCPServer | 
 |     imap_class = imaplib.IMAP4 | 
 |  | 
 |  | 
 | @unittest.skipUnless(ssl, "SSL not available") | 
 | class ThreadedNetworkedTestsSSL(BaseThreadedNetworkedTests): | 
 |  | 
 |     server_class = SecureTCPServer | 
 |     imap_class = IMAP4_SSL | 
 |  | 
 |     def test_linetoolong(self): | 
 |         raise unittest.SkipTest("test is not reliable on 2.7; see issue 20118") | 
 |  | 
 |  | 
 | class RemoteIMAPTest(unittest.TestCase): | 
 |     host = 'cyrus.andrew.cmu.edu' | 
 |     port = 143 | 
 |     username = 'anonymous' | 
 |     password = 'pass' | 
 |     imap_class = imaplib.IMAP4 | 
 |  | 
 |     def setUp(self): | 
 |         with transient_internet(self.host): | 
 |             self.server = self.imap_class(self.host, self.port) | 
 |  | 
 |     def tearDown(self): | 
 |         if self.server is not None: | 
 |             self.server.logout() | 
 |  | 
 |     def test_logincapa(self): | 
 |         self.assertTrue('LOGINDISABLED' in self.server.capabilities) | 
 |  | 
 |     def test_anonlogin(self): | 
 |         self.assertTrue('AUTH=ANONYMOUS' in self.server.capabilities) | 
 |         rs = self.server.login(self.username, self.password) | 
 |         self.assertEqual(rs[0], 'OK') | 
 |  | 
 |     def test_logout(self): | 
 |         rs = self.server.logout() | 
 |         self.server = None | 
 |         self.assertEqual(rs[0], 'BYE') | 
 |  | 
 |  | 
 | @unittest.skipUnless(ssl, "SSL not available") | 
 | class RemoteIMAP_SSLTest(RemoteIMAPTest): | 
 |     port = 993 | 
 |     imap_class = IMAP4_SSL | 
 |  | 
 |     def test_logincapa(self): | 
 |         self.assertFalse('LOGINDISABLED' in self.server.capabilities) | 
 |         self.assertTrue('AUTH=PLAIN' in self.server.capabilities) | 
 |  | 
 |  | 
 | def test_main(): | 
 |     tests = [TestImaplib] | 
 |  | 
 |     if support.is_resource_enabled('network'): | 
 |         if ssl: | 
 |             global CERTFILE | 
 |             CERTFILE = os.path.join(os.path.dirname(__file__) or os.curdir, | 
 |                                     "keycert.pem") | 
 |             if not os.path.exists(CERTFILE): | 
 |                 raise support.TestFailed("Can't read certificate files!") | 
 |         tests.extend([ | 
 |             ThreadedNetworkedTests, ThreadedNetworkedTestsSSL, | 
 |             RemoteIMAPTest, RemoteIMAP_SSLTest, | 
 |         ]) | 
 |  | 
 |     support.run_unittest(*tests) | 
 |  | 
 |  | 
 | if __name__ == "__main__": | 
 |     support.use_resources = ['network'] | 
 |     test_main() |