blob: 0fc017f17f4453c269a6022e78026aa2b9258f68 [file] [log] [blame]
Guido van Rossum68bbcd22007-02-27 17:19:33 +00001"""Unit tests for io.py."""
2
Guido van Rossum8358db22007-08-18 21:39:55 +00003import os
Guido van Rossum34d69e52007-04-10 20:08:41 +00004import sys
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +00005import time
Guido van Rossumd4103952007-04-12 05:44:49 +00006import array
Antoine Pitrou87695762008-08-14 22:44:29 +00007import threading
8import random
Guido van Rossum28524c72007-02-27 05:47:44 +00009import unittest
Antoine Pitrou87695762008-08-14 22:44:29 +000010from itertools import chain, cycle
Benjamin Petersonee8712c2008-05-20 21:35:26 +000011from test import support
Guido van Rossum76c5d4d2007-04-06 19:10:29 +000012
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +000013import codecs
Guido van Rossum53807da2007-04-10 19:01:47 +000014import io # The module under test
Guido van Rossum28524c72007-02-27 05:47:44 +000015
Guido van Rossuma9e20242007-03-08 00:43:48 +000016
class MockRawIO(io.RawIOBase):
    """Scripted raw stream used to exercise the buffered I/O wrappers.

    Reads are answered from a queue of pre-canned results given to the
    constructor, and every write is recorded in ``_write_stack`` so a test
    can inspect exactly what reached the raw layer.
    """

    def __init__(self, read_stack=()):
        # Copy the script so read() never mutates the caller's sequence.
        self._read_stack = list(read_stack)
        self._write_stack = []

    def read(self, n=None):
        """Return the next scripted item, or b"" once the script is used up.

        The requested size *n* is ignored; items are returned exactly as
        they were supplied (including ``None`` entries).
        """
        try:
            return self._read_stack.pop(0)
        except IndexError:
            # Only an exhausted script means EOF; anything else (e.g.
            # KeyboardInterrupt) must propagate instead of being swallowed.
            return b""

    def write(self, b):
        """Record a copy of *b* and report all of it as written."""
        self._write_stack.append(b[:])
        return len(b)

    def writable(self):
        return True

    def fileno(self):
        # Arbitrary fixed descriptor number the tests compare against.
        return 42

    def readable(self):
        return True

    def seekable(self):
        return True

    def seek(self, pos, whence=0):
        """Accept and ignore a seek request.

        *whence* defaults to 0 to match the ``io.RawIOBase.seek`` signature.
        """
        pass

    def tell(self):
        # Arbitrary fixed position; the mock does not track offsets.
        return 42
50
Guido van Rossuma9e20242007-03-08 00:43:48 +000051
class MockFileIO(io.BytesIO):
    """In-memory byte stream that logs the size of every read() result."""

    def __init__(self, data):
        self.read_history = []
        super().__init__(data)

    def read(self, n=None):
        """Read like BytesIO.read() and append the result size to the log.

        A ``None`` result is logged as ``None`` rather than as a length.
        """
        chunk = super().read(n)
        if chunk is None:
            self.read_history.append(None)
        else:
            self.read_history.append(len(chunk))
        return chunk
62
63
class MockNonBlockWriterIO(io.RawIOBase):
    """Raw writer whose write() results follow a caller-supplied script.

    Each write() consumes one script entry: a non-negative entry is
    returned as the byte count, while a negative entry -k raises
    BlockingIOError whose third argument is k.
    """

    def __init__(self, blocking_script):
        self._write_stack = []
        self._blocking_script = list(blocking_script)

    def write(self, b):
        # Record a copy of what was offered, whatever the scripted outcome.
        self._write_stack.append(b[:])
        outcome = self._blocking_script.pop(0)
        if outcome >= 0:
            return outcome
        raise io.BlockingIOError(0, "test blocking", -outcome)

    def writable(self):
        return True
