blob: 0c37cc7397efcf33aed5fc58b95df94232062f56 [file] [log] [blame]
Guido van Rossum68bbcd22007-02-27 17:19:33 +00001"""Unit tests for io.py."""
2
Guido van Rossum8358db22007-08-18 21:39:55 +00003import os
Guido van Rossum34d69e52007-04-10 20:08:41 +00004import sys
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +00005import time
Guido van Rossumd4103952007-04-12 05:44:49 +00006import array
Guido van Rossum28524c72007-02-27 05:47:44 +00007import unittest
Guido van Rossum78892e42007-04-06 17:31:18 +00008from itertools import chain
Guido van Rossum76c5d4d2007-04-06 19:10:29 +00009from test import test_support
10
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +000011import codecs
Guido van Rossum53807da2007-04-10 19:01:47 +000012import io # The module under test
Guido van Rossum28524c72007-02-27 05:47:44 +000013
Guido van Rossuma9e20242007-03-08 00:43:48 +000014
class MockRawIO(io.RawIOBase):
    """Scripted raw stream used to drive the buffered classes in tests.

    Reads are answered from a pre-loaded list of results (one entry per
    call); writes are recorded so tests can inspect what reached the raw
    layer.  The stream claims to be readable, writable and seekable.
    """

    def __init__(self, read_stack=()):
        # Copy the script so the caller's sequence is never mutated.
        self._read_stack = list(read_stack)
        self._write_stack = []

    def read(self, n=None):
        """Return the next scripted result, or b"" once the script is empty.

        The requested size *n* is ignored.
        """
        try:
            return self._read_stack.pop(0)
        except IndexError:
            # Only an exhausted script means EOF; any other error is a
            # genuine bug and must not be silently turned into b"".
            return b""

    def write(self, b):
        """Record a copy of *b* and report it as fully written."""
        self._write_stack.append(b[:])
        return len(b)

    def writable(self):
        return True

    def fileno(self):
        # Arbitrary sentinel; tests only check that it is passed through.
        return 42

    def readable(self):
        return True

    def seekable(self):
        return True

    def seek(self, pos, whence=0):
        """Accept and ignore a seek request.

        *whence* defaults to 0 so the signature matches RawIOBase.seek().
        """
        pass

    def tell(self):
        # Arbitrary sentinel, unrelated to any real position.
        return 42
48
Guido van Rossuma9e20242007-03-08 00:43:48 +000049
class MockFileIO(io.BytesIO):
    """In-memory byte stream that logs the size of every read() result.

    Each call to read() appends the length of the returned data (or None
    when the result itself is None) to ``read_history``.
    """

    def __init__(self, data):
        self.read_history = []
        super().__init__(data)

    def read(self, n=None):
        """Read like BytesIO.read() and remember how much came back."""
        chunk = super().read(n)
        if chunk is None:
            size = None
        else:
            size = len(chunk)
        self.read_history.append(size)
        return chunk
60
61
class MockNonBlockWriterIO(io.RawIOBase):
    """Write-only raw stream whose write() results follow a script.

    Every write consumes one script entry: a non-negative entry is
    returned as the number of bytes written, a negative entry raises
    BlockingIOError reporting the entry's absolute value as the number of
    bytes written.
    """

    def __init__(self, blocking_script):
        self._write_stack = []
        self._blocking_script = list(blocking_script)

    def write(self, b):
        """Record a copy of *b*, then act out the next script entry."""
        self._write_stack.append(b[:])
        outcome = self._blocking_script.pop(0)
        if outcome >= 0:
            return outcome
        raise io.BlockingIOError(0, "test blocking", -outcome)

    def writable(self):
        return True
