blob: 6265258493d17797d6d66bf3a4e9b4fe3385f6b1 [file] [log] [blame]
Guido van Rossum68bbcd22007-02-27 17:19:33 +00001"""Unit tests for io.py."""
2
Guido van Rossum8358db22007-08-18 21:39:55 +00003import os
Guido van Rossum34d69e52007-04-10 20:08:41 +00004import sys
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +00005import time
Guido van Rossumd4103952007-04-12 05:44:49 +00006import array
Antoine Pitrou87695762008-08-14 22:44:29 +00007import threading
8import random
Guido van Rossum28524c72007-02-27 05:47:44 +00009import unittest
Antoine Pitrou87695762008-08-14 22:44:29 +000010from itertools import chain, cycle
Benjamin Petersonee8712c2008-05-20 21:35:26 +000011from test import support
Guido van Rossum76c5d4d2007-04-06 19:10:29 +000012
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +000013import codecs
Guido van Rossum53807da2007-04-10 19:01:47 +000014import io # The module under test
Guido van Rossum28524c72007-02-27 05:47:44 +000015
Guido van Rossuma9e20242007-03-08 00:43:48 +000016
class MockRawIO(io.RawIOBase):
    """Scripted raw stream for driving the buffered wrappers under test.

    Each read() hands back the next pre-loaded chunk (which may be ``None``
    to simulate a would-block condition); every write() is recorded in
    ``_write_stack`` so a test can inspect what reached the raw layer.
    """

    def __init__(self, read_stack=()):
        self._read_stack = list(read_stack)
        self._write_stack = []

    def read(self, n=None):
        """Return the next scripted chunk, ignoring *n*; b"" once exhausted."""
        try:
            return self._read_stack.pop(0)
        except IndexError:
            # Only an empty script means EOF; any other error should surface.
            return b""

    def write(self, b):
        """Record a slice of *b* and report all of it as written."""
        # Slice so that a mutable buffer handed in by the caller is
        # snapshotted rather than aliased.
        self._write_stack.append(b[:])
        return len(b)

    def writable(self):
        return True

    def fileno(self):
        return 42

    def readable(self):
        return True

    def seekable(self):
        return True

    def seek(self, pos, whence):
        # Positioning is not simulated.
        pass

    def tell(self):
        return 42
50
Guido van Rossuma9e20242007-03-08 00:43:48 +000051
class MockFileIO(io.BytesIO):
    """BytesIO that logs the length of every read() result.

    ``read_history`` receives one entry per read() call: the number of
    bytes returned, or ``None`` if the underlying read returned ``None``.
    """

    def __init__(self, data):
        # Set up the log before the base initializer runs.
        self.read_history = []
        io.BytesIO.__init__(self, data)

    def read(self, n=None):
        res = io.BytesIO.read(self, n)
        self.read_history.append(None if res is None else len(res))
        return res
61 return res
62
63
class MockNonBlockWriterIO(io.RawIOBase):
    """Raw writer whose write() results are dictated by a script.

    *blocking_script* is a sequence of integers consumed one per write():
    a non-negative entry is returned as the number of bytes "written";
    a negative entry -k raises BlockingIOError reporting k bytes written.
    """

    def __init__(self, blocking_script):
        self._blocking_script = list(blocking_script)
        self._write_stack = []

    def write(self, b):
        # Record the attempt first so it is logged even when we "block".
        self._write_stack.append(b[:])
        n = self._blocking_script.pop(0)
        if n < 0:
            raise io.BlockingIOError(0, "test blocking", -n)
        return n

    def writable(self):
        return True
Guido van Rossum68bbcd22007-02-27 17:19:33 +000080
Guido van Rossuma9e20242007-03-08 00:43:48 +000081
class IOTest(unittest.TestCase):
    """Checks of io.open(), FileIO and BytesIO using the scratch file TESTFN."""

    def setUp(self):
        support.unlink(support.TESTFN)

    def tearDown(self):
        support.unlink(support.TESTFN)

    def write_ops(self, f):
        # Fixed write/seek/truncate script; the stream ends up holding
        # b"hello world\n" (see test_raw_bytes_io).
        self.assertEqual(f.write(b"blah."), 5)
        self.assertEqual(f.seek(0), 0)
        self.assertEqual(f.write(b"Hello."), 6)
        self.assertEqual(f.tell(), 6)
        self.assertEqual(f.seek(-1, 1), 5)
        self.assertEqual(f.tell(), 5)
        self.assertEqual(f.write(bytearray(b" world\n\n\n")), 9)
        self.assertEqual(f.seek(0), 0)
        self.assertEqual(f.write(b"h"), 1)
        self.assertEqual(f.seek(-1, 2), 13)
        self.assertEqual(f.tell(), 13)
        self.assertEqual(f.truncate(12), 12)
        self.assertEqual(f.tell(), 12)
        self.assertRaises(TypeError, f.seek, 0.0)

    def read_ops(self, f, buffered=False):
        # Counterpart of write_ops(): expects f to hold b"hello world\n".
        # With buffered=True, argument-less read() is exercised as well.
        data = f.read(5)
        self.assertEqual(data, b"hello")
        data = bytearray(data)
        self.assertEqual(f.readinto(data), 5)
        self.assertEqual(data, b" worl")
        self.assertEqual(f.readinto(data), 2)
        self.assertEqual(len(data), 5)
        self.assertEqual(data[:2], b"d\n")
        self.assertEqual(f.seek(0), 0)
        self.assertEqual(f.read(20), b"hello world\n")
        self.assertEqual(f.read(1), b"")
        self.assertEqual(f.readinto(bytearray(b"x")), 0)
        self.assertEqual(f.seek(-6, 2), 6)
        self.assertEqual(f.read(5), b"world")
        self.assertEqual(f.read(0), b"")
        self.assertEqual(f.readinto(bytearray()), 0)
        self.assertEqual(f.seek(-6, 1), 5)
        self.assertEqual(f.read(5), b" worl")
        self.assertEqual(f.tell(), 10)
        self.assertRaises(TypeError, f.seek, 0.0)
        if buffered:
            f.seek(0)
            self.assertEqual(f.read(), b"hello world\n")
            f.seek(6)
            self.assertEqual(f.read(), b"world\n")
            self.assertEqual(f.read(), b"")

    LARGE = 2**31

    def large_file_ops(self, f):
        # Use real assertions: a bare ``assert`` is stripped under -O.
        self.assertTrue(f.readable())
        self.assertTrue(f.writable())
        self.assertEqual(f.seek(self.LARGE), self.LARGE)
        self.assertEqual(f.tell(), self.LARGE)
        self.assertEqual(f.write(b"xxx"), 3)
        self.assertEqual(f.tell(), self.LARGE + 3)
        self.assertEqual(f.seek(-1, 1), self.LARGE + 2)
        self.assertEqual(f.truncate(), self.LARGE + 2)
        self.assertEqual(f.tell(), self.LARGE + 2)
        self.assertEqual(f.seek(0, 2), self.LARGE + 2)
        self.assertEqual(f.truncate(self.LARGE + 1), self.LARGE + 1)
        self.assertEqual(f.tell(), self.LARGE + 1)
        self.assertEqual(f.seek(0, 2), self.LARGE + 1)
        self.assertEqual(f.seek(-1, 2), self.LARGE)
        self.assertEqual(f.read(2), b"x")

    def test_raw_file_io(self):
        # ``with`` guarantees the handle is closed even if an assertion
        # fails, so tearDown() can always remove the file.
        with io.open(support.TESTFN, "wb", buffering=0) as f:
            self.assertEqual(f.readable(), False)
            self.assertEqual(f.writable(), True)
            self.assertEqual(f.seekable(), True)
            self.write_ops(f)
        with io.open(support.TESTFN, "rb", buffering=0) as f:
            self.assertEqual(f.readable(), True)
            self.assertEqual(f.writable(), False)
            self.assertEqual(f.seekable(), True)
            self.read_ops(f)

    def test_buffered_file_io(self):
        with io.open(support.TESTFN, "wb") as f:
            self.assertEqual(f.readable(), False)
            self.assertEqual(f.writable(), True)
            self.assertEqual(f.seekable(), True)
            self.write_ops(f)
        with io.open(support.TESTFN, "rb") as f:
            self.assertEqual(f.readable(), True)
            self.assertEqual(f.writable(), False)
            self.assertEqual(f.seekable(), True)
            self.read_ops(f, True)

    def test_readline(self):
        with io.open(support.TESTFN, "wb") as f:
            f.write(b"abc\ndef\nxyzzy\nfoo")
        with io.open(support.TESTFN, "rb") as f:
            self.assertEqual(f.readline(), b"abc\n")
            self.assertEqual(f.readline(10), b"def\n")
            self.assertEqual(f.readline(2), b"xy")
            self.assertEqual(f.readline(4), b"zzy\n")
            self.assertEqual(f.readline(), b"foo")

    def test_raw_bytes_io(self):
        f = io.BytesIO()
        self.write_ops(f)
        data = f.getvalue()
        self.assertEqual(data, b"hello world\n")
        f = io.BytesIO(data)
        self.read_ops(f, True)

    def test_large_file_ops(self):
        # On Windows and Mac OSX this test consumes large resources; it takes
        # a long time to build the >2GB file and takes >2GB of disk space,
        # therefore the resource must be enabled to run this test.
        if sys.platform[:3] == 'win' or sys.platform == 'darwin':
            if not support.is_resource_enabled("largefile"):
                print("\nTesting large file ops skipped on %s." % sys.platform,
                      file=sys.stderr)
                print("It requires %d bytes and a long time." % self.LARGE,
                      file=sys.stderr)
                print("Use 'regrtest.py -u largefile test_io' to run it.",
                      file=sys.stderr)
                return
        with io.open(support.TESTFN, "w+b", 0) as f:
            self.large_file_ops(f)
        with io.open(support.TESTFN, "w+b") as f:
            self.large_file_ops(f)

    def test_with_open(self):
        for bufsize in (0, 1, 100):
            f = None
            with open(support.TESTFN, "wb", bufsize) as f:
                f.write(b"xxx")
            self.assertEqual(f.closed, True)
            f = None
            try:
                with open(support.TESTFN, "wb", bufsize) as f:
                    1/0
            except ZeroDivisionError:
                self.assertEqual(f.closed, True)
            else:
                self.fail("1/0 didn't raise an exception")

    # issue 5008
    def test_append_mode_tell(self):
        with io.open(support.TESTFN, "wb") as f:
            f.write(b"xxx")
        with io.open(support.TESTFN, "ab", buffering=0) as f:
            self.assertEqual(f.tell(), 3)
        with io.open(support.TESTFN, "ab") as f:
            self.assertEqual(f.tell(), 3)
        with io.open(support.TESTFN, "a") as f:
            self.assertTrue(f.tell() > 0)

    def test_destructor(self):
        # Dropping the last reference must run __del__, then close(),
        # then flush(), in that order.
        record = []
        class MyFileIO(io.FileIO):
            def __del__(self):
                record.append(1)
                io.FileIO.__del__(self)
            def close(self):
                record.append(2)
                io.FileIO.close(self)
            def flush(self):
                record.append(3)
                io.FileIO.flush(self)
        f = MyFileIO(support.TESTFN, "w")
        # FileIO is a binary stream: it must be given bytes, not str.
        f.write(b"xxx")
        del f
        self.assertEqual(record, [1, 2, 3])

    def test_close_flushes(self):
        with io.open(support.TESTFN, "wb") as f:
            f.write(b"xxx")
        with io.open(support.TESTFN, "rb") as f:
            self.assertEqual(f.read(), b"xxx")

    def test_array_writes(self):
        a = array.array('i', range(10))
        # Byte size of the array, computed without the removed tostring().
        n = len(a) * a.itemsize
        with io.open(support.TESTFN, "wb", 0) as f:
            self.assertEqual(f.write(a), n)
        with io.open(support.TESTFN, "wb") as f:
            self.assertEqual(f.write(a), n)

    def test_closefd(self):
        self.assertRaises(ValueError, io.open, support.TESTFN, 'w',
                          closefd=False)

    def testReadClosed(self):
        with io.open(support.TESTFN, "w") as f:
            f.write("egg\n")
        with io.open(support.TESTFN, "r") as f:
            file = io.open(f.fileno(), "r", closefd=False)
            self.assertEqual(file.read(), "egg\n")
            file.seek(0)
            file.close()
            self.assertRaises(ValueError, file.read)

    def test_no_closefd_with_filename(self):
        # can't use closefd in combination with a file name
        self.assertRaises(ValueError, io.open, support.TESTFN, "r", closefd=False)

    def test_closefd_attr(self):
        with io.open(support.TESTFN, "wb") as f:
            f.write(b"egg\n")
        with io.open(support.TESTFN, "r") as f:
            self.assertEqual(f.buffer.raw.closefd, True)
            file = io.open(f.fileno(), "r", closefd=False)
            self.assertEqual(file.buffer.raw.closefd, False)
