Patch #1443155: Add incremental codec support to the CJK codecs.
(reviewed by Walter Dörwald)
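
What the patch enables, roughly (a minimal sketch, not part of the patch
itself; it assumes only the codecs.getincremental*() APIs introduced in
Python 2.5 and the cp949 byte sequences used in the tests below):

    import codecs

    # A byte stream can now be fed to a CJK codec in arbitrary chunks; the
    # incremental decoder buffers an incomplete multibyte sequence between
    # calls instead of failing at the split point.
    decoder = codecs.getincrementaldecoder('cp949')()
    chunks = ['\xc6\xc4\xc0', '\xcc']   # u'\ud30c\uc774' split mid-character
    assert u''.join(decoder.decode(c) for c in chunks) == u'\ud30c\uc774'

    # The encoder side is symmetrical and flushes any pending state when
    # encode() is called with final=True, or when reset() is called.
    encoder = codecs.getincrementalencoder('cp949')()
    assert encoder.encode(u'\ud30c\uc774', True) == '\xc6\xc4\xc0\xcc'
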
diff --git a/Lib/test/test_multibytecodec.py b/Lib/test/test_multibytecodec.py
index aef7931..8f9f6e9 100644
--- a/Lib/test/test_multibytecodec.py
+++ b/Lib/test/test_multibytecodec.py
@@ -9,11 +9,106 @@
 from test import test_multibytecodec_support
 import unittest, StringIO, codecs
 
+class Test_MultibyteCodec(unittest.TestCase):
+
+    def test_nullcoding(self):
+        self.assertEqual(''.decode('gb18030'), u'')
+        self.assertEqual(unicode('', 'gb18030'), u'')
+        self.assertEqual(u''.encode('gb18030'), '')
+
+    def test_str_decode(self):
+        self.assertEqual('abcd'.encode('gb18030'), 'abcd')
+
+
+class Test_IncrementalEncoder(unittest.TestCase):
+
+    def test_stateless(self):
+        # cp949 encoder isn't stateful at all.
+        encoder = codecs.getincrementalencoder('cp949')()
+        self.assertEqual(encoder.encode(u'\ud30c\uc774\uc36c \ub9c8\uc744'),
+                         '\xc6\xc4\xc0\xcc\xbd\xe3 \xb8\xb6\xc0\xbb')
+        self.assertEqual(encoder.reset(), None)
+        self.assertEqual(encoder.encode(u'\u2606\u223c\u2606', True),
+                         '\xa1\xd9\xa1\xad\xa1\xd9')
+        self.assertEqual(encoder.reset(), None)
+        self.assertEqual(encoder.encode(u'', True), '')
+        self.assertEqual(encoder.encode(u'', False), '')
+        self.assertEqual(encoder.reset(), None)
+
+    def test_stateful(self):
+        # The jisx0213 encoder is stateful for a few code points, e.g.:
+        #   U+00E6 => A9DC
+        #   U+00E6 U+0300 => ABC4
+        #   U+0300 => ABDC
+
+        encoder = codecs.getincrementalencoder('jisx0213')()
+        self.assertEqual(encoder.encode(u'\u00e6\u0300'), '\xab\xc4')
+        self.assertEqual(encoder.encode(u'\u00e6'), '')
+        self.assertEqual(encoder.encode(u'\u0300'), '\xab\xc4')
+        self.assertEqual(encoder.encode(u'\u00e6', True), '\xa9\xdc')
+
+        self.assertEqual(encoder.reset(), None)
+        self.assertEqual(encoder.encode(u'\u0300'), '\xab\xdc')
+
+        self.assertEqual(encoder.encode(u'\u00e6'), '')
+        self.assertEqual(encoder.encode(u'', True), '\xa9\xdc')
+        self.assertEqual(encoder.encode(u'', True), '')
+
+    def test_stateful_keep_buffer(self):
+        encoder = codecs.getincrementalencoder('jisx0213')()
+        self.assertEqual(encoder.encode(u'\u00e6'), '')
+        self.assertRaises(UnicodeEncodeError, encoder.encode, u'\u0123')
+        self.assertEqual(encoder.encode(u'\u0300\u00e6'), '\xab\xc4')
+        self.assertRaises(UnicodeEncodeError, encoder.encode, u'\u0123')
+        self.assertEqual(encoder.reset(), None)
+        self.assertEqual(encoder.encode(u'\u0300'), '\xab\xdc')
+        self.assertEqual(encoder.encode(u'\u00e6'), '')
+        self.assertRaises(UnicodeEncodeError, encoder.encode, u'\u0123')
+        self.assertEqual(encoder.encode(u'', True), '\xa9\xdc')
+
+
+class Test_IncrementalDecoder(unittest.TestCase):
+
+    def test_dbcs(self):
+        # The cp949 decoder is simple: it only handles 1- and 2-byte sequences.
+        decoder = codecs.getincrementaldecoder('cp949')()
+        self.assertEqual(decoder.decode('\xc6\xc4\xc0\xcc\xbd'),
+                         u'\ud30c\uc774')
+        self.assertEqual(decoder.decode('\xe3 \xb8\xb6\xc0\xbb'),
+                         u'\uc36c \ub9c8\uc744')
+        self.assertEqual(decoder.decode(''), u'')
+
+    def test_dbcs_keep_buffer(self):
+        decoder = codecs.getincrementaldecoder('cp949')()
+        self.assertEqual(decoder.decode('\xc6\xc4\xc0'), u'\ud30c')
+        self.assertRaises(UnicodeDecodeError, decoder.decode, '', True)
+        self.assertEqual(decoder.decode('\xcc'), u'\uc774')
+
+        self.assertEqual(decoder.decode('\xc6\xc4\xc0'), u'\ud30c')
+        self.assertRaises(UnicodeDecodeError, decoder.decode, '\xcc\xbd', True)
+        self.assertEqual(decoder.decode('\xcc'), u'\uc774')
+
+    def test_iso2022(self):
+        decoder = codecs.getincrementaldecoder('iso2022-jp')()
+        ESC = '\x1b'
+        self.assertEqual(decoder.decode(ESC + '('), u'')
+        self.assertEqual(decoder.decode('B', True), u'')
+        self.assertEqual(decoder.decode(ESC + '$'), u'')
+        self.assertEqual(decoder.decode('B@$'), u'\u4e16')
+        self.assertEqual(decoder.decode('@$@'), u'\u4e16')
+        self.assertEqual(decoder.decode('$', True), u'\u4e16')
+        self.assertEqual(decoder.reset(), None)
+        self.assertEqual(decoder.decode('@$'), u'@$')
+        self.assertEqual(decoder.decode(ESC + '$'), u'')
+        self.assertRaises(UnicodeDecodeError, decoder.decode, '', True)
+        self.assertEqual(decoder.decode('B@$'), u'\u4e16')
+
+
 class Test_StreamWriter(unittest.TestCase):
     if len(u'\U00012345') == 2: # UCS2
         def test_gb18030(self):
             s= StringIO.StringIO()
-            c = codecs.lookup('gb18030')[3](s)
+            c = codecs.getwriter('gb18030')(s)
             c.write(u'123')
             self.assertEqual(s.getvalue(), '123')
             c.write(u'\U00012345')
@@ -30,15 +125,16 @@
             self.assertEqual(s.getvalue(),
                     '123\x907\x959\x907\x959\x907\x959\x827\xcf5\x810\x851')
 
-        # standard utf-8 codecs has broken StreamReader
-        if test_multibytecodec_support.__cjkcodecs__:
-            def test_utf_8(self):
-                s= StringIO.StringIO()
-                c = codecs.lookup('utf-8')[3](s)
-                c.write(u'123')
-                self.assertEqual(s.getvalue(), '123')
-                c.write(u'\U00012345')
-                self.assertEqual(s.getvalue(), '123\xf0\x92\x8d\x85')
+        def test_utf_8(self):
+            s= StringIO.StringIO()
+            c = codecs.getwriter('utf-8')(s)
+            c.write(u'123')
+            self.assertEqual(s.getvalue(), '123')
+            c.write(u'\U00012345')
+            self.assertEqual(s.getvalue(), '123\xf0\x92\x8d\x85')
+
+            # Python utf-8 codec can't buffer surrogate pairs yet.
+            if 0:
                 c.write(u'\U00012345'[0])
                 self.assertEqual(s.getvalue(), '123\xf0\x92\x8d\x85')
                 c.write(u'\U00012345'[1] + u'\U00012345' + u'\uac00\u00ac')
@@ -61,14 +157,6 @@
     else: # UCS4
         pass
 
-    def test_nullcoding(self):
-        self.assertEqual(''.decode('gb18030'), u'')
-        self.assertEqual(unicode('', 'gb18030'), u'')
-        self.assertEqual(u''.encode('gb18030'), '')
-
-    def test_str_decode(self):
-        self.assertEqual('abcd'.encode('gb18030'), 'abcd')
-
     def test_streamwriter_strwrite(self):
         s = StringIO.StringIO()
         wr = codecs.getwriter('gb18030')(s)
@@ -83,6 +171,9 @@
 
 def test_main():
     suite = unittest.TestSuite()
+    suite.addTest(unittest.makeSuite(Test_MultibyteCodec))
+    suite.addTest(unittest.makeSuite(Test_IncrementalEncoder))
+    suite.addTest(unittest.makeSuite(Test_IncrementalDecoder))
     suite.addTest(unittest.makeSuite(Test_StreamWriter))
     suite.addTest(unittest.makeSuite(Test_ISO2022))
     test_support.run_suite(suite)
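
(For context, a minimal sketch of the jisx0213 statefulness that
Test_IncrementalEncoder exercises above -- this is not part of the patch,
and it relies on the 'jisx0213' codec name used by the tests:)

    import codecs

    # U+00E6 alone and U+00E6 followed by combining U+0300 map to different
    # byte sequences, so the incremental encoder must hold U+00E6 back until
    # it knows whether a combining character follows.
    encoder = codecs.getincrementalencoder('jisx0213')()
    assert encoder.encode(u'\u00e6') == ''                 # pending
    assert encoder.encode(u'\u0300') == '\xab\xc4'         # combined form
    assert encoder.encode(u'\u00e6', True) == '\xa9\xdc'   # flushed alone
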
diff --git a/Lib/test/test_multibytecodec_support.py b/Lib/test/test_multibytecodec_support.py
index 45a63e7..563a3ea 100644
--- a/Lib/test/test_multibytecodec_support.py
+++ b/Lib/test/test_multibytecodec_support.py
@@ -3,15 +3,12 @@
 # test_multibytecodec_support.py
 #   Common Unittest Routines for CJK codecs
 #
-# $CJKCodecs: test_multibytecodec_support.py,v 1.6 2004/06/19 06:09:55 perky Exp $
 
 import sys, codecs, os.path
 import unittest
 from test import test_support
 from StringIO import StringIO
 
-__cjkcodecs__ = 0 # define this as 0 for python
-
 class TestBase:
     encoding        = ''   # codec name
     codec           = None # codec tuple (with 4 elements)
@@ -21,11 +18,17 @@
     roundtriptest   = 1    # set if roundtrip is possible with unicode
     has_iso10646    = 0    # set if this encoding contains whole iso10646 map
     xmlcharnametest = None # string to test xmlcharrefreplace
+    unmappedunicode = u'\udeee' # a unicode codepoint that is not mapped.
 
     def setUp(self):
         if self.codec is None:
             self.codec = codecs.lookup(self.encoding)
-        self.encode, self.decode, self.reader, self.writer = self.codec
+        self.encode = self.codec.encode
+        self.decode = self.codec.decode
+        self.reader = self.codec.streamreader
+        self.writer = self.codec.streamwriter
+        self.incrementalencoder = self.codec.incrementalencoder
+        self.incrementaldecoder = self.codec.incrementaldecoder
 
     def test_chunkcoding(self):
         for native, utf8 in zip(*[StringIO(f).readlines()
@@ -47,51 +50,142 @@
             else:
                 self.assertRaises(UnicodeError, func, source, scheme)
 
-    if sys.hexversion >= 0x02030000:
-        def test_xmlcharrefreplace(self):
-            if self.has_iso10646:
-                return
+    def test_xmlcharrefreplace(self):
+        if self.has_iso10646:
+            return
 
-            s = u"\u0b13\u0b23\u0b60 nd eggs"
-            self.assertEqual(
-                self.encode(s, "xmlcharrefreplace")[0],
-                "ଓଣୠ nd eggs"
-            )
+        s = u"\u0b13\u0b23\u0b60 nd eggs"
+        self.assertEqual(
+            self.encode(s, "xmlcharrefreplace")[0],
+            "ଓଣୠ nd eggs"
+        )
 
-        def test_customreplace(self):
-            if self.has_iso10646:
-                return
+    def test_customreplace(self):
+        if self.has_iso10646:
+            return
 
-            import htmlentitydefs
+        from htmlentitydefs import codepoint2name
 
-            names = {}
-            for (key, value) in htmlentitydefs.entitydefs.items():
-                if len(value)==1:
-                    names[value.decode('latin-1')] = self.decode(key)[0]
+        def xmlcharnamereplace(exc):
+            if not isinstance(exc, UnicodeEncodeError):
+                raise TypeError("don't know how to handle %r" % exc)
+            l = []
+            for c in exc.object[exc.start:exc.end]:
+                if ord(c) in codepoint2name:
+                    l.append(u"&%s;" % codepoint2name[ord(c)])
                 else:
-                    names[unichr(int(value[2:-1]))] = self.decode(key)[0]
+                    l.append(u"&#%d;" % ord(c))
+            return (u"".join(l), exc.end)
 
-            def xmlcharnamereplace(exc):
-                if not isinstance(exc, UnicodeEncodeError):
-                    raise TypeError("don't know how to handle %r" % exc)
-                l = []
-                for c in exc.object[exc.start:exc.end]:
-                    try:
-                        l.append(u"&%s;" % names[c])
-                    except KeyError:
-                        l.append(u"&#%d;" % ord(c))
-                return (u"".join(l), exc.end)
+        codecs.register_error("test.xmlcharnamereplace", xmlcharnamereplace)
 
-            codecs.register_error(
-                "test.xmlcharnamereplace", xmlcharnamereplace)
+        if self.xmlcharnametest:
+            sin, sout = self.xmlcharnametest
+        else:
+            sin = u"\xab\u211c\xbb = \u2329\u1234\u232a"
+            sout = "«ℜ» = ⟨ሴ⟩"
+        self.assertEqual(self.encode(sin,
+                                    "test.xmlcharnamereplace")[0], sout)
 
-            if self.xmlcharnametest:
-                sin, sout = self.xmlcharnametest
+    def test_callback_wrong_objects(self):
+        def myreplace(exc):
+            return (ret, exc.end)
+        codecs.register_error("test.cjktest", myreplace)
+
+        for ret in ([1, 2, 3], [], None, object(), 'string', ''):
+            self.assertRaises(TypeError, self.encode, self.unmappedunicode,
+                              'test.cjktest')
+
+    def test_callback_None_index(self):
+        def myreplace(exc):
+            return (u'x', None)
+        codecs.register_error("test.cjktest", myreplace)
+        self.assertRaises(TypeError, self.encode, self.unmappedunicode,
+                          'test.cjktest')
+
+    def test_callback_backward_index(self):
+        def myreplace(exc):
+            if myreplace.limit > 0:
+                myreplace.limit -= 1
+                return (u'REPLACED', 0)
             else:
-                sin = u"\xab\u211c\xbb = \u2329\u1234\u232a"
-                sout = "«ℜ» = ⟨ሴ⟩"
-            self.assertEqual(self.encode(sin,
-                                        "test.xmlcharnamereplace")[0], sout)
+                return (u'TERMINAL', exc.end)
+        myreplace.limit = 3
+        codecs.register_error("test.cjktest", myreplace)
+        self.assertEqual(self.encode(u'abcd' + self.unmappedunicode + u'efgh',
+                                     'test.cjktest'),
+                ('abcdREPLACEDabcdREPLACEDabcdREPLACEDabcdTERMINALefgh', 9))
+
+    def test_callback_forward_index(self):
+        def myreplace(exc):
+            return (u'REPLACED', exc.end + 2)
+        codecs.register_error("test.cjktest", myreplace)
+        self.assertEqual(self.encode(u'abcd' + self.unmappedunicode + u'efgh',
+                                     'test.cjktest'), ('abcdREPLACEDgh', 9))
+
+    def test_callback_index_outofbound(self):
+        def myreplace(exc):
+            return (u'TERM', 100)
+        codecs.register_error("test.cjktest", myreplace)
+        self.assertRaises(IndexError, self.encode, self.unmappedunicode,
+                          'test.cjktest')
+
+    def test_incrementalencoder(self):
+        UTF8Reader = codecs.getreader('utf-8')
+        for sizehint in [None] + range(1, 33) + \
+                        [64, 128, 256, 512, 1024]:
+            istream = UTF8Reader(StringIO(self.tstring[1]))
+            ostream = StringIO()
+            encoder = self.incrementalencoder()
+            while 1:
+                if sizehint is not None:
+                    data = istream.read(sizehint)
+                else:
+                    data = istream.read()
+
+                if not data:
+                    break
+                e = encoder.encode(data)
+                ostream.write(e)
+
+            self.assertEqual(ostream.getvalue(), self.tstring[0])
+
+    def test_incrementaldecoder(self):
+        UTF8Writer = codecs.getwriter('utf-8')
+        for sizehint in [None, -1] + range(1, 33) + \
+                        [64, 128, 256, 512, 1024]:
+            istream = StringIO(self.tstring[0])
+            ostream = UTF8Writer(StringIO())
+            decoder = self.incrementaldecoder()
+            while 1:
+                data = istream.read(sizehint)
+                if not data:
+                    break
+                else:
+                    u = decoder.decode(data)
+                    ostream.write(u)
+
+            self.assertEqual(ostream.getvalue(), self.tstring[1])
+
+    def test_incrementalencoder_error_callback(self):
+        inv = self.unmappedunicode
+
+        e = self.incrementalencoder()
+        self.assertRaises(UnicodeEncodeError, e.encode, inv, True)
+
+        e.errors = 'ignore'
+        self.assertEqual(e.encode(inv, True), '')
+
+        e.reset()
+        def tempreplace(exc):
+            return (u'called', exc.end)
+        codecs.register_error('test.incremental_error_callback', tempreplace)
+        e.errors = 'test.incremental_error_callback'
+        self.assertEqual(e.encode(inv, True), 'called')
+
+        # switching the handler back to 'ignore' also works
+        e.errors = 'ignore'
+        self.assertEqual(e.encode(inv, True), '')
 
     def test_streamreader(self):
         UTF8Writer = codecs.getwriter('utf-8')
@@ -113,11 +207,7 @@
                 self.assertEqual(ostream.getvalue(), self.tstring[1])
 
     def test_streamwriter(self):
-        if __cjkcodecs__:
-            readfuncs = ('read', 'readline', 'readlines')
-        else:
-            # standard utf8 codec has broken readline and readlines.
-            readfuncs = ('read',)
+        readfuncs = ('read', 'readline', 'readlines')
         UTF8Reader = codecs.getreader('utf-8')
         for name in readfuncs:
             for sizehint in [None] + range(1, 33) + \
@@ -211,10 +301,5 @@
             self.assertEqual(unicode(csetch, self.encoding), unich)
 
 def load_teststring(encoding):
-    if __cjkcodecs__:
-        etxt = open(os.path.join('sampletexts', encoding) + '.txt').read()
-        utxt = open(os.path.join('sampletexts', encoding) + '.utf8').read()
-        return (etxt, utxt)
-    else:
-        from test import cjkencodings_test
-        return cjkencodings_test.teststring[encoding]
+    from test import cjkencodings_test
+    return cjkencodings_test.teststring[encoding]
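
(A usage sketch of the per-instance error handling that
test_incrementalencoder_error_callback above exercises -- not part of the
patch; the handler name 'sketch.replace' and its behaviour are made up for
illustration:)

    import codecs

    def sketch_replace(exc):
        # hypothetical handler: substitute any unencodable run with '?'
        return (u'?', exc.end)
    codecs.register_error('sketch.replace', sketch_replace)

    e = codecs.getincrementalencoder('gb18030')()
    try:
        e.encode(u'\udeee', True)            # lone surrogate: unmappable
    except UnicodeEncodeError:
        pass

    e.errors = 'ignore'                      # the handler can be swapped
    assert e.encode(u'\udeee', True) == ''   # on a live encoder

    e.reset()
    e.errors = 'sketch.replace'
    assert e.encode(u'\udeee', True) == '?'
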