blob: ded88286ec2a9db064e274bf50a2860e420e08a6 [file] [log] [blame]
R. David Murray93321f32009-12-09 15:15:31 +00001from test import test_support as support
# If we end up with a significant number of tests that don't require
# threading, this test module should be split. Right now we skip
# them all if we don't have threading.
5threading = support.import_module('threading')
6
7from contextlib import contextmanager
Martin v. Löwisea752fb2002-01-05 11:31:49 +00008import imaplib
R. David Murray93321f32009-12-09 15:15:31 +00009import os.path
10import select
11import socket
12import SocketServer
13import ssl
14import sys
Tim Peters108b7912002-07-31 16:42:33 +000015import time
Martin v. Löwisea752fb2002-01-05 11:31:49 +000016
R. David Murray93321f32009-12-09 15:15:31 +000017from test_support import reap_threads, verbose
Jerry Seutter14f0bc72008-03-26 05:03:03 +000018import unittest
Piers Lauderf0a70f62002-06-17 07:06:24 +000019
# Path of the certificate file handed to ssl.wrap_socket() by
# SecureTCPServer.  It stays None until test_main() fills it in, which
# only happens when the 'network' resource is enabled.
CERTFILE = None
21
Martin v. Löwisea752fb2002-01-05 11:31:49 +000022
class TestImaplib(unittest.TestCase):
    """Tests for imaplib helpers that do not need a server."""

    def test_that_Time2Internaldate_returns_a_result(self):
        # We can check only that it successfully produces a result,
        # not the correctness of the result itself, since the result
        # depends on the timezone the machine is in.
        # Inputs exercised: an int, a float, a struct_time and an
        # already-quoted date string.
        timevalues = [2000000000, 2000000000.0, time.localtime(2000000000),
                      '"18-May-2033 05:33:20 +0200"']

        for t in timevalues:
            imaplib.Time2Internaldate(t)
34
35
class SecureTCPServer(SocketServer.TCPServer):
    """TCPServer that wraps every accepted connection in server-side SSL."""

    def get_request(self):
        """Accept one connection and return (ssl_stream, client_address).

        The certificate comes from the module-level CERTFILE.  Any error
        raised while wrapping the socket is propagated unchanged, after the
        freshly accepted socket has been closed.
        """
        newsocket, fromaddr = self.socket.accept()
        try:
            connstream = ssl.wrap_socket(newsocket,
                                         server_side=True,
                                         certfile=CERTFILE)
        except Exception:
            # Wrapping failed, so nobody else will ever own this socket;
            # close it here instead of leaking the descriptor.
            newsocket.close()
            raise
        return connstream, fromaddr
44
45
class SimpleIMAPHandler(SocketServer.StreamRequestHandler):
    """Tiny IMAP4rev1 request handler used by the networked tests.

    Each received line is split on whitespace into a tag, a command word
    and optional arguments.  The command is dispatched to a method named
    cmd_<command>(tag, args); a command with no such method gets a tagged
    BAD response.
    """

    # NOTE(review): presumably the per-connection socket timeout in seconds
    # that StreamRequestHandler applies during setup() -- confirm.
    timeout = 1

    def _send(self, message):
        """Write message to the client, echoing it first when verbose."""
        if verbose:
            print("SENT: %s" % message.strip())
        self.wfile.write(message)

    def handle(self):
        """Greet the client, then answer commands until the input ends."""
        # Send a welcome message.
        self._send('* OK IMAP4rev1\r\n')
        while 1:
            # Gather up input until we receive a line terminator or we timeout.
            # Accumulate read(1) because it's simpler to handle the differences
            # between naked sockets and SSL sockets.
            line = ''
            while 1:
                try:
                    part = self.rfile.read(1)
                    if part == '':
                        # Naked sockets return empty strings..
                        return
                    line += part
                except IOError:
                    # ..but SSLSockets throw exceptions.
                    return
                if line.endswith('\r\n'):
                    break

            if verbose:
                print('GOT: %s' % line.strip())
            splitline = line.split()
            if len(splitline) < 2:
                # A blank line or a lone tag has no command word.  Answer
                # BAD (untagged when there is no tag either) rather than
                # failing with IndexError on the lookups below.
                tag = splitline[0] if splitline else '*'
                self._send('%s BAD malformed command\r\n' % (tag,))
                continue
            tag = splitline[0]
            cmd = splitline[1]
            args = splitline[2:]

            method = getattr(self, 'cmd_%s' % (cmd,), None)
            if method is not None:
                method(tag, args)
            else:
                self._send('%s BAD %s unknown\r\n' % (tag, cmd))

    def cmd_CAPABILITY(self, tag, args):
        """Answer CAPABILITY with an untagged list and a tagged OK."""
        self._send('* CAPABILITY IMAP4rev1\r\n')
        self._send('%s OK CAPABILITY completed\r\n' % (tag,))