307
308
class MemorySeekTestMixin:
    """Read/seek/tell checks shared by the in-memory stream tests.

    A concrete subclass supplies ``buftype`` (turns a str into the stream's
    native buffer type), ``ioclass`` (the stream class under test) and
    ``EOF`` (the value read() returns at end of stream).
    """

    def testInit(self):
        # Construction alone must not raise.
        buf = self.buftype("1234567890")
        self.ioclass(buf)

    def testRead(self):
        buf = self.buftype("1234567890")
        bytesIo = self.ioclass(buf)

        self.assertEqual(buf[:1], bytesIo.read(1))
        self.assertEqual(buf[1:5], bytesIo.read(4))
        self.assertEqual(buf[5:], bytesIo.read(900))
        self.assertEqual(self.EOF, bytesIo.read())

    def testReadNoArgs(self):
        buf = self.buftype("1234567890")
        bytesIo = self.ioclass(buf)

        self.assertEqual(buf, bytesIo.read())
        self.assertEqual(self.EOF, bytesIo.read())

    def testSeek(self):
        buf = self.buftype("1234567890")
        bytesIo = self.ioclass(buf)

        bytesIo.read(5)
        bytesIo.seek(0)
        self.assertEqual(buf, bytesIo.read())

        bytesIo.seek(3)
        self.assertEqual(buf[3:], bytesIo.read())
        self.assertRaises(TypeError, bytesIo.seek, 0.0)

    def testTell(self):
        buf = self.buftype("1234567890")
        bytesIo = self.ioclass(buf)

        self.assertEqual(0, bytesIo.tell())
        bytesIo.seek(5)
        self.assertEqual(5, bytesIo.tell())
        # Seeking past the end is allowed and is reflected by tell().
        bytesIo.seek(10000)
        self.assertEqual(10000, bytesIo.tell())
352
Guido van Rossuma9e20242007-03-08 00:43:48 +0000353
class BytesIOTest(MemorySeekTestMixin, unittest.TestCase):
    """Runs the shared memory-stream checks against io.BytesIO."""

    @staticmethod
    def buftype(s):
        # The mixin supplies str literals; BytesIO needs bytes.
        return s.encode("utf-8")

    ioclass = io.BytesIO
    EOF = b""
360
361
class StringIOTest(MemorySeekTestMixin, unittest.TestCase):
    """Runs the shared memory-stream checks against io.StringIO."""

    buftype = str
    ioclass = io.StringIO
    EOF = ""
