blob: 489e5605f65200bc3079ca11af5fc378c4cd7190 [file] [log] [blame]
Guido van Rossum68bbcd22007-02-27 17:19:33 +00001"""Unit tests for io.py."""
2
Guido van Rossum8358db22007-08-18 21:39:55 +00003import os
Guido van Rossum34d69e52007-04-10 20:08:41 +00004import sys
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +00005import time
Guido van Rossumd4103952007-04-12 05:44:49 +00006import array
Antoine Pitrou87695762008-08-14 22:44:29 +00007import threading
8import random
Guido van Rossum28524c72007-02-27 05:47:44 +00009import unittest
Antoine Pitrou87695762008-08-14 22:44:29 +000010from itertools import chain, cycle
Benjamin Petersonee8712c2008-05-20 21:35:26 +000011from test import support
Guido van Rossum76c5d4d2007-04-06 19:10:29 +000012
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +000013import codecs
Guido van Rossum53807da2007-04-10 19:01:47 +000014import io # The module under test
Guido van Rossum28524c72007-02-27 05:47:44 +000015
Guido van Rossuma9e20242007-03-08 00:43:48 +000016
Guido van Rossum53807da2007-04-10 19:01:47 +000017class MockRawIO(io.RawIOBase):
Guido van Rossuma9e20242007-03-08 00:43:48 +000018
Guido van Rossum76c5d4d2007-04-06 19:10:29 +000019 def __init__(self, read_stack=()):
20 self._read_stack = list(read_stack)
21 self._write_stack = []
Guido van Rossum68bbcd22007-02-27 17:19:33 +000022
23 def read(self, n=None):
24 try:
Guido van Rossum76c5d4d2007-04-06 19:10:29 +000025 return self._read_stack.pop(0)
Guido van Rossum68bbcd22007-02-27 17:19:33 +000026 except:
Guido van Rossum78892e42007-04-06 17:31:18 +000027 return b""
Guido van Rossum68bbcd22007-02-27 17:19:33 +000028
Guido van Rossum01a27522007-03-07 01:00:12 +000029 def write(self, b):
Guido van Rossum76c5d4d2007-04-06 19:10:29 +000030 self._write_stack.append(b[:])
Guido van Rossum01a27522007-03-07 01:00:12 +000031 return len(b)
32
33 def writable(self):
34 return True
35
Guido van Rossum68bbcd22007-02-27 17:19:33 +000036 def fileno(self):
37 return 42
38
39 def readable(self):
40 return True
41
Guido van Rossum01a27522007-03-07 01:00:12 +000042 def seekable(self):
Guido van Rossum68bbcd22007-02-27 17:19:33 +000043 return True
44
Guido van Rossum01a27522007-03-07 01:00:12 +000045 def seek(self, pos, whence):
46 pass
47
48 def tell(self):
Guido van Rossum68bbcd22007-02-27 17:19:33 +000049 return 42
50
Guido van Rossuma9e20242007-03-08 00:43:48 +000051
Guido van Rossum78892e42007-04-06 17:31:18 +000052class MockFileIO(io.BytesIO):
53
54 def __init__(self, data):
55 self.read_history = []
56 io.BytesIO.__init__(self, data)
57
58 def read(self, n=None):
59 res = io.BytesIO.read(self, n)
60 self.read_history.append(None if res is None else len(res))
61 return res
62
63
Guido van Rossum01a27522007-03-07 01:00:12 +000064class MockNonBlockWriterIO(io.RawIOBase):
Guido van Rossuma9e20242007-03-08 00:43:48 +000065
Guido van Rossum53807da2007-04-10 19:01:47 +000066 def __init__(self, blocking_script):
67 self._blocking_script = list(blocking_script)
Guido van Rossum01a27522007-03-07 01:00:12 +000068 self._write_stack = []
Guido van Rossuma9e20242007-03-08 00:43:48 +000069
Guido van Rossum01a27522007-03-07 01:00:12 +000070 def write(self, b):
Guido van Rossum76c5d4d2007-04-06 19:10:29 +000071 self._write_stack.append(b[:])
Guido van Rossum53807da2007-04-10 19:01:47 +000072 n = self._blocking_script.pop(0)
Guido van Rossum01a27522007-03-07 01:00:12 +000073 if (n < 0):
Guido van Rossum141f7672007-04-10 00:22:16 +000074 raise io.BlockingIOError(0, "test blocking", -n)
Guido van Rossum01a27522007-03-07 01:00:12 +000075 else:
76 return n
Guido van Rossuma9e20242007-03-08 00:43:48 +000077
Guido van Rossum01a27522007-03-07 01:00:12 +000078 def writable(self):
79 return True
Guido van Rossum68bbcd22007-02-27 17:19:33 +000080
Guido van Rossuma9e20242007-03-08 00:43:48 +000081
Guido van Rossum28524c72007-02-27 05:47:44 +000082class IOTest(unittest.TestCase):
83
Neal Norwitze7789b12008-03-24 06:18:09 +000084 def setUp(self):
Benjamin Petersonee8712c2008-05-20 21:35:26 +000085 support.unlink(support.TESTFN)
Neal Norwitze7789b12008-03-24 06:18:09 +000086
Guido van Rossum4d0f5a42007-03-07 22:59:39 +000087 def tearDown(self):
Benjamin Petersonee8712c2008-05-20 21:35:26 +000088 support.unlink(support.TESTFN)
Guido van Rossum4d0f5a42007-03-07 22:59:39 +000089
Guido van Rossum28524c72007-02-27 05:47:44 +000090 def write_ops(self, f):
Guido van Rossum87429772007-04-10 21:06:59 +000091 self.assertEqual(f.write(b"blah."), 5)
92 self.assertEqual(f.seek(0), 0)
93 self.assertEqual(f.write(b"Hello."), 6)
Guido van Rossum28524c72007-02-27 05:47:44 +000094 self.assertEqual(f.tell(), 6)
Guido van Rossum87429772007-04-10 21:06:59 +000095 self.assertEqual(f.seek(-1, 1), 5)
Guido van Rossum28524c72007-02-27 05:47:44 +000096 self.assertEqual(f.tell(), 5)
Guido van Rossum254348e2007-11-21 19:29:53 +000097 self.assertEqual(f.write(bytearray(b" world\n\n\n")), 9)
Guido van Rossum87429772007-04-10 21:06:59 +000098 self.assertEqual(f.seek(0), 0)
Guido van Rossum2b08b382007-05-08 20:18:39 +000099 self.assertEqual(f.write(b"h"), 1)
Guido van Rossum87429772007-04-10 21:06:59 +0000100 self.assertEqual(f.seek(-1, 2), 13)
101 self.assertEqual(f.tell(), 13)
102 self.assertEqual(f.truncate(12), 12)
Alexandre Vassalotti77250f42008-05-06 19:48:38 +0000103 self.assertEqual(f.tell(), 12)
Christian Heimes8e42a0a2007-11-08 18:04:45 +0000104 self.assertRaises(TypeError, f.seek, 0.0)
Guido van Rossum28524c72007-02-27 05:47:44 +0000105
Guido van Rossum9b76da62007-04-11 01:09:03 +0000106 def read_ops(self, f, buffered=False):
107 data = f.read(5)
108 self.assertEqual(data, b"hello")
Guido van Rossum254348e2007-11-21 19:29:53 +0000109 data = bytearray(data)
Guido van Rossum9b76da62007-04-11 01:09:03 +0000110 self.assertEqual(f.readinto(data), 5)
111 self.assertEqual(data, b" worl")
112 self.assertEqual(f.readinto(data), 2)
113 self.assertEqual(len(data), 5)
114 self.assertEqual(data[:2], b"d\n")
115 self.assertEqual(f.seek(0), 0)
116 self.assertEqual(f.read(20), b"hello world\n")
117 self.assertEqual(f.read(1), b"")
Guido van Rossum254348e2007-11-21 19:29:53 +0000118 self.assertEqual(f.readinto(bytearray(b"x")), 0)
Guido van Rossum9b76da62007-04-11 01:09:03 +0000119 self.assertEqual(f.seek(-6, 2), 6)
120 self.assertEqual(f.read(5), b"world")
121 self.assertEqual(f.read(0), b"")
Guido van Rossum254348e2007-11-21 19:29:53 +0000122 self.assertEqual(f.readinto(bytearray()), 0)
Guido van Rossum9b76da62007-04-11 01:09:03 +0000123 self.assertEqual(f.seek(-6, 1), 5)
124 self.assertEqual(f.read(5), b" worl")
125 self.assertEqual(f.tell(), 10)
Christian Heimes8e42a0a2007-11-08 18:04:45 +0000126 self.assertRaises(TypeError, f.seek, 0.0)
Guido van Rossum9b76da62007-04-11 01:09:03 +0000127 if buffered:
128 f.seek(0)
129 self.assertEqual(f.read(), b"hello world\n")
130 f.seek(6)
131 self.assertEqual(f.read(), b"world\n")
132 self.assertEqual(f.read(), b"")
133
Guido van Rossum34d69e52007-04-10 20:08:41 +0000134 LARGE = 2**31
135
Guido van Rossum53807da2007-04-10 19:01:47 +0000136 def large_file_ops(self, f):
137 assert f.readable()
138 assert f.writable()
Guido van Rossum34d69e52007-04-10 20:08:41 +0000139 self.assertEqual(f.seek(self.LARGE), self.LARGE)
140 self.assertEqual(f.tell(), self.LARGE)
Guido van Rossum53807da2007-04-10 19:01:47 +0000141 self.assertEqual(f.write(b"xxx"), 3)
Guido van Rossum34d69e52007-04-10 20:08:41 +0000142 self.assertEqual(f.tell(), self.LARGE + 3)
143 self.assertEqual(f.seek(-1, 1), self.LARGE + 2)
Guido van Rossum87429772007-04-10 21:06:59 +0000144 self.assertEqual(f.truncate(), self.LARGE + 2)
Guido van Rossum34d69e52007-04-10 20:08:41 +0000145 self.assertEqual(f.tell(), self.LARGE + 2)
146 self.assertEqual(f.seek(0, 2), self.LARGE + 2)
Guido van Rossum87429772007-04-10 21:06:59 +0000147 self.assertEqual(f.truncate(self.LARGE + 1), self.LARGE + 1)
Alexandre Vassalotti77250f42008-05-06 19:48:38 +0000148 self.assertEqual(f.tell(), self.LARGE + 1)
Guido van Rossum34d69e52007-04-10 20:08:41 +0000149 self.assertEqual(f.seek(0, 2), self.LARGE + 1)
150 self.assertEqual(f.seek(-1, 2), self.LARGE)
Guido van Rossum53807da2007-04-10 19:01:47 +0000151 self.assertEqual(f.read(2), b"x")
152
Guido van Rossum28524c72007-02-27 05:47:44 +0000153 def test_raw_file_io(self):
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000154 f = io.open(support.TESTFN, "wb", buffering=0)
Guido van Rossum28524c72007-02-27 05:47:44 +0000155 self.assertEqual(f.readable(), False)
156 self.assertEqual(f.writable(), True)
157 self.assertEqual(f.seekable(), True)
158 self.write_ops(f)
159 f.close()
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000160 f = io.open(support.TESTFN, "rb", buffering=0)
Guido van Rossum28524c72007-02-27 05:47:44 +0000161 self.assertEqual(f.readable(), True)
162 self.assertEqual(f.writable(), False)
163 self.assertEqual(f.seekable(), True)
164 self.read_ops(f)
165 f.close()
166
Guido van Rossum87429772007-04-10 21:06:59 +0000167 def test_buffered_file_io(self):
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000168 f = io.open(support.TESTFN, "wb")
Guido van Rossum87429772007-04-10 21:06:59 +0000169 self.assertEqual(f.readable(), False)
170 self.assertEqual(f.writable(), True)
171 self.assertEqual(f.seekable(), True)
172 self.write_ops(f)
173 f.close()
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000174 f = io.open(support.TESTFN, "rb")
Guido van Rossum87429772007-04-10 21:06:59 +0000175 self.assertEqual(f.readable(), True)
176 self.assertEqual(f.writable(), False)
177 self.assertEqual(f.seekable(), True)
Guido van Rossum9b76da62007-04-11 01:09:03 +0000178 self.read_ops(f, True)
Guido van Rossum87429772007-04-10 21:06:59 +0000179 f.close()
180
Guido van Rossum48fc58a2007-06-07 23:45:37 +0000181 def test_readline(self):
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000182 f = io.open(support.TESTFN, "wb")
Guido van Rossum48fc58a2007-06-07 23:45:37 +0000183 f.write(b"abc\ndef\nxyzzy\nfoo")
184 f.close()
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000185 f = io.open(support.TESTFN, "rb")
Guido van Rossum48fc58a2007-06-07 23:45:37 +0000186 self.assertEqual(f.readline(), b"abc\n")
187 self.assertEqual(f.readline(10), b"def\n")
188 self.assertEqual(f.readline(2), b"xy")
189 self.assertEqual(f.readline(4), b"zzy\n")
190 self.assertEqual(f.readline(), b"foo")
191 f.close()
192
Guido van Rossum28524c72007-02-27 05:47:44 +0000193 def test_raw_bytes_io(self):
194 f = io.BytesIO()
195 self.write_ops(f)
196 data = f.getvalue()
197 self.assertEqual(data, b"hello world\n")
198 f = io.BytesIO(data)
Guido van Rossum9b76da62007-04-11 01:09:03 +0000199 self.read_ops(f, True)
Guido van Rossum28524c72007-02-27 05:47:44 +0000200
Guido van Rossum53807da2007-04-10 19:01:47 +0000201 def test_large_file_ops(self):
Guido van Rossum34d69e52007-04-10 20:08:41 +0000202 # On Windows and Mac OSX this test comsumes large resources; It takes
203 # a long time to build the >2GB file and takes >2GB of disk space
204 # therefore the resource must be enabled to run this test.
205 if sys.platform[:3] == 'win' or sys.platform == 'darwin':
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000206 if not support.is_resource_enabled("largefile"):
Guido van Rossum34d69e52007-04-10 20:08:41 +0000207 print("\nTesting large file ops skipped on %s." % sys.platform,
208 file=sys.stderr)
209 print("It requires %d bytes and a long time." % self.LARGE,
210 file=sys.stderr)
211 print("Use 'regrtest.py -u largefile test_io' to run it.",
212 file=sys.stderr)
213 return
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000214 f = io.open(support.TESTFN, "w+b", 0)
Guido van Rossum53807da2007-04-10 19:01:47 +0000215 self.large_file_ops(f)
Guido van Rossuma9e20242007-03-08 00:43:48 +0000216 f.close()
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000217 f = io.open(support.TESTFN, "w+b")
Guido van Rossum87429772007-04-10 21:06:59 +0000218 self.large_file_ops(f)
219 f.close()
220
221 def test_with_open(self):
222 for bufsize in (0, 1, 100):
223 f = None
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000224 with open(support.TESTFN, "wb", bufsize) as f:
Guido van Rossum1f2ca562007-08-27 20:44:15 +0000225 f.write(b"xxx")
Guido van Rossum87429772007-04-10 21:06:59 +0000226 self.assertEqual(f.closed, True)
227 f = None
228 try:
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000229 with open(support.TESTFN, "wb", bufsize) as f:
Guido van Rossum87429772007-04-10 21:06:59 +0000230 1/0
231 except ZeroDivisionError:
232 self.assertEqual(f.closed, True)
233 else:
234 self.fail("1/0 didn't raise an exception")
235
236 def test_destructor(self):
237 record = []
238 class MyFileIO(io.FileIO):
239 def __del__(self):
240 record.append(1)
241 io.FileIO.__del__(self)
242 def close(self):
243 record.append(2)
244 io.FileIO.close(self)
245 def flush(self):
246 record.append(3)
247 io.FileIO.flush(self)
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000248 f = MyFileIO(support.TESTFN, "w")
Guido van Rossum87429772007-04-10 21:06:59 +0000249 f.write("xxx")
250 del f
251 self.assertEqual(record, [1, 2, 3])
252
253 def test_close_flushes(self):
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000254 f = io.open(support.TESTFN, "wb")
Guido van Rossum2b08b382007-05-08 20:18:39 +0000255 f.write(b"xxx")
Guido van Rossum87429772007-04-10 21:06:59 +0000256 f.close()
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000257 f = io.open(support.TESTFN, "rb")
Guido van Rossum87429772007-04-10 21:06:59 +0000258 self.assertEqual(f.read(), b"xxx")
259 f.close()
Guido van Rossuma9e20242007-03-08 00:43:48 +0000260
Guido van Rossumd4103952007-04-12 05:44:49 +0000261 def test_array_writes(self):
262 a = array.array('i', range(10))
Antoine Pitrouc3b39242009-01-03 16:59:18 +0000263 n = len(a.tostring())
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000264 f = io.open(support.TESTFN, "wb", 0)
Guido van Rossumd4103952007-04-12 05:44:49 +0000265 self.assertEqual(f.write(a), n)
266 f.close()
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000267 f = io.open(support.TESTFN, "wb")
Guido van Rossumd4103952007-04-12 05:44:49 +0000268 self.assertEqual(f.write(a), n)
269 f.close()
270
Guido van Rossum2dced8b2007-10-30 17:27:30 +0000271 def test_closefd(self):
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000272 self.assertRaises(ValueError, io.open, support.TESTFN, 'w',
Guido van Rossum2dced8b2007-10-30 17:27:30 +0000273 closefd=False)
Guido van Rossuma9e20242007-03-08 00:43:48 +0000274
Christian Heimesecc42a22008-11-05 19:30:32 +0000275 def testReadClosed(self):
276 with io.open(support.TESTFN, "w") as f:
277 f.write("egg\n")
278 with io.open(support.TESTFN, "r") as f:
279 file = io.open(f.fileno(), "r", closefd=False)
280 self.assertEqual(file.read(), "egg\n")
281 file.seek(0)
282 file.close()
283 self.assertRaises(ValueError, file.read)
284
285 def test_no_closefd_with_filename(self):
286 # can't use closefd in combination with a file name
287 self.assertRaises(ValueError, io.open, support.TESTFN, "r", closefd=False)
288
289 def test_closefd_attr(self):
290 with io.open(support.TESTFN, "wb") as f:
291 f.write(b"egg\n")
292 with io.open(support.TESTFN, "r") as f:
293 self.assertEqual(f.buffer.raw.closefd, True)
294 file = io.open(f.fileno(), "r", closefd=False)
295 self.assertEqual(file.buffer.raw.closefd, False)
296
297
Guido van Rossum76c5d4d2007-04-06 19:10:29 +0000298class MemorySeekTestMixin:
Guido van Rossuma9e20242007-03-08 00:43:48 +0000299
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000300 def testInit(self):
Guido van Rossum78892e42007-04-06 17:31:18 +0000301 buf = self.buftype("1234567890")
302 bytesIo = self.ioclass(buf)
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000303
304 def testRead(self):
Guido van Rossum78892e42007-04-06 17:31:18 +0000305 buf = self.buftype("1234567890")
306 bytesIo = self.ioclass(buf)
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000307
308 self.assertEquals(buf[:1], bytesIo.read(1))
309 self.assertEquals(buf[1:5], bytesIo.read(4))
310 self.assertEquals(buf[5:], bytesIo.read(900))
Guido van Rossum78892e42007-04-06 17:31:18 +0000311 self.assertEquals(self.EOF, bytesIo.read())
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000312
313 def testReadNoArgs(self):
Guido van Rossum78892e42007-04-06 17:31:18 +0000314 buf = self.buftype("1234567890")
315 bytesIo = self.ioclass(buf)
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000316
317 self.assertEquals(buf, bytesIo.read())
Guido van Rossum78892e42007-04-06 17:31:18 +0000318 self.assertEquals(self.EOF, bytesIo.read())
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000319
320 def testSeek(self):
Guido van Rossum78892e42007-04-06 17:31:18 +0000321 buf = self.buftype("1234567890")
322 bytesIo = self.ioclass(buf)
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000323
324 bytesIo.read(5)
325 bytesIo.seek(0)
326 self.assertEquals(buf, bytesIo.read())
327
328 bytesIo.seek(3)
329 self.assertEquals(buf[3:], bytesIo.read())
Christian Heimes8e42a0a2007-11-08 18:04:45 +0000330 self.assertRaises(TypeError, bytesIo.seek, 0.0)
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000331
332 def testTell(self):
Guido van Rossum78892e42007-04-06 17:31:18 +0000333 buf = self.buftype("1234567890")
334 bytesIo = self.ioclass(buf)
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000335
336 self.assertEquals(0, bytesIo.tell())
337 bytesIo.seek(5)
338 self.assertEquals(5, bytesIo.tell())
339 bytesIo.seek(10000)
340 self.assertEquals(10000, bytesIo.tell())
341
Guido van Rossuma9e20242007-03-08 00:43:48 +0000342
Guido van Rossum76c5d4d2007-04-06 19:10:29 +0000343class BytesIOTest(MemorySeekTestMixin, unittest.TestCase):
Guido van Rossum39478e82007-08-27 17:23:59 +0000344 @staticmethod
345 def buftype(s):
346 return s.encode("utf-8")
Guido van Rossum78892e42007-04-06 17:31:18 +0000347 ioclass = io.BytesIO
348 EOF = b""
349
350
Guido van Rossum76c5d4d2007-04-06 19:10:29 +0000351class StringIOTest(MemorySeekTestMixin, unittest.TestCase):
Guido van Rossum78892e42007-04-06 17:31:18 +0000352 buftype = str
353 ioclass = io.StringIO
354 EOF = ""
355
356
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000357class BufferedReaderTest(unittest.TestCase):
Guido van Rossuma9e20242007-03-08 00:43:48 +0000358
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000359 def testRead(self):
Guido van Rossum53807da2007-04-10 19:01:47 +0000360 rawio = MockRawIO((b"abc", b"d", b"efg"))
Guido van Rossum76c5d4d2007-04-06 19:10:29 +0000361 bufio = io.BufferedReader(rawio)
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000362
Guido van Rossum76c5d4d2007-04-06 19:10:29 +0000363 self.assertEquals(b"abcdef", bufio.read(6))
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000364
Guido van Rossum78892e42007-04-06 17:31:18 +0000365 def testBuffering(self):
366 data = b"abcdefghi"
367 dlen = len(data)
368
369 tests = [
370 [ 100, [ 3, 1, 4, 8 ], [ dlen, 0 ] ],
371 [ 100, [ 3, 3, 3], [ dlen ] ],
372 [ 4, [ 1, 2, 4, 2 ], [ 4, 4, 1 ] ],
373 ]
374
375 for bufsize, buf_read_sizes, raw_read_sizes in tests:
Guido van Rossum76c5d4d2007-04-06 19:10:29 +0000376 rawio = MockFileIO(data)
377 bufio = io.BufferedReader(rawio, buffer_size=bufsize)
Guido van Rossum78892e42007-04-06 17:31:18 +0000378 pos = 0
379 for nbytes in buf_read_sizes:
Guido van Rossum76c5d4d2007-04-06 19:10:29 +0000380 self.assertEquals(bufio.read(nbytes), data[pos:pos+nbytes])
Guido van Rossum78892e42007-04-06 17:31:18 +0000381 pos += nbytes
Guido van Rossum76c5d4d2007-04-06 19:10:29 +0000382 self.assertEquals(rawio.read_history, raw_read_sizes)
Guido van Rossum78892e42007-04-06 17:31:18 +0000383
Guido van Rossum01a27522007-03-07 01:00:12 +0000384 def testReadNonBlocking(self):
385 # Inject some None's in there to simulate EWOULDBLOCK
Guido van Rossum53807da2007-04-10 19:01:47 +0000386 rawio = MockRawIO((b"abc", b"d", None, b"efg", None, None))
Guido van Rossum76c5d4d2007-04-06 19:10:29 +0000387 bufio = io.BufferedReader(rawio)
Guido van Rossum01a27522007-03-07 01:00:12 +0000388
Guido van Rossum76c5d4d2007-04-06 19:10:29 +0000389 self.assertEquals(b"abcd", bufio.read(6))
390 self.assertEquals(b"e", bufio.read(1))
391 self.assertEquals(b"fg", bufio.read())
392 self.assert_(None is bufio.read())
393 self.assertEquals(b"", bufio.read())
Guido van Rossum01a27522007-03-07 01:00:12 +0000394
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000395 def testReadToEof(self):
Guido van Rossum53807da2007-04-10 19:01:47 +0000396 rawio = MockRawIO((b"abc", b"d", b"efg"))
Guido van Rossum76c5d4d2007-04-06 19:10:29 +0000397 bufio = io.BufferedReader(rawio)
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000398
Guido van Rossum76c5d4d2007-04-06 19:10:29 +0000399 self.assertEquals(b"abcdefg", bufio.read(9000))
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000400
401 def testReadNoArgs(self):
Guido van Rossum53807da2007-04-10 19:01:47 +0000402 rawio = MockRawIO((b"abc", b"d", b"efg"))
Guido van Rossum76c5d4d2007-04-06 19:10:29 +0000403 bufio = io.BufferedReader(rawio)
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000404
Guido van Rossum76c5d4d2007-04-06 19:10:29 +0000405 self.assertEquals(b"abcdefg", bufio.read())
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000406
407 def testFileno(self):
Guido van Rossum53807da2007-04-10 19:01:47 +0000408 rawio = MockRawIO((b"abc", b"d", b"efg"))
Guido van Rossum76c5d4d2007-04-06 19:10:29 +0000409 bufio = io.BufferedReader(rawio)
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000410
Guido van Rossum76c5d4d2007-04-06 19:10:29 +0000411 self.assertEquals(42, bufio.fileno())
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000412
413 def testFilenoNoFileno(self):
Guido van Rossum01a27522007-03-07 01:00:12 +0000414 # XXX will we always have fileno() function? If so, kill
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000415 # this test. Else, write it.
416 pass
417
Antoine Pitrou87695762008-08-14 22:44:29 +0000418 def testThreads(self):
419 try:
420 # Write out many bytes with exactly the same number of 0's,
421 # 1's... 255's. This will help us check that concurrent reading
422 # doesn't duplicate or forget contents.
423 N = 1000
424 l = list(range(256)) * N
425 random.shuffle(l)
426 s = bytes(bytearray(l))
427 with io.open(support.TESTFN, "wb") as f:
428 f.write(s)
429 with io.open(support.TESTFN, "rb", buffering=0) as raw:
430 bufio = io.BufferedReader(raw, 8)
431 errors = []
432 results = []
433 def f():
434 try:
435 # Intra-buffer read then buffer-flushing read
436 for n in cycle([1, 19]):
437 s = bufio.read(n)
438 if not s:
439 break
440 # list.append() is atomic
441 results.append(s)
442 except Exception as e:
443 errors.append(e)
444 raise
445 threads = [threading.Thread(target=f) for x in range(20)]
446 for t in threads:
447 t.start()
448 time.sleep(0.02) # yield
449 for t in threads:
450 t.join()
451 self.assertFalse(errors,
452 "the following exceptions were caught: %r" % errors)
453 s = b''.join(results)
454 for i in range(256):
455 c = bytes(bytearray([i]))
456 self.assertEqual(s.count(c), N)
457 finally:
458 support.unlink(support.TESTFN)
459
460
Guido van Rossuma9e20242007-03-08 00:43:48 +0000461
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000462class BufferedWriterTest(unittest.TestCase):
Guido van Rossuma9e20242007-03-08 00:43:48 +0000463
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000464 def testWrite(self):
465 # Write to the buffered IO but don't overflow the buffer.
Guido van Rossum53807da2007-04-10 19:01:47 +0000466 writer = MockRawIO()
Guido van Rossum76c5d4d2007-04-06 19:10:29 +0000467 bufio = io.BufferedWriter(writer, 8)
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000468
Guido van Rossum76c5d4d2007-04-06 19:10:29 +0000469 bufio.write(b"abc")
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000470
Guido van Rossum76c5d4d2007-04-06 19:10:29 +0000471 self.assertFalse(writer._write_stack)
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000472
473 def testWriteOverflow(self):
Guido van Rossum53807da2007-04-10 19:01:47 +0000474 writer = MockRawIO()
Guido van Rossum76c5d4d2007-04-06 19:10:29 +0000475 bufio = io.BufferedWriter(writer, 8)
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000476
Guido van Rossum76c5d4d2007-04-06 19:10:29 +0000477 bufio.write(b"abc")
478 bufio.write(b"defghijkl")
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000479
Guido van Rossum76c5d4d2007-04-06 19:10:29 +0000480 self.assertEquals(b"abcdefghijkl", writer._write_stack[0])
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000481
Guido van Rossum01a27522007-03-07 01:00:12 +0000482 def testWriteNonBlocking(self):
483 raw = MockNonBlockWriterIO((9, 2, 22, -6, 10, 12, 12))
Guido van Rossum76c5d4d2007-04-06 19:10:29 +0000484 bufio = io.BufferedWriter(raw, 8, 16)
Guido van Rossum01a27522007-03-07 01:00:12 +0000485
Guido van Rossum76c5d4d2007-04-06 19:10:29 +0000486 bufio.write(b"asdf")
487 bufio.write(b"asdfa")
Guido van Rossum01a27522007-03-07 01:00:12 +0000488 self.assertEquals(b"asdfasdfa", raw._write_stack[0])
489
Guido van Rossum76c5d4d2007-04-06 19:10:29 +0000490 bufio.write(b"asdfasdfasdf")
Guido van Rossum01a27522007-03-07 01:00:12 +0000491 self.assertEquals(b"asdfasdfasdf", raw._write_stack[1])
Guido van Rossum76c5d4d2007-04-06 19:10:29 +0000492 bufio.write(b"asdfasdfasdf")
Guido van Rossum01a27522007-03-07 01:00:12 +0000493 self.assertEquals(b"dfasdfasdf", raw._write_stack[2])
494 self.assertEquals(b"asdfasdfasdf", raw._write_stack[3])
495
Guido van Rossum76c5d4d2007-04-06 19:10:29 +0000496 bufio.write(b"asdfasdfasdf")
Guido van Rossum01a27522007-03-07 01:00:12 +0000497
Guido van Rossum78892e42007-04-06 17:31:18 +0000498 # XXX I don't like this test. It relies too heavily on how the
499 # algorithm actually works, which we might change. Refactor
500 # later.
Guido van Rossum01a27522007-03-07 01:00:12 +0000501
502 def testFileno(self):
Guido van Rossum53807da2007-04-10 19:01:47 +0000503 rawio = MockRawIO((b"abc", b"d", b"efg"))
Guido van Rossum76c5d4d2007-04-06 19:10:29 +0000504 bufio = io.BufferedWriter(rawio)
Guido van Rossum01a27522007-03-07 01:00:12 +0000505
Guido van Rossum76c5d4d2007-04-06 19:10:29 +0000506 self.assertEquals(42, bufio.fileno())
Guido van Rossum01a27522007-03-07 01:00:12 +0000507
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000508 def testFlush(self):
Guido van Rossum53807da2007-04-10 19:01:47 +0000509 writer = MockRawIO()
Guido van Rossum76c5d4d2007-04-06 19:10:29 +0000510 bufio = io.BufferedWriter(writer, 8)
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000511
Guido van Rossum76c5d4d2007-04-06 19:10:29 +0000512 bufio.write(b"abc")
513 bufio.flush()
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000514
Guido van Rossum76c5d4d2007-04-06 19:10:29 +0000515 self.assertEquals(b"abc", writer._write_stack[0])
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000516
Antoine Pitrou87695762008-08-14 22:44:29 +0000517 def testThreads(self):
518 # BufferedWriter should not raise exceptions or crash
519 # when called from multiple threads.
520 try:
521 # We use a real file object because it allows us to
522 # exercise situations where the GIL is released before
523 # writing the buffer to the raw streams. This is in addition
524 # to concurrency issues due to switching threads in the middle
525 # of Python code.
526 with io.open(support.TESTFN, "wb", buffering=0) as raw:
527 bufio = io.BufferedWriter(raw, 8)
528 errors = []
529 def f():
530 try:
531 # Write enough bytes to flush the buffer
532 s = b"a" * 19
533 for i in range(50):
534 bufio.write(s)
535 except Exception as e:
536 errors.append(e)
537 raise
538 threads = [threading.Thread(target=f) for x in range(20)]
539 for t in threads:
540 t.start()
541 time.sleep(0.02) # yield
542 for t in threads:
543 t.join()
544 self.assertFalse(errors,
545 "the following exceptions were caught: %r" % errors)
546 finally:
547 support.unlink(support.TESTFN)
548
Guido van Rossuma9e20242007-03-08 00:43:48 +0000549
Guido van Rossum01a27522007-03-07 01:00:12 +0000550class BufferedRWPairTest(unittest.TestCase):
Guido van Rossuma9e20242007-03-08 00:43:48 +0000551
Guido van Rossum01a27522007-03-07 01:00:12 +0000552 def testRWPair(self):
Guido van Rossum53807da2007-04-10 19:01:47 +0000553 r = MockRawIO(())
554 w = MockRawIO()
Guido van Rossum01a27522007-03-07 01:00:12 +0000555 pair = io.BufferedRWPair(r, w)
Benjamin Peterson92035012008-12-27 16:00:54 +0000556 self.assertFalse(pair.closed)
Guido van Rossum01a27522007-03-07 01:00:12 +0000557
Benjamin Peterson92035012008-12-27 16:00:54 +0000558 # XXX More Tests
Guido van Rossum01a27522007-03-07 01:00:12 +0000559
Guido van Rossuma9e20242007-03-08 00:43:48 +0000560
Guido van Rossum78892e42007-04-06 17:31:18 +0000561class BufferedRandomTest(unittest.TestCase):
Guido van Rossuma9e20242007-03-08 00:43:48 +0000562
Guido van Rossum01a27522007-03-07 01:00:12 +0000563 def testReadAndWrite(self):
Guido van Rossum53807da2007-04-10 19:01:47 +0000564 raw = MockRawIO((b"asdf", b"ghjk"))
Guido van Rossum01a27522007-03-07 01:00:12 +0000565 rw = io.BufferedRandom(raw, 8, 12)
566
567 self.assertEqual(b"as", rw.read(2))
568 rw.write(b"ddd")
569 rw.write(b"eee")
Guido van Rossum76c5d4d2007-04-06 19:10:29 +0000570 self.assertFalse(raw._write_stack) # Buffer writes
Guido van Rossum01a27522007-03-07 01:00:12 +0000571 self.assertEqual(b"ghjk", rw.read()) # This read forces write flush
Guido van Rossum76c5d4d2007-04-06 19:10:29 +0000572 self.assertEquals(b"dddeee", raw._write_stack[0])
Guido van Rossum01a27522007-03-07 01:00:12 +0000573
574 def testSeekAndTell(self):
575 raw = io.BytesIO(b"asdfghjkl")
576 rw = io.BufferedRandom(raw)
577
578 self.assertEquals(b"as", rw.read(2))
579 self.assertEquals(2, rw.tell())
580 rw.seek(0, 0)
581 self.assertEquals(b"asdf", rw.read(4))
582
583 rw.write(b"asdf")
584 rw.seek(0, 0)
585 self.assertEquals(b"asdfasdfl", rw.read())
586 self.assertEquals(9, rw.tell())
587 rw.seek(-4, 2)
588 self.assertEquals(5, rw.tell())
589 rw.seek(2, 1)
590 self.assertEquals(7, rw.tell())
591 self.assertEquals(b"fl", rw.read(11))
Christian Heimes8e42a0a2007-11-08 18:04:45 +0000592 self.assertRaises(TypeError, rw.seek, 0.0)
Guido van Rossum01a27522007-03-07 01:00:12 +0000593
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +0000594# To fully exercise seek/tell, the StatefulIncrementalDecoder has these
595# properties:
596# - A single output character can correspond to many bytes of input.
597# - The number of input bytes to complete the character can be
598# undetermined until the last input byte is received.
599# - The number of input bytes can vary depending on previous input.
600# - A single input byte can correspond to many characters of output.
601# - The number of output characters can be undetermined until the
602# last input byte is received.
603# - The number of output characters can vary depending on previous input.
604
605class StatefulIncrementalDecoder(codecs.IncrementalDecoder):
606 """
607 For testing seek/tell behavior with a stateful, buffering decoder.
608
609 Input is a sequence of words. Words may be fixed-length (length set
610 by input) or variable-length (period-terminated). In variable-length
611 mode, extra periods are ignored. Possible words are:
612 - 'i' followed by a number sets the input length, I (maximum 99).
613 When I is set to 0, words are space-terminated.
614 - 'o' followed by a number sets the output length, O (maximum 99).
615 - Any other word is converted into a word followed by a period on
616 the output. The output word consists of the input word truncated
617 or padded out with hyphens to make its length equal to O. If O
618 is 0, the word is output verbatim without truncating or padding.
619 I and O are initially set to 1. When I changes, any buffered input is
620 re-scanned according to the new I. EOF also terminates the last word.
621 """
622
623 def __init__(self, errors='strict'):
Christian Heimesab568872008-03-23 02:11:13 +0000624 codecs.IncrementalDecoder.__init__(self, errors)
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +0000625 self.reset()
626
627 def __repr__(self):
628 return '<SID %x>' % id(self)
629
630 def reset(self):
631 self.i = 1
632 self.o = 1
633 self.buffer = bytearray()
634
635 def getstate(self):
636 i, o = self.i ^ 1, self.o ^ 1 # so that flags = 0 after reset()
637 return bytes(self.buffer), i*100 + o
638
639 def setstate(self, state):
640 buffer, io = state
641 self.buffer = bytearray(buffer)
642 i, o = divmod(io, 100)
643 self.i, self.o = i ^ 1, o ^ 1
644
645 def decode(self, input, final=False):
646 output = ''
647 for b in input:
648 if self.i == 0: # variable-length, terminated with period
649 if b == ord('.'):
650 if self.buffer:
651 output += self.process_word()
652 else:
653 self.buffer.append(b)
654 else: # fixed-length, terminate after self.i bytes
655 self.buffer.append(b)
656 if len(self.buffer) == self.i:
657 output += self.process_word()
658 if final and self.buffer: # EOF terminates the last word
659 output += self.process_word()
660 return output
661
662 def process_word(self):
663 output = ''
664 if self.buffer[0] == ord('i'):
665 self.i = min(99, int(self.buffer[1:] or 0)) # set input length
666 elif self.buffer[0] == ord('o'):
667 self.o = min(99, int(self.buffer[1:] or 0)) # set output length
668 else:
669 output = self.buffer.decode('ascii')
670 if len(output) < self.o:
671 output += '-'*self.o # pad out with hyphens
672 if self.o:
673 output = output[:self.o] # truncate to output length
674 output += '.'
675 self.buffer = bytearray()
676 return output
677
Benjamin Petersonad9d48d2008-04-02 21:49:44 +0000678 codecEnabled = False
679
680 @classmethod
681 def lookupTestDecoder(cls, name):
682 if cls.codecEnabled and name == 'test_decoder':
Antoine Pitrou180a3362008-12-14 16:36:46 +0000683 latin1 = codecs.lookup('latin-1')
Benjamin Petersonad9d48d2008-04-02 21:49:44 +0000684 return codecs.CodecInfo(
Antoine Pitrou180a3362008-12-14 16:36:46 +0000685 name='test_decoder', encode=latin1.encode, decode=None,
Benjamin Petersonad9d48d2008-04-02 21:49:44 +0000686 incrementalencoder=None,
687 streamreader=None, streamwriter=None,
688 incrementaldecoder=cls)
689
690# Register the previous decoder for testing.
691# Disabled by default, tests will enable it.
692codecs.register(StatefulIncrementalDecoder.lookupTestDecoder)
693
694
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +0000695class StatefulIncrementalDecoderTest(unittest.TestCase):
696 """
697 Make sure the StatefulIncrementalDecoder actually works.
698 """
699
700 test_cases = [
Ka-Ping Yeed24a5b62008-03-20 10:51:27 +0000701 # I=1, O=1 (fixed-length input == fixed-length output)
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +0000702 (b'abcd', False, 'a.b.c.d.'),
Ka-Ping Yeed24a5b62008-03-20 10:51:27 +0000703 # I=0, O=0 (variable-length input, variable-length output)
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +0000704 (b'oiabcd', True, 'abcd.'),
Ka-Ping Yeed24a5b62008-03-20 10:51:27 +0000705 # I=0, O=0 (should ignore extra periods)
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +0000706 (b'oi...abcd...', True, 'abcd.'),
Ka-Ping Yeed24a5b62008-03-20 10:51:27 +0000707 # I=0, O=6 (variable-length input, fixed-length output)
708 (b'i.o6.x.xyz.toolongtofit.', False, 'x-----.xyz---.toolon.'),
709 # I=2, O=6 (fixed-length input < fixed-length output)
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +0000710 (b'i.i2.o6xyz', True, 'xy----.z-----.'),
Ka-Ping Yeed24a5b62008-03-20 10:51:27 +0000711 # I=6, O=3 (fixed-length input > fixed-length output)
712 (b'i.o3.i6.abcdefghijklmnop', True, 'abc.ghi.mno.'),
713 # I=0, then 3; O=29, then 15 (with longer output)
714 (b'i.o29.a.b.cde.o15.abcdefghijabcdefghij.i3.a.b.c.d.ei00k.l.m', True,
715 'a----------------------------.' +
716 'b----------------------------.' +
717 'cde--------------------------.' +
718 'abcdefghijabcde.' +
719 'a.b------------.' +
720 '.c.------------.' +
721 'd.e------------.' +
722 'k--------------.' +
723 'l--------------.' +
724 'm--------------.')
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +0000725 ]
726
727 def testDecoder(self):
728 # Try a few one-shot test cases.
729 for input, eof, output in self.test_cases:
730 d = StatefulIncrementalDecoder()
731 self.assertEquals(d.decode(input, eof), output)
732
733 # Also test an unfinished decode, followed by forcing EOF.
734 d = StatefulIncrementalDecoder()
735 self.assertEquals(d.decode(b'oiabcd'), '')
736 self.assertEquals(d.decode(b'', 1), 'abcd.')
Guido van Rossum78892e42007-04-06 17:31:18 +0000737
738class TextIOWrapperTest(unittest.TestCase):
Guido van Rossum9b76da62007-04-11 01:09:03 +0000739
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +0000740 def setUp(self):
741 self.testdata = b"AAA\r\nBBB\rCCC\r\nDDD\nEEE\r\n"
742 self.normalized = b"AAA\nBBB\nCCC\nDDD\nEEE\n".decode("ascii")
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000743 support.unlink(support.TESTFN)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +0000744
Guido van Rossumd0712812007-04-11 16:32:43 +0000745 def tearDown(self):
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000746 support.unlink(support.TESTFN)
Guido van Rossum9b76da62007-04-11 01:09:03 +0000747
Guido van Rossumf64db9f2007-12-06 01:04:26 +0000748 def testLineBuffering(self):
749 r = io.BytesIO()
750 b = io.BufferedWriter(r, 1000)
751 t = io.TextIOWrapper(b, newline="\n", line_buffering=True)
752 t.write("X")
753 self.assertEquals(r.getvalue(), b"") # No flush happened
754 t.write("Y\nZ")
755 self.assertEquals(r.getvalue(), b"XY\nZ") # All got flushed
756 t.write("A\rB")
757 self.assertEquals(r.getvalue(), b"XY\nZA\rB")
758
Guido van Rossume7fc50f2007-12-03 22:54:21 +0000759 def testEncodingErrorsReading(self):
760 # (1) default
761 b = io.BytesIO(b"abc\n\xff\n")
762 t = io.TextIOWrapper(b, encoding="ascii")
763 self.assertRaises(UnicodeError, t.read)
764 # (2) explicit strict
765 b = io.BytesIO(b"abc\n\xff\n")
766 t = io.TextIOWrapper(b, encoding="ascii", errors="strict")
767 self.assertRaises(UnicodeError, t.read)
768 # (3) ignore
769 b = io.BytesIO(b"abc\n\xff\n")
770 t = io.TextIOWrapper(b, encoding="ascii", errors="ignore")
771 self.assertEquals(t.read(), "abc\n\n")
772 # (4) replace
773 b = io.BytesIO(b"abc\n\xff\n")
774 t = io.TextIOWrapper(b, encoding="ascii", errors="replace")
775 self.assertEquals(t.read(), "abc\n\ufffd\n")
776
777 def testEncodingErrorsWriting(self):
778 # (1) default
779 b = io.BytesIO()
780 t = io.TextIOWrapper(b, encoding="ascii")
781 self.assertRaises(UnicodeError, t.write, "\xff")
782 # (2) explicit strict
783 b = io.BytesIO()
784 t = io.TextIOWrapper(b, encoding="ascii", errors="strict")
785 self.assertRaises(UnicodeError, t.write, "\xff")
786 # (3) ignore
787 b = io.BytesIO()
Guido van Rossumf64db9f2007-12-06 01:04:26 +0000788 t = io.TextIOWrapper(b, encoding="ascii", errors="ignore",
789 newline="\n")
Guido van Rossume7fc50f2007-12-03 22:54:21 +0000790 t.write("abc\xffdef\n")
791 t.flush()
Christian Heimesecda2612007-12-05 17:59:44 +0000792 self.assertEquals(b.getvalue(), b"abcdef\n")
Guido van Rossume7fc50f2007-12-03 22:54:21 +0000793 # (4) replace
794 b = io.BytesIO()
Guido van Rossumf64db9f2007-12-06 01:04:26 +0000795 t = io.TextIOWrapper(b, encoding="ascii", errors="replace",
796 newline="\n")
Guido van Rossume7fc50f2007-12-03 22:54:21 +0000797 t.write("abc\xffdef\n")
798 t.flush()
Christian Heimesecda2612007-12-05 17:59:44 +0000799 self.assertEquals(b.getvalue(), b"abc?def\n")
Guido van Rossume7fc50f2007-12-03 22:54:21 +0000800
Guido van Rossum8358db22007-08-18 21:39:55 +0000801 def testNewlinesInput(self):
802 testdata = b"AAA\nBBB\nCCC\rDDD\rEEE\r\nFFF\r\nGGG"
803 normalized = testdata.replace(b"\r\n", b"\n").replace(b"\r", b"\n")
804 for newline, expected in [
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +0000805 (None, normalized.decode("ascii").splitlines(True)),
806 ("", testdata.decode("ascii").splitlines(True)),
Guido van Rossum8358db22007-08-18 21:39:55 +0000807 ("\n", ["AAA\n", "BBB\n", "CCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]),
808 ("\r\n", ["AAA\nBBB\nCCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]),
809 ("\r", ["AAA\nBBB\nCCC\r", "DDD\r", "EEE\r", "\nFFF\r", "\nGGG"]),
810 ]:
811 buf = io.BytesIO(testdata)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +0000812 txt = io.TextIOWrapper(buf, encoding="ascii", newline=newline)
Guido van Rossum8358db22007-08-18 21:39:55 +0000813 self.assertEquals(txt.readlines(), expected)
814 txt.seek(0)
815 self.assertEquals(txt.read(), "".join(expected))
816
817 def testNewlinesOutput(self):
818 testdict = {
819 "": b"AAA\nBBB\nCCC\nX\rY\r\nZ",
820 "\n": b"AAA\nBBB\nCCC\nX\rY\r\nZ",
821 "\r": b"AAA\rBBB\rCCC\rX\rY\r\rZ",
822 "\r\n": b"AAA\r\nBBB\r\nCCC\r\nX\rY\r\r\nZ",
823 }
824 tests = [(None, testdict[os.linesep])] + sorted(testdict.items())
825 for newline, expected in tests:
826 buf = io.BytesIO()
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +0000827 txt = io.TextIOWrapper(buf, encoding="ascii", newline=newline)
Guido van Rossum8358db22007-08-18 21:39:55 +0000828 txt.write("AAA\nB")
829 txt.write("BB\nCCC\n")
830 txt.write("X\rY\r\nZ")
831 txt.flush()
Alexandre Vassalotti77250f42008-05-06 19:48:38 +0000832 self.assertEquals(buf.closed, False)
Guido van Rossum8358db22007-08-18 21:39:55 +0000833 self.assertEquals(buf.getvalue(), expected)
834
Guido van Rossum78892e42007-04-06 17:31:18 +0000835 def testNewlines(self):
836 input_lines = [ "unix\n", "windows\r\n", "os9\r", "last\n", "nonl" ]
837
838 tests = [
839 [ None, [ 'unix\n', 'windows\n', 'os9\n', 'last\n', 'nonl' ] ],
Guido van Rossum8358db22007-08-18 21:39:55 +0000840 [ '', input_lines ],
841 [ '\n', [ "unix\n", "windows\r\n", "os9\rlast\n", "nonl" ] ],
842 [ '\r\n', [ "unix\nwindows\r\n", "os9\rlast\nnonl" ] ],
843 [ '\r', [ "unix\nwindows\r", "\nos9\r", "last\nnonl" ] ],
Guido van Rossum78892e42007-04-06 17:31:18 +0000844 ]
Antoine Pitrou180a3362008-12-14 16:36:46 +0000845 encodings = (
846 'utf-8', 'latin-1',
847 'utf-16', 'utf-16-le', 'utf-16-be',
848 'utf-32', 'utf-32-le', 'utf-32-be',
849 )
Guido van Rossum78892e42007-04-06 17:31:18 +0000850
Guido van Rossum8358db22007-08-18 21:39:55 +0000851 # Try a range of buffer sizes to test the case where \r is the last
Guido van Rossum78892e42007-04-06 17:31:18 +0000852 # character in TextIOWrapper._pending_line.
853 for encoding in encodings:
Guido van Rossum8358db22007-08-18 21:39:55 +0000854 # XXX: str.encode() should return bytes
855 data = bytes(''.join(input_lines).encode(encoding))
Guido van Rossum78892e42007-04-06 17:31:18 +0000856 for do_reads in (False, True):
Guido van Rossum8358db22007-08-18 21:39:55 +0000857 for bufsize in range(1, 10):
858 for newline, exp_lines in tests:
859 bufio = io.BufferedReader(io.BytesIO(data), bufsize)
Guido van Rossum76c5d4d2007-04-06 19:10:29 +0000860 textio = io.TextIOWrapper(bufio, newline=newline,
Guido van Rossum78892e42007-04-06 17:31:18 +0000861 encoding=encoding)
862 if do_reads:
863 got_lines = []
864 while True:
Guido van Rossum76c5d4d2007-04-06 19:10:29 +0000865 c2 = textio.read(2)
Guido van Rossum78892e42007-04-06 17:31:18 +0000866 if c2 == '':
867 break
868 self.assertEquals(len(c2), 2)
Guido van Rossum76c5d4d2007-04-06 19:10:29 +0000869 got_lines.append(c2 + textio.readline())
Guido van Rossum78892e42007-04-06 17:31:18 +0000870 else:
Guido van Rossum76c5d4d2007-04-06 19:10:29 +0000871 got_lines = list(textio)
Guido van Rossum78892e42007-04-06 17:31:18 +0000872
873 for got_line, exp_line in zip(got_lines, exp_lines):
874 self.assertEquals(got_line, exp_line)
875 self.assertEquals(len(got_lines), len(exp_lines))
876
Guido van Rossum8358db22007-08-18 21:39:55 +0000877 def testNewlinesInput(self):
878 testdata = b"AAA\nBBB\nCCC\rDDD\rEEE\r\nFFF\r\nGGG"
879 normalized = testdata.replace(b"\r\n", b"\n").replace(b"\r", b"\n")
880 for newline, expected in [
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +0000881 (None, normalized.decode("ascii").splitlines(True)),
882 ("", testdata.decode("ascii").splitlines(True)),
Guido van Rossum8358db22007-08-18 21:39:55 +0000883 ("\n", ["AAA\n", "BBB\n", "CCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]),
884 ("\r\n", ["AAA\nBBB\nCCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]),
885 ("\r", ["AAA\nBBB\nCCC\r", "DDD\r", "EEE\r", "\nFFF\r", "\nGGG"]),
886 ]:
887 buf = io.BytesIO(testdata)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +0000888 txt = io.TextIOWrapper(buf, encoding="ascii", newline=newline)
Guido van Rossum8358db22007-08-18 21:39:55 +0000889 self.assertEquals(txt.readlines(), expected)
890 txt.seek(0)
891 self.assertEquals(txt.read(), "".join(expected))
892
893 def testNewlinesOutput(self):
Guido van Rossum8358db22007-08-18 21:39:55 +0000894 data = "AAA\nBBB\rCCC\n"
895 data_lf = b"AAA\nBBB\rCCC\n"
896 data_cr = b"AAA\rBBB\rCCC\r"
897 data_crlf = b"AAA\r\nBBB\rCCC\r\n"
Guido van Rossum18363582007-08-18 22:05:58 +0000898 save_linesep = os.linesep
899 try:
900 for os.linesep, newline, expected in [
901 ("\n", None, data_lf),
902 ("\r\n", None, data_crlf),
903 ("\n", "", data_lf),
904 ("\r\n", "", data_lf),
905 ("\n", "\n", data_lf),
906 ("\r\n", "\n", data_lf),
907 ("\n", "\r", data_cr),
908 ("\r\n", "\r", data_cr),
909 ("\n", "\r\n", data_crlf),
910 ("\r\n", "\r\n", data_crlf),
911 ]:
912 buf = io.BytesIO()
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +0000913 txt = io.TextIOWrapper(buf, encoding="ascii", newline=newline)
Guido van Rossum18363582007-08-18 22:05:58 +0000914 txt.write(data)
915 txt.close()
Alexandre Vassalotti77250f42008-05-06 19:48:38 +0000916 self.assertEquals(buf.closed, True)
917 self.assertRaises(ValueError, buf.getvalue)
Guido van Rossum18363582007-08-18 22:05:58 +0000918 finally:
919 os.linesep = save_linesep
Guido van Rossum8358db22007-08-18 21:39:55 +0000920
Guido van Rossum9b76da62007-04-11 01:09:03 +0000921 # Systematic tests of the text I/O API
922
923 def testBasicIO(self):
924 for chunksize in (1, 2, 3, 4, 5, 15, 16, 17, 31, 32, 33, 63, 64, 65):
925 for enc in "ascii", "latin1", "utf8" :# , "utf-16-be", "utf-16-le":
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000926 f = io.open(support.TESTFN, "w+", encoding=enc)
Guido van Rossum9b76da62007-04-11 01:09:03 +0000927 f._CHUNK_SIZE = chunksize
928 self.assertEquals(f.write("abc"), 3)
929 f.close()
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000930 f = io.open(support.TESTFN, "r+", encoding=enc)
Guido van Rossum9b76da62007-04-11 01:09:03 +0000931 f._CHUNK_SIZE = chunksize
932 self.assertEquals(f.tell(), 0)
933 self.assertEquals(f.read(), "abc")
934 cookie = f.tell()
935 self.assertEquals(f.seek(0), 0)
936 self.assertEquals(f.read(2), "ab")
937 self.assertEquals(f.read(1), "c")
938 self.assertEquals(f.read(1), "")
939 self.assertEquals(f.read(), "")
940 self.assertEquals(f.tell(), cookie)
941 self.assertEquals(f.seek(0), 0)
942 self.assertEquals(f.seek(0, 2), cookie)
943 self.assertEquals(f.write("def"), 3)
944 self.assertEquals(f.seek(cookie), cookie)
945 self.assertEquals(f.read(), "def")
946 if enc.startswith("utf"):
947 self.multi_line_test(f, enc)
948 f.close()
949
950 def multi_line_test(self, f, enc):
951 f.seek(0)
952 f.truncate()
Guido van Rossumef87d6e2007-05-02 19:09:54 +0000953 sample = "s\xff\u0fff\uffff"
Guido van Rossum9b76da62007-04-11 01:09:03 +0000954 wlines = []
Guido van Rossumcba608c2007-04-11 14:19:59 +0000955 for size in (0, 1, 2, 3, 4, 5, 30, 31, 32, 33, 62, 63, 64, 65, 1000):
Guido van Rossum9b76da62007-04-11 01:09:03 +0000956 chars = []
Guido van Rossum805365e2007-05-07 22:24:25 +0000957 for i in range(size):
Guido van Rossum9b76da62007-04-11 01:09:03 +0000958 chars.append(sample[i % len(sample)])
Guido van Rossumef87d6e2007-05-02 19:09:54 +0000959 line = "".join(chars) + "\n"
Guido van Rossum9b76da62007-04-11 01:09:03 +0000960 wlines.append((f.tell(), line))
961 f.write(line)
Guido van Rossum9b76da62007-04-11 01:09:03 +0000962 f.seek(0)
963 rlines = []
964 while True:
965 pos = f.tell()
966 line = f.readline()
967 if not line:
Guido van Rossum9b76da62007-04-11 01:09:03 +0000968 break
969 rlines.append((pos, line))
Guido van Rossum9b76da62007-04-11 01:09:03 +0000970 self.assertEquals(rlines, wlines)
971
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +0000972 def testTelling(self):
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000973 f = io.open(support.TESTFN, "w+", encoding="utf8")
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +0000974 p0 = f.tell()
Guido van Rossumef87d6e2007-05-02 19:09:54 +0000975 f.write("\xff\n")
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +0000976 p1 = f.tell()
Guido van Rossumef87d6e2007-05-02 19:09:54 +0000977 f.write("\xff\n")
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +0000978 p2 = f.tell()
979 f.seek(0)
980 self.assertEquals(f.tell(), p0)
Guido van Rossumef87d6e2007-05-02 19:09:54 +0000981 self.assertEquals(f.readline(), "\xff\n")
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +0000982 self.assertEquals(f.tell(), p1)
Guido van Rossumef87d6e2007-05-02 19:09:54 +0000983 self.assertEquals(f.readline(), "\xff\n")
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +0000984 self.assertEquals(f.tell(), p2)
985 f.seek(0)
986 for line in f:
Guido van Rossumef87d6e2007-05-02 19:09:54 +0000987 self.assertEquals(line, "\xff\n")
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +0000988 self.assertRaises(IOError, f.tell)
989 self.assertEquals(f.tell(), p2)
990 f.close()
991
Guido van Rossumd76e7792007-04-17 02:38:04 +0000992 def testSeeking(self):
993 chunk_size = io.TextIOWrapper._CHUNK_SIZE
994 prefix_size = chunk_size - 2
Guido van Rossumef87d6e2007-05-02 19:09:54 +0000995 u_prefix = "a" * prefix_size
Guido van Rossumd76e7792007-04-17 02:38:04 +0000996 prefix = bytes(u_prefix.encode("utf-8"))
997 self.assertEquals(len(u_prefix), len(prefix))
Guido van Rossumef87d6e2007-05-02 19:09:54 +0000998 u_suffix = "\u8888\n"
Guido van Rossumd76e7792007-04-17 02:38:04 +0000999 suffix = bytes(u_suffix.encode("utf-8"))
1000 line = prefix + suffix
Benjamin Petersonee8712c2008-05-20 21:35:26 +00001001 f = io.open(support.TESTFN, "wb")
Guido van Rossumd76e7792007-04-17 02:38:04 +00001002 f.write(line*2)
1003 f.close()
Benjamin Petersonee8712c2008-05-20 21:35:26 +00001004 f = io.open(support.TESTFN, "r", encoding="utf-8")
Guido van Rossumd76e7792007-04-17 02:38:04 +00001005 s = f.read(prefix_size)
Guido van Rossum98297ee2007-11-06 21:34:58 +00001006 self.assertEquals(s, str(prefix, "ascii"))
Guido van Rossumd76e7792007-04-17 02:38:04 +00001007 self.assertEquals(f.tell(), prefix_size)
1008 self.assertEquals(f.readline(), u_suffix)
1009
1010 def testSeekingToo(self):
1011 # Regression test for a specific bug
1012 data = b'\xe0\xbf\xbf\n'
Benjamin Petersonee8712c2008-05-20 21:35:26 +00001013 f = io.open(support.TESTFN, "wb")
Guido van Rossumd76e7792007-04-17 02:38:04 +00001014 f.write(data)
1015 f.close()
Benjamin Petersonee8712c2008-05-20 21:35:26 +00001016 f = io.open(support.TESTFN, "r", encoding="utf-8")
Guido van Rossumd76e7792007-04-17 02:38:04 +00001017 f._CHUNK_SIZE # Just test that it exists
1018 f._CHUNK_SIZE = 2
1019 f.readline()
1020 f.tell()
1021
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001022 def testSeekAndTell(self):
1023 """Test seek/tell using the StatefulIncrementalDecoder."""
Antoine Pitrouc10c34d2009-01-17 02:40:13 +00001024 # Make this test faster by forcing a smaller (but large enough)
1025 # chunk size. The bigger the chunker size, the slower seek() is,
1026 # as it tries to replay character decoding one byte at a time.
1027 CHUNK_SIZE = 256
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001028
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001029 def testSeekAndTellWithData(data, min_pos=0):
1030 """Tell/seek to various points within a data stream and ensure
1031 that the decoded data returned by read() is consistent."""
Benjamin Petersonee8712c2008-05-20 21:35:26 +00001032 f = io.open(support.TESTFN, 'wb')
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001033 f.write(data)
1034 f.close()
Benjamin Petersonee8712c2008-05-20 21:35:26 +00001035 f = io.open(support.TESTFN, encoding='test_decoder')
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001036 decoded = f.read()
1037 f.close()
1038
Neal Norwitze2b07052008-03-18 19:52:05 +00001039 for i in range(min_pos, len(decoded) + 1): # seek positions
1040 for j in [1, 5, len(decoded) - i]: # read lengths
Benjamin Petersonee8712c2008-05-20 21:35:26 +00001041 f = io.open(support.TESTFN, encoding='test_decoder')
Antoine Pitrouc10c34d2009-01-17 02:40:13 +00001042 f._CHUNK_SIZE = CHUNK_SIZE
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001043 self.assertEquals(f.read(i), decoded[:i])
1044 cookie = f.tell()
1045 self.assertEquals(f.read(j), decoded[i:i + j])
1046 f.seek(cookie)
1047 self.assertEquals(f.read(), decoded[i:])
1048 f.close()
1049
Benjamin Petersonad9d48d2008-04-02 21:49:44 +00001050 # Enable the test decoder.
1051 StatefulIncrementalDecoder.codecEnabled = 1
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001052
1053 # Run the tests.
1054 try:
1055 # Try each test case.
1056 for input, _, _ in StatefulIncrementalDecoderTest.test_cases:
1057 testSeekAndTellWithData(input)
1058
1059 # Position each test case so that it crosses a chunk boundary.
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001060 for input, _, _ in StatefulIncrementalDecoderTest.test_cases:
1061 offset = CHUNK_SIZE - len(input)//2
1062 prefix = b'.'*offset
1063 # Don't bother seeking into the prefix (takes too long).
1064 min_pos = offset*2
1065 testSeekAndTellWithData(prefix + input, min_pos)
1066
1067 # Ensure our test decoder won't interfere with subsequent tests.
1068 finally:
Benjamin Petersonad9d48d2008-04-02 21:49:44 +00001069 StatefulIncrementalDecoder.codecEnabled = 0
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001070
Alexandre Vassalottia38f73b2008-01-07 18:30:48 +00001071 def testEncodedWrites(self):
1072 data = "1234567890"
1073 tests = ("utf-16",
1074 "utf-16-le",
1075 "utf-16-be",
1076 "utf-32",
1077 "utf-32-le",
1078 "utf-32-be")
1079 for encoding in tests:
1080 buf = io.BytesIO()
1081 f = io.TextIOWrapper(buf, encoding=encoding)
1082 # Check if the BOM is written only once (see issue1753).
1083 f.write(data)
1084 f.write(data)
1085 f.seek(0)
1086 self.assertEquals(f.read(), data * 2)
1087 self.assertEquals(buf.getvalue(), (data * 2).encode(encoding))
1088
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +00001089 def timingTest(self):
1090 timer = time.time
1091 enc = "utf8"
Guido van Rossumef87d6e2007-05-02 19:09:54 +00001092 line = "\0\x0f\xff\u0fff\uffff\U000fffff\U0010ffff"*3 + "\n"
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +00001093 nlines = 10000
1094 nchars = len(line)
1095 nbytes = len(line.encode(enc))
1096 for chunk_size in (32, 64, 128, 256):
Benjamin Petersonee8712c2008-05-20 21:35:26 +00001097 f = io.open(support.TESTFN, "w+", encoding=enc)
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +00001098 f._CHUNK_SIZE = chunk_size
1099 t0 = timer()
1100 for i in range(nlines):
1101 f.write(line)
1102 f.flush()
1103 t1 = timer()
1104 f.seek(0)
1105 for line in f:
1106 pass
1107 t2 = timer()
1108 f.seek(0)
1109 while f.readline():
1110 pass
1111 t3 = timer()
1112 f.seek(0)
1113 while f.readline():
1114 f.tell()
1115 t4 = timer()
1116 f.close()
Benjamin Petersonee8712c2008-05-20 21:35:26 +00001117 if support.verbose:
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +00001118 print("\nTiming test: %d lines of %d characters (%d bytes)" %
1119 (nlines, nchars, nbytes))
1120 print("File chunk size: %6s" % f._CHUNK_SIZE)
1121 print("Writing: %6.3f seconds" % (t1-t0))
1122 print("Reading using iteration: %6.3f seconds" % (t2-t1))
1123 print("Reading using readline(): %6.3f seconds" % (t3-t2))
1124 print("Using readline()+tell(): %6.3f seconds" % (t4-t3))
1125
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00001126 def testReadOneByOne(self):
1127 txt = io.TextIOWrapper(io.BytesIO(b"AA\r\nBB"))
1128 reads = ""
1129 while True:
1130 c = txt.read(1)
1131 if not c:
1132 break
1133 reads += c
1134 self.assertEquals(reads, "AA\nBB")
1135
1136 # read in amounts equal to TextIOWrapper._CHUNK_SIZE which is 128.
1137 def testReadByChunk(self):
1138 # make sure "\r\n" straddles 128 char boundary.
1139 txt = io.TextIOWrapper(io.BytesIO(b"A" * 127 + b"\r\nB"))
1140 reads = ""
1141 while True:
1142 c = txt.read(128)
1143 if not c:
1144 break
1145 reads += c
1146 self.assertEquals(reads, "A"*127+"\nB")
1147
1148 def test_issue1395_1(self):
1149 txt = io.TextIOWrapper(io.BytesIO(self.testdata), encoding="ascii")
1150
1151 # read one char at a time
1152 reads = ""
1153 while True:
1154 c = txt.read(1)
1155 if not c:
1156 break
1157 reads += c
1158 self.assertEquals(reads, self.normalized)
1159
1160 def test_issue1395_2(self):
1161 txt = io.TextIOWrapper(io.BytesIO(self.testdata), encoding="ascii")
1162 txt._CHUNK_SIZE = 4
1163
1164 reads = ""
1165 while True:
1166 c = txt.read(4)
1167 if not c:
1168 break
1169 reads += c
1170 self.assertEquals(reads, self.normalized)
1171
1172 def test_issue1395_3(self):
1173 txt = io.TextIOWrapper(io.BytesIO(self.testdata), encoding="ascii")
1174 txt._CHUNK_SIZE = 4
1175
1176 reads = txt.read(4)
1177 reads += txt.read(4)
1178 reads += txt.readline()
1179 reads += txt.readline()
1180 reads += txt.readline()
1181 self.assertEquals(reads, self.normalized)
1182
1183 def test_issue1395_4(self):
1184 txt = io.TextIOWrapper(io.BytesIO(self.testdata), encoding="ascii")
1185 txt._CHUNK_SIZE = 4
1186
1187 reads = txt.read(4)
1188 reads += txt.read()
1189 self.assertEquals(reads, self.normalized)
1190
1191 def test_issue1395_5(self):
1192 txt = io.TextIOWrapper(io.BytesIO(self.testdata), encoding="ascii")
1193 txt._CHUNK_SIZE = 4
1194
1195 reads = txt.read(4)
1196 pos = txt.tell()
1197 txt.seek(0)
1198 txt.seek(pos)
1199 self.assertEquals(txt.read(4), "BBB\n")
1200
Ka-Ping Yeeddaa7062008-03-17 20:35:15 +00001201 def test_issue2282(self):
1202 buffer = io.BytesIO(self.testdata)
1203 txt = io.TextIOWrapper(buffer, encoding="ascii")
1204
1205 self.assertEqual(buffer.seekable(), txt.seekable())
1206
Antoine Pitrou180a3362008-12-14 16:36:46 +00001207 def check_newline_decoder_utf8(self, decoder):
1208 # UTF-8 specific tests for a newline decoder
1209 def _check_decode(b, s, **kwargs):
1210 # We exercise getstate() / setstate() as well as decode()
1211 state = decoder.getstate()
1212 self.assertEquals(decoder.decode(b, **kwargs), s)
1213 decoder.setstate(state)
1214 self.assertEquals(decoder.decode(b, **kwargs), s)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00001215
Antoine Pitrou180a3362008-12-14 16:36:46 +00001216 _check_decode(b'\xe8\xa2\x88', "\u8888")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00001217
Antoine Pitrou180a3362008-12-14 16:36:46 +00001218 _check_decode(b'\xe8', "")
1219 _check_decode(b'\xa2', "")
1220 _check_decode(b'\x88', "\u8888")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00001221
Antoine Pitrou180a3362008-12-14 16:36:46 +00001222 _check_decode(b'\xe8', "")
1223 _check_decode(b'\xa2', "")
1224 _check_decode(b'\x88', "\u8888")
1225
1226 _check_decode(b'\xe8', "")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00001227 self.assertRaises(UnicodeDecodeError, decoder.decode, b'', final=True)
1228
Antoine Pitrou180a3362008-12-14 16:36:46 +00001229 decoder.reset()
1230 _check_decode(b'\n', "\n")
1231 _check_decode(b'\r', "")
1232 _check_decode(b'', "\n", final=True)
1233 _check_decode(b'\r', "\n", final=True)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00001234
Antoine Pitrou180a3362008-12-14 16:36:46 +00001235 _check_decode(b'\r', "")
1236 _check_decode(b'a', "\na")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00001237
Antoine Pitrou180a3362008-12-14 16:36:46 +00001238 _check_decode(b'\r\r\n', "\n\n")
1239 _check_decode(b'\r', "")
1240 _check_decode(b'\r', "\n")
1241 _check_decode(b'\na', "\na")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00001242
Antoine Pitrou180a3362008-12-14 16:36:46 +00001243 _check_decode(b'\xe8\xa2\x88\r\n', "\u8888\n")
1244 _check_decode(b'\xe8\xa2\x88', "\u8888")
1245 _check_decode(b'\n', "\n")
1246 _check_decode(b'\xe8\xa2\x88\r', "\u8888")
1247 _check_decode(b'\n', "\n")
Guido van Rossum9b76da62007-04-11 01:09:03 +00001248
Antoine Pitrou180a3362008-12-14 16:36:46 +00001249 def check_newline_decoder(self, decoder, encoding):
1250 result = []
1251 encoder = codecs.getincrementalencoder(encoding)()
1252 def _decode_bytewise(s):
1253 for b in encoder.encode(s):
1254 result.append(decoder.decode(bytes([b])))
1255 self.assertEquals(decoder.newlines, None)
1256 _decode_bytewise("abc\n\r")
1257 self.assertEquals(decoder.newlines, '\n')
1258 _decode_bytewise("\nabc")
1259 self.assertEquals(decoder.newlines, ('\n', '\r\n'))
1260 _decode_bytewise("abc\r")
1261 self.assertEquals(decoder.newlines, ('\n', '\r\n'))
1262 _decode_bytewise("abc")
1263 self.assertEquals(decoder.newlines, ('\r', '\n', '\r\n'))
1264 _decode_bytewise("abc\r")
1265 self.assertEquals("".join(result), "abc\n\nabcabc\nabcabc")
1266 decoder.reset()
1267 self.assertEquals(decoder.decode("abc".encode(encoding)), "abc")
1268 self.assertEquals(decoder.newlines, None)
1269
1270 def test_newline_decoder(self):
1271 encodings = (
1272 'utf-8', 'latin-1',
1273 'utf-16', 'utf-16-le', 'utf-16-be',
1274 'utf-32', 'utf-32-le', 'utf-32-be',
1275 )
1276 for enc in encodings:
1277 decoder = codecs.getincrementaldecoder(enc)()
1278 decoder = io.IncrementalNewlineDecoder(decoder, translate=True)
1279 self.check_newline_decoder(decoder, enc)
Alexandre Vassalotti472f07d2008-01-06 00:34:32 +00001280 decoder = codecs.getincrementaldecoder("utf-8")()
1281 decoder = io.IncrementalNewlineDecoder(decoder, translate=True)
Antoine Pitrou180a3362008-12-14 16:36:46 +00001282 self.check_newline_decoder_utf8(decoder)
1283
Alexandre Vassalotti472f07d2008-01-06 00:34:32 +00001284
Guido van Rossum01a27522007-03-07 01:00:12 +00001285# XXX Tests for open()
Guido van Rossum68bbcd22007-02-27 17:19:33 +00001286
Guido van Rossum5abbf752007-08-27 17:39:33 +00001287class MiscIOTest(unittest.TestCase):
1288
Barry Warsaw40e82462008-11-20 20:14:50 +00001289 def tearDown(self):
1290 support.unlink(support.TESTFN)
1291
Guido van Rossum5abbf752007-08-27 17:39:33 +00001292 def testImport__all__(self):
1293 for name in io.__all__:
1294 obj = getattr(io, name, None)
1295 self.assert_(obj is not None, name)
1296 if name == "open":
1297 continue
1298 elif "error" in name.lower():
1299 self.assert_(issubclass(obj, Exception), name)
1300 else:
1301 self.assert_(issubclass(obj, io.IOBase))
1302
Benjamin Peterson65676e42008-11-05 21:42:45 +00001303
Barry Warsaw40e82462008-11-20 20:14:50 +00001304 def test_attributes(self):
1305 f = io.open(support.TESTFN, "wb", buffering=0)
Benjamin Peterson44309e62008-11-22 00:41:45 +00001306 self.assertEquals(f.mode, "wb")
Barry Warsaw40e82462008-11-20 20:14:50 +00001307 f.close()
1308
1309 f = io.open(support.TESTFN, "U")
1310 self.assertEquals(f.name, support.TESTFN)
1311 self.assertEquals(f.buffer.name, support.TESTFN)
1312 self.assertEquals(f.buffer.raw.name, support.TESTFN)
1313 self.assertEquals(f.mode, "U")
Benjamin Peterson44309e62008-11-22 00:41:45 +00001314 self.assertEquals(f.buffer.mode, "rb")
1315 self.assertEquals(f.buffer.raw.mode, "rb")
Barry Warsaw40e82462008-11-20 20:14:50 +00001316 f.close()
1317
1318 f = io.open(support.TESTFN, "w+")
1319 self.assertEquals(f.mode, "w+")
Benjamin Peterson44309e62008-11-22 00:41:45 +00001320 self.assertEquals(f.buffer.mode, "rb+") # Does it really matter?
1321 self.assertEquals(f.buffer.raw.mode, "rb+")
Barry Warsaw40e82462008-11-20 20:14:50 +00001322
1323 g = io.open(f.fileno(), "wb", closefd=False)
Benjamin Peterson44309e62008-11-22 00:41:45 +00001324 self.assertEquals(g.mode, "wb")
1325 self.assertEquals(g.raw.mode, "wb")
Barry Warsaw40e82462008-11-20 20:14:50 +00001326 self.assertEquals(g.name, f.fileno())
1327 self.assertEquals(g.raw.name, f.fileno())
1328 f.close()
1329 g.close()
1330
Antoine Pitrou8043cf82009-01-09 19:54:29 +00001331 def test_io_after_close(self):
1332 for kwargs in [
1333 {"mode": "w"},
1334 {"mode": "wb"},
1335 {"mode": "w", "buffering": 1},
1336 {"mode": "w", "buffering": 2},
1337 {"mode": "wb", "buffering": 0},
1338 {"mode": "r"},
1339 {"mode": "rb"},
1340 {"mode": "r", "buffering": 1},
1341 {"mode": "r", "buffering": 2},
1342 {"mode": "rb", "buffering": 0},
1343 {"mode": "w+"},
1344 {"mode": "w+b"},
1345 {"mode": "w+", "buffering": 1},
1346 {"mode": "w+", "buffering": 2},
1347 {"mode": "w+b", "buffering": 0},
1348 ]:
1349 f = io.open(support.TESTFN, **kwargs)
1350 f.close()
1351 self.assertRaises(ValueError, f.flush)
1352 self.assertRaises(ValueError, f.fileno)
1353 self.assertRaises(ValueError, f.isatty)
1354 self.assertRaises(ValueError, f.__iter__)
1355 if hasattr(f, "peek"):
1356 self.assertRaises(ValueError, f.peek, 1)
1357 self.assertRaises(ValueError, f.read)
1358 if hasattr(f, "read1"):
1359 self.assertRaises(ValueError, f.read1, 1024)
1360 if hasattr(f, "readinto"):
1361 self.assertRaises(ValueError, f.readinto, bytearray(1024))
1362 self.assertRaises(ValueError, f.readline)
1363 self.assertRaises(ValueError, f.readlines)
1364 self.assertRaises(ValueError, f.seek, 0)
1365 self.assertRaises(ValueError, f.tell)
1366 self.assertRaises(ValueError, f.truncate)
1367 self.assertRaises(ValueError, f.write, "")
1368 self.assertRaises(ValueError, f.writelines, [])
1369
Barry Warsaw40e82462008-11-20 20:14:50 +00001370
Guido van Rossum28524c72007-02-27 05:47:44 +00001371def test_main():
Benjamin Petersonee8712c2008-05-20 21:35:26 +00001372 support.run_unittest(IOTest, BytesIOTest, StringIOTest,
Christian Heimes81ee3ef2008-05-04 22:42:01 +00001373 BufferedReaderTest, BufferedWriterTest,
1374 BufferedRWPairTest, BufferedRandomTest,
1375 StatefulIncrementalDecoderTest,
1376 TextIOWrapperTest, MiscIOTest)
Guido van Rossum28524c72007-02-27 05:47:44 +00001377
1378if __name__ == "__main__":
Guido van Rossum76c5d4d2007-04-06 19:10:29 +00001379 unittest.main()