# blob: bad4d19b598c289625f74fa398a795785c429262 [file] [log] [blame]
from test import support
# If we end up with a significant number of tests that don't require
# threading, this test module should be split.  Right now we skip
# them all if we don't have threading.
threading = support.import_module('threading')

from contextlib import contextmanager
import imaplib
import os.path
import socketserver
import time

from test.support import reap_threads, verbose, transient_internet
import unittest

# ssl is optional: when it cannot be imported the name is bound to None and
# the SSL-specific test classes below are skipped.
try:
    import ssl
except ImportError:
    ssl = None

# Path of the certificate handed to the SSL test server.  It stays None
# until test_main() assigns it.
CERTFILE = None


class TestImaplib(unittest.TestCase):
    """Offline tests for imaplib's date conversion helpers."""

    def test_Internaldate2tuple(self):
        # All three INTERNALDATE strings name the same instant (the Unix
        # epoch) expressed with different UTC offsets, so converting the
        # returned time tuple back with mktime() must give 0 each time.
        tt = imaplib.Internaldate2tuple(
            b'25 (INTERNALDATE "01-Jan-1970 00:00:00 +0000")')
        self.assertEqual(time.mktime(tt), 0)
        tt = imaplib.Internaldate2tuple(
            b'25 (INTERNALDATE "01-Jan-1970 11:30:00 +1130")')
        self.assertEqual(time.mktime(tt), 0)
        tt = imaplib.Internaldate2tuple(
            b'25 (INTERNALDATE "31-Dec-1969 12:30:00 -1130")')
        self.assertEqual(time.mktime(tt), 0)

    def test_that_Time2Internaldate_returns_a_result(self):
        # We can check only that it successfully produces a result,
        # not the correctness of the result itself, since the result
        # depends on the timezone the machine is in.
        timevalues = [2000000000, 2000000000.0, time.localtime(2000000000),
                      '"18-May-2033 05:33:20 +0200"']

        for t in timevalues:
            imaplib.Time2Internaldate(t)


# SecureTCPServer and IMAP4_SSL are always defined so that the SSL test class
# body can reference them; without ssl they are inert placeholders.
if ssl:

    class SecureTCPServer(socketserver.TCPServer):
        """TCPServer that wraps every accepted connection in server-side SSL."""

        def get_request(self):
            """Accept a connection and return ``(ssl_socket, client_address)``.

            The certificate is read from the module-level CERTFILE.  Any
            exception raised while wrapping the socket is propagated after
            the accepted socket has been closed.
            """
            newsocket, fromaddr = self.socket.accept()
            try:
                connstream = ssl.wrap_socket(newsocket,
                                             server_side=True,
                                             certfile=CERTFILE)
            except Exception:
                # Don't leak the accepted socket when wrapping fails.
                newsocket.close()
                raise
            return connstream, fromaddr

    IMAP4_SSL = imaplib.IMAP4_SSL

else:

    class SecureTCPServer:
        pass

    IMAP4_SSL = None


class SimpleIMAPHandler(socketserver.StreamRequestHandler):
    """Minimal IMAP server side used by the threaded tests.

    It greets the client, then reads CRLF-terminated command lines and
    dispatches ``<tag> <CMD> [args...]`` to a ``cmd_<CMD>(tag, args)``
    method when one exists.  Unknown commands and malformed lines get a
    ``BAD`` response.  Subclasses add or override ``cmd_*`` methods.
    """

    # Class-level timeout (in seconds) picked up by the StreamRequestHandler
    # base class.
    timeout = 1

    def _send(self, message):
        """Write the bytes *message* to the client, echoing it when verbose."""
        if verbose:
            print("SENT: %r" % message.strip())
        self.wfile.write(message)

    def _read_line(self):
        """Return the next CRLF-terminated line, or None when input ends.

        A partial line that is cut short by EOF or an I/O error is
        discarded.
        """
        # Gather up input until we receive a line terminator or we timeout.
        # Accumulate read(1) because it's simpler to handle the differences
        # between naked sockets and SSL sockets.
        line = b''
        while not line.endswith(b'\r\n'):
            try:
                part = self.rfile.read(1)
            except IOError:
                # SSLSockets throw exceptions at end of input..
                return None
            if part == b'':
                # ..but naked sockets return empty strings.
                return None
            line += part
        return line

    def handle(self):
        """Serve one connection until the client stops sending lines."""
        # Send a welcome message.
        self._send(b'* OK IMAP4rev1\r\n')
        while True:
            line = self._read_line()
            if line is None:
                return

            if verbose:
                print('GOT: %r' % line.strip())
            splitline = line.split()
            if len(splitline) < 2:
                # A blank or tag-only line has no command to dispatch.
                # Answer BAD (untagged when there is no tag either) rather
                # than failing with an IndexError inside the server thread.
                tag = splitline[0].decode('ASCII') if splitline else '*'
                self._send('{} BAD malformed command\r\n'.format(tag)
                           .encode('ASCII'))
                continue

            tag = splitline[0].decode('ASCII')
            cmd = splitline[1].decode('ASCII')
            args = splitline[2:]

            command = getattr(self, 'cmd_' + cmd, None)
            if command is not None:
                command(tag, args)
            else:
                self._send('{} BAD {} unknown\r\n'.format(tag, cmd)
                           .encode('ASCII'))

    def cmd_CAPABILITY(self, tag, args):
        """Reply to CAPABILITY by advertising IMAP4rev1 only."""
        self._send(b'* CAPABILITY IMAP4rev1\r\n')
        self._send('{} OK CAPABILITY completed\r\n'.format(tag)
                   .encode('ASCII'))