Guido van Rossum68bbcd22007-02-27 17:19:33 +000080
Guido van Rossuma9e20242007-03-08 00:43:48 +000081
class IOTest(unittest.TestCase):
    """Basic behaviour of io.open(), FileIO and BytesIO.

    Files are opened through ``with`` blocks wherever the test does not
    need explicit control over close(), so a failing assertion cannot leak
    an open handle on support.TESTFN.
    """

    def setUp(self):
        support.unlink(support.TESTFN)

    def tearDown(self):
        support.unlink(support.TESTFN)

    def write_ops(self, f):
        """Run a fixed write/seek/truncate script against writable *f*.

        On success the stream contains b"hello world\\n".
        """
        self.assertEqual(f.write(b"blah."), 5)
        self.assertEqual(f.seek(0), 0)
        self.assertEqual(f.write(b"Hello."), 6)
        self.assertEqual(f.tell(), 6)
        self.assertEqual(f.seek(-1, 1), 5)
        self.assertEqual(f.tell(), 5)
        self.assertEqual(f.write(bytearray(b" world\n\n\n")), 9)
        self.assertEqual(f.seek(0), 0)
        self.assertEqual(f.write(b"h"), 1)
        self.assertEqual(f.seek(-1, 2), 13)
        self.assertEqual(f.tell(), 13)
        self.assertEqual(f.truncate(12), 12)
        self.assertEqual(f.tell(), 12)
        self.assertRaises(TypeError, f.seek, 0.0)

    def read_ops(self, f, buffered=False):
        """Run a fixed read/seek script against *f* holding b"hello world\\n".

        With *buffered* true, argument-less read() calls are checked too.
        """
        data = f.read(5)
        self.assertEqual(data, b"hello")
        data = bytearray(data)
        self.assertEqual(f.readinto(data), 5)
        self.assertEqual(data, b" worl")
        self.assertEqual(f.readinto(data), 2)
        self.assertEqual(len(data), 5)
        self.assertEqual(data[:2], b"d\n")
        self.assertEqual(f.seek(0), 0)
        self.assertEqual(f.read(20), b"hello world\n")
        self.assertEqual(f.read(1), b"")
        self.assertEqual(f.readinto(bytearray(b"x")), 0)
        self.assertEqual(f.seek(-6, 2), 6)
        self.assertEqual(f.read(5), b"world")
        self.assertEqual(f.read(0), b"")
        self.assertEqual(f.readinto(bytearray()), 0)
        self.assertEqual(f.seek(-6, 1), 5)
        self.assertEqual(f.read(5), b" worl")
        self.assertEqual(f.tell(), 10)
        self.assertRaises(TypeError, f.seek, 0.0)
        if buffered:
            f.seek(0)
            self.assertEqual(f.read(), b"hello world\n")
            f.seek(6)
            self.assertEqual(f.read(), b"world\n")
            self.assertEqual(f.read(), b"")

    # Offset just past the signed 32-bit range.
    LARGE = 2**31

    def large_file_ops(self, f):
        """Seek, write and truncate around the LARGE offset on *f*."""
        # Use unittest assertions rather than ``assert`` so the checks
        # still run when Python is started with -O.
        self.assertTrue(f.readable())
        self.assertTrue(f.writable())
        self.assertEqual(f.seek(self.LARGE), self.LARGE)
        self.assertEqual(f.tell(), self.LARGE)
        self.assertEqual(f.write(b"xxx"), 3)
        self.assertEqual(f.tell(), self.LARGE + 3)
        self.assertEqual(f.seek(-1, 1), self.LARGE + 2)
        self.assertEqual(f.truncate(), self.LARGE + 2)
        self.assertEqual(f.tell(), self.LARGE + 2)
        self.assertEqual(f.seek(0, 2), self.LARGE + 2)
        self.assertEqual(f.truncate(self.LARGE + 1), self.LARGE + 1)
        self.assertEqual(f.tell(), self.LARGE + 1)
        self.assertEqual(f.seek(0, 2), self.LARGE + 1)
        self.assertEqual(f.seek(-1, 2), self.LARGE)
        self.assertEqual(f.read(2), b"x")

    def test_raw_file_io(self):
        with io.open(support.TESTFN, "wb", buffering=0) as f:
            self.assertEqual(f.readable(), False)
            self.assertEqual(f.writable(), True)
            self.assertEqual(f.seekable(), True)
            self.write_ops(f)
        with io.open(support.TESTFN, "rb", buffering=0) as f:
            self.assertEqual(f.readable(), True)
            self.assertEqual(f.writable(), False)
            self.assertEqual(f.seekable(), True)
            self.read_ops(f)

    def test_buffered_file_io(self):
        with io.open(support.TESTFN, "wb") as f:
            self.assertEqual(f.readable(), False)
            self.assertEqual(f.writable(), True)
            self.assertEqual(f.seekable(), True)
            self.write_ops(f)
        with io.open(support.TESTFN, "rb") as f:
            self.assertEqual(f.readable(), True)
            self.assertEqual(f.writable(), False)
            self.assertEqual(f.seekable(), True)
            self.read_ops(f, True)

    def test_readline(self):
        with io.open(support.TESTFN, "wb") as f:
            f.write(b"abc\ndef\nxyzzy\nfoo")
        with io.open(support.TESTFN, "rb") as f:
            self.assertEqual(f.readline(), b"abc\n")
            self.assertEqual(f.readline(10), b"def\n")
            self.assertEqual(f.readline(2), b"xy")
            self.assertEqual(f.readline(4), b"zzy\n")
            self.assertEqual(f.readline(), b"foo")

    def test_raw_bytes_io(self):
        f = io.BytesIO()
        self.write_ops(f)
        data = f.getvalue()
        self.assertEqual(data, b"hello world\n")
        f = io.BytesIO(data)
        self.read_ops(f, True)

    def test_large_file_ops(self):
        # On Windows and Mac OSX this test consumes large resources; it takes
        # a long time to build the >2GB file and takes >2GB of disk space,
        # therefore the resource must be enabled to run this test.
        if sys.platform[:3] == 'win' or sys.platform == 'darwin':
            if not support.is_resource_enabled("largefile"):
                print("\nTesting large file ops skipped on %s." % sys.platform,
                      file=sys.stderr)
                print("It requires %d bytes and a long time." % self.LARGE,
                      file=sys.stderr)
                print("Use 'regrtest.py -u largefile test_io' to run it.",
                      file=sys.stderr)
                return
        # Once unbuffered, once with the default buffering.
        with io.open(support.TESTFN, "w+b", 0) as f:
            self.large_file_ops(f)
        with io.open(support.TESTFN, "w+b") as f:
            self.large_file_ops(f)

    def test_with_open(self):
        for bufsize in (0, 1, 100):
            f = None
            with open(support.TESTFN, "wb", bufsize) as f:
                f.write(b"xxx")
            self.assertEqual(f.closed, True)
            f = None
            try:
                with open(support.TESTFN, "wb", bufsize) as f:
                    1/0
            except ZeroDivisionError:
                # The file must be closed even when the body raised.
                self.assertEqual(f.closed, True)
            else:
                self.fail("1/0 didn't raise an exception")

    def test_destructor(self):
        record = []

        class MyFileIO(io.FileIO):
            def __del__(self):
                record.append(1)
                io.FileIO.__del__(self)

            def close(self):
                record.append(2)
                io.FileIO.close(self)

            def flush(self):
                record.append(3)
                io.FileIO.flush(self)

        f = MyFileIO(support.TESTFN, "w")
        # FileIO is a binary stream, so it must be given bytes, not str.
        f.write(b"xxx")
        del f
        # Expected call order on destruction: __del__, close, flush.
        self.assertEqual(record, [1, 2, 3])

    def test_close_flushes(self):
        # close() is called explicitly here because it is the operation
        # under test.
        f = io.open(support.TESTFN, "wb")
        try:
            f.write(b"xxx")
        finally:
            f.close()
        with io.open(support.TESTFN, "rb") as f:
            self.assertEqual(f.read(), b"xxx")

    def test_array_writes(self):
        a = array.array('i', range(10))
        n = len(memoryview(a))
        with io.open(support.TESTFN, "wb", 0) as f:
            self.assertEqual(f.write(a), n)
        with io.open(support.TESTFN, "wb") as f:
            self.assertEqual(f.write(a), n)

    def test_closefd(self):
        self.assertRaises(ValueError, io.open, support.TESTFN, 'w',
                          closefd=False)

    def testReadClosed(self):
        with io.open(support.TESTFN, "w") as f:
            f.write("egg\n")
        with io.open(support.TESTFN, "r") as f:
            file = io.open(f.fileno(), "r", closefd=False)
            self.assertEqual(file.read(), "egg\n")
            file.seek(0)
            file.close()
            self.assertRaises(ValueError, file.read)

    def test_no_closefd_with_filename(self):
        # can't use closefd in combination with a file name
        self.assertRaises(ValueError, io.open, support.TESTFN, "r",
                          closefd=False)

    def test_closefd_attr(self):
        with io.open(support.TESTFN, "wb") as f:
            f.write(b"egg\n")
        with io.open(support.TESTFN, "r") as f:
            self.assertEqual(f.buffer.raw.closefd, True)
            file = io.open(f.fileno(), "r", closefd=False)
            self.assertEqual(file.buffer.raw.closefd, False)
296
297
class MemorySeekTestMixin:
    """Shared read/seek/tell tests for in-memory streams.

    A concrete subclass must provide:
      - buftype: callable turning a str into the stream's content type
      - ioclass: the stream class under test
      - EOF: the value read() returns at end of stream
    """

    def testInit(self):
        # Construction alone must not raise.
        buf = self.buftype("1234567890")
        self.ioclass(buf)

    def testRead(self):
        buf = self.buftype("1234567890")
        bytesIo = self.ioclass(buf)

        self.assertEqual(buf[:1], bytesIo.read(1))
        self.assertEqual(buf[1:5], bytesIo.read(4))
        # Asking for more than remains returns just the remainder.
        self.assertEqual(buf[5:], bytesIo.read(900))
        self.assertEqual(self.EOF, bytesIo.read())

    def testReadNoArgs(self):
        buf = self.buftype("1234567890")
        bytesIo = self.ioclass(buf)

        self.assertEqual(buf, bytesIo.read())
        self.assertEqual(self.EOF, bytesIo.read())

    def testSeek(self):
        buf = self.buftype("1234567890")
        bytesIo = self.ioclass(buf)

        bytesIo.read(5)
        bytesIo.seek(0)
        self.assertEqual(buf, bytesIo.read())

        bytesIo.seek(3)
        self.assertEqual(buf[3:], bytesIo.read())
        # Float offsets are rejected.
        self.assertRaises(TypeError, bytesIo.seek, 0.0)

    def testTell(self):
        buf = self.buftype("1234567890")
        bytesIo = self.ioclass(buf)

        self.assertEqual(0, bytesIo.tell())
        bytesIo.seek(5)
        self.assertEqual(5, bytesIo.tell())
        # Seeking past the end is allowed and is reflected by tell().
        bytesIo.seek(10000)
        self.assertEqual(10000, bytesIo.tell())
341
Guido van Rossuma9e20242007-03-08 00:43:48 +0000342
class BytesIOTest(MemorySeekTestMixin, unittest.TestCase):
    """Run the shared memory-stream tests against io.BytesIO."""

    EOF = b""
    ioclass = io.BytesIO

    @staticmethod
    def buftype(s):
        # Test data is written as str; BytesIO needs it as UTF-8 bytes.
        encoded = s.encode("utf-8")
        return encoded
349
350
class StringIOTest(MemorySeekTestMixin, unittest.TestCase):
    """Run the shared memory-stream tests against io.StringIO."""

    EOF = ""
    ioclass = io.StringIO
    # Test data is already str, so no conversion is needed.
    buftype = str
