blob: 5694bb5cda6ea817cd61640205eb298108ea4190 [file] [log] [blame]
Guido van Rossum806c2462007-08-06 23:33:07 +00001import asyncore
Guido van Rossum04110fb2007-08-24 16:32:05 +00002import email.utils
Guido van Rossumd8faa362007-04-27 19:54:29 +00003import socket
4import threading
Guido van Rossum806c2462007-08-06 23:33:07 +00005import smtpd
Guido van Rossumd8faa362007-04-27 19:54:29 +00006import smtplib
Guido van Rossum806c2462007-08-06 23:33:07 +00007import io
8import sys
Guido van Rossumd8faa362007-04-27 19:54:29 +00009import time
Guido van Rossum806c2462007-08-06 23:33:07 +000010import select
Guido van Rossumd8faa362007-04-27 19:54:29 +000011
12from unittest import TestCase
13from test import test_support
14
Guido van Rossum806c2462007-08-06 23:33:07 +000015# PORT is used to communicate the port number assigned to the server
16# to the test client
17HOST = "localhost"
18PORT = None
Guido van Rossumd8faa362007-04-27 19:54:29 +000019
Guido van Rossum806c2462007-08-06 23:33:07 +000020def server(evt, buf):
Christian Heimes380f7f22008-02-28 11:19:05 +000021 serv = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
22 serv.settimeout(1)
23 serv.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
24 serv.bind(("", 0))
25 global PORT
26 PORT = serv.getsockname()[1]
27 serv.listen(5)
28 evt.set()
Guido van Rossumd8faa362007-04-27 19:54:29 +000029 try:
30 conn, addr = serv.accept()
31 except socket.timeout:
32 pass
33 else:
Guido van Rossum806c2462007-08-06 23:33:07 +000034 n = 500
35 while buf and n > 0:
36 r, w, e = select.select([], [conn], [])
37 if w:
38 sent = conn.send(buf)
39 buf = buf[sent:]
40
41 n -= 1
Guido van Rossum806c2462007-08-06 23:33:07 +000042
Guido van Rossumd8faa362007-04-27 19:54:29 +000043 conn.close()
44 finally:
45 serv.close()
Guido van Rossum806c2462007-08-06 23:33:07 +000046 PORT = None
Guido van Rossumd8faa362007-04-27 19:54:29 +000047 evt.set()
48
49class GeneralTests(TestCase):
50
51 def setUp(self):
52 self.evt = threading.Event()
Guido van Rossum8a392d72007-11-21 22:09:45 +000053 servargs = (self.evt, b"220 Hola mundo\n")
Guido van Rossum806c2462007-08-06 23:33:07 +000054 threading.Thread(target=server, args=servargs).start()
Christian Heimes380f7f22008-02-28 11:19:05 +000055 self.evt.wait()
56 self.evt.clear()
Guido van Rossumd8faa362007-04-27 19:54:29 +000057
58 def tearDown(self):
59 self.evt.wait()
60
Guido van Rossum806c2462007-08-06 23:33:07 +000061 def testBasic1(self):
Guido van Rossumd8faa362007-04-27 19:54:29 +000062 # connects
Guido van Rossum806c2462007-08-06 23:33:07 +000063 smtp = smtplib.SMTP(HOST, PORT)
Guido van Rossumd8faa362007-04-27 19:54:29 +000064 smtp.sock.close()
65
Guido van Rossum806c2462007-08-06 23:33:07 +000066 def testBasic2(self):
67 # connects, include port in host name
68 smtp = smtplib.SMTP("%s:%s" % (HOST, PORT))
69 smtp.sock.close()
70
71 def testLocalHostName(self):
72 # check that supplied local_hostname is used
73 smtp = smtplib.SMTP(HOST, PORT, local_hostname="testhost")
74 self.assertEqual(smtp.local_hostname, "testhost")
75 smtp.sock.close()
76
Guido van Rossumd8faa362007-04-27 19:54:29 +000077 def testTimeoutDefault(self):
78 # default
Guido van Rossum806c2462007-08-06 23:33:07 +000079 smtp = smtplib.SMTP(HOST, PORT)
Guido van Rossumd8faa362007-04-27 19:54:29 +000080 self.assertTrue(smtp.sock.gettimeout() is None)
81 smtp.sock.close()
82
83 def testTimeoutValue(self):
84 # a value
Guido van Rossum806c2462007-08-06 23:33:07 +000085 smtp = smtplib.SMTP(HOST, PORT, timeout=30)
Guido van Rossumd8faa362007-04-27 19:54:29 +000086 self.assertEqual(smtp.sock.gettimeout(), 30)
87 smtp.sock.close()
88
89 def testTimeoutNone(self):
90 # None, having other default
91 previous = socket.getdefaulttimeout()
92 socket.setdefaulttimeout(30)
93 try:
Guido van Rossum806c2462007-08-06 23:33:07 +000094 smtp = smtplib.SMTP(HOST, PORT, timeout=None)
Guido van Rossumd8faa362007-04-27 19:54:29 +000095 finally:
96 socket.setdefaulttimeout(previous)
97 self.assertEqual(smtp.sock.gettimeout(), 30)
98 smtp.sock.close()
99
100
Guido van Rossum04110fb2007-08-24 16:32:05 +0000101# Test server thread using the specified SMTP server class
102def debugging_server(server_class, serv_evt, client_evt):
103 serv = server_class(("", 0), ('nowhere', -1))
Guido van Rossum806c2462007-08-06 23:33:07 +0000104 global PORT
105 PORT = serv.getsockname()[1]
Christian Heimes380f7f22008-02-28 11:19:05 +0000106 serv_evt.set()
Guido van Rossum806c2462007-08-06 23:33:07 +0000107
108 try:
109 if hasattr(select, 'poll'):
110 poll_fun = asyncore.poll2
111 else:
112 poll_fun = asyncore.poll
113
114 n = 1000
115 while asyncore.socket_map and n > 0:
116 poll_fun(0.01, asyncore.socket_map)
117
118 # when the client conversation is finished, it will
119 # set client_evt, and it's then ok to kill the server
120 if client_evt.isSet():
121 serv.close()
122 break
123
124 n -= 1
125
126 except socket.timeout:
127 pass
128 finally:
Christian Heimes380f7f22008-02-28 11:19:05 +0000129 if not client_evt.isSet():
130 # allow some time for the client to read the result
131 time.sleep(0.5)
132 serv.close()
Guido van Rossum806c2462007-08-06 23:33:07 +0000133 asyncore.close_all()
134 PORT = None
Guido van Rossum806c2462007-08-06 23:33:07 +0000135 serv_evt.set()
136
137MSG_BEGIN = '---------- MESSAGE FOLLOWS ----------\n'
138MSG_END = '------------ END MESSAGE ------------\n'
139
Guido van Rossum04110fb2007-08-24 16:32:05 +0000140# NOTE: Some SMTP objects in the tests below are created with a non-default
141# local_hostname argument to the constructor, since (on some systems) the FQDN
142# lookup caused by the default local_hostname sometimes takes so long that the
Guido van Rossum806c2462007-08-06 23:33:07 +0000143# test server times out, causing the test to fail.
Guido van Rossum04110fb2007-08-24 16:32:05 +0000144
145# Test behavior of smtpd.DebuggingServer
Guido van Rossum806c2462007-08-06 23:33:07 +0000146class DebuggingServerTests(TestCase):
147
148 def setUp(self):
149 # temporarily replace sys.stdout to capture DebuggingServer output
150 self.old_stdout = sys.stdout
151 self.output = io.StringIO()
152 sys.stdout = self.output
153
154 self.serv_evt = threading.Event()
155 self.client_evt = threading.Event()
Guido van Rossum04110fb2007-08-24 16:32:05 +0000156 serv_args = (smtpd.DebuggingServer, self.serv_evt, self.client_evt)
Guido van Rossum806c2462007-08-06 23:33:07 +0000157 threading.Thread(target=debugging_server, args=serv_args).start()
158
159 # wait until server thread has assigned a port number
Christian Heimes380f7f22008-02-28 11:19:05 +0000160 self.serv_evt.wait()
161 self.serv_evt.clear()
Guido van Rossum806c2462007-08-06 23:33:07 +0000162
163 def tearDown(self):
164 # indicate that the client is finished
165 self.client_evt.set()
166 # wait for the server thread to terminate
167 self.serv_evt.wait()
168 # restore sys.stdout
169 sys.stdout = self.old_stdout
170
171 def testBasic(self):
172 # connect
173 smtp = smtplib.SMTP(HOST, PORT, local_hostname='localhost', timeout=3)
174 smtp.quit()
175
Guido van Rossum04110fb2007-08-24 16:32:05 +0000176 def testNOOP(self):
177 smtp = smtplib.SMTP(HOST, PORT, local_hostname='localhost', timeout=3)
178 expected = (250, b'Ok')
179 self.assertEqual(smtp.noop(), expected)
180 smtp.quit()
181
182 def testRSET(self):
183 smtp = smtplib.SMTP(HOST, PORT, local_hostname='localhost', timeout=3)
184 expected = (250, b'Ok')
185 self.assertEqual(smtp.rset(), expected)
186 smtp.quit()
187
188 def testNotImplemented(self):
189 # EHLO isn't implemented in DebuggingServer
Guido van Rossum806c2462007-08-06 23:33:07 +0000190 smtp = smtplib.SMTP(HOST, PORT, local_hostname='localhost', timeout=3)
191 expected = (502, b'Error: command "EHLO" not implemented')
192 self.assertEqual(smtp.ehlo(), expected)
193 smtp.quit()
194
Guido van Rossum04110fb2007-08-24 16:32:05 +0000195 def testVRFY(self):
196 # VRFY isn't implemented in DebuggingServer
197 smtp = smtplib.SMTP(HOST, PORT, local_hostname='localhost', timeout=3)
198 expected = (502, b'Error: command "VRFY" not implemented')
199 self.assertEqual(smtp.vrfy('nobody@nowhere.com'), expected)
200 self.assertEqual(smtp.verify('nobody@nowhere.com'), expected)
201 smtp.quit()
202
203 def testSecondHELO(self):
204 # check that a second HELO returns a message that it's a duplicate
205 # (this behavior is specific to smtpd.SMTPChannel)
206 smtp = smtplib.SMTP(HOST, PORT, local_hostname='localhost', timeout=3)
207 smtp.helo()
208 expected = (503, b'Duplicate HELO/EHLO')
209 self.assertEqual(smtp.helo(), expected)
210 smtp.quit()
211
Guido van Rossum806c2462007-08-06 23:33:07 +0000212 def testHELP(self):
213 smtp = smtplib.SMTP(HOST, PORT, local_hostname='localhost', timeout=3)
214 self.assertEqual(smtp.help(), b'Error: command "HELP" not implemented')
215 smtp.quit()
216
217 def testSend(self):
218 # connect and send mail
219 m = 'A test message'
220 smtp = smtplib.SMTP(HOST, PORT, local_hostname='localhost', timeout=3)
221 smtp.sendmail('John', 'Sally', m)
222 smtp.quit()
223
224 self.client_evt.set()
225 self.serv_evt.wait()
226 self.output.flush()
227 mexpect = '%s%s\n%s' % (MSG_BEGIN, m, MSG_END)
228 self.assertEqual(self.output.getvalue(), mexpect)
229
230
Christian Heimes380f7f22008-02-28 11:19:05 +0000231class NonConnectingTests(TestCase):
232
233 def testNotConnected(self):
234 # Test various operations on an unconnected SMTP object that
235 # should raise exceptions (at present the attempt in SMTP.send
236 # to reference the nonexistent 'sock' attribute of the SMTP object
237 # causes an AttributeError)
238 smtp = smtplib.SMTP()
239 self.assertRaises(smtplib.SMTPServerDisconnected, smtp.ehlo)
240 self.assertRaises(smtplib.SMTPServerDisconnected,
241 smtp.send, 'test msg')
242
243 def testNonnumericPort(self):
244 # check that non-numeric port raises socket.error
245 self.assertRaises(socket.error, smtplib.SMTP,
246 "localhost", "bogus")
247 self.assertRaises(socket.error, smtplib.SMTP,
248 "localhost:bogus")
249
250
Guido van Rossum04110fb2007-08-24 16:32:05 +0000251# test response of client to a non-successful HELO message
Guido van Rossum806c2462007-08-06 23:33:07 +0000252class BadHELOServerTests(TestCase):
253
254 def setUp(self):
255 self.old_stdout = sys.stdout
256 self.output = io.StringIO()
257 sys.stdout = self.output
258
259 self.evt = threading.Event()
260 servargs = (self.evt, b"199 no hello for you!\n")
261 threading.Thread(target=server, args=servargs).start()
Christian Heimes380f7f22008-02-28 11:19:05 +0000262 self.evt.wait()
263 self.evt.clear()
Guido van Rossum806c2462007-08-06 23:33:07 +0000264
265 def tearDown(self):
266 self.evt.wait()
267 sys.stdout = self.old_stdout
268
269 def testFailingHELO(self):
270 self.assertRaises(smtplib.SMTPConnectError, smtplib.SMTP,
271 HOST, PORT, 'localhost', 3)
Guido van Rossumd8faa362007-04-27 19:54:29 +0000272
Guido van Rossum04110fb2007-08-24 16:32:05 +0000273
274sim_users = {'Mr.A@somewhere.com':'John A',
275 'Ms.B@somewhere.com':'Sally B',
276 'Mrs.C@somewhereesle.com':'Ruth C',
277 }
278
279sim_lists = {'list-1':['Mr.A@somewhere.com','Mrs.C@somewhereesle.com'],
280 'list-2':['Ms.B@somewhere.com',],
281 }
282
283# Simulated SMTP channel & server
284class SimSMTPChannel(smtpd.SMTPChannel):
285 def smtp_EHLO(self, arg):
286 resp = '250-testhost\r\n' \
287 '250-EXPN\r\n' \
288 '250-SIZE 20000000\r\n' \
289 '250-STARTTLS\r\n' \
290 '250-DELIVERBY\r\n' \
291 '250 HELP'
292 self.push(resp)
293
294 def smtp_VRFY(self, arg):
295# print '\nsmtp_VRFY(%r)\n' % arg
296
297 raw_addr = email.utils.parseaddr(arg)[1]
298 quoted_addr = smtplib.quoteaddr(arg)
299 if raw_addr in sim_users:
300 self.push('250 %s %s' % (sim_users[raw_addr], quoted_addr))
301 else:
302 self.push('550 No such user: %s' % arg)
303
304 def smtp_EXPN(self, arg):
305# print '\nsmtp_EXPN(%r)\n' % arg
306
307 list_name = email.utils.parseaddr(arg)[1].lower()
308 if list_name in sim_lists:
309 user_list = sim_lists[list_name]
310 for n, user_email in enumerate(user_list):
311 quoted_addr = smtplib.quoteaddr(user_email)
312 if n < len(user_list) - 1:
313 self.push('250-%s %s' % (sim_users[user_email], quoted_addr))
314 else:
315 self.push('250 %s %s' % (sim_users[user_email], quoted_addr))
316 else:
317 self.push('550 No access for you!')
318
319
320class SimSMTPServer(smtpd.SMTPServer):
321 def handle_accept(self):
322 conn, addr = self.accept()
323 channel = SimSMTPChannel(self, conn, addr)
324
325 def process_message(self, peer, mailfrom, rcpttos, data):
326 pass
327
328
329# Test various SMTP & ESMTP commands/behaviors that require a simulated server
330# (i.e., something with more features than DebuggingServer)
331class SMTPSimTests(TestCase):
332
333 def setUp(self):
334 self.serv_evt = threading.Event()
335 self.client_evt = threading.Event()
336 serv_args = (SimSMTPServer, self.serv_evt, self.client_evt)
337 threading.Thread(target=debugging_server, args=serv_args).start()
338
339 # wait until server thread has assigned a port number
Christian Heimes380f7f22008-02-28 11:19:05 +0000340 self.serv_evt.wait()
341 self.serv_evt.clear()
Guido van Rossum04110fb2007-08-24 16:32:05 +0000342
343 def tearDown(self):
344 # indicate that the client is finished
345 self.client_evt.set()
346 # wait for the server thread to terminate
347 self.serv_evt.wait()
348
349 def testBasic(self):
350 # smoke test
351 smtp = smtplib.SMTP(HOST, PORT, local_hostname='localhost', timeout=3)
352 smtp.quit()
353
354 def testEHLO(self):
355 smtp = smtplib.SMTP(HOST, PORT, local_hostname='localhost', timeout=3)
356
357 # no features should be present before the EHLO
358 self.assertEqual(smtp.esmtp_features, {})
359
360 # features expected from the test server
361 expected_features = {'expn':'',
362 'size': '20000000',
363 'starttls': '',
364 'deliverby': '',
365 'help': '',
366 }
367
368 smtp.ehlo()
369 self.assertEqual(smtp.esmtp_features, expected_features)
370 for k in expected_features:
371 self.assertTrue(smtp.has_extn(k))
372 self.assertFalse(smtp.has_extn('unsupported-feature'))
373 smtp.quit()
374
375 def testVRFY(self):
376 smtp = smtplib.SMTP(HOST, PORT, local_hostname='localhost', timeout=3)
377
378 for email, name in sim_users.items():
379 expected_known = (250, bytes('%s %s' %
Guido van Rossum5a23cc52007-08-30 14:02:43 +0000380 (name, smtplib.quoteaddr(email)),
381 "ascii"))
Guido van Rossum04110fb2007-08-24 16:32:05 +0000382 self.assertEqual(smtp.vrfy(email), expected_known)
383
384 u = 'nobody@nowhere.com'
Thomas Wouters74e68c72007-08-31 00:20:14 +0000385 expected_unknown = (550, ('No such user: %s'
386 % smtplib.quoteaddr(u)).encode('ascii'))
Guido van Rossum04110fb2007-08-24 16:32:05 +0000387 self.assertEqual(smtp.vrfy(u), expected_unknown)
388 smtp.quit()
389
390 def testEXPN(self):
391 smtp = smtplib.SMTP(HOST, PORT, local_hostname='localhost', timeout=3)
392
393 for listname, members in sim_lists.items():
394 users = []
395 for m in members:
396 users.append('%s %s' % (sim_users[m], smtplib.quoteaddr(m)))
Guido van Rossum5a23cc52007-08-30 14:02:43 +0000397 expected_known = (250, bytes('\n'.join(users), "ascii"))
Guido van Rossum04110fb2007-08-24 16:32:05 +0000398 self.assertEqual(smtp.expn(listname), expected_known)
399
400 u = 'PSU-Members-List'
401 expected_unknown = (550, b'No access for you!')
402 self.assertEqual(smtp.expn(u), expected_unknown)
403 smtp.quit()
404
405
406
Guido van Rossumd8faa362007-04-27 19:54:29 +0000407def test_main(verbose=None):
Guido van Rossum806c2462007-08-06 23:33:07 +0000408 test_support.run_unittest(GeneralTests, DebuggingServerTests,
Christian Heimes380f7f22008-02-28 11:19:05 +0000409 NonConnectingTests,
Guido van Rossum04110fb2007-08-24 16:32:05 +0000410 BadHELOServerTests, SMTPSimTests)
Guido van Rossumd8faa362007-04-27 19:54:29 +0000411
412if __name__ == '__main__':
413 test_main()