class BaseThreadedNetworkedTests(unittest.TestCase):
    """Tests that run an imaplib client against a local threaded server.

    Concrete subclasses must define ``server_class`` (the socketserver
    class to serve with) and ``imap_class`` (the imaplib client class
    under test).
    """

    def make_server(self, addr, hdlr):
        """Start ``server_class`` on *addr* with request handler *hdlr*.

        Returns ``(server, thread)``, where *thread* is the already started
        daemon thread running ``server.serve_forever()``.
        """

        class MyServer(self.server_class):
            def handle_error(self, request, client_address):
                # Release the request and the listening socket, then
                # re-raise whatever exception is currently being handled.
                self.close_request(request)
                self.server_close()
                raise

        if verbose:
            print("creating server")
        server = MyServer(addr, hdlr)
        self.assertEqual(server.server_address, server.socket.getsockname())

        if verbose:
            print("server created")
            print("ADDR =", addr)
            print("CLASS =", self.server_class)
            print("HDLR =", server.RequestHandlerClass)

        t = threading.Thread(
            name='%s serving' % self.server_class,
            target=server.serve_forever,
            # Short poll interval to make the test finish quickly.
            # Time between requests is short enough that we won't wake
            # up spuriously too many times.
            kwargs={'poll_interval': 0.01})
        t.daemon = True  # In case this function raises.
        t.start()
        if verbose:
            print("server running")
        return server, t

    def reap_server(self, server, thread):
        """Shut down and close *server*, then wait for *thread* to end."""
        if verbose:
            print("waiting for server")
        server.shutdown()
        server.server_close()
        thread.join()
        if verbose:
            print("done")

    @contextmanager
    def reaped_server(self, hdlr):
        """Context manager yielding a running server that uses *hdlr*.

        The server listens on ``(support.HOST, 0)`` and is always reaped
        on exit, even when the body raises.
        """
        server, thread = self.make_server((support.HOST, 0), hdlr)
        try:
            yield server
        finally:
            self.reap_server(server, thread)

    @reap_threads
    def test_connect(self):
        # A client must be able to connect to the simple server and shut
        # down cleanly.
        with self.reaped_server(SimpleIMAPHandler) as server:
            client = self.imap_class(*server.server_address)
            client.shutdown()

    @reap_threads
    def test_issue5949(self):

        class EOFHandler(socketserver.StreamRequestHandler):
            def handle(self):
                # EOF without sending a complete welcome message.
                self.wfile.write(b'* OK')

        with self.reaped_server(EOFHandler) as server:
            self.assertRaises(imaplib.IMAP4.abort,
                              self.imap_class, *server.server_address)

    @reap_threads
    def test_line_termination(self):

        class BadNewlineHandler(SimpleIMAPHandler):

            def cmd_CAPABILITY(self, tag, args):
                # The untagged response ends in a bare LF instead of CRLF;
                # the client is expected to abort.
                self._send(b'* CAPABILITY IMAP4rev1 AUTH\n')
                self._send('{} OK CAPABILITY completed\r\n'.format(tag)
                           .encode('ASCII'))

        with self.reaped_server(BadNewlineHandler) as server:
            self.assertRaises(imaplib.IMAP4.abort,
                              self.imap_class, *server.server_address)