355
356
class BufferedReaderTest(unittest.TestCase):
    """Tests for io.BufferedReader on top of scripted and real raw streams."""

    def testRead(self):
        rawio = MockRawIO((b"abc", b"d", b"efg"))
        bufio = io.BufferedReader(rawio)

        self.assertEqual(b"abcdef", bufio.read(6))

    def testBuffering(self):
        data = b"abcdefghi"
        dlen = len(data)

        # Each entry: buffer size, sizes requested from the buffered
        # reader, and the result sizes expected from the raw reads.
        tests = [
            [100, [3, 1, 4, 8], [dlen, 0]],
            [100, [3, 3, 3], [dlen]],
            [4, [1, 2, 4, 2], [4, 4, 1]],
        ]

        for bufsize, buf_read_sizes, raw_read_sizes in tests:
            rawio = MockFileIO(data)
            bufio = io.BufferedReader(rawio, buffer_size=bufsize)
            pos = 0
            for nbytes in buf_read_sizes:
                self.assertEqual(bufio.read(nbytes), data[pos:pos+nbytes])
                pos += nbytes
            self.assertEqual(rawio.read_history, raw_read_sizes)

    def testReadNonBlocking(self):
        # Inject some None's in there to simulate EWOULDBLOCK
        rawio = MockRawIO((b"abc", b"d", None, b"efg", None, None))
        bufio = io.BufferedReader(rawio)

        self.assertEqual(b"abcd", bufio.read(6))
        self.assertEqual(b"e", bufio.read(1))
        self.assertEqual(b"fg", bufio.read())
        self.assertTrue(bufio.read() is None)
        self.assertEqual(b"", bufio.read())

    def testReadToEof(self):
        rawio = MockRawIO((b"abc", b"d", b"efg"))
        bufio = io.BufferedReader(rawio)

        self.assertEqual(b"abcdefg", bufio.read(9000))

    def testReadNoArgs(self):
        rawio = MockRawIO((b"abc", b"d", b"efg"))
        bufio = io.BufferedReader(rawio)

        self.assertEqual(b"abcdefg", bufio.read())

    def testFileno(self):
        rawio = MockRawIO((b"abc", b"d", b"efg"))
        bufio = io.BufferedReader(rawio)

        self.assertEqual(42, bufio.fileno())

    def testFilenoNoFileno(self):
        # XXX will we always have fileno() function? If so, kill
        # this test. Else, write it.
        pass

    def testThreads(self):
        try:
            # Write out many bytes with exactly the same number of 0's,
            # 1's... 255's. This will help us check that concurrent reading
            # doesn't duplicate or forget contents.
            N = 1000
            values = list(range(256)) * N
            random.shuffle(values)
            payload = bytes(bytearray(values))
            with io.open(support.TESTFN, "wb") as f:
                f.write(payload)
            with io.open(support.TESTFN, "rb", buffering=0) as raw:
                bufio = io.BufferedReader(raw, 8)
                errors = []
                results = []

                def reader():
                    try:
                        # Intra-buffer read then buffer-flushing read
                        for n in cycle([1, 19]):
                            chunk = bufio.read(n)
                            if not chunk:
                                break
                            # list.append() is atomic
                            results.append(chunk)
                    except Exception as e:
                        errors.append(e)
                        raise

                threads = [threading.Thread(target=reader)
                           for x in range(20)]
                for t in threads:
                    t.start()
                time.sleep(0.02)  # yield
                for t in threads:
                    t.join()
                self.assertFalse(errors,
                    "the following exceptions were caught: %r" % errors)
                combined = b''.join(results)
                for i in range(256):
                    c = bytes(bytearray([i]))
                    self.assertEqual(combined.count(c), N)
        finally:
            support.unlink(support.TESTFN)
459
460
Guido van Rossuma9e20242007-03-08 00:43:48 +0000461
class BufferedWriterTest(unittest.TestCase):
    """Tests for io.BufferedWriter on top of scripted and real raw streams."""

    def testWrite(self):
        # Write to the buffered IO but don't overflow the buffer.
        writer = MockRawIO()
        bufio = io.BufferedWriter(writer, 8)

        bufio.write(b"abc")

        self.assertFalse(writer._write_stack)

    def testWriteOverflow(self):
        writer = MockRawIO()
        bufio = io.BufferedWriter(writer, 8)

        bufio.write(b"abc")
        bufio.write(b"defghijkl")

        self.assertEqual(b"abcdefghijkl", writer._write_stack[0])

    def testWriteNonBlocking(self):
        raw = MockNonBlockWriterIO((9, 2, 22, -6, 10, 12, 12))
        bufio = io.BufferedWriter(raw, 8, 16)

        bufio.write(b"asdf")
        bufio.write(b"asdfa")
        self.assertEqual(b"asdfasdfa", raw._write_stack[0])

        bufio.write(b"asdfasdfasdf")
        self.assertEqual(b"asdfasdfasdf", raw._write_stack[1])
        bufio.write(b"asdfasdfasdf")
        self.assertEqual(b"dfasdfasdf", raw._write_stack[2])
        self.assertEqual(b"asdfasdfasdf", raw._write_stack[3])

        bufio.write(b"asdfasdfasdf")

        # XXX I don't like this test. It relies too heavily on how the
        # algorithm actually works, which we might change. Refactor
        # later.

    def testFileno(self):
        rawio = MockRawIO((b"abc", b"d", b"efg"))
        bufio = io.BufferedWriter(rawio)

        self.assertEqual(42, bufio.fileno())

    def testFlush(self):
        writer = MockRawIO()
        bufio = io.BufferedWriter(writer, 8)

        bufio.write(b"abc")
        bufio.flush()

        self.assertEqual(b"abc", writer._write_stack[0])

    def testThreads(self):
        # BufferedWriter should not raise exceptions or crash
        # when called from multiple threads.
        try:
            # We use a real file object because it allows us to
            # exercise situations where the GIL is released before
            # writing the buffer to the raw streams. This is in addition
            # to concurrency issues due to switching threads in the middle
            # of Python code.
            with io.open(support.TESTFN, "wb", buffering=0) as raw:
                bufio = io.BufferedWriter(raw, 8)
                errors = []

                def writer():
                    try:
                        # Write enough bytes to flush the buffer
                        chunk = b"a" * 19
                        for i in range(50):
                            bufio.write(chunk)
                    except Exception as e:
                        errors.append(e)
                        raise

                threads = [threading.Thread(target=writer)
                           for x in range(20)]
                for t in threads:
                    t.start()
                time.sleep(0.02)  # yield
                for t in threads:
                    t.join()
                self.assertFalse(errors,
                    "the following exceptions were caught: %r" % errors)
        finally:
            support.unlink(support.TESTFN)
548
Guido van Rossuma9e20242007-03-08 00:43:48 +0000549
class BufferedRWPairTest(unittest.TestCase):
    """Smoke test for io.BufferedRWPair."""

    def testRWPair(self):
        # A reader with an empty script and a writer with the default one.
        reader = MockRawIO(())
        writer = MockRawIO()
        pair = io.BufferedRWPair(reader, writer)
        # A freshly built pair must report itself as open.
        self.assertFalse(pair.closed)

        # XXX More Tests
Guido van Rossum01a27522007-03-07 01:00:12 +0000559
Guido van Rossuma9e20242007-03-08 00:43:48 +0000560
class BufferedRandomTest(unittest.TestCase):
    """Tests for io.BufferedRandom: interleaved reads, writes and seeks."""

    def testReadAndWrite(self):
        raw = MockRawIO((b"asdf", b"ghjk"))
        rw = io.BufferedRandom(raw, 8, 12)

        self.assertEqual(b"as", rw.read(2))
        rw.write(b"ddd")
        rw.write(b"eee")
        self.assertFalse(raw._write_stack)  # Buffer writes
        self.assertEqual(b"ghjk", rw.read())  # This read forces write flush
        self.assertEqual(b"dddeee", raw._write_stack[0])

    def testSeekAndTell(self):
        raw = io.BytesIO(b"asdfghjkl")
        rw = io.BufferedRandom(raw)

        self.assertEqual(b"as", rw.read(2))
        self.assertEqual(2, rw.tell())
        rw.seek(0, 0)
        self.assertEqual(b"asdf", rw.read(4))

        # Overwrite the four bytes after the current position.
        rw.write(b"asdf")
        rw.seek(0, 0)
        self.assertEqual(b"asdfasdfl", rw.read())
        self.assertEqual(9, rw.tell())
        rw.seek(-4, 2)
        self.assertEqual(5, rw.tell())
        rw.seek(2, 1)
        self.assertEqual(7, rw.tell())
        self.assertEqual(b"fl", rw.read(11))
        self.assertRaises(TypeError, rw.seek, 0.0)
