#4661: add bytes parsing and generation to email (email version bump to 5.1.0)

The work on this is not 100% complete, but everything needed to allow
real-world testing of the code is present.  The only remaining major todo
item is to (hopefully!) improve the handling of non-ASCII bytes in headers
that are converted to unicode: they should be RFC 2047 encoded rather than
replaced with '?'s.
diff --git a/Lib/email/test/test_email.py b/Lib/email/test/test_email.py
index 8f95125..e5e51c6 100644
--- a/Lib/email/test/test_email.py
+++ b/Lib/email/test/test_email.py
@@ -9,8 +9,9 @@
 import difflib
 import unittest
 import warnings
+import textwrap
 
-from io import StringIO
+from io import StringIO, BytesIO
 from itertools import chain
 
 import email
@@ -34,7 +35,7 @@
 from email import base64mime
 from email import quoprimime
 
-from test.support import findfile, run_unittest
+from test.support import findfile, run_unittest, unlink
 from email.test import __file__ as landmark
 
 
@@ -2070,6 +2071,10 @@
         msg, text = self._msgobj('msg_36.txt')
         self._idempotent(msg, text)
 
+    def test_message_signed_idempotent(self):
+        msg, text = self._msgobj('msg_45.txt')
+        self._idempotent(msg, text)
+
     def test_content_type(self):
         eq = self.assertEquals
         unless = self.assertTrue
@@ -2186,7 +2191,8 @@
         all.sort()
         self.assertEqual(all, [
             'base64mime', 'charset', 'encoders', 'errors', 'generator',
-            'header', 'iterators', 'message', 'message_from_file',
+            'header', 'iterators', 'message', 'message_from_binary_file',
+            'message_from_bytes', 'message_from_file',
             'message_from_string', 'mime', 'parser',
             'quoprimime', 'utils',
             ])
@@ -2687,6 +2693,266 @@
         self.assertTrue(msg.get_payload(0).get_payload().endswith('\r\n'))
 
 
