#10321: Add support for sending binary DATA and Message objects to smtplib
diff --git a/Lib/test/test_smtplib.py b/Lib/test/test_smtplib.py
index 6390a86..795586a 100644
--- a/Lib/test/test_smtplib.py
+++ b/Lib/test/test_smtplib.py
@@ -1,9 +1,11 @@
import asyncore
+import email.mime.text
import email.utils
import socket
import smtpd
import smtplib
import io
+import re
import sys
import time
import select
@@ -57,6 +59,13 @@
def tearDown(self):
smtplib.socket = socket
+    # smtplib.quotedata() is no longer used but is retained for backward
+    # compatibility, so test to make sure it still works.
+    def testQuoteData(self):
+        """quotedata() converts line endings to CRLF and doubles leading dots."""
+        raw = "abc\n.jkl\rfoo\r\n..blue"
+        quoted = smtplib.quotedata(raw)
+        self.assertEqual("abc\r\n..jkl\r\nfoo\r\n...blue", quoted)
+
def testBasic1(self):
mock_socket.reply_with(b"220 Hola mundo")
# connects
@@ -150,6 +159,8 @@
@unittest.skipUnless(threading, 'Threading required for this test.')
class DebuggingServerTests(unittest.TestCase):
+ maxDiff = None
+
def setUp(self):
self.real_getfqdn = socket.getfqdn
socket.getfqdn = mock_socket.getfqdn
@@ -161,6 +172,9 @@
self._threads = support.threading_setup()
self.serv_evt = threading.Event()
self.client_evt = threading.Event()
+ # Capture SMTPChannel debug output
+ self.old_DEBUGSTREAM = smtpd.DEBUGSTREAM
+ smtpd.DEBUGSTREAM = io.StringIO()
# Pick a random unused port by passing 0 for the port number
self.serv = smtpd.DebuggingServer((HOST, 0), ('nowhere', -1))
# Keep a note of what port was assigned
@@ -183,6 +197,9 @@
support.threading_cleanup(*self._threads)
# restore sys.stdout
sys.stdout = self.old_stdout
+ # restore DEBUGSTREAM
+ smtpd.DEBUGSTREAM.close()
+ smtpd.DEBUGSTREAM = self.old_DEBUGSTREAM
def testBasic(self):
# connect
@@ -247,6 +264,95 @@
mexpect = '%s%s\n%s' % (MSG_BEGIN, m, MSG_END)
self.assertEqual(self.output.getvalue(), mexpect)
+    def testSendBinary(self):
+        """Send a bytes payload with sendmail() and check the captured output."""
+        payload = b'A test message'
+        client = smtplib.SMTP(HOST, self.port, local_hostname='localhost',
+                              timeout=3)
+        client.sendmail('John', 'Sally', payload)
+        # XXX (see comment in testSend)
+        time.sleep(0.01)
+        client.quit()
+
+        self.client_evt.set()
+        self.serv_evt.wait()
+        self.output.flush()
+        # The expected output is text, so decode the payload before formatting.
+        body = payload.decode('ascii')
+        expected = '%s%s\n%s' % (MSG_BEGIN, body, MSG_END)
+        self.assertEqual(self.output.getvalue(), expected)
+
+    def testSendMessage(self):
+        """Send a Message object via send_message() with explicit addresses."""
+        msg = email.mime.text.MIMEText('A test message')
+        client = smtplib.SMTP(HOST, self.port, local_hostname='localhost',
+                              timeout=3)
+        client.send_message(msg, from_addr='John', to_addrs='Sally')
+        # XXX (see comment in testSend)
+        time.sleep(0.01)
+        client.quit()
+
+        self.client_evt.set()
+        self.serv_evt.wait()
+        self.output.flush()
+        # Add the X-Peer header that DebuggingServer adds
+        # XXX: I'm not sure hardcoding this IP will work on linux-vserver.
+        msg['X-Peer'] = '127.0.0.1'
+        serialized = msg.as_string()
+        expected = '%s%s\n%s' % (MSG_BEGIN, serialized, MSG_END)
+        self.assertEqual(self.output.getvalue(), expected)
+
+    def testSendMessageWithAddresses(self):
+        """send_message() takes sender and recipients from the message headers.
+
+        With no explicit addresses, the sender is expected to come from the
+        From header and the recipients from To, CC and Bcc.  The Bcc header
+        itself must not appear in the message that is transmitted.
+        """
+        m = email.mime.text.MIMEText('A test message')
+        m['From'] = 'foo@bar.com'
+        m['To'] = 'John'
+        m['CC'] = 'Sally, Fred'
+        m['Bcc'] = 'John Root <root@localhost>, "Dinsdale" <warped@silly.walks.com>'
+        smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout=3)
+        smtp.send_message(m)
+        # XXX (see comment in testSend)
+        time.sleep(0.01)
+        smtp.quit()
+
+        self.client_evt.set()
+        self.serv_evt.wait()
+        self.output.flush()
+        # Add the X-Peer header that DebuggingServer adds
+        # XXX: I'm not sure hardcoding this IP will work on linux-vserver.
+        m['X-Peer'] = '127.0.0.1'
+        # The Bcc header is deleted before serialization.
+        del m['Bcc']
+        mexpect = '%s%s\n%s' % (MSG_BEGIN, m.as_string(), MSG_END)
+        self.assertEqual(self.output.getvalue(), mexpect)
+        # The envelope addresses are checked in the captured SMTPChannel
+        # debug output rather than in the message text.
+        debugout = smtpd.DEBUGSTREAM.getvalue()
+        # The dot is escaped so that it only matches a literal '.'.
+        sender = re.compile(r"^sender: foo@bar\.com$", re.MULTILINE)
+        self.assertRegex(debugout, sender)
+        for addr in ('John', 'Sally', 'Fred', 'root@localhost',
+                     'warped@silly.walks.com'):
+            # Escape the address so regex metacharacters match literally.
+            pattern = r"^recips: .*'{}'.*$".format(re.escape(addr))
+            self.assertRegex(debugout, re.compile(pattern, re.MULTILINE))
+
+    def testSendMessageWithSomeAddresses(self):
+        """send_message() works when only some recipient headers are present."""
+        # Make sure nothing breaks if not all of the three 'to' headers exist
+        m = email.mime.text.MIMEText('A test message')
+        m['From'] = 'foo@bar.com'
+        m['To'] = 'John, Dinsdale'
+        smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout=3)
+        smtp.send_message(m)
+        # XXX (see comment in testSend)
+        time.sleep(0.01)
+        smtp.quit()
+
+        self.client_evt.set()
+        self.serv_evt.wait()
+        self.output.flush()
+        # Add the X-Peer header that DebuggingServer adds
+        # XXX: I'm not sure hardcoding this IP will work on linux-vserver.
+        m['X-Peer'] = '127.0.0.1'
+        mexpect = '%s%s\n%s' % (MSG_BEGIN, m.as_string(), MSG_END)
+        self.assertEqual(self.output.getvalue(), mexpect)
+        # The envelope addresses are checked in the captured SMTPChannel
+        # debug output rather than in the message text.
+        debugout = smtpd.DEBUGSTREAM.getvalue()
+        # The dot is escaped so that it only matches a literal '.'.
+        sender = re.compile(r"^sender: foo@bar\.com$", re.MULTILINE)
+        self.assertRegex(debugout, sender)
+        for addr in ('John', 'Dinsdale'):
+            # Escape the address so regex metacharacters match literally.
+            pattern = r"^recips: .*'{}'.*$".format(re.escape(addr))
+            self.assertRegex(debugout, re.compile(pattern, re.MULTILINE))
+
class NonConnectingTests(unittest.TestCase):