blob: 1733440cacc66b550ffba3c6c4f46a210dc4ad3c [file] [log] [blame]
Guido van Rossum68bbcd22007-02-27 17:19:33 +00001"""Unit tests for io.py."""
2
Guido van Rossum8358db22007-08-18 21:39:55 +00003import os
Guido van Rossum34d69e52007-04-10 20:08:41 +00004import sys
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +00005import time
Guido van Rossumd4103952007-04-12 05:44:49 +00006import array
Antoine Pitrou87695762008-08-14 22:44:29 +00007import threading
8import random
Guido van Rossum28524c72007-02-27 05:47:44 +00009import unittest
Antoine Pitrou87695762008-08-14 22:44:29 +000010from itertools import chain, cycle
Benjamin Petersonee8712c2008-05-20 21:35:26 +000011from test import support
Guido van Rossum76c5d4d2007-04-06 19:10:29 +000012
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +000013import codecs
Guido van Rossum53807da2007-04-10 19:01:47 +000014import io # The module under test
Guido van Rossum28524c72007-02-27 05:47:44 +000015
Guido van Rossuma9e20242007-03-08 00:43:48 +000016
class MockRawIO(io.RawIOBase):
    """Scripted raw stream used to drive the buffered-IO tests.

    Reads are answered from a queue of pre-canned results and every write
    is recorded, so a test can inspect exactly what reached the raw layer.
    """

    def __init__(self, read_stack=()):
        # Results handed out by successive read() calls, oldest first.
        self._read_stack = list(read_stack)
        # Copies of every buffer passed to write(), in call order.
        self._write_stack = []

    def read(self, n=None):
        """Return the next scripted result; the size argument is ignored.

        Once the script is exhausted, b"" is returned on every call.
        """
        try:
            return self._read_stack.pop(0)
        except IndexError:
            # Only an empty script is expected here; anything else should
            # propagate instead of being silently turned into EOF.
            return b""

    def write(self, b):
        """Record a copy of *b* and report that all of it was written."""
        # Slice-copy so later mutation of the caller's buffer cannot alter
        # what was recorded.
        self._write_stack.append(b[:])
        return len(b)

    def writable(self):
        return True

    def fileno(self):
        # Arbitrary sentinel value that the fileno tests compare against.
        return 42

    def readable(self):
        return True

    def seekable(self):
        return True

    def seek(self, pos, whence=0):
        """Accept and ignore any seek request (returns None)."""
        pass

    def tell(self):
        # Arbitrary fixed position; this mock does not track offsets.
        return 42
50
Guido van Rossuma9e20242007-03-08 00:43:48 +000051
class MockFileIO(io.BytesIO):
    """In-memory stream that logs the size of every read() result."""

    def __init__(self, data):
        # One entry per read() call: the length of the bytes returned, or
        # None when the underlying read returned None.
        self.read_history = []
        io.BytesIO.__init__(self, data)

    def read(self, n=None):
        """Read like BytesIO.read() and append the result size to read_history."""
        res = io.BytesIO.read(self, n)
        self.read_history.append(None if res is None else len(res))
        return res
62
63
class MockNonBlockWriterIO(io.RawIOBase):
    """Raw writer whose write() results follow a caller-supplied script.

    Each write() consumes one script entry: a non-negative entry is
    returned as the number of bytes "written"; a negative entry -k raises
    BlockingIOError reporting k characters written.
    """

    def __init__(self, blocking_script):
        self._blocking_script = list(blocking_script)
        # Copies of every buffer passed to write(), in call order.
        self._write_stack = []

    def write(self, b):
        """Record a copy of *b*, then return or raise per the next script entry."""
        self._write_stack.append(b[:])
        n = self._blocking_script.pop(0)
        if n < 0:
            raise io.BlockingIOError(0, "test blocking", -n)
        else:
            return n

    def writable(self):
        return True