Guido van Rossum01a27522007-03-07 01:00:12 +0000593
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +0000594# To fully exercise seek/tell, the StatefulIncrementalDecoder has these
595# properties:
596# - A single output character can correspond to many bytes of input.
597# - The number of input bytes to complete the character can be
598# undetermined until the last input byte is received.
599# - The number of input bytes can vary depending on previous input.
600# - A single input byte can correspond to many characters of output.
601# - The number of output characters can be undetermined until the
602# last input byte is received.
603# - The number of output characters can vary depending on previous input.
604
class StatefulIncrementalDecoder(codecs.IncrementalDecoder):
    """
    For testing seek/tell behavior with a stateful, buffering decoder.

    Input is a sequence of words.  Words may be fixed-length (length set
    by input) or variable-length (period-terminated).  In variable-length
    mode, extra periods are ignored.  Possible words are:
      - 'i' followed by a number sets the input length, I (maximum 99).
        When I is set to 0, words are period-terminated.
      - 'o' followed by a number sets the output length, O (maximum 99).
      - Any other word is converted into a word followed by a period on
        the output.  The output word consists of the input word truncated
        or padded out with hyphens to make its length equal to O.  If O
        is 0, the word is output verbatim without truncating or padding.
    I and O are initially set to 1.  A new I or O applies to the bytes
    that follow the word which set it.  EOF also terminates the last word.
    """

    def __init__(self, errors='strict'):
        codecs.IncrementalDecoder.__init__(self, errors)
        self.reset()

    def __repr__(self):
        return '<SID %x>' % id(self)

    def reset(self):
        """Return to the initial state: I = O = 1 and nothing buffered."""
        self.i = 1
        self.o = 1
        self.buffer = bytearray()

    def getstate(self):
        """Return (buffered bytes, flags) with I and O packed into flags."""
        i, o = self.i ^ 1, self.o ^ 1  # so that flags = 0 after reset()
        return bytes(self.buffer), i*100 + o

    def setstate(self, state):
        """Restore a (buffered bytes, flags) pair produced by getstate()."""
        buffer, flags = state
        self.buffer = bytearray(buffer)
        i, o = divmod(flags, 100)
        self.i, self.o = i ^ 1, o ^ 1

    def decode(self, input, final=False):
        """Consume the bytes of *input* and return the text of every word
        completed so far; with *final* true, a partial word is flushed too."""
        output = ''
        for b in input:
            if self.i == 0:  # variable-length, terminated with period
                if b == ord('.'):
                    if self.buffer:
                        output += self.process_word()
                else:
                    self.buffer.append(b)
            else:  # fixed-length, terminate after self.i bytes
                self.buffer.append(b)
                if len(self.buffer) == self.i:
                    output += self.process_word()
        if final and self.buffer:  # EOF terminates the last word
            output += self.process_word()
        return output

    def process_word(self):
        """Interpret the buffered word, empty the buffer and return the
        text it produces ('' for the 'i...' and 'o...' control words)."""
        output = ''
        if self.buffer[0] == ord('i'):
            self.i = min(99, int(self.buffer[1:] or 0))  # set input length
        elif self.buffer[0] == ord('o'):
            self.o = min(99, int(self.buffer[1:] or 0))  # set output length
        else:
            output = self.buffer.decode('ascii')
            if len(output) < self.o:
                output += '-'*self.o  # pad out with hyphens
            if self.o:
                output = output[:self.o]  # truncate to output length
            output += '.'
        self.buffer = bytearray()
        return output

    # The codec lookup below answers only while this flag is true.
    codecEnabled = False

    @classmethod
    def lookupTestDecoder(cls, name):
        """Codec search function: return a CodecInfo for 'test_decoder'
        while codecEnabled is true, otherwise return None."""
        if cls.codecEnabled and name == 'test_decoder':
            latin1 = codecs.lookup('latin-1')
            return codecs.CodecInfo(
                name='test_decoder', encode=latin1.encode, decode=None,
                incrementalencoder=None,
                streamreader=None, streamwriter=None,
                incrementaldecoder=cls)
# Register the lookup function of the decoder class above with the codec
# machinery.  The lookup answers only while
# StatefulIncrementalDecoder.codecEnabled is true, so the codec stays
# disabled by default; tests enable it when they need it.
codecs.register(StatefulIncrementalDecoder.lookupTestDecoder)
693
694
class StatefulIncrementalDecoderTest(unittest.TestCase):
    """
    Make sure the StatefulIncrementalDecoder actually works.
    """

    # Each entry is (input bytes, final flag passed to decode(), expected text).
    test_cases = [
        # I=1, O=1 (fixed-length input == fixed-length output)
        (b'abcd', False, 'a.b.c.d.'),
        # I=0, O=0 (variable-length input, variable-length output)
        (b'oiabcd', True, 'abcd.'),
        # I=0, O=0 (should ignore extra periods)
        (b'oi...abcd...', True, 'abcd.'),
        # I=0, O=6 (variable-length input, fixed-length output)
        (b'i.o6.x.xyz.toolongtofit.', False, 'x-----.xyz---.toolon.'),
        # I=2, O=6 (fixed-length input < fixed-length output)
        (b'i.i2.o6xyz', True, 'xy----.z-----.'),
        # I=6, O=3 (fixed-length input > fixed-length output)
        (b'i.o3.i6.abcdefghijklmnop', True, 'abc.ghi.mno.'),
        # I=0, then 3; O=29, then 15 (with longer output)
        (b'i.o29.a.b.cde.o15.abcdefghijabcdefghij.i3.a.b.c.d.ei00k.l.m', True,
         'a----------------------------.' +
         'b----------------------------.' +
         'cde--------------------------.' +
         'abcdefghijabcde.' +
         'a.b------------.' +
         '.c.------------.' +
         'd.e------------.' +
         'k--------------.' +
         'l--------------.' +
         'm--------------.')
    ]

    def testDecoder(self):
        """Decode each test case in one shot, then check a deferred EOF."""
        # Try a few one-shot test cases.
        for input, eof, output in self.test_cases:
            d = StatefulIncrementalDecoder()
            self.assertEqual(d.decode(input, eof), output)

        # Also test an unfinished decode, followed by forcing EOF.
        d = StatefulIncrementalDecoder()
        self.assertEqual(d.decode(b'oiabcd'), '')
        self.assertEqual(d.decode(b'', final=True), 'abcd.')
Guido van Rossum78892e42007-04-06 17:31:18 +0000737
738class TextIOWrapperTest(unittest.TestCase):
Guido van Rossum9b76da62007-04-11 01:09:03 +0000739
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +0000740 def setUp(self):
741 self.testdata = b"AAA\r\nBBB\rCCC\r\nDDD\nEEE\r\n"
742 self.normalized = b"AAA\nBBB\nCCC\nDDD\nEEE\n".decode("ascii")
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000743 support.unlink(support.TESTFN)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +0000744
    def tearDown(self):
        """Remove the TESTFN scratch file after each test."""
        support.unlink(support.TESTFN)
