blob: c4c7eccb005e619a3c82fad17fa14bd443c0e9d6 [file] [log] [blame]
R. David Murraye8dc2582009-12-10 02:08:06 +00001from test import support
2# If we end up with a significant number of tests that don't require
3# threading, this test module should be split. Right now we skip
4# them all if we don't have threading.
5threading = support.import_module('threading')
6
7from contextlib import contextmanager
Martin v. Löwisea752fb2002-01-05 11:31:49 +00008import imaplib
R. David Murraye8dc2582009-12-10 02:08:06 +00009import os.path
R. David Murraye8dc2582009-12-10 02:08:06 +000010import socketserver
Tim Peters108b7912002-07-31 16:42:33 +000011import time
Alexander Belopolsky7dabf162011-01-29 19:49:40 +000012import calendar
Martin v. Löwisea752fb2002-01-05 11:31:49 +000013
Antoine Pitroub1436f12010-11-09 22:55:55 +000014from test.support import reap_threads, verbose, transient_internet
Christian Heimesf6cd9672008-03-26 13:45:42 +000015import unittest
Piers Lauderf0a70f62002-06-17 07:06:24 +000016
R. David Murraye8dc2582009-12-10 02:08:06 +000017try:
18 import ssl
19except ImportError:
20 ssl = None
21
22CERTFILE = None
23
Martin v. Löwisea752fb2002-01-05 11:31:49 +000024
Christian Heimesf6cd9672008-03-26 13:45:42 +000025class TestImaplib(unittest.TestCase):
R. David Murraye8dc2582009-12-10 02:08:06 +000026
Alexander Belopolsky19e0a9e2011-01-29 17:19:08 +000027 def test_Internaldate2tuple(self):
Alexander Belopolsky7dabf162011-01-29 19:49:40 +000028 t0 = calendar.timegm((2000, 1, 1, 0, 0, 0, -1, -1, -1))
Alexander Belopolsky19e0a9e2011-01-29 17:19:08 +000029 tt = imaplib.Internaldate2tuple(
Alexander Belopolsky7dabf162011-01-29 19:49:40 +000030 b'25 (INTERNALDATE "01-Jan-2000 00:00:00 +0000")')
31 self.assertEqual(time.mktime(tt), t0)
Alexander Belopolsky19e0a9e2011-01-29 17:19:08 +000032 tt = imaplib.Internaldate2tuple(
Alexander Belopolsky7dabf162011-01-29 19:49:40 +000033 b'25 (INTERNALDATE "01-Jan-2000 11:30:00 +1130")')
34 self.assertEqual(time.mktime(tt), t0)
Alexander Belopolsky19e0a9e2011-01-29 17:19:08 +000035 tt = imaplib.Internaldate2tuple(
Alexander Belopolsky7dabf162011-01-29 19:49:40 +000036 b'25 (INTERNALDATE "31-Dec-1999 12:30:00 -1130")')
37 self.assertEqual(time.mktime(tt), t0)
Alexander Belopolsky19e0a9e2011-01-29 17:19:08 +000038
Christian Heimesf6cd9672008-03-26 13:45:42 +000039 def test_that_Time2Internaldate_returns_a_result(self):
40 # We can check only that it successfully produces a result,
41 # not the correctness of the result itself, since the result
42 # depends on the timezone the machine is in.
43 timevalues = [2000000000, 2000000000.0, time.localtime(2000000000),
44 '"18-May-2033 05:33:20 +0200"']
45
46 for t in timevalues:
47 imaplib.Time2Internaldate(t)
48
49
R. David Murraye8dc2582009-12-10 02:08:06 +000050if ssl:
51
52 class SecureTCPServer(socketserver.TCPServer):
53
54 def get_request(self):
55 newsocket, fromaddr = self.socket.accept()
56 connstream = ssl.wrap_socket(newsocket,
57 server_side=True,
58 certfile=CERTFILE)
59 return connstream, fromaddr
60
61 IMAP4_SSL = imaplib.IMAP4_SSL
62
63else:
64
65 class SecureTCPServer:
66 pass
67
68 IMAP4_SSL = None
69
70
71class SimpleIMAPHandler(socketserver.StreamRequestHandler):
72
73 timeout = 1
74
75 def _send(self, message):
Antoine Pitrou36c0dbc2010-11-16 17:49:46 +000076 if verbose: print("SENT: %r" % message.strip())
R. David Murraye8dc2582009-12-10 02:08:06 +000077 self.wfile.write(message)
78
79 def handle(self):
80 # Send a welcome message.
81 self._send(b'* OK IMAP4rev1\r\n')
82 while 1:
83 # Gather up input until we receive a line terminator or we timeout.
84 # Accumulate read(1) because it's simpler to handle the differences
85 # between naked sockets and SSL sockets.
86 line = b''
87 while 1:
88 try:
89 part = self.rfile.read(1)
90 if part == b'':
91 # Naked sockets return empty strings..
92 return
93 line += part
94 except IOError:
95 # ..but SSLSockets throw exceptions.
96 return
97 if line.endswith(b'\r\n'):
98 break
99
Antoine Pitrou36c0dbc2010-11-16 17:49:46 +0000100 if verbose: print('GOT: %r' % line.strip())
R. David Murraye8dc2582009-12-10 02:08:06 +0000101 splitline = line.split()
102 tag = splitline[0].decode('ASCII')
103 cmd = splitline[1].decode('ASCII')
104 args = splitline[2:]
105
106 if hasattr(self, 'cmd_'+cmd):
107 getattr(self, 'cmd_'+cmd)(tag, args)
108 else:
109 self._send('{} BAD {} unknown\r\n'.format(tag, cmd).encode('ASCII'))
110
111 def cmd_CAPABILITY(self, tag, args):
112 self._send(b'* CAPABILITY IMAP4rev1\r\n')
113 self._send('{} OK CAPABILITY completed\r\n'.format(tag).encode('ASCII'))
114
115
116class BaseThreadedNetworkedTests(unittest.TestCase):
117
118 def make_server(self, addr, hdlr):
119
120 class MyServer(self.server_class):
121 def handle_error(self, request, client_address):
122 self.close_request(request)
123 self.server_close()
124 raise
125
126 if verbose: print("creating server")
127 server = MyServer(addr, hdlr)
Ezio Melottib3aedd42010-11-20 19:04:17 +0000128 self.assertEqual(server.server_address, server.socket.getsockname())
R. David Murraye8dc2582009-12-10 02:08:06 +0000129
130 if verbose:
131 print("server created")
132 print("ADDR =", addr)
133 print("CLASS =", self.server_class)
134 print("HDLR =", server.RequestHandlerClass)
135
136 t = threading.Thread(
137 name='%s serving' % self.server_class,
138 target=server.serve_forever,
139 # Short poll interval to make the test finish quickly.
140 # Time between requests is short enough that we won't wake
141 # up spuriously too many times.
142 kwargs={'poll_interval':0.01})
143 t.daemon = True # In case this function raises.
144 t.start()
145 if verbose: print("server running")
146 return server, t
147
148 def reap_server(self, server, thread):
149 if verbose: print("waiting for server")
150 server.shutdown()
Victor Stinner73efd622011-01-05 23:01:38 +0000151 server.server_close()
R. David Murraye8dc2582009-12-10 02:08:06 +0000152 thread.join()
153 if verbose: print("done")
154
155 @contextmanager
156 def reaped_server(self, hdlr):
157 server, thread = self.make_server((support.HOST, 0), hdlr)
158 try:
159 yield server
160 finally:
161 self.reap_server(server, thread)
162
163 @reap_threads
164 def test_connect(self):
165 with self.reaped_server(SimpleIMAPHandler) as server:
166 client = self.imap_class(*server.server_address)
167 client.shutdown()
168
169 @reap_threads
170 def test_issue5949(self):
171
172 class EOFHandler(socketserver.StreamRequestHandler):
173 def handle(self):
174 # EOF without sending a complete welcome message.
175 self.wfile.write(b'* OK')
176
177 with self.reaped_server(EOFHandler) as server:
178 self.assertRaises(imaplib.IMAP4.abort,
179 self.imap_class, *server.server_address)
180
181 @reap_threads
182 def test_line_termination(self):
183
184 class BadNewlineHandler(SimpleIMAPHandler):
185
186 def cmd_CAPABILITY(self, tag, args):
187 self._send(b'* CAPABILITY IMAP4rev1 AUTH\n')
188 self._send('{} OK CAPABILITY completed\r\n'.format(tag).encode('ASCII'))
189
190 with self.reaped_server(BadNewlineHandler) as server:
191 self.assertRaises(imaplib.IMAP4.abort,
192 self.imap_class, *server.server_address)
193
194
195
196class ThreadedNetworkedTests(BaseThreadedNetworkedTests):
197
198 server_class = socketserver.TCPServer
199 imap_class = imaplib.IMAP4
200
201
202@unittest.skipUnless(ssl, "SSL not available")
203class ThreadedNetworkedTestsSSL(BaseThreadedNetworkedTests):
204
205 server_class = SecureTCPServer
206 imap_class = IMAP4_SSL
207
208
Antoine Pitroub1436f12010-11-09 22:55:55 +0000209class RemoteIMAPTest(unittest.TestCase):
210 host = 'cyrus.andrew.cmu.edu'
211 port = 143
212 username = 'anonymous'
213 password = 'pass'
214 imap_class = imaplib.IMAP4
R. David Murraye8dc2582009-12-10 02:08:06 +0000215
Antoine Pitroub1436f12010-11-09 22:55:55 +0000216 def setUp(self):
217 with transient_internet(self.host):
218 self.server = self.imap_class(self.host, self.port)
219
220 def tearDown(self):
221 if self.server is not None:
Antoine Pitrou924cbea2011-03-23 03:10:14 +0100222 with transient_internet(self.host):
223 self.server.logout()
Antoine Pitroub1436f12010-11-09 22:55:55 +0000224
225 def test_logincapa(self):
Antoine Pitrou924cbea2011-03-23 03:10:14 +0100226 with transient_internet(self.host):
227 for cap in self.server.capabilities:
228 self.assertIsInstance(cap, str)
229 self.assertTrue('LOGINDISABLED' in self.server.capabilities)
230 self.assertTrue('AUTH=ANONYMOUS' in self.server.capabilities)
231 rs = self.server.login(self.username, self.password)
232 self.assertEqual(rs[0], 'OK')
Antoine Pitroub1436f12010-11-09 22:55:55 +0000233
234 def test_logout(self):
Antoine Pitrou924cbea2011-03-23 03:10:14 +0100235 with transient_internet(self.host):
236 rs = self.server.logout()
237 self.server = None
238 self.assertEqual(rs[0], 'BYE')
Antoine Pitroub1436f12010-11-09 22:55:55 +0000239
240
241@unittest.skipUnless(ssl, "SSL not available")
Antoine Pitrouf3b001f2010-11-12 18:49:16 +0000242class RemoteIMAP_STARTTLSTest(RemoteIMAPTest):
243
244 def setUp(self):
245 super().setUp()
Antoine Pitrou924cbea2011-03-23 03:10:14 +0100246 with transient_internet(self.host):
247 rs = self.server.starttls()
248 self.assertEqual(rs[0], 'OK')
Antoine Pitrouf3b001f2010-11-12 18:49:16 +0000249
250 def test_logincapa(self):
Antoine Pitroudbe75192010-11-16 17:55:26 +0000251 for cap in self.server.capabilities:
252 self.assertIsInstance(cap, str)
Antoine Pitrouf3b001f2010-11-12 18:49:16 +0000253 self.assertFalse('LOGINDISABLED' in self.server.capabilities)
254
255
256@unittest.skipUnless(ssl, "SSL not available")
Antoine Pitroub1436f12010-11-09 22:55:55 +0000257class RemoteIMAP_SSLTest(RemoteIMAPTest):
258 port = 993
259 imap_class = IMAP4_SSL
260
Antoine Pitrou08728162011-05-06 18:49:52 +0200261 def setUp(self):
262 pass
263
264 def tearDown(self):
265 pass
266
267 def create_ssl_context(self):
268 ssl_context = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
269 ssl_context.load_cert_chain(CERTFILE)
270 return ssl_context
271
272 def check_logincapa(self, server):
273 try:
274 for cap in server.capabilities:
275 self.assertIsInstance(cap, str)
276 self.assertFalse('LOGINDISABLED' in server.capabilities)
277 self.assertTrue('AUTH=PLAIN' in server.capabilities)
278 rs = server.login(self.username, self.password)
279 self.assertEqual(rs[0], 'OK')
280 finally:
281 server.logout()
282
Antoine Pitroub1436f12010-11-09 22:55:55 +0000283 def test_logincapa(self):
Antoine Pitrou08728162011-05-06 18:49:52 +0200284 with transient_internet(self.host):
285 _server = self.imap_class(self.host, self.port)
286 self.check_logincapa(_server)
287
288 def test_logincapa_with_client_certfile(self):
289 with transient_internet(self.host):
290 _server = self.imap_class(self.host, self.port, certfile=CERTFILE)
291 self.check_logincapa(_server)
292
293 def test_logincapa_with_client_ssl_context(self):
294 with transient_internet(self.host):
295 _server = self.imap_class(self.host, self.port, ssl_context=self.create_ssl_context())
296 self.check_logincapa(_server)
297
298 def test_logout(self):
299 with transient_internet(self.host):
300 _server = self.imap_class(self.host, self.port)
301 rs = _server.logout()
302 self.assertEqual(rs[0], 'BYE')
303
304 def test_ssl_context_certfile_exclusive(self):
305 with transient_internet(self.host):
306 self.assertRaises(ValueError, self.imap_class, self.host, self.port,
307 certfile=CERTFILE, ssl_context=self.create_ssl_context())
308
309 def test_ssl_context_keyfile_exclusive(self):
310 with transient_internet(self.host):
311 self.assertRaises(ValueError, self.imap_class, self.host, self.port,
312 keyfile=CERTFILE, ssl_context=self.create_ssl_context())
Antoine Pitroub1436f12010-11-09 22:55:55 +0000313
314
315def test_main():
R. David Murraye8dc2582009-12-10 02:08:06 +0000316 tests = [TestImaplib]
317
318 if support.is_resource_enabled('network'):
319 if ssl:
320 global CERTFILE
321 CERTFILE = os.path.join(os.path.dirname(__file__) or os.curdir,
322 "keycert.pem")
323 if not os.path.exists(CERTFILE):
324 raise support.TestFailed("Can't read certificate files!")
Antoine Pitroub1436f12010-11-09 22:55:55 +0000325 tests.extend([
326 ThreadedNetworkedTests, ThreadedNetworkedTestsSSL,
Antoine Pitrouf3b001f2010-11-12 18:49:16 +0000327 RemoteIMAPTest, RemoteIMAP_SSLTest, RemoteIMAP_STARTTLSTest,
Antoine Pitroub1436f12010-11-09 22:55:55 +0000328 ])
R. David Murraye8dc2582009-12-10 02:08:06 +0000329
330 support.run_unittest(*tests)
Christian Heimesf6cd9672008-03-26 13:45:42 +0000331
332
333if __name__ == "__main__":
R. David Murraye8dc2582009-12-10 02:08:06 +0000334 support.use_resources = ['network']
335 test_main()