Guido van Rossum68bbcd22007-02-27 17:19:33 +000078
Guido van Rossuma9e20242007-03-08 00:43:48 +000079
class IOTest(unittest.TestCase):
    """Binary I/O checks for raw files, buffered files and BytesIO.

    write_ops(), read_ops() and large_file_ops() are shared scenarios; the
    test_* methods run them against the different stream flavours.
    """

    # File offset used by the large-file scenario.
    LARGE = 2**31

    def setUp(self):
        test_support.unlink(test_support.TESTFN)

    def tearDown(self):
        test_support.unlink(test_support.TESTFN)

    def write_ops(self, f):
        # Leaves the stream holding b"hello world\n" (see test_raw_bytes_io).
        self.assertEqual(f.write(b"blah."), 5)
        self.assertEqual(f.seek(0), 0)
        self.assertEqual(f.write(b"Hello."), 6)
        self.assertEqual(f.tell(), 6)
        self.assertEqual(f.seek(-1, 1), 5)
        self.assertEqual(f.tell(), 5)
        self.assertEqual(f.write(bytearray(b" world\n\n\n")), 9)
        self.assertEqual(f.seek(0), 0)
        self.assertEqual(f.write(b"h"), 1)
        self.assertEqual(f.seek(-1, 2), 13)
        self.assertEqual(f.tell(), 13)
        self.assertEqual(f.truncate(12), 12)
        # The position is expected to stay put after truncating.
        self.assertEqual(f.tell(), 13)
        self.assertRaises(TypeError, f.seek, 0.0)

    def read_ops(self, f, buffered=False):
        # Expects the stream to contain b"hello world\n".
        data = f.read(5)
        self.assertEqual(data, b"hello")
        data = bytearray(data)
        self.assertEqual(f.readinto(data), 5)
        self.assertEqual(data, b" worl")
        self.assertEqual(f.readinto(data), 2)
        self.assertEqual(len(data), 5)
        self.assertEqual(data[:2], b"d\n")
        self.assertEqual(f.seek(0), 0)
        self.assertEqual(f.read(20), b"hello world\n")
        self.assertEqual(f.read(1), b"")
        self.assertEqual(f.readinto(bytearray(b"x")), 0)
        self.assertEqual(f.seek(-6, 2), 6)
        self.assertEqual(f.read(5), b"world")
        self.assertEqual(f.read(0), b"")
        self.assertEqual(f.readinto(bytearray()), 0)
        self.assertEqual(f.seek(-6, 1), 5)
        self.assertEqual(f.read(5), b" worl")
        self.assertEqual(f.tell(), 10)
        self.assertRaises(TypeError, f.seek, 0.0)
        if buffered:
            # Argument-less read() is only checked when the caller asks.
            f.seek(0)
            self.assertEqual(f.read(), b"hello world\n")
            f.seek(6)
            self.assertEqual(f.read(), b"world\n")
            self.assertEqual(f.read(), b"")

    def large_file_ops(self, f):
        # Use unittest assertions: a bare assert vanishes under "python -O".
        self.assertTrue(f.readable())
        self.assertTrue(f.writable())
        self.assertEqual(f.seek(self.LARGE), self.LARGE)
        self.assertEqual(f.tell(), self.LARGE)
        self.assertEqual(f.write(b"xxx"), 3)
        self.assertEqual(f.tell(), self.LARGE + 3)
        self.assertEqual(f.seek(-1, 1), self.LARGE + 2)
        self.assertEqual(f.truncate(), self.LARGE + 2)
        self.assertEqual(f.tell(), self.LARGE + 2)
        self.assertEqual(f.seek(0, 2), self.LARGE + 2)
        self.assertEqual(f.truncate(self.LARGE + 1), self.LARGE + 1)
        self.assertEqual(f.tell(), self.LARGE + 2)
        self.assertEqual(f.seek(0, 2), self.LARGE + 1)
        self.assertEqual(f.seek(-1, 2), self.LARGE)
        self.assertEqual(f.read(2), b"x")

    def test_raw_file_io(self):
        # "with" guarantees the file is closed even if an assertion fails.
        with io.open(test_support.TESTFN, "wb", buffering=0) as f:
            self.assertEqual(f.readable(), False)
            self.assertEqual(f.writable(), True)
            self.assertEqual(f.seekable(), True)
            self.write_ops(f)
        with io.open(test_support.TESTFN, "rb", buffering=0) as f:
            self.assertEqual(f.readable(), True)
            self.assertEqual(f.writable(), False)
            self.assertEqual(f.seekable(), True)
            self.read_ops(f)

    def test_buffered_file_io(self):
        with io.open(test_support.TESTFN, "wb") as f:
            self.assertEqual(f.readable(), False)
            self.assertEqual(f.writable(), True)
            self.assertEqual(f.seekable(), True)
            self.write_ops(f)
        with io.open(test_support.TESTFN, "rb") as f:
            self.assertEqual(f.readable(), True)
            self.assertEqual(f.writable(), False)
            self.assertEqual(f.seekable(), True)
            self.read_ops(f, True)

    def test_readline(self):
        with io.open(test_support.TESTFN, "wb") as f:
            f.write(b"abc\ndef\nxyzzy\nfoo")
        with io.open(test_support.TESTFN, "rb") as f:
            self.assertEqual(f.readline(), b"abc\n")
            self.assertEqual(f.readline(10), b"def\n")
            self.assertEqual(f.readline(2), b"xy")
            self.assertEqual(f.readline(4), b"zzy\n")
            self.assertEqual(f.readline(), b"foo")

    def test_raw_bytes_io(self):
        f = io.BytesIO()
        self.write_ops(f)
        data = f.getvalue()
        self.assertEqual(data, b"hello world\n")
        f = io.BytesIO(data)
        self.read_ops(f, True)

    def test_large_file_ops(self):
        # On Windows and Mac OSX this test consumes large resources; it takes
        # a long time to build the >2GB file and takes >2GB of disk space
        # therefore the resource must be enabled to run this test.
        if sys.platform[:3] == 'win' or sys.platform == 'darwin':
            if not test_support.is_resource_enabled("largefile"):
                print("\nTesting large file ops skipped on %s." % sys.platform,
                      file=sys.stderr)
                print("It requires %d bytes and a long time." % self.LARGE,
                      file=sys.stderr)
                print("Use 'regrtest.py -u largefile test_io' to run it.",
                      file=sys.stderr)
                return
        # Run the scenario unbuffered first, then with default buffering.
        with io.open(test_support.TESTFN, "w+b", 0) as f:
            self.large_file_ops(f)
        with io.open(test_support.TESTFN, "w+b") as f:
            self.large_file_ops(f)

    def test_with_open(self):
        for bufsize in (0, 1, 100):
            f = None
            with io.open(test_support.TESTFN, "wb", bufsize) as f:
                f.write(b"xxx")
            self.assertEqual(f.closed, True)
            f = None
            try:
                with io.open(test_support.TESTFN, "wb", bufsize) as f:
                    1/0
            except ZeroDivisionError:
                # The file must be closed even when the body raised.
                self.assertEqual(f.closed, True)
            else:
                self.fail("1/0 didn't raise an exception")

    def test_destructor(self):
        # 1 = __del__, 2 = close, 3 = flush; the order is what is asserted.
        record = []
        class MyFileIO(io.FileIO):
            def __del__(self):
                record.append(1)
                io.FileIO.__del__(self)
            def close(self):
                record.append(2)
                io.FileIO.close(self)
            def flush(self):
                record.append(3)
                io.FileIO.flush(self)
        f = MyFileIO(test_support.TESTFN, "w")
        # FileIO is a raw binary stream, so hand it bytes, not str.
        f.write(b"xxx")
        del f
        self.assertEqual(record, [1, 2, 3])

    def test_close_flushes(self):
        with io.open(test_support.TESTFN, "wb") as f:
            f.write(b"xxx")
        with io.open(test_support.TESTFN, "rb") as f:
            self.assertEqual(f.read(), b"xxx")

    def test_array_writes(self):
        a = array.array('i', range(10))
        n = len(memoryview(a))
        with io.open(test_support.TESTFN, "wb", 0) as f:
            self.assertEqual(f.write(a), n)
        with io.open(test_support.TESTFN, "wb") as f:
            self.assertEqual(f.write(a), n)

    def test_closefd(self):
        # closefd=False together with a file name is expected to be rejected.
        self.assertRaises(ValueError, io.open, test_support.TESTFN, 'w',
                          closefd=False)
