blob: df0bf4c1b009bf1d6cf0679bf2dd5a3c8025591f [file] [log] [blame]
Facundo Batista16ed5b42007-07-24 21:20:42 +00001import asyncore
Facundo Batista366d6262007-03-28 18:25:54 +00002import socket
3import threading
Facundo Batista16ed5b42007-07-24 21:20:42 +00004import smtpd
Facundo Batista366d6262007-03-28 18:25:54 +00005import smtplib
Facundo Batista16ed5b42007-07-24 21:20:42 +00006import StringIO
7import sys
Facundo Batista366d6262007-03-28 18:25:54 +00008import time
Facundo Batista16ed5b42007-07-24 21:20:42 +00009import select
Facundo Batista366d6262007-03-28 18:25:54 +000010
11from unittest import TestCase
12from test import test_support
13
Facundo Batista16ed5b42007-07-24 21:20:42 +000014HOST = "localhost"
15PORT = 54328
Facundo Batista366d6262007-03-28 18:25:54 +000016
Facundo Batista16ed5b42007-07-24 21:20:42 +000017def server(evt, buf):
Facundo Batista366d6262007-03-28 18:25:54 +000018 serv = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
19 serv.settimeout(3)
20 serv.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
Facundo Batista16ed5b42007-07-24 21:20:42 +000021 serv.bind(("", PORT))
Facundo Batista366d6262007-03-28 18:25:54 +000022 serv.listen(5)
23 try:
24 conn, addr = serv.accept()
25 except socket.timeout:
26 pass
27 else:
Facundo Batista16ed5b42007-07-24 21:20:42 +000028 n = 200
29 while buf and n > 0:
30 r, w, e = select.select([], [conn], [])
31 if w:
32 sent = conn.send(buf)
33 buf = buf[sent:]
34
35 n -= 1
36 time.sleep(0.01)
37
Facundo Batista366d6262007-03-28 18:25:54 +000038 conn.close()
39 finally:
40 serv.close()
41 evt.set()
42
43class GeneralTests(TestCase):
Neal Norwitz0d4c06e2007-04-25 06:30:05 +000044
Facundo Batista366d6262007-03-28 18:25:54 +000045 def setUp(self):
46 self.evt = threading.Event()
Facundo Batista16ed5b42007-07-24 21:20:42 +000047 servargs = (self.evt, "220 Hola mundo\n")
48 threading.Thread(target=server, args=servargs).start()
Facundo Batista366d6262007-03-28 18:25:54 +000049 time.sleep(.1)
50
51 def tearDown(self):
52 self.evt.wait()
53
Facundo Batista16ed5b42007-07-24 21:20:42 +000054 def testBasic1(self):
Facundo Batista366d6262007-03-28 18:25:54 +000055 # connects
Facundo Batista16ed5b42007-07-24 21:20:42 +000056 smtp = smtplib.SMTP(HOST, PORT)
Facundo Batista366d6262007-03-28 18:25:54 +000057 smtp.sock.close()
Neal Norwitz0d4c06e2007-04-25 06:30:05 +000058
Facundo Batista16ed5b42007-07-24 21:20:42 +000059 def testBasic2(self):
60 # connects, include port in host name
61 smtp = smtplib.SMTP("%s:%s" % (HOST, PORT))
62 smtp.sock.close()
63
64 def testLocalHostName(self):
65 # check that supplied local_hostname is used
66 smtp = smtplib.SMTP(HOST, PORT, local_hostname="testhost")
67 self.assertEqual(smtp.local_hostname, "testhost")
68 smtp.sock.close()
69
70 def testNonnumericPort(self):
71 # check that non-numeric port raises ValueError
72 self.assertRaises(socket.error, smtplib.SMTP, "localhost", "bogus")
73
Facundo Batista366d6262007-03-28 18:25:54 +000074 def testTimeoutDefault(self):
75 # default
Facundo Batista16ed5b42007-07-24 21:20:42 +000076 smtp = smtplib.SMTP(HOST, PORT)
Facundo Batista366d6262007-03-28 18:25:54 +000077 self.assertTrue(smtp.sock.gettimeout() is None)
78 smtp.sock.close()
Neal Norwitz0d4c06e2007-04-25 06:30:05 +000079
Facundo Batista366d6262007-03-28 18:25:54 +000080 def testTimeoutValue(self):
81 # a value
Facundo Batista16ed5b42007-07-24 21:20:42 +000082 smtp = smtplib.SMTP(HOST, PORT, timeout=30)
Facundo Batista366d6262007-03-28 18:25:54 +000083 self.assertEqual(smtp.sock.gettimeout(), 30)
84 smtp.sock.close()
85
86 def testTimeoutNone(self):
87 # None, having other default
88 previous = socket.getdefaulttimeout()
89 socket.setdefaulttimeout(30)
90 try:
Facundo Batista16ed5b42007-07-24 21:20:42 +000091 smtp = smtplib.SMTP(HOST, PORT, timeout=None)
Facundo Batista366d6262007-03-28 18:25:54 +000092 finally:
93 socket.setdefaulttimeout(previous)
94 self.assertEqual(smtp.sock.gettimeout(), 30)
95 smtp.sock.close()
96
97
Facundo Batista16ed5b42007-07-24 21:20:42 +000098# Test server using smtpd.DebuggingServer
99def debugging_server(evt):
100 serv = smtpd.DebuggingServer(("", PORT), ('nowhere', -1))
101
102 try:
103 asyncore.loop(timeout=.01, count=300)
104 except socket.timeout:
105 pass
106 finally:
107 # allow some time for the client to read the result
108 time.sleep(0.5)
109 asyncore.close_all()
110 evt.set()
111
112MSG_BEGIN = '---------- MESSAGE FOLLOWS ----------\n'
113MSG_END = '------------ END MESSAGE ------------\n'
114
115# Test behavior of smtpd.DebuggingServer
116class DebuggingServerTests(TestCase):
117
118 def setUp(self):
119 self.old_stdout = sys.stdout
120 self.output = StringIO.StringIO()
121 sys.stdout = self.output
122
123 self.evt = threading.Event()
124 threading.Thread(target=debugging_server, args=(self.evt,)).start()
125 time.sleep(.5)
126
127 def tearDown(self):
128 self.evt.wait()
129 sys.stdout = self.old_stdout
130
131 def testBasic(self):
132 # connect
133 smtp = smtplib.SMTP(HOST, PORT)
134 smtp.sock.close()
135
136 def testEHLO(self):
137 smtp = smtplib.SMTP(HOST, PORT)
138 self.assertEqual(smtp.ehlo(), (502, 'Error: command "EHLO" not implemented'))
139 smtp.sock.close()
140
141 def testHELP(self):
142 smtp = smtplib.SMTP(HOST, PORT)
143 self.assertEqual(smtp.help(), 'Error: command "HELP" not implemented')
144 smtp.sock.close()
145
146 def testSend(self):
147 # connect and send mail
148 m = 'A test message'
149 smtp = smtplib.SMTP(HOST, PORT)
150 smtp.sendmail('John', 'Sally', m)
151 smtp.sock.close()
152
153 self.evt.wait()
154 self.output.flush()
155 mexpect = '%s%s\n%s' % (MSG_BEGIN, m, MSG_END)
156 self.assertEqual(self.output.getvalue(), mexpect)
157
158
159class BadHELOServerTests(TestCase):
160
161 def setUp(self):
162 self.old_stdout = sys.stdout
163 self.output = StringIO.StringIO()
164 sys.stdout = self.output
165
166 self.evt = threading.Event()
167 servargs = (self.evt, "199 no hello for you!\n")
168 threading.Thread(target=server, args=servargs).start()
169 time.sleep(.5)
170
171 def tearDown(self):
172 self.evt.wait()
173 sys.stdout = self.old_stdout
174
175 def testFailingHELO(self):
176 self.assertRaises(smtplib.SMTPConnectError, smtplib.SMTP, HOST, PORT)
Facundo Batista366d6262007-03-28 18:25:54 +0000177
178def test_main(verbose=None):
Facundo Batista16ed5b42007-07-24 21:20:42 +0000179 test_support.run_unittest(GeneralTests, DebuggingServerTests, BadHELOServerTests)
Facundo Batista366d6262007-03-28 18:25:54 +0000180
181if __name__ == '__main__':
182 test_main()