blob: 320c7c3c6d3d7e6f836f2a12ffb8065f692a1af4 [file] [log] [blame]
Guido van Rossum68bbcd22007-02-27 17:19:33 +00001"""Unit tests for io.py."""
2
Guido van Rossum8358db22007-08-18 21:39:55 +00003import os
Guido van Rossum34d69e52007-04-10 20:08:41 +00004import sys
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +00005import time
Guido van Rossumd4103952007-04-12 05:44:49 +00006import array
Guido van Rossum28524c72007-02-27 05:47:44 +00007import unittest
Guido van Rossum78892e42007-04-06 17:31:18 +00008from itertools import chain
Benjamin Petersonee8712c2008-05-20 21:35:26 +00009from test import support
Guido van Rossum76c5d4d2007-04-06 19:10:29 +000010
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +000011import codecs
Guido van Rossum53807da2007-04-10 19:01:47 +000012import io # The module under test
Guido van Rossum28524c72007-02-27 05:47:44 +000013
Guido van Rossuma9e20242007-03-08 00:43:48 +000014
Guido van Rossum53807da2007-04-10 19:01:47 +000015class MockRawIO(io.RawIOBase):
Guido van Rossuma9e20242007-03-08 00:43:48 +000016
Guido van Rossum76c5d4d2007-04-06 19:10:29 +000017 def __init__(self, read_stack=()):
18 self._read_stack = list(read_stack)
19 self._write_stack = []
Guido van Rossum68bbcd22007-02-27 17:19:33 +000020
21 def read(self, n=None):
22 try:
Guido van Rossum76c5d4d2007-04-06 19:10:29 +000023 return self._read_stack.pop(0)
Guido van Rossum68bbcd22007-02-27 17:19:33 +000024 except:
Guido van Rossum78892e42007-04-06 17:31:18 +000025 return b""
Guido van Rossum68bbcd22007-02-27 17:19:33 +000026
Guido van Rossum01a27522007-03-07 01:00:12 +000027 def write(self, b):
Guido van Rossum76c5d4d2007-04-06 19:10:29 +000028 self._write_stack.append(b[:])
Guido van Rossum01a27522007-03-07 01:00:12 +000029 return len(b)
30
31 def writable(self):
32 return True
33
Guido van Rossum68bbcd22007-02-27 17:19:33 +000034 def fileno(self):
35 return 42
36
37 def readable(self):
38 return True
39
Guido van Rossum01a27522007-03-07 01:00:12 +000040 def seekable(self):
Guido van Rossum68bbcd22007-02-27 17:19:33 +000041 return True
42
Guido van Rossum01a27522007-03-07 01:00:12 +000043 def seek(self, pos, whence):
44 pass
45
46 def tell(self):
Guido van Rossum68bbcd22007-02-27 17:19:33 +000047 return 42
48
Guido van Rossuma9e20242007-03-08 00:43:48 +000049
Guido van Rossum78892e42007-04-06 17:31:18 +000050class MockFileIO(io.BytesIO):
51
52 def __init__(self, data):
53 self.read_history = []
54 io.BytesIO.__init__(self, data)
55
56 def read(self, n=None):
57 res = io.BytesIO.read(self, n)
58 self.read_history.append(None if res is None else len(res))
59 return res
60
61
Guido van Rossum01a27522007-03-07 01:00:12 +000062class MockNonBlockWriterIO(io.RawIOBase):
Guido van Rossuma9e20242007-03-08 00:43:48 +000063
Guido van Rossum53807da2007-04-10 19:01:47 +000064 def __init__(self, blocking_script):
65 self._blocking_script = list(blocking_script)
Guido van Rossum01a27522007-03-07 01:00:12 +000066 self._write_stack = []
Guido van Rossuma9e20242007-03-08 00:43:48 +000067
Guido van Rossum01a27522007-03-07 01:00:12 +000068 def write(self, b):
Guido van Rossum76c5d4d2007-04-06 19:10:29 +000069 self._write_stack.append(b[:])
Guido van Rossum53807da2007-04-10 19:01:47 +000070 n = self._blocking_script.pop(0)
Guido van Rossum01a27522007-03-07 01:00:12 +000071 if (n < 0):
Guido van Rossum141f7672007-04-10 00:22:16 +000072 raise io.BlockingIOError(0, "test blocking", -n)
Guido van Rossum01a27522007-03-07 01:00:12 +000073 else:
74 return n
Guido van Rossuma9e20242007-03-08 00:43:48 +000075
Guido van Rossum01a27522007-03-07 01:00:12 +000076 def writable(self):
77 return True
Guido van Rossum68bbcd22007-02-27 17:19:33 +000078
Guido van Rossuma9e20242007-03-08 00:43:48 +000079
Guido van Rossum28524c72007-02-27 05:47:44 +000080class IOTest(unittest.TestCase):
81
Neal Norwitze7789b12008-03-24 06:18:09 +000082 def setUp(self):
Benjamin Petersonee8712c2008-05-20 21:35:26 +000083 support.unlink(support.TESTFN)
Neal Norwitze7789b12008-03-24 06:18:09 +000084
Guido van Rossum4d0f5a42007-03-07 22:59:39 +000085 def tearDown(self):
Benjamin Petersonee8712c2008-05-20 21:35:26 +000086 support.unlink(support.TESTFN)
Guido van Rossum4d0f5a42007-03-07 22:59:39 +000087
Guido van Rossum28524c72007-02-27 05:47:44 +000088 def write_ops(self, f):
Guido van Rossum87429772007-04-10 21:06:59 +000089 self.assertEqual(f.write(b"blah."), 5)
90 self.assertEqual(f.seek(0), 0)
91 self.assertEqual(f.write(b"Hello."), 6)
Guido van Rossum28524c72007-02-27 05:47:44 +000092 self.assertEqual(f.tell(), 6)
Guido van Rossum87429772007-04-10 21:06:59 +000093 self.assertEqual(f.seek(-1, 1), 5)
Guido van Rossum28524c72007-02-27 05:47:44 +000094 self.assertEqual(f.tell(), 5)
Guido van Rossum254348e2007-11-21 19:29:53 +000095 self.assertEqual(f.write(bytearray(b" world\n\n\n")), 9)
Guido van Rossum87429772007-04-10 21:06:59 +000096 self.assertEqual(f.seek(0), 0)
Guido van Rossum2b08b382007-05-08 20:18:39 +000097 self.assertEqual(f.write(b"h"), 1)
Guido van Rossum87429772007-04-10 21:06:59 +000098 self.assertEqual(f.seek(-1, 2), 13)
99 self.assertEqual(f.tell(), 13)
100 self.assertEqual(f.truncate(12), 12)
Alexandre Vassalotti77250f42008-05-06 19:48:38 +0000101 self.assertEqual(f.tell(), 12)
Christian Heimes8e42a0a2007-11-08 18:04:45 +0000102 self.assertRaises(TypeError, f.seek, 0.0)
Guido van Rossum28524c72007-02-27 05:47:44 +0000103
Guido van Rossum9b76da62007-04-11 01:09:03 +0000104 def read_ops(self, f, buffered=False):
105 data = f.read(5)
106 self.assertEqual(data, b"hello")
Guido van Rossum254348e2007-11-21 19:29:53 +0000107 data = bytearray(data)
Guido van Rossum9b76da62007-04-11 01:09:03 +0000108 self.assertEqual(f.readinto(data), 5)
109 self.assertEqual(data, b" worl")
110 self.assertEqual(f.readinto(data), 2)
111 self.assertEqual(len(data), 5)
112 self.assertEqual(data[:2], b"d\n")
113 self.assertEqual(f.seek(0), 0)
114 self.assertEqual(f.read(20), b"hello world\n")
115 self.assertEqual(f.read(1), b"")
Guido van Rossum254348e2007-11-21 19:29:53 +0000116 self.assertEqual(f.readinto(bytearray(b"x")), 0)
Guido van Rossum9b76da62007-04-11 01:09:03 +0000117 self.assertEqual(f.seek(-6, 2), 6)
118 self.assertEqual(f.read(5), b"world")
119 self.assertEqual(f.read(0), b"")
Guido van Rossum254348e2007-11-21 19:29:53 +0000120 self.assertEqual(f.readinto(bytearray()), 0)
Guido van Rossum9b76da62007-04-11 01:09:03 +0000121 self.assertEqual(f.seek(-6, 1), 5)
122 self.assertEqual(f.read(5), b" worl")
123 self.assertEqual(f.tell(), 10)
Christian Heimes8e42a0a2007-11-08 18:04:45 +0000124 self.assertRaises(TypeError, f.seek, 0.0)
Guido van Rossum9b76da62007-04-11 01:09:03 +0000125 if buffered:
126 f.seek(0)
127 self.assertEqual(f.read(), b"hello world\n")
128 f.seek(6)
129 self.assertEqual(f.read(), b"world\n")
130 self.assertEqual(f.read(), b"")
131
Guido van Rossum34d69e52007-04-10 20:08:41 +0000132 LARGE = 2**31
133
Guido van Rossum53807da2007-04-10 19:01:47 +0000134 def large_file_ops(self, f):
135 assert f.readable()
136 assert f.writable()
Guido van Rossum34d69e52007-04-10 20:08:41 +0000137 self.assertEqual(f.seek(self.LARGE), self.LARGE)
138 self.assertEqual(f.tell(), self.LARGE)
Guido van Rossum53807da2007-04-10 19:01:47 +0000139 self.assertEqual(f.write(b"xxx"), 3)
Guido van Rossum34d69e52007-04-10 20:08:41 +0000140 self.assertEqual(f.tell(), self.LARGE + 3)
141 self.assertEqual(f.seek(-1, 1), self.LARGE + 2)
Guido van Rossum87429772007-04-10 21:06:59 +0000142 self.assertEqual(f.truncate(), self.LARGE + 2)
Guido van Rossum34d69e52007-04-10 20:08:41 +0000143 self.assertEqual(f.tell(), self.LARGE + 2)
144 self.assertEqual(f.seek(0, 2), self.LARGE + 2)
Guido van Rossum87429772007-04-10 21:06:59 +0000145 self.assertEqual(f.truncate(self.LARGE + 1), self.LARGE + 1)
Alexandre Vassalotti77250f42008-05-06 19:48:38 +0000146 self.assertEqual(f.tell(), self.LARGE + 1)
Guido van Rossum34d69e52007-04-10 20:08:41 +0000147 self.assertEqual(f.seek(0, 2), self.LARGE + 1)
148 self.assertEqual(f.seek(-1, 2), self.LARGE)
Guido van Rossum53807da2007-04-10 19:01:47 +0000149 self.assertEqual(f.read(2), b"x")
150
Guido van Rossum28524c72007-02-27 05:47:44 +0000151 def test_raw_file_io(self):
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000152 f = io.open(support.TESTFN, "wb", buffering=0)
Guido van Rossum28524c72007-02-27 05:47:44 +0000153 self.assertEqual(f.readable(), False)
154 self.assertEqual(f.writable(), True)
155 self.assertEqual(f.seekable(), True)
156 self.write_ops(f)
157 f.close()
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000158 f = io.open(support.TESTFN, "rb", buffering=0)
Guido van Rossum28524c72007-02-27 05:47:44 +0000159 self.assertEqual(f.readable(), True)
160 self.assertEqual(f.writable(), False)
161 self.assertEqual(f.seekable(), True)
162 self.read_ops(f)
163 f.close()
164
Guido van Rossum87429772007-04-10 21:06:59 +0000165 def test_buffered_file_io(self):
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000166 f = io.open(support.TESTFN, "wb")
Guido van Rossum87429772007-04-10 21:06:59 +0000167 self.assertEqual(f.readable(), False)
168 self.assertEqual(f.writable(), True)
169 self.assertEqual(f.seekable(), True)
170 self.write_ops(f)
171 f.close()
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000172 f = io.open(support.TESTFN, "rb")
Guido van Rossum87429772007-04-10 21:06:59 +0000173 self.assertEqual(f.readable(), True)
174 self.assertEqual(f.writable(), False)
175 self.assertEqual(f.seekable(), True)
Guido van Rossum9b76da62007-04-11 01:09:03 +0000176 self.read_ops(f, True)
Guido van Rossum87429772007-04-10 21:06:59 +0000177 f.close()
178
Guido van Rossum48fc58a2007-06-07 23:45:37 +0000179 def test_readline(self):
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000180 f = io.open(support.TESTFN, "wb")
Guido van Rossum48fc58a2007-06-07 23:45:37 +0000181 f.write(b"abc\ndef\nxyzzy\nfoo")
182 f.close()
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000183 f = io.open(support.TESTFN, "rb")
Guido van Rossum48fc58a2007-06-07 23:45:37 +0000184 self.assertEqual(f.readline(), b"abc\n")
185 self.assertEqual(f.readline(10), b"def\n")
186 self.assertEqual(f.readline(2), b"xy")
187 self.assertEqual(f.readline(4), b"zzy\n")
188 self.assertEqual(f.readline(), b"foo")
189 f.close()
190
Guido van Rossum28524c72007-02-27 05:47:44 +0000191 def test_raw_bytes_io(self):
192 f = io.BytesIO()
193 self.write_ops(f)
194 data = f.getvalue()
195 self.assertEqual(data, b"hello world\n")
196 f = io.BytesIO(data)
Guido van Rossum9b76da62007-04-11 01:09:03 +0000197 self.read_ops(f, True)
Guido van Rossum28524c72007-02-27 05:47:44 +0000198
Guido van Rossum53807da2007-04-10 19:01:47 +0000199 def test_large_file_ops(self):
Guido van Rossum34d69e52007-04-10 20:08:41 +0000200 # On Windows and Mac OSX this test comsumes large resources; It takes
201 # a long time to build the >2GB file and takes >2GB of disk space
202 # therefore the resource must be enabled to run this test.
203 if sys.platform[:3] == 'win' or sys.platform == 'darwin':
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000204 if not support.is_resource_enabled("largefile"):
Guido van Rossum34d69e52007-04-10 20:08:41 +0000205 print("\nTesting large file ops skipped on %s." % sys.platform,
206 file=sys.stderr)
207 print("It requires %d bytes and a long time." % self.LARGE,
208 file=sys.stderr)
209 print("Use 'regrtest.py -u largefile test_io' to run it.",
210 file=sys.stderr)
211 return
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000212 f = io.open(support.TESTFN, "w+b", 0)
Guido van Rossum53807da2007-04-10 19:01:47 +0000213 self.large_file_ops(f)
Guido van Rossuma9e20242007-03-08 00:43:48 +0000214 f.close()
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000215 f = io.open(support.TESTFN, "w+b")
Guido van Rossum87429772007-04-10 21:06:59 +0000216 self.large_file_ops(f)
217 f.close()
218
219 def test_with_open(self):
220 for bufsize in (0, 1, 100):
221 f = None
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000222 with open(support.TESTFN, "wb", bufsize) as f:
Guido van Rossum1f2ca562007-08-27 20:44:15 +0000223 f.write(b"xxx")
Guido van Rossum87429772007-04-10 21:06:59 +0000224 self.assertEqual(f.closed, True)
225 f = None
226 try:
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000227 with open(support.TESTFN, "wb", bufsize) as f:
Guido van Rossum87429772007-04-10 21:06:59 +0000228 1/0
229 except ZeroDivisionError:
230 self.assertEqual(f.closed, True)
231 else:
232 self.fail("1/0 didn't raise an exception")
233
234 def test_destructor(self):
235 record = []
236 class MyFileIO(io.FileIO):
237 def __del__(self):
238 record.append(1)
239 io.FileIO.__del__(self)
240 def close(self):
241 record.append(2)
242 io.FileIO.close(self)
243 def flush(self):
244 record.append(3)
245 io.FileIO.flush(self)
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000246 f = MyFileIO(support.TESTFN, "w")
Guido van Rossum87429772007-04-10 21:06:59 +0000247 f.write("xxx")
248 del f
249 self.assertEqual(record, [1, 2, 3])
250
251 def test_close_flushes(self):
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000252 f = io.open(support.TESTFN, "wb")
Guido van Rossum2b08b382007-05-08 20:18:39 +0000253 f.write(b"xxx")
Guido van Rossum87429772007-04-10 21:06:59 +0000254 f.close()
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000255 f = io.open(support.TESTFN, "rb")
Guido van Rossum87429772007-04-10 21:06:59 +0000256 self.assertEqual(f.read(), b"xxx")
257 f.close()
Guido van Rossuma9e20242007-03-08 00:43:48 +0000258
Guido van Rossumd4103952007-04-12 05:44:49 +0000259 def test_array_writes(self):
260 a = array.array('i', range(10))
Guido van Rossumbae07c92007-10-08 02:46:15 +0000261 n = len(memoryview(a))
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000262 f = io.open(support.TESTFN, "wb", 0)
Guido van Rossumd4103952007-04-12 05:44:49 +0000263 self.assertEqual(f.write(a), n)
264 f.close()
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000265 f = io.open(support.TESTFN, "wb")
Guido van Rossumd4103952007-04-12 05:44:49 +0000266 self.assertEqual(f.write(a), n)
267 f.close()
268
Guido van Rossum2dced8b2007-10-30 17:27:30 +0000269 def test_closefd(self):
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000270 self.assertRaises(ValueError, io.open, support.TESTFN, 'w',
Guido van Rossum2dced8b2007-10-30 17:27:30 +0000271 closefd=False)
Guido van Rossuma9e20242007-03-08 00:43:48 +0000272
Guido van Rossum76c5d4d2007-04-06 19:10:29 +0000273class MemorySeekTestMixin:
Guido van Rossuma9e20242007-03-08 00:43:48 +0000274
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000275 def testInit(self):
Guido van Rossum78892e42007-04-06 17:31:18 +0000276 buf = self.buftype("1234567890")
277 bytesIo = self.ioclass(buf)
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000278
279 def testRead(self):
Guido van Rossum78892e42007-04-06 17:31:18 +0000280 buf = self.buftype("1234567890")
281 bytesIo = self.ioclass(buf)
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000282
283 self.assertEquals(buf[:1], bytesIo.read(1))
284 self.assertEquals(buf[1:5], bytesIo.read(4))
285 self.assertEquals(buf[5:], bytesIo.read(900))
Guido van Rossum78892e42007-04-06 17:31:18 +0000286 self.assertEquals(self.EOF, bytesIo.read())
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000287
288 def testReadNoArgs(self):
Guido van Rossum78892e42007-04-06 17:31:18 +0000289 buf = self.buftype("1234567890")
290 bytesIo = self.ioclass(buf)
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000291
292 self.assertEquals(buf, bytesIo.read())
Guido van Rossum78892e42007-04-06 17:31:18 +0000293 self.assertEquals(self.EOF, bytesIo.read())
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000294
295 def testSeek(self):
Guido van Rossum78892e42007-04-06 17:31:18 +0000296 buf = self.buftype("1234567890")
297 bytesIo = self.ioclass(buf)
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000298
299 bytesIo.read(5)
300 bytesIo.seek(0)
301 self.assertEquals(buf, bytesIo.read())
302
303 bytesIo.seek(3)
304 self.assertEquals(buf[3:], bytesIo.read())
Christian Heimes8e42a0a2007-11-08 18:04:45 +0000305 self.assertRaises(TypeError, bytesIo.seek, 0.0)
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000306
307 def testTell(self):
Guido van Rossum78892e42007-04-06 17:31:18 +0000308 buf = self.buftype("1234567890")
309 bytesIo = self.ioclass(buf)
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000310
311 self.assertEquals(0, bytesIo.tell())
312 bytesIo.seek(5)
313 self.assertEquals(5, bytesIo.tell())
314 bytesIo.seek(10000)
315 self.assertEquals(10000, bytesIo.tell())
316
Guido van Rossuma9e20242007-03-08 00:43:48 +0000317
Guido van Rossum76c5d4d2007-04-06 19:10:29 +0000318class BytesIOTest(MemorySeekTestMixin, unittest.TestCase):
Guido van Rossum39478e82007-08-27 17:23:59 +0000319 @staticmethod
320 def buftype(s):
321 return s.encode("utf-8")
Guido van Rossum78892e42007-04-06 17:31:18 +0000322 ioclass = io.BytesIO
323 EOF = b""
324
325
Guido van Rossum76c5d4d2007-04-06 19:10:29 +0000326class StringIOTest(MemorySeekTestMixin, unittest.TestCase):
Guido van Rossum78892e42007-04-06 17:31:18 +0000327 buftype = str
328 ioclass = io.StringIO
329 EOF = ""
330
331
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000332class BufferedReaderTest(unittest.TestCase):
Guido van Rossuma9e20242007-03-08 00:43:48 +0000333
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000334 def testRead(self):
Guido van Rossum53807da2007-04-10 19:01:47 +0000335 rawio = MockRawIO((b"abc", b"d", b"efg"))
Guido van Rossum76c5d4d2007-04-06 19:10:29 +0000336 bufio = io.BufferedReader(rawio)
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000337
Guido van Rossum76c5d4d2007-04-06 19:10:29 +0000338 self.assertEquals(b"abcdef", bufio.read(6))
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000339
Guido van Rossum78892e42007-04-06 17:31:18 +0000340 def testBuffering(self):
341 data = b"abcdefghi"
342 dlen = len(data)
343
344 tests = [
345 [ 100, [ 3, 1, 4, 8 ], [ dlen, 0 ] ],
346 [ 100, [ 3, 3, 3], [ dlen ] ],
347 [ 4, [ 1, 2, 4, 2 ], [ 4, 4, 1 ] ],
348 ]
349
350 for bufsize, buf_read_sizes, raw_read_sizes in tests:
Guido van Rossum76c5d4d2007-04-06 19:10:29 +0000351 rawio = MockFileIO(data)
352 bufio = io.BufferedReader(rawio, buffer_size=bufsize)
Guido van Rossum78892e42007-04-06 17:31:18 +0000353 pos = 0
354 for nbytes in buf_read_sizes:
Guido van Rossum76c5d4d2007-04-06 19:10:29 +0000355 self.assertEquals(bufio.read(nbytes), data[pos:pos+nbytes])
Guido van Rossum78892e42007-04-06 17:31:18 +0000356 pos += nbytes
Guido van Rossum76c5d4d2007-04-06 19:10:29 +0000357 self.assertEquals(rawio.read_history, raw_read_sizes)
Guido van Rossum78892e42007-04-06 17:31:18 +0000358
Guido van Rossum01a27522007-03-07 01:00:12 +0000359 def testReadNonBlocking(self):
360 # Inject some None's in there to simulate EWOULDBLOCK
Guido van Rossum53807da2007-04-10 19:01:47 +0000361 rawio = MockRawIO((b"abc", b"d", None, b"efg", None, None))
Guido van Rossum76c5d4d2007-04-06 19:10:29 +0000362 bufio = io.BufferedReader(rawio)
Guido van Rossum01a27522007-03-07 01:00:12 +0000363
Guido van Rossum76c5d4d2007-04-06 19:10:29 +0000364 self.assertEquals(b"abcd", bufio.read(6))
365 self.assertEquals(b"e", bufio.read(1))
366 self.assertEquals(b"fg", bufio.read())
367 self.assert_(None is bufio.read())
368 self.assertEquals(b"", bufio.read())
Guido van Rossum01a27522007-03-07 01:00:12 +0000369
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000370 def testReadToEof(self):
Guido van Rossum53807da2007-04-10 19:01:47 +0000371 rawio = MockRawIO((b"abc", b"d", b"efg"))
Guido van Rossum76c5d4d2007-04-06 19:10:29 +0000372 bufio = io.BufferedReader(rawio)
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000373
Guido van Rossum76c5d4d2007-04-06 19:10:29 +0000374 self.assertEquals(b"abcdefg", bufio.read(9000))
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000375
376 def testReadNoArgs(self):
Guido van Rossum53807da2007-04-10 19:01:47 +0000377 rawio = MockRawIO((b"abc", b"d", b"efg"))
Guido van Rossum76c5d4d2007-04-06 19:10:29 +0000378 bufio = io.BufferedReader(rawio)
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000379
Guido van Rossum76c5d4d2007-04-06 19:10:29 +0000380 self.assertEquals(b"abcdefg", bufio.read())
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000381
382 def testFileno(self):
Guido van Rossum53807da2007-04-10 19:01:47 +0000383 rawio = MockRawIO((b"abc", b"d", b"efg"))
Guido van Rossum76c5d4d2007-04-06 19:10:29 +0000384 bufio = io.BufferedReader(rawio)
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000385
Guido van Rossum76c5d4d2007-04-06 19:10:29 +0000386 self.assertEquals(42, bufio.fileno())
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000387
388 def testFilenoNoFileno(self):
Guido van Rossum01a27522007-03-07 01:00:12 +0000389 # XXX will we always have fileno() function? If so, kill
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000390 # this test. Else, write it.
391 pass
392
Guido van Rossuma9e20242007-03-08 00:43:48 +0000393
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000394class BufferedWriterTest(unittest.TestCase):
Guido van Rossuma9e20242007-03-08 00:43:48 +0000395
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000396 def testWrite(self):
397 # Write to the buffered IO but don't overflow the buffer.
Guido van Rossum53807da2007-04-10 19:01:47 +0000398 writer = MockRawIO()
Guido van Rossum76c5d4d2007-04-06 19:10:29 +0000399 bufio = io.BufferedWriter(writer, 8)
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000400
Guido van Rossum76c5d4d2007-04-06 19:10:29 +0000401 bufio.write(b"abc")
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000402
Guido van Rossum76c5d4d2007-04-06 19:10:29 +0000403 self.assertFalse(writer._write_stack)
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000404
405 def testWriteOverflow(self):
Guido van Rossum53807da2007-04-10 19:01:47 +0000406 writer = MockRawIO()
Guido van Rossum76c5d4d2007-04-06 19:10:29 +0000407 bufio = io.BufferedWriter(writer, 8)
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000408
Guido van Rossum76c5d4d2007-04-06 19:10:29 +0000409 bufio.write(b"abc")
410 bufio.write(b"defghijkl")
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000411
Guido van Rossum76c5d4d2007-04-06 19:10:29 +0000412 self.assertEquals(b"abcdefghijkl", writer._write_stack[0])
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000413
Guido van Rossum01a27522007-03-07 01:00:12 +0000414 def testWriteNonBlocking(self):
415 raw = MockNonBlockWriterIO((9, 2, 22, -6, 10, 12, 12))
Guido van Rossum76c5d4d2007-04-06 19:10:29 +0000416 bufio = io.BufferedWriter(raw, 8, 16)
Guido van Rossum01a27522007-03-07 01:00:12 +0000417
Guido van Rossum76c5d4d2007-04-06 19:10:29 +0000418 bufio.write(b"asdf")
419 bufio.write(b"asdfa")
Guido van Rossum01a27522007-03-07 01:00:12 +0000420 self.assertEquals(b"asdfasdfa", raw._write_stack[0])
421
Guido van Rossum76c5d4d2007-04-06 19:10:29 +0000422 bufio.write(b"asdfasdfasdf")
Guido van Rossum01a27522007-03-07 01:00:12 +0000423 self.assertEquals(b"asdfasdfasdf", raw._write_stack[1])
Guido van Rossum76c5d4d2007-04-06 19:10:29 +0000424 bufio.write(b"asdfasdfasdf")
Guido van Rossum01a27522007-03-07 01:00:12 +0000425 self.assertEquals(b"dfasdfasdf", raw._write_stack[2])
426 self.assertEquals(b"asdfasdfasdf", raw._write_stack[3])
427
Guido van Rossum76c5d4d2007-04-06 19:10:29 +0000428 bufio.write(b"asdfasdfasdf")
Guido van Rossum01a27522007-03-07 01:00:12 +0000429
Guido van Rossum78892e42007-04-06 17:31:18 +0000430 # XXX I don't like this test. It relies too heavily on how the
431 # algorithm actually works, which we might change. Refactor
432 # later.
Guido van Rossum01a27522007-03-07 01:00:12 +0000433
434 def testFileno(self):
Guido van Rossum53807da2007-04-10 19:01:47 +0000435 rawio = MockRawIO((b"abc", b"d", b"efg"))
Guido van Rossum76c5d4d2007-04-06 19:10:29 +0000436 bufio = io.BufferedWriter(rawio)
Guido van Rossum01a27522007-03-07 01:00:12 +0000437
Guido van Rossum76c5d4d2007-04-06 19:10:29 +0000438 self.assertEquals(42, bufio.fileno())
Guido van Rossum01a27522007-03-07 01:00:12 +0000439
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000440 def testFlush(self):
Guido van Rossum53807da2007-04-10 19:01:47 +0000441 writer = MockRawIO()
Guido van Rossum76c5d4d2007-04-06 19:10:29 +0000442 bufio = io.BufferedWriter(writer, 8)
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000443
Guido van Rossum76c5d4d2007-04-06 19:10:29 +0000444 bufio.write(b"abc")
445 bufio.flush()
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000446
Guido van Rossum76c5d4d2007-04-06 19:10:29 +0000447 self.assertEquals(b"abc", writer._write_stack[0])
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000448
Guido van Rossuma9e20242007-03-08 00:43:48 +0000449
Guido van Rossum01a27522007-03-07 01:00:12 +0000450class BufferedRWPairTest(unittest.TestCase):
Guido van Rossuma9e20242007-03-08 00:43:48 +0000451
Guido van Rossum01a27522007-03-07 01:00:12 +0000452 def testRWPair(self):
Guido van Rossum53807da2007-04-10 19:01:47 +0000453 r = MockRawIO(())
454 w = MockRawIO()
Guido van Rossum01a27522007-03-07 01:00:12 +0000455 pair = io.BufferedRWPair(r, w)
456
457 # XXX need implementation
458
Guido van Rossuma9e20242007-03-08 00:43:48 +0000459
Guido van Rossum78892e42007-04-06 17:31:18 +0000460class BufferedRandomTest(unittest.TestCase):
Guido van Rossuma9e20242007-03-08 00:43:48 +0000461
Guido van Rossum01a27522007-03-07 01:00:12 +0000462 def testReadAndWrite(self):
Guido van Rossum53807da2007-04-10 19:01:47 +0000463 raw = MockRawIO((b"asdf", b"ghjk"))
Guido van Rossum01a27522007-03-07 01:00:12 +0000464 rw = io.BufferedRandom(raw, 8, 12)
465
466 self.assertEqual(b"as", rw.read(2))
467 rw.write(b"ddd")
468 rw.write(b"eee")
Guido van Rossum76c5d4d2007-04-06 19:10:29 +0000469 self.assertFalse(raw._write_stack) # Buffer writes
Guido van Rossum01a27522007-03-07 01:00:12 +0000470 self.assertEqual(b"ghjk", rw.read()) # This read forces write flush
Guido van Rossum76c5d4d2007-04-06 19:10:29 +0000471 self.assertEquals(b"dddeee", raw._write_stack[0])
Guido van Rossum01a27522007-03-07 01:00:12 +0000472
473 def testSeekAndTell(self):
474 raw = io.BytesIO(b"asdfghjkl")
475 rw = io.BufferedRandom(raw)
476
477 self.assertEquals(b"as", rw.read(2))
478 self.assertEquals(2, rw.tell())
479 rw.seek(0, 0)
480 self.assertEquals(b"asdf", rw.read(4))
481
482 rw.write(b"asdf")
483 rw.seek(0, 0)
484 self.assertEquals(b"asdfasdfl", rw.read())
485 self.assertEquals(9, rw.tell())
486 rw.seek(-4, 2)
487 self.assertEquals(5, rw.tell())
488 rw.seek(2, 1)
489 self.assertEquals(7, rw.tell())
490 self.assertEquals(b"fl", rw.read(11))
Christian Heimes8e42a0a2007-11-08 18:04:45 +0000491 self.assertRaises(TypeError, rw.seek, 0.0)
Guido van Rossum01a27522007-03-07 01:00:12 +0000492
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +0000493# To fully exercise seek/tell, the StatefulIncrementalDecoder has these
494# properties:
495# - A single output character can correspond to many bytes of input.
496# - The number of input bytes to complete the character can be
497# undetermined until the last input byte is received.
498# - The number of input bytes can vary depending on previous input.
499# - A single input byte can correspond to many characters of output.
500# - The number of output characters can be undetermined until the
501# last input byte is received.
502# - The number of output characters can vary depending on previous input.
503
504class StatefulIncrementalDecoder(codecs.IncrementalDecoder):
505 """
506 For testing seek/tell behavior with a stateful, buffering decoder.
507
508 Input is a sequence of words. Words may be fixed-length (length set
509 by input) or variable-length (period-terminated). In variable-length
510 mode, extra periods are ignored. Possible words are:
511 - 'i' followed by a number sets the input length, I (maximum 99).
512 When I is set to 0, words are space-terminated.
513 - 'o' followed by a number sets the output length, O (maximum 99).
514 - Any other word is converted into a word followed by a period on
515 the output. The output word consists of the input word truncated
516 or padded out with hyphens to make its length equal to O. If O
517 is 0, the word is output verbatim without truncating or padding.
518 I and O are initially set to 1. When I changes, any buffered input is
519 re-scanned according to the new I. EOF also terminates the last word.
520 """
521
522 def __init__(self, errors='strict'):
Christian Heimesab568872008-03-23 02:11:13 +0000523 codecs.IncrementalDecoder.__init__(self, errors)
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +0000524 self.reset()
525
526 def __repr__(self):
527 return '<SID %x>' % id(self)
528
529 def reset(self):
530 self.i = 1
531 self.o = 1
532 self.buffer = bytearray()
533
534 def getstate(self):
535 i, o = self.i ^ 1, self.o ^ 1 # so that flags = 0 after reset()
536 return bytes(self.buffer), i*100 + o
537
538 def setstate(self, state):
539 buffer, io = state
540 self.buffer = bytearray(buffer)
541 i, o = divmod(io, 100)
542 self.i, self.o = i ^ 1, o ^ 1
543
544 def decode(self, input, final=False):
545 output = ''
546 for b in input:
547 if self.i == 0: # variable-length, terminated with period
548 if b == ord('.'):
549 if self.buffer:
550 output += self.process_word()
551 else:
552 self.buffer.append(b)
553 else: # fixed-length, terminate after self.i bytes
554 self.buffer.append(b)
555 if len(self.buffer) == self.i:
556 output += self.process_word()
557 if final and self.buffer: # EOF terminates the last word
558 output += self.process_word()
559 return output
560
561 def process_word(self):
562 output = ''
563 if self.buffer[0] == ord('i'):
564 self.i = min(99, int(self.buffer[1:] or 0)) # set input length
565 elif self.buffer[0] == ord('o'):
566 self.o = min(99, int(self.buffer[1:] or 0)) # set output length
567 else:
568 output = self.buffer.decode('ascii')
569 if len(output) < self.o:
570 output += '-'*self.o # pad out with hyphens
571 if self.o:
572 output = output[:self.o] # truncate to output length
573 output += '.'
574 self.buffer = bytearray()
575 return output
576
Benjamin Petersonad9d48d2008-04-02 21:49:44 +0000577 codecEnabled = False
578
579 @classmethod
580 def lookupTestDecoder(cls, name):
581 if cls.codecEnabled and name == 'test_decoder':
582 return codecs.CodecInfo(
583 name='test_decoder', encode=None, decode=None,
584 incrementalencoder=None,
585 streamreader=None, streamwriter=None,
586 incrementaldecoder=cls)
587
588# Register the previous decoder for testing.
589# Disabled by default, tests will enable it.
590codecs.register(StatefulIncrementalDecoder.lookupTestDecoder)
591
592
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +0000593class StatefulIncrementalDecoderTest(unittest.TestCase):
594 """
595 Make sure the StatefulIncrementalDecoder actually works.
596 """
597
598 test_cases = [
Ka-Ping Yeed24a5b62008-03-20 10:51:27 +0000599 # I=1, O=1 (fixed-length input == fixed-length output)
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +0000600 (b'abcd', False, 'a.b.c.d.'),
Ka-Ping Yeed24a5b62008-03-20 10:51:27 +0000601 # I=0, O=0 (variable-length input, variable-length output)
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +0000602 (b'oiabcd', True, 'abcd.'),
Ka-Ping Yeed24a5b62008-03-20 10:51:27 +0000603 # I=0, O=0 (should ignore extra periods)
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +0000604 (b'oi...abcd...', True, 'abcd.'),
Ka-Ping Yeed24a5b62008-03-20 10:51:27 +0000605 # I=0, O=6 (variable-length input, fixed-length output)
606 (b'i.o6.x.xyz.toolongtofit.', False, 'x-----.xyz---.toolon.'),
607 # I=2, O=6 (fixed-length input < fixed-length output)
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +0000608 (b'i.i2.o6xyz', True, 'xy----.z-----.'),
Ka-Ping Yeed24a5b62008-03-20 10:51:27 +0000609 # I=6, O=3 (fixed-length input > fixed-length output)
610 (b'i.o3.i6.abcdefghijklmnop', True, 'abc.ghi.mno.'),
611 # I=0, then 3; O=29, then 15 (with longer output)
612 (b'i.o29.a.b.cde.o15.abcdefghijabcdefghij.i3.a.b.c.d.ei00k.l.m', True,
613 'a----------------------------.' +
614 'b----------------------------.' +
615 'cde--------------------------.' +
616 'abcdefghijabcde.' +
617 'a.b------------.' +
618 '.c.------------.' +
619 'd.e------------.' +
620 'k--------------.' +
621 'l--------------.' +
622 'm--------------.')
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +0000623 ]
624
625 def testDecoder(self):
626 # Try a few one-shot test cases.
627 for input, eof, output in self.test_cases:
628 d = StatefulIncrementalDecoder()
629 self.assertEquals(d.decode(input, eof), output)
630
631 # Also test an unfinished decode, followed by forcing EOF.
632 d = StatefulIncrementalDecoder()
633 self.assertEquals(d.decode(b'oiabcd'), '')
634 self.assertEquals(d.decode(b'', 1), 'abcd.')
Guido van Rossum78892e42007-04-06 17:31:18 +0000635
636class TextIOWrapperTest(unittest.TestCase):
Guido van Rossum9b76da62007-04-11 01:09:03 +0000637
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +0000638 def setUp(self):
639 self.testdata = b"AAA\r\nBBB\rCCC\r\nDDD\nEEE\r\n"
640 self.normalized = b"AAA\nBBB\nCCC\nDDD\nEEE\n".decode("ascii")
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000641 support.unlink(support.TESTFN)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +0000642
Guido van Rossumd0712812007-04-11 16:32:43 +0000643 def tearDown(self):
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000644 support.unlink(support.TESTFN)
Guido van Rossum9b76da62007-04-11 01:09:03 +0000645
Guido van Rossumf64db9f2007-12-06 01:04:26 +0000646 def testLineBuffering(self):
647 r = io.BytesIO()
648 b = io.BufferedWriter(r, 1000)
649 t = io.TextIOWrapper(b, newline="\n", line_buffering=True)
650 t.write("X")
651 self.assertEquals(r.getvalue(), b"") # No flush happened
652 t.write("Y\nZ")
653 self.assertEquals(r.getvalue(), b"XY\nZ") # All got flushed
654 t.write("A\rB")
655 self.assertEquals(r.getvalue(), b"XY\nZA\rB")
656
Guido van Rossume7fc50f2007-12-03 22:54:21 +0000657 def testEncodingErrorsReading(self):
658 # (1) default
659 b = io.BytesIO(b"abc\n\xff\n")
660 t = io.TextIOWrapper(b, encoding="ascii")
661 self.assertRaises(UnicodeError, t.read)
662 # (2) explicit strict
663 b = io.BytesIO(b"abc\n\xff\n")
664 t = io.TextIOWrapper(b, encoding="ascii", errors="strict")
665 self.assertRaises(UnicodeError, t.read)
666 # (3) ignore
667 b = io.BytesIO(b"abc\n\xff\n")
668 t = io.TextIOWrapper(b, encoding="ascii", errors="ignore")
669 self.assertEquals(t.read(), "abc\n\n")
670 # (4) replace
671 b = io.BytesIO(b"abc\n\xff\n")
672 t = io.TextIOWrapper(b, encoding="ascii", errors="replace")
673 self.assertEquals(t.read(), "abc\n\ufffd\n")
674
675 def testEncodingErrorsWriting(self):
676 # (1) default
677 b = io.BytesIO()
678 t = io.TextIOWrapper(b, encoding="ascii")
679 self.assertRaises(UnicodeError, t.write, "\xff")
680 # (2) explicit strict
681 b = io.BytesIO()
682 t = io.TextIOWrapper(b, encoding="ascii", errors="strict")
683 self.assertRaises(UnicodeError, t.write, "\xff")
684 # (3) ignore
685 b = io.BytesIO()
Guido van Rossumf64db9f2007-12-06 01:04:26 +0000686 t = io.TextIOWrapper(b, encoding="ascii", errors="ignore",
687 newline="\n")
Guido van Rossume7fc50f2007-12-03 22:54:21 +0000688 t.write("abc\xffdef\n")
689 t.flush()
Christian Heimesecda2612007-12-05 17:59:44 +0000690 self.assertEquals(b.getvalue(), b"abcdef\n")
Guido van Rossume7fc50f2007-12-03 22:54:21 +0000691 # (4) replace
692 b = io.BytesIO()
Guido van Rossumf64db9f2007-12-06 01:04:26 +0000693 t = io.TextIOWrapper(b, encoding="ascii", errors="replace",
694 newline="\n")
Guido van Rossume7fc50f2007-12-03 22:54:21 +0000695 t.write("abc\xffdef\n")
696 t.flush()
Christian Heimesecda2612007-12-05 17:59:44 +0000697 self.assertEquals(b.getvalue(), b"abc?def\n")
Guido van Rossume7fc50f2007-12-03 22:54:21 +0000698
Guido van Rossum8358db22007-08-18 21:39:55 +0000699 def testNewlinesInput(self):
700 testdata = b"AAA\nBBB\nCCC\rDDD\rEEE\r\nFFF\r\nGGG"
701 normalized = testdata.replace(b"\r\n", b"\n").replace(b"\r", b"\n")
702 for newline, expected in [
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +0000703 (None, normalized.decode("ascii").splitlines(True)),
704 ("", testdata.decode("ascii").splitlines(True)),
Guido van Rossum8358db22007-08-18 21:39:55 +0000705 ("\n", ["AAA\n", "BBB\n", "CCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]),
706 ("\r\n", ["AAA\nBBB\nCCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]),
707 ("\r", ["AAA\nBBB\nCCC\r", "DDD\r", "EEE\r", "\nFFF\r", "\nGGG"]),
708 ]:
709 buf = io.BytesIO(testdata)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +0000710 txt = io.TextIOWrapper(buf, encoding="ascii", newline=newline)
Guido van Rossum8358db22007-08-18 21:39:55 +0000711 self.assertEquals(txt.readlines(), expected)
712 txt.seek(0)
713 self.assertEquals(txt.read(), "".join(expected))
714
715 def testNewlinesOutput(self):
716 testdict = {
717 "": b"AAA\nBBB\nCCC\nX\rY\r\nZ",
718 "\n": b"AAA\nBBB\nCCC\nX\rY\r\nZ",
719 "\r": b"AAA\rBBB\rCCC\rX\rY\r\rZ",
720 "\r\n": b"AAA\r\nBBB\r\nCCC\r\nX\rY\r\r\nZ",
721 }
722 tests = [(None, testdict[os.linesep])] + sorted(testdict.items())
723 for newline, expected in tests:
724 buf = io.BytesIO()
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +0000725 txt = io.TextIOWrapper(buf, encoding="ascii", newline=newline)
Guido van Rossum8358db22007-08-18 21:39:55 +0000726 txt.write("AAA\nB")
727 txt.write("BB\nCCC\n")
728 txt.write("X\rY\r\nZ")
729 txt.flush()
Alexandre Vassalotti77250f42008-05-06 19:48:38 +0000730 self.assertEquals(buf.closed, False)
Guido van Rossum8358db22007-08-18 21:39:55 +0000731 self.assertEquals(buf.getvalue(), expected)
732
Guido van Rossum78892e42007-04-06 17:31:18 +0000733 def testNewlines(self):
734 input_lines = [ "unix\n", "windows\r\n", "os9\r", "last\n", "nonl" ]
735
736 tests = [
737 [ None, [ 'unix\n', 'windows\n', 'os9\n', 'last\n', 'nonl' ] ],
Guido van Rossum8358db22007-08-18 21:39:55 +0000738 [ '', input_lines ],
739 [ '\n', [ "unix\n", "windows\r\n", "os9\rlast\n", "nonl" ] ],
740 [ '\r\n', [ "unix\nwindows\r\n", "os9\rlast\nnonl" ] ],
741 [ '\r', [ "unix\nwindows\r", "\nos9\r", "last\nnonl" ] ],
Guido van Rossum78892e42007-04-06 17:31:18 +0000742 ]
743
Guido van Rossumc2f93dc2007-05-24 00:50:02 +0000744 encodings = ('utf-8', 'latin-1')
Guido van Rossum78892e42007-04-06 17:31:18 +0000745
Guido van Rossum8358db22007-08-18 21:39:55 +0000746 # Try a range of buffer sizes to test the case where \r is the last
Guido van Rossum78892e42007-04-06 17:31:18 +0000747 # character in TextIOWrapper._pending_line.
748 for encoding in encodings:
Guido van Rossum8358db22007-08-18 21:39:55 +0000749 # XXX: str.encode() should return bytes
750 data = bytes(''.join(input_lines).encode(encoding))
Guido van Rossum78892e42007-04-06 17:31:18 +0000751 for do_reads in (False, True):
Guido van Rossum8358db22007-08-18 21:39:55 +0000752 for bufsize in range(1, 10):
753 for newline, exp_lines in tests:
754 bufio = io.BufferedReader(io.BytesIO(data), bufsize)
Guido van Rossum76c5d4d2007-04-06 19:10:29 +0000755 textio = io.TextIOWrapper(bufio, newline=newline,
Guido van Rossum78892e42007-04-06 17:31:18 +0000756 encoding=encoding)
757 if do_reads:
758 got_lines = []
759 while True:
Guido van Rossum76c5d4d2007-04-06 19:10:29 +0000760 c2 = textio.read(2)
Guido van Rossum78892e42007-04-06 17:31:18 +0000761 if c2 == '':
762 break
763 self.assertEquals(len(c2), 2)
Guido van Rossum76c5d4d2007-04-06 19:10:29 +0000764 got_lines.append(c2 + textio.readline())
Guido van Rossum78892e42007-04-06 17:31:18 +0000765 else:
Guido van Rossum76c5d4d2007-04-06 19:10:29 +0000766 got_lines = list(textio)
Guido van Rossum78892e42007-04-06 17:31:18 +0000767
768 for got_line, exp_line in zip(got_lines, exp_lines):
769 self.assertEquals(got_line, exp_line)
770 self.assertEquals(len(got_lines), len(exp_lines))
771
Guido van Rossum8358db22007-08-18 21:39:55 +0000772 def testNewlinesInput(self):
773 testdata = b"AAA\nBBB\nCCC\rDDD\rEEE\r\nFFF\r\nGGG"
774 normalized = testdata.replace(b"\r\n", b"\n").replace(b"\r", b"\n")
775 for newline, expected in [
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +0000776 (None, normalized.decode("ascii").splitlines(True)),
777 ("", testdata.decode("ascii").splitlines(True)),
Guido van Rossum8358db22007-08-18 21:39:55 +0000778 ("\n", ["AAA\n", "BBB\n", "CCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]),
779 ("\r\n", ["AAA\nBBB\nCCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]),
780 ("\r", ["AAA\nBBB\nCCC\r", "DDD\r", "EEE\r", "\nFFF\r", "\nGGG"]),
781 ]:
782 buf = io.BytesIO(testdata)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +0000783 txt = io.TextIOWrapper(buf, encoding="ascii", newline=newline)
Guido van Rossum8358db22007-08-18 21:39:55 +0000784 self.assertEquals(txt.readlines(), expected)
785 txt.seek(0)
786 self.assertEquals(txt.read(), "".join(expected))
787
788 def testNewlinesOutput(self):
Guido van Rossum8358db22007-08-18 21:39:55 +0000789 data = "AAA\nBBB\rCCC\n"
790 data_lf = b"AAA\nBBB\rCCC\n"
791 data_cr = b"AAA\rBBB\rCCC\r"
792 data_crlf = b"AAA\r\nBBB\rCCC\r\n"
Guido van Rossum18363582007-08-18 22:05:58 +0000793 save_linesep = os.linesep
794 try:
795 for os.linesep, newline, expected in [
796 ("\n", None, data_lf),
797 ("\r\n", None, data_crlf),
798 ("\n", "", data_lf),
799 ("\r\n", "", data_lf),
800 ("\n", "\n", data_lf),
801 ("\r\n", "\n", data_lf),
802 ("\n", "\r", data_cr),
803 ("\r\n", "\r", data_cr),
804 ("\n", "\r\n", data_crlf),
805 ("\r\n", "\r\n", data_crlf),
806 ]:
807 buf = io.BytesIO()
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +0000808 txt = io.TextIOWrapper(buf, encoding="ascii", newline=newline)
Guido van Rossum18363582007-08-18 22:05:58 +0000809 txt.write(data)
810 txt.close()
Alexandre Vassalotti77250f42008-05-06 19:48:38 +0000811 self.assertEquals(buf.closed, True)
812 self.assertRaises(ValueError, buf.getvalue)
Guido van Rossum18363582007-08-18 22:05:58 +0000813 finally:
814 os.linesep = save_linesep
Guido van Rossum8358db22007-08-18 21:39:55 +0000815
Guido van Rossum9b76da62007-04-11 01:09:03 +0000816 # Systematic tests of the text I/O API
817
818 def testBasicIO(self):
819 for chunksize in (1, 2, 3, 4, 5, 15, 16, 17, 31, 32, 33, 63, 64, 65):
820 for enc in "ascii", "latin1", "utf8" :# , "utf-16-be", "utf-16-le":
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000821 f = io.open(support.TESTFN, "w+", encoding=enc)
Guido van Rossum9b76da62007-04-11 01:09:03 +0000822 f._CHUNK_SIZE = chunksize
823 self.assertEquals(f.write("abc"), 3)
824 f.close()
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000825 f = io.open(support.TESTFN, "r+", encoding=enc)
Guido van Rossum9b76da62007-04-11 01:09:03 +0000826 f._CHUNK_SIZE = chunksize
827 self.assertEquals(f.tell(), 0)
828 self.assertEquals(f.read(), "abc")
829 cookie = f.tell()
830 self.assertEquals(f.seek(0), 0)
831 self.assertEquals(f.read(2), "ab")
832 self.assertEquals(f.read(1), "c")
833 self.assertEquals(f.read(1), "")
834 self.assertEquals(f.read(), "")
835 self.assertEquals(f.tell(), cookie)
836 self.assertEquals(f.seek(0), 0)
837 self.assertEquals(f.seek(0, 2), cookie)
838 self.assertEquals(f.write("def"), 3)
839 self.assertEquals(f.seek(cookie), cookie)
840 self.assertEquals(f.read(), "def")
841 if enc.startswith("utf"):
842 self.multi_line_test(f, enc)
843 f.close()
844
845 def multi_line_test(self, f, enc):
846 f.seek(0)
847 f.truncate()
Guido van Rossumef87d6e2007-05-02 19:09:54 +0000848 sample = "s\xff\u0fff\uffff"
Guido van Rossum9b76da62007-04-11 01:09:03 +0000849 wlines = []
Guido van Rossumcba608c2007-04-11 14:19:59 +0000850 for size in (0, 1, 2, 3, 4, 5, 30, 31, 32, 33, 62, 63, 64, 65, 1000):
Guido van Rossum9b76da62007-04-11 01:09:03 +0000851 chars = []
Guido van Rossum805365e2007-05-07 22:24:25 +0000852 for i in range(size):
Guido van Rossum9b76da62007-04-11 01:09:03 +0000853 chars.append(sample[i % len(sample)])
Guido van Rossumef87d6e2007-05-02 19:09:54 +0000854 line = "".join(chars) + "\n"
Guido van Rossum9b76da62007-04-11 01:09:03 +0000855 wlines.append((f.tell(), line))
856 f.write(line)
Guido van Rossum9b76da62007-04-11 01:09:03 +0000857 f.seek(0)
858 rlines = []
859 while True:
860 pos = f.tell()
861 line = f.readline()
862 if not line:
Guido van Rossum9b76da62007-04-11 01:09:03 +0000863 break
864 rlines.append((pos, line))
Guido van Rossum9b76da62007-04-11 01:09:03 +0000865 self.assertEquals(rlines, wlines)
866
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +0000867 def testTelling(self):
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000868 f = io.open(support.TESTFN, "w+", encoding="utf8")
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +0000869 p0 = f.tell()
Guido van Rossumef87d6e2007-05-02 19:09:54 +0000870 f.write("\xff\n")
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +0000871 p1 = f.tell()
Guido van Rossumef87d6e2007-05-02 19:09:54 +0000872 f.write("\xff\n")
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +0000873 p2 = f.tell()
874 f.seek(0)
875 self.assertEquals(f.tell(), p0)
Guido van Rossumef87d6e2007-05-02 19:09:54 +0000876 self.assertEquals(f.readline(), "\xff\n")
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +0000877 self.assertEquals(f.tell(), p1)
Guido van Rossumef87d6e2007-05-02 19:09:54 +0000878 self.assertEquals(f.readline(), "\xff\n")
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +0000879 self.assertEquals(f.tell(), p2)
880 f.seek(0)
881 for line in f:
Guido van Rossumef87d6e2007-05-02 19:09:54 +0000882 self.assertEquals(line, "\xff\n")
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +0000883 self.assertRaises(IOError, f.tell)
884 self.assertEquals(f.tell(), p2)
885 f.close()
886
Guido van Rossumd76e7792007-04-17 02:38:04 +0000887 def testSeeking(self):
888 chunk_size = io.TextIOWrapper._CHUNK_SIZE
889 prefix_size = chunk_size - 2
Guido van Rossumef87d6e2007-05-02 19:09:54 +0000890 u_prefix = "a" * prefix_size
Guido van Rossumd76e7792007-04-17 02:38:04 +0000891 prefix = bytes(u_prefix.encode("utf-8"))
892 self.assertEquals(len(u_prefix), len(prefix))
Guido van Rossumef87d6e2007-05-02 19:09:54 +0000893 u_suffix = "\u8888\n"
Guido van Rossumd76e7792007-04-17 02:38:04 +0000894 suffix = bytes(u_suffix.encode("utf-8"))
895 line = prefix + suffix
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000896 f = io.open(support.TESTFN, "wb")
Guido van Rossumd76e7792007-04-17 02:38:04 +0000897 f.write(line*2)
898 f.close()
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000899 f = io.open(support.TESTFN, "r", encoding="utf-8")
Guido van Rossumd76e7792007-04-17 02:38:04 +0000900 s = f.read(prefix_size)
Guido van Rossum98297ee2007-11-06 21:34:58 +0000901 self.assertEquals(s, str(prefix, "ascii"))
Guido van Rossumd76e7792007-04-17 02:38:04 +0000902 self.assertEquals(f.tell(), prefix_size)
903 self.assertEquals(f.readline(), u_suffix)
904
905 def testSeekingToo(self):
906 # Regression test for a specific bug
907 data = b'\xe0\xbf\xbf\n'
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000908 f = io.open(support.TESTFN, "wb")
Guido van Rossumd76e7792007-04-17 02:38:04 +0000909 f.write(data)
910 f.close()
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000911 f = io.open(support.TESTFN, "r", encoding="utf-8")
Guido van Rossumd76e7792007-04-17 02:38:04 +0000912 f._CHUNK_SIZE # Just test that it exists
913 f._CHUNK_SIZE = 2
914 f.readline()
915 f.tell()
916
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +0000917 def testSeekAndTell(self):
918 """Test seek/tell using the StatefulIncrementalDecoder."""
919
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +0000920 def testSeekAndTellWithData(data, min_pos=0):
921 """Tell/seek to various points within a data stream and ensure
922 that the decoded data returned by read() is consistent."""
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000923 f = io.open(support.TESTFN, 'wb')
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +0000924 f.write(data)
925 f.close()
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000926 f = io.open(support.TESTFN, encoding='test_decoder')
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +0000927 decoded = f.read()
928 f.close()
929
Neal Norwitze2b07052008-03-18 19:52:05 +0000930 for i in range(min_pos, len(decoded) + 1): # seek positions
931 for j in [1, 5, len(decoded) - i]: # read lengths
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000932 f = io.open(support.TESTFN, encoding='test_decoder')
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +0000933 self.assertEquals(f.read(i), decoded[:i])
934 cookie = f.tell()
935 self.assertEquals(f.read(j), decoded[i:i + j])
936 f.seek(cookie)
937 self.assertEquals(f.read(), decoded[i:])
938 f.close()
939
Benjamin Petersonad9d48d2008-04-02 21:49:44 +0000940 # Enable the test decoder.
941 StatefulIncrementalDecoder.codecEnabled = 1
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +0000942
943 # Run the tests.
944 try:
945 # Try each test case.
946 for input, _, _ in StatefulIncrementalDecoderTest.test_cases:
947 testSeekAndTellWithData(input)
948
949 # Position each test case so that it crosses a chunk boundary.
950 CHUNK_SIZE = io.TextIOWrapper._CHUNK_SIZE
951 for input, _, _ in StatefulIncrementalDecoderTest.test_cases:
952 offset = CHUNK_SIZE - len(input)//2
953 prefix = b'.'*offset
954 # Don't bother seeking into the prefix (takes too long).
955 min_pos = offset*2
956 testSeekAndTellWithData(prefix + input, min_pos)
957
958 # Ensure our test decoder won't interfere with subsequent tests.
959 finally:
Benjamin Petersonad9d48d2008-04-02 21:49:44 +0000960 StatefulIncrementalDecoder.codecEnabled = 0
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +0000961
Alexandre Vassalottia38f73b2008-01-07 18:30:48 +0000962 def testEncodedWrites(self):
963 data = "1234567890"
964 tests = ("utf-16",
965 "utf-16-le",
966 "utf-16-be",
967 "utf-32",
968 "utf-32-le",
969 "utf-32-be")
970 for encoding in tests:
971 buf = io.BytesIO()
972 f = io.TextIOWrapper(buf, encoding=encoding)
973 # Check if the BOM is written only once (see issue1753).
974 f.write(data)
975 f.write(data)
976 f.seek(0)
977 self.assertEquals(f.read(), data * 2)
978 self.assertEquals(buf.getvalue(), (data * 2).encode(encoding))
979
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +0000980 def timingTest(self):
981 timer = time.time
982 enc = "utf8"
Guido van Rossumef87d6e2007-05-02 19:09:54 +0000983 line = "\0\x0f\xff\u0fff\uffff\U000fffff\U0010ffff"*3 + "\n"
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +0000984 nlines = 10000
985 nchars = len(line)
986 nbytes = len(line.encode(enc))
987 for chunk_size in (32, 64, 128, 256):
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000988 f = io.open(support.TESTFN, "w+", encoding=enc)
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +0000989 f._CHUNK_SIZE = chunk_size
990 t0 = timer()
991 for i in range(nlines):
992 f.write(line)
993 f.flush()
994 t1 = timer()
995 f.seek(0)
996 for line in f:
997 pass
998 t2 = timer()
999 f.seek(0)
1000 while f.readline():
1001 pass
1002 t3 = timer()
1003 f.seek(0)
1004 while f.readline():
1005 f.tell()
1006 t4 = timer()
1007 f.close()
Benjamin Petersonee8712c2008-05-20 21:35:26 +00001008 if support.verbose:
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +00001009 print("\nTiming test: %d lines of %d characters (%d bytes)" %
1010 (nlines, nchars, nbytes))
1011 print("File chunk size: %6s" % f._CHUNK_SIZE)
1012 print("Writing: %6.3f seconds" % (t1-t0))
1013 print("Reading using iteration: %6.3f seconds" % (t2-t1))
1014 print("Reading using readline(): %6.3f seconds" % (t3-t2))
1015 print("Using readline()+tell(): %6.3f seconds" % (t4-t3))
1016
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00001017 def testReadOneByOne(self):
1018 txt = io.TextIOWrapper(io.BytesIO(b"AA\r\nBB"))
1019 reads = ""
1020 while True:
1021 c = txt.read(1)
1022 if not c:
1023 break
1024 reads += c
1025 self.assertEquals(reads, "AA\nBB")
1026
1027 # read in amounts equal to TextIOWrapper._CHUNK_SIZE which is 128.
1028 def testReadByChunk(self):
1029 # make sure "\r\n" straddles 128 char boundary.
1030 txt = io.TextIOWrapper(io.BytesIO(b"A" * 127 + b"\r\nB"))
1031 reads = ""
1032 while True:
1033 c = txt.read(128)
1034 if not c:
1035 break
1036 reads += c
1037 self.assertEquals(reads, "A"*127+"\nB")
1038
1039 def test_issue1395_1(self):
1040 txt = io.TextIOWrapper(io.BytesIO(self.testdata), encoding="ascii")
1041
1042 # read one char at a time
1043 reads = ""
1044 while True:
1045 c = txt.read(1)
1046 if not c:
1047 break
1048 reads += c
1049 self.assertEquals(reads, self.normalized)
1050
1051 def test_issue1395_2(self):
1052 txt = io.TextIOWrapper(io.BytesIO(self.testdata), encoding="ascii")
1053 txt._CHUNK_SIZE = 4
1054
1055 reads = ""
1056 while True:
1057 c = txt.read(4)
1058 if not c:
1059 break
1060 reads += c
1061 self.assertEquals(reads, self.normalized)
1062
1063 def test_issue1395_3(self):
1064 txt = io.TextIOWrapper(io.BytesIO(self.testdata), encoding="ascii")
1065 txt._CHUNK_SIZE = 4
1066
1067 reads = txt.read(4)
1068 reads += txt.read(4)
1069 reads += txt.readline()
1070 reads += txt.readline()
1071 reads += txt.readline()
1072 self.assertEquals(reads, self.normalized)
1073
1074 def test_issue1395_4(self):
1075 txt = io.TextIOWrapper(io.BytesIO(self.testdata), encoding="ascii")
1076 txt._CHUNK_SIZE = 4
1077
1078 reads = txt.read(4)
1079 reads += txt.read()
1080 self.assertEquals(reads, self.normalized)
1081
1082 def test_issue1395_5(self):
1083 txt = io.TextIOWrapper(io.BytesIO(self.testdata), encoding="ascii")
1084 txt._CHUNK_SIZE = 4
1085
1086 reads = txt.read(4)
1087 pos = txt.tell()
1088 txt.seek(0)
1089 txt.seek(pos)
1090 self.assertEquals(txt.read(4), "BBB\n")
1091
Ka-Ping Yeeddaa7062008-03-17 20:35:15 +00001092 def test_issue2282(self):
1093 buffer = io.BytesIO(self.testdata)
1094 txt = io.TextIOWrapper(buffer, encoding="ascii")
1095
1096 self.assertEqual(buffer.seekable(), txt.seekable())
1097
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00001098 def test_newline_decoder(self):
1099 import codecs
1100 decoder = codecs.getincrementaldecoder("utf-8")()
1101 decoder = io.IncrementalNewlineDecoder(decoder, translate=True)
1102
1103 self.assertEquals(decoder.decode(b'\xe8\xa2\x88'), "\u8888")
1104
1105 self.assertEquals(decoder.decode(b'\xe8'), "")
1106 self.assertEquals(decoder.decode(b'\xa2'), "")
1107 self.assertEquals(decoder.decode(b'\x88'), "\u8888")
1108
1109 self.assertEquals(decoder.decode(b'\xe8'), "")
1110 self.assertRaises(UnicodeDecodeError, decoder.decode, b'', final=True)
1111
1112 decoder.setstate((b'', 0))
1113 self.assertEquals(decoder.decode(b'\n'), "\n")
1114 self.assertEquals(decoder.decode(b'\r'), "")
1115 self.assertEquals(decoder.decode(b'', final=True), "\n")
1116 self.assertEquals(decoder.decode(b'\r', final=True), "\n")
1117
1118 self.assertEquals(decoder.decode(b'\r'), "")
1119 self.assertEquals(decoder.decode(b'a'), "\na")
1120
1121 self.assertEquals(decoder.decode(b'\r\r\n'), "\n\n")
1122 self.assertEquals(decoder.decode(b'\r'), "")
1123 self.assertEquals(decoder.decode(b'\r'), "\n")
1124 self.assertEquals(decoder.decode(b'\na'), "\na")
1125
1126 self.assertEquals(decoder.decode(b'\xe8\xa2\x88\r\n'), "\u8888\n")
1127 self.assertEquals(decoder.decode(b'\xe8\xa2\x88'), "\u8888")
1128 self.assertEquals(decoder.decode(b'\n'), "\n")
1129 self.assertEquals(decoder.decode(b'\xe8\xa2\x88\r'), "\u8888")
1130 self.assertEquals(decoder.decode(b'\n'), "\n")
Guido van Rossum9b76da62007-04-11 01:09:03 +00001131
Alexandre Vassalotti472f07d2008-01-06 00:34:32 +00001132 decoder = codecs.getincrementaldecoder("utf-8")()
1133 decoder = io.IncrementalNewlineDecoder(decoder, translate=True)
1134 self.assertEquals(decoder.newlines, None)
1135 decoder.decode(b"abc\n\r")
1136 self.assertEquals(decoder.newlines, '\n')
1137 decoder.decode(b"\nabc")
1138 self.assertEquals(decoder.newlines, ('\n', '\r\n'))
1139 decoder.decode(b"abc\r")
1140 self.assertEquals(decoder.newlines, ('\n', '\r\n'))
1141 decoder.decode(b"abc")
1142 self.assertEquals(decoder.newlines, ('\r', '\n', '\r\n'))
1143 decoder.decode(b"abc\r")
1144 decoder.reset()
1145 self.assertEquals(decoder.decode(b"abc"), "abc")
1146 self.assertEquals(decoder.newlines, None)
1147
Guido van Rossum01a27522007-03-07 01:00:12 +00001148# XXX Tests for open()
Guido van Rossum68bbcd22007-02-27 17:19:33 +00001149
Guido van Rossum5abbf752007-08-27 17:39:33 +00001150class MiscIOTest(unittest.TestCase):
1151
1152 def testImport__all__(self):
1153 for name in io.__all__:
1154 obj = getattr(io, name, None)
1155 self.assert_(obj is not None, name)
1156 if name == "open":
1157 continue
1158 elif "error" in name.lower():
1159 self.assert_(issubclass(obj, Exception), name)
1160 else:
1161 self.assert_(issubclass(obj, io.IOBase))
1162
1163
Guido van Rossum28524c72007-02-27 05:47:44 +00001164def test_main():
Benjamin Petersonee8712c2008-05-20 21:35:26 +00001165 support.run_unittest(IOTest, BytesIOTest, StringIOTest,
Christian Heimes81ee3ef2008-05-04 22:42:01 +00001166 BufferedReaderTest, BufferedWriterTest,
1167 BufferedRWPairTest, BufferedRandomTest,
1168 StatefulIncrementalDecoderTest,
1169 TextIOWrapperTest, MiscIOTest)
Guido van Rossum28524c72007-02-27 05:47:44 +00001170
1171if __name__ == "__main__":
Guido van Rossum76c5d4d2007-04-06 19:10:29 +00001172 unittest.main()