89
90
class BaseThreadedNetworkedTests(unittest.TestCase):
    """Client/server tests run against a server in a background thread.

    Concrete subclasses provide two class attributes: server_class (the
    SocketServer server class to instantiate) and imap_class (the imaplib
    client class used to connect to it).
    """

    def make_server(self, addr, hdlr):
        """Start a server_class instance on addr, served from a thread.

        hdlr is the request handler class.  Returns (server, thread); pass
        both to reap_server() to stop the server again.
        """

        class MyServer(self.server_class):
            def handle_error(self, request, client_address):
                # Close the request and the listening socket, then
                # re-raise the exception that is currently being handled.
                self.close_request(request)
                self.server_close()
                raise

        if verbose:
            print("creating server")
        server = MyServer(addr, hdlr)
        self.assertEqual(server.server_address, server.socket.getsockname())

        if verbose:
            print("server created")
            print("ADDR = %s" % (addr,))
            print("CLASS = %s" % (self.server_class,))
            print("HDLR = %s" % (server.RequestHandlerClass,))

        t = threading.Thread(
            name='%s serving' % self.server_class,
            target=server.serve_forever,
            # Short poll interval to make the test finish quickly.
            # Time between requests is short enough that we won't wake
            # up spuriously too many times.
            kwargs={'poll_interval': 0.01})
        t.daemon = True  # In case this function raises.
        t.start()
        if verbose:
            print("server running")
        return server, t

    def reap_server(self, server, thread):
        """Stop server, release its listening socket and join thread."""
        if verbose:
            print("waiting for server")
        server.shutdown()
        # shutdown() only stops serve_forever(); the listening socket has
        # to be closed explicitly or it is leaked.
        server.server_close()
        thread.join()
        if verbose:
            print("done")

    @contextmanager
    def reaped_server(self, hdlr):
        """Context manager yielding a running server, reaped on exit."""
        server, thread = self.make_server((support.HOST, 0), hdlr)
        try:
            yield server
        finally:
            self.reap_server(server, thread)

    @reap_threads
    def test_connect(self):
        # A client can connect to the simple handler and shut down again.
        with self.reaped_server(SimpleIMAPHandler) as server:
            client = self.imap_class(*server.server_address)
            client.shutdown()

    @reap_threads
    def test_issue5949(self):
        # The client constructor must raise IMAP4.abort when the server
        # closes the connection in the middle of the greeting.

        class EOFHandler(SocketServer.StreamRequestHandler):
            def handle(self):
                # EOF without sending a complete welcome message.
                self.wfile.write('* OK')

        with self.reaped_server(EOFHandler) as server:
            self.assertRaises(imaplib.IMAP4.abort,
                              self.imap_class, *server.server_address)
154
155
class ThreadedNetworkedTests(BaseThreadedNetworkedTests):
    """Runs the shared tests over a plain TCP server and IMAP4 client."""

    server_class = SocketServer.TCPServer
    imap_class = imaplib.IMAP4
160
161
class ThreadedNetworkedTestsSSL(BaseThreadedNetworkedTests):
    """Runs the shared tests over SecureTCPServer and IMAP4_SSL."""

    server_class = SecureTCPServer
    imap_class = imaplib.IMAP4_SSL
166
167
def test_main():
    """Run the imaplib tests, adding the networked ones when enabled.

    When the 'network' resource is enabled, CERTFILE is pointed at
    keycert.pem next to this file and both threaded test classes are run
    as well.

    Raises support.TestFailed if the certificate file does not exist.
    """
    global CERTFILE

    tests = [TestImaplib]

    if support.is_resource_enabled('network'):
        # dirname() is empty when the script is run from its own
        # directory, hence the fallback to os.curdir.
        CERTFILE = os.path.join(os.path.dirname(__file__) or os.curdir,
                                "keycert.pem")
        if not os.path.exists(CERTFILE):
            raise support.TestFailed("Can't read certificate files!")
        tests.extend([ThreadedNetworkedTests, ThreadedNetworkedTestsSSL])

    support.run_unittest(*tests)
Jerry Seutter14f0bc72008-03-26 05:03:03 +0000181
182
if __name__ == "__main__":
    # Running the file directly turns on the 'network' resource so that
    # test_main() includes the threaded client/server tests.
    support.use_resources = ['network']
    test_main()