Guido van Rossuma9e20242007-03-08 00:43:48 +0000272
class MemorySeekTestMixin:
    """Read/seek/tell checks shared by the in-memory stream tests.

    The concrete test class must provide:
      buftype -- callable turning a str into the stream's native data type
      ioclass -- the stream class under test
      EOF     -- the value read() returns at end of stream
    """

    def testInit(self):
        # Construction from an initial value must simply not raise.
        buf = self.buftype("1234567890")
        self.ioclass(buf)

    def testRead(self):
        buf = self.buftype("1234567890")
        bytesIo = self.ioclass(buf)

        self.assertEqual(buf[:1], bytesIo.read(1))
        self.assertEqual(buf[1:5], bytesIo.read(4))
        # Asking for more than is left returns just the remainder.
        self.assertEqual(buf[5:], bytesIo.read(900))
        self.assertEqual(self.EOF, bytesIo.read())

    def testReadNoArgs(self):
        buf = self.buftype("1234567890")
        bytesIo = self.ioclass(buf)

        self.assertEqual(buf, bytesIo.read())
        self.assertEqual(self.EOF, bytesIo.read())

    def testSeek(self):
        buf = self.buftype("1234567890")
        bytesIo = self.ioclass(buf)

        bytesIo.read(5)
        bytesIo.seek(0)
        self.assertEqual(buf, bytesIo.read())

        bytesIo.seek(3)
        self.assertEqual(buf[3:], bytesIo.read())
        # Float offsets must be rejected.
        self.assertRaises(TypeError, bytesIo.seek, 0.0)

    def testTell(self):
        buf = self.buftype("1234567890")
        bytesIo = self.ioclass(buf)

        self.assertEqual(0, bytesIo.tell())
        bytesIo.seek(5)
        self.assertEqual(5, bytesIo.tell())
        # Seeking past the end is allowed and reported by tell().
        bytesIo.seek(10000)
        self.assertEqual(10000, bytesIo.tell())
316
Guido van Rossuma9e20242007-03-08 00:43:48 +0000317
class BytesIOTest(MemorySeekTestMixin, unittest.TestCase):
    """Run the shared in-memory stream checks against io.BytesIO."""

    EOF = b""
    ioclass = io.BytesIO

    @staticmethod
    def buftype(s):
        # The mixin supplies str literals; the stream needs bytes.
        encoded = s.encode("utf-8")
        return encoded
324
325
class StringIOTest(MemorySeekTestMixin, unittest.TestCase):
    """Run the shared in-memory stream checks against io.StringIO."""

    EOF = ""
    ioclass = io.StringIO
    # The mixin's str literals are already the stream's native type.
    buftype = str
330
331
class BufferedReaderTest(unittest.TestCase):
    """Checks for io.BufferedReader on top of scripted raw streams."""

    def testRead(self):
        rawio = MockRawIO((b"abc", b"d", b"efg"))
        bufio = io.BufferedReader(rawio)

        # A single buffered read is assembled from several raw chunks.
        self.assertEqual(b"abcdef", bufio.read(6))

    def testBuffering(self):
        data = b"abcdefghi"
        dlen = len(data)

        # Each entry: buffer size, sizes requested from the buffered
        # reader, and the result sizes then expected from the raw stream.
        tests = [
            [ 100, [ 3, 1, 4, 8 ], [ dlen, 0 ] ],
            [ 100, [ 3, 3, 3],     [ dlen ]    ],
            [   4, [ 1, 2, 4, 2 ], [ 4, 4, 1 ] ],
        ]

        for bufsize, buf_read_sizes, raw_read_sizes in tests:
            rawio = MockFileIO(data)
            bufio = io.BufferedReader(rawio, buffer_size=bufsize)
            pos = 0
            for nbytes in buf_read_sizes:
                self.assertEqual(bufio.read(nbytes), data[pos:pos+nbytes])
                pos += nbytes
            self.assertEqual(rawio.read_history, raw_read_sizes)

    def testReadNonBlocking(self):
        # Inject some None's in there to simulate EWOULDBLOCK
        rawio = MockRawIO((b"abc", b"d", None, b"efg", None, None))
        bufio = io.BufferedReader(rawio)

        self.assertEqual(b"abcd", bufio.read(6))
        self.assertEqual(b"e", bufio.read(1))
        self.assertEqual(b"fg", bufio.read())
        self.assertTrue(bufio.read() is None)
        self.assertEqual(b"", bufio.read())

    def testReadToEof(self):
        rawio = MockRawIO((b"abc", b"d", b"efg"))
        bufio = io.BufferedReader(rawio)

        self.assertEqual(b"abcdefg", bufio.read(9000))

    def testReadNoArgs(self):
        rawio = MockRawIO((b"abc", b"d", b"efg"))
        bufio = io.BufferedReader(rawio)

        self.assertEqual(b"abcdefg", bufio.read())

    def testFileno(self):
        rawio = MockRawIO((b"abc", b"d", b"efg"))
        bufio = io.BufferedReader(rawio)

        # fileno() is forwarded to the raw stream (the mock returns 42).
        self.assertEqual(42, bufio.fileno())

    def testFilenoNoFileno(self):
        # XXX will we always have fileno() function? If so, kill
        # this test. Else, write it.
        pass
392
Guido van Rossuma9e20242007-03-08 00:43:48 +0000393
class BufferedWriterTest(unittest.TestCase):
    """Checks for io.BufferedWriter on top of recording raw streams."""

    def testWrite(self):
        # Write to the buffered IO but don't overflow the buffer.
        writer = MockRawIO()
        bufio = io.BufferedWriter(writer, 8)

        bufio.write(b"abc")

        # Nothing may have reached the raw stream yet.
        self.assertFalse(writer._write_stack)

    def testWriteOverflow(self):
        writer = MockRawIO()
        bufio = io.BufferedWriter(writer, 8)

        bufio.write(b"abc")
        bufio.write(b"defghijkl")

        # Exceeding the buffer size pushes everything out in one raw write.
        self.assertEqual(b"abcdefghijkl", writer._write_stack[0])

    def testWriteNonBlocking(self):
        raw = MockNonBlockWriterIO((9, 2, 22, -6, 10, 12, 12))
        bufio = io.BufferedWriter(raw, 8, 16)

        bufio.write(b"asdf")
        bufio.write(b"asdfa")
        self.assertEqual(b"asdfasdfa", raw._write_stack[0])

        bufio.write(b"asdfasdfasdf")
        self.assertEqual(b"asdfasdfasdf", raw._write_stack[1])
        bufio.write(b"asdfasdfasdf")
        self.assertEqual(b"dfasdfasdf", raw._write_stack[2])
        self.assertEqual(b"asdfasdfasdf", raw._write_stack[3])

        bufio.write(b"asdfasdfasdf")

        # XXX I don't like this test. It relies too heavily on how the
        # algorithm actually works, which we might change. Refactor
        # later.

    def testFileno(self):
        rawio = MockRawIO((b"abc", b"d", b"efg"))
        bufio = io.BufferedWriter(rawio)

        # fileno() is forwarded to the raw stream (the mock returns 42).
        self.assertEqual(42, bufio.fileno())

    def testFlush(self):
        writer = MockRawIO()
        bufio = io.BufferedWriter(writer, 8)

        bufio.write(b"abc")
        bufio.flush()

        self.assertEqual(b"abc", writer._write_stack[0])
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000448
Guido van Rossuma9e20242007-03-08 00:43:48 +0000449
class BufferedRWPairTest(unittest.TestCase):
    """Placeholder coverage for io.BufferedRWPair."""

    def testRWPair(self):
        # Only construction from a reader and a writer is exercised so far.
        reader = MockRawIO(())
        writer = MockRawIO()
        pair = io.BufferedRWPair(reader, writer)

        # XXX need implementation