Guido van Rossum68bbcd22007-02-27 17:19:33 +000080
Guido van Rossuma9e20242007-03-08 00:43:48 +000081
class IOTest(unittest.TestCase):
    """Checks of io.open(), FileIO and BytesIO, mostly against a scratch file.

    Test methods are described with comments rather than docstrings so the
    names reported by the unittest runner stay unchanged.
    """

    def setUp(self):
        support.unlink(support.TESTFN)

    def tearDown(self):
        support.unlink(support.TESTFN)

    def write_ops(self, f):
        # Helper: exercises write/seek/tell/truncate on a writable stream.
        # On success the stream holds b"hello world\n".
        self.assertEqual(f.write(b"blah."), 5)
        self.assertEqual(f.seek(0), 0)
        self.assertEqual(f.write(b"Hello."), 6)
        self.assertEqual(f.tell(), 6)
        self.assertEqual(f.seek(-1, 1), 5)
        self.assertEqual(f.tell(), 5)
        self.assertEqual(f.write(bytearray(b" world\n\n\n")), 9)
        self.assertEqual(f.seek(0), 0)
        self.assertEqual(f.write(b"h"), 1)
        self.assertEqual(f.seek(-1, 2), 13)
        self.assertEqual(f.tell(), 13)
        self.assertEqual(f.truncate(12), 12)
        self.assertEqual(f.tell(), 12)
        self.assertRaises(TypeError, f.seek, 0.0)

    def read_ops(self, f, buffered=False):
        # Helper: exercises read/readinto/seek/tell on a stream expected to
        # contain b"hello world\n".  With buffered=True, argument-less
        # read() calls are checked as well.
        data = f.read(5)
        self.assertEqual(data, b"hello")
        data = bytearray(data)
        self.assertEqual(f.readinto(data), 5)
        self.assertEqual(data, b" worl")
        self.assertEqual(f.readinto(data), 2)
        self.assertEqual(len(data), 5)
        self.assertEqual(data[:2], b"d\n")
        self.assertEqual(f.seek(0), 0)
        self.assertEqual(f.read(20), b"hello world\n")
        self.assertEqual(f.read(1), b"")
        self.assertEqual(f.readinto(bytearray(b"x")), 0)
        self.assertEqual(f.seek(-6, 2), 6)
        self.assertEqual(f.read(5), b"world")
        self.assertEqual(f.read(0), b"")
        self.assertEqual(f.readinto(bytearray()), 0)
        self.assertEqual(f.seek(-6, 1), 5)
        self.assertEqual(f.read(5), b" worl")
        self.assertEqual(f.tell(), 10)
        self.assertRaises(TypeError, f.seek, 0.0)
        if buffered:
            f.seek(0)
            self.assertEqual(f.read(), b"hello world\n")
            f.seek(6)
            self.assertEqual(f.read(), b"world\n")
            self.assertEqual(f.read(), b"")

    # Offset used by the large-file tests.
    LARGE = 2**31

    def large_file_ops(self, f):
        # Helper: seek/write/truncate/read around the LARGE offset.
        # assertTrue rather than a bare assert, so the preconditions are
        # still checked when Python runs with -O.
        self.assertTrue(f.readable())
        self.assertTrue(f.writable())
        self.assertEqual(f.seek(self.LARGE), self.LARGE)
        self.assertEqual(f.tell(), self.LARGE)
        self.assertEqual(f.write(b"xxx"), 3)
        self.assertEqual(f.tell(), self.LARGE + 3)
        self.assertEqual(f.seek(-1, 1), self.LARGE + 2)
        self.assertEqual(f.truncate(), self.LARGE + 2)
        self.assertEqual(f.tell(), self.LARGE + 2)
        self.assertEqual(f.seek(0, 2), self.LARGE + 2)
        self.assertEqual(f.truncate(self.LARGE + 1), self.LARGE + 1)
        self.assertEqual(f.tell(), self.LARGE + 1)
        self.assertEqual(f.seek(0, 2), self.LARGE + 1)
        self.assertEqual(f.seek(-1, 2), self.LARGE)
        self.assertEqual(f.read(2), b"x")

    def test_raw_file_io(self):
        # "with" guarantees the file is closed even when an assertion fails.
        with io.open(support.TESTFN, "wb", buffering=0) as f:
            self.assertEqual(f.readable(), False)
            self.assertEqual(f.writable(), True)
            self.assertEqual(f.seekable(), True)
            self.write_ops(f)
        with io.open(support.TESTFN, "rb", buffering=0) as f:
            self.assertEqual(f.readable(), True)
            self.assertEqual(f.writable(), False)
            self.assertEqual(f.seekable(), True)
            self.read_ops(f)

    def test_buffered_file_io(self):
        with io.open(support.TESTFN, "wb") as f:
            self.assertEqual(f.readable(), False)
            self.assertEqual(f.writable(), True)
            self.assertEqual(f.seekable(), True)
            self.write_ops(f)
        with io.open(support.TESTFN, "rb") as f:
            self.assertEqual(f.readable(), True)
            self.assertEqual(f.writable(), False)
            self.assertEqual(f.seekable(), True)
            self.read_ops(f, True)

    def test_readline(self):
        with io.open(support.TESTFN, "wb") as f:
            f.write(b"abc\ndef\nxyzzy\nfoo")
        with io.open(support.TESTFN, "rb") as f:
            self.assertEqual(f.readline(), b"abc\n")
            self.assertEqual(f.readline(10), b"def\n")
            self.assertEqual(f.readline(2), b"xy")
            self.assertEqual(f.readline(4), b"zzy\n")
            self.assertEqual(f.readline(), b"foo")

    def test_raw_bytes_io(self):
        f = io.BytesIO()
        self.write_ops(f)
        data = f.getvalue()
        self.assertEqual(data, b"hello world\n")
        f = io.BytesIO(data)
        self.read_ops(f, True)

    def test_large_file_ops(self):
        # On Windows and Mac OSX this test consumes large resources; it takes
        # a long time to build the >2GB file and takes >2GB of disk space,
        # therefore the resource must be enabled to run this test.
        if sys.platform[:3] == 'win' or sys.platform == 'darwin':
            if not support.is_resource_enabled("largefile"):
                print("\nTesting large file ops skipped on %s." % sys.platform,
                      file=sys.stderr)
                print("It requires %d bytes and a long time." % self.LARGE,
                      file=sys.stderr)
                print("Use 'regrtest.py -u largefile test_io' to run it.",
                      file=sys.stderr)
                return
        # Run once unbuffered and once with default buffering.
        with io.open(support.TESTFN, "w+b", 0) as f:
            self.large_file_ops(f)
        with io.open(support.TESTFN, "w+b") as f:
            self.large_file_ops(f)

    def test_with_open(self):
        # The file must be closed on both normal and exceptional exit from
        # the "with" block, for every buffering mode.
        for bufsize in (0, 1, 100):
            f = None
            with io.open(support.TESTFN, "wb", bufsize) as f:
                f.write(b"xxx")
            self.assertEqual(f.closed, True)
            f = None
            try:
                with io.open(support.TESTFN, "wb", bufsize) as f:
                    1/0
            except ZeroDivisionError:
                self.assertEqual(f.closed, True)
            else:
                self.fail("1/0 didn't raise an exception")

    def test_destructor(self):
        # Deleting the last reference must run __del__, which closes the
        # file, which in turn flushes it -- in that order.
        record = []
        class MyFileIO(io.FileIO):
            def __del__(self):
                record.append(1)
                io.FileIO.__del__(self)
            def close(self):
                record.append(2)
                io.FileIO.close(self)
            def flush(self):
                record.append(3)
                io.FileIO.flush(self)
        f = MyFileIO(support.TESTFN, "w")
        # FileIO is a raw binary stream, so the payload must be bytes.
        f.write(b"xxx")
        del f
        self.assertEqual(record, [1, 2, 3])

    def test_close_flushes(self):
        with io.open(support.TESTFN, "wb") as f:
            f.write(b"xxx")
        with io.open(support.TESTFN, "rb") as f:
            self.assertEqual(f.read(), b"xxx")

    def test_array_writes(self):
        # write() must accept any buffer-protocol object and report its
        # size in bytes, both unbuffered and buffered.
        a = array.array('i', range(10))
        n = len(memoryview(a))
        with io.open(support.TESTFN, "wb", 0) as f:
            self.assertEqual(f.write(a), n)
        with io.open(support.TESTFN, "wb") as f:
            self.assertEqual(f.write(a), n)

    def test_closefd(self):
        self.assertRaises(ValueError, io.open, support.TESTFN, 'w',
                          closefd=False)

    def testReadClosed(self):
        with io.open(support.TESTFN, "w") as f:
            f.write("egg\n")
        with io.open(support.TESTFN, "r") as f:
            file = io.open(f.fileno(), "r", closefd=False)
            self.assertEqual(file.read(), "egg\n")
            file.seek(0)
            file.close()
            self.assertRaises(ValueError, file.read)

    def test_no_closefd_with_filename(self):
        # can't use closefd in combination with a file name
        self.assertRaises(ValueError, io.open, support.TESTFN, "r", closefd=False)

    def test_closefd_attr(self):
        with io.open(support.TESTFN, "wb") as f:
            f.write(b"egg\n")
        with io.open(support.TESTFN, "r") as f:
            self.assertEqual(f.buffer.raw.closefd, True)
            file = io.open(f.fileno(), "r", closefd=False)
            self.assertEqual(file.buffer.raw.closefd, False)