class ThreadedNetworkedTests(BaseThreadedNetworkedTests):
    """Run the shared threaded tests over a plain TCP connection."""

    server_class = socketserver.TCPServer
    imap_class = imaplib.IMAP4


@unittest.skipUnless(ssl, "SSL not available")
class ThreadedNetworkedTestsSSL(BaseThreadedNetworkedTests):
    """Run the shared threaded tests over an SSL-wrapped connection."""

    server_class = SecureTCPServer
    imap_class = IMAP4_SSL


Antoine Pitroub1436f12010-11-09 22:55:55 +0000207class RemoteIMAPTest(unittest.TestCase):
208 host = 'cyrus.andrew.cmu.edu'
209 port = 143
210 username = 'anonymous'
211 password = 'pass'
212 imap_class = imaplib.IMAP4
R. David Murraye8dc2582009-12-10 02:08:06 +0000213
Antoine Pitroub1436f12010-11-09 22:55:55 +0000214 def setUp(self):
215 with transient_internet(self.host):
216 self.server = self.imap_class(self.host, self.port)
217
218 def tearDown(self):
219 if self.server is not None:
220 self.server.logout()
221
222 def test_logincapa(self):
Antoine Pitroudbe75192010-11-16 17:55:26 +0000223 for cap in self.server.capabilities:
224 self.assertIsInstance(cap, str)
Antoine Pitroub1436f12010-11-09 22:55:55 +0000225 self.assertTrue('LOGINDISABLED' in self.server.capabilities)
Antoine Pitroub1436f12010-11-09 22:55:55 +0000226 self.assertTrue('AUTH=ANONYMOUS' in self.server.capabilities)
227 rs = self.server.login(self.username, self.password)
228 self.assertEqual(rs[0], 'OK')
229
230 def test_logout(self):
231 rs = self.server.logout()
Antoine Pitroud79f3c82010-11-09 23:10:33 +0000232 self.server = None
Antoine Pitroub1436f12010-11-09 22:55:55 +0000233 self.assertEqual(rs[0], 'BYE')
234
235
@unittest.skipUnless(ssl, "SSL not available")
class RemoteIMAP_STARTTLSTest(RemoteIMAPTest):
    """Remote tests over a plain connection upgraded with STARTTLS."""

    def setUp(self):
        # Connect as the base class does, then upgrade the connection.
        super().setUp()
        rs = self.server.starttls()
        self.assertEqual(rs[0], 'OK')

    def test_logincapa(self):
        for cap in self.server.capabilities:
            self.assertIsInstance(cap, str)
        # assertNotIn reports the actual capability list on failure.
        self.assertNotIn('LOGINDISABLED', self.server.capabilities)


@unittest.skipUnless(ssl, "SSL not available")
class RemoteIMAP_SSLTest(RemoteIMAPTest):
    """Remote tests using IMAP4_SSL on port 993."""
    port = 993
    imap_class = IMAP4_SSL

    def test_logincapa(self):
        for cap in self.server.capabilities:
            self.assertIsInstance(cap, str)
        # assertNotIn / assertIn report the actual capability list on failure.
        self.assertNotIn('LOGINDISABLED', self.server.capabilities)
        self.assertIn('AUTH=PLAIN', self.server.capabilities)


def test_main():
    """Run this module's tests through test.support.

    TestImaplib always runs.  When the 'network' resource is enabled the
    threaded and remote test classes are added as well.  If ssl is also
    available, CERTFILE is set to ``keycert.pem`` next to this file.

    Raises:
        support.TestFailed: if that certificate file does not exist.
    """
    global CERTFILE
    tests = [TestImaplib]

    if support.is_resource_enabled('network'):
        if ssl:
            CERTFILE = os.path.join(os.path.dirname(__file__) or os.curdir,
                                    "keycert.pem")
            if not os.path.exists(CERTFILE):
                raise support.TestFailed("Can't read certificate files!")
        tests.extend([
            ThreadedNetworkedTests, ThreadedNetworkedTestsSSL,
            RemoteIMAPTest, RemoteIMAP_SSLTest, RemoteIMAP_STARTTLSTest,
        ])

    support.run_unittest(*tests)


if __name__ == "__main__":
    # Running the file directly enables the 'network' resource so that
    # test_main() includes the networked tests.
    support.use_resources = ['network']
    test_main()