blob: 6116c8bc96103dddc3f22aaf750ebeca30164ab3 [file] [log] [blame]
R. David Murraya6cdd832009-12-12 18:38:24 +00001from test import support
2# If we end up with a significant number of tests that don't require
3# threading, this test module should be split. Right now we skip
4# them all if we don't have threading.
5threading = support.import_module('threading')
6
7from contextlib import contextmanager
Martin v. Löwisea752fb2002-01-05 11:31:49 +00008import imaplib
R. David Murraya6cdd832009-12-12 18:38:24 +00009import os.path
10import select
11import socket
12import socketserver
13import sys
Tim Peters108b7912002-07-31 16:42:33 +000014import time
Alexander Belopolsky74d31a02011-01-29 20:42:46 +000015import calendar
Martin v. Löwisea752fb2002-01-05 11:31:49 +000016
Antoine Pitrou7ec819e2010-11-09 22:57:20 +000017from test.support import reap_threads, verbose, transient_internet
Christian Heimesf6cd9672008-03-26 13:45:42 +000018import unittest
Piers Lauderf0a70f62002-06-17 07:06:24 +000019
# ssl is optional for this module: when it cannot be imported the name is
# bound to None, and the code further down tests ``ssl`` for truthiness
# before using it.
try:
    import ssl
except ImportError:
    ssl = None

# Certificate file passed to the SSL test server.  It stays None until
# test_main() assigns the real path (only when the 'network' resource is
# enabled and ssl is available).
CERTFILE = None
26
Martin v. Löwisea752fb2002-01-05 11:31:49 +000027
class TestImaplib(unittest.TestCase):
    """Offline checks of imaplib's INTERNALDATE conversion helpers."""

    def test_Internaldate2tuple(self):
        # Each response below names the same instant -- midnight UTC on
        # 1 January 2000 -- written with a different UTC offset, so every
        # parsed tuple must map back to the same epoch value.
        expected = calendar.timegm((2000, 1, 1, 0, 0, 0, -1, -1, -1))
        responses = (
            b'25 (INTERNALDATE "01-Jan-2000 00:00:00 +0000")',
            b'25 (INTERNALDATE "01-Jan-2000 11:30:00 +1130")',
            b'25 (INTERNALDATE "31-Dec-1999 12:30:00 -1130")',
        )
        for response in responses:
            parsed = imaplib.Internaldate2tuple(response)
            self.assertEqual(time.mktime(parsed), expected)

    def test_that_Time2Internaldate_returns_a_result(self):
        # Smoke test only: the formatted value depends on the timezone of
        # the machine running the test, so all that is verified is that
        # each accepted input type converts without raising.
        samples = (
            2000000000,
            2000000000.0,
            time.localtime(2000000000),
            '"18-May-2033 05:33:20 +0200"',
        )
        for sample in samples:
            imaplib.Time2Internaldate(sample)
51
52
if ssl:

    class SecureTCPServer(socketserver.TCPServer):
        """TCPServer whose accepted connections are wrapped in server-side TLS."""

        def get_request(self):
            """Accept one connection and return ``(tls_socket, client_address)``.

            The certificate is read from the module-level CERTFILE on every
            call, because that name is only assigned after this class has
            been defined.  Any error raised while setting up TLS is
            propagated to the caller once the raw socket has been closed.
            """
            newsocket, fromaddr = self.socket.accept()
            try:
                # The module-level ssl.wrap_socket() helper is deprecated
                # (and absent from recent Python versions); an SSLContext
                # is the supported way to wrap a socket.
                protocol = getattr(ssl, 'PROTOCOL_TLS_SERVER', None)
                if protocol is None:
                    # Older interpreters: same default wrap_socket() used.
                    protocol = ssl.PROTOCOL_SSLv23
                context = ssl.SSLContext(protocol)
                context.load_cert_chain(CERTFILE)
                connstream = context.wrap_socket(newsocket, server_side=True)
            except Exception:
                # Do not leak the accepted socket when the certificate
                # cannot be loaded or the handshake fails.
                newsocket.close()
                raise
            return connstream, fromaddr

    IMAP4_SSL = imaplib.IMAP4_SSL

else:

    # Placeholders so that the class bodies referring to these names can
    # still be evaluated when ssl is unavailable.
    class SecureTCPServer:
        pass

    IMAP4_SSL = None
72
73
class SimpleIMAPHandler(socketserver.StreamRequestHandler):
    """Minimal IMAP4rev1 server used as a fixture by the networked tests.

    After the greeting, every CRLF-terminated command line is split on
    whitespace into ``tag``, ``command`` and arguments, and dispatched to a
    ``cmd_<COMMAND>(tag, args)`` method.  Subclasses add or override those
    methods to script the server's behaviour.
    """

    # Seconds a blocking read may wait before it times out, so that a
    # silent client cannot hang the handler.
    timeout = 1

    def _send(self, message):
        """Write the bytes *message* to the client, echoing it if verbose."""
        if verbose:
            print("SENT:", message.strip())
        self.wfile.write(message)

    def _read_line(self):
        """Return the next CRLF-terminated line, or None if the peer is gone.

        The line is accumulated one byte at a time because that is the
        simplest way to handle plain and SSL sockets alike.
        """
        line = b''
        while not line.endswith(b'\r\n'):
            try:
                part = self.rfile.read(1)
            except IOError:
                # SSL sockets (and timeouts) signal the end with an error...
                return None
            if part == b'':
                # ...while plain sockets simply return an empty read.
                return None
            line += part
        return line

    def handle(self):
        """Greet the client, then answer commands until the connection ends."""
        self._send(b'* OK IMAP4rev1\r\n')
        while True:
            line = self._read_line()
            if line is None:
                return

            if verbose:
                print('GOT:', line.strip())
            splitline = line.split()
            if not splitline:
                # Blank line: there is no tag to echo, so reply untagged
                # instead of failing with an IndexError.
                self._send(b'* BAD empty command line\r\n')
                continue
            tag = splitline[0].decode('ASCII')
            if len(splitline) < 2:
                # A tag without a command is malformed as well.
                self._send(
                    '{} BAD missing command\r\n'.format(tag).encode('ASCII'))
                continue
            cmd = splitline[1].decode('ASCII')
            args = splitline[2:]

            method = getattr(self, 'cmd_' + cmd, None)
            if method is None:
                self._send(
                    '{} BAD {} unknown\r\n'.format(tag, cmd).encode('ASCII'))
            else:
                method(tag, args)

    def cmd_CAPABILITY(self, tag, args):
        """Advertise IMAP4rev1 and complete the command tagged *tag*."""
        self._send(b'* CAPABILITY IMAP4rev1\r\n')
        self._send(
            '{} OK CAPABILITY completed\r\n'.format(tag).encode('ASCII'))
117
118
class BaseThreadedNetworkedTests(unittest.TestCase):
    """Client tests run against an in-process IMAP server on its own thread.

    Concrete subclasses must define ``server_class`` (the socketserver
    class to instantiate) and ``imap_class`` (the imaplib client class
    under test).
    """

    def make_server(self, addr, hdlr):
        """Start ``self.server_class(addr, hdlr)`` on a daemon thread.

        Returns a ``(server, thread)`` pair; hand both to reap_server()
        when the test is finished with them.
        """

        class MyServer(self.server_class):
            def handle_error(self, request, client_address):
                # Tear the server down and re-raise the exception that is
                # currently being handled.
                self.close_request(request)
                self.server_close()
                raise

        if verbose:
            print("creating server")
        server = MyServer(addr, hdlr)
        self.assertEqual(server.server_address, server.socket.getsockname())

        if verbose:
            print("server created")
            print("ADDR =", addr)
            print("CLASS =", self.server_class)
            print("HDLR =", server.RequestHandlerClass)

        t = threading.Thread(
            name='%s serving' % self.server_class,
            target=server.serve_forever,
            # Short poll interval to make the test finish quickly.
            # Time between requests is short enough that we won't wake
            # up spuriously too many times.
            kwargs={'poll_interval': 0.01})
        t.daemon = True  # In case this function raises.
        t.start()
        if verbose:
            print("server running")
        return server, t

    def reap_server(self, server, thread):
        """Stop *server*, release its listening socket and join *thread*."""
        if verbose:
            print("waiting for server")
        server.shutdown()
        # shutdown() only stops serve_forever(); the listening socket has
        # to be closed explicitly, otherwise every test leaks one.
        server.server_close()
        thread.join()
        if verbose:
            print("done")

    @contextmanager
    def reaped_server(self, hdlr):
        """Context manager yielding a running server that uses *hdlr*.

        The server listens on an ephemeral port of support.HOST and is
        reaped on exit, whether or not the body raised.
        """
        server, thread = self.make_server((support.HOST, 0), hdlr)
        try:
            yield server
        finally:
            self.reap_server(server, thread)

    @reap_threads
    def test_connect(self):
        # A client must be able to connect to a well-behaved server.
        with self.reaped_server(SimpleIMAPHandler) as server:
            client = self.imap_class(*server.server_address)
            client.shutdown()

    @reap_threads
    def test_issue5949(self):
        # The client must abort, not hang, when the server disconnects
        # in the middle of its greeting.

        class EOFHandler(socketserver.StreamRequestHandler):
            def handle(self):
                # EOF without sending a complete welcome message.
                self.wfile.write(b'* OK')

        with self.reaped_server(EOFHandler) as server:
            self.assertRaises(imaplib.IMAP4.abort,
                              self.imap_class, *server.server_address)

    @reap_threads
    def test_line_termination(self):
        # A response line terminated by a bare LF instead of CRLF must
        # make the client abort.

        class BadNewlineHandler(SimpleIMAPHandler):

            def cmd_CAPABILITY(self, tag, args):
                self._send(b'* CAPABILITY IMAP4rev1 AUTH\n')
                self._send(
                    '{} OK CAPABILITY completed\r\n'.format(tag).encode('ASCII'))

        with self.reaped_server(BadNewlineHandler) as server:
            self.assertRaises(imaplib.IMAP4.abort,
                              self.imap_class, *server.server_address)