366
367
class BufferedReaderTest(unittest.TestCase):
    """Tests for io.BufferedReader on top of mock and real raw streams."""

    def testRead(self):
        rawio = MockRawIO((b"abc", b"d", b"efg"))
        bufio = io.BufferedReader(rawio)

        self.assertEqual(b"abcdef", bufio.read(6))

    def testBuffering(self):
        data = b"abcdefghi"
        dlen = len(data)

        # Each entry: [buffer size, sizes requested from the buffered
        # reader, sizes expected to be returned by the raw reads].
        tests = [
            [100, [3, 1, 4, 8], [dlen, 0]],
            [100, [3, 3, 3], [dlen]],
            [4, [1, 2, 4, 2], [4, 4, 1]],
        ]

        for bufsize, buf_read_sizes, raw_read_sizes in tests:
            rawio = MockFileIO(data)
            bufio = io.BufferedReader(rawio, buffer_size=bufsize)
            pos = 0
            for nbytes in buf_read_sizes:
                self.assertEqual(bufio.read(nbytes), data[pos:pos+nbytes])
                pos += nbytes
            self.assertEqual(rawio.read_history, raw_read_sizes)

    def testReadNonBlocking(self):
        # Inject some None's in there to simulate EWOULDBLOCK
        rawio = MockRawIO((b"abc", b"d", None, b"efg", None, None))
        bufio = io.BufferedReader(rawio)

        self.assertEqual(b"abcd", bufio.read(6))
        self.assertEqual(b"e", bufio.read(1))
        self.assertEqual(b"fg", bufio.read())
        self.assertTrue(bufio.read() is None)
        self.assertEqual(b"", bufio.read())

    def testReadToEof(self):
        rawio = MockRawIO((b"abc", b"d", b"efg"))
        bufio = io.BufferedReader(rawio)

        self.assertEqual(b"abcdefg", bufio.read(9000))

    def testReadNoArgs(self):
        rawio = MockRawIO((b"abc", b"d", b"efg"))
        bufio = io.BufferedReader(rawio)

        self.assertEqual(b"abcdefg", bufio.read())

    def testFileno(self):
        rawio = MockRawIO((b"abc", b"d", b"efg"))
        bufio = io.BufferedReader(rawio)

        self.assertEqual(42, bufio.fileno())

    def testFilenoNoFileno(self):
        # XXX will we always have fileno() function? If so, kill
        # this test. Else, write it.
        pass

    def testThreads(self):
        try:
            # Write out many bytes with exactly the same number of 0's,
            # 1's... 255's. This will help us check that concurrent reading
            # doesn't duplicate or forget contents.
            N = 1000
            values = list(range(256)) * N
            random.shuffle(values)
            payload = bytes(values)
            with io.open(support.TESTFN, "wb") as f:
                f.write(payload)
            with io.open(support.TESTFN, "rb", buffering=0) as raw:
                bufio = io.BufferedReader(raw, 8)
                errors = []
                results = []

                def reader():
                    try:
                        # Intra-buffer read then buffer-flushing read
                        for n in cycle([1, 19]):
                            chunk = bufio.read(n)
                            if not chunk:
                                break
                            # list.append() is atomic
                            results.append(chunk)
                    except Exception as e:
                        errors.append(e)
                        raise

                threads = [threading.Thread(target=reader) for x in range(20)]
                for t in threads:
                    t.start()
                time.sleep(0.02)  # yield
                for t in threads:
                    t.join()
                self.assertFalse(errors,
                    "the following exceptions were caught: %r" % errors)
                combined = b''.join(results)
                for i in range(256):
                    c = bytes([i])
                    self.assertEqual(combined.count(c), N)
        finally:
            support.unlink(support.TESTFN)
470
471
Guido van Rossuma9e20242007-03-08 00:43:48 +0000472
class BufferedWriterTest(unittest.TestCase):
    """Tests for io.BufferedWriter on top of mock and real raw streams."""

    def testWrite(self):
        # Write to the buffered IO but don't overflow the buffer.
        writer = MockRawIO()
        bufio = io.BufferedWriter(writer, 8)

        bufio.write(b"abc")

        self.assertFalse(writer._write_stack)

    def testWriteOverflow(self):
        writer = MockRawIO()
        bufio = io.BufferedWriter(writer, 8)

        bufio.write(b"abc")
        bufio.write(b"defghijkl")

        self.assertEqual(b"abcdefghijkl", writer._write_stack[0])

    def testWriteNonBlocking(self):
        raw = MockNonBlockWriterIO((9, 2, 22, -6, 10, 12, 12))
        bufio = io.BufferedWriter(raw, 8, 16)

        bufio.write(b"asdf")
        bufio.write(b"asdfa")
        self.assertEqual(b"asdfasdfa", raw._write_stack[0])

        bufio.write(b"asdfasdfasdf")
        self.assertEqual(b"asdfasdfasdf", raw._write_stack[1])
        bufio.write(b"asdfasdfasdf")
        self.assertEqual(b"dfasdfasdf", raw._write_stack[2])
        self.assertEqual(b"asdfasdfasdf", raw._write_stack[3])

        bufio.write(b"asdfasdfasdf")

        # XXX I don't like this test.  It relies too heavily on how the
        # algorithm actually works, which we might change.  Refactor
        # later.

    def testFileno(self):
        rawio = MockRawIO((b"abc", b"d", b"efg"))
        bufio = io.BufferedWriter(rawio)

        self.assertEqual(42, bufio.fileno())

    def testFlush(self):
        writer = MockRawIO()
        bufio = io.BufferedWriter(writer, 8)

        bufio.write(b"abc")
        bufio.flush()

        self.assertEqual(b"abc", writer._write_stack[0])

    def testThreads(self):
        # BufferedWriter should not raise exceptions or crash
        # when called from multiple threads.
        try:
            # We use a real file object because it allows us to
            # exercise situations where the GIL is released before
            # writing the buffer to the raw streams. This is in addition
            # to concurrency issues due to switching threads in the middle
            # of Python code.
            with io.open(support.TESTFN, "wb", buffering=0) as raw:
                bufio = io.BufferedWriter(raw, 8)
                errors = []

                def writer():
                    try:
                        # Write enough bytes to flush the buffer
                        chunk = b"a" * 19
                        for i in range(50):
                            bufio.write(chunk)
                    except Exception as e:
                        errors.append(e)
                        raise

                threads = [threading.Thread(target=writer) for x in range(20)]
                for t in threads:
                    t.start()
                time.sleep(0.02)  # yield
                for t in threads:
                    t.join()
                self.assertFalse(errors,
                    "the following exceptions were caught: %r" % errors)
        finally:
            support.unlink(support.TESTFN)
558 support.unlink(support.TESTFN)
559
Guido van Rossuma9e20242007-03-08 00:43:48 +0000560
class BufferedRWPairTest(unittest.TestCase):
    """Smoke test for io.BufferedRWPair."""

    def testRWPair(self):
        r = MockRawIO(())
        w = MockRawIO()
        pair = io.BufferedRWPair(r, w)
        # A freshly built pair must report itself as open.
        self.assertFalse(pair.closed)

        # XXX More Tests
Guido van Rossum01a27522007-03-07 01:00:12 +0000570
Guido van Rossuma9e20242007-03-08 00:43:48 +0000571
class BufferedRandomTest(unittest.TestCase):
    """Tests for io.BufferedRandom: interleaved reads/writes and seeking."""

    def testReadAndWrite(self):
        raw = MockRawIO((b"asdf", b"ghjk"))
        rw = io.BufferedRandom(raw, 8, 12)

        self.assertEqual(b"as", rw.read(2))
        rw.write(b"ddd")
        rw.write(b"eee")
        self.assertFalse(raw._write_stack)  # Buffer writes
        self.assertEqual(b"ghjk", rw.read())  # This read forces write flush
        self.assertEqual(b"dddeee", raw._write_stack[0])

    def testSeekAndTell(self):
        raw = io.BytesIO(b"asdfghjkl")
        rw = io.BufferedRandom(raw)

        self.assertEqual(b"as", rw.read(2))
        self.assertEqual(2, rw.tell())
        rw.seek(0, 0)
        self.assertEqual(b"asdf", rw.read(4))

        rw.write(b"asdf")
        rw.seek(0, 0)
        self.assertEqual(b"asdfasdfl", rw.read())
        self.assertEqual(9, rw.tell())
        rw.seek(-4, 2)
        self.assertEqual(5, rw.tell())
        rw.seek(2, 1)
        self.assertEqual(7, rw.tell())
        self.assertEqual(b"fl", rw.read(11))
        self.assertRaises(TypeError, rw.seek, 0.0)
