blob: 512bba9c3a2be941938cf6bd7856280abf07452f [file] [log] [blame]
R. David Murraye8dc2582009-12-10 02:08:06 +00001from test import support
2# If we end up with a significant number of tests that don't require
3# threading, this test module should be split. Right now we skip
4# them all if we don't have threading.
5threading = support.import_module('threading')
6
7from contextlib import contextmanager
Martin v. Löwisea752fb2002-01-05 11:31:49 +00008import imaplib
R. David Murraye8dc2582009-12-10 02:08:06 +00009import os.path
R. David Murraye8dc2582009-12-10 02:08:06 +000010import socketserver
Tim Peters108b7912002-07-31 16:42:33 +000011import time
Alexander Belopolsky7dabf162011-01-29 19:49:40 +000012import calendar
Martin v. Löwisea752fb2002-01-05 11:31:49 +000013
Alexander Belopolsky8141cc72012-06-22 21:03:39 -040014from test.support import reap_threads, verbose, transient_internet, run_with_tz, run_with_locale
Christian Heimesf6cd9672008-03-26 13:45:42 +000015import unittest
Alexander Belopolsky8141cc72012-06-22 21:03:39 -040016from datetime import datetime, timezone, timedelta
R. David Murraye8dc2582009-12-10 02:08:06 +000017try:
18 import ssl
19except ImportError:
20 ssl = None
21
22CERTFILE = None
23
Martin v. Löwisea752fb2002-01-05 11:31:49 +000024
Christian Heimesf6cd9672008-03-26 13:45:42 +000025class TestImaplib(unittest.TestCase):
R. David Murraye8dc2582009-12-10 02:08:06 +000026
Alexander Belopolsky19e0a9e2011-01-29 17:19:08 +000027 def test_Internaldate2tuple(self):
Alexander Belopolsky7dabf162011-01-29 19:49:40 +000028 t0 = calendar.timegm((2000, 1, 1, 0, 0, 0, -1, -1, -1))
Alexander Belopolsky19e0a9e2011-01-29 17:19:08 +000029 tt = imaplib.Internaldate2tuple(
Alexander Belopolsky7dabf162011-01-29 19:49:40 +000030 b'25 (INTERNALDATE "01-Jan-2000 00:00:00 +0000")')
31 self.assertEqual(time.mktime(tt), t0)
Alexander Belopolsky19e0a9e2011-01-29 17:19:08 +000032 tt = imaplib.Internaldate2tuple(
Alexander Belopolsky7dabf162011-01-29 19:49:40 +000033 b'25 (INTERNALDATE "01-Jan-2000 11:30:00 +1130")')
34 self.assertEqual(time.mktime(tt), t0)
Alexander Belopolsky19e0a9e2011-01-29 17:19:08 +000035 tt = imaplib.Internaldate2tuple(
Alexander Belopolsky7dabf162011-01-29 19:49:40 +000036 b'25 (INTERNALDATE "31-Dec-1999 12:30:00 -1130")')
37 self.assertEqual(time.mktime(tt), t0)
Alexander Belopolsky19e0a9e2011-01-29 17:19:08 +000038
Alexander Belopolsky2420d832012-04-29 15:56:49 -040039 @run_with_tz('MST+07MDT,M4.1.0,M10.5.0')
40 def test_Internaldate2tuple_issue10941(self):
41 self.assertNotEqual(imaplib.Internaldate2tuple(
42 b'25 (INTERNALDATE "02-Apr-2000 02:30:00 +0000")'),
43 imaplib.Internaldate2tuple(
44 b'25 (INTERNALDATE "02-Apr-2000 03:30:00 +0000")'))
45
Christian Heimesf6cd9672008-03-26 13:45:42 +000046
Alexander Belopolsky8141cc72012-06-22 21:03:39 -040047
48 def timevalues(self):
49 return [2000000000, 2000000000.0, time.localtime(2000000000),
50 (2033, 5, 18, 5, 33, 20, -1, -1, -1),
51 (2033, 5, 18, 5, 33, 20, -1, -1, 1),
Alexander Belopolsky64892132012-06-22 21:10:50 -040052 datetime.fromtimestamp(2000000000,
Alexander Belopolsky8141cc72012-06-22 21:03:39 -040053 timezone(timedelta(0, 2*60*60))),
54 '"18-May-2033 05:33:20 +0200"']
55
56 @run_with_locale('LC_ALL', 'de_DE', 'fr_FR')
57 @run_with_tz('STD-1DST')
58 def test_Time2Internaldate(self):
59 expected = '"18-May-2033 05:33:20 +0200"'
60
61 for t in self.timevalues():
62 internal = imaplib.Time2Internaldate(t)
63 self.assertEqual(internal, expected)
64
65 def test_that_Time2Internaldate_returns_a_result(self):
66 # Without tzset, we can check only that it successfully
67 # produces a result, not the correctness of the result itself,
68 # since the result depends on the timezone the machine is in.
69 for t in self.timevalues():
Christian Heimesf6cd9672008-03-26 13:45:42 +000070 imaplib.Time2Internaldate(t)
71
72
R. David Murraye8dc2582009-12-10 02:08:06 +000073if ssl:
74
75 class SecureTCPServer(socketserver.TCPServer):
76
77 def get_request(self):
78 newsocket, fromaddr = self.socket.accept()
79 connstream = ssl.wrap_socket(newsocket,
80 server_side=True,
81 certfile=CERTFILE)
82 return connstream, fromaddr
83
84 IMAP4_SSL = imaplib.IMAP4_SSL
85
86else:
87
88 class SecureTCPServer:
89 pass
90
91 IMAP4_SSL = None
92
93
94class SimpleIMAPHandler(socketserver.StreamRequestHandler):
95
96 timeout = 1
97
98 def _send(self, message):
Antoine Pitrou36c0dbc2010-11-16 17:49:46 +000099 if verbose: print("SENT: %r" % message.strip())
R. David Murraye8dc2582009-12-10 02:08:06 +0000100 self.wfile.write(message)
101
102 def handle(self):
103 # Send a welcome message.
104 self._send(b'* OK IMAP4rev1\r\n')
105 while 1:
106 # Gather up input until we receive a line terminator or we timeout.
107 # Accumulate read(1) because it's simpler to handle the differences
108 # between naked sockets and SSL sockets.
109 line = b''
110 while 1:
111 try:
112 part = self.rfile.read(1)
113 if part == b'':
114 # Naked sockets return empty strings..
115 return
116 line += part
117 except IOError:
118 # ..but SSLSockets throw exceptions.
119 return
120 if line.endswith(b'\r\n'):
121 break
122
Antoine Pitrou36c0dbc2010-11-16 17:49:46 +0000123 if verbose: print('GOT: %r' % line.strip())
R. David Murraye8dc2582009-12-10 02:08:06 +0000124 splitline = line.split()
125 tag = splitline[0].decode('ASCII')
126 cmd = splitline[1].decode('ASCII')
127 args = splitline[2:]
128
129 if hasattr(self, 'cmd_'+cmd):
130 getattr(self, 'cmd_'+cmd)(tag, args)
131 else:
132 self._send('{} BAD {} unknown\r\n'.format(tag, cmd).encode('ASCII'))
133
134 def cmd_CAPABILITY(self, tag, args):
135 self._send(b'* CAPABILITY IMAP4rev1\r\n')
136 self._send('{} OK CAPABILITY completed\r\n'.format(tag).encode('ASCII'))
137
138
139class BaseThreadedNetworkedTests(unittest.TestCase):
140
141 def make_server(self, addr, hdlr):
142
143 class MyServer(self.server_class):
144 def handle_error(self, request, client_address):
145 self.close_request(request)
146 self.server_close()
147 raise
148
149 if verbose: print("creating server")
150 server = MyServer(addr, hdlr)
Ezio Melottib3aedd42010-11-20 19:04:17 +0000151 self.assertEqual(server.server_address, server.socket.getsockname())
R. David Murraye8dc2582009-12-10 02:08:06 +0000152
153 if verbose:
154 print("server created")
155 print("ADDR =", addr)
156 print("CLASS =", self.server_class)
157 print("HDLR =", server.RequestHandlerClass)
158
159 t = threading.Thread(
160 name='%s serving' % self.server_class,
161 target=server.serve_forever,
162 # Short poll interval to make the test finish quickly.
163 # Time between requests is short enough that we won't wake
164 # up spuriously too many times.
165 kwargs={'poll_interval':0.01})
166 t.daemon = True # In case this function raises.
167 t.start()
168 if verbose: print("server running")
169 return server, t
170
171 def reap_server(self, server, thread):
172 if verbose: print("waiting for server")
173 server.shutdown()
Victor Stinner73efd622011-01-05 23:01:38 +0000174 server.server_close()
R. David Murraye8dc2582009-12-10 02:08:06 +0000175 thread.join()
176 if verbose: print("done")
177
178 @contextmanager
179 def reaped_server(self, hdlr):
180 server, thread = self.make_server((support.HOST, 0), hdlr)
181 try:
182 yield server
183 finally:
184 self.reap_server(server, thread)
185
186 @reap_threads
187 def test_connect(self):
188 with self.reaped_server(SimpleIMAPHandler) as server:
189 client = self.imap_class(*server.server_address)
190 client.shutdown()
191
192 @reap_threads
193 def test_issue5949(self):
194
195 class EOFHandler(socketserver.StreamRequestHandler):
196 def handle(self):
197 # EOF without sending a complete welcome message.
198 self.wfile.write(b'* OK')
199
200 with self.reaped_server(EOFHandler) as server:
201 self.assertRaises(imaplib.IMAP4.abort,
202 self.imap_class, *server.server_address)
203
204 @reap_threads
205 def test_line_termination(self):
206
207 class BadNewlineHandler(SimpleIMAPHandler):
208
209 def cmd_CAPABILITY(self, tag, args):
210 self._send(b'* CAPABILITY IMAP4rev1 AUTH\n')
211 self._send('{} OK CAPABILITY completed\r\n'.format(tag).encode('ASCII'))
212
213 with self.reaped_server(BadNewlineHandler) as server:
214 self.assertRaises(imaplib.IMAP4.abort,
215 self.imap_class, *server.server_address)
216
217
218
219class ThreadedNetworkedTests(BaseThreadedNetworkedTests):
220
221 server_class = socketserver.TCPServer
222 imap_class = imaplib.IMAP4
223
224
225@unittest.skipUnless(ssl, "SSL not available")
226class ThreadedNetworkedTestsSSL(BaseThreadedNetworkedTests):
227
228 server_class = SecureTCPServer
229 imap_class = IMAP4_SSL
230
231
Antoine Pitroub1436f12010-11-09 22:55:55 +0000232class RemoteIMAPTest(unittest.TestCase):
233 host = 'cyrus.andrew.cmu.edu'
234 port = 143
235 username = 'anonymous'
236 password = 'pass'
237 imap_class = imaplib.IMAP4
R. David Murraye8dc2582009-12-10 02:08:06 +0000238
Antoine Pitroub1436f12010-11-09 22:55:55 +0000239 def setUp(self):
240 with transient_internet(self.host):
241 self.server = self.imap_class(self.host, self.port)
242
243 def tearDown(self):
244 if self.server is not None:
Antoine Pitrou924cbea2011-03-23 03:10:14 +0100245 with transient_internet(self.host):
246 self.server.logout()
Antoine Pitroub1436f12010-11-09 22:55:55 +0000247
248 def test_logincapa(self):
Antoine Pitrou924cbea2011-03-23 03:10:14 +0100249 with transient_internet(self.host):
250 for cap in self.server.capabilities:
251 self.assertIsInstance(cap, str)
Nick Coghlane6ef4622012-06-17 21:10:21 +1000252 self.assertIn('LOGINDISABLED', self.server.capabilities)
253 self.assertIn('AUTH=ANONYMOUS', self.server.capabilities)
Antoine Pitrou924cbea2011-03-23 03:10:14 +0100254 rs = self.server.login(self.username, self.password)
255 self.assertEqual(rs[0], 'OK')
Antoine Pitroub1436f12010-11-09 22:55:55 +0000256
257 def test_logout(self):
Antoine Pitrou924cbea2011-03-23 03:10:14 +0100258 with transient_internet(self.host):
259 rs = self.server.logout()
260 self.server = None
261 self.assertEqual(rs[0], 'BYE')
Antoine Pitroub1436f12010-11-09 22:55:55 +0000262
263
264@unittest.skipUnless(ssl, "SSL not available")
Antoine Pitrouf3b001f2010-11-12 18:49:16 +0000265class RemoteIMAP_STARTTLSTest(RemoteIMAPTest):
266
267 def setUp(self):
268 super().setUp()
Antoine Pitrou924cbea2011-03-23 03:10:14 +0100269 with transient_internet(self.host):
270 rs = self.server.starttls()
271 self.assertEqual(rs[0], 'OK')
Antoine Pitrouf3b001f2010-11-12 18:49:16 +0000272
273 def test_logincapa(self):
Antoine Pitroudbe75192010-11-16 17:55:26 +0000274 for cap in self.server.capabilities:
275 self.assertIsInstance(cap, str)
Nick Coghlane6ef4622012-06-17 21:10:21 +1000276 self.assertNotIn('LOGINDISABLED', self.server.capabilities)
Antoine Pitrouf3b001f2010-11-12 18:49:16 +0000277
278
279@unittest.skipUnless(ssl, "SSL not available")
Antoine Pitroub1436f12010-11-09 22:55:55 +0000280class RemoteIMAP_SSLTest(RemoteIMAPTest):
281 port = 993
282 imap_class = IMAP4_SSL
283
Antoine Pitrou08728162011-05-06 18:49:52 +0200284 def setUp(self):
285 pass
286
287 def tearDown(self):
288 pass
289
290 def create_ssl_context(self):
291 ssl_context = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
292 ssl_context.load_cert_chain(CERTFILE)
293 return ssl_context
294
295 def check_logincapa(self, server):
296 try:
297 for cap in server.capabilities:
298 self.assertIsInstance(cap, str)
Nick Coghlane51e25a2012-06-17 21:15:45 +1000299 self.assertNotIn('LOGINDISABLED', server.capabilities)
300 self.assertIn('AUTH=PLAIN', server.capabilities)
Antoine Pitrou08728162011-05-06 18:49:52 +0200301 rs = server.login(self.username, self.password)
302 self.assertEqual(rs[0], 'OK')
303 finally:
304 server.logout()
305
Antoine Pitroub1436f12010-11-09 22:55:55 +0000306 def test_logincapa(self):
Antoine Pitrou08728162011-05-06 18:49:52 +0200307 with transient_internet(self.host):
308 _server = self.imap_class(self.host, self.port)
309 self.check_logincapa(_server)
310
311 def test_logincapa_with_client_certfile(self):
312 with transient_internet(self.host):
313 _server = self.imap_class(self.host, self.port, certfile=CERTFILE)
314 self.check_logincapa(_server)
315
316 def test_logincapa_with_client_ssl_context(self):
317 with transient_internet(self.host):
318 _server = self.imap_class(self.host, self.port, ssl_context=self.create_ssl_context())
319 self.check_logincapa(_server)
320
321 def test_logout(self):
322 with transient_internet(self.host):
323 _server = self.imap_class(self.host, self.port)
324 rs = _server.logout()
325 self.assertEqual(rs[0], 'BYE')
326
327 def test_ssl_context_certfile_exclusive(self):
328 with transient_internet(self.host):
329 self.assertRaises(ValueError, self.imap_class, self.host, self.port,
330 certfile=CERTFILE, ssl_context=self.create_ssl_context())
331
332 def test_ssl_context_keyfile_exclusive(self):
333 with transient_internet(self.host):
334 self.assertRaises(ValueError, self.imap_class, self.host, self.port,
335 keyfile=CERTFILE, ssl_context=self.create_ssl_context())
Antoine Pitroub1436f12010-11-09 22:55:55 +0000336
337
338def test_main():
R. David Murraye8dc2582009-12-10 02:08:06 +0000339 tests = [TestImaplib]
340
341 if support.is_resource_enabled('network'):
342 if ssl:
343 global CERTFILE
344 CERTFILE = os.path.join(os.path.dirname(__file__) or os.curdir,
345 "keycert.pem")
346 if not os.path.exists(CERTFILE):
347 raise support.TestFailed("Can't read certificate files!")
Antoine Pitroub1436f12010-11-09 22:55:55 +0000348 tests.extend([
349 ThreadedNetworkedTests, ThreadedNetworkedTestsSSL,
Antoine Pitrouf3b001f2010-11-12 18:49:16 +0000350 RemoteIMAPTest, RemoteIMAP_SSLTest, RemoteIMAP_STARTTLSTest,
Antoine Pitroub1436f12010-11-09 22:55:55 +0000351 ])
R. David Murraye8dc2582009-12-10 02:08:06 +0000352
353 support.run_unittest(*tests)
Christian Heimesf6cd9672008-03-26 13:45:42 +0000354
355
356if __name__ == "__main__":
R. David Murraye8dc2582009-12-10 02:08:06 +0000357 support.use_resources = ['network']
358 test_main()