#9124: mailbox now accepts binary input and uses binary internally

Although this patch contains API changes and is rather weighty for an
RC phase, the mailbox module was essentially unusable without the patch
since it would produce UnicodeErrors when handling non-ASCII input
at arbitrary and somewhat mysterious places, and any non-trivial amount
of email processing will encounter messages with non-ASCII bytes.
The release manager approved applying the patch.

The changes allow binary input, and reject non-ASCII string input early
with a useful message instead of failing mysteriously later.  Binary
is used internally for reading and writing the mailbox files.  StringIO
and text file input are deprecated.

Initial patch by Victor Stinner, validated and expanded by R. David Murray.
diff --git a/Lib/test/test_mailbox.py b/Lib/test/test_mailbox.py
index de6d4bd..8e4c57a 100644
--- a/Lib/test/test_mailbox.py
+++ b/Lib/test/test_mailbox.py
@@ -7,8 +7,10 @@
 import email.message
 import re
 import io
+import tempfile
 from test import support
 import unittest
+import textwrap
 import mailbox
 import glob
 try:
@@ -48,6 +50,8 @@
 
 class TestMailbox(TestBase):
 
+    maxDiff = None
+
     _factory = None     # Overridden by subclasses to reuse tests
     _template = 'From: foo\n\n%s'
 
@@ -69,14 +73,108 @@
         self.assertEqual(len(self._box), 2)
         keys.append(self._box.add(email.message_from_string(_sample_message)))
         self.assertEqual(len(self._box), 3)
-        keys.append(self._box.add(io.StringIO(_sample_message)))
+        keys.append(self._box.add(io.BytesIO(_bytes_sample_message)))
         self.assertEqual(len(self._box), 4)
         keys.append(self._box.add(_sample_message))
         self.assertEqual(len(self._box), 5)
+        keys.append(self._box.add(_bytes_sample_message))
+        self.assertEqual(len(self._box), 6)
+        with self.assertWarns(DeprecationWarning):
+            keys.append(self._box.add(
+                io.TextIOWrapper(io.BytesIO(_bytes_sample_message))))
+        self.assertEqual(len(self._box), 7)
         self.assertEqual(self._box.get_string(keys[0]), self._template % 0)
-        for i in (1, 2, 3, 4):
+        for i in (1, 2, 3, 4, 5, 6):
             self._check_sample(self._box[keys[i]])
 