Guido van Rossum01a27522007-03-07 01:00:12 +0000604
# To fully exercise seek/tell, the StatefulIncrementalDecoder has these
# properties:
#   - A single output character can correspond to many bytes of input.
#   - The number of input bytes to complete the character can be
#     undetermined until the last input byte is received.
#   - The number of input bytes can vary depending on previous input.
#   - A single input byte can correspond to many characters of output.
#   - The number of output characters can be undetermined until the
#     last input byte is received.
#   - The number of output characters can vary depending on previous input.
615
class StatefulIncrementalDecoder(codecs.IncrementalDecoder):
    """
    A stateful, buffering decoder used to exercise seek/tell behavior.

    The input byte stream is split into words.  With an input length I
    greater than zero a word is exactly I bytes long; with I equal to 0
    a word is terminated by a period, and periods arriving while no
    input is buffered are ignored.  Each completed word is handled as
    follows:
      - A word starting with 'i' sets the input length I to the number
        that follows it (0 if no number follows, capped at 99).
      - A word starting with 'o' sets the output length O the same way.
      - Any other word is emitted followed by a period.  If O is
        non-zero the word is first padded with hyphens or truncated so
        that it is exactly O characters long; if O is 0 it is emitted
        verbatim.
    I and O both start at 1.  A final decode() call also terminates
    whatever word is still buffered.
    """

    # The codec lookup function below only answers while this is true.
    codecEnabled = False

    def __init__(self, errors='strict'):
        codecs.IncrementalDecoder.__init__(self, errors)
        self.reset()

    def __repr__(self):
        return '<SID %x>' % id(self)

    def reset(self):
        """Go back to the initial state: I = O = 1 and nothing buffered."""
        self.buffer = bytearray()
        self.i = 1
        self.o = 1

    def getstate(self):
        """Return (buffered bytes, integer flags) describing the state.

        I and O are XOR-ed with 1 before being packed so that the flags
        value is 0 right after reset().
        """
        packed_i = self.i ^ 1
        packed_o = self.o ^ 1
        return bytes(self.buffer), packed_i * 100 + packed_o

    def setstate(self, state):
        """Restore a state previously produced by getstate()."""
        pending, flags = state
        packed_i, packed_o = divmod(flags, 100)
        self.buffer = bytearray(pending)
        self.i = packed_i ^ 1
        self.o = packed_o ^ 1

    def decode(self, input, final=False):
        """Consume the bytes of `input` and return the decoded text.

        When `final` is true, a word still sitting in the buffer is
        processed as if it had been terminated.
        """
        period = ord('.')
        pieces = []
        for byte in input:
            if self.i != 0:
                # Fixed-length mode: a word ends after self.i bytes.
                self.buffer.append(byte)
                if len(self.buffer) == self.i:
                    pieces.append(self.process_word())
            elif byte != period:
                # Variable-length mode: keep collecting until a period.
                self.buffer.append(byte)
            elif self.buffer:
                # A period terminates the buffered word; a period with
                # nothing buffered is dropped.
                pieces.append(self.process_word())
        if final and self.buffer:
            # EOF terminates the last word.
            pieces.append(self.process_word())
        return ''.join(pieces)

    def process_word(self):
        """Interpret the buffered word, empty the buffer, return its output.

        Words that set I or O produce an empty string.
        """
        first = self.buffer[0]
        if first == ord('i'):
            # Set the input length.
            self.i = min(99, int(self.buffer[1:] or 0))
            text = ''
        elif first == ord('o'):
            # Set the output length.
            self.o = min(99, int(self.buffer[1:] or 0))
            text = ''
        else:
            text = self.buffer.decode('ascii')
            if self.o:
                # Pad with hyphens, then cut down to exactly O characters.
                text = text.ljust(self.o, '-')[:self.o]
            text += '.'
        self.buffer = bytearray()
        return text

    @classmethod
    def lookupTestDecoder(cls, name):
        """Codec search function for the name 'test_decoder'.

        Returns None (implicitly declining the lookup) unless
        codecEnabled is true and `name` is 'test_decoder'.
        """
        if not (cls.codecEnabled and name == 'test_decoder'):
            return None
        # Encoding is borrowed from latin-1; only incremental decoding
        # is provided by this class.
        latin1 = codecs.lookup('latin-1')
        return codecs.CodecInfo(
            name='test_decoder', encode=latin1.encode, decode=None,
            incrementalencoder=None,
            streamreader=None, streamwriter=None,
            incrementaldecoder=cls)
700
# Install StatefulIncrementalDecoder.lookupTestDecoder as a codec search
# function.  The lookup only answers while the class attribute
# codecEnabled is true, which it is not by default; tests enable it.
codecs.register(StatefulIncrementalDecoder.lookupTestDecoder)
704
705
class StatefulIncrementalDecoderTest(unittest.TestCase):
    """
    Make sure the StatefulIncrementalDecoder actually works.
    """

    # Each entry is (input bytes, value passed as `final`, expected output).
    test_cases = [
        # I=1, O=1 (fixed-length input == fixed-length output)
        (b'abcd', False, 'a.b.c.d.'),
        # I=0, O=0 (variable-length input, variable-length output)
        (b'oiabcd', True, 'abcd.'),
        # I=0, O=0 (should ignore extra periods)
        (b'oi...abcd...', True, 'abcd.'),
        # I=0, O=6 (variable-length input, fixed-length output)
        (b'i.o6.x.xyz.toolongtofit.', False, 'x-----.xyz---.toolon.'),
        # I=2, O=6 (fixed-length input < fixed-length output)
        (b'i.i2.o6xyz', True, 'xy----.z-----.'),
        # I=6, O=3 (fixed-length input > fixed-length output)
        (b'i.o3.i6.abcdefghijklmnop', True, 'abc.ghi.mno.'),
        # I=0, then 3; O=29, then 15 (with longer output)
        (b'i.o29.a.b.cde.o15.abcdefghijabcdefghij.i3.a.b.c.d.ei00k.l.m', True,
         'a----------------------------.' +
         'b----------------------------.' +
         'cde--------------------------.' +
         'abcdefghijabcde.' +
         'a.b------------.' +
         '.c.------------.' +
         'd.e------------.' +
         'k--------------.' +
         'l--------------.' +
         'm--------------.')
    ]

    def testDecoder(self):
        # Try a few one-shot test cases, each on a fresh decoder.
        for input, eof, output in self.test_cases:
            d = StatefulIncrementalDecoder()
            self.assertEqual(d.decode(input, eof), output)

        # Also test an unfinished decode, followed by forcing EOF.
        d = StatefulIncrementalDecoder()
        self.assertEqual(d.decode(b'oiabcd'), '')
        self.assertEqual(d.decode(b'', final=True), 'abcd.')
