blob: bcb37d7d839712bb862bb4e32faf9b5d55a34532 [file] [log] [blame]
Guido van Rossum68bbcd22007-02-27 17:19:33 +00001"""Unit tests for io.py."""
2
Guido van Rossum8358db22007-08-18 21:39:55 +00003import os
Guido van Rossum34d69e52007-04-10 20:08:41 +00004import sys
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +00005import time
Guido van Rossumd4103952007-04-12 05:44:49 +00006import array
Antoine Pitrou87695762008-08-14 22:44:29 +00007import threading
8import random
Guido van Rossum28524c72007-02-27 05:47:44 +00009import unittest
Antoine Pitrou87695762008-08-14 22:44:29 +000010from itertools import chain, cycle
Benjamin Petersonee8712c2008-05-20 21:35:26 +000011from test import support
Guido van Rossum76c5d4d2007-04-06 19:10:29 +000012
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +000013import codecs
Guido van Rossum53807da2007-04-10 19:01:47 +000014import io # The module under test
Guido van Rossum28524c72007-02-27 05:47:44 +000015
Guido van Rossuma9e20242007-03-08 00:43:48 +000016
Guido van Rossum53807da2007-04-10 19:01:47 +000017class MockRawIO(io.RawIOBase):
Guido van Rossuma9e20242007-03-08 00:43:48 +000018
Guido van Rossum76c5d4d2007-04-06 19:10:29 +000019 def __init__(self, read_stack=()):
20 self._read_stack = list(read_stack)
21 self._write_stack = []
Guido van Rossum68bbcd22007-02-27 17:19:33 +000022
23 def read(self, n=None):
24 try:
Guido van Rossum76c5d4d2007-04-06 19:10:29 +000025 return self._read_stack.pop(0)
Guido van Rossum68bbcd22007-02-27 17:19:33 +000026 except:
Guido van Rossum78892e42007-04-06 17:31:18 +000027 return b""
Guido van Rossum68bbcd22007-02-27 17:19:33 +000028
Guido van Rossum01a27522007-03-07 01:00:12 +000029 def write(self, b):
Guido van Rossum76c5d4d2007-04-06 19:10:29 +000030 self._write_stack.append(b[:])
Guido van Rossum01a27522007-03-07 01:00:12 +000031 return len(b)
32
33 def writable(self):
34 return True
35
Guido van Rossum68bbcd22007-02-27 17:19:33 +000036 def fileno(self):
37 return 42
38
39 def readable(self):
40 return True
41
Guido van Rossum01a27522007-03-07 01:00:12 +000042 def seekable(self):
Guido van Rossum68bbcd22007-02-27 17:19:33 +000043 return True
44
Guido van Rossum01a27522007-03-07 01:00:12 +000045 def seek(self, pos, whence):
46 pass
47
48 def tell(self):
Guido van Rossum68bbcd22007-02-27 17:19:33 +000049 return 42
50
Guido van Rossuma9e20242007-03-08 00:43:48 +000051
Guido van Rossum78892e42007-04-06 17:31:18 +000052class MockFileIO(io.BytesIO):
53
54 def __init__(self, data):
55 self.read_history = []
56 io.BytesIO.__init__(self, data)
57
58 def read(self, n=None):
59 res = io.BytesIO.read(self, n)
60 self.read_history.append(None if res is None else len(res))
61 return res
62
63
Guido van Rossum01a27522007-03-07 01:00:12 +000064class MockNonBlockWriterIO(io.RawIOBase):
Guido van Rossuma9e20242007-03-08 00:43:48 +000065
Guido van Rossum53807da2007-04-10 19:01:47 +000066 def __init__(self, blocking_script):
67 self._blocking_script = list(blocking_script)
Guido van Rossum01a27522007-03-07 01:00:12 +000068 self._write_stack = []
Guido van Rossuma9e20242007-03-08 00:43:48 +000069
Guido van Rossum01a27522007-03-07 01:00:12 +000070 def write(self, b):
Guido van Rossum76c5d4d2007-04-06 19:10:29 +000071 self._write_stack.append(b[:])
Guido van Rossum53807da2007-04-10 19:01:47 +000072 n = self._blocking_script.pop(0)
Guido van Rossum01a27522007-03-07 01:00:12 +000073 if (n < 0):
Guido van Rossum141f7672007-04-10 00:22:16 +000074 raise io.BlockingIOError(0, "test blocking", -n)
Guido van Rossum01a27522007-03-07 01:00:12 +000075 else:
76 return n
Guido van Rossuma9e20242007-03-08 00:43:48 +000077
Guido van Rossum01a27522007-03-07 01:00:12 +000078 def writable(self):
79 return True
Guido van Rossum68bbcd22007-02-27 17:19:33 +000080
Guido van Rossuma9e20242007-03-08 00:43:48 +000081
Guido van Rossum28524c72007-02-27 05:47:44 +000082class IOTest(unittest.TestCase):
83
Neal Norwitze7789b12008-03-24 06:18:09 +000084 def setUp(self):
Benjamin Petersonee8712c2008-05-20 21:35:26 +000085 support.unlink(support.TESTFN)
Neal Norwitze7789b12008-03-24 06:18:09 +000086
Guido van Rossum4d0f5a42007-03-07 22:59:39 +000087 def tearDown(self):
Benjamin Petersonee8712c2008-05-20 21:35:26 +000088 support.unlink(support.TESTFN)
Guido van Rossum4d0f5a42007-03-07 22:59:39 +000089
Guido van Rossum28524c72007-02-27 05:47:44 +000090 def write_ops(self, f):
Guido van Rossum87429772007-04-10 21:06:59 +000091 self.assertEqual(f.write(b"blah."), 5)
92 self.assertEqual(f.seek(0), 0)
93 self.assertEqual(f.write(b"Hello."), 6)
Guido van Rossum28524c72007-02-27 05:47:44 +000094 self.assertEqual(f.tell(), 6)
Guido van Rossum87429772007-04-10 21:06:59 +000095 self.assertEqual(f.seek(-1, 1), 5)
Guido van Rossum28524c72007-02-27 05:47:44 +000096 self.assertEqual(f.tell(), 5)
Guido van Rossum254348e2007-11-21 19:29:53 +000097 self.assertEqual(f.write(bytearray(b" world\n\n\n")), 9)
Guido van Rossum87429772007-04-10 21:06:59 +000098 self.assertEqual(f.seek(0), 0)
Guido van Rossum2b08b382007-05-08 20:18:39 +000099 self.assertEqual(f.write(b"h"), 1)
Guido van Rossum87429772007-04-10 21:06:59 +0000100 self.assertEqual(f.seek(-1, 2), 13)
101 self.assertEqual(f.tell(), 13)
102 self.assertEqual(f.truncate(12), 12)
Alexandre Vassalotti77250f42008-05-06 19:48:38 +0000103 self.assertEqual(f.tell(), 12)
Christian Heimes8e42a0a2007-11-08 18:04:45 +0000104 self.assertRaises(TypeError, f.seek, 0.0)
Guido van Rossum28524c72007-02-27 05:47:44 +0000105
Guido van Rossum9b76da62007-04-11 01:09:03 +0000106 def read_ops(self, f, buffered=False):
107 data = f.read(5)
108 self.assertEqual(data, b"hello")
Guido van Rossum254348e2007-11-21 19:29:53 +0000109 data = bytearray(data)
Guido van Rossum9b76da62007-04-11 01:09:03 +0000110 self.assertEqual(f.readinto(data), 5)
111 self.assertEqual(data, b" worl")
112 self.assertEqual(f.readinto(data), 2)
113 self.assertEqual(len(data), 5)
114 self.assertEqual(data[:2], b"d\n")
115 self.assertEqual(f.seek(0), 0)
116 self.assertEqual(f.read(20), b"hello world\n")
117 self.assertEqual(f.read(1), b"")
Guido van Rossum254348e2007-11-21 19:29:53 +0000118 self.assertEqual(f.readinto(bytearray(b"x")), 0)
Guido van Rossum9b76da62007-04-11 01:09:03 +0000119 self.assertEqual(f.seek(-6, 2), 6)
120 self.assertEqual(f.read(5), b"world")
121 self.assertEqual(f.read(0), b"")
Guido van Rossum254348e2007-11-21 19:29:53 +0000122 self.assertEqual(f.readinto(bytearray()), 0)
Guido van Rossum9b76da62007-04-11 01:09:03 +0000123 self.assertEqual(f.seek(-6, 1), 5)
124 self.assertEqual(f.read(5), b" worl")
125 self.assertEqual(f.tell(), 10)
Christian Heimes8e42a0a2007-11-08 18:04:45 +0000126 self.assertRaises(TypeError, f.seek, 0.0)
Guido van Rossum9b76da62007-04-11 01:09:03 +0000127 if buffered:
128 f.seek(0)
129 self.assertEqual(f.read(), b"hello world\n")
130 f.seek(6)
131 self.assertEqual(f.read(), b"world\n")
132 self.assertEqual(f.read(), b"")
133
Guido van Rossum34d69e52007-04-10 20:08:41 +0000134 LARGE = 2**31
135
Guido van Rossum53807da2007-04-10 19:01:47 +0000136 def large_file_ops(self, f):
137 assert f.readable()
138 assert f.writable()
Guido van Rossum34d69e52007-04-10 20:08:41 +0000139 self.assertEqual(f.seek(self.LARGE), self.LARGE)
140 self.assertEqual(f.tell(), self.LARGE)
Guido van Rossum53807da2007-04-10 19:01:47 +0000141 self.assertEqual(f.write(b"xxx"), 3)
Guido van Rossum34d69e52007-04-10 20:08:41 +0000142 self.assertEqual(f.tell(), self.LARGE + 3)
143 self.assertEqual(f.seek(-1, 1), self.LARGE + 2)
Guido van Rossum87429772007-04-10 21:06:59 +0000144 self.assertEqual(f.truncate(), self.LARGE + 2)
Guido van Rossum34d69e52007-04-10 20:08:41 +0000145 self.assertEqual(f.tell(), self.LARGE + 2)
146 self.assertEqual(f.seek(0, 2), self.LARGE + 2)
Guido van Rossum87429772007-04-10 21:06:59 +0000147 self.assertEqual(f.truncate(self.LARGE + 1), self.LARGE + 1)
Alexandre Vassalotti77250f42008-05-06 19:48:38 +0000148 self.assertEqual(f.tell(), self.LARGE + 1)
Guido van Rossum34d69e52007-04-10 20:08:41 +0000149 self.assertEqual(f.seek(0, 2), self.LARGE + 1)
150 self.assertEqual(f.seek(-1, 2), self.LARGE)
Guido van Rossum53807da2007-04-10 19:01:47 +0000151 self.assertEqual(f.read(2), b"x")
152
Guido van Rossum28524c72007-02-27 05:47:44 +0000153 def test_raw_file_io(self):
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000154 f = io.open(support.TESTFN, "wb", buffering=0)
Guido van Rossum28524c72007-02-27 05:47:44 +0000155 self.assertEqual(f.readable(), False)
156 self.assertEqual(f.writable(), True)
157 self.assertEqual(f.seekable(), True)
158 self.write_ops(f)
159 f.close()
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000160 f = io.open(support.TESTFN, "rb", buffering=0)
Guido van Rossum28524c72007-02-27 05:47:44 +0000161 self.assertEqual(f.readable(), True)
162 self.assertEqual(f.writable(), False)
163 self.assertEqual(f.seekable(), True)
164 self.read_ops(f)
165 f.close()
166
Guido van Rossum87429772007-04-10 21:06:59 +0000167 def test_buffered_file_io(self):
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000168 f = io.open(support.TESTFN, "wb")
Guido van Rossum87429772007-04-10 21:06:59 +0000169 self.assertEqual(f.readable(), False)
170 self.assertEqual(f.writable(), True)
171 self.assertEqual(f.seekable(), True)
172 self.write_ops(f)
173 f.close()
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000174 f = io.open(support.TESTFN, "rb")
Guido van Rossum87429772007-04-10 21:06:59 +0000175 self.assertEqual(f.readable(), True)
176 self.assertEqual(f.writable(), False)
177 self.assertEqual(f.seekable(), True)
Guido van Rossum9b76da62007-04-11 01:09:03 +0000178 self.read_ops(f, True)
Guido van Rossum87429772007-04-10 21:06:59 +0000179 f.close()
180
Guido van Rossum48fc58a2007-06-07 23:45:37 +0000181 def test_readline(self):
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000182 f = io.open(support.TESTFN, "wb")
Guido van Rossum48fc58a2007-06-07 23:45:37 +0000183 f.write(b"abc\ndef\nxyzzy\nfoo")
184 f.close()
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000185 f = io.open(support.TESTFN, "rb")
Guido van Rossum48fc58a2007-06-07 23:45:37 +0000186 self.assertEqual(f.readline(), b"abc\n")
187 self.assertEqual(f.readline(10), b"def\n")
188 self.assertEqual(f.readline(2), b"xy")
189 self.assertEqual(f.readline(4), b"zzy\n")
190 self.assertEqual(f.readline(), b"foo")
191 f.close()
192
Guido van Rossum28524c72007-02-27 05:47:44 +0000193 def test_raw_bytes_io(self):
194 f = io.BytesIO()
195 self.write_ops(f)
196 data = f.getvalue()
197 self.assertEqual(data, b"hello world\n")
198 f = io.BytesIO(data)
Guido van Rossum9b76da62007-04-11 01:09:03 +0000199 self.read_ops(f, True)
Guido van Rossum28524c72007-02-27 05:47:44 +0000200
Guido van Rossum53807da2007-04-10 19:01:47 +0000201 def test_large_file_ops(self):
Guido van Rossum34d69e52007-04-10 20:08:41 +0000202 # On Windows and Mac OSX this test comsumes large resources; It takes
203 # a long time to build the >2GB file and takes >2GB of disk space
204 # therefore the resource must be enabled to run this test.
205 if sys.platform[:3] == 'win' or sys.platform == 'darwin':
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000206 if not support.is_resource_enabled("largefile"):
Guido van Rossum34d69e52007-04-10 20:08:41 +0000207 print("\nTesting large file ops skipped on %s." % sys.platform,
208 file=sys.stderr)
209 print("It requires %d bytes and a long time." % self.LARGE,
210 file=sys.stderr)
211 print("Use 'regrtest.py -u largefile test_io' to run it.",
212 file=sys.stderr)
213 return
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000214 f = io.open(support.TESTFN, "w+b", 0)
Guido van Rossum53807da2007-04-10 19:01:47 +0000215 self.large_file_ops(f)
Guido van Rossuma9e20242007-03-08 00:43:48 +0000216 f.close()
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000217 f = io.open(support.TESTFN, "w+b")
Guido van Rossum87429772007-04-10 21:06:59 +0000218 self.large_file_ops(f)
219 f.close()
220
221 def test_with_open(self):
222 for bufsize in (0, 1, 100):
223 f = None
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000224 with open(support.TESTFN, "wb", bufsize) as f:
Guido van Rossum1f2ca562007-08-27 20:44:15 +0000225 f.write(b"xxx")
Guido van Rossum87429772007-04-10 21:06:59 +0000226 self.assertEqual(f.closed, True)
227 f = None
228 try:
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000229 with open(support.TESTFN, "wb", bufsize) as f:
Guido van Rossum87429772007-04-10 21:06:59 +0000230 1/0
231 except ZeroDivisionError:
232 self.assertEqual(f.closed, True)
233 else:
234 self.fail("1/0 didn't raise an exception")
235
236 def test_destructor(self):
237 record = []
238 class MyFileIO(io.FileIO):
239 def __del__(self):
240 record.append(1)
241 io.FileIO.__del__(self)
242 def close(self):
243 record.append(2)
244 io.FileIO.close(self)
245 def flush(self):
246 record.append(3)
247 io.FileIO.flush(self)
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000248 f = MyFileIO(support.TESTFN, "w")
Guido van Rossum87429772007-04-10 21:06:59 +0000249 f.write("xxx")
250 del f
251 self.assertEqual(record, [1, 2, 3])
252
253 def test_close_flushes(self):
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000254 f = io.open(support.TESTFN, "wb")
Guido van Rossum2b08b382007-05-08 20:18:39 +0000255 f.write(b"xxx")
Guido van Rossum87429772007-04-10 21:06:59 +0000256 f.close()
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000257 f = io.open(support.TESTFN, "rb")
Guido van Rossum87429772007-04-10 21:06:59 +0000258 self.assertEqual(f.read(), b"xxx")
259 f.close()
Guido van Rossuma9e20242007-03-08 00:43:48 +0000260
Guido van Rossumd4103952007-04-12 05:44:49 +0000261 def test_array_writes(self):
262 a = array.array('i', range(10))
Guido van Rossumbae07c92007-10-08 02:46:15 +0000263 n = len(memoryview(a))
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000264 f = io.open(support.TESTFN, "wb", 0)
Guido van Rossumd4103952007-04-12 05:44:49 +0000265 self.assertEqual(f.write(a), n)
266 f.close()
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000267 f = io.open(support.TESTFN, "wb")
Guido van Rossumd4103952007-04-12 05:44:49 +0000268 self.assertEqual(f.write(a), n)
269 f.close()
270
Guido van Rossum2dced8b2007-10-30 17:27:30 +0000271 def test_closefd(self):
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000272 self.assertRaises(ValueError, io.open, support.TESTFN, 'w',
Guido van Rossum2dced8b2007-10-30 17:27:30 +0000273 closefd=False)
Guido van Rossuma9e20242007-03-08 00:43:48 +0000274
Guido van Rossum76c5d4d2007-04-06 19:10:29 +0000275class MemorySeekTestMixin:
Guido van Rossuma9e20242007-03-08 00:43:48 +0000276
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000277 def testInit(self):
Guido van Rossum78892e42007-04-06 17:31:18 +0000278 buf = self.buftype("1234567890")
279 bytesIo = self.ioclass(buf)
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000280
281 def testRead(self):
Guido van Rossum78892e42007-04-06 17:31:18 +0000282 buf = self.buftype("1234567890")
283 bytesIo = self.ioclass(buf)
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000284
285 self.assertEquals(buf[:1], bytesIo.read(1))
286 self.assertEquals(buf[1:5], bytesIo.read(4))
287 self.assertEquals(buf[5:], bytesIo.read(900))
Guido van Rossum78892e42007-04-06 17:31:18 +0000288 self.assertEquals(self.EOF, bytesIo.read())
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000289
290 def testReadNoArgs(self):
Guido van Rossum78892e42007-04-06 17:31:18 +0000291 buf = self.buftype("1234567890")
292 bytesIo = self.ioclass(buf)
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000293
294 self.assertEquals(buf, bytesIo.read())
Guido van Rossum78892e42007-04-06 17:31:18 +0000295 self.assertEquals(self.EOF, bytesIo.read())
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000296
297 def testSeek(self):
Guido van Rossum78892e42007-04-06 17:31:18 +0000298 buf = self.buftype("1234567890")
299 bytesIo = self.ioclass(buf)
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000300
301 bytesIo.read(5)
302 bytesIo.seek(0)
303 self.assertEquals(buf, bytesIo.read())
304
305 bytesIo.seek(3)
306 self.assertEquals(buf[3:], bytesIo.read())
Christian Heimes8e42a0a2007-11-08 18:04:45 +0000307 self.assertRaises(TypeError, bytesIo.seek, 0.0)
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000308
309 def testTell(self):
Guido van Rossum78892e42007-04-06 17:31:18 +0000310 buf = self.buftype("1234567890")
311 bytesIo = self.ioclass(buf)
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000312
313 self.assertEquals(0, bytesIo.tell())
314 bytesIo.seek(5)
315 self.assertEquals(5, bytesIo.tell())
316 bytesIo.seek(10000)
317 self.assertEquals(10000, bytesIo.tell())
318
Guido van Rossuma9e20242007-03-08 00:43:48 +0000319
Guido van Rossum76c5d4d2007-04-06 19:10:29 +0000320class BytesIOTest(MemorySeekTestMixin, unittest.TestCase):
Guido van Rossum39478e82007-08-27 17:23:59 +0000321 @staticmethod
322 def buftype(s):
323 return s.encode("utf-8")
Guido van Rossum78892e42007-04-06 17:31:18 +0000324 ioclass = io.BytesIO
325 EOF = b""
326
327
Guido van Rossum76c5d4d2007-04-06 19:10:29 +0000328class StringIOTest(MemorySeekTestMixin, unittest.TestCase):
Guido van Rossum78892e42007-04-06 17:31:18 +0000329 buftype = str
330 ioclass = io.StringIO
331 EOF = ""
332
333
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000334class BufferedReaderTest(unittest.TestCase):
Guido van Rossuma9e20242007-03-08 00:43:48 +0000335
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000336 def testRead(self):
Guido van Rossum53807da2007-04-10 19:01:47 +0000337 rawio = MockRawIO((b"abc", b"d", b"efg"))
Guido van Rossum76c5d4d2007-04-06 19:10:29 +0000338 bufio = io.BufferedReader(rawio)
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000339
Guido van Rossum76c5d4d2007-04-06 19:10:29 +0000340 self.assertEquals(b"abcdef", bufio.read(6))
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000341
Guido van Rossum78892e42007-04-06 17:31:18 +0000342 def testBuffering(self):
343 data = b"abcdefghi"
344 dlen = len(data)
345
346 tests = [
347 [ 100, [ 3, 1, 4, 8 ], [ dlen, 0 ] ],
348 [ 100, [ 3, 3, 3], [ dlen ] ],
349 [ 4, [ 1, 2, 4, 2 ], [ 4, 4, 1 ] ],
350 ]
351
352 for bufsize, buf_read_sizes, raw_read_sizes in tests:
Guido van Rossum76c5d4d2007-04-06 19:10:29 +0000353 rawio = MockFileIO(data)
354 bufio = io.BufferedReader(rawio, buffer_size=bufsize)
Guido van Rossum78892e42007-04-06 17:31:18 +0000355 pos = 0
356 for nbytes in buf_read_sizes:
Guido van Rossum76c5d4d2007-04-06 19:10:29 +0000357 self.assertEquals(bufio.read(nbytes), data[pos:pos+nbytes])
Guido van Rossum78892e42007-04-06 17:31:18 +0000358 pos += nbytes
Guido van Rossum76c5d4d2007-04-06 19:10:29 +0000359 self.assertEquals(rawio.read_history, raw_read_sizes)
Guido van Rossum78892e42007-04-06 17:31:18 +0000360
Guido van Rossum01a27522007-03-07 01:00:12 +0000361 def testReadNonBlocking(self):
362 # Inject some None's in there to simulate EWOULDBLOCK
Guido van Rossum53807da2007-04-10 19:01:47 +0000363 rawio = MockRawIO((b"abc", b"d", None, b"efg", None, None))
Guido van Rossum76c5d4d2007-04-06 19:10:29 +0000364 bufio = io.BufferedReader(rawio)
Guido van Rossum01a27522007-03-07 01:00:12 +0000365
Guido van Rossum76c5d4d2007-04-06 19:10:29 +0000366 self.assertEquals(b"abcd", bufio.read(6))
367 self.assertEquals(b"e", bufio.read(1))
368 self.assertEquals(b"fg", bufio.read())
369 self.assert_(None is bufio.read())
370 self.assertEquals(b"", bufio.read())
Guido van Rossum01a27522007-03-07 01:00:12 +0000371
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000372 def testReadToEof(self):
Guido van Rossum53807da2007-04-10 19:01:47 +0000373 rawio = MockRawIO((b"abc", b"d", b"efg"))
Guido van Rossum76c5d4d2007-04-06 19:10:29 +0000374 bufio = io.BufferedReader(rawio)
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000375
Guido van Rossum76c5d4d2007-04-06 19:10:29 +0000376 self.assertEquals(b"abcdefg", bufio.read(9000))
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000377
378 def testReadNoArgs(self):
Guido van Rossum53807da2007-04-10 19:01:47 +0000379 rawio = MockRawIO((b"abc", b"d", b"efg"))
Guido van Rossum76c5d4d2007-04-06 19:10:29 +0000380 bufio = io.BufferedReader(rawio)
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000381
Guido van Rossum76c5d4d2007-04-06 19:10:29 +0000382 self.assertEquals(b"abcdefg", bufio.read())
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000383
384 def testFileno(self):
Guido van Rossum53807da2007-04-10 19:01:47 +0000385 rawio = MockRawIO((b"abc", b"d", b"efg"))
Guido van Rossum76c5d4d2007-04-06 19:10:29 +0000386 bufio = io.BufferedReader(rawio)
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000387
Guido van Rossum76c5d4d2007-04-06 19:10:29 +0000388 self.assertEquals(42, bufio.fileno())
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000389
390 def testFilenoNoFileno(self):
Guido van Rossum01a27522007-03-07 01:00:12 +0000391 # XXX will we always have fileno() function? If so, kill
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000392 # this test. Else, write it.
393 pass
394
Antoine Pitrou87695762008-08-14 22:44:29 +0000395 def testThreads(self):
396 try:
397 # Write out many bytes with exactly the same number of 0's,
398 # 1's... 255's. This will help us check that concurrent reading
399 # doesn't duplicate or forget contents.
400 N = 1000
401 l = list(range(256)) * N
402 random.shuffle(l)
403 s = bytes(bytearray(l))
404 with io.open(support.TESTFN, "wb") as f:
405 f.write(s)
406 with io.open(support.TESTFN, "rb", buffering=0) as raw:
407 bufio = io.BufferedReader(raw, 8)
408 errors = []
409 results = []
410 def f():
411 try:
412 # Intra-buffer read then buffer-flushing read
413 for n in cycle([1, 19]):
414 s = bufio.read(n)
415 if not s:
416 break
417 # list.append() is atomic
418 results.append(s)
419 except Exception as e:
420 errors.append(e)
421 raise
422 threads = [threading.Thread(target=f) for x in range(20)]
423 for t in threads:
424 t.start()
425 time.sleep(0.02) # yield
426 for t in threads:
427 t.join()
428 self.assertFalse(errors,
429 "the following exceptions were caught: %r" % errors)
430 s = b''.join(results)
431 for i in range(256):
432 c = bytes(bytearray([i]))
433 self.assertEqual(s.count(c), N)
434 finally:
435 support.unlink(support.TESTFN)
436
437
Guido van Rossuma9e20242007-03-08 00:43:48 +0000438
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000439class BufferedWriterTest(unittest.TestCase):
Guido van Rossuma9e20242007-03-08 00:43:48 +0000440
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000441 def testWrite(self):
442 # Write to the buffered IO but don't overflow the buffer.
Guido van Rossum53807da2007-04-10 19:01:47 +0000443 writer = MockRawIO()
Guido van Rossum76c5d4d2007-04-06 19:10:29 +0000444 bufio = io.BufferedWriter(writer, 8)
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000445
Guido van Rossum76c5d4d2007-04-06 19:10:29 +0000446 bufio.write(b"abc")
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000447
Guido van Rossum76c5d4d2007-04-06 19:10:29 +0000448 self.assertFalse(writer._write_stack)
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000449
450 def testWriteOverflow(self):
Guido van Rossum53807da2007-04-10 19:01:47 +0000451 writer = MockRawIO()
Guido van Rossum76c5d4d2007-04-06 19:10:29 +0000452 bufio = io.BufferedWriter(writer, 8)
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000453
Guido van Rossum76c5d4d2007-04-06 19:10:29 +0000454 bufio.write(b"abc")
455 bufio.write(b"defghijkl")
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000456
Guido van Rossum76c5d4d2007-04-06 19:10:29 +0000457 self.assertEquals(b"abcdefghijkl", writer._write_stack[0])
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000458
Guido van Rossum01a27522007-03-07 01:00:12 +0000459 def testWriteNonBlocking(self):
460 raw = MockNonBlockWriterIO((9, 2, 22, -6, 10, 12, 12))
Guido van Rossum76c5d4d2007-04-06 19:10:29 +0000461 bufio = io.BufferedWriter(raw, 8, 16)
Guido van Rossum01a27522007-03-07 01:00:12 +0000462
Guido van Rossum76c5d4d2007-04-06 19:10:29 +0000463 bufio.write(b"asdf")
464 bufio.write(b"asdfa")
Guido van Rossum01a27522007-03-07 01:00:12 +0000465 self.assertEquals(b"asdfasdfa", raw._write_stack[0])
466
Guido van Rossum76c5d4d2007-04-06 19:10:29 +0000467 bufio.write(b"asdfasdfasdf")
Guido van Rossum01a27522007-03-07 01:00:12 +0000468 self.assertEquals(b"asdfasdfasdf", raw._write_stack[1])
Guido van Rossum76c5d4d2007-04-06 19:10:29 +0000469 bufio.write(b"asdfasdfasdf")
Guido van Rossum01a27522007-03-07 01:00:12 +0000470 self.assertEquals(b"dfasdfasdf", raw._write_stack[2])
471 self.assertEquals(b"asdfasdfasdf", raw._write_stack[3])
472
Guido van Rossum76c5d4d2007-04-06 19:10:29 +0000473 bufio.write(b"asdfasdfasdf")
Guido van Rossum01a27522007-03-07 01:00:12 +0000474
Guido van Rossum78892e42007-04-06 17:31:18 +0000475 # XXX I don't like this test. It relies too heavily on how the
476 # algorithm actually works, which we might change. Refactor
477 # later.
Guido van Rossum01a27522007-03-07 01:00:12 +0000478
479 def testFileno(self):
Guido van Rossum53807da2007-04-10 19:01:47 +0000480 rawio = MockRawIO((b"abc", b"d", b"efg"))
Guido van Rossum76c5d4d2007-04-06 19:10:29 +0000481 bufio = io.BufferedWriter(rawio)
Guido van Rossum01a27522007-03-07 01:00:12 +0000482
Guido van Rossum76c5d4d2007-04-06 19:10:29 +0000483 self.assertEquals(42, bufio.fileno())
Guido van Rossum01a27522007-03-07 01:00:12 +0000484
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000485 def testFlush(self):
Guido van Rossum53807da2007-04-10 19:01:47 +0000486 writer = MockRawIO()
Guido van Rossum76c5d4d2007-04-06 19:10:29 +0000487 bufio = io.BufferedWriter(writer, 8)
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000488
Guido van Rossum76c5d4d2007-04-06 19:10:29 +0000489 bufio.write(b"abc")
490 bufio.flush()
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000491
Guido van Rossum76c5d4d2007-04-06 19:10:29 +0000492 self.assertEquals(b"abc", writer._write_stack[0])
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000493
Antoine Pitrou87695762008-08-14 22:44:29 +0000494 def testThreads(self):
495 # BufferedWriter should not raise exceptions or crash
496 # when called from multiple threads.
497 try:
498 # We use a real file object because it allows us to
499 # exercise situations where the GIL is released before
500 # writing the buffer to the raw streams. This is in addition
501 # to concurrency issues due to switching threads in the middle
502 # of Python code.
503 with io.open(support.TESTFN, "wb", buffering=0) as raw:
504 bufio = io.BufferedWriter(raw, 8)
505 errors = []
506 def f():
507 try:
508 # Write enough bytes to flush the buffer
509 s = b"a" * 19
510 for i in range(50):
511 bufio.write(s)
512 except Exception as e:
513 errors.append(e)
514 raise
515 threads = [threading.Thread(target=f) for x in range(20)]
516 for t in threads:
517 t.start()
518 time.sleep(0.02) # yield
519 for t in threads:
520 t.join()
521 self.assertFalse(errors,
522 "the following exceptions were caught: %r" % errors)
523 finally:
524 support.unlink(support.TESTFN)
525
Guido van Rossuma9e20242007-03-08 00:43:48 +0000526
Guido van Rossum01a27522007-03-07 01:00:12 +0000527class BufferedRWPairTest(unittest.TestCase):
Guido van Rossuma9e20242007-03-08 00:43:48 +0000528
Guido van Rossum01a27522007-03-07 01:00:12 +0000529 def testRWPair(self):
Guido van Rossum53807da2007-04-10 19:01:47 +0000530 r = MockRawIO(())
531 w = MockRawIO()
Guido van Rossum01a27522007-03-07 01:00:12 +0000532 pair = io.BufferedRWPair(r, w)
533
534 # XXX need implementation
535
Guido van Rossuma9e20242007-03-08 00:43:48 +0000536
Guido van Rossum78892e42007-04-06 17:31:18 +0000537class BufferedRandomTest(unittest.TestCase):
Guido van Rossuma9e20242007-03-08 00:43:48 +0000538
Guido van Rossum01a27522007-03-07 01:00:12 +0000539 def testReadAndWrite(self):
Guido van Rossum53807da2007-04-10 19:01:47 +0000540 raw = MockRawIO((b"asdf", b"ghjk"))
Guido van Rossum01a27522007-03-07 01:00:12 +0000541 rw = io.BufferedRandom(raw, 8, 12)
542
543 self.assertEqual(b"as", rw.read(2))
544 rw.write(b"ddd")
545 rw.write(b"eee")
Guido van Rossum76c5d4d2007-04-06 19:10:29 +0000546 self.assertFalse(raw._write_stack) # Buffer writes
Guido van Rossum01a27522007-03-07 01:00:12 +0000547 self.assertEqual(b"ghjk", rw.read()) # This read forces write flush
Guido van Rossum76c5d4d2007-04-06 19:10:29 +0000548 self.assertEquals(b"dddeee", raw._write_stack[0])
Guido van Rossum01a27522007-03-07 01:00:12 +0000549
550 def testSeekAndTell(self):
551 raw = io.BytesIO(b"asdfghjkl")
552 rw = io.BufferedRandom(raw)
553
554 self.assertEquals(b"as", rw.read(2))
555 self.assertEquals(2, rw.tell())
556 rw.seek(0, 0)
557 self.assertEquals(b"asdf", rw.read(4))
558
559 rw.write(b"asdf")
560 rw.seek(0, 0)
561 self.assertEquals(b"asdfasdfl", rw.read())
562 self.assertEquals(9, rw.tell())
563 rw.seek(-4, 2)
564 self.assertEquals(5, rw.tell())
565 rw.seek(2, 1)
566 self.assertEquals(7, rw.tell())
567 self.assertEquals(b"fl", rw.read(11))
Christian Heimes8e42a0a2007-11-08 18:04:45 +0000568 self.assertRaises(TypeError, rw.seek, 0.0)
Guido van Rossum01a27522007-03-07 01:00:12 +0000569
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +0000570# To fully exercise seek/tell, the StatefulIncrementalDecoder has these
571# properties:
572# - A single output character can correspond to many bytes of input.
573# - The number of input bytes to complete the character can be
574# undetermined until the last input byte is received.
575# - The number of input bytes can vary depending on previous input.
576# - A single input byte can correspond to many characters of output.
577# - The number of output characters can be undetermined until the
578# last input byte is received.
579# - The number of output characters can vary depending on previous input.
580
581class StatefulIncrementalDecoder(codecs.IncrementalDecoder):
582 """
583 For testing seek/tell behavior with a stateful, buffering decoder.
584
585 Input is a sequence of words. Words may be fixed-length (length set
586 by input) or variable-length (period-terminated). In variable-length
587 mode, extra periods are ignored. Possible words are:
588 - 'i' followed by a number sets the input length, I (maximum 99).
589 When I is set to 0, words are space-terminated.
590 - 'o' followed by a number sets the output length, O (maximum 99).
591 - Any other word is converted into a word followed by a period on
592 the output. The output word consists of the input word truncated
593 or padded out with hyphens to make its length equal to O. If O
594 is 0, the word is output verbatim without truncating or padding.
595 I and O are initially set to 1. When I changes, any buffered input is
596 re-scanned according to the new I. EOF also terminates the last word.
597 """
598
599 def __init__(self, errors='strict'):
Christian Heimesab568872008-03-23 02:11:13 +0000600 codecs.IncrementalDecoder.__init__(self, errors)
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +0000601 self.reset()
602
603 def __repr__(self):
604 return '<SID %x>' % id(self)
605
606 def reset(self):
607 self.i = 1
608 self.o = 1
609 self.buffer = bytearray()
610
611 def getstate(self):
612 i, o = self.i ^ 1, self.o ^ 1 # so that flags = 0 after reset()
613 return bytes(self.buffer), i*100 + o
614
615 def setstate(self, state):
616 buffer, io = state
617 self.buffer = bytearray(buffer)
618 i, o = divmod(io, 100)
619 self.i, self.o = i ^ 1, o ^ 1
620
621 def decode(self, input, final=False):
622 output = ''
623 for b in input:
624 if self.i == 0: # variable-length, terminated with period
625 if b == ord('.'):
626 if self.buffer:
627 output += self.process_word()
628 else:
629 self.buffer.append(b)
630 else: # fixed-length, terminate after self.i bytes
631 self.buffer.append(b)
632 if len(self.buffer) == self.i:
633 output += self.process_word()
634 if final and self.buffer: # EOF terminates the last word
635 output += self.process_word()
636 return output
637
638 def process_word(self):
639 output = ''
640 if self.buffer[0] == ord('i'):
641 self.i = min(99, int(self.buffer[1:] or 0)) # set input length
642 elif self.buffer[0] == ord('o'):
643 self.o = min(99, int(self.buffer[1:] or 0)) # set output length
644 else:
645 output = self.buffer.decode('ascii')
646 if len(output) < self.o:
647 output += '-'*self.o # pad out with hyphens
648 if self.o:
649 output = output[:self.o] # truncate to output length
650 output += '.'
651 self.buffer = bytearray()
652 return output
653
Benjamin Petersonad9d48d2008-04-02 21:49:44 +0000654 codecEnabled = False
655
656 @classmethod
657 def lookupTestDecoder(cls, name):
658 if cls.codecEnabled and name == 'test_decoder':
659 return codecs.CodecInfo(
660 name='test_decoder', encode=None, decode=None,
661 incrementalencoder=None,
662 streamreader=None, streamwriter=None,
663 incrementaldecoder=cls)
664
665# Register the previous decoder for testing.
666# Disabled by default, tests will enable it.
667codecs.register(StatefulIncrementalDecoder.lookupTestDecoder)
668
669
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +0000670class StatefulIncrementalDecoderTest(unittest.TestCase):
671 """
672 Make sure the StatefulIncrementalDecoder actually works.
673 """
674
675 test_cases = [
Ka-Ping Yeed24a5b62008-03-20 10:51:27 +0000676 # I=1, O=1 (fixed-length input == fixed-length output)
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +0000677 (b'abcd', False, 'a.b.c.d.'),
Ka-Ping Yeed24a5b62008-03-20 10:51:27 +0000678 # I=0, O=0 (variable-length input, variable-length output)
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +0000679 (b'oiabcd', True, 'abcd.'),
Ka-Ping Yeed24a5b62008-03-20 10:51:27 +0000680 # I=0, O=0 (should ignore extra periods)
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +0000681 (b'oi...abcd...', True, 'abcd.'),
Ka-Ping Yeed24a5b62008-03-20 10:51:27 +0000682 # I=0, O=6 (variable-length input, fixed-length output)
683 (b'i.o6.x.xyz.toolongtofit.', False, 'x-----.xyz---.toolon.'),
684 # I=2, O=6 (fixed-length input < fixed-length output)
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +0000685 (b'i.i2.o6xyz', True, 'xy----.z-----.'),
Ka-Ping Yeed24a5b62008-03-20 10:51:27 +0000686 # I=6, O=3 (fixed-length input > fixed-length output)
687 (b'i.o3.i6.abcdefghijklmnop', True, 'abc.ghi.mno.'),
688 # I=0, then 3; O=29, then 15 (with longer output)
689 (b'i.o29.a.b.cde.o15.abcdefghijabcdefghij.i3.a.b.c.d.ei00k.l.m', True,
690 'a----------------------------.' +
691 'b----------------------------.' +
692 'cde--------------------------.' +
693 'abcdefghijabcde.' +
694 'a.b------------.' +
695 '.c.------------.' +
696 'd.e------------.' +
697 'k--------------.' +
698 'l--------------.' +
699 'm--------------.')
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +0000700 ]
701
702 def testDecoder(self):
703 # Try a few one-shot test cases.
704 for input, eof, output in self.test_cases:
705 d = StatefulIncrementalDecoder()
706 self.assertEquals(d.decode(input, eof), output)
707
708 # Also test an unfinished decode, followed by forcing EOF.
709 d = StatefulIncrementalDecoder()
710 self.assertEquals(d.decode(b'oiabcd'), '')
711 self.assertEquals(d.decode(b'', 1), 'abcd.')
Guido van Rossum78892e42007-04-06 17:31:18 +0000712
713class TextIOWrapperTest(unittest.TestCase):
Guido van Rossum9b76da62007-04-11 01:09:03 +0000714
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +0000715 def setUp(self):
716 self.testdata = b"AAA\r\nBBB\rCCC\r\nDDD\nEEE\r\n"
717 self.normalized = b"AAA\nBBB\nCCC\nDDD\nEEE\n".decode("ascii")
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000718 support.unlink(support.TESTFN)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +0000719
Guido van Rossumd0712812007-04-11 16:32:43 +0000720 def tearDown(self):
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000721 support.unlink(support.TESTFN)
Guido van Rossum9b76da62007-04-11 01:09:03 +0000722
Guido van Rossumf64db9f2007-12-06 01:04:26 +0000723 def testLineBuffering(self):
724 r = io.BytesIO()
725 b = io.BufferedWriter(r, 1000)
726 t = io.TextIOWrapper(b, newline="\n", line_buffering=True)
727 t.write("X")
728 self.assertEquals(r.getvalue(), b"") # No flush happened
729 t.write("Y\nZ")
730 self.assertEquals(r.getvalue(), b"XY\nZ") # All got flushed
731 t.write("A\rB")
732 self.assertEquals(r.getvalue(), b"XY\nZA\rB")
733
Guido van Rossume7fc50f2007-12-03 22:54:21 +0000734 def testEncodingErrorsReading(self):
735 # (1) default
736 b = io.BytesIO(b"abc\n\xff\n")
737 t = io.TextIOWrapper(b, encoding="ascii")
738 self.assertRaises(UnicodeError, t.read)
739 # (2) explicit strict
740 b = io.BytesIO(b"abc\n\xff\n")
741 t = io.TextIOWrapper(b, encoding="ascii", errors="strict")
742 self.assertRaises(UnicodeError, t.read)
743 # (3) ignore
744 b = io.BytesIO(b"abc\n\xff\n")
745 t = io.TextIOWrapper(b, encoding="ascii", errors="ignore")
746 self.assertEquals(t.read(), "abc\n\n")
747 # (4) replace
748 b = io.BytesIO(b"abc\n\xff\n")
749 t = io.TextIOWrapper(b, encoding="ascii", errors="replace")
750 self.assertEquals(t.read(), "abc\n\ufffd\n")
751
752 def testEncodingErrorsWriting(self):
753 # (1) default
754 b = io.BytesIO()
755 t = io.TextIOWrapper(b, encoding="ascii")
756 self.assertRaises(UnicodeError, t.write, "\xff")
757 # (2) explicit strict
758 b = io.BytesIO()
759 t = io.TextIOWrapper(b, encoding="ascii", errors="strict")
760 self.assertRaises(UnicodeError, t.write, "\xff")
761 # (3) ignore
762 b = io.BytesIO()
Guido van Rossumf64db9f2007-12-06 01:04:26 +0000763 t = io.TextIOWrapper(b, encoding="ascii", errors="ignore",
764 newline="\n")
Guido van Rossume7fc50f2007-12-03 22:54:21 +0000765 t.write("abc\xffdef\n")
766 t.flush()
Christian Heimesecda2612007-12-05 17:59:44 +0000767 self.assertEquals(b.getvalue(), b"abcdef\n")
Guido van Rossume7fc50f2007-12-03 22:54:21 +0000768 # (4) replace
769 b = io.BytesIO()
Guido van Rossumf64db9f2007-12-06 01:04:26 +0000770 t = io.TextIOWrapper(b, encoding="ascii", errors="replace",
771 newline="\n")
Guido van Rossume7fc50f2007-12-03 22:54:21 +0000772 t.write("abc\xffdef\n")
773 t.flush()
Christian Heimesecda2612007-12-05 17:59:44 +0000774 self.assertEquals(b.getvalue(), b"abc?def\n")
Guido van Rossume7fc50f2007-12-03 22:54:21 +0000775
Guido van Rossum8358db22007-08-18 21:39:55 +0000776 def testNewlinesInput(self):
777 testdata = b"AAA\nBBB\nCCC\rDDD\rEEE\r\nFFF\r\nGGG"
778 normalized = testdata.replace(b"\r\n", b"\n").replace(b"\r", b"\n")
779 for newline, expected in [
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +0000780 (None, normalized.decode("ascii").splitlines(True)),
781 ("", testdata.decode("ascii").splitlines(True)),
Guido van Rossum8358db22007-08-18 21:39:55 +0000782 ("\n", ["AAA\n", "BBB\n", "CCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]),
783 ("\r\n", ["AAA\nBBB\nCCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]),
784 ("\r", ["AAA\nBBB\nCCC\r", "DDD\r", "EEE\r", "\nFFF\r", "\nGGG"]),
785 ]:
786 buf = io.BytesIO(testdata)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +0000787 txt = io.TextIOWrapper(buf, encoding="ascii", newline=newline)
Guido van Rossum8358db22007-08-18 21:39:55 +0000788 self.assertEquals(txt.readlines(), expected)
789 txt.seek(0)
790 self.assertEquals(txt.read(), "".join(expected))
791
792 def testNewlinesOutput(self):
793 testdict = {
794 "": b"AAA\nBBB\nCCC\nX\rY\r\nZ",
795 "\n": b"AAA\nBBB\nCCC\nX\rY\r\nZ",
796 "\r": b"AAA\rBBB\rCCC\rX\rY\r\rZ",
797 "\r\n": b"AAA\r\nBBB\r\nCCC\r\nX\rY\r\r\nZ",
798 }
799 tests = [(None, testdict[os.linesep])] + sorted(testdict.items())
800 for newline, expected in tests:
801 buf = io.BytesIO()
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +0000802 txt = io.TextIOWrapper(buf, encoding="ascii", newline=newline)
Guido van Rossum8358db22007-08-18 21:39:55 +0000803 txt.write("AAA\nB")
804 txt.write("BB\nCCC\n")
805 txt.write("X\rY\r\nZ")
806 txt.flush()
Alexandre Vassalotti77250f42008-05-06 19:48:38 +0000807 self.assertEquals(buf.closed, False)
Guido van Rossum8358db22007-08-18 21:39:55 +0000808 self.assertEquals(buf.getvalue(), expected)
809
Guido van Rossum78892e42007-04-06 17:31:18 +0000810 def testNewlines(self):
811 input_lines = [ "unix\n", "windows\r\n", "os9\r", "last\n", "nonl" ]
812
813 tests = [
814 [ None, [ 'unix\n', 'windows\n', 'os9\n', 'last\n', 'nonl' ] ],
Guido van Rossum8358db22007-08-18 21:39:55 +0000815 [ '', input_lines ],
816 [ '\n', [ "unix\n", "windows\r\n", "os9\rlast\n", "nonl" ] ],
817 [ '\r\n', [ "unix\nwindows\r\n", "os9\rlast\nnonl" ] ],
818 [ '\r', [ "unix\nwindows\r", "\nos9\r", "last\nnonl" ] ],
Guido van Rossum78892e42007-04-06 17:31:18 +0000819 ]
820
Guido van Rossumc2f93dc2007-05-24 00:50:02 +0000821 encodings = ('utf-8', 'latin-1')
Guido van Rossum78892e42007-04-06 17:31:18 +0000822
Guido van Rossum8358db22007-08-18 21:39:55 +0000823 # Try a range of buffer sizes to test the case where \r is the last
Guido van Rossum78892e42007-04-06 17:31:18 +0000824 # character in TextIOWrapper._pending_line.
825 for encoding in encodings:
Guido van Rossum8358db22007-08-18 21:39:55 +0000826 # XXX: str.encode() should return bytes
827 data = bytes(''.join(input_lines).encode(encoding))
Guido van Rossum78892e42007-04-06 17:31:18 +0000828 for do_reads in (False, True):
Guido van Rossum8358db22007-08-18 21:39:55 +0000829 for bufsize in range(1, 10):
830 for newline, exp_lines in tests:
831 bufio = io.BufferedReader(io.BytesIO(data), bufsize)
Guido van Rossum76c5d4d2007-04-06 19:10:29 +0000832 textio = io.TextIOWrapper(bufio, newline=newline,
Guido van Rossum78892e42007-04-06 17:31:18 +0000833 encoding=encoding)
834 if do_reads:
835 got_lines = []
836 while True:
Guido van Rossum76c5d4d2007-04-06 19:10:29 +0000837 c2 = textio.read(2)
Guido van Rossum78892e42007-04-06 17:31:18 +0000838 if c2 == '':
839 break
840 self.assertEquals(len(c2), 2)
Guido van Rossum76c5d4d2007-04-06 19:10:29 +0000841 got_lines.append(c2 + textio.readline())
Guido van Rossum78892e42007-04-06 17:31:18 +0000842 else:
Guido van Rossum76c5d4d2007-04-06 19:10:29 +0000843 got_lines = list(textio)
Guido van Rossum78892e42007-04-06 17:31:18 +0000844
845 for got_line, exp_line in zip(got_lines, exp_lines):
846 self.assertEquals(got_line, exp_line)
847 self.assertEquals(len(got_lines), len(exp_lines))
848
Guido van Rossum8358db22007-08-18 21:39:55 +0000849 def testNewlinesInput(self):
850 testdata = b"AAA\nBBB\nCCC\rDDD\rEEE\r\nFFF\r\nGGG"
851 normalized = testdata.replace(b"\r\n", b"\n").replace(b"\r", b"\n")
852 for newline, expected in [
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +0000853 (None, normalized.decode("ascii").splitlines(True)),
854 ("", testdata.decode("ascii").splitlines(True)),
Guido van Rossum8358db22007-08-18 21:39:55 +0000855 ("\n", ["AAA\n", "BBB\n", "CCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]),
856 ("\r\n", ["AAA\nBBB\nCCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]),
857 ("\r", ["AAA\nBBB\nCCC\r", "DDD\r", "EEE\r", "\nFFF\r", "\nGGG"]),
858 ]:
859 buf = io.BytesIO(testdata)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +0000860 txt = io.TextIOWrapper(buf, encoding="ascii", newline=newline)
Guido van Rossum8358db22007-08-18 21:39:55 +0000861 self.assertEquals(txt.readlines(), expected)
862 txt.seek(0)
863 self.assertEquals(txt.read(), "".join(expected))
864
865 def testNewlinesOutput(self):
Guido van Rossum8358db22007-08-18 21:39:55 +0000866 data = "AAA\nBBB\rCCC\n"
867 data_lf = b"AAA\nBBB\rCCC\n"
868 data_cr = b"AAA\rBBB\rCCC\r"
869 data_crlf = b"AAA\r\nBBB\rCCC\r\n"
Guido van Rossum18363582007-08-18 22:05:58 +0000870 save_linesep = os.linesep
871 try:
872 for os.linesep, newline, expected in [
873 ("\n", None, data_lf),
874 ("\r\n", None, data_crlf),
875 ("\n", "", data_lf),
876 ("\r\n", "", data_lf),
877 ("\n", "\n", data_lf),
878 ("\r\n", "\n", data_lf),
879 ("\n", "\r", data_cr),
880 ("\r\n", "\r", data_cr),
881 ("\n", "\r\n", data_crlf),
882 ("\r\n", "\r\n", data_crlf),
883 ]:
884 buf = io.BytesIO()
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +0000885 txt = io.TextIOWrapper(buf, encoding="ascii", newline=newline)
Guido van Rossum18363582007-08-18 22:05:58 +0000886 txt.write(data)
887 txt.close()
Alexandre Vassalotti77250f42008-05-06 19:48:38 +0000888 self.assertEquals(buf.closed, True)
889 self.assertRaises(ValueError, buf.getvalue)
Guido van Rossum18363582007-08-18 22:05:58 +0000890 finally:
891 os.linesep = save_linesep
Guido van Rossum8358db22007-08-18 21:39:55 +0000892
Guido van Rossum9b76da62007-04-11 01:09:03 +0000893 # Systematic tests of the text I/O API
894
895 def testBasicIO(self):
896 for chunksize in (1, 2, 3, 4, 5, 15, 16, 17, 31, 32, 33, 63, 64, 65):
897 for enc in "ascii", "latin1", "utf8" :# , "utf-16-be", "utf-16-le":
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000898 f = io.open(support.TESTFN, "w+", encoding=enc)
Guido van Rossum9b76da62007-04-11 01:09:03 +0000899 f._CHUNK_SIZE = chunksize
900 self.assertEquals(f.write("abc"), 3)
901 f.close()
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000902 f = io.open(support.TESTFN, "r+", encoding=enc)
Guido van Rossum9b76da62007-04-11 01:09:03 +0000903 f._CHUNK_SIZE = chunksize
904 self.assertEquals(f.tell(), 0)
905 self.assertEquals(f.read(), "abc")
906 cookie = f.tell()
907 self.assertEquals(f.seek(0), 0)
908 self.assertEquals(f.read(2), "ab")
909 self.assertEquals(f.read(1), "c")
910 self.assertEquals(f.read(1), "")
911 self.assertEquals(f.read(), "")
912 self.assertEquals(f.tell(), cookie)
913 self.assertEquals(f.seek(0), 0)
914 self.assertEquals(f.seek(0, 2), cookie)
915 self.assertEquals(f.write("def"), 3)
916 self.assertEquals(f.seek(cookie), cookie)
917 self.assertEquals(f.read(), "def")
918 if enc.startswith("utf"):
919 self.multi_line_test(f, enc)
920 f.close()
921
922 def multi_line_test(self, f, enc):
923 f.seek(0)
924 f.truncate()
Guido van Rossumef87d6e2007-05-02 19:09:54 +0000925 sample = "s\xff\u0fff\uffff"
Guido van Rossum9b76da62007-04-11 01:09:03 +0000926 wlines = []
Guido van Rossumcba608c2007-04-11 14:19:59 +0000927 for size in (0, 1, 2, 3, 4, 5, 30, 31, 32, 33, 62, 63, 64, 65, 1000):
Guido van Rossum9b76da62007-04-11 01:09:03 +0000928 chars = []
Guido van Rossum805365e2007-05-07 22:24:25 +0000929 for i in range(size):
Guido van Rossum9b76da62007-04-11 01:09:03 +0000930 chars.append(sample[i % len(sample)])
Guido van Rossumef87d6e2007-05-02 19:09:54 +0000931 line = "".join(chars) + "\n"
Guido van Rossum9b76da62007-04-11 01:09:03 +0000932 wlines.append((f.tell(), line))
933 f.write(line)
Guido van Rossum9b76da62007-04-11 01:09:03 +0000934 f.seek(0)
935 rlines = []
936 while True:
937 pos = f.tell()
938 line = f.readline()
939 if not line:
Guido van Rossum9b76da62007-04-11 01:09:03 +0000940 break
941 rlines.append((pos, line))
Guido van Rossum9b76da62007-04-11 01:09:03 +0000942 self.assertEquals(rlines, wlines)
943
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +0000944 def testTelling(self):
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000945 f = io.open(support.TESTFN, "w+", encoding="utf8")
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +0000946 p0 = f.tell()
Guido van Rossumef87d6e2007-05-02 19:09:54 +0000947 f.write("\xff\n")
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +0000948 p1 = f.tell()
Guido van Rossumef87d6e2007-05-02 19:09:54 +0000949 f.write("\xff\n")
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +0000950 p2 = f.tell()
951 f.seek(0)
952 self.assertEquals(f.tell(), p0)
Guido van Rossumef87d6e2007-05-02 19:09:54 +0000953 self.assertEquals(f.readline(), "\xff\n")
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +0000954 self.assertEquals(f.tell(), p1)
Guido van Rossumef87d6e2007-05-02 19:09:54 +0000955 self.assertEquals(f.readline(), "\xff\n")
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +0000956 self.assertEquals(f.tell(), p2)
957 f.seek(0)
958 for line in f:
Guido van Rossumef87d6e2007-05-02 19:09:54 +0000959 self.assertEquals(line, "\xff\n")
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +0000960 self.assertRaises(IOError, f.tell)
961 self.assertEquals(f.tell(), p2)
962 f.close()
963
Guido van Rossumd76e7792007-04-17 02:38:04 +0000964 def testSeeking(self):
965 chunk_size = io.TextIOWrapper._CHUNK_SIZE
966 prefix_size = chunk_size - 2
Guido van Rossumef87d6e2007-05-02 19:09:54 +0000967 u_prefix = "a" * prefix_size
Guido van Rossumd76e7792007-04-17 02:38:04 +0000968 prefix = bytes(u_prefix.encode("utf-8"))
969 self.assertEquals(len(u_prefix), len(prefix))
Guido van Rossumef87d6e2007-05-02 19:09:54 +0000970 u_suffix = "\u8888\n"
Guido van Rossumd76e7792007-04-17 02:38:04 +0000971 suffix = bytes(u_suffix.encode("utf-8"))
972 line = prefix + suffix
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000973 f = io.open(support.TESTFN, "wb")
Guido van Rossumd76e7792007-04-17 02:38:04 +0000974 f.write(line*2)
975 f.close()
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000976 f = io.open(support.TESTFN, "r", encoding="utf-8")
Guido van Rossumd76e7792007-04-17 02:38:04 +0000977 s = f.read(prefix_size)
Guido van Rossum98297ee2007-11-06 21:34:58 +0000978 self.assertEquals(s, str(prefix, "ascii"))
Guido van Rossumd76e7792007-04-17 02:38:04 +0000979 self.assertEquals(f.tell(), prefix_size)
980 self.assertEquals(f.readline(), u_suffix)
981
982 def testSeekingToo(self):
983 # Regression test for a specific bug
984 data = b'\xe0\xbf\xbf\n'
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000985 f = io.open(support.TESTFN, "wb")
Guido van Rossumd76e7792007-04-17 02:38:04 +0000986 f.write(data)
987 f.close()
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000988 f = io.open(support.TESTFN, "r", encoding="utf-8")
Guido van Rossumd76e7792007-04-17 02:38:04 +0000989 f._CHUNK_SIZE # Just test that it exists
990 f._CHUNK_SIZE = 2
991 f.readline()
992 f.tell()
993
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +0000994 def testSeekAndTell(self):
995 """Test seek/tell using the StatefulIncrementalDecoder."""
996
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +0000997 def testSeekAndTellWithData(data, min_pos=0):
998 """Tell/seek to various points within a data stream and ensure
999 that the decoded data returned by read() is consistent."""
Benjamin Petersonee8712c2008-05-20 21:35:26 +00001000 f = io.open(support.TESTFN, 'wb')
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001001 f.write(data)
1002 f.close()
Benjamin Petersonee8712c2008-05-20 21:35:26 +00001003 f = io.open(support.TESTFN, encoding='test_decoder')
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001004 decoded = f.read()
1005 f.close()
1006
Neal Norwitze2b07052008-03-18 19:52:05 +00001007 for i in range(min_pos, len(decoded) + 1): # seek positions
1008 for j in [1, 5, len(decoded) - i]: # read lengths
Benjamin Petersonee8712c2008-05-20 21:35:26 +00001009 f = io.open(support.TESTFN, encoding='test_decoder')
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001010 self.assertEquals(f.read(i), decoded[:i])
1011 cookie = f.tell()
1012 self.assertEquals(f.read(j), decoded[i:i + j])
1013 f.seek(cookie)
1014 self.assertEquals(f.read(), decoded[i:])
1015 f.close()
1016
Benjamin Petersonad9d48d2008-04-02 21:49:44 +00001017 # Enable the test decoder.
1018 StatefulIncrementalDecoder.codecEnabled = 1
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001019
1020 # Run the tests.
1021 try:
1022 # Try each test case.
1023 for input, _, _ in StatefulIncrementalDecoderTest.test_cases:
1024 testSeekAndTellWithData(input)
1025
1026 # Position each test case so that it crosses a chunk boundary.
1027 CHUNK_SIZE = io.TextIOWrapper._CHUNK_SIZE
1028 for input, _, _ in StatefulIncrementalDecoderTest.test_cases:
1029 offset = CHUNK_SIZE - len(input)//2
1030 prefix = b'.'*offset
1031 # Don't bother seeking into the prefix (takes too long).
1032 min_pos = offset*2
1033 testSeekAndTellWithData(prefix + input, min_pos)
1034
1035 # Ensure our test decoder won't interfere with subsequent tests.
1036 finally:
Benjamin Petersonad9d48d2008-04-02 21:49:44 +00001037 StatefulIncrementalDecoder.codecEnabled = 0
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001038
Alexandre Vassalottia38f73b2008-01-07 18:30:48 +00001039 def testEncodedWrites(self):
1040 data = "1234567890"
1041 tests = ("utf-16",
1042 "utf-16-le",
1043 "utf-16-be",
1044 "utf-32",
1045 "utf-32-le",
1046 "utf-32-be")
1047 for encoding in tests:
1048 buf = io.BytesIO()
1049 f = io.TextIOWrapper(buf, encoding=encoding)
1050 # Check if the BOM is written only once (see issue1753).
1051 f.write(data)
1052 f.write(data)
1053 f.seek(0)
1054 self.assertEquals(f.read(), data * 2)
1055 self.assertEquals(buf.getvalue(), (data * 2).encode(encoding))
1056
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +00001057 def timingTest(self):
1058 timer = time.time
1059 enc = "utf8"
Guido van Rossumef87d6e2007-05-02 19:09:54 +00001060 line = "\0\x0f\xff\u0fff\uffff\U000fffff\U0010ffff"*3 + "\n"
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +00001061 nlines = 10000
1062 nchars = len(line)
1063 nbytes = len(line.encode(enc))
1064 for chunk_size in (32, 64, 128, 256):
Benjamin Petersonee8712c2008-05-20 21:35:26 +00001065 f = io.open(support.TESTFN, "w+", encoding=enc)
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +00001066 f._CHUNK_SIZE = chunk_size
1067 t0 = timer()
1068 for i in range(nlines):
1069 f.write(line)
1070 f.flush()
1071 t1 = timer()
1072 f.seek(0)
1073 for line in f:
1074 pass
1075 t2 = timer()
1076 f.seek(0)
1077 while f.readline():
1078 pass
1079 t3 = timer()
1080 f.seek(0)
1081 while f.readline():
1082 f.tell()
1083 t4 = timer()
1084 f.close()
Benjamin Petersonee8712c2008-05-20 21:35:26 +00001085 if support.verbose:
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +00001086 print("\nTiming test: %d lines of %d characters (%d bytes)" %
1087 (nlines, nchars, nbytes))
1088 print("File chunk size: %6s" % f._CHUNK_SIZE)
1089 print("Writing: %6.3f seconds" % (t1-t0))
1090 print("Reading using iteration: %6.3f seconds" % (t2-t1))
1091 print("Reading using readline(): %6.3f seconds" % (t3-t2))
1092 print("Using readline()+tell(): %6.3f seconds" % (t4-t3))
1093
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00001094 def testReadOneByOne(self):
1095 txt = io.TextIOWrapper(io.BytesIO(b"AA\r\nBB"))
1096 reads = ""
1097 while True:
1098 c = txt.read(1)
1099 if not c:
1100 break
1101 reads += c
1102 self.assertEquals(reads, "AA\nBB")
1103
1104 # read in amounts equal to TextIOWrapper._CHUNK_SIZE which is 128.
1105 def testReadByChunk(self):
1106 # make sure "\r\n" straddles 128 char boundary.
1107 txt = io.TextIOWrapper(io.BytesIO(b"A" * 127 + b"\r\nB"))
1108 reads = ""
1109 while True:
1110 c = txt.read(128)
1111 if not c:
1112 break
1113 reads += c
1114 self.assertEquals(reads, "A"*127+"\nB")
1115
1116 def test_issue1395_1(self):
1117 txt = io.TextIOWrapper(io.BytesIO(self.testdata), encoding="ascii")
1118
1119 # read one char at a time
1120 reads = ""
1121 while True:
1122 c = txt.read(1)
1123 if not c:
1124 break
1125 reads += c
1126 self.assertEquals(reads, self.normalized)
1127
1128 def test_issue1395_2(self):
1129 txt = io.TextIOWrapper(io.BytesIO(self.testdata), encoding="ascii")
1130 txt._CHUNK_SIZE = 4
1131
1132 reads = ""
1133 while True:
1134 c = txt.read(4)
1135 if not c:
1136 break
1137 reads += c
1138 self.assertEquals(reads, self.normalized)
1139
1140 def test_issue1395_3(self):
1141 txt = io.TextIOWrapper(io.BytesIO(self.testdata), encoding="ascii")
1142 txt._CHUNK_SIZE = 4
1143
1144 reads = txt.read(4)
1145 reads += txt.read(4)
1146 reads += txt.readline()
1147 reads += txt.readline()
1148 reads += txt.readline()
1149 self.assertEquals(reads, self.normalized)
1150
1151 def test_issue1395_4(self):
1152 txt = io.TextIOWrapper(io.BytesIO(self.testdata), encoding="ascii")
1153 txt._CHUNK_SIZE = 4
1154
1155 reads = txt.read(4)
1156 reads += txt.read()
1157 self.assertEquals(reads, self.normalized)
1158
1159 def test_issue1395_5(self):
1160 txt = io.TextIOWrapper(io.BytesIO(self.testdata), encoding="ascii")
1161 txt._CHUNK_SIZE = 4
1162
1163 reads = txt.read(4)
1164 pos = txt.tell()
1165 txt.seek(0)
1166 txt.seek(pos)
1167 self.assertEquals(txt.read(4), "BBB\n")
1168
Ka-Ping Yeeddaa7062008-03-17 20:35:15 +00001169 def test_issue2282(self):
1170 buffer = io.BytesIO(self.testdata)
1171 txt = io.TextIOWrapper(buffer, encoding="ascii")
1172
1173 self.assertEqual(buffer.seekable(), txt.seekable())
1174
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00001175 def test_newline_decoder(self):
1176 import codecs
1177 decoder = codecs.getincrementaldecoder("utf-8")()
1178 decoder = io.IncrementalNewlineDecoder(decoder, translate=True)
1179
1180 self.assertEquals(decoder.decode(b'\xe8\xa2\x88'), "\u8888")
1181
1182 self.assertEquals(decoder.decode(b'\xe8'), "")
1183 self.assertEquals(decoder.decode(b'\xa2'), "")
1184 self.assertEquals(decoder.decode(b'\x88'), "\u8888")
1185
1186 self.assertEquals(decoder.decode(b'\xe8'), "")
1187 self.assertRaises(UnicodeDecodeError, decoder.decode, b'', final=True)
1188
1189 decoder.setstate((b'', 0))
1190 self.assertEquals(decoder.decode(b'\n'), "\n")
1191 self.assertEquals(decoder.decode(b'\r'), "")
1192 self.assertEquals(decoder.decode(b'', final=True), "\n")
1193 self.assertEquals(decoder.decode(b'\r', final=True), "\n")
1194
1195 self.assertEquals(decoder.decode(b'\r'), "")
1196 self.assertEquals(decoder.decode(b'a'), "\na")
1197
1198 self.assertEquals(decoder.decode(b'\r\r\n'), "\n\n")
1199 self.assertEquals(decoder.decode(b'\r'), "")
1200 self.assertEquals(decoder.decode(b'\r'), "\n")
1201 self.assertEquals(decoder.decode(b'\na'), "\na")
1202
1203 self.assertEquals(decoder.decode(b'\xe8\xa2\x88\r\n'), "\u8888\n")
1204 self.assertEquals(decoder.decode(b'\xe8\xa2\x88'), "\u8888")
1205 self.assertEquals(decoder.decode(b'\n'), "\n")
1206 self.assertEquals(decoder.decode(b'\xe8\xa2\x88\r'), "\u8888")
1207 self.assertEquals(decoder.decode(b'\n'), "\n")
Guido van Rossum9b76da62007-04-11 01:09:03 +00001208
Alexandre Vassalotti472f07d2008-01-06 00:34:32 +00001209 decoder = codecs.getincrementaldecoder("utf-8")()
1210 decoder = io.IncrementalNewlineDecoder(decoder, translate=True)
1211 self.assertEquals(decoder.newlines, None)
1212 decoder.decode(b"abc\n\r")
1213 self.assertEquals(decoder.newlines, '\n')
1214 decoder.decode(b"\nabc")
1215 self.assertEquals(decoder.newlines, ('\n', '\r\n'))
1216 decoder.decode(b"abc\r")
1217 self.assertEquals(decoder.newlines, ('\n', '\r\n'))
1218 decoder.decode(b"abc")
1219 self.assertEquals(decoder.newlines, ('\r', '\n', '\r\n'))
1220 decoder.decode(b"abc\r")
1221 decoder.reset()
1222 self.assertEquals(decoder.decode(b"abc"), "abc")
1223 self.assertEquals(decoder.newlines, None)
1224
Guido van Rossum01a27522007-03-07 01:00:12 +00001225# XXX Tests for open()
Guido van Rossum68bbcd22007-02-27 17:19:33 +00001226
Guido van Rossum5abbf752007-08-27 17:39:33 +00001227class MiscIOTest(unittest.TestCase):
1228
1229 def testImport__all__(self):
1230 for name in io.__all__:
1231 obj = getattr(io, name, None)
1232 self.assert_(obj is not None, name)
1233 if name == "open":
1234 continue
1235 elif "error" in name.lower():
1236 self.assert_(issubclass(obj, Exception), name)
1237 else:
1238 self.assert_(issubclass(obj, io.IOBase))
1239
1240
Guido van Rossum28524c72007-02-27 05:47:44 +00001241def test_main():
Benjamin Petersonee8712c2008-05-20 21:35:26 +00001242 support.run_unittest(IOTest, BytesIOTest, StringIOTest,
Christian Heimes81ee3ef2008-05-04 22:42:01 +00001243 BufferedReaderTest, BufferedWriterTest,
1244 BufferedRWPairTest, BufferedRandomTest,
1245 StatefulIncrementalDecoderTest,
1246 TextIOWrapperTest, MiscIOTest)
Guido van Rossum28524c72007-02-27 05:47:44 +00001247
1248if __name__ == "__main__":
Guido van Rossum76c5d4d2007-04-06 19:10:29 +00001249 unittest.main()