+    _nonascii_msg = textwrap.dedent("""\
+            From: foo
+            Subject: Falinaptár házhozszállítással. Már rendeltél?
+
+            0
+            """)
+
+    def test_add_invalid_8bit_bytes_header(self):
+        key = self._box.add(self._nonascii_msg.encode('latin1'))
+        self.assertEqual(len(self._box), 1)
+        self.assertEqual(self._box.get_bytes(key),
+            self._nonascii_msg.encode('latin1'))
+
+    def test_invalid_nonascii_header_as_string(self):
+        subj = self._nonascii_msg.splitlines()[1]
+        key = self._box.add(subj.encode('latin1'))
+        self.assertEqual(self._box.get_string(key),
+            'Subject: =?unknown-8bit?b?RmFsaW5hcHThciBo4Xpob3pzeuFsbO104XNz'
+            'YWwuIE3hciByZW5kZWx06Ww/?=\n\n')
+
+    def test_add_nonascii_header_raises(self):
+        with self.assertRaisesRegex(ValueError, "ASCII-only"):
+            self._box.add(self._nonascii_msg)
+
+    _non_latin_bin_msg = textwrap.dedent("""\
+        From: foo@bar.com
+        To: báz
+        Subject: Maintenant je vous présente mon collègue, le pouf célèbre
+        \tJean de Baddie
+        Mime-Version: 1.0
+        Content-Type: text/plain; charset="utf-8"
+        Content-Transfer-Encoding: 8bit
+
+        Да, они летят.
+        """).encode('utf-8')
+
+    def test_add_8bit_body(self):
+        key = self._box.add(self._non_latin_bin_msg)
+        self.assertEqual(self._box.get_bytes(key),
+                         self._non_latin_bin_msg)
+        with self._box.get_file(key) as f:
+            self.assertEqual(f.read(),
+                             self._non_latin_bin_msg.replace(b'\n',
+                                os.linesep.encode()))
+        self.assertEqual(self._box[key].get_payload(),
+                        "Да, они летят.\n")
+
+    def test_add_binary_file(self):
+        with tempfile.TemporaryFile('wb+') as f:
+            f.write(_bytes_sample_message)
+            f.seek(0)
+            key = self._box.add(f)
+        # See issue 11062
+        if not isinstance(self._box, mailbox.Babyl):
+            self.assertEqual(self._box.get_bytes(key).split(b'\n'),
+                _bytes_sample_message.split(b'\n'))
+
+    def test_add_binary_nonascii_file(self):
+        with tempfile.TemporaryFile('wb+') as f:
+            f.write(self._non_latin_bin_msg)
+            f.seek(0)
+            key = self._box.add(f)
+        # See issue 11062
+        if not isinstance(self._box, mailbox.Babyl):
+            self.assertEqual(self._box.get_bytes(key).split(b'\n'),
+                self._non_latin_bin_msg.split(b'\n'))
+
+    def test_add_text_file_warns(self):
+        with tempfile.TemporaryFile('w+') as f:
+            f.write(_sample_message)
+            f.seek(0)
+            with self.assertWarns(DeprecationWarning):
+                key = self._box.add(f)
+        # See issue 11062
+        if not isinstance(self._box, mailbox.Babyl):
+            self.assertEqual(self._box.get_bytes(key).split(b'\n'),
+                _bytes_sample_message.split(b'\n'))
+
+    def test_add_StringIO_warns(self):
+        with self.assertWarns(DeprecationWarning):
+            key = self._box.add(io.StringIO(self._template % "0"))
+        self.assertEqual(self._box.get_string(key), self._template % "0")
+
+    def test_add_nonascii_StringIO_raises(self):
+        with self.assertWarns(DeprecationWarning):
+            with self.assertRaisesRegex(ValueError, "ASCII-only"):
+                self._box.add(io.StringIO(self._nonascii_msg))
+
     def test_remove(self):
         # Remove messages using remove()
         self._test_remove_or_delitem(self._box.remove)
@@ -154,12 +252,21 @@
         self.assertEqual(msg0.get_payload(), '0')
         self._check_sample(self._box.get_message(key1))
 
+    def test_get_bytes(self):
+        # Get bytes representations of messages
+        key0 = self._box.add(self._template % 0)
+        key1 = self._box.add(_sample_message)
+        self.assertEqual(self._box.get_bytes(key0),
+            (self._template % 0).encode('ascii'))
+        self.assertEqual(self._box.get_bytes(key1), _bytes_sample_message)
+
     def test_get_string(self):
         # Get string representations of messages
         key0 = self._box.add(self._template % 0)
         key1 = self._box.add(_sample_message)
         self.assertEqual(self._box.get_string(key0), self._template % 0)
-        self.assertEqual(self._box.get_string(key1), _sample_message)
+        self.assertEqual(self._box.get_string(key1).split('\n'),
+                         _sample_message.split('\n'))
 
     def test_get_file(self):
         # Get file representations of messages
@@ -169,9 +276,9 @@
             data0 = file.read()
         with self._box.get_file(key1) as file:
             data1 = file.read()