296
297
class MemorySeekTestMixin:
    """Read/seek/tell checks shared by the in-memory stream tests.

    A concrete subclass must provide:
      - buftype: callable turning a str literal into the stream's data type
      - ioclass: the stream class under test
      - EOF: the value read() returns at end of stream
    """

    def testInit(self):
        # Construction alone must not raise.
        buf = self.buftype("1234567890")
        bytesIo = self.ioclass(buf)

    def testRead(self):
        buf = self.buftype("1234567890")
        bytesIo = self.ioclass(buf)

        self.assertEqual(buf[:1], bytesIo.read(1))
        self.assertEqual(buf[1:5], bytesIo.read(4))
        # Asking for more than remains returns just the remainder.
        self.assertEqual(buf[5:], bytesIo.read(900))
        self.assertEqual(self.EOF, bytesIo.read())

    def testReadNoArgs(self):
        buf = self.buftype("1234567890")
        bytesIo = self.ioclass(buf)

        self.assertEqual(buf, bytesIo.read())
        self.assertEqual(self.EOF, bytesIo.read())

    def testSeek(self):
        buf = self.buftype("1234567890")
        bytesIo = self.ioclass(buf)

        bytesIo.read(5)
        bytesIo.seek(0)
        self.assertEqual(buf, bytesIo.read())

        bytesIo.seek(3)
        self.assertEqual(buf[3:], bytesIo.read())
        self.assertRaises(TypeError, bytesIo.seek, 0.0)

    def testTell(self):
        buf = self.buftype("1234567890")
        bytesIo = self.ioclass(buf)

        self.assertEqual(0, bytesIo.tell())
        bytesIo.seek(5)
        self.assertEqual(5, bytesIo.tell())
        # Seeking past the end of the data is allowed and reported by tell().
        bytesIo.seek(10000)
        self.assertEqual(10000, bytesIo.tell())
341
Guido van Rossuma9e20242007-03-08 00:43:48 +0000342
class BytesIOTest(MemorySeekTestMixin, unittest.TestCase):
    """Runs the shared memory-stream checks against io.BytesIO."""

    @staticmethod
    def buftype(s):
        # The mixin supplies str literals; BytesIO needs bytes.
        return s.encode("utf-8")
    ioclass = io.BytesIO
    EOF = b""
349
350
class StringIOTest(MemorySeekTestMixin, unittest.TestCase):
    """Runs the shared memory-stream checks against io.StringIO."""

    buftype = str
    ioclass = io.StringIO
    EOF = ""
355
356
class BufferedReaderTest(unittest.TestCase):
    """Tests for io.BufferedReader layered over scripted raw streams.

    Test methods are described with comments rather than docstrings so the
    names reported by the unittest runner stay unchanged.
    """

    def testRead(self):
        rawio = MockRawIO((b"abc", b"d", b"efg"))
        bufio = io.BufferedReader(rawio)

        self.assertEqual(b"abcdef", bufio.read(6))

    def testBuffering(self):
        data = b"abcdefghi"
        dlen = len(data)

        # Each entry: buffer size, sizes requested from the buffered
        # reader, and the result sizes expected in the raw read_history.
        tests = [
            [ 100, [ 3, 1, 4, 8 ], [ dlen, 0 ] ],
            [ 100, [ 3, 3, 3],     [ dlen ]    ],
            [   4, [ 1, 2, 4, 2 ], [ 4, 4, 1 ] ],
        ]

        for bufsize, buf_read_sizes, raw_read_sizes in tests:
            rawio = MockFileIO(data)
            bufio = io.BufferedReader(rawio, buffer_size=bufsize)
            pos = 0
            for nbytes in buf_read_sizes:
                self.assertEqual(bufio.read(nbytes), data[pos:pos+nbytes])
                pos += nbytes
            self.assertEqual(rawio.read_history, raw_read_sizes)

    def testReadNonBlocking(self):
        # Inject some None's in there to simulate EWOULDBLOCK
        rawio = MockRawIO((b"abc", b"d", None, b"efg", None, None))
        bufio = io.BufferedReader(rawio)

        self.assertEqual(b"abcd", bufio.read(6))
        self.assertEqual(b"e", bufio.read(1))
        self.assertEqual(b"fg", bufio.read())
        self.assertTrue(bufio.read() is None)
        self.assertEqual(b"", bufio.read())

    def testReadToEof(self):
        rawio = MockRawIO((b"abc", b"d", b"efg"))
        bufio = io.BufferedReader(rawio)

        self.assertEqual(b"abcdefg", bufio.read(9000))

    def testReadNoArgs(self):
        rawio = MockRawIO((b"abc", b"d", b"efg"))
        bufio = io.BufferedReader(rawio)

        self.assertEqual(b"abcdefg", bufio.read())

    def testFileno(self):
        rawio = MockRawIO((b"abc", b"d", b"efg"))
        bufio = io.BufferedReader(rawio)

        self.assertEqual(42, bufio.fileno())

    def testFilenoNoFileno(self):
        # XXX will we always have fileno() function? If so, kill
        # this test. Else, write it.
        pass

    def testThreads(self):
        try:
            # Write out many bytes with exactly the same number of 0's,
            # 1's... 255's. This will help us check that concurrent reading
            # doesn't duplicate or forget contents.
            N = 1000
            l = list(range(256)) * N
            random.shuffle(l)
            s = bytes(bytearray(l))
            with io.open(support.TESTFN, "wb") as out:
                out.write(s)
            with io.open(support.TESTFN, "rb", buffering=0) as raw:
                bufio = io.BufferedReader(raw, 8)
                errors = []
                results = []
                def reader():
                    try:
                        # Intra-buffer read then buffer-flushing read
                        for n in cycle([1, 19]):
                            chunk = bufio.read(n)
                            if not chunk:
                                break
                            # list.append() is atomic
                            results.append(chunk)
                    except Exception as e:
                        errors.append(e)
                        raise
                threads = [threading.Thread(target=reader) for x in range(20)]
                for t in threads:
                    t.start()
                time.sleep(0.02) # yield
                for t in threads:
                    t.join()
                self.assertFalse(errors,
                    "the following exceptions were caught: %r" % errors)
                s = b''.join(results)
                # Every byte value must have been read exactly N times.
                for i in range(256):
                    c = bytes(bytearray([i]))
                    self.assertEqual(s.count(c), N)
        finally:
            support.unlink(support.TESTFN)
459
460
Guido van Rossuma9e20242007-03-08 00:43:48 +0000461
class BufferedWriterTest(unittest.TestCase):
    """Tests for io.BufferedWriter layered over scripted raw streams.

    Test methods are described with comments rather than docstrings so the
    names reported by the unittest runner stay unchanged.
    """

    def testWrite(self):
        # Write to the buffered IO but don't overflow the buffer.
        writer = MockRawIO()
        bufio = io.BufferedWriter(writer, 8)

        bufio.write(b"abc")

        self.assertFalse(writer._write_stack)

    def testWriteOverflow(self):
        writer = MockRawIO()
        bufio = io.BufferedWriter(writer, 8)

        bufio.write(b"abc")
        bufio.write(b"defghijkl")

        self.assertEqual(b"abcdefghijkl", writer._write_stack[0])

    def testWriteNonBlocking(self):
        raw = MockNonBlockWriterIO((9, 2, 22, -6, 10, 12, 12))
        bufio = io.BufferedWriter(raw, 8, 16)

        bufio.write(b"asdf")
        bufio.write(b"asdfa")
        self.assertEqual(b"asdfasdfa", raw._write_stack[0])

        bufio.write(b"asdfasdfasdf")
        self.assertEqual(b"asdfasdfasdf", raw._write_stack[1])
        bufio.write(b"asdfasdfasdf")
        self.assertEqual(b"dfasdfasdf", raw._write_stack[2])
        self.assertEqual(b"asdfasdfasdf", raw._write_stack[3])

        bufio.write(b"asdfasdfasdf")

        # XXX I don't like this test. It relies too heavily on how the
        # algorithm actually works, which we might change. Refactor
        # later.

    def testFileno(self):
        rawio = MockRawIO((b"abc", b"d", b"efg"))
        bufio = io.BufferedWriter(rawio)

        self.assertEqual(42, bufio.fileno())

    def testFlush(self):
        writer = MockRawIO()
        bufio = io.BufferedWriter(writer, 8)

        bufio.write(b"abc")
        bufio.flush()

        self.assertEqual(b"abc", writer._write_stack[0])

    def testThreads(self):
        # BufferedWriter should not raise exceptions or crash
        # when called from multiple threads.
        try:
            # We use a real file object because it allows us to
            # exercise situations where the GIL is released before
            # writing the buffer to the raw streams. This is in addition
            # to concurrency issues due to switching threads in the middle
            # of Python code.
            with io.open(support.TESTFN, "wb", buffering=0) as raw:
                bufio = io.BufferedWriter(raw, 8)
                errors = []
                def worker():
                    try:
                        # Write enough bytes to flush the buffer
                        s = b"a" * 19
                        for i in range(50):
                            bufio.write(s)
                    except Exception as e:
                        errors.append(e)
                        raise
                threads = [threading.Thread(target=worker) for x in range(20)]
                for t in threads:
                    t.start()
                time.sleep(0.02) # yield
                for t in threads:
                    t.join()
                self.assertFalse(errors,
                    "the following exceptions were caught: %r" % errors)
        finally:
            support.unlink(support.TESTFN)