458
Guido van Rossuma9e20242007-03-08 00:43:48 +0000459
class BufferedRandomTest(unittest.TestCase):
    """Checks for io.BufferedRandom: mixed reads/writes, seek and tell."""

    def testReadAndWrite(self):
        raw = MockRawIO((b"asdf", b"ghjk"))
        rw = io.BufferedRandom(raw, 8, 12)

        self.assertEqual(b"as", rw.read(2))
        rw.write(b"ddd")
        rw.write(b"eee")
        self.assertFalse(raw._write_stack) # Buffer writes
        self.assertEqual(b"ghjk", rw.read()) # This read forces write flush
        self.assertEqual(b"dddeee", raw._write_stack[0])

    def testSeekAndTell(self):
        raw = io.BytesIO(b"asdfghjkl")
        rw = io.BufferedRandom(raw)

        self.assertEqual(b"as", rw.read(2))
        self.assertEqual(2, rw.tell())
        rw.seek(0, 0)
        self.assertEqual(b"asdf", rw.read(4))

        # Overwrite bytes 4..7, then re-read the whole stream.
        rw.write(b"asdf")
        rw.seek(0, 0)
        self.assertEqual(b"asdfasdfl", rw.read())
        self.assertEqual(9, rw.tell())
        rw.seek(-4, 2)
        self.assertEqual(5, rw.tell())
        rw.seek(2, 1)
        self.assertEqual(7, rw.tell())
        self.assertEqual(b"fl", rw.read(11))
        # Float offsets must be rejected.
        self.assertRaises(TypeError, rw.seek, 0.0)
Guido van Rossum01a27522007-03-07 01:00:12 +0000492
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +0000493# To fully exercise seek/tell, the StatefulIncrementalDecoder has these
494# properties:
495# - A single output character can correspond to many bytes of input.
496# - The number of input bytes to complete the character can be
497# undetermined until the last input byte is received.
498# - The number of input bytes can vary depending on previous input.
499# - A single input byte can correspond to many characters of output.
500# - The number of output characters can be undetermined until the
501# last input byte is received.
502# - The number of output characters can vary depending on previous input.
503
class StatefulIncrementalDecoder(codecs.IncrementalDecoder):
    """
    A stateful, buffering decoder for exercising seek/tell behavior.

    The input is a stream of words.  A word is either fixed-length (the
    length, I, is set by the input itself) or, when I is 0,
    variable-length and terminated by a period; in variable-length mode
    surplus periods are skipped.  Words are interpreted as follows:
      - 'i' followed by a number sets the input length I (capped at 99).
        Setting I to 0 switches to period-terminated words.
      - 'o' followed by a number sets the output length O (capped at 99).
      - Any other word is emitted followed by a period.  The emitted word
        is the input word truncated, or padded with hyphens, to exactly O
        characters; when O is 0 it is emitted unchanged.
    I and O both start at 1.  At EOF (final=True) a partially buffered
    word is processed as if it were complete.
    """

    # Gate consulted by lookupTestDecoder(): the codec can only be looked
    # up while this is true.
    codecEnabled = False

    def __init__(self, errors='strict'):
        super().__init__(errors)
        self.reset()

    def __repr__(self):
        return '<SID %x>' % id(self)

    def reset(self):
        """Return to the initial state: I = O = 1 and nothing buffered."""
        self.buffer = bytearray()
        self.i = 1
        self.o = 1

    def getstate(self):
        """Return (buffered bytes, flags) with I and O packed into flags."""
        # XOR with 1 so that the flags value is 0 right after reset().
        flags = (self.i ^ 1) * 100 + (self.o ^ 1)
        return bytes(self.buffer), flags

    def setstate(self, state):
        """Restore a state previously produced by getstate()."""
        pending, flags = state
        self.buffer = bytearray(pending)
        packed_i, packed_o = divmod(flags, 100)
        self.i = packed_i ^ 1
        self.o = packed_o ^ 1

    def decode(self, input, final=False):
        """Feed the bytes of *input*; return the text of all completed words."""
        period = ord('.')
        pieces = []
        for byte in input:
            if self.i:
                # Fixed-length mode: a word is complete after self.i bytes.
                self.buffer.append(byte)
                if len(self.buffer) == self.i:
                    pieces.append(self.process_word())
            elif byte != period:
                # Variable-length mode: keep collecting until a period.
                self.buffer.append(byte)
            elif self.buffer:
                # A period ends the word; a period with nothing buffered
                # is simply skipped.
                pieces.append(self.process_word())
        if final and self.buffer:
            # EOF terminates the last word.
            pieces.append(self.process_word())
        return ''.join(pieces)

    def process_word(self):
        """Interpret the buffered word, clear the buffer, return its output."""
        lead = self.buffer[0]
        result = ''
        if lead == ord('i'):
            # Set the input length.
            self.i = min(99, int(self.buffer[1:] or 0))
        elif lead == ord('o'):
            # Set the output length.
            self.o = min(99, int(self.buffer[1:] or 0))
        else:
            text = self.buffer.decode('ascii')
            if self.o:
                # Pad with hyphens, then cut, to exactly self.o characters.
                text = text.ljust(self.o, '-')[:self.o]
            result = text + '.'
        self.buffer = bytearray()
        return result

    @classmethod
    def lookupTestDecoder(cls, name):
        """Codec search function: answer only for 'test_decoder' when enabled."""
        if not cls.codecEnabled or name != 'test_decoder':
            return None
        return codecs.CodecInfo(
            name='test_decoder', encode=None, decode=None,
            incrementalencoder=None,
            streamreader=None, streamwriter=None,
            incrementaldecoder=cls)