+class Test8BitBytesHandling(unittest.TestCase):
+    # In Python3 all input is string, but that doesn't work if the actual input
+    # uses an 8bit transfer encoding.  To hack around that, in email 5.1 we
+    # decode byte streams using the surrogateescape error handler, and
+    # reconvert to binary at appropriate places if we detect surrogates.  This
+    # doesn't allow us to transform headers with 8bit bytes (they get munged),
+    # but it does allow us to parse and preserve them, and to decode body
+    # parts that use an 8bit CTE.
+
+    bodytest_msg = textwrap.dedent("""\
+        From: foo@bar.com
+        To: baz
+        Mime-Version: 1.0
+        Content-Type: text/plain; charset={charset}
+        Content-Transfer-Encoding: {cte}
+
+        {bodyline}
+        """)
+
+    def test_known_8bit_CTE(self):
+        m = self.bodytest_msg.format(charset='utf-8',
+                                     cte='8bit',
+                                     bodyline='pöstal').encode('utf-8')
+        msg = email.message_from_bytes(m)
+        self.assertEqual(msg.get_payload(), "pöstal\n")
+        self.assertEqual(msg.get_payload(decode=True),
+                         "pöstal\n".encode('utf-8'))
+
+    def test_unknown_8bit_CTE(self):
+        m = self.bodytest_msg.format(charset='notavalidcharset',
+                                     cte='8bit',
+                                     bodyline='pöstal').encode('utf-8')
+        msg = email.message_from_bytes(m)
+        self.assertEqual(msg.get_payload(), "p��stal\n")
+        self.assertEqual(msg.get_payload(decode=True),
+                         "pöstal\n".encode('utf-8'))
+
+    def test_8bit_in_quopri_body(self):
+        # This is non-RFC compliant data...without 'decode' the library code
+        # decodes the body using the charset from the headers, and because the
+        # source byte really is utf-8 this works.  This is likely to fail
+        # against real dirty data (ie: produce mojibake), but the data is
+        # invalid anyway so it is as good a guess as any.  But this means that
+        # this test just confirms the current behavior; that behavior is not
+        # necessarily the best possible behavior.  With 'decode' it is
+        # returning the raw bytes, so that test should be of correct behavior,
+        # or at least produce the same result that email4 did.
+        m = self.bodytest_msg.format(charset='utf-8',
+                                     cte='quoted-printable',
+                                     bodyline='p=C3=B6stál').encode('utf-8')
+        msg = email.message_from_bytes(m)
+        self.assertEqual(msg.get_payload(), 'p=C3=B6stál\n')
+        self.assertEqual(msg.get_payload(decode=True),
+                         'pöstál\n'.encode('utf-8'))
+
+    def test_invalid_8bit_in_non_8bit_cte_uses_replace(self):
+        # This is similar to the previous test, but proves that if the 8bit
+        # byte is undecodable in the specified charset, it gets replaced
+        # by the unicode 'unknown' character.  Again, this may or may not
+        # be the ideal behavior.  Note that if decode=False none of the
+        # decoders will get involved, so this is the only test we need
+        # for this behavior.
+        m = self.bodytest_msg.format(charset='ascii',
+                                     cte='quoted-printable',
+                                     bodyline='p=C3=B6stál').encode('utf-8')
+        msg = email.message_from_bytes(m)
+        self.assertEqual(msg.get_payload(), 'p=C3=B6st��l\n')
+        self.assertEqual(msg.get_payload(decode=True),
+                        'pöstál\n'.encode('utf-8'))
+
+    def test_8bit_in_base64_body(self):
+        # Sticking an 8bit byte in a base64 block makes it undecodable by
+        # normal means, so the block is returned undecoded, but as bytes.
+        m = self.bodytest_msg.format(charset='utf-8',
+                                     cte='base64',
+                                     bodyline='cMO2c3RhbAá=').encode('utf-8')
+        msg = email.message_from_bytes(m)
+        self.assertEqual(msg.get_payload(decode=True),
+                         'cMO2c3RhbAá=\n'.encode('utf-8'))
+
+    def test_8bit_in_uuencode_body(self):
+        # Sticking an 8bit byte in a uuencode block makes it undecodable by
+        # normal means, so the block is returned undecoded, but as bytes.
+        m = self.bodytest_msg.format(charset='utf-8',
+                                     cte='uuencode',
+                                     bodyline='<,.V<W1A; á ').encode('utf-8')
+        msg = email.message_from_bytes(m)
+        self.assertEqual(msg.get_payload(decode=True),
+                         '<,.V<W1A; á \n'.encode('utf-8'))
+
+
+    headertest_msg = textwrap.dedent("""\
+        From: foo@bar.com
+        To: báz
+        Subject: Maintenant je vous présente mon collègue, le pouf célèbre
+        \tJean de Baddie
+        From: göst
+
+        Yes, they are flying.
+        """).encode('utf-8')
+
+    def test_get_8bit_header(self):
+        msg = email.message_from_bytes(self.headertest_msg)
+        self.assertEqual(msg.get('to'), 'b??z')
+        self.assertEqual(msg['to'], 'b??z')
+
+    def test_print_8bit_headers(self):
+        msg = email.message_from_bytes(self.headertest_msg)
+        self.assertEqual(str(msg),
+                         self.headertest_msg.decode(
+                            'ascii', 'replace').replace('�', '?'))
+
+    def test_values_with_8bit_headers(self):
+        msg = email.message_from_bytes(self.headertest_msg)
+        self.assertListEqual(msg.values(),
+                              ['foo@bar.com',
+                               'b??z',
+                               'Maintenant je vous pr??sente mon '
+                                   'coll??gue, le pouf c??l??bre\n'
+                                   '\tJean de Baddie',
+                               "g??st"])
+
+    def test_items_with_8bit_headers(self):
+        msg = email.message_from_bytes(self.headertest_msg)
+        self.assertListEqual(msg.items(),
+                              [('From', 'foo@bar.com'),
+                               ('To', 'b??z'),
+                               ('Subject', 'Maintenant je vous pr??sente mon '
+                                              'coll??gue, le pouf c??l??bre\n'
+                                              '\tJean de Baddie'),
+                               ('From', 'g??st')])
+
+    def test_get_all_with_8bit_headers(self):
+        msg = email.message_from_bytes(self.headertest_msg)
+        self.assertListEqual(msg.get_all('from'),
+                              ['foo@bar.com',
+                               'g??st'])
+
+    non_latin_bin_msg = textwrap.dedent("""\
+        From: foo@bar.com
+        To: báz
+        Subject: Maintenant je vous présente mon collègue, le pouf célèbre
+        \tJean de Baddie
+        Mime-Version: 1.0
+        Content-Type: text/plain; charset="utf-8"
+        Content-Transfer-Encoding: 8bit
+
+        Да, они летят.
+        """).encode('utf-8')
+
+    def test_bytes_generator(self):
+        msg = email.message_from_bytes(self.non_latin_bin_msg)
+        out = BytesIO()
+        email.generator.BytesGenerator(out).flatten(msg)
+        self.assertEqual(out.getvalue(), self.non_latin_bin_msg)
+
+    # XXX: ultimately the '?' should turn into CTE encoded bytes
+    # using 'unknown-8bit' charset.
+    non_latin_bin_msg_as7bit = textwrap.dedent("""\
+        From: foo@bar.com
+        To: b??z
+        Subject: Maintenant je vous pr??sente mon coll??gue, le pouf c??l??bre
+        \tJean de Baddie
+        Mime-Version: 1.0
+        Content-Type: text/plain; charset="utf-8"
+        Content-Transfer-Encoding: base64
+
+        0JTQsCwg0L7QvdC4INC70LXRgtGP0YIuCg==
+        """)
+
+    def test_generator_handles_8bit(self):
+        msg = email.message_from_bytes(self.non_latin_bin_msg)
+        out = StringIO()
+        email.generator.Generator(out).flatten(msg)
+        self.assertEqual(out.getvalue(), self.non_latin_bin_msg_as7bit)
+
+    def test_bytes_generator_with_unix_from(self):
+        # The unixfrom contains a current date, so we can't check it
+        # literally.  Just make sure the first word is 'From' and the
+        # rest of the message matches the input.
+        msg = email.message_from_bytes(self.non_latin_bin_msg)
+        out = BytesIO()
+        email.generator.BytesGenerator(out).flatten(msg, unixfrom=True)
+        lines = out.getvalue().split(b'\n')
+        self.assertEqual(lines[0].split()[0], b'From')
+        self.assertEqual(b'\n'.join(lines[1:]), self.non_latin_bin_msg)
+
+    def test_message_from_binary_file(self):
+        fn = 'test.msg'
+        self.addCleanup(unlink, fn)
+        with open(fn, 'wb') as testfile:
+            testfile.write(self.non_latin_bin_msg)
+        m = email.parser.BytesParser().parse(open(fn, 'rb'))
+        self.assertEqual(str(m), self.non_latin_bin_msg_as7bit)
+
+    latin_bin_msg = textwrap.dedent("""\
+        From: foo@bar.com
+        To: Dinsdale
+        Subject: Nudge nudge, wink, wink
+        Mime-Version: 1.0
+        Content-Type: text/plain; charset="latin-1"
+        Content-Transfer-Encoding: 8bit
+
+        oh là là, know what I mean, know what I mean?
+        """).encode('latin-1')
+
+    latin_bin_msg_as7bit = textwrap.dedent("""\
+        From: foo@bar.com
+        To: Dinsdale
+        Subject: Nudge nudge, wink, wink
+        Mime-Version: 1.0
+        Content-Type: text/plain; charset="iso-8859-1"
+        Content-Transfer-Encoding: quoted-printable
+
+        oh l=E0 l=E0, know what I mean, know what I mean?
+        """)
+
+    def test_string_generator_reencodes_to_quopri_when_appropriate(self):
+        m = email.message_from_bytes(self.latin_bin_msg)
+        self.assertEqual(str(m), self.latin_bin_msg_as7bit)
+
+    def test_decoded_generator_emits_unicode_body(self):
+        m = email.message_from_bytes(self.latin_bin_msg)
+        out = StringIO()
+        email.generator.DecodedGenerator(out).flatten(m)
+        #DecodedGenerator output contains an extra blank line compared
+        #to the input message.  RDM: not sure if this is a bug or not,
+        #but it is not specific to the 8bit->7bit conversion.
+        self.assertEqual(out.getvalue(),
+            self.latin_bin_msg.decode('latin-1')+'\n')
+
+    def test_bytes_feedparser(self):
+        bfp = email.feedparser.BytesFeedParser()
+        for i in range(0, len(self.latin_bin_msg), 10):
+            bfp.feed(self.latin_bin_msg[i:i+10])
+        m = bfp.close()
+        self.assertEqual(str(m), self.latin_bin_msg_as7bit)
+
+
+class TestBytesGeneratorIdempotent(TestIdempotent):
+
+    def _msgobj(self, filename):
+        with openfile(filename, 'rb') as fp:
+            data = fp.read()
+        msg = email.message_from_bytes(data)
+        return msg, data
+
+    def _idempotent(self, msg, data):
+        b = BytesIO()
+        g = email.generator.BytesGenerator(b, maxheaderlen=0)
+        g.flatten(msg)
+        self.assertEqual(data, b.getvalue())
+
+    maxDiff = None
+
+    def assertEqual(self, str1, str2):
+        self.assertListEqual(str1.split(b'\n'), str2.split(b'\n'))
+
+
+
 class TestBase64(unittest.TestCase):
     def test_len(self):
         eq = self.assertEqual