548
Guido van Rossuma9e20242007-03-08 00:43:48 +0000549
class BufferedRWPairTest(unittest.TestCase):
    """Placeholder tests for io.BufferedRWPair."""

    def testRWPair(self):
        # For now this only checks that a pair can be constructed from a
        # reader and a writer without raising.
        r = MockRawIO(())
        w = MockRawIO()
        pair = io.BufferedRWPair(r, w)

        # XXX need implementation
558
Guido van Rossuma9e20242007-03-08 00:43:48 +0000559
Guido van Rossum78892e42007-04-06 17:31:18 +0000560class BufferedRandomTest(unittest.TestCase):
Guido van Rossuma9e20242007-03-08 00:43:48 +0000561
Guido van Rossum01a27522007-03-07 01:00:12 +0000562 def testReadAndWrite(self):
Guido van Rossum53807da2007-04-10 19:01:47 +0000563 raw = MockRawIO((b"asdf", b"ghjk"))
Guido van Rossum01a27522007-03-07 01:00:12 +0000564 rw = io.BufferedRandom(raw, 8, 12)
565
566 self.assertEqual(b"as", rw.read(2))
567 rw.write(b"ddd")
568 rw.write(b"eee")
Guido van Rossum76c5d4d2007-04-06 19:10:29 +0000569 self.assertFalse(raw._write_stack) # Buffer writes
Guido van Rossum01a27522007-03-07 01:00:12 +0000570 self.assertEqual(b"ghjk", rw.read()) # This read forces write flush
Guido van Rossum76c5d4d2007-04-06 19:10:29 +0000571 self.assertEquals(b"dddeee", raw._write_stack[0])
Guido van Rossum01a27522007-03-07 01:00:12 +0000572
573 def testSeekAndTell(self):
574 raw = io.BytesIO(b"asdfghjkl")
575 rw = io.BufferedRandom(raw)
576
577 self.assertEquals(b"as", rw.read(2))
578 self.assertEquals(2, rw.tell())
579 rw.seek(0, 0)
580 self.assertEquals(b"asdf", rw.read(4))
581
582 rw.write(b"asdf")
583 rw.seek(0, 0)
584 self.assertEquals(b"asdfasdfl", rw.read())
585 self.assertEquals(9, rw.tell())
586 rw.seek(-4, 2)
587 self.assertEquals(5, rw.tell())
588 rw.seek(2, 1)
589 self.assertEquals(7, rw.tell())
590 self.assertEquals(b"fl", rw.read(11))
Christian Heimes8e42a0a2007-11-08 18:04:45 +0000591 self.assertRaises(TypeError, rw.seek, 0.0)
Guido van Rossum01a27522007-03-07 01:00:12 +0000592
# To fully exercise seek/tell, the StatefulIncrementalDecoder has these
# properties:
#   - A single output character can correspond to many bytes of input.
#   - The number of input bytes to complete the character can be
#     undetermined until the last input byte is received.
#   - The number of input bytes can vary depending on previous input.
#   - A single input byte can correspond to many characters of output.
#   - The number of output characters can be undetermined until the
#     last input byte is received.
#   - The number of output characters can vary depending on previous input.
603
class StatefulIncrementalDecoder(codecs.IncrementalDecoder):
    """
    For testing seek/tell behavior with a stateful, buffering decoder.

    Input is a sequence of words.  Words may be fixed-length (length set
    by input) or variable-length (period-terminated).  In variable-length
    mode, extra periods are ignored.  Possible words are:
      - 'i' followed by a number sets the input length, I (maximum 99).
        When I is set to 0, words are period-terminated.
      - 'o' followed by a number sets the output length, O (maximum 99).
      - Any other word is converted into a word followed by a period on
        the output.  The output word consists of the input word truncated
        or padded out with hyphens to make its length equal to O.  If O
        is 0, the word is output verbatim without truncating or padding.
    I and O are initially set to 1.  When I changes, any buffered input is
    re-scanned according to the new I.  EOF also terminates the last word.
    """

    def __init__(self, errors='strict'):
        codecs.IncrementalDecoder.__init__(self, errors)
        self.reset()

    def __repr__(self):
        return '<SID %x>' % id(self)

    def reset(self):
        """Restore the initial state: I = O = 1 and an empty input buffer."""
        self.i = 1
        self.o = 1
        self.buffer = bytearray()

    def getstate(self):
        """Return (buffered input bytes, I and O packed into one integer)."""
        i, o = self.i ^ 1, self.o ^ 1 # so that flags = 0 after reset()
        return bytes(self.buffer), i*100 + o

    def setstate(self, state):
        """Restore a state previously produced by getstate()."""
        # The second element is named "flags" so it does not shadow the
        # imported io module.
        buffer, flags = state
        self.buffer = bytearray(buffer)
        i, o = divmod(flags, 100)
        self.i, self.o = i ^ 1, o ^ 1

    def decode(self, input, final=False):
        """Consume the bytes of *input* and return the text of completed words.

        With final=True, any bytes still buffered are processed as one
        last word.
        """
        output = ''
        for b in input:
            if self.i == 0: # variable-length, terminated with period
                if b == ord('.'):
                    if self.buffer:
                        output += self.process_word()
                else:
                    self.buffer.append(b)
            else: # fixed-length, terminate after self.i bytes
                self.buffer.append(b)
                if len(self.buffer) == self.i:
                    output += self.process_word()
        if final and self.buffer: # EOF terminates the last word
            output += self.process_word()
        return output

    def process_word(self):
        """Interpret the buffered word, clear the buffer, and return its output.

        'i<n>' and 'o<n>' words update I and O and produce no output; any
        other word is returned padded or truncated to O characters (when O
        is non-zero), followed by a period.
        """
        output = ''
        if self.buffer[0] == ord('i'):
            self.i = min(99, int(self.buffer[1:] or 0)) # set input length
        elif self.buffer[0] == ord('o'):
            self.o = min(99, int(self.buffer[1:] or 0)) # set output length
        else:
            output = self.buffer.decode('ascii')
            if len(output) < self.o:
                output += '-'*self.o # pad out with hyphens
            if self.o:
                output = output[:self.o] # truncate to output length
            output += '.'
        self.buffer = bytearray()
        return output

    # Lookups for 'test_decoder' succeed only while this flag is True.
    codecEnabled = False

    @classmethod
    def lookupTestDecoder(cls, name):
        """Codec search function: return a CodecInfo for 'test_decoder'.

        Returns None (implicitly) when the codec is disabled or *name* is
        any other codec.  Encoding is borrowed from latin-1; decoding uses
        this class as the incremental decoder.
        """
        if cls.codecEnabled and name == 'test_decoder':
            latin1 = codecs.lookup('latin-1')
            return codecs.CodecInfo(
                name='test_decoder', encode=latin1.encode, decode=None,
                incrementalencoder=None,
                streamreader=None, streamwriter=None,
                incrementaldecoder=cls)