# Hook the decoder above into the codec registry for testing.  The lookup
# is inert by default; tests switch it on through codecEnabled.
codecs.register(StatefulIncrementalDecoder.lookupTestDecoder)
591
592
class StatefulIncrementalDecoderTest(unittest.TestCase):
    """
    Make sure the StatefulIncrementalDecoder actually works.
    """

    # Each entry: (input bytes, final flag passed to decode(), expected text).
    test_cases = [
        # I=1, O=1 (fixed-length input == fixed-length output)
        (b'abcd', False, 'a.b.c.d.'),
        # I=0, O=0 (variable-length input, variable-length output)
        (b'oiabcd', True, 'abcd.'),
        # I=0, O=0 (should ignore extra periods)
        (b'oi...abcd...', True, 'abcd.'),
        # I=0, O=6 (variable-length input, fixed-length output)
        (b'i.o6.x.xyz.toolongtofit.', False, 'x-----.xyz---.toolon.'),
        # I=2, O=6 (fixed-length input < fixed-length output)
        (b'i.i2.o6xyz', True, 'xy----.z-----.'),
        # I=6, O=3 (fixed-length input > fixed-length output)
        (b'i.o3.i6.abcdefghijklmnop', True, 'abc.ghi.mno.'),
        # I=0, then 3; O=29, then 15 (with longer output)
        (b'i.o29.a.b.cde.o15.abcdefghijabcdefghij.i3.a.b.c.d.ei00k.l.m', True,
         'a----------------------------.' +
         'b----------------------------.' +
         'cde--------------------------.' +
         'abcdefghijabcde.' +
         'a.b------------.' +
         '.c.------------.' +
         'd.e------------.' +
         'k--------------.' +
         'l--------------.' +
         'm--------------.')
    ]

    def testDecoder(self):
        # Try a few one-shot test cases.
        for input, eof, output in self.test_cases:
            d = StatefulIncrementalDecoder()
            self.assertEqual(d.decode(input, eof), output)

        # Also test an unfinished decode, followed by forcing EOF.
        d = StatefulIncrementalDecoder()
        self.assertEqual(d.decode(b'oiabcd'), '')
        self.assertEqual(d.decode(b'', True), 'abcd.')