Guido van Rossum78892e42007-04-06 17:31:18 +0000748
749class TextIOWrapperTest(unittest.TestCase):
Guido van Rossum9b76da62007-04-11 01:09:03 +0000750
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +0000751 def setUp(self):
752 self.testdata = b"AAA\r\nBBB\rCCC\r\nDDD\nEEE\r\n"
753 self.normalized = b"AAA\nBBB\nCCC\nDDD\nEEE\n".decode("ascii")
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000754 support.unlink(support.TESTFN)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +0000755
Guido van Rossumd0712812007-04-11 16:32:43 +0000756 def tearDown(self):
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000757 support.unlink(support.TESTFN)
Guido van Rossum9b76da62007-04-11 01:09:03 +0000758
Guido van Rossumf64db9f2007-12-06 01:04:26 +0000759 def testLineBuffering(self):
760 r = io.BytesIO()
761 b = io.BufferedWriter(r, 1000)
762 t = io.TextIOWrapper(b, newline="\n", line_buffering=True)
763 t.write("X")
764 self.assertEquals(r.getvalue(), b"") # No flush happened
765 t.write("Y\nZ")
766 self.assertEquals(r.getvalue(), b"XY\nZ") # All got flushed
767 t.write("A\rB")
768 self.assertEquals(r.getvalue(), b"XY\nZA\rB")
769
Guido van Rossume7fc50f2007-12-03 22:54:21 +0000770 def testEncodingErrorsReading(self):
771 # (1) default
772 b = io.BytesIO(b"abc\n\xff\n")
773 t = io.TextIOWrapper(b, encoding="ascii")
774 self.assertRaises(UnicodeError, t.read)
775 # (2) explicit strict
776 b = io.BytesIO(b"abc\n\xff\n")
777 t = io.TextIOWrapper(b, encoding="ascii", errors="strict")
778 self.assertRaises(UnicodeError, t.read)
779 # (3) ignore
780 b = io.BytesIO(b"abc\n\xff\n")
781 t = io.TextIOWrapper(b, encoding="ascii", errors="ignore")
782 self.assertEquals(t.read(), "abc\n\n")
783 # (4) replace
784 b = io.BytesIO(b"abc\n\xff\n")
785 t = io.TextIOWrapper(b, encoding="ascii", errors="replace")
786 self.assertEquals(t.read(), "abc\n\ufffd\n")
787
788 def testEncodingErrorsWriting(self):
789 # (1) default
790 b = io.BytesIO()
791 t = io.TextIOWrapper(b, encoding="ascii")
792 self.assertRaises(UnicodeError, t.write, "\xff")
793 # (2) explicit strict
794 b = io.BytesIO()
795 t = io.TextIOWrapper(b, encoding="ascii", errors="strict")
796 self.assertRaises(UnicodeError, t.write, "\xff")
797 # (3) ignore
798 b = io.BytesIO()
Guido van Rossumf64db9f2007-12-06 01:04:26 +0000799 t = io.TextIOWrapper(b, encoding="ascii", errors="ignore",
800 newline="\n")
Guido van Rossume7fc50f2007-12-03 22:54:21 +0000801 t.write("abc\xffdef\n")
802 t.flush()
Christian Heimesecda2612007-12-05 17:59:44 +0000803 self.assertEquals(b.getvalue(), b"abcdef\n")
Guido van Rossume7fc50f2007-12-03 22:54:21 +0000804 # (4) replace
805 b = io.BytesIO()
Guido van Rossumf64db9f2007-12-06 01:04:26 +0000806 t = io.TextIOWrapper(b, encoding="ascii", errors="replace",
807 newline="\n")
Guido van Rossume7fc50f2007-12-03 22:54:21 +0000808 t.write("abc\xffdef\n")
809 t.flush()
Christian Heimesecda2612007-12-05 17:59:44 +0000810 self.assertEquals(b.getvalue(), b"abc?def\n")
Guido van Rossume7fc50f2007-12-03 22:54:21 +0000811
Guido van Rossum8358db22007-08-18 21:39:55 +0000812 def testNewlinesInput(self):
813 testdata = b"AAA\nBBB\nCCC\rDDD\rEEE\r\nFFF\r\nGGG"
814 normalized = testdata.replace(b"\r\n", b"\n").replace(b"\r", b"\n")
815 for newline, expected in [
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +0000816 (None, normalized.decode("ascii").splitlines(True)),
817 ("", testdata.decode("ascii").splitlines(True)),
Guido van Rossum8358db22007-08-18 21:39:55 +0000818 ("\n", ["AAA\n", "BBB\n", "CCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]),
819 ("\r\n", ["AAA\nBBB\nCCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]),
820 ("\r", ["AAA\nBBB\nCCC\r", "DDD\r", "EEE\r", "\nFFF\r", "\nGGG"]),
821 ]:
822 buf = io.BytesIO(testdata)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +0000823 txt = io.TextIOWrapper(buf, encoding="ascii", newline=newline)
Guido van Rossum8358db22007-08-18 21:39:55 +0000824 self.assertEquals(txt.readlines(), expected)
825 txt.seek(0)
826 self.assertEquals(txt.read(), "".join(expected))
827
828 def testNewlinesOutput(self):
829 testdict = {
830 "": b"AAA\nBBB\nCCC\nX\rY\r\nZ",
831 "\n": b"AAA\nBBB\nCCC\nX\rY\r\nZ",
832 "\r": b"AAA\rBBB\rCCC\rX\rY\r\rZ",
833 "\r\n": b"AAA\r\nBBB\r\nCCC\r\nX\rY\r\r\nZ",
834 }
835 tests = [(None, testdict[os.linesep])] + sorted(testdict.items())
836 for newline, expected in tests:
837 buf = io.BytesIO()
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +0000838 txt = io.TextIOWrapper(buf, encoding="ascii", newline=newline)
Guido van Rossum8358db22007-08-18 21:39:55 +0000839 txt.write("AAA\nB")
840 txt.write("BB\nCCC\n")
841 txt.write("X\rY\r\nZ")
842 txt.flush()
Alexandre Vassalotti77250f42008-05-06 19:48:38 +0000843 self.assertEquals(buf.closed, False)
Guido van Rossum8358db22007-08-18 21:39:55 +0000844 self.assertEquals(buf.getvalue(), expected)
845
Guido van Rossum78892e42007-04-06 17:31:18 +0000846 def testNewlines(self):
847 input_lines = [ "unix\n", "windows\r\n", "os9\r", "last\n", "nonl" ]
848
849 tests = [
850 [ None, [ 'unix\n', 'windows\n', 'os9\n', 'last\n', 'nonl' ] ],
Guido van Rossum8358db22007-08-18 21:39:55 +0000851 [ '', input_lines ],
852 [ '\n', [ "unix\n", "windows\r\n", "os9\rlast\n", "nonl" ] ],
853 [ '\r\n', [ "unix\nwindows\r\n", "os9\rlast\nnonl" ] ],
854 [ '\r', [ "unix\nwindows\r", "\nos9\r", "last\nnonl" ] ],
Guido van Rossum78892e42007-04-06 17:31:18 +0000855 ]
Antoine Pitrou180a3362008-12-14 16:36:46 +0000856 encodings = (
857 'utf-8', 'latin-1',
858 'utf-16', 'utf-16-le', 'utf-16-be',
859 'utf-32', 'utf-32-le', 'utf-32-be',
860 )
Guido van Rossum78892e42007-04-06 17:31:18 +0000861
Guido van Rossum8358db22007-08-18 21:39:55 +0000862 # Try a range of buffer sizes to test the case where \r is the last
Guido van Rossum78892e42007-04-06 17:31:18 +0000863 # character in TextIOWrapper._pending_line.
864 for encoding in encodings:
Guido van Rossum8358db22007-08-18 21:39:55 +0000865 # XXX: str.encode() should return bytes
866 data = bytes(''.join(input_lines).encode(encoding))
Guido van Rossum78892e42007-04-06 17:31:18 +0000867 for do_reads in (False, True):
Guido van Rossum8358db22007-08-18 21:39:55 +0000868 for bufsize in range(1, 10):
869 for newline, exp_lines in tests:
870 bufio = io.BufferedReader(io.BytesIO(data), bufsize)
Guido van Rossum76c5d4d2007-04-06 19:10:29 +0000871 textio = io.TextIOWrapper(bufio, newline=newline,
Guido van Rossum78892e42007-04-06 17:31:18 +0000872 encoding=encoding)
873 if do_reads:
874 got_lines = []
875 while True:
Guido van Rossum76c5d4d2007-04-06 19:10:29 +0000876 c2 = textio.read(2)
Guido van Rossum78892e42007-04-06 17:31:18 +0000877 if c2 == '':
878 break
879 self.assertEquals(len(c2), 2)
Guido van Rossum76c5d4d2007-04-06 19:10:29 +0000880 got_lines.append(c2 + textio.readline())
Guido van Rossum78892e42007-04-06 17:31:18 +0000881 else:
Guido van Rossum76c5d4d2007-04-06 19:10:29 +0000882 got_lines = list(textio)
Guido van Rossum78892e42007-04-06 17:31:18 +0000883
884 for got_line, exp_line in zip(got_lines, exp_lines):
885 self.assertEquals(got_line, exp_line)
886 self.assertEquals(len(got_lines), len(exp_lines))
887
Guido van Rossum8358db22007-08-18 21:39:55 +0000888 def testNewlinesInput(self):
889 testdata = b"AAA\nBBB\nCCC\rDDD\rEEE\r\nFFF\r\nGGG"
890 normalized = testdata.replace(b"\r\n", b"\n").replace(b"\r", b"\n")
891 for newline, expected in [
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +0000892 (None, normalized.decode("ascii").splitlines(True)),
893 ("", testdata.decode("ascii").splitlines(True)),
Guido van Rossum8358db22007-08-18 21:39:55 +0000894 ("\n", ["AAA\n", "BBB\n", "CCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]),
895 ("\r\n", ["AAA\nBBB\nCCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]),
896 ("\r", ["AAA\nBBB\nCCC\r", "DDD\r", "EEE\r", "\nFFF\r", "\nGGG"]),
897 ]:
898 buf = io.BytesIO(testdata)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +0000899 txt = io.TextIOWrapper(buf, encoding="ascii", newline=newline)
Guido van Rossum8358db22007-08-18 21:39:55 +0000900 self.assertEquals(txt.readlines(), expected)
901 txt.seek(0)
902 self.assertEquals(txt.read(), "".join(expected))
903
904 def testNewlinesOutput(self):
Guido van Rossum8358db22007-08-18 21:39:55 +0000905 data = "AAA\nBBB\rCCC\n"
906 data_lf = b"AAA\nBBB\rCCC\n"
907 data_cr = b"AAA\rBBB\rCCC\r"
908 data_crlf = b"AAA\r\nBBB\rCCC\r\n"
Guido van Rossum18363582007-08-18 22:05:58 +0000909 save_linesep = os.linesep
910 try:
911 for os.linesep, newline, expected in [
912 ("\n", None, data_lf),
913 ("\r\n", None, data_crlf),
914 ("\n", "", data_lf),
915 ("\r\n", "", data_lf),
916 ("\n", "\n", data_lf),
917 ("\r\n", "\n", data_lf),
918 ("\n", "\r", data_cr),
919 ("\r\n", "\r", data_cr),
920 ("\n", "\r\n", data_crlf),
921 ("\r\n", "\r\n", data_crlf),
922 ]:
923 buf = io.BytesIO()
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +0000924 txt = io.TextIOWrapper(buf, encoding="ascii", newline=newline)
Guido van Rossum18363582007-08-18 22:05:58 +0000925 txt.write(data)
926 txt.close()
Alexandre Vassalotti77250f42008-05-06 19:48:38 +0000927 self.assertEquals(buf.closed, True)
928 self.assertRaises(ValueError, buf.getvalue)
Guido van Rossum18363582007-08-18 22:05:58 +0000929 finally:
930 os.linesep = save_linesep
Guido van Rossum8358db22007-08-18 21:39:55 +0000931
Guido van Rossum9b76da62007-04-11 01:09:03 +0000932 # Systematic tests of the text I/O API
933
934 def testBasicIO(self):
935 for chunksize in (1, 2, 3, 4, 5, 15, 16, 17, 31, 32, 33, 63, 64, 65):
936 for enc in "ascii", "latin1", "utf8" :# , "utf-16-be", "utf-16-le":
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000937 f = io.open(support.TESTFN, "w+", encoding=enc)
Guido van Rossum9b76da62007-04-11 01:09:03 +0000938 f._CHUNK_SIZE = chunksize
939 self.assertEquals(f.write("abc"), 3)
940 f.close()
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000941 f = io.open(support.TESTFN, "r+", encoding=enc)
Guido van Rossum9b76da62007-04-11 01:09:03 +0000942 f._CHUNK_SIZE = chunksize
943 self.assertEquals(f.tell(), 0)
944 self.assertEquals(f.read(), "abc")
945 cookie = f.tell()
946 self.assertEquals(f.seek(0), 0)
947 self.assertEquals(f.read(2), "ab")
948 self.assertEquals(f.read(1), "c")
949 self.assertEquals(f.read(1), "")
950 self.assertEquals(f.read(), "")
951 self.assertEquals(f.tell(), cookie)
952 self.assertEquals(f.seek(0), 0)
953 self.assertEquals(f.seek(0, 2), cookie)
954 self.assertEquals(f.write("def"), 3)
955 self.assertEquals(f.seek(cookie), cookie)
956 self.assertEquals(f.read(), "def")
957 if enc.startswith("utf"):
958 self.multi_line_test(f, enc)
959 f.close()
960
961 def multi_line_test(self, f, enc):
962 f.seek(0)
963 f.truncate()
Guido van Rossumef87d6e2007-05-02 19:09:54 +0000964 sample = "s\xff\u0fff\uffff"
Guido van Rossum9b76da62007-04-11 01:09:03 +0000965 wlines = []
Guido van Rossumcba608c2007-04-11 14:19:59 +0000966 for size in (0, 1, 2, 3, 4, 5, 30, 31, 32, 33, 62, 63, 64, 65, 1000):
Guido van Rossum9b76da62007-04-11 01:09:03 +0000967 chars = []
Guido van Rossum805365e2007-05-07 22:24:25 +0000968 for i in range(size):
Guido van Rossum9b76da62007-04-11 01:09:03 +0000969 chars.append(sample[i % len(sample)])
Guido van Rossumef87d6e2007-05-02 19:09:54 +0000970 line = "".join(chars) + "\n"
Guido van Rossum9b76da62007-04-11 01:09:03 +0000971 wlines.append((f.tell(), line))
972 f.write(line)
Guido van Rossum9b76da62007-04-11 01:09:03 +0000973 f.seek(0)
974 rlines = []
975 while True:
976 pos = f.tell()
977 line = f.readline()
978 if not line:
Guido van Rossum9b76da62007-04-11 01:09:03 +0000979 break
980 rlines.append((pos, line))
Guido van Rossum9b76da62007-04-11 01:09:03 +0000981 self.assertEquals(rlines, wlines)
982
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +0000983 def testTelling(self):
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000984 f = io.open(support.TESTFN, "w+", encoding="utf8")
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +0000985 p0 = f.tell()
Guido van Rossumef87d6e2007-05-02 19:09:54 +0000986 f.write("\xff\n")
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +0000987 p1 = f.tell()
Guido van Rossumef87d6e2007-05-02 19:09:54 +0000988 f.write("\xff\n")
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +0000989 p2 = f.tell()
990 f.seek(0)
991 self.assertEquals(f.tell(), p0)
Guido van Rossumef87d6e2007-05-02 19:09:54 +0000992 self.assertEquals(f.readline(), "\xff\n")
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +0000993 self.assertEquals(f.tell(), p1)
Guido van Rossumef87d6e2007-05-02 19:09:54 +0000994 self.assertEquals(f.readline(), "\xff\n")
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +0000995 self.assertEquals(f.tell(), p2)
996 f.seek(0)
997 for line in f:
Guido van Rossumef87d6e2007-05-02 19:09:54 +0000998 self.assertEquals(line, "\xff\n")
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +0000999 self.assertRaises(IOError, f.tell)
1000 self.assertEquals(f.tell(), p2)
1001 f.close()
1002
Guido van Rossumd76e7792007-04-17 02:38:04 +00001003 def testSeeking(self):
1004 chunk_size = io.TextIOWrapper._CHUNK_SIZE
1005 prefix_size = chunk_size - 2
Guido van Rossumef87d6e2007-05-02 19:09:54 +00001006 u_prefix = "a" * prefix_size
Guido van Rossumd76e7792007-04-17 02:38:04 +00001007 prefix = bytes(u_prefix.encode("utf-8"))
1008 self.assertEquals(len(u_prefix), len(prefix))
Guido van Rossumef87d6e2007-05-02 19:09:54 +00001009 u_suffix = "\u8888\n"
Guido van Rossumd76e7792007-04-17 02:38:04 +00001010 suffix = bytes(u_suffix.encode("utf-8"))
1011 line = prefix + suffix
Benjamin Petersonee8712c2008-05-20 21:35:26 +00001012 f = io.open(support.TESTFN, "wb")
Guido van Rossumd76e7792007-04-17 02:38:04 +00001013 f.write(line*2)
1014 f.close()
Benjamin Petersonee8712c2008-05-20 21:35:26 +00001015 f = io.open(support.TESTFN, "r", encoding="utf-8")
Guido van Rossumd76e7792007-04-17 02:38:04 +00001016 s = f.read(prefix_size)
Guido van Rossum98297ee2007-11-06 21:34:58 +00001017 self.assertEquals(s, str(prefix, "ascii"))
Guido van Rossumd76e7792007-04-17 02:38:04 +00001018 self.assertEquals(f.tell(), prefix_size)
1019 self.assertEquals(f.readline(), u_suffix)
1020
1021 def testSeekingToo(self):
1022 # Regression test for a specific bug
1023 data = b'\xe0\xbf\xbf\n'
Benjamin Petersonee8712c2008-05-20 21:35:26 +00001024 f = io.open(support.TESTFN, "wb")
Guido van Rossumd76e7792007-04-17 02:38:04 +00001025 f.write(data)
1026 f.close()
Benjamin Petersonee8712c2008-05-20 21:35:26 +00001027 f = io.open(support.TESTFN, "r", encoding="utf-8")
Guido van Rossumd76e7792007-04-17 02:38:04 +00001028 f._CHUNK_SIZE # Just test that it exists
1029 f._CHUNK_SIZE = 2
1030 f.readline()
1031 f.tell()
1032
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001033 def testSeekAndTell(self):
1034 """Test seek/tell using the StatefulIncrementalDecoder."""
Antoine Pitrouc10c34d2009-01-17 02:40:13 +00001035 # Make this test faster by forcing a smaller (but large enough)
1036 # chunk size. The bigger the chunker size, the slower seek() is,
1037 # as it tries to replay character decoding one byte at a time.
1038 CHUNK_SIZE = 256
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001039
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001040 def testSeekAndTellWithData(data, min_pos=0):
1041 """Tell/seek to various points within a data stream and ensure
1042 that the decoded data returned by read() is consistent."""
Benjamin Petersonee8712c2008-05-20 21:35:26 +00001043 f = io.open(support.TESTFN, 'wb')
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001044 f.write(data)
1045 f.close()
Benjamin Petersonee8712c2008-05-20 21:35:26 +00001046 f = io.open(support.TESTFN, encoding='test_decoder')
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001047 decoded = f.read()
1048 f.close()
1049
Neal Norwitze2b07052008-03-18 19:52:05 +00001050 for i in range(min_pos, len(decoded) + 1): # seek positions
1051 for j in [1, 5, len(decoded) - i]: # read lengths
Benjamin Petersonee8712c2008-05-20 21:35:26 +00001052 f = io.open(support.TESTFN, encoding='test_decoder')
Antoine Pitrouc10c34d2009-01-17 02:40:13 +00001053 f._CHUNK_SIZE = CHUNK_SIZE
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001054 self.assertEquals(f.read(i), decoded[:i])
1055 cookie = f.tell()
1056 self.assertEquals(f.read(j), decoded[i:i + j])
1057 f.seek(cookie)
1058 self.assertEquals(f.read(), decoded[i:])
1059 f.close()
1060
Benjamin Petersonad9d48d2008-04-02 21:49:44 +00001061 # Enable the test decoder.
1062 StatefulIncrementalDecoder.codecEnabled = 1
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001063
1064 # Run the tests.
1065 try:
1066 # Try each test case.
1067 for input, _, _ in StatefulIncrementalDecoderTest.test_cases:
1068 testSeekAndTellWithData(input)
1069
1070 # Position each test case so that it crosses a chunk boundary.
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001071 for input, _, _ in StatefulIncrementalDecoderTest.test_cases:
1072 offset = CHUNK_SIZE - len(input)//2
1073 prefix = b'.'*offset
1074 # Don't bother seeking into the prefix (takes too long).
1075 min_pos = offset*2
1076 testSeekAndTellWithData(prefix + input, min_pos)
1077
1078 # Ensure our test decoder won't interfere with subsequent tests.
1079 finally:
Benjamin Petersonad9d48d2008-04-02 21:49:44 +00001080 StatefulIncrementalDecoder.codecEnabled = 0
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001081
Alexandre Vassalottia38f73b2008-01-07 18:30:48 +00001082 def testEncodedWrites(self):
1083 data = "1234567890"
1084 tests = ("utf-16",
1085 "utf-16-le",
1086 "utf-16-be",
1087 "utf-32",
1088 "utf-32-le",
1089 "utf-32-be")
1090 for encoding in tests:
1091 buf = io.BytesIO()
1092 f = io.TextIOWrapper(buf, encoding=encoding)
1093 # Check if the BOM is written only once (see issue1753).
1094 f.write(data)
1095 f.write(data)
1096 f.seek(0)
1097 self.assertEquals(f.read(), data * 2)
1098 self.assertEquals(buf.getvalue(), (data * 2).encode(encoding))
1099
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +00001100 def timingTest(self):
1101 timer = time.time
1102 enc = "utf8"
Guido van Rossumef87d6e2007-05-02 19:09:54 +00001103 line = "\0\x0f\xff\u0fff\uffff\U000fffff\U0010ffff"*3 + "\n"
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +00001104 nlines = 10000
1105 nchars = len(line)
1106 nbytes = len(line.encode(enc))
1107 for chunk_size in (32, 64, 128, 256):
Benjamin Petersonee8712c2008-05-20 21:35:26 +00001108 f = io.open(support.TESTFN, "w+", encoding=enc)
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +00001109 f._CHUNK_SIZE = chunk_size
1110 t0 = timer()
1111 for i in range(nlines):
1112 f.write(line)
1113 f.flush()
1114 t1 = timer()
1115 f.seek(0)
1116 for line in f:
1117 pass
1118 t2 = timer()
1119 f.seek(0)
1120 while f.readline():
1121 pass
1122 t3 = timer()
1123 f.seek(0)
1124 while f.readline():
1125 f.tell()
1126 t4 = timer()
1127 f.close()
Benjamin Petersonee8712c2008-05-20 21:35:26 +00001128 if support.verbose:
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +00001129 print("\nTiming test: %d lines of %d characters (%d bytes)" %
1130 (nlines, nchars, nbytes))
1131 print("File chunk size: %6s" % f._CHUNK_SIZE)
1132 print("Writing: %6.3f seconds" % (t1-t0))
1133 print("Reading using iteration: %6.3f seconds" % (t2-t1))
1134 print("Reading using readline(): %6.3f seconds" % (t3-t2))
1135 print("Using readline()+tell(): %6.3f seconds" % (t4-t3))
1136
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00001137 def testReadOneByOne(self):
1138 txt = io.TextIOWrapper(io.BytesIO(b"AA\r\nBB"))
1139 reads = ""
1140 while True:
1141 c = txt.read(1)
1142 if not c:
1143 break
1144 reads += c
1145 self.assertEquals(reads, "AA\nBB")
1146
1147 # read in amounts equal to TextIOWrapper._CHUNK_SIZE which is 128.
1148 def testReadByChunk(self):
1149 # make sure "\r\n" straddles 128 char boundary.
1150 txt = io.TextIOWrapper(io.BytesIO(b"A" * 127 + b"\r\nB"))
1151 reads = ""
1152 while True:
1153 c = txt.read(128)
1154 if not c:
1155 break
1156 reads += c
1157 self.assertEquals(reads, "A"*127+"\nB")
1158
1159 def test_issue1395_1(self):
1160 txt = io.TextIOWrapper(io.BytesIO(self.testdata), encoding="ascii")
1161
1162 # read one char at a time
1163 reads = ""
1164 while True:
1165 c = txt.read(1)
1166 if not c:
1167 break
1168 reads += c
1169 self.assertEquals(reads, self.normalized)
1170
1171 def test_issue1395_2(self):
1172 txt = io.TextIOWrapper(io.BytesIO(self.testdata), encoding="ascii")
1173 txt._CHUNK_SIZE = 4
1174
1175 reads = ""
1176 while True:
1177 c = txt.read(4)
1178 if not c:
1179 break
1180 reads += c
1181 self.assertEquals(reads, self.normalized)
1182
1183 def test_issue1395_3(self):
1184 txt = io.TextIOWrapper(io.BytesIO(self.testdata), encoding="ascii")
1185 txt._CHUNK_SIZE = 4
1186
1187 reads = txt.read(4)
1188 reads += txt.read(4)
1189 reads += txt.readline()
1190 reads += txt.readline()
1191 reads += txt.readline()
1192 self.assertEquals(reads, self.normalized)
1193
1194 def test_issue1395_4(self):
1195 txt = io.TextIOWrapper(io.BytesIO(self.testdata), encoding="ascii")
1196 txt._CHUNK_SIZE = 4
1197
1198 reads = txt.read(4)
1199 reads += txt.read()
1200 self.assertEquals(reads, self.normalized)
1201
1202 def test_issue1395_5(self):
1203 txt = io.TextIOWrapper(io.BytesIO(self.testdata), encoding="ascii")
1204 txt._CHUNK_SIZE = 4
1205
1206 reads = txt.read(4)
1207 pos = txt.tell()
1208 txt.seek(0)
1209 txt.seek(pos)
1210 self.assertEquals(txt.read(4), "BBB\n")
1211
Ka-Ping Yeeddaa7062008-03-17 20:35:15 +00001212 def test_issue2282(self):
1213 buffer = io.BytesIO(self.testdata)
1214 txt = io.TextIOWrapper(buffer, encoding="ascii")
1215
1216 self.assertEqual(buffer.seekable(), txt.seekable())
1217
Antoine Pitrou180a3362008-12-14 16:36:46 +00001218 def check_newline_decoder_utf8(self, decoder):
1219 # UTF-8 specific tests for a newline decoder
1220 def _check_decode(b, s, **kwargs):
1221 # We exercise getstate() / setstate() as well as decode()
1222 state = decoder.getstate()
1223 self.assertEquals(decoder.decode(b, **kwargs), s)
1224 decoder.setstate(state)
1225 self.assertEquals(decoder.decode(b, **kwargs), s)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00001226
Antoine Pitrou180a3362008-12-14 16:36:46 +00001227 _check_decode(b'\xe8\xa2\x88', "\u8888")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00001228
Antoine Pitrou180a3362008-12-14 16:36:46 +00001229 _check_decode(b'\xe8', "")
1230 _check_decode(b'\xa2', "")
1231 _check_decode(b'\x88', "\u8888")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00001232
Antoine Pitrou180a3362008-12-14 16:36:46 +00001233 _check_decode(b'\xe8', "")
1234 _check_decode(b'\xa2', "")
1235 _check_decode(b'\x88', "\u8888")
1236
1237 _check_decode(b'\xe8', "")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00001238 self.assertRaises(UnicodeDecodeError, decoder.decode, b'', final=True)
1239
Antoine Pitrou180a3362008-12-14 16:36:46 +00001240 decoder.reset()
1241 _check_decode(b'\n', "\n")
1242 _check_decode(b'\r', "")
1243 _check_decode(b'', "\n", final=True)
1244 _check_decode(b'\r', "\n", final=True)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00001245
Antoine Pitrou180a3362008-12-14 16:36:46 +00001246 _check_decode(b'\r', "")
1247 _check_decode(b'a', "\na")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00001248
Antoine Pitrou180a3362008-12-14 16:36:46 +00001249 _check_decode(b'\r\r\n', "\n\n")
1250 _check_decode(b'\r', "")
1251 _check_decode(b'\r', "\n")
1252 _check_decode(b'\na', "\na")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00001253
Antoine Pitrou180a3362008-12-14 16:36:46 +00001254 _check_decode(b'\xe8\xa2\x88\r\n', "\u8888\n")
1255 _check_decode(b'\xe8\xa2\x88', "\u8888")
1256 _check_decode(b'\n', "\n")
1257 _check_decode(b'\xe8\xa2\x88\r', "\u8888")
1258 _check_decode(b'\n', "\n")
Guido van Rossum9b76da62007-04-11 01:09:03 +00001259
Antoine Pitrou180a3362008-12-14 16:36:46 +00001260 def check_newline_decoder(self, decoder, encoding):
1261 result = []
1262 encoder = codecs.getincrementalencoder(encoding)()
1263 def _decode_bytewise(s):
1264 for b in encoder.encode(s):
1265 result.append(decoder.decode(bytes([b])))
1266 self.assertEquals(decoder.newlines, None)
1267 _decode_bytewise("abc\n\r")
1268 self.assertEquals(decoder.newlines, '\n')
1269 _decode_bytewise("\nabc")
1270 self.assertEquals(decoder.newlines, ('\n', '\r\n'))
1271 _decode_bytewise("abc\r")
1272 self.assertEquals(decoder.newlines, ('\n', '\r\n'))
1273 _decode_bytewise("abc")
1274 self.assertEquals(decoder.newlines, ('\r', '\n', '\r\n'))
1275 _decode_bytewise("abc\r")
1276 self.assertEquals("".join(result), "abc\n\nabcabc\nabcabc")
1277 decoder.reset()
1278 self.assertEquals(decoder.decode("abc".encode(encoding)), "abc")
1279 self.assertEquals(decoder.newlines, None)
1280
1281 def test_newline_decoder(self):
1282 encodings = (
1283 'utf-8', 'latin-1',
1284 'utf-16', 'utf-16-le', 'utf-16-be',
1285 'utf-32', 'utf-32-le', 'utf-32-be',
1286 )
1287 for enc in encodings:
1288 decoder = codecs.getincrementaldecoder(enc)()
1289 decoder = io.IncrementalNewlineDecoder(decoder, translate=True)
1290 self.check_newline_decoder(decoder, enc)
Alexandre Vassalotti472f07d2008-01-06 00:34:32 +00001291 decoder = codecs.getincrementaldecoder("utf-8")()
1292 decoder = io.IncrementalNewlineDecoder(decoder, translate=True)
Antoine Pitrou180a3362008-12-14 16:36:46 +00001293 self.check_newline_decoder_utf8(decoder)
1294
Alexandre Vassalotti472f07d2008-01-06 00:34:32 +00001295
Guido van Rossum01a27522007-03-07 01:00:12 +00001296# XXX Tests for open()
Guido van Rossum68bbcd22007-02-27 17:19:33 +00001297
class MiscIOTest(unittest.TestCase):
    # Miscellaneous checks on the io module: its __all__ list, the
    # name/mode attributes of opened files, and operations on closed
    # files.

    def tearDown(self):
        support.unlink(support.TESTFN)

    def testImport__all__(self):
        # Every name in io.__all__ must exist.  Apart from "open", names
        # containing "error" must be Exception subclasses and all
        # others IOBase subclasses.
        for name in io.__all__:
            obj = getattr(io, name, None)
            self.assertTrue(obj is not None, name)
            if name == "open":
                continue
            elif "error" in name.lower():
                self.assertTrue(issubclass(obj, Exception), name)
            else:
                self.assertTrue(issubclass(obj, io.IOBase))

    def test_attributes(self):
        # The context managers make sure the files are closed even when
        # an assertion fails, so that tearDown can remove the file.
        with io.open(support.TESTFN, "wb", buffering=0) as f:
            self.assertEqual(f.mode, "wb")

        with io.open(support.TESTFN, "U") as f:
            self.assertEqual(f.name, support.TESTFN)
            self.assertEqual(f.buffer.name, support.TESTFN)
            self.assertEqual(f.buffer.raw.name, support.TESTFN)
            self.assertEqual(f.mode, "U")
            self.assertEqual(f.buffer.mode, "rb")
            self.assertEqual(f.buffer.raw.mode, "rb")

        with io.open(support.TESTFN, "w+") as f:
            self.assertEqual(f.mode, "w+")
            self.assertEqual(f.buffer.mode, "rb+")  # Does it really matter?
            self.assertEqual(f.buffer.raw.mode, "rb+")

            # A second file object sharing f's descriptor; closefd=False
            # is passed so that it is opened without owning the fd.
            g = io.open(f.fileno(), "wb", closefd=False)
            try:
                self.assertEqual(g.mode, "wb")
                self.assertEqual(g.raw.mode, "wb")
                self.assertEqual(g.name, f.fileno())
                self.assertEqual(g.raw.name, f.fileno())
            finally:
                g.close()

    def test_io_after_close(self):
        # Every I/O method must raise ValueError once the file has been
        # closed, whatever the mode and buffering used to open it.
        for kwargs in [
                {"mode": "w"},
                {"mode": "wb"},
                {"mode": "w", "buffering": 1},
                {"mode": "w", "buffering": 2},
                {"mode": "wb", "buffering": 0},
                {"mode": "r"},
                {"mode": "rb"},
                {"mode": "r", "buffering": 1},
                {"mode": "r", "buffering": 2},
                {"mode": "rb", "buffering": 0},
                {"mode": "w+"},
                {"mode": "w+b"},
                {"mode": "w+", "buffering": 1},
                {"mode": "w+", "buffering": 2},
                {"mode": "w+b", "buffering": 0},
            ]:
            f = io.open(support.TESTFN, **kwargs)
            f.close()
            self.assertRaises(ValueError, f.flush)
            self.assertRaises(ValueError, f.fileno)
            self.assertRaises(ValueError, f.isatty)
            self.assertRaises(ValueError, f.__iter__)
            # peek, read1 and readinto are not available on every layer.
            if hasattr(f, "peek"):
                self.assertRaises(ValueError, f.peek, 1)
            self.assertRaises(ValueError, f.read)
            if hasattr(f, "read1"):
                self.assertRaises(ValueError, f.read1, 1024)
            if hasattr(f, "readinto"):
                self.assertRaises(ValueError, f.readinto, bytearray(1024))
            self.assertRaises(ValueError, f.readline)
            self.assertRaises(ValueError, f.readlines)
            self.assertRaises(ValueError, f.seek, 0)
            self.assertRaises(ValueError, f.tell)
            self.assertRaises(ValueError, f.truncate)
            self.assertRaises(ValueError, f.write, "")
            self.assertRaises(ValueError, f.writelines, [])

1380
Barry Warsaw40e82462008-11-20 20:14:50 +00001381
def test_main():
    """Run the test case classes of this module via support.run_unittest."""
    test_classes = (
        IOTest, BytesIOTest, StringIOTest,
        BufferedReaderTest, BufferedWriterTest,
        BufferedRWPairTest, BufferedRandomTest,
        StatefulIncrementalDecoderTest,
        TextIOWrapperTest, MiscIOTest,
    )
    support.run_unittest(*test_classes)
Guido van Rossum28524c72007-02-27 05:47:44 +00001388
# When the file is executed as a script, hand control to unittest.main()
# (note that this does not go through test_main() above).
if __name__ == "__main__":
    unittest.main()