688
689# Register the previous decoder for testing.
690# Disabled by default, tests will enable it.
691codecs.register(StatefulIncrementalDecoder.lookupTestDecoder)
692
693
class StatefulIncrementalDecoderTest(unittest.TestCase):
    """
    Make sure the StatefulIncrementalDecoder actually works.
    """

    # Each entry is (input bytes, final flag passed to decode(), expected
    # output).  TextIOWrapperTest.testSeekAndTell reuses the inputs, so the
    # 3-tuple layout must be kept.
    test_cases = [
        # I=1, O=1 (fixed-length input == fixed-length output)
        (b'abcd', False, 'a.b.c.d.'),
        # I=0, O=0 (variable-length input, variable-length output)
        (b'oiabcd', True, 'abcd.'),
        # I=0, O=0 (should ignore extra periods)
        (b'oi...abcd...', True, 'abcd.'),
        # I=0, O=6 (variable-length input, fixed-length output)
        (b'i.o6.x.xyz.toolongtofit.', False, 'x-----.xyz---.toolon.'),
        # I=2, O=6 (fixed-length input < fixed-length output)
        (b'i.i2.o6xyz', True, 'xy----.z-----.'),
        # I=6, O=3 (fixed-length input > fixed-length output)
        (b'i.o3.i6.abcdefghijklmnop', True, 'abc.ghi.mno.'),
        # I=0, then 3; O=29, then 15 (with longer output)
        (b'i.o29.a.b.cde.o15.abcdefghijabcdefghij.i3.a.b.c.d.ei00k.l.m', True,
         'a----------------------------.' +
         'b----------------------------.' +
         'cde--------------------------.' +
         'abcdefghijabcde.' +
         'a.b------------.' +
         '.c.------------.' +
         'd.e------------.' +
         'k--------------.' +
         'l--------------.' +
         'm--------------.')
    ]

    def testDecoder(self):
        """Decode every table entry in one shot, then a split decode."""
        # Try a few one-shot test cases.
        for data, eof, expected in self.test_cases:
            d = StatefulIncrementalDecoder()
            self.assertEqual(d.decode(data, eof), expected)

        # Also test an unfinished decode, followed by forcing EOF.
        d = StatefulIncrementalDecoder()
        self.assertEqual(d.decode(b'oiabcd'), '')
        self.assertEqual(d.decode(b'', True), 'abcd.')
