Revert my commit 3555cf6f9c98: "Issue #8796: codecs.open() calls the builtin
open() function instead of using StreamReaderWriter. Deprecate StreamReader,
StreamWriter, StreamReaderWriter, StreamRecoder and EncodedFile() of the codecs
module. Use the builtin open() function or io.TextIOWrapper instead."
"It has not been approved!" wrote Marc-Andre Lemburg.
diff --git a/Lib/test/test_codecs.py b/Lib/test/test_codecs.py
index 1f46560..c0450e7 100644
--- a/Lib/test/test_codecs.py
+++ b/Lib/test/test_codecs.py
@@ -1,10 +1,7 @@
from test import support
-import _testcapi
-import codecs
-import io
-import sys
import unittest
-import warnings
+import codecs
+import sys, _testcapi, io
class Queue(object):
"""
@@ -66,9 +63,7 @@
# the StreamReader and check that the results equal the appropriate
# entries from partialresults.
q = Queue(b"")
- with warnings.catch_warnings():
- warnings.simplefilter("ignore", DeprecationWarning)
- r = codecs.getreader(self.encoding)(q)
+ r = codecs.getreader(self.encoding)(q)
result = ""
for (c, partialresult) in zip(input.encode(self.encoding), partialresults):
q.write(bytes([c]))
@@ -111,9 +106,7 @@
return codecs.getreader(self.encoding)(stream)
def readalllines(input, keepends=True, size=None):
- with warnings.catch_warnings():
- warnings.simplefilter("ignore", DeprecationWarning)
- reader = getreader(input)
+ reader = getreader(input)
lines = []
while True:
line = reader.readline(size=size, keepends=keepends)
@@ -222,18 +215,14 @@
' \r\n',
]
stream = io.BytesIO("".join(s).encode(self.encoding))
- with warnings.catch_warnings():
- warnings.simplefilter("ignore", DeprecationWarning)
- reader = codecs.getreader(self.encoding)(stream)
+ reader = codecs.getreader(self.encoding)(stream)
for (i, line) in enumerate(reader):
self.assertEqual(line, s[i])
def test_readlinequeue(self):
q = Queue(b"")
- with warnings.catch_warnings():
- warnings.simplefilter("ignore", DeprecationWarning)
- writer = codecs.getwriter(self.encoding)(q)
- reader = codecs.getreader(self.encoding)(q)
+ writer = codecs.getwriter(self.encoding)(q)
+ reader = codecs.getreader(self.encoding)(q)
# No lineends
writer.write("foo\r")
@@ -264,9 +253,7 @@
s = (s1+s2+s3).encode(self.encoding)
stream = io.BytesIO(s)
- with warnings.catch_warnings():
- warnings.simplefilter("ignore", DeprecationWarning)
- reader = codecs.getreader(self.encoding)(stream)
+ reader = codecs.getreader(self.encoding)(stream)
self.assertEqual(reader.readline(), s1)
self.assertEqual(reader.readline(), s2)
self.assertEqual(reader.readline(), s3)
@@ -281,9 +268,7 @@
s = (s1+s2+s3+s4+s5).encode(self.encoding)
stream = io.BytesIO(s)
- with warnings.catch_warnings():
- warnings.simplefilter("ignore", DeprecationWarning)
- reader = codecs.getreader(self.encoding)(stream)
+ reader = codecs.getreader(self.encoding)(stream)
self.assertEqual(reader.readline(), s1)
self.assertEqual(reader.readline(), s2)
self.assertEqual(reader.readline(), s3)
@@ -305,9 +290,7 @@
_,_,reader,writer = codecs.lookup(self.encoding)
# encode some stream
s = io.BytesIO()
- with warnings.catch_warnings():
- warnings.simplefilter("ignore", DeprecationWarning)
- f = writer(s)
+ f = writer(s)
f.write("spam")
f.write("spam")
d = s.getvalue()
@@ -315,22 +298,16 @@
self.assertTrue(d == self.spamle or d == self.spambe)
# try to read it back
s = io.BytesIO(d)
- with warnings.catch_warnings():
- warnings.simplefilter("ignore", DeprecationWarning)
- f = reader(s)
+ f = reader(s)
self.assertEqual(f.read(), "spamspam")
def test_badbom(self):
s = io.BytesIO(4*b"\xff")
- with warnings.catch_warnings():
- warnings.simplefilter("ignore", DeprecationWarning)
- f = codecs.getreader(self.encoding)(s)
+ f = codecs.getreader(self.encoding)(s)
self.assertRaises(UnicodeError, f.read)
s = io.BytesIO(8*b"\xff")
- with warnings.catch_warnings():
- warnings.simplefilter("ignore", DeprecationWarning)
- f = codecs.getreader(self.encoding)(s)
+ f = codecs.getreader(self.encoding)(s)
self.assertRaises(UnicodeError, f.read)
def test_partial(self):
@@ -477,9 +454,7 @@
_,_,reader,writer = codecs.lookup(self.encoding)
# encode some stream
s = io.BytesIO()
- with warnings.catch_warnings():
- warnings.simplefilter("ignore", DeprecationWarning)
- f = writer(s)
+ f = writer(s)
f.write("spam")
f.write("spam")
d = s.getvalue()
@@ -487,22 +462,16 @@
self.assertTrue(d == self.spamle or d == self.spambe)
# try to read it back
s = io.BytesIO(d)
- with warnings.catch_warnings():
- warnings.simplefilter("ignore", DeprecationWarning)
- f = reader(s)
+ f = reader(s)
self.assertEqual(f.read(), "spamspam")
def test_badbom(self):
s = io.BytesIO(b"\xff\xff")
- with warnings.catch_warnings():
- warnings.simplefilter("ignore", DeprecationWarning)
- f = codecs.getreader(self.encoding)(s)
+ f = codecs.getreader(self.encoding)(s)
self.assertRaises(UnicodeError, f.read)
s = io.BytesIO(b"\xff\xff\xff\xff")
- with warnings.catch_warnings():
- warnings.simplefilter("ignore", DeprecationWarning)
- f = codecs.getreader(self.encoding)(s)
+ f = codecs.getreader(self.encoding)(s)
self.assertRaises(UnicodeError, f.read)
def test_partial(self):
@@ -548,8 +517,7 @@
self.addCleanup(support.unlink, support.TESTFN)
with open(support.TESTFN, 'wb') as fp:
fp.write(s)
- with codecs.open(support.TESTFN, 'U',
- encoding=self.encoding) as reader:
+ with codecs.open(support.TESTFN, 'U', encoding=self.encoding) as reader:
self.assertEqual(reader.read(), s1)
class UTF16LETest(ReadTest):
@@ -737,9 +705,7 @@
reader = codecs.getreader("utf-8-sig")
for sizehint in [None] + list(range(1, 11)) + \
[64, 128, 256, 512, 1024]:
- with warnings.catch_warnings():
- warnings.simplefilter("ignore", DeprecationWarning)
- istream = reader(io.BytesIO(bytestring))
+ istream = reader(io.BytesIO(bytestring))
ostream = io.StringIO()
while 1:
if sizehint is not None:
@@ -761,9 +727,7 @@
reader = codecs.getreader("utf-8-sig")
for sizehint in [None] + list(range(1, 11)) + \
[64, 128, 256, 512, 1024]:
- with warnings.catch_warnings():
- warnings.simplefilter("ignore", DeprecationWarning)
- istream = reader(io.BytesIO(bytestring))
+ istream = reader(io.BytesIO(bytestring))
ostream = io.StringIO()
while 1:
if sizehint is not None:
@@ -785,9 +749,7 @@
class RecodingTest(unittest.TestCase):
def test_recoding(self):
f = io.BytesIO()
- with warnings.catch_warnings():
- warnings.simplefilter("ignore", DeprecationWarning)
- f2 = codecs.EncodedFile(f, "unicode_internal", "utf-8")
+ f2 = codecs.EncodedFile(f, "unicode_internal", "utf-8")
f2.write("a")
f2.close()
# Python used to crash on this at exit because of a refcount
@@ -1164,9 +1126,7 @@
self.assertEqual("pyth\xf6n.org.".encode("idna"), b"xn--pythn-mua.org.")
def test_stream(self):
- with warnings.catch_warnings():
- warnings.simplefilter("ignore", DeprecationWarning)
- r = codecs.getreader("idna")(io.BytesIO(b"abc"))
+ r = codecs.getreader("idna")(io.BytesIO(b"abc"))
r.read(3)
self.assertEqual(r.read(), "")
@@ -1273,24 +1233,18 @@
class StreamReaderTest(unittest.TestCase):
def setUp(self):
- with warnings.catch_warnings():
- warnings.simplefilter("ignore", DeprecationWarning)
- self.reader = codecs.getreader('utf-8')
+ self.reader = codecs.getreader('utf-8')
self.stream = io.BytesIO(b'\xed\x95\x9c\n\xea\xb8\x80')
def test_readlines(self):
- with warnings.catch_warnings():
- warnings.simplefilter("ignore", DeprecationWarning)
- f = self.reader(self.stream)
+ f = self.reader(self.stream)
self.assertEqual(f.readlines(), ['\ud55c\n', '\uae00'])
class EncodedFileTest(unittest.TestCase):
def test_basic(self):
f = io.BytesIO(b'\xed\x95\x9c\n\xea\xb8\x80')
- with warnings.catch_warnings():
- warnings.simplefilter("ignore", DeprecationWarning)
- ef = codecs.EncodedFile(f, 'utf-16-le', 'utf-8')
+ ef = codecs.EncodedFile(f, 'utf-16-le', 'utf-8')
self.assertEqual(ef.read(), b'\\\xd5\n\x00\x00\xae')
f = io.BytesIO()
@@ -1434,9 +1388,7 @@
if encoding not in broken_unicode_with_streams:
# check stream reader/writer
q = Queue(b"")
- with warnings.catch_warnings():
- warnings.simplefilter("ignore", DeprecationWarning)
- writer = codecs.getwriter(encoding)(q)
+ writer = codecs.getwriter(encoding)(q)
encodedresult = b""
for c in s:
writer.write(c)
@@ -1444,9 +1396,7 @@
self.assertTrue(type(chunk) is bytes, type(chunk))
encodedresult += chunk
q = Queue(b"")
- with warnings.catch_warnings():
- warnings.simplefilter("ignore", DeprecationWarning)
- reader = codecs.getreader(encoding)(q)
+ reader = codecs.getreader(encoding)(q)
decodedresult = ""
for c in encodedresult:
q.write(bytes([c]))
@@ -1520,9 +1470,7 @@
continue
if encoding in broken_unicode_with_streams:
continue
- with warnings.catch_warnings():
- warnings.simplefilter("ignore", DeprecationWarning)
- reader = codecs.getreader(encoding)(io.BytesIO(s.encode(encoding)))
+ reader = codecs.getreader(encoding)(io.BytesIO(s.encode(encoding)))
for t in range(5):
# Test that calling seek resets the internal codec state and buffers
reader.seek(0, 0)
@@ -1591,19 +1539,15 @@
class WithStmtTest(unittest.TestCase):
def test_encodedfile(self):
f = io.BytesIO(b"\xc3\xbc")
- with warnings.catch_warnings():
- warnings.simplefilter("ignore", DeprecationWarning)
- with codecs.EncodedFile(f, "latin-1", "utf-8") as ef:
- self.assertEqual(ef.read(), b"\xfc")
+ with codecs.EncodedFile(f, "latin-1", "utf-8") as ef:
+ self.assertEqual(ef.read(), b"\xfc")
def test_streamreaderwriter(self):
f = io.BytesIO(b"\xc3\xbc")
info = codecs.lookup("utf-8")
- with warnings.catch_warnings():
- warnings.simplefilter("ignore", DeprecationWarning)
- with codecs.StreamReaderWriter(f, info.streamreader,
- info.streamwriter, 'strict') as srw:
- self.assertEqual(srw.read(), "\xfc")
+ with codecs.StreamReaderWriter(f, info.streamreader,
+ info.streamwriter, 'strict') as srw:
+ self.assertEqual(srw.read(), "\xfc")
class TypesTest(unittest.TestCase):
def test_decode_unicode(self):
@@ -1700,15 +1644,15 @@
# (StreamWriter) Check that the BOM is written after a seek(0)
with codecs.open(support.TESTFN, 'w+', encoding=encoding) as f:
- f.write(data[0])
- self.assertNotEqual(f.tell(), 0)
- f.seek(0)
- f.write(data)
+ f.writer.write(data[0])
+ self.assertNotEqual(f.writer.tell(), 0)
+ f.writer.seek(0)
+ f.writer.write(data)
f.seek(0)
self.assertEqual(f.read(), data)
- # Check that the BOM is not written after a seek() at a
- # position different than the start
+ # Check that the BOM is not written after a seek() at a position
+ # different than the start
with codecs.open(support.TESTFN, 'w+', encoding=encoding) as f:
f.write(data)
f.seek(f.tell())
@@ -1716,12 +1660,12 @@
f.seek(0)
self.assertEqual(f.read(), data * 2)
- # (StreamWriter) Check that the BOM is not written after a
- # seek() at a position different than the start
+ # (StreamWriter) Check that the BOM is not written after a seek()
+ # at a position different than the start
with codecs.open(support.TESTFN, 'w+', encoding=encoding) as f:
- f.write(data)
- f.seek(f.tell())
- f.write(data)
+ f.writer.write(data)
+ f.writer.seek(f.writer.tell())
+ f.writer.write(data)
f.seek(0)
self.assertEqual(f.read(), data * 2)
@@ -1760,9 +1704,7 @@
def test_read(self):
for encoding in bytes_transform_encodings:
sin = codecs.encode(b"\x80", encoding)
- with warnings.catch_warnings():
- warnings.simplefilter("ignore", DeprecationWarning)
- reader = codecs.getreader(encoding)(io.BytesIO(sin))
+ reader = codecs.getreader(encoding)(io.BytesIO(sin))
sout = reader.read()
self.assertEqual(sout, b"\x80")
@@ -1771,9 +1713,7 @@
if encoding in ['uu_codec', 'zlib_codec']:
continue
sin = codecs.encode(b"\x80", encoding)
- with warnings.catch_warnings():
- warnings.simplefilter("ignore", DeprecationWarning)
- reader = codecs.getreader(encoding)(io.BytesIO(sin))
+ reader = codecs.getreader(encoding)(io.BytesIO(sin))
sout = reader.readline()
self.assertEqual(sout, b"\x80")