-        self.assertEqual(data0.replace(os.linesep, '\n'),
+        self.assertEqual(data0.decode('ascii').replace(os.linesep, '\n'),
                          self._template % 0)
-        self.assertEqual(data1.replace(os.linesep, '\n'),
+        self.assertEqual(data1.decode('ascii').replace(os.linesep, '\n'),
                          _sample_message)
 
     def test_iterkeys(self):
@@ -405,11 +512,12 @@
     def test_dump_message(self):
         # Write message representations to disk
         for input in (email.message_from_string(_sample_message),
-                      _sample_message, io.StringIO(_sample_message)):
-            output = io.StringIO()
+                      _sample_message, io.BytesIO(_bytes_sample_message)):
+            output = io.BytesIO()
             self._box._dump_message(input, output)
-            self.assertEqual(output.getvalue(), _sample_message)
-        output = io.StringIO()
+            self.assertEqual(output.getvalue(),
+                _bytes_sample_message.replace(b'\n', os.linesep.encode()))
+        output = io.BytesIO()
         self.assertRaises(TypeError,
                           lambda: self._box._dump_message(None, output))
 
@@ -439,6 +547,7 @@
         self.assertRaises(NotImplementedError, lambda: box.__getitem__(''))
         self.assertRaises(NotImplementedError, lambda: box.get_message(''))
         self.assertRaises(NotImplementedError, lambda: box.get_string(''))
+        self.assertRaises(NotImplementedError, lambda: box.get_bytes(''))
         self.assertRaises(NotImplementedError, lambda: box.get_file(''))
         self.assertRaises(NotImplementedError, lambda: '' in box)
         self.assertRaises(NotImplementedError, lambda: box.__contains__(''))
@@ -640,9 +749,9 @@
                              "Host name mismatch: '%s' should be '%s'" %
                              (groups[4], hostname))
             previous_groups = groups
-            tmp_file.write(_sample_message)
+            tmp_file.write(_bytes_sample_message)
             tmp_file.seek(0)
-            self.assertEqual(tmp_file.read(), _sample_message)
+            self.assertEqual(tmp_file.read(), _bytes_sample_message)
             tmp_file.close()
         file_count = len(os.listdir(os.path.join(self._path, "tmp")))
         self.assertEqual(file_count, repetitions,
@@ -787,6 +896,12 @@
         self.assertEqual(self._box[key].get_from(), 'foo@bar blah')
         self.assertEqual(self._box[key].get_payload(), '0')
 
+    def test_add_from_bytes(self):
+        # Add a byte string starting with 'From ' to the mailbox
+        key = self._box.add(b'From foo@bar blah\nFrom: foo\n\n0')
+        self.assertEqual(self._box[key].get_from(), 'foo@bar blah')
+        self.assertEqual(self._box[key].get_payload(), '0')
+
     def test_add_mbox_or_mmdf_message(self):
         # Add an mboxMessage or MMDFMessage
         for class_ in (mailbox.mboxMessage, mailbox.MMDFMessage):
@@ -817,7 +932,7 @@
         self._box._file.seek(0)
         contents = self._box._file.read()
         self._box.close()
-        with open(self._path, 'r', newline='') as f:
+        with open(self._path, 'rb') as f:
             self.assertEqual(contents, f.read())
         self._box = self._factory(self._path)
 
@@ -1087,6 +1202,15 @@
             self._post_initialize_hook(msg)
             self._check_sample(msg)
 
+    def test_initialize_with_binary_file(self):
+        # Initialize based on contents of binary file
+        with open(self._path, 'wb+') as f:
+            f.write(_bytes_sample_message)
+            f.seek(0)
+            msg = self._factory(f)
+            self._post_initialize_hook(msg)
+            self._check_sample(msg)
+
     def test_initialize_with_nothing(self):
         # Initialize without arguments
         msg = self._factory()
@@ -1363,6 +1487,14 @@
             msg_plain = mailbox.Message(msg)
             self._check_sample(msg_plain)
 
+    def test_x_from_bytes(self):
+        # Create each message format directly from a bytes sample
+        for class_ in (mailbox.Message, mailbox.MaildirMessage,
+                       mailbox.mboxMessage, mailbox.MHMessage,
+                       mailbox.BabylMessage, mailbox.MMDFMessage):
+            msg = class_(_bytes_sample_message)
+            self._check_sample(msg)
+
     def test_x_to_invalid(self):
         # Convert all formats to an invalid format
         for class_ in (mailbox.Message, mailbox.MaildirMessage,
@@ -1908,6 +2040,8 @@
 --NMuMz9nt05w80d4+--
 """
 
+_bytes_sample_message = _sample_message.encode('ascii')
+
 _sample_headers = {
     "Return-Path":"<gkj@gregorykjohnson.com>",
     "X-Original-To":"gkj+person@localhost",