Guido van Rossum78892e42007-04-06 17:31:18 +0000736
class TextIOWrapperTest(unittest.TestCase):
    """Tests for io.TextIOWrapper and io.IncrementalNewlineDecoder.

    Tests that need a real file use support.TESTFN, which is removed before
    and after every test.  Files are opened in ``with`` blocks so that a
    failing assertion cannot leave a handle open (an open handle would make
    the unlink in tearDown fail on Windows).
    """

    def setUp(self):
        self.testdata = b"AAA\r\nBBB\rCCC\r\nDDD\nEEE\r\n"
        self.normalized = b"AAA\nBBB\nCCC\nDDD\nEEE\n".decode("ascii")
        support.unlink(support.TESTFN)

    def tearDown(self):
        support.unlink(support.TESTFN)

    def testLineBuffering(self):
        r = io.BytesIO()
        b = io.BufferedWriter(r, 1000)
        t = io.TextIOWrapper(b, newline="\n", line_buffering=True)
        t.write("X")
        self.assertEqual(r.getvalue(), b"")  # No flush happened
        t.write("Y\nZ")
        self.assertEqual(r.getvalue(), b"XY\nZ")  # All got flushed
        t.write("A\rB")
        self.assertEqual(r.getvalue(), b"XY\nZA\rB")

    def testEncodingErrorsReading(self):
        # (1) default
        b = io.BytesIO(b"abc\n\xff\n")
        t = io.TextIOWrapper(b, encoding="ascii")
        self.assertRaises(UnicodeError, t.read)
        # (2) explicit strict
        b = io.BytesIO(b"abc\n\xff\n")
        t = io.TextIOWrapper(b, encoding="ascii", errors="strict")
        self.assertRaises(UnicodeError, t.read)
        # (3) ignore
        b = io.BytesIO(b"abc\n\xff\n")
        t = io.TextIOWrapper(b, encoding="ascii", errors="ignore")
        self.assertEqual(t.read(), "abc\n\n")
        # (4) replace
        b = io.BytesIO(b"abc\n\xff\n")
        t = io.TextIOWrapper(b, encoding="ascii", errors="replace")
        self.assertEqual(t.read(), "abc\n\ufffd\n")

    def testEncodingErrorsWriting(self):
        # (1) default
        b = io.BytesIO()
        t = io.TextIOWrapper(b, encoding="ascii")
        self.assertRaises(UnicodeError, t.write, "\xff")
        # (2) explicit strict
        b = io.BytesIO()
        t = io.TextIOWrapper(b, encoding="ascii", errors="strict")
        self.assertRaises(UnicodeError, t.write, "\xff")
        # (3) ignore
        b = io.BytesIO()
        t = io.TextIOWrapper(b, encoding="ascii", errors="ignore",
                             newline="\n")
        t.write("abc\xffdef\n")
        t.flush()
        self.assertEqual(b.getvalue(), b"abcdef\n")
        # (4) replace
        b = io.BytesIO()
        t = io.TextIOWrapper(b, encoding="ascii", errors="replace",
                             newline="\n")
        t.write("abc\xffdef\n")
        t.flush()
        self.assertEqual(b.getvalue(), b"abc?def\n")

    def testNewlinesInput(self):
        testdata = b"AAA\nBBB\nCCC\rDDD\rEEE\r\nFFF\r\nGGG"
        normalized = testdata.replace(b"\r\n", b"\n").replace(b"\r", b"\n")
        for newline, expected in [
            (None, normalized.decode("ascii").splitlines(True)),
            ("", testdata.decode("ascii").splitlines(True)),
            ("\n", ["AAA\n", "BBB\n", "CCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]),
            ("\r\n", ["AAA\nBBB\nCCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]),
            ("\r", ["AAA\nBBB\nCCC\r", "DDD\r", "EEE\r", "\nFFF\r", "\nGGG"]),
            ]:
            buf = io.BytesIO(testdata)
            txt = io.TextIOWrapper(buf, encoding="ascii", newline=newline)
            self.assertEqual(txt.readlines(), expected)
            txt.seek(0)
            self.assertEqual(txt.read(), "".join(expected))

    def testNewlinesOutput(self):
        # Expected bytes on disk, keyed by the newline argument.
        testdict = {
            "": b"AAA\nBBB\nCCC\nX\rY\r\nZ",
            "\n": b"AAA\nBBB\nCCC\nX\rY\r\nZ",
            "\r": b"AAA\rBBB\rCCC\rX\rY\r\rZ",
            "\r\n": b"AAA\r\nBBB\r\nCCC\r\nX\rY\r\r\nZ",
            }
        # newline=None is expected to behave like the platform separator.
        tests = [(None, testdict[os.linesep])] + sorted(testdict.items())
        for newline, expected in tests:
            buf = io.BytesIO()
            txt = io.TextIOWrapper(buf, encoding="ascii", newline=newline)
            txt.write("AAA\nB")
            txt.write("BB\nCCC\n")
            txt.write("X\rY\r\nZ")
            txt.flush()
            self.assertEqual(buf.closed, False)
            self.assertEqual(buf.getvalue(), expected)

    def testNewlines(self):
        input_lines = [ "unix\n", "windows\r\n", "os9\r", "last\n", "nonl" ]

        tests = [
            [ None, [ 'unix\n', 'windows\n', 'os9\n', 'last\n', 'nonl' ] ],
            [ '', input_lines ],
            [ '\n', [ "unix\n", "windows\r\n", "os9\rlast\n", "nonl" ] ],
            [ '\r\n', [ "unix\nwindows\r\n", "os9\rlast\nnonl" ] ],
            [ '\r', [ "unix\nwindows\r", "\nos9\r", "last\nnonl" ] ],
        ]
        encodings = (
            'utf-8', 'latin-1',
            'utf-16', 'utf-16-le', 'utf-16-be',
            'utf-32', 'utf-32-le', 'utf-32-be',
        )

        # Try a range of buffer sizes to test the case where \r is the last
        # character in TextIOWrapper._pending_line.
        for encoding in encodings:
            data = ''.join(input_lines).encode(encoding)
            for do_reads in (False, True):
                for bufsize in range(1, 10):
                    for newline, exp_lines in tests:
                        bufio = io.BufferedReader(io.BytesIO(data), bufsize)
                        textio = io.TextIOWrapper(bufio, newline=newline,
                                                  encoding=encoding)
                        if do_reads:
                            # Mix read(2) with readline() on the same stream.
                            got_lines = []
                            while True:
                                c2 = textio.read(2)
                                if c2 == '':
                                    break
                                self.assertEqual(len(c2), 2)
                                got_lines.append(c2 + textio.readline())
                        else:
                            got_lines = list(textio)

                        for got_line, exp_line in zip(got_lines, exp_lines):
                            self.assertEqual(got_line, exp_line)
                        self.assertEqual(len(got_lines), len(exp_lines))

    def testNewlinesOutputClosesBuffer(self):
        # Closing the text wrapper must close the underlying buffer for
        # every combination of os.linesep and newline argument.  (This test
        # used to be a second definition of testNewlinesOutput, which
        # silently replaced the one above.)
        data = "AAA\nBBB\rCCC\n"
        data_lf = b"AAA\nBBB\rCCC\n"
        data_cr = b"AAA\rBBB\rCCC\r"
        data_crlf = b"AAA\r\nBBB\rCCC\r\n"
        save_linesep = os.linesep
        try:
            for os.linesep, newline, expected in [
                ("\n", None, data_lf),
                ("\r\n", None, data_crlf),
                ("\n", "", data_lf),
                ("\r\n", "", data_lf),
                ("\n", "\n", data_lf),
                ("\r\n", "\n", data_lf),
                ("\n", "\r", data_cr),
                ("\r\n", "\r", data_cr),
                ("\n", "\r\n", data_crlf),
                ("\r\n", "\r\n", data_crlf),
                ]:
                buf = io.BytesIO()
                txt = io.TextIOWrapper(buf, encoding="ascii", newline=newline)
                txt.write(data)
                txt.close()
                # Once the buffer is closed its contents cannot be read
                # back, so `expected` is not compared here; the written
                # bytes are checked by testNewlinesOutput.
                self.assertEqual(buf.closed, True)
                self.assertRaises(ValueError, buf.getvalue)
        finally:
            os.linesep = save_linesep

    # Systematic tests of the text I/O API

    def testBasicIO(self):
        for chunksize in (1, 2, 3, 4, 5, 15, 16, 17, 31, 32, 33, 63, 64, 65):
            for enc in "ascii", "latin1", "utf8" :# , "utf-16-be", "utf-16-le":
                with io.open(support.TESTFN, "w+", encoding=enc) as f:
                    f._CHUNK_SIZE = chunksize
                    self.assertEqual(f.write("abc"), 3)
                with io.open(support.TESTFN, "r+", encoding=enc) as f:
                    f._CHUNK_SIZE = chunksize
                    self.assertEqual(f.tell(), 0)
                    self.assertEqual(f.read(), "abc")
                    cookie = f.tell()
                    self.assertEqual(f.seek(0), 0)
                    self.assertEqual(f.read(2), "ab")
                    self.assertEqual(f.read(1), "c")
                    self.assertEqual(f.read(1), "")
                    self.assertEqual(f.read(), "")
                    self.assertEqual(f.tell(), cookie)
                    self.assertEqual(f.seek(0), 0)
                    self.assertEqual(f.seek(0, 2), cookie)
                    self.assertEqual(f.write("def"), 3)
                    self.assertEqual(f.seek(cookie), cookie)
                    self.assertEqual(f.read(), "def")
                    if enc.startswith("utf"):
                        self.multi_line_test(f, enc)

    def multi_line_test(self, f, enc):
        """Write lines of many lengths to f and check tell()/readline().

        Helper for testBasicIO; f must be open for reading and writing.
        """
        f.seek(0)
        f.truncate()
        sample = "s\xff\u0fff\uffff"
        wlines = []
        for size in (0, 1, 2, 3, 4, 5, 30, 31, 32, 33, 62, 63, 64, 65, 1000):
            line = "".join(sample[i % len(sample)] for i in range(size)) + "\n"
            wlines.append((f.tell(), line))
            f.write(line)
        f.seek(0)
        rlines = []
        while True:
            pos = f.tell()
            line = f.readline()
            if not line:
                break
            rlines.append((pos, line))
        # Positions reported while reading must match those seen on write.
        self.assertEqual(rlines, wlines)

    def testTelling(self):
        with io.open(support.TESTFN, "w+", encoding="utf8") as f:
            p0 = f.tell()
            f.write("\xff\n")
            p1 = f.tell()
            f.write("\xff\n")
            p2 = f.tell()
            f.seek(0)
            self.assertEqual(f.tell(), p0)
            self.assertEqual(f.readline(), "\xff\n")
            self.assertEqual(f.tell(), p1)
            self.assertEqual(f.readline(), "\xff\n")
            self.assertEqual(f.tell(), p2)
            f.seek(0)
            for line in f:
                self.assertEqual(line, "\xff\n")
                # tell() is expected to fail while iteration is in progress.
                self.assertRaises(IOError, f.tell)
            self.assertEqual(f.tell(), p2)

    def testSeeking(self):
        chunk_size = io.TextIOWrapper._CHUNK_SIZE
        prefix_size = chunk_size - 2
        u_prefix = "a" * prefix_size
        prefix = u_prefix.encode("utf-8")
        self.assertEqual(len(u_prefix), len(prefix))
        u_suffix = "\u8888\n"
        suffix = u_suffix.encode("utf-8")
        # The multi-byte character starts two bytes before a chunk boundary.
        line = prefix + suffix
        with io.open(support.TESTFN, "wb") as f:
            f.write(line*2)
        with io.open(support.TESTFN, "r", encoding="utf-8") as f:
            s = f.read(prefix_size)
            self.assertEqual(s, str(prefix, "ascii"))
            self.assertEqual(f.tell(), prefix_size)
            self.assertEqual(f.readline(), u_suffix)

    def testSeekingToo(self):
        # Regression test for a specific bug
        data = b'\xe0\xbf\xbf\n'
        with io.open(support.TESTFN, "wb") as f:
            f.write(data)
        with io.open(support.TESTFN, "r", encoding="utf-8") as f:
            f._CHUNK_SIZE  # Just test that it exists
            f._CHUNK_SIZE = 2
            f.readline()
            f.tell()

    def testSeekAndTell(self):
        """Test seek/tell using the StatefulIncrementalDecoder."""

        def testSeekAndTellWithData(data, min_pos=0):
            """Tell/seek to various points within a data stream and ensure
            that the decoded data returned by read() is consistent."""
            with io.open(support.TESTFN, 'wb') as f:
                f.write(data)
            with io.open(support.TESTFN, encoding='test_decoder') as f:
                decoded = f.read()

            for i in range(min_pos, len(decoded) + 1):  # seek positions
                for j in [1, 5, len(decoded) - i]:  # read lengths
                    with io.open(support.TESTFN,
                                 encoding='test_decoder') as f:
                        self.assertEqual(f.read(i), decoded[:i])
                        cookie = f.tell()
                        self.assertEqual(f.read(j), decoded[i:i + j])
                        f.seek(cookie)
                        self.assertEqual(f.read(), decoded[i:])

        # Enable the test decoder.
        StatefulIncrementalDecoder.codecEnabled = True

        # Run the tests.
        try:
            # Try each test case.
            for data, _, _ in StatefulIncrementalDecoderTest.test_cases:
                testSeekAndTellWithData(data)

            # Position each test case so that it crosses a chunk boundary.
            CHUNK_SIZE = io.TextIOWrapper._CHUNK_SIZE
            for data, _, _ in StatefulIncrementalDecoderTest.test_cases:
                offset = CHUNK_SIZE - len(data)//2
                prefix = b'.'*offset
                # Don't bother seeking into the prefix (takes too long).
                min_pos = offset*2
                testSeekAndTellWithData(prefix + data, min_pos)

        # Ensure our test decoder won't interfere with subsequent tests.
        finally:
            StatefulIncrementalDecoder.codecEnabled = False

    def testEncodedWrites(self):
        data = "1234567890"
        tests = ("utf-16",
                 "utf-16-le",
                 "utf-16-be",
                 "utf-32",
                 "utf-32-le",
                 "utf-32-be")
        for encoding in tests:
            buf = io.BytesIO()
            f = io.TextIOWrapper(buf, encoding=encoding)
            # Check if the BOM is written only once (see issue1753).
            f.write(data)
            f.write(data)
            f.seek(0)
            self.assertEqual(f.read(), data * 2)
            self.assertEqual(buf.getvalue(), (data * 2).encode(encoding))

    def timingTest(self):
        # Not collected automatically (name does not start with "test");
        # call it by hand to compare write/read timings per chunk size.
        timer = time.time
        enc = "utf8"
        line = "\0\x0f\xff\u0fff\uffff\U000fffff\U0010ffff"*3 + "\n"
        nlines = 10000
        nchars = len(line)
        nbytes = len(line.encode(enc))
        for chunk_size in (32, 64, 128, 256):
            with io.open(support.TESTFN, "w+", encoding=enc) as f:
                f._CHUNK_SIZE = chunk_size
                t0 = timer()
                for i in range(nlines):
                    f.write(line)
                f.flush()
                t1 = timer()
                f.seek(0)
                # Use a separate name here: rebinding `line` would change
                # what the next chunk size writes.
                for _read_line in f:
                    pass
                t2 = timer()
                f.seek(0)
                while f.readline():
                    pass
                t3 = timer()
                f.seek(0)
                while f.readline():
                    f.tell()
                t4 = timer()
            if support.verbose:
                print("\nTiming test: %d lines of %d characters (%d bytes)" %
                      (nlines, nchars, nbytes))
                print("File chunk size:          %6s" % f._CHUNK_SIZE)
                print("Writing:                  %6.3f seconds" % (t1-t0))
                print("Reading using iteration:  %6.3f seconds" % (t2-t1))
                print("Reading using readline(): %6.3f seconds" % (t3-t2))
                print("Using readline()+tell():  %6.3f seconds" % (t4-t3))

    def testReadOneByOne(self):
        txt = io.TextIOWrapper(io.BytesIO(b"AA\r\nBB"))
        reads = ""
        while True:
            c = txt.read(1)
            if not c:
                break
            reads += c
        self.assertEqual(reads, "AA\nBB")

    # read in amounts equal to TextIOWrapper._CHUNK_SIZE which is 128.
    def testReadByChunk(self):
        # make sure "\r\n" straddles 128 char boundary.
        txt = io.TextIOWrapper(io.BytesIO(b"A" * 127 + b"\r\nB"))
        reads = ""
        while True:
            c = txt.read(128)
            if not c:
                break
            reads += c
        self.assertEqual(reads, "A"*127+"\nB")

    def test_issue1395_1(self):
        txt = io.TextIOWrapper(io.BytesIO(self.testdata), encoding="ascii")

        # read one char at a time
        reads = ""
        while True:
            c = txt.read(1)
            if not c:
                break
            reads += c
        self.assertEqual(reads, self.normalized)

    def test_issue1395_2(self):
        txt = io.TextIOWrapper(io.BytesIO(self.testdata), encoding="ascii")
        txt._CHUNK_SIZE = 4

        reads = ""
        while True:
            c = txt.read(4)
            if not c:
                break
            reads += c
        self.assertEqual(reads, self.normalized)

    def test_issue1395_3(self):
        txt = io.TextIOWrapper(io.BytesIO(self.testdata), encoding="ascii")
        txt._CHUNK_SIZE = 4

        reads = txt.read(4)
        reads += txt.read(4)
        reads += txt.readline()
        reads += txt.readline()
        reads += txt.readline()
        self.assertEqual(reads, self.normalized)

    def test_issue1395_4(self):
        txt = io.TextIOWrapper(io.BytesIO(self.testdata), encoding="ascii")
        txt._CHUNK_SIZE = 4

        reads = txt.read(4)
        reads += txt.read()
        self.assertEqual(reads, self.normalized)

    def test_issue1395_5(self):
        txt = io.TextIOWrapper(io.BytesIO(self.testdata), encoding="ascii")
        txt._CHUNK_SIZE = 4

        # Consume the first line, then check that seeking away and back to
        # the remembered position resumes at the second line.
        txt.read(4)
        pos = txt.tell()
        txt.seek(0)
        txt.seek(pos)
        self.assertEqual(txt.read(4), "BBB\n")

    def test_issue2282(self):
        buffer = io.BytesIO(self.testdata)
        txt = io.TextIOWrapper(buffer, encoding="ascii")

        self.assertEqual(buffer.seekable(), txt.seekable())

    def check_newline_decoder_utf8(self, decoder):
        # UTF-8 specific tests for a newline decoder
        def _check_decode(b, s, **kwargs):
            # We exercise getstate() / setstate() as well as decode()
            state = decoder.getstate()
            self.assertEqual(decoder.decode(b, **kwargs), s)
            decoder.setstate(state)
            self.assertEqual(decoder.decode(b, **kwargs), s)

        _check_decode(b'\xe8\xa2\x88', "\u8888")

        _check_decode(b'\xe8', "")
        _check_decode(b'\xa2', "")
        _check_decode(b'\x88', "\u8888")

        _check_decode(b'\xe8', "")
        _check_decode(b'\xa2', "")
        _check_decode(b'\x88', "\u8888")

        _check_decode(b'\xe8', "")
        self.assertRaises(UnicodeDecodeError, decoder.decode, b'', final=True)

        decoder.reset()
        _check_decode(b'\n', "\n")
        _check_decode(b'\r', "")
        _check_decode(b'', "\n", final=True)
        _check_decode(b'\r', "\n", final=True)

        _check_decode(b'\r', "")
        _check_decode(b'a', "\na")

        _check_decode(b'\r\r\n', "\n\n")
        _check_decode(b'\r', "")
        _check_decode(b'\r', "\n")
        _check_decode(b'\na', "\na")

        _check_decode(b'\xe8\xa2\x88\r\n', "\u8888\n")
        _check_decode(b'\xe8\xa2\x88', "\u8888")
        _check_decode(b'\n', "\n")
        _check_decode(b'\xe8\xa2\x88\r', "\u8888")
        _check_decode(b'\n', "\n")

    def check_newline_decoder(self, decoder, encoding):
        # Feed the decoder one encoded byte at a time and watch how its
        # `newlines` attribute evolves.
        result = []
        encoder = codecs.getincrementalencoder(encoding)()
        def _decode_bytewise(s):
            for b in encoder.encode(s):
                result.append(decoder.decode(bytes([b])))
        self.assertEqual(decoder.newlines, None)
        _decode_bytewise("abc\n\r")
        self.assertEqual(decoder.newlines, '\n')
        _decode_bytewise("\nabc")
        self.assertEqual(decoder.newlines, ('\n', '\r\n'))
        _decode_bytewise("abc\r")
        self.assertEqual(decoder.newlines, ('\n', '\r\n'))
        _decode_bytewise("abc")
        self.assertEqual(decoder.newlines, ('\r', '\n', '\r\n'))
        _decode_bytewise("abc\r")
        self.assertEqual("".join(result), "abc\n\nabcabc\nabcabc")
        decoder.reset()
        self.assertEqual(decoder.decode("abc".encode(encoding)), "abc")
        self.assertEqual(decoder.newlines, None)

    def test_newline_decoder(self):
        encodings = (
            'utf-8', 'latin-1',
            'utf-16', 'utf-16-le', 'utf-16-be',
            'utf-32', 'utf-32-le', 'utf-32-be',
        )
        for enc in encodings:
            decoder = codecs.getincrementaldecoder(enc)()
            decoder = io.IncrementalNewlineDecoder(decoder, translate=True)
            self.check_newline_decoder(decoder, enc)
        decoder = codecs.getincrementaldecoder("utf-8")()
        decoder = io.IncrementalNewlineDecoder(decoder, translate=True)
        self.check_newline_decoder_utf8(decoder)
1278
Alexandre Vassalotti472f07d2008-01-06 00:34:32 +00001279
Guido van Rossum01a27522007-03-07 01:00:12 +00001280# XXX Tests for open()
Guido van Rossum68bbcd22007-02-27 17:19:33 +00001281
class MiscIOTest(unittest.TestCase):
    """Checks on io.__all__ and on the name/mode attributes of io.open()."""

    def tearDown(self):
        support.unlink(support.TESTFN)

    def testImport__all__(self):
        # Every exported name must exist; apart from open(), names
        # containing "error" must be exceptions and the rest IOBase
        # subclasses.
        for name in io.__all__:
            obj = getattr(io, name, None)
            self.assertTrue(obj is not None, name)
            if name == "open":
                continue
            elif "error" in name.lower():
                self.assertTrue(issubclass(obj, Exception), name)
            else:
                self.assertTrue(issubclass(obj, io.IOBase), name)

    def test_attributes(self):
        # Files are opened in `with` blocks so that a failing assertion
        # cannot leave TESTFN open when tearDown tries to remove it.
        with io.open(support.TESTFN, "wb", buffering=0) as f:
            self.assertEqual(f.mode, "wb")

        with io.open(support.TESTFN, "U") as f:
            self.assertEqual(f.name, support.TESTFN)
            self.assertEqual(f.buffer.name, support.TESTFN)
            self.assertEqual(f.buffer.raw.name, support.TESTFN)
            self.assertEqual(f.mode, "U")
            self.assertEqual(f.buffer.mode, "rb")
            self.assertEqual(f.buffer.raw.mode, "rb")

        with io.open(support.TESTFN, "w+") as f:
            self.assertEqual(f.mode, "w+")
            self.assertEqual(f.buffer.mode, "rb+")  # Does it really matter?
            self.assertEqual(f.buffer.raw.mode, "rb+")

            # A second file object on the same descriptor; closefd=False
            # leaves the descriptor owned by f.
            with io.open(f.fileno(), "wb", closefd=False) as g:
                self.assertEqual(g.mode, "wb")
                self.assertEqual(g.raw.mode, "wb")
                self.assertEqual(g.name, f.fileno())
                self.assertEqual(g.raw.name, f.fileno())
1325
1326
def test_main():
    """Run all test case classes of this module through test.support."""
    test_classes = (
        IOTest, BytesIOTest, StringIOTest,
        BufferedReaderTest, BufferedWriterTest,
        BufferedRWPairTest, BufferedRandomTest,
        StatefulIncrementalDecoderTest,
        TextIOWrapperTest, MiscIOTest,
    )
    support.run_unittest(*test_classes)

if __name__ == "__main__":
    unittest.main()