Guido van Rossum78892e42007-04-06 17:31:18 +0000635
class TextIOWrapperTest(unittest.TestCase):
    # Tests for io.TextIOWrapper and io.IncrementalNewlineDecoder:
    # line buffering, encoding error handlers, newline translation on
    # input and output, tell()/seek() cookies, and BOM handling.
    #
    # Test methods are documented with comments rather than docstrings so
    # that verbose unittest output keeps showing the method names.

    def setUp(self):
        # Mixed line endings plus the form they normalize to under
        # universal-newlines reading.
        self.testdata = b"AAA\r\nBBB\rCCC\r\nDDD\nEEE\r\n"
        self.normalized = b"AAA\nBBB\nCCC\nDDD\nEEE\n".decode("ascii")
        test_support.unlink(test_support.TESTFN)

    def tearDown(self):
        test_support.unlink(test_support.TESTFN)

    def testLineBuffering(self):
        # With line_buffering=True a write is only pushed down to the raw
        # stream once the written text contains a line ending.
        r = io.BytesIO()
        b = io.BufferedWriter(r, 1000)
        t = io.TextIOWrapper(b, newline="\n", line_buffering=True)
        t.write("X")
        self.assertEquals(r.getvalue(), b"")  # No flush happened
        t.write("Y\nZ")
        self.assertEquals(r.getvalue(), b"XY\nZ")  # All got flushed
        t.write("A\rB")
        self.assertEquals(r.getvalue(), b"XY\nZA\rB")

    def testEncodingErrorsReading(self):
        # (1) default
        b = io.BytesIO(b"abc\n\xff\n")
        t = io.TextIOWrapper(b, encoding="ascii")
        self.assertRaises(UnicodeError, t.read)
        # (2) explicit strict
        b = io.BytesIO(b"abc\n\xff\n")
        t = io.TextIOWrapper(b, encoding="ascii", errors="strict")
        self.assertRaises(UnicodeError, t.read)
        # (3) ignore
        b = io.BytesIO(b"abc\n\xff\n")
        t = io.TextIOWrapper(b, encoding="ascii", errors="ignore")
        self.assertEquals(t.read(), "abc\n\n")
        # (4) replace
        b = io.BytesIO(b"abc\n\xff\n")
        t = io.TextIOWrapper(b, encoding="ascii", errors="replace")
        self.assertEquals(t.read(), "abc\n\ufffd\n")

    def testEncodingErrorsWriting(self):
        # (1) default
        b = io.BytesIO()
        t = io.TextIOWrapper(b, encoding="ascii")
        self.assertRaises(UnicodeError, t.write, "\xff")
        # (2) explicit strict
        b = io.BytesIO()
        t = io.TextIOWrapper(b, encoding="ascii", errors="strict")
        self.assertRaises(UnicodeError, t.write, "\xff")
        # (3) ignore
        b = io.BytesIO()
        t = io.TextIOWrapper(b, encoding="ascii", errors="ignore",
                             newline="\n")
        t.write("abc\xffdef\n")
        t.flush()
        self.assertEquals(b.getvalue(), b"abcdef\n")
        # (4) replace
        b = io.BytesIO()
        t = io.TextIOWrapper(b, encoding="ascii", errors="replace",
                             newline="\n")
        t.write("abc\xffdef\n")
        t.flush()
        self.assertEquals(b.getvalue(), b"abc?def\n")

    def testNewlinesInput(self):
        # Reading the same bytes under every `newline` setting must split
        # (and translate) lines as listed in the table below.
        testdata = b"AAA\nBBB\nCCC\rDDD\rEEE\r\nFFF\r\nGGG"
        normalized = testdata.replace(b"\r\n", b"\n").replace(b"\r", b"\n")
        for newline, expected in [
            (None, normalized.decode("ascii").splitlines(True)),
            ("", testdata.decode("ascii").splitlines(True)),
            ("\n", ["AAA\n", "BBB\n", "CCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]),
            ("\r\n", ["AAA\nBBB\nCCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]),
            ("\r", ["AAA\nBBB\nCCC\r", "DDD\r", "EEE\r", "\nFFF\r", "\nGGG"]),
            ]:
            buf = io.BytesIO(testdata)
            txt = io.TextIOWrapper(buf, encoding="ascii", newline=newline)
            self.assertEquals(txt.readlines(), expected)
            txt.seek(0)
            self.assertEquals(txt.read(), "".join(expected))

    def testNewlinesOutput(self):
        # Expected raw bytes for each explicit `newline` value; the
        # newline=None case is looked up under the platform's os.linesep.
        testdict = {
            "": b"AAA\nBBB\nCCC\nX\rY\r\nZ",
            "\n": b"AAA\nBBB\nCCC\nX\rY\r\nZ",
            "\r": b"AAA\rBBB\rCCC\rX\rY\r\rZ",
            "\r\n": b"AAA\r\nBBB\r\nCCC\r\nX\rY\r\r\nZ",
            }
        tests = [(None, testdict[os.linesep])] + sorted(testdict.items())
        for newline, expected in tests:
            buf = io.BytesIO()
            txt = io.TextIOWrapper(buf, encoding="ascii", newline=newline)
            txt.write("AAA\nB")
            txt.write("BB\nCCC\n")
            txt.write("X\rY\r\nZ")
            txt.flush()
            self.assertEquals(buf.getvalue(), expected)

    def testNewlines(self):
        input_lines = [ "unix\n", "windows\r\n", "os9\r", "last\n", "nonl" ]

        tests = [
            [ None, [ 'unix\n', 'windows\n', 'os9\n', 'last\n', 'nonl' ] ],
            [ '', input_lines ],
            [ '\n', [ "unix\n", "windows\r\n", "os9\rlast\n", "nonl" ] ],
            [ '\r\n', [ "unix\nwindows\r\n", "os9\rlast\nnonl" ] ],
            [ '\r', [ "unix\nwindows\r", "\nos9\r", "last\nnonl" ] ],
        ]

        encodings = ('utf-8', 'latin-1')

        # Try a range of buffer sizes to test the case where \r is the last
        # character in TextIOWrapper._pending_line.
        for encoding in encodings:
            # XXX: str.encode() should return bytes
            data = bytes(''.join(input_lines).encode(encoding))
            for do_reads in (False, True):
                for bufsize in range(1, 10):
                    for newline, exp_lines in tests:
                        bufio = io.BufferedReader(io.BytesIO(data), bufsize)
                        textio = io.TextIOWrapper(bufio, newline=newline,
                                                  encoding=encoding)
                        if do_reads:
                            # Mix read(2) with readline() to build lines.
                            got_lines = []
                            while True:
                                c2 = textio.read(2)
                                if c2 == '':
                                    break
                                self.assertEquals(len(c2), 2)
                                got_lines.append(c2 + textio.readline())
                        else:
                            got_lines = list(textio)

                        for got_line, exp_line in zip(got_lines, exp_lines):
                            self.assertEquals(got_line, exp_line)
                        self.assertEquals(len(got_lines), len(exp_lines))

    # NOTE: a second, byte-identical definition of testNewlinesInput used
    # to live here; it only re-bound the same name and has been dropped.

    def testNewlinesOutputWithLinesep(self):
        # Formerly a second method named testNewlinesOutput, which shadowed
        # the table-driven test above so that test never ran.  Renamed so
        # that both are collected.
        data = "AAA\nBBB\rCCC\n"
        data_lf = b"AAA\nBBB\rCCC\n"
        data_cr = b"AAA\rBBB\rCCC\r"
        data_crlf = b"AAA\r\nBBB\rCCC\r\n"
        save_linesep = os.linesep
        try:
            # os.linesep itself is the loop target: each iteration
            # temporarily patches it before the wrapper is created.
            for os.linesep, newline, expected in [
                ("\n", None, data_lf),
                ("\r\n", None, data_crlf),
                ("\n", "", data_lf),
                ("\r\n", "", data_lf),
                ("\n", "\n", data_lf),
                ("\r\n", "\n", data_lf),
                ("\n", "\r", data_cr),
                ("\r\n", "\r", data_cr),
                ("\n", "\r\n", data_crlf),
                ("\r\n", "\r\n", data_crlf),
                ]:
                buf = io.BytesIO()
                txt = io.TextIOWrapper(buf, encoding="ascii", newline=newline)
                txt.write(data)
                txt.close()
                self.assertEquals(buf.getvalue(), expected)
        finally:
            os.linesep = save_linesep

    # Systematic tests of the text I/O API

    def testBasicIO(self):
        for chunksize in (1, 2, 3, 4, 5, 15, 16, 17, 31, 32, 33, 63, 64, 65):
            for enc in "ascii", "latin1", "utf8" :# , "utf-16-be", "utf-16-le":
                f = io.open(test_support.TESTFN, "w+", encoding=enc)
                f._CHUNK_SIZE = chunksize
                self.assertEquals(f.write("abc"), 3)
                f.close()
                f = io.open(test_support.TESTFN, "r+", encoding=enc)
                f._CHUNK_SIZE = chunksize
                self.assertEquals(f.tell(), 0)
                self.assertEquals(f.read(), "abc")
                cookie = f.tell()
                self.assertEquals(f.seek(0), 0)
                self.assertEquals(f.read(2), "ab")
                self.assertEquals(f.read(1), "c")
                self.assertEquals(f.read(1), "")
                self.assertEquals(f.read(), "")
                self.assertEquals(f.tell(), cookie)
                self.assertEquals(f.seek(0), 0)
                self.assertEquals(f.seek(0, 2), cookie)
                self.assertEquals(f.write("def"), 3)
                self.assertEquals(f.seek(cookie), cookie)
                self.assertEquals(f.read(), "def")
                if enc.startswith("utf"):
                    self.multi_line_test(f, enc)
                f.close()

    def multi_line_test(self, f, enc):
        # Helper for testBasicIO: write lines of many different lengths,
        # recording tell() before each, then check that reading back
        # yields the same (position, line) pairs.
        f.seek(0)
        f.truncate()
        sample = "s\xff\u0fff\uffff"
        wlines = []
        for size in (0, 1, 2, 3, 4, 5, 30, 31, 32, 33, 62, 63, 64, 65, 1000):
            chars = []
            for i in range(size):
                chars.append(sample[i % len(sample)])
            line = "".join(chars) + "\n"
            wlines.append((f.tell(), line))
            f.write(line)
        f.seek(0)
        rlines = []
        while True:
            pos = f.tell()
            line = f.readline()
            if not line:
                break
            rlines.append((pos, line))
        self.assertEquals(rlines, wlines)

    def testTelling(self):
        f = io.open(test_support.TESTFN, "w+", encoding="utf8")
        p0 = f.tell()
        f.write("\xff\n")
        p1 = f.tell()
        f.write("\xff\n")
        p2 = f.tell()
        f.seek(0)
        self.assertEquals(f.tell(), p0)
        self.assertEquals(f.readline(), "\xff\n")
        self.assertEquals(f.tell(), p1)
        self.assertEquals(f.readline(), "\xff\n")
        self.assertEquals(f.tell(), p2)
        f.seek(0)
        for line in f:
            self.assertEquals(line, "\xff\n")
            # While the file is being iterated, tell() must raise.
            self.assertRaises(IOError, f.tell)
        # Once iteration has finished, tell() works again.
        self.assertEquals(f.tell(), p2)
        f.close()

    def testSeeking(self):
        # Build a line whose multi-byte character sits right at the end
        # of the first decoding chunk.
        chunk_size = io.TextIOWrapper._CHUNK_SIZE
        prefix_size = chunk_size - 2
        u_prefix = "a" * prefix_size
        prefix = bytes(u_prefix.encode("utf-8"))
        self.assertEquals(len(u_prefix), len(prefix))
        u_suffix = "\u8888\n"
        suffix = bytes(u_suffix.encode("utf-8"))
        line = prefix + suffix
        f = io.open(test_support.TESTFN, "wb")
        f.write(line*2)
        f.close()
        f = io.open(test_support.TESTFN, "r", encoding="utf-8")
        try:
            s = f.read(prefix_size)
            self.assertEquals(s, str(prefix, "ascii"))
            self.assertEquals(f.tell(), prefix_size)
            self.assertEquals(f.readline(), u_suffix)
        finally:
            # Close before tearDown() unlinks TESTFN.
            f.close()

    def testSeekingToo(self):
        # Regression test for a specific bug
        data = b'\xe0\xbf\xbf\n'
        f = io.open(test_support.TESTFN, "wb")
        f.write(data)
        f.close()
        f = io.open(test_support.TESTFN, "r", encoding="utf-8")
        try:
            f._CHUNK_SIZE  # Just test that it exists
            f._CHUNK_SIZE = 2
            f.readline()
            f.tell()
        finally:
            # Close before tearDown() unlinks TESTFN.
            f.close()

    def testSeekAndTell(self):
        """Test seek/tell using the StatefulIncrementalDecoder."""

        def testSeekAndTellWithData(data, min_pos=0):
            """Tell/seek to various points within a data stream and ensure
            that the decoded data returned by read() is consistent."""
            f = io.open(test_support.TESTFN, 'wb')
            f.write(data)
            f.close()
            f = io.open(test_support.TESTFN, encoding='test_decoder')
            decoded = f.read()
            f.close()

            for i in range(min_pos, len(decoded) + 1):  # seek positions
                for j in [1, 5, len(decoded) - i]:  # read lengths
                    f = io.open(test_support.TESTFN, encoding='test_decoder')
                    self.assertEquals(f.read(i), decoded[:i])
                    cookie = f.tell()
                    self.assertEquals(f.read(j), decoded[i:i + j])
                    f.seek(cookie)
                    self.assertEquals(f.read(), decoded[i:])
                    f.close()

        # Enable the test decoder.
        StatefulIncrementalDecoder.codecEnabled = 1

        # Run the tests.
        try:
            # Try each test case.
            for case_input, _, _ in StatefulIncrementalDecoderTest.test_cases:
                testSeekAndTellWithData(case_input)

            # Position each test case so that it crosses a chunk boundary.
            CHUNK_SIZE = io.TextIOWrapper._CHUNK_SIZE
            for case_input, _, _ in StatefulIncrementalDecoderTest.test_cases:
                offset = CHUNK_SIZE - len(case_input)//2
                prefix = b'.'*offset
                # Don't bother seeking into the prefix (takes too long).
                min_pos = offset*2
                testSeekAndTellWithData(prefix + case_input, min_pos)

        # Ensure our test decoder won't interfere with subsequent tests.
        finally:
            StatefulIncrementalDecoder.codecEnabled = 0

    def testEncodedWrites(self):
        data = "1234567890"
        tests = ("utf-16",
                 "utf-16-le",
                 "utf-16-be",
                 "utf-32",
                 "utf-32-le",
                 "utf-32-be")
        for encoding in tests:
            buf = io.BytesIO()
            f = io.TextIOWrapper(buf, encoding=encoding)
            # Check if the BOM is written only once (see issue1753).
            f.write(data)
            f.write(data)
            f.seek(0)
            self.assertEquals(f.read(), data * 2)
            self.assertEquals(buf.getvalue(), (data * 2).encode(encoding))

    def timingTest(self):
        # Manual benchmark; the name does not start with "test", so the
        # default unittest loader does not pick it up.
        timer = time.time
        enc = "utf8"
        line = "\0\x0f\xff\u0fff\uffff\U000fffff\U0010ffff"*3 + "\n"
        nlines = 10000
        nchars = len(line)
        nbytes = len(line.encode(enc))
        for chunk_size in (32, 64, 128, 256):
            f = io.open(test_support.TESTFN, "w+", encoding=enc)
            f._CHUNK_SIZE = chunk_size
            t0 = timer()
            for i in range(nlines):
                f.write(line)
            f.flush()
            t1 = timer()
            f.seek(0)
            # Separate loop variable so the sample `line` written for the
            # next chunk size is never overwritten by what was read back.
            for ignored in f:
                pass
            t2 = timer()
            f.seek(0)
            while f.readline():
                pass
            t3 = timer()
            f.seek(0)
            while f.readline():
                f.tell()
            t4 = timer()
            f.close()
            if test_support.verbose:
                print("\nTiming test: %d lines of %d characters (%d bytes)" %
                      (nlines, nchars, nbytes))
                print("File chunk size:          %6s" % f._CHUNK_SIZE)
                print("Writing:                  %6.3f seconds" % (t1-t0))
                print("Reading using iteration:  %6.3f seconds" % (t2-t1))
                print("Reading using readline(): %6.3f seconds" % (t3-t2))
                print("Using readline()+tell():  %6.3f seconds" % (t4-t3))

    def testReadOneByOne(self):
        txt = io.TextIOWrapper(io.BytesIO(b"AA\r\nBB"))
        reads = ""
        while True:
            c = txt.read(1)
            if not c:
                break
            reads += c
        self.assertEquals(reads, "AA\nBB")

    # read in amounts equal to TextIOWrapper._CHUNK_SIZE which is 128.
    def testReadByChunk(self):
        # make sure "\r\n" straddles 128 char boundary.
        txt = io.TextIOWrapper(io.BytesIO(b"A" * 127 + b"\r\nB"))
        reads = ""
        while True:
            c = txt.read(128)
            if not c:
                break
            reads += c
        self.assertEquals(reads, "A"*127+"\nB")

    def test_issue1395_1(self):
        txt = io.TextIOWrapper(io.BytesIO(self.testdata), encoding="ascii")

        # read one char at a time
        reads = ""
        while True:
            c = txt.read(1)
            if not c:
                break
            reads += c
        self.assertEquals(reads, self.normalized)

    def test_issue1395_2(self):
        txt = io.TextIOWrapper(io.BytesIO(self.testdata), encoding="ascii")
        txt._CHUNK_SIZE = 4

        # read four chars at a time, matching the chunk size
        reads = ""
        while True:
            c = txt.read(4)
            if not c:
                break
            reads += c
        self.assertEquals(reads, self.normalized)

    def test_issue1395_3(self):
        txt = io.TextIOWrapper(io.BytesIO(self.testdata), encoding="ascii")
        txt._CHUNK_SIZE = 4

        # mix sized reads with readline()
        reads = txt.read(4)
        reads += txt.read(4)
        reads += txt.readline()
        reads += txt.readline()
        reads += txt.readline()
        self.assertEquals(reads, self.normalized)

    def test_issue1395_4(self):
        txt = io.TextIOWrapper(io.BytesIO(self.testdata), encoding="ascii")
        txt._CHUNK_SIZE = 4

        # a sized read followed by read-to-EOF
        reads = txt.read(4)
        reads += txt.read()
        self.assertEquals(reads, self.normalized)

    def test_issue1395_5(self):
        txt = io.TextIOWrapper(io.BytesIO(self.testdata), encoding="ascii")
        txt._CHUNK_SIZE = 4

        # Consume the first line, remember the position, and check that
        # seeking away and back resumes at the second line.
        txt.read(4)
        pos = txt.tell()
        txt.seek(0)
        txt.seek(pos)
        self.assertEquals(txt.read(4), "BBB\n")

    def test_issue2282(self):
        buffer = io.BytesIO(self.testdata)
        txt = io.TextIOWrapper(buffer, encoding="ascii")

        # The wrapper must report the same seekability as its buffer.
        self.assertEqual(buffer.seekable(), txt.seekable())

    def test_newline_decoder(self):
        decoder = codecs.getincrementaldecoder("utf-8")()
        decoder = io.IncrementalNewlineDecoder(decoder, translate=True)

        self.assertEquals(decoder.decode(b'\xe8\xa2\x88'), "\u8888")

        # A multi-byte character fed one byte at a time.
        self.assertEquals(decoder.decode(b'\xe8'), "")
        self.assertEquals(decoder.decode(b'\xa2'), "")
        self.assertEquals(decoder.decode(b'\x88'), "\u8888")

        # A truncated character is an error once final=True is passed.
        self.assertEquals(decoder.decode(b'\xe8'), "")
        self.assertRaises(UnicodeDecodeError, decoder.decode, b'', final=True)

        # A trailing "\r" is held back until the next input (or EOF)
        # shows whether it is part of "\r\n".
        decoder.setstate((b'', 0))
        self.assertEquals(decoder.decode(b'\n'), "\n")
        self.assertEquals(decoder.decode(b'\r'), "")
        self.assertEquals(decoder.decode(b'', final=True), "\n")
        self.assertEquals(decoder.decode(b'\r', final=True), "\n")

        self.assertEquals(decoder.decode(b'\r'), "")
        self.assertEquals(decoder.decode(b'a'), "\na")

        self.assertEquals(decoder.decode(b'\r\r\n'), "\n\n")
        self.assertEquals(decoder.decode(b'\r'), "")
        self.assertEquals(decoder.decode(b'\r'), "\n")
        self.assertEquals(decoder.decode(b'\na'), "\na")

        self.assertEquals(decoder.decode(b'\xe8\xa2\x88\r\n'), "\u8888\n")
        self.assertEquals(decoder.decode(b'\xe8\xa2\x88'), "\u8888")
        self.assertEquals(decoder.decode(b'\n'), "\n")
        self.assertEquals(decoder.decode(b'\xe8\xa2\x88\r'), "\u8888")
        self.assertEquals(decoder.decode(b'\n'), "\n")

        # The `newlines` attribute accumulates the kinds of line ending
        # seen so far and is cleared by reset().
        decoder = codecs.getincrementaldecoder("utf-8")()
        decoder = io.IncrementalNewlineDecoder(decoder, translate=True)
        self.assertEquals(decoder.newlines, None)
        decoder.decode(b"abc\n\r")
        self.assertEquals(decoder.newlines, '\n')
        decoder.decode(b"\nabc")
        self.assertEquals(decoder.newlines, ('\n', '\r\n'))
        decoder.decode(b"abc\r")
        self.assertEquals(decoder.newlines, ('\n', '\r\n'))
        decoder.decode(b"abc")
        self.assertEquals(decoder.newlines, ('\r', '\n', '\r\n'))
        decoder.decode(b"abc\r")
        decoder.reset()
        self.assertEquals(decoder.decode(b"abc"), "abc")
        self.assertEquals(decoder.newlines, None)
1145
Guido van Rossum01a27522007-03-07 01:00:12 +00001146# XXX Tests for open()
Guido van Rossum68bbcd22007-02-27 17:19:33 +00001147
Guido van Rossum5abbf752007-08-27 17:39:33 +00001148class MiscIOTest(unittest.TestCase):
1149
1150 def testImport__all__(self):
1151 for name in io.__all__:
1152 obj = getattr(io, name, None)
1153 self.assert_(obj is not None, name)
1154 if name == "open":
1155 continue
1156 elif "error" in name.lower():
1157 self.assert_(issubclass(obj, Exception), name)
1158 else:
1159 self.assert_(issubclass(obj, io.IOBase))
1160
1161
def test_main():
    # Run every TestCase class of this module through
    # test_support.run_unittest().  StatefulIncrementalDecoderTest was
    # missing from this list, so this entry point never ran it.
    test_support.run_unittest(IOTest, BytesIOTest, StringIOTest,
                              BufferedReaderTest,
                              BufferedWriterTest, BufferedRWPairTest,
                              BufferedRandomTest,
                              StatefulIncrementalDecoderTest,
                              TextIOWrapperTest,
                              MiscIOTest)

if __name__ == "__main__":
    # Direct execution uses unittest's own loader and command line.
    unittest.main()