Guido van Rossum9b76da62007-04-11 01:09:03 +0000747
Guido van Rossumf64db9f2007-12-06 01:04:26 +0000748 def testLineBuffering(self):
749 r = io.BytesIO()
750 b = io.BufferedWriter(r, 1000)
751 t = io.TextIOWrapper(b, newline="\n", line_buffering=True)
752 t.write("X")
753 self.assertEquals(r.getvalue(), b"") # No flush happened
754 t.write("Y\nZ")
755 self.assertEquals(r.getvalue(), b"XY\nZ") # All got flushed
756 t.write("A\rB")
757 self.assertEquals(r.getvalue(), b"XY\nZA\rB")
758
Guido van Rossume7fc50f2007-12-03 22:54:21 +0000759 def testEncodingErrorsReading(self):
760 # (1) default
761 b = io.BytesIO(b"abc\n\xff\n")
762 t = io.TextIOWrapper(b, encoding="ascii")
763 self.assertRaises(UnicodeError, t.read)
764 # (2) explicit strict
765 b = io.BytesIO(b"abc\n\xff\n")
766 t = io.TextIOWrapper(b, encoding="ascii", errors="strict")
767 self.assertRaises(UnicodeError, t.read)
768 # (3) ignore
769 b = io.BytesIO(b"abc\n\xff\n")
770 t = io.TextIOWrapper(b, encoding="ascii", errors="ignore")
771 self.assertEquals(t.read(), "abc\n\n")
772 # (4) replace
773 b = io.BytesIO(b"abc\n\xff\n")
774 t = io.TextIOWrapper(b, encoding="ascii", errors="replace")
775 self.assertEquals(t.read(), "abc\n\ufffd\n")
776
777 def testEncodingErrorsWriting(self):
778 # (1) default
779 b = io.BytesIO()
780 t = io.TextIOWrapper(b, encoding="ascii")
781 self.assertRaises(UnicodeError, t.write, "\xff")
782 # (2) explicit strict
783 b = io.BytesIO()
784 t = io.TextIOWrapper(b, encoding="ascii", errors="strict")
785 self.assertRaises(UnicodeError, t.write, "\xff")
786 # (3) ignore
787 b = io.BytesIO()
Guido van Rossumf64db9f2007-12-06 01:04:26 +0000788 t = io.TextIOWrapper(b, encoding="ascii", errors="ignore",
789 newline="\n")
Guido van Rossume7fc50f2007-12-03 22:54:21 +0000790 t.write("abc\xffdef\n")
791 t.flush()
Christian Heimesecda2612007-12-05 17:59:44 +0000792 self.assertEquals(b.getvalue(), b"abcdef\n")
Guido van Rossume7fc50f2007-12-03 22:54:21 +0000793 # (4) replace
794 b = io.BytesIO()
Guido van Rossumf64db9f2007-12-06 01:04:26 +0000795 t = io.TextIOWrapper(b, encoding="ascii", errors="replace",
796 newline="\n")
Guido van Rossume7fc50f2007-12-03 22:54:21 +0000797 t.write("abc\xffdef\n")
798 t.flush()
Christian Heimesecda2612007-12-05 17:59:44 +0000799 self.assertEquals(b.getvalue(), b"abc?def\n")
Guido van Rossume7fc50f2007-12-03 22:54:21 +0000800
Guido van Rossum8358db22007-08-18 21:39:55 +0000801 def testNewlinesInput(self):
802 testdata = b"AAA\nBBB\nCCC\rDDD\rEEE\r\nFFF\r\nGGG"
803 normalized = testdata.replace(b"\r\n", b"\n").replace(b"\r", b"\n")
804 for newline, expected in [
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +0000805 (None, normalized.decode("ascii").splitlines(True)),
806 ("", testdata.decode("ascii").splitlines(True)),
Guido van Rossum8358db22007-08-18 21:39:55 +0000807 ("\n", ["AAA\n", "BBB\n", "CCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]),
808 ("\r\n", ["AAA\nBBB\nCCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]),
809 ("\r", ["AAA\nBBB\nCCC\r", "DDD\r", "EEE\r", "\nFFF\r", "\nGGG"]),
810 ]:
811 buf = io.BytesIO(testdata)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +0000812 txt = io.TextIOWrapper(buf, encoding="ascii", newline=newline)
Guido van Rossum8358db22007-08-18 21:39:55 +0000813 self.assertEquals(txt.readlines(), expected)
814 txt.seek(0)
815 self.assertEquals(txt.read(), "".join(expected))
816
817 def testNewlinesOutput(self):
818 testdict = {
819 "": b"AAA\nBBB\nCCC\nX\rY\r\nZ",
820 "\n": b"AAA\nBBB\nCCC\nX\rY\r\nZ",
821 "\r": b"AAA\rBBB\rCCC\rX\rY\r\rZ",
822 "\r\n": b"AAA\r\nBBB\r\nCCC\r\nX\rY\r\r\nZ",
823 }
824 tests = [(None, testdict[os.linesep])] + sorted(testdict.items())
825 for newline, expected in tests:
826 buf = io.BytesIO()
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +0000827 txt = io.TextIOWrapper(buf, encoding="ascii", newline=newline)
Guido van Rossum8358db22007-08-18 21:39:55 +0000828 txt.write("AAA\nB")
829 txt.write("BB\nCCC\n")
830 txt.write("X\rY\r\nZ")
831 txt.flush()
Alexandre Vassalotti77250f42008-05-06 19:48:38 +0000832 self.assertEquals(buf.closed, False)
Guido van Rossum8358db22007-08-18 21:39:55 +0000833 self.assertEquals(buf.getvalue(), expected)
834
Guido van Rossum78892e42007-04-06 17:31:18 +0000835 def testNewlines(self):
836 input_lines = [ "unix\n", "windows\r\n", "os9\r", "last\n", "nonl" ]
837
838 tests = [
839 [ None, [ 'unix\n', 'windows\n', 'os9\n', 'last\n', 'nonl' ] ],
Guido van Rossum8358db22007-08-18 21:39:55 +0000840 [ '', input_lines ],
841 [ '\n', [ "unix\n", "windows\r\n", "os9\rlast\n", "nonl" ] ],
842 [ '\r\n', [ "unix\nwindows\r\n", "os9\rlast\nnonl" ] ],
843 [ '\r', [ "unix\nwindows\r", "\nos9\r", "last\nnonl" ] ],
Guido van Rossum78892e42007-04-06 17:31:18 +0000844 ]
Antoine Pitrou180a3362008-12-14 16:36:46 +0000845 encodings = (
846 'utf-8', 'latin-1',
847 'utf-16', 'utf-16-le', 'utf-16-be',
848 'utf-32', 'utf-32-le', 'utf-32-be',
849 )
Guido van Rossum78892e42007-04-06 17:31:18 +0000850
Guido van Rossum8358db22007-08-18 21:39:55 +0000851 # Try a range of buffer sizes to test the case where \r is the last
Guido van Rossum78892e42007-04-06 17:31:18 +0000852 # character in TextIOWrapper._pending_line.
853 for encoding in encodings:
Guido van Rossum8358db22007-08-18 21:39:55 +0000854 # XXX: str.encode() should return bytes
855 data = bytes(''.join(input_lines).encode(encoding))
Guido van Rossum78892e42007-04-06 17:31:18 +0000856 for do_reads in (False, True):
Guido van Rossum8358db22007-08-18 21:39:55 +0000857 for bufsize in range(1, 10):
858 for newline, exp_lines in tests:
859 bufio = io.BufferedReader(io.BytesIO(data), bufsize)
Guido van Rossum76c5d4d2007-04-06 19:10:29 +0000860 textio = io.TextIOWrapper(bufio, newline=newline,
Guido van Rossum78892e42007-04-06 17:31:18 +0000861 encoding=encoding)
862 if do_reads:
863 got_lines = []
864 while True:
Guido van Rossum76c5d4d2007-04-06 19:10:29 +0000865 c2 = textio.read(2)
Guido van Rossum78892e42007-04-06 17:31:18 +0000866 if c2 == '':
867 break
868 self.assertEquals(len(c2), 2)
Guido van Rossum76c5d4d2007-04-06 19:10:29 +0000869 got_lines.append(c2 + textio.readline())
Guido van Rossum78892e42007-04-06 17:31:18 +0000870 else:
Guido van Rossum76c5d4d2007-04-06 19:10:29 +0000871 got_lines = list(textio)
Guido van Rossum78892e42007-04-06 17:31:18 +0000872
873 for got_line, exp_line in zip(got_lines, exp_lines):
874 self.assertEquals(got_line, exp_line)
875 self.assertEquals(len(got_lines), len(exp_lines))
876
Guido van Rossum8358db22007-08-18 21:39:55 +0000877 def testNewlinesInput(self):
878 testdata = b"AAA\nBBB\nCCC\rDDD\rEEE\r\nFFF\r\nGGG"
879 normalized = testdata.replace(b"\r\n", b"\n").replace(b"\r", b"\n")
880 for newline, expected in [
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +0000881 (None, normalized.decode("ascii").splitlines(True)),
882 ("", testdata.decode("ascii").splitlines(True)),
Guido van Rossum8358db22007-08-18 21:39:55 +0000883 ("\n", ["AAA\n", "BBB\n", "CCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]),
884 ("\r\n", ["AAA\nBBB\nCCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]),
885 ("\r", ["AAA\nBBB\nCCC\r", "DDD\r", "EEE\r", "\nFFF\r", "\nGGG"]),
886 ]:
887 buf = io.BytesIO(testdata)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +0000888 txt = io.TextIOWrapper(buf, encoding="ascii", newline=newline)
Guido van Rossum8358db22007-08-18 21:39:55 +0000889 self.assertEquals(txt.readlines(), expected)
890 txt.seek(0)
891 self.assertEquals(txt.read(), "".join(expected))
892
893 def testNewlinesOutput(self):
Guido van Rossum8358db22007-08-18 21:39:55 +0000894 data = "AAA\nBBB\rCCC\n"
895 data_lf = b"AAA\nBBB\rCCC\n"
896 data_cr = b"AAA\rBBB\rCCC\r"
897 data_crlf = b"AAA\r\nBBB\rCCC\r\n"
Guido van Rossum18363582007-08-18 22:05:58 +0000898 save_linesep = os.linesep
899 try:
900 for os.linesep, newline, expected in [
901 ("\n", None, data_lf),
902 ("\r\n", None, data_crlf),
903 ("\n", "", data_lf),
904 ("\r\n", "", data_lf),
905 ("\n", "\n", data_lf),
906 ("\r\n", "\n", data_lf),
907 ("\n", "\r", data_cr),
908 ("\r\n", "\r", data_cr),
909 ("\n", "\r\n", data_crlf),
910 ("\r\n", "\r\n", data_crlf),
911 ]:
912 buf = io.BytesIO()
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +0000913 txt = io.TextIOWrapper(buf, encoding="ascii", newline=newline)
Guido van Rossum18363582007-08-18 22:05:58 +0000914 txt.write(data)
915 txt.close()
Alexandre Vassalotti77250f42008-05-06 19:48:38 +0000916 self.assertEquals(buf.closed, True)
917 self.assertRaises(ValueError, buf.getvalue)
Guido van Rossum18363582007-08-18 22:05:58 +0000918 finally:
919 os.linesep = save_linesep
Guido van Rossum8358db22007-08-18 21:39:55 +0000920
Guido van Rossum9b76da62007-04-11 01:09:03 +0000921 # Systematic tests of the text I/O API
922
923 def testBasicIO(self):
924 for chunksize in (1, 2, 3, 4, 5, 15, 16, 17, 31, 32, 33, 63, 64, 65):
925 for enc in "ascii", "latin1", "utf8" :# , "utf-16-be", "utf-16-le":
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000926 f = io.open(support.TESTFN, "w+", encoding=enc)
Guido van Rossum9b76da62007-04-11 01:09:03 +0000927 f._CHUNK_SIZE = chunksize
928 self.assertEquals(f.write("abc"), 3)
929 f.close()
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000930 f = io.open(support.TESTFN, "r+", encoding=enc)
Guido van Rossum9b76da62007-04-11 01:09:03 +0000931 f._CHUNK_SIZE = chunksize
932 self.assertEquals(f.tell(), 0)
933 self.assertEquals(f.read(), "abc")
934 cookie = f.tell()
935 self.assertEquals(f.seek(0), 0)
936 self.assertEquals(f.read(2), "ab")
937 self.assertEquals(f.read(1), "c")
938 self.assertEquals(f.read(1), "")
939 self.assertEquals(f.read(), "")
940 self.assertEquals(f.tell(), cookie)
941 self.assertEquals(f.seek(0), 0)
942 self.assertEquals(f.seek(0, 2), cookie)
943 self.assertEquals(f.write("def"), 3)
944 self.assertEquals(f.seek(cookie), cookie)
945 self.assertEquals(f.read(), "def")
946 if enc.startswith("utf"):
947 self.multi_line_test(f, enc)
948 f.close()
949
950 def multi_line_test(self, f, enc):
951 f.seek(0)
952 f.truncate()
Guido van Rossumef87d6e2007-05-02 19:09:54 +0000953 sample = "s\xff\u0fff\uffff"
Guido van Rossum9b76da62007-04-11 01:09:03 +0000954 wlines = []
Guido van Rossumcba608c2007-04-11 14:19:59 +0000955 for size in (0, 1, 2, 3, 4, 5, 30, 31, 32, 33, 62, 63, 64, 65, 1000):
Guido van Rossum9b76da62007-04-11 01:09:03 +0000956 chars = []
Guido van Rossum805365e2007-05-07 22:24:25 +0000957 for i in range(size):
Guido van Rossum9b76da62007-04-11 01:09:03 +0000958 chars.append(sample[i % len(sample)])
Guido van Rossumef87d6e2007-05-02 19:09:54 +0000959 line = "".join(chars) + "\n"
Guido van Rossum9b76da62007-04-11 01:09:03 +0000960 wlines.append((f.tell(), line))
961 f.write(line)
Guido van Rossum9b76da62007-04-11 01:09:03 +0000962 f.seek(0)
963 rlines = []
964 while True:
965 pos = f.tell()
966 line = f.readline()
967 if not line:
Guido van Rossum9b76da62007-04-11 01:09:03 +0000968 break
969 rlines.append((pos, line))
Guido van Rossum9b76da62007-04-11 01:09:03 +0000970 self.assertEquals(rlines, wlines)
971
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +0000972 def testTelling(self):
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000973 f = io.open(support.TESTFN, "w+", encoding="utf8")
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +0000974 p0 = f.tell()
Guido van Rossumef87d6e2007-05-02 19:09:54 +0000975 f.write("\xff\n")
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +0000976 p1 = f.tell()
Guido van Rossumef87d6e2007-05-02 19:09:54 +0000977 f.write("\xff\n")
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +0000978 p2 = f.tell()
979 f.seek(0)
980 self.assertEquals(f.tell(), p0)
Guido van Rossumef87d6e2007-05-02 19:09:54 +0000981 self.assertEquals(f.readline(), "\xff\n")
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +0000982 self.assertEquals(f.tell(), p1)
Guido van Rossumef87d6e2007-05-02 19:09:54 +0000983 self.assertEquals(f.readline(), "\xff\n")
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +0000984 self.assertEquals(f.tell(), p2)
985 f.seek(0)
986 for line in f:
Guido van Rossumef87d6e2007-05-02 19:09:54 +0000987 self.assertEquals(line, "\xff\n")
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +0000988 self.assertRaises(IOError, f.tell)
989 self.assertEquals(f.tell(), p2)
990 f.close()
991
Guido van Rossumd76e7792007-04-17 02:38:04 +0000992 def testSeeking(self):
993 chunk_size = io.TextIOWrapper._CHUNK_SIZE
994 prefix_size = chunk_size - 2
Guido van Rossumef87d6e2007-05-02 19:09:54 +0000995 u_prefix = "a" * prefix_size
Guido van Rossumd76e7792007-04-17 02:38:04 +0000996 prefix = bytes(u_prefix.encode("utf-8"))
997 self.assertEquals(len(u_prefix), len(prefix))
Guido van Rossumef87d6e2007-05-02 19:09:54 +0000998 u_suffix = "\u8888\n"
Guido van Rossumd76e7792007-04-17 02:38:04 +0000999 suffix = bytes(u_suffix.encode("utf-8"))
1000 line = prefix + suffix
Benjamin Petersonee8712c2008-05-20 21:35:26 +00001001 f = io.open(support.TESTFN, "wb")
Guido van Rossumd76e7792007-04-17 02:38:04 +00001002 f.write(line*2)
1003 f.close()
Benjamin Petersonee8712c2008-05-20 21:35:26 +00001004 f = io.open(support.TESTFN, "r", encoding="utf-8")
Guido van Rossumd76e7792007-04-17 02:38:04 +00001005 s = f.read(prefix_size)
Guido van Rossum98297ee2007-11-06 21:34:58 +00001006 self.assertEquals(s, str(prefix, "ascii"))
Guido van Rossumd76e7792007-04-17 02:38:04 +00001007 self.assertEquals(f.tell(), prefix_size)
1008 self.assertEquals(f.readline(), u_suffix)
1009
1010 def testSeekingToo(self):
1011 # Regression test for a specific bug
1012 data = b'\xe0\xbf\xbf\n'
Benjamin Petersonee8712c2008-05-20 21:35:26 +00001013 f = io.open(support.TESTFN, "wb")
Guido van Rossumd76e7792007-04-17 02:38:04 +00001014 f.write(data)
1015 f.close()
Benjamin Petersonee8712c2008-05-20 21:35:26 +00001016 f = io.open(support.TESTFN, "r", encoding="utf-8")
Guido van Rossumd76e7792007-04-17 02:38:04 +00001017 f._CHUNK_SIZE # Just test that it exists
1018 f._CHUNK_SIZE = 2
1019 f.readline()
1020 f.tell()
1021
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001022 def testSeekAndTell(self):
1023 """Test seek/tell using the StatefulIncrementalDecoder."""
1024
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001025 def testSeekAndTellWithData(data, min_pos=0):
1026 """Tell/seek to various points within a data stream and ensure
1027 that the decoded data returned by read() is consistent."""
Benjamin Petersonee8712c2008-05-20 21:35:26 +00001028 f = io.open(support.TESTFN, 'wb')
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001029 f.write(data)
1030 f.close()
Benjamin Petersonee8712c2008-05-20 21:35:26 +00001031 f = io.open(support.TESTFN, encoding='test_decoder')
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001032 decoded = f.read()
1033 f.close()
1034
Neal Norwitze2b07052008-03-18 19:52:05 +00001035 for i in range(min_pos, len(decoded) + 1): # seek positions
1036 for j in [1, 5, len(decoded) - i]: # read lengths
Benjamin Petersonee8712c2008-05-20 21:35:26 +00001037 f = io.open(support.TESTFN, encoding='test_decoder')
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001038 self.assertEquals(f.read(i), decoded[:i])
1039 cookie = f.tell()
1040 self.assertEquals(f.read(j), decoded[i:i + j])
1041 f.seek(cookie)
1042 self.assertEquals(f.read(), decoded[i:])
1043 f.close()
1044
Benjamin Petersonad9d48d2008-04-02 21:49:44 +00001045 # Enable the test decoder.
1046 StatefulIncrementalDecoder.codecEnabled = 1
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001047
1048 # Run the tests.
1049 try:
1050 # Try each test case.
1051 for input, _, _ in StatefulIncrementalDecoderTest.test_cases:
1052 testSeekAndTellWithData(input)
1053
1054 # Position each test case so that it crosses a chunk boundary.
1055 CHUNK_SIZE = io.TextIOWrapper._CHUNK_SIZE
1056 for input, _, _ in StatefulIncrementalDecoderTest.test_cases:
1057 offset = CHUNK_SIZE - len(input)//2
1058 prefix = b'.'*offset
1059 # Don't bother seeking into the prefix (takes too long).
1060 min_pos = offset*2
1061 testSeekAndTellWithData(prefix + input, min_pos)
1062
1063 # Ensure our test decoder won't interfere with subsequent tests.
1064 finally:
Benjamin Petersonad9d48d2008-04-02 21:49:44 +00001065 StatefulIncrementalDecoder.codecEnabled = 0
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001066
Alexandre Vassalottia38f73b2008-01-07 18:30:48 +00001067 def testEncodedWrites(self):
1068 data = "1234567890"
1069 tests = ("utf-16",
1070 "utf-16-le",
1071 "utf-16-be",
1072 "utf-32",
1073 "utf-32-le",
1074 "utf-32-be")
1075 for encoding in tests:
1076 buf = io.BytesIO()
1077 f = io.TextIOWrapper(buf, encoding=encoding)
1078 # Check if the BOM is written only once (see issue1753).
1079 f.write(data)
1080 f.write(data)
1081 f.seek(0)
1082 self.assertEquals(f.read(), data * 2)
1083 self.assertEquals(buf.getvalue(), (data * 2).encode(encoding))
1084
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +00001085 def timingTest(self):
1086 timer = time.time
1087 enc = "utf8"
Guido van Rossumef87d6e2007-05-02 19:09:54 +00001088 line = "\0\x0f\xff\u0fff\uffff\U000fffff\U0010ffff"*3 + "\n"
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +00001089 nlines = 10000
1090 nchars = len(line)
1091 nbytes = len(line.encode(enc))
1092 for chunk_size in (32, 64, 128, 256):
Benjamin Petersonee8712c2008-05-20 21:35:26 +00001093 f = io.open(support.TESTFN, "w+", encoding=enc)
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +00001094 f._CHUNK_SIZE = chunk_size
1095 t0 = timer()
1096 for i in range(nlines):
1097 f.write(line)
1098 f.flush()
1099 t1 = timer()
1100 f.seek(0)
1101 for line in f:
1102 pass
1103 t2 = timer()
1104 f.seek(0)
1105 while f.readline():
1106 pass
1107 t3 = timer()
1108 f.seek(0)
1109 while f.readline():
1110 f.tell()
1111 t4 = timer()
1112 f.close()
Benjamin Petersonee8712c2008-05-20 21:35:26 +00001113 if support.verbose:
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +00001114 print("\nTiming test: %d lines of %d characters (%d bytes)" %
1115 (nlines, nchars, nbytes))
1116 print("File chunk size: %6s" % f._CHUNK_SIZE)
1117 print("Writing: %6.3f seconds" % (t1-t0))
1118 print("Reading using iteration: %6.3f seconds" % (t2-t1))
1119 print("Reading using readline(): %6.3f seconds" % (t3-t2))
1120 print("Using readline()+tell(): %6.3f seconds" % (t4-t3))
1121
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00001122 def testReadOneByOne(self):
1123 txt = io.TextIOWrapper(io.BytesIO(b"AA\r\nBB"))
1124 reads = ""
1125 while True:
1126 c = txt.read(1)
1127 if not c:
1128 break
1129 reads += c
1130 self.assertEquals(reads, "AA\nBB")
1131
1132 # read in amounts equal to TextIOWrapper._CHUNK_SIZE which is 128.
1133 def testReadByChunk(self):
1134 # make sure "\r\n" straddles 128 char boundary.
1135 txt = io.TextIOWrapper(io.BytesIO(b"A" * 127 + b"\r\nB"))
1136 reads = ""
1137 while True:
1138 c = txt.read(128)
1139 if not c:
1140 break
1141 reads += c
1142 self.assertEquals(reads, "A"*127+"\nB")
1143
1144 def test_issue1395_1(self):
1145 txt = io.TextIOWrapper(io.BytesIO(self.testdata), encoding="ascii")
1146
1147 # read one char at a time
1148 reads = ""
1149 while True:
1150 c = txt.read(1)
1151 if not c:
1152 break
1153 reads += c
1154 self.assertEquals(reads, self.normalized)
1155
1156 def test_issue1395_2(self):
1157 txt = io.TextIOWrapper(io.BytesIO(self.testdata), encoding="ascii")
1158 txt._CHUNK_SIZE = 4
1159
1160 reads = ""
1161 while True:
1162 c = txt.read(4)
1163 if not c:
1164 break
1165 reads += c
1166 self.assertEquals(reads, self.normalized)
1167
1168 def test_issue1395_3(self):
1169 txt = io.TextIOWrapper(io.BytesIO(self.testdata), encoding="ascii")
1170 txt._CHUNK_SIZE = 4
1171
1172 reads = txt.read(4)
1173 reads += txt.read(4)
1174 reads += txt.readline()
1175 reads += txt.readline()
1176 reads += txt.readline()
1177 self.assertEquals(reads, self.normalized)
1178
1179 def test_issue1395_4(self):
1180 txt = io.TextIOWrapper(io.BytesIO(self.testdata), encoding="ascii")
1181 txt._CHUNK_SIZE = 4
1182
1183 reads = txt.read(4)
1184 reads += txt.read()
1185 self.assertEquals(reads, self.normalized)
1186
1187 def test_issue1395_5(self):
1188 txt = io.TextIOWrapper(io.BytesIO(self.testdata), encoding="ascii")
1189 txt._CHUNK_SIZE = 4
1190
1191 reads = txt.read(4)
1192 pos = txt.tell()
1193 txt.seek(0)
1194 txt.seek(pos)
1195 self.assertEquals(txt.read(4), "BBB\n")
1196
Ka-Ping Yeeddaa7062008-03-17 20:35:15 +00001197 def test_issue2282(self):
1198 buffer = io.BytesIO(self.testdata)
1199 txt = io.TextIOWrapper(buffer, encoding="ascii")
1200
1201 self.assertEqual(buffer.seekable(), txt.seekable())
1202
Antoine Pitrou180a3362008-12-14 16:36:46 +00001203 def check_newline_decoder_utf8(self, decoder):
1204 # UTF-8 specific tests for a newline decoder
1205 def _check_decode(b, s, **kwargs):
1206 # We exercise getstate() / setstate() as well as decode()
1207 state = decoder.getstate()
1208 self.assertEquals(decoder.decode(b, **kwargs), s)
1209 decoder.setstate(state)
1210 self.assertEquals(decoder.decode(b, **kwargs), s)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00001211
Antoine Pitrou180a3362008-12-14 16:36:46 +00001212 _check_decode(b'\xe8\xa2\x88', "\u8888")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00001213
Antoine Pitrou180a3362008-12-14 16:36:46 +00001214 _check_decode(b'\xe8', "")
1215 _check_decode(b'\xa2', "")
1216 _check_decode(b'\x88', "\u8888")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00001217
Antoine Pitrou180a3362008-12-14 16:36:46 +00001218 _check_decode(b'\xe8', "")
1219 _check_decode(b'\xa2', "")
1220 _check_decode(b'\x88', "\u8888")
1221
1222 _check_decode(b'\xe8', "")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00001223 self.assertRaises(UnicodeDecodeError, decoder.decode, b'', final=True)
1224
Antoine Pitrou180a3362008-12-14 16:36:46 +00001225 decoder.reset()
1226 _check_decode(b'\n', "\n")
1227 _check_decode(b'\r', "")
1228 _check_decode(b'', "\n", final=True)
1229 _check_decode(b'\r', "\n", final=True)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00001230
Antoine Pitrou180a3362008-12-14 16:36:46 +00001231 _check_decode(b'\r', "")
1232 _check_decode(b'a', "\na")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00001233
Antoine Pitrou180a3362008-12-14 16:36:46 +00001234 _check_decode(b'\r\r\n', "\n\n")
1235 _check_decode(b'\r', "")
1236 _check_decode(b'\r', "\n")
1237 _check_decode(b'\na', "\na")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00001238
Antoine Pitrou180a3362008-12-14 16:36:46 +00001239 _check_decode(b'\xe8\xa2\x88\r\n', "\u8888\n")
1240 _check_decode(b'\xe8\xa2\x88', "\u8888")
1241 _check_decode(b'\n', "\n")
1242 _check_decode(b'\xe8\xa2\x88\r', "\u8888")
1243 _check_decode(b'\n', "\n")
Guido van Rossum9b76da62007-04-11 01:09:03 +00001244
Antoine Pitrou180a3362008-12-14 16:36:46 +00001245 def check_newline_decoder(self, decoder, encoding):
1246 result = []
1247 encoder = codecs.getincrementalencoder(encoding)()
1248 def _decode_bytewise(s):
1249 for b in encoder.encode(s):
1250 result.append(decoder.decode(bytes([b])))
1251 self.assertEquals(decoder.newlines, None)
1252 _decode_bytewise("abc\n\r")
1253 self.assertEquals(decoder.newlines, '\n')
1254 _decode_bytewise("\nabc")
1255 self.assertEquals(decoder.newlines, ('\n', '\r\n'))
1256 _decode_bytewise("abc\r")
1257 self.assertEquals(decoder.newlines, ('\n', '\r\n'))
1258 _decode_bytewise("abc")
1259 self.assertEquals(decoder.newlines, ('\r', '\n', '\r\n'))
1260 _decode_bytewise("abc\r")
1261 self.assertEquals("".join(result), "abc\n\nabcabc\nabcabc")
1262 decoder.reset()
1263 self.assertEquals(decoder.decode("abc".encode(encoding)), "abc")
1264 self.assertEquals(decoder.newlines, None)
1265
1266 def test_newline_decoder(self):
1267 encodings = (
1268 'utf-8', 'latin-1',
1269 'utf-16', 'utf-16-le', 'utf-16-be',
1270 'utf-32', 'utf-32-le', 'utf-32-be',
1271 )
1272 for enc in encodings:
1273 decoder = codecs.getincrementaldecoder(enc)()
1274 decoder = io.IncrementalNewlineDecoder(decoder, translate=True)
1275 self.check_newline_decoder(decoder, enc)
Alexandre Vassalotti472f07d2008-01-06 00:34:32 +00001276 decoder = codecs.getincrementaldecoder("utf-8")()
1277 decoder = io.IncrementalNewlineDecoder(decoder, translate=True)
Antoine Pitrou180a3362008-12-14 16:36:46 +00001278 self.check_newline_decoder_utf8(decoder)
1279
Alexandre Vassalotti472f07d2008-01-06 00:34:32 +00001280
Guido van Rossum01a27522007-03-07 01:00:12 +00001281# XXX Tests for open()
Guido van Rossum68bbcd22007-02-27 17:19:33 +00001282
class MiscIOTest(unittest.TestCase):
    """Checks on the io module's namespace and on file object attributes."""

    def tearDown(self):
        support.unlink(support.TESTFN)

    def testImport__all__(self):
        """Every name in io.__all__ must exist; apart from open, each
        must be an exception class or an IOBase subclass."""
        for name in io.__all__:
            obj = getattr(io, name, None)
            self.assertTrue(obj is not None, name)
            if name == "open":
                continue
            elif "error" in name.lower():
                self.assertTrue(issubclass(obj, Exception), name)
            else:
                self.assertTrue(issubclass(obj, io.IOBase))

    def test_attributes(self):
        """Check the name and mode attributes at each layer of the
        objects returned by io.open(); files are closed even on failure."""
        with io.open(support.TESTFN, "wb", buffering=0) as f:
            self.assertEqual(f.mode, "wb")

        with io.open(support.TESTFN, "U") as f:
            self.assertEqual(f.name, support.TESTFN)
            self.assertEqual(f.buffer.name, support.TESTFN)
            self.assertEqual(f.buffer.raw.name, support.TESTFN)
            self.assertEqual(f.mode, "U")
            self.assertEqual(f.buffer.mode, "rb")
            self.assertEqual(f.buffer.raw.mode, "rb")

        with io.open(support.TESTFN, "w+") as f:
            self.assertEqual(f.mode, "w+")
            self.assertEqual(f.buffer.mode, "rb+")  # Does it really matter?
            self.assertEqual(f.buffer.raw.mode, "rb+")

            # A second file object on the same descriptor; closefd=False
            # leaves the descriptor to be closed by f.
            with io.open(f.fileno(), "wb", closefd=False) as g:
                self.assertEqual(g.mode, "wb")
                self.assertEqual(g.raw.mode, "wb")
                self.assertEqual(g.name, f.fileno())
                self.assertEqual(g.raw.name, f.fileno())
1326
1327
def test_main():
    """Hand every test case class of this module to support.run_unittest."""
    test_classes = (
        IOTest, BytesIOTest, StringIOTest,
        BufferedReaderTest, BufferedWriterTest,
        BufferedRWPairTest, BufferedRandomTest,
        StatefulIncrementalDecoderTest,
        TextIOWrapperTest, MiscIOTest,
    )
    support.run_unittest(*test_classes)
Guido van Rossum28524c72007-02-27 05:47:44 +00001334
# NOTE(review): running this file directly goes through unittest.main(),
# not through test_main() above — confirm that this is intended.
if __name__ == "__main__":
    unittest.main()