blob: f288ef0f979ad4304f85a9b3b34643475dbd7f58 [file] [log] [blame]
R. David Murray07ca7612009-12-12 18:36:47 +00001from test import test_support as support
2# If we end up with a significant number of tests that don't require
3# threading, this test module should be split. Right now we skip
4# them all if we don't have threading.
5threading = support.import_module('threading')
6
7from contextlib import contextmanager
Martin v. Löwisea752fb2002-01-05 11:31:49 +00008import imaplib
R. David Murray07ca7612009-12-12 18:36:47 +00009import os.path
10import select
11import socket
12import SocketServer
13import sys
Tim Peters108b7912002-07-31 16:42:33 +000014import time
Martin v. Löwisea752fb2002-01-05 11:31:49 +000015
R. David Murray07ca7612009-12-12 18:36:47 +000016from test_support import verbose
Jerry Seutter14f0bc72008-03-26 05:03:03 +000017import unittest
Piers Lauderf0a70f62002-06-17 07:06:24 +000018
R. David Murray07ca7612009-12-12 18:36:47 +000019try:
20 import ssl
21except ImportError:
22 ssl = None
23
24CERTFILE = None
25
Martin v. Löwisea752fb2002-01-05 11:31:49 +000026
Jerry Seutter14f0bc72008-03-26 05:03:03 +000027class TestImaplib(unittest.TestCase):
R. David Murray07ca7612009-12-12 18:36:47 +000028
Jerry Seutter14f0bc72008-03-26 05:03:03 +000029 def test_that_Time2Internaldate_returns_a_result(self):
30 # We can check only that it successfully produces a result,
31 # not the correctness of the result itself, since the result
32 # depends on the timezone the machine is in.
33 timevalues = [2000000000, 2000000000.0, time.localtime(2000000000),
34 '"18-May-2033 05:33:20 +0200"']
35
36 for t in timevalues:
37 imaplib.Time2Internaldate(t)
38
39
R. David Murray07ca7612009-12-12 18:36:47 +000040if ssl:
Jerry Seutter14f0bc72008-03-26 05:03:03 +000041
R. David Murray07ca7612009-12-12 18:36:47 +000042 class SecureTCPServer(SocketServer.TCPServer):
43
44 def get_request(self):
45 newsocket, fromaddr = self.socket.accept()
46 connstream = ssl.wrap_socket(newsocket,
47 server_side=True,
48 certfile=CERTFILE)
49 return connstream, fromaddr
50
51 IMAP4_SSL = imaplib.IMAP4_SSL
52
53else:
54
55 class SecureTCPServer:
56 pass
57
58 IMAP4_SSL = None
59
60class TimeoutStreamRequestHandler(SocketServer.StreamRequestHandler):
61
62 timeout = 1
63
64 def setup(self):
65 self.connection = self.request
66 if self.timeout is not None:
67 self.connection.settimeout(self.timeout)
68 self.rfile = self.connection.makefile('rb', self.rbufsize)
69 self.wfile = self.connection.makefile('wb', self.wbufsize)
70
71
72class SimpleIMAPHandler(TimeoutStreamRequestHandler):
73
74 def _send(self, message):
75 if verbose: print "SENT:", message.strip()
76 self.wfile.write(message)
77
78 def handle(self):
79 # Send a welcome message.
80 self._send('* OK IMAP4rev1\r\n')
81 while 1:
82 # Gather up input until we receive a line terminator or we timeout.
83 # Accumulate read(1) because it's simpler to handle the differences
84 # between naked sockets and SSL sockets.
85 line = ''
86 while 1:
87 try:
88 part = self.rfile.read(1)
89 if part == '':
90 # Naked sockets return empty strings..
91 return
92 line += part
93 except IOError:
94 # ..but SSLSockets throw exceptions.
95 return
96 if line.endswith('\r\n'):
97 break
98
99 if verbose: print 'GOT:', line.strip()
100 splitline = line.split()
101 tag = splitline[0]
102 cmd = splitline[1]
103 args = splitline[2:]
104
105 if hasattr(self, 'cmd_%s' % (cmd,)):
106 getattr(self, 'cmd_%s' % (cmd,))(tag, args)
107 else:
108 self._send('%s BAD %s unknown\r\n' % (tag, cmd))
109
110 def cmd_CAPABILITY(self, tag, args):
111 self._send('* CAPABILITY IMAP4rev1\r\n')
112 self._send('%s OK CAPABILITY completed\r\n' % (tag,))
113
114class BaseThreadedNetworkedTests(unittest.TestCase):
115
116 def make_server(self, addr, hdlr):
117
118 class MyServer(self.server_class):
119 def handle_error(self, request, client_address):
120 self.close_request(request)
121 self.server_close()
122 raise
123
124 if verbose: print "creating server"
125 server = MyServer(addr, hdlr)
126 self.assertEquals(server.server_address, server.socket.getsockname())
127
128 if verbose:
129 print "server created"
130 print "ADDR =", addr
131 print "CLASS =", self.server_class
132 print "HDLR =", server.RequestHandlerClass
133
134 t = threading.Thread(
135 name='%s serving' % self.server_class,
136 target=server.serve_forever,
137 # Short poll interval to make the test finish quickly.
138 # Time between requests is short enough that we won't wake
139 # up spuriously too many times.
140 kwargs={'poll_interval':0.01})
141 t.daemon = True # In case this function raises.
142 t.start()
143 if verbose: print "server running"
144 return server, t
145
146 def reap_server(self, server, thread):
147 if verbose: print "waiting for server"
148 server.shutdown()
149 thread.join()
150 if verbose: print "done"
151
152 @contextmanager
153 def reaped_server(self, hdlr):
154 server, thread = self.make_server((support.HOST, 0), hdlr)
155 try:
156 yield server
157 finally:
158 self.reap_server(server, thread)
159
160 def test_connect(self):
161 with self.reaped_server(SimpleIMAPHandler) as server:
162 client = self.imap_class(*server.server_address)
163 client.shutdown()
164
165 def test_issue5949(self):
166
167 class EOFHandler(TimeoutStreamRequestHandler):
168 def handle(self):
169 # EOF without sending a complete welcome message.
170 self.wfile.write('* OK')
171 # explicitly shutdown. socket.close() merely releases
172 # the socket and waits for GC to perform the actual close.
173 self.request.shutdown(socket.SHUT_WR)
174
175 with self.reaped_server(EOFHandler) as server:
176 self.assertRaises(imaplib.IMAP4.abort,
177 self.imap_class, *server.server_address)
178
179class ThreadedNetworkedTests(BaseThreadedNetworkedTests):
180
181 server_class = SocketServer.TCPServer
182 imap_class = imaplib.IMAP4
183
184
185class ThreadedNetworkedTestsSSL(BaseThreadedNetworkedTests):
186
187 server_class = SecureTCPServer
188 imap_class = IMAP4_SSL
189
190
191def test_main():
192 tests = [TestImaplib]
193
194 if support.is_resource_enabled('network'):
195 if ssl:
196 global CERTFILE
197 CERTFILE = os.path.join(os.path.dirname(__file__) or os.curdir,
198 "keycert.pem")
199 if not os.path.exists(CERTFILE):
200 raise support.TestFailed("Can't read certificate files!")
201 tests.append(ThreadedNetworkedTestsSSL)
202 tests.append(ThreadedNetworkedTests)
203
204 threadinfo = support.threading_setup()
205
206 support.run_unittest(*tests)
207
208 support.threading_cleanup(*threadinfo)
Jerry Seutter14f0bc72008-03-26 05:03:03 +0000209
210if __name__ == "__main__":
R. David Murray07ca7612009-12-12 18:36:47 +0000211 support.use_resources = ['network']
212 test_main()