195
196
197
class ThreadedNetworkedTests(BaseThreadedNetworkedTests):
    # Runs the inherited tests with a plain TCP server and the plain
    # IMAP4 client class.

    server_class = socketserver.TCPServer
    imap_class = imaplib.IMAP4
202
203
@unittest.skipUnless(ssl, "SSL not available")
class ThreadedNetworkedTestsSSL(BaseThreadedNetworkedTests):
    # Runs the inherited tests with the SSL-wrapping server and the SSL
    # client class; skipped as a whole when ssl is not available.

    server_class = SecureTCPServer
    imap_class = IMAP4_SSL
209
210
class RemoteIMAPTest(unittest.TestCase):
    """Tests run against a public IMAP server over the network.

    Every network operation is wrapped in transient_internet(), not only
    the initial connection made in setUp().
    """

    host = 'cyrus.andrew.cmu.edu'
    port = 143
    username = 'anonymous'
    password = 'pass'
    imap_class = imaplib.IMAP4

    def setUp(self):
        with transient_internet(self.host):
            self.server = self.imap_class(self.host, self.port)

    def tearDown(self):
        # test_logout() sets self.server to None after logging out itself.
        if self.server is not None:
            with transient_internet(self.host):
                self.server.logout()

    def test_logincapa(self):
        with transient_internet(self.host):
            self.assertIn('LOGINDISABLED', self.server.capabilities)

    def test_anonlogin(self):
        with transient_internet(self.host):
            self.assertIn('AUTH=ANONYMOUS', self.server.capabilities)
            rs = self.server.login(self.username, self.password)
            self.assertEqual(rs[0], 'OK')

    def test_logout(self):
        with transient_internet(self.host):
            rs = self.server.logout()
            # Already logged out: keep tearDown() from doing it again.
            self.server = None
            self.assertEqual(rs[0], 'BYE')
238
239
@unittest.skipUnless(ssl, "SSL not available")
class RemoteIMAP_SSLTest(RemoteIMAPTest):
    """Same remote tests over the SSL port, using the SSL client class."""

    port = 993
    imap_class = IMAP4_SSL

    def test_logincapa(self):
        # This override expects LOGINDISABLED to be absent and AUTH=PLAIN
        # to be advertised on the SSL port.
        with transient_internet(self.host):
            self.assertNotIn('LOGINDISABLED', self.server.capabilities)
            self.assertIn('AUTH=PLAIN', self.server.capabilities)
248
249
def test_main():
    """Run this module's test cases.

    The offline tests always run.  The threaded and remote test cases are
    added only when the 'network' resource is enabled; in that case, and
    if ssl is available, CERTFILE is first pointed at the "keycert.pem"
    file located next to this module.

    Raises support.TestFailed if that certificate file does not exist.
    """
    global CERTFILE

    selected = [TestImaplib]

    if support.is_resource_enabled('network'):
        if ssl:
            here = os.path.dirname(__file__) or os.curdir
            CERTFILE = os.path.join(here, "keycert.pem")
            if not os.path.exists(CERTFILE):
                raise support.TestFailed("Can't read certificate files!")
        selected += [
            ThreadedNetworkedTests,
            ThreadedNetworkedTestsSSL,
            RemoteIMAPTest,
            RemoteIMAP_SSLTest,
        ]

    support.run_unittest(*selected)
Christian Heimesf6cd9672008-03-26 13:45:42 +0000266
267
if __name__ == "__main__":
    # When the file is run directly, request the 'network' resource before
    # calling test_main(), which checks for it before adding the networked
    # test cases.
    support.use_resources = ['network']
    test_main()