blob: f0e793120580d6c54b34ca1bdb6b57e32d24d410 [file] [log] [blame]
Guido van Rossum68bbcd22007-02-27 17:19:33 +00001"""Unit tests for io.py."""
2
Guido van Rossum8358db22007-08-18 21:39:55 +00003import os
Guido van Rossum34d69e52007-04-10 20:08:41 +00004import sys
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +00005import time
Guido van Rossumd4103952007-04-12 05:44:49 +00006import array
Guido van Rossum28524c72007-02-27 05:47:44 +00007import unittest
Guido van Rossum78892e42007-04-06 17:31:18 +00008from itertools import chain
Guido van Rossum76c5d4d2007-04-06 19:10:29 +00009from test import test_support
10
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +000011import codecs
Guido van Rossum53807da2007-04-10 19:01:47 +000012import io # The module under test
Guido van Rossum28524c72007-02-27 05:47:44 +000013
Guido van Rossuma9e20242007-03-08 00:43:48 +000014
class MockRawIO(io.RawIOBase):
    """Scripted raw stream for exercising the buffered I/O layers.

    Reads are served from a queue of pre-loaded chunks and every write is
    recorded in ``_write_stack`` so tests can inspect what reached the raw
    layer.
    """

    def __init__(self, read_stack=()):
        # Copy into a list so chunks can be popped without touching the
        # sequence the caller passed in.
        self._read_stack = list(read_stack)
        self._write_stack = []

    def read(self, n=None):
        # The requested size is ignored: the next scripted chunk (which may
        # be None) is returned whole, and b"" once the script is exhausted.
        try:
            return self._read_stack.pop(0)
        except IndexError:
            return b""

    def write(self, b):
        # Record a copy (b[:]) of the data and claim it was all written.
        self._write_stack.append(b[:])
        return len(b)

    def writable(self):
        return True

    def fileno(self):
        # Arbitrary fixed value that tests compare against.
        return 42

    def readable(self):
        return True

    def seekable(self):
        return True

    def seek(self, pos, whence=0):
        # Seeking is a no-op; whence defaults to 0 like io.IOBase.seek.
        pass

    def tell(self):
        # Arbitrary fixed position.
        return 42
48
Guido van Rossuma9e20242007-03-08 00:43:48 +000049
class MockFileIO(io.BytesIO):
    """In-memory byte stream that logs the size of every read() result."""

    def __init__(self, data):
        self.read_history = []
        super().__init__(data)

    def read(self, n=None):
        chunk = super().read(n)
        # Log the length of what came back, or None if nothing did.
        size = len(chunk) if chunk is not None else None
        self.read_history.append(size)
        return chunk
60
61
class MockNonBlockWriterIO(io.RawIOBase):
    """Raw writer whose write() results follow a pre-set script.

    Each write() consumes one script entry: a non-negative entry is
    returned as the number of bytes written, a negative entry -k raises
    BlockingIOError reporting k characters written.
    """

    def __init__(self, blocking_script):
        self._blocking_script = list(blocking_script)
        self._write_stack = []

    def write(self, b):
        # Record the data first, then consult the script for the outcome.
        self._write_stack.append(b[:])
        outcome = self._blocking_script.pop(0)
        if outcome >= 0:
            return outcome
        raise io.BlockingIOError(0, "test blocking", -outcome)

    def writable(self):
        return True
Guido van Rossum68bbcd22007-02-27 17:19:33 +000078
Guido van Rossuma9e20242007-03-08 00:43:48 +000079
class IOTest(unittest.TestCase):
    # End-to-end tests of io.open(), io.FileIO and io.BytesIO against a
    # scratch file (test_support.TESTFN) that is removed around each test.

    def setUp(self):
        test_support.unlink(test_support.TESTFN)

    def tearDown(self):
        test_support.unlink(test_support.TESTFN)

    def write_ops(self, f):
        # Shared write/seek/truncate script run against a writable stream f.
        self.assertEqual(f.write(b"blah."), 5)
        self.assertEqual(f.seek(0), 0)
        self.assertEqual(f.write(b"Hello."), 6)
        self.assertEqual(f.tell(), 6)
        self.assertEqual(f.seek(-1, 1), 5)
        self.assertEqual(f.tell(), 5)
        self.assertEqual(f.write(bytearray(b" world\n\n\n")), 9)
        self.assertEqual(f.seek(0), 0)
        self.assertEqual(f.write(b"h"), 1)
        self.assertEqual(f.seek(-1, 2), 13)
        self.assertEqual(f.tell(), 13)
        self.assertEqual(f.truncate(12), 12)
        self.assertEqual(f.tell(), 13)
        self.assertRaises(TypeError, f.seek, 0.0)

    def read_ops(self, f, buffered=False):
        # Shared read/readinto/seek script; f must hold b"hello world\n".
        # The extra read-to-EOF checks run only when buffered is true.
        data = f.read(5)
        self.assertEqual(data, b"hello")
        data = bytearray(data)
        self.assertEqual(f.readinto(data), 5)
        self.assertEqual(data, b" worl")
        self.assertEqual(f.readinto(data), 2)
        self.assertEqual(len(data), 5)
        self.assertEqual(data[:2], b"d\n")
        self.assertEqual(f.seek(0), 0)
        self.assertEqual(f.read(20), b"hello world\n")
        self.assertEqual(f.read(1), b"")
        self.assertEqual(f.readinto(bytearray(b"x")), 0)
        self.assertEqual(f.seek(-6, 2), 6)
        self.assertEqual(f.read(5), b"world")
        self.assertEqual(f.read(0), b"")
        self.assertEqual(f.readinto(bytearray()), 0)
        self.assertEqual(f.seek(-6, 1), 5)
        self.assertEqual(f.read(5), b" worl")
        self.assertEqual(f.tell(), 10)
        self.assertRaises(TypeError, f.seek, 0.0)
        if buffered:
            f.seek(0)
            self.assertEqual(f.read(), b"hello world\n")
            f.seek(6)
            self.assertEqual(f.read(), b"world\n")
            self.assertEqual(f.read(), b"")

    # Offset used by the large-file tests.
    LARGE = 2**31

    def large_file_ops(self, f):
        # Seek/write/truncate script working around offset LARGE.
        # assertTrue rather than a bare assert, which "python -O" strips.
        self.assertTrue(f.readable())
        self.assertTrue(f.writable())
        self.assertEqual(f.seek(self.LARGE), self.LARGE)
        self.assertEqual(f.tell(), self.LARGE)
        self.assertEqual(f.write(b"xxx"), 3)
        self.assertEqual(f.tell(), self.LARGE + 3)
        self.assertEqual(f.seek(-1, 1), self.LARGE + 2)
        self.assertEqual(f.truncate(), self.LARGE + 2)
        self.assertEqual(f.tell(), self.LARGE + 2)
        self.assertEqual(f.seek(0, 2), self.LARGE + 2)
        self.assertEqual(f.truncate(self.LARGE + 1), self.LARGE + 1)
        self.assertEqual(f.tell(), self.LARGE + 2)
        self.assertEqual(f.seek(0, 2), self.LARGE + 1)
        self.assertEqual(f.seek(-1, 2), self.LARGE)
        self.assertEqual(f.read(2), b"x")

    def test_raw_file_io(self):
        # "with" guarantees the file is closed even when an assertion
        # fails, so tearDown can always remove it.
        with io.open(test_support.TESTFN, "wb", buffering=0) as f:
            self.assertEqual(f.readable(), False)
            self.assertEqual(f.writable(), True)
            self.assertEqual(f.seekable(), True)
            self.write_ops(f)
        with io.open(test_support.TESTFN, "rb", buffering=0) as f:
            self.assertEqual(f.readable(), True)
            self.assertEqual(f.writable(), False)
            self.assertEqual(f.seekable(), True)
            self.read_ops(f)

    def test_buffered_file_io(self):
        with io.open(test_support.TESTFN, "wb") as f:
            self.assertEqual(f.readable(), False)
            self.assertEqual(f.writable(), True)
            self.assertEqual(f.seekable(), True)
            self.write_ops(f)
        with io.open(test_support.TESTFN, "rb") as f:
            self.assertEqual(f.readable(), True)
            self.assertEqual(f.writable(), False)
            self.assertEqual(f.seekable(), True)
            self.read_ops(f, True)

    def test_readline(self):
        with io.open(test_support.TESTFN, "wb") as f:
            f.write(b"abc\ndef\nxyzzy\nfoo")
        with io.open(test_support.TESTFN, "rb") as f:
            self.assertEqual(f.readline(), b"abc\n")
            self.assertEqual(f.readline(10), b"def\n")
            self.assertEqual(f.readline(2), b"xy")
            self.assertEqual(f.readline(4), b"zzy\n")
            self.assertEqual(f.readline(), b"foo")

    def test_raw_bytes_io(self):
        f = io.BytesIO()
        self.write_ops(f)
        data = f.getvalue()
        self.assertEqual(data, b"hello world\n")
        f = io.BytesIO(data)
        self.read_ops(f, True)

    def test_large_file_ops(self):
        # On Windows and Mac OSX this test consumes large resources; it
        # takes a long time to build the >2GB file and takes >2GB of disk
        # space, therefore the resource must be enabled to run this test.
        if sys.platform[:3] == 'win' or sys.platform == 'darwin':
            if not test_support.is_resource_enabled("largefile"):
                print("\nTesting large file ops skipped on %s." % sys.platform,
                      file=sys.stderr)
                print("It requires %d bytes and a long time." % self.LARGE,
                      file=sys.stderr)
                print("Use 'regrtest.py -u largefile test_io' to run it.",
                      file=sys.stderr)
                return
        # Once unbuffered, once with default buffering.
        with io.open(test_support.TESTFN, "w+b", 0) as f:
            self.large_file_ops(f)
        with io.open(test_support.TESTFN, "w+b") as f:
            self.large_file_ops(f)

    def test_with_open(self):
        for bufsize in (0, 1, 100):
            f = None
            with open(test_support.TESTFN, "wb", bufsize) as f:
                f.write(b"xxx")
            self.assertEqual(f.closed, True)
            f = None
            try:
                with open(test_support.TESTFN, "wb", bufsize) as f:
                    1/0
            except ZeroDivisionError:
                # The file must be closed even when the body raised.
                self.assertEqual(f.closed, True)
            else:
                self.fail("1/0 didn't raise an exception")

    def test_destructor(self):
        # Expected call order on destruction: __del__, close, flush.
        record = []
        class MyFileIO(io.FileIO):
            def __del__(self):
                record.append(1)
                io.FileIO.__del__(self)
            def close(self):
                record.append(2)
                io.FileIO.close(self)
            def flush(self):
                record.append(3)
                io.FileIO.flush(self)
        f = MyFileIO(test_support.TESTFN, "w")
        # FileIO is a raw binary stream, so write bytes rather than str.
        f.write(b"xxx")
        # NOTE(review): assumes "del" finalizes the object immediately
        # (reference counting) -- confirm for other implementations.
        del f
        self.assertEqual(record, [1, 2, 3])

    def test_close_flushes(self):
        # Leaving the "with" block closes f; the data must be on disk
        # afterwards.
        with io.open(test_support.TESTFN, "wb") as f:
            f.write(b"xxx")
        with io.open(test_support.TESTFN, "rb") as f:
            self.assertEqual(f.read(), b"xxx")

    def test_array_writes(self):
        a = array.array('i', range(10))
        n = len(memoryview(a))
        with io.open(test_support.TESTFN, "wb", 0) as f:
            self.assertEqual(f.write(a), n)
        with io.open(test_support.TESTFN, "wb") as f:
            self.assertEqual(f.write(a), n)

    def test_closefd(self):
        # closefd=False together with a file name must be rejected.
        self.assertRaises(ValueError, io.open, test_support.TESTFN, 'w',
                          closefd=False)
Guido van Rossuma9e20242007-03-08 00:43:48 +0000272
class MemorySeekTestMixin:
    # Read/seek/tell tests shared by the in-memory stream classes.
    #
    # A concrete subclass must provide:
    #   buftype -- callable turning a str into the stream's data type
    #   ioclass -- the stream class under test
    #   EOF     -- the value read() returns at end of stream

    def testInit(self):
        # Construction from initial data must simply not raise.
        buf = self.buftype("1234567890")
        self.ioclass(buf)

    def testRead(self):
        buf = self.buftype("1234567890")
        bytesIo = self.ioclass(buf)

        self.assertEqual(buf[:1], bytesIo.read(1))
        self.assertEqual(buf[1:5], bytesIo.read(4))
        # Asking for more than remains returns only what is left.
        self.assertEqual(buf[5:], bytesIo.read(900))
        self.assertEqual(self.EOF, bytesIo.read())

    def testReadNoArgs(self):
        buf = self.buftype("1234567890")
        bytesIo = self.ioclass(buf)

        self.assertEqual(buf, bytesIo.read())
        self.assertEqual(self.EOF, bytesIo.read())

    def testSeek(self):
        buf = self.buftype("1234567890")
        bytesIo = self.ioclass(buf)

        bytesIo.read(5)
        bytesIo.seek(0)
        self.assertEqual(buf, bytesIo.read())

        bytesIo.seek(3)
        self.assertEqual(buf[3:], bytesIo.read())
        # Float offsets are not acceptable.
        self.assertRaises(TypeError, bytesIo.seek, 0.0)

    def testTell(self):
        buf = self.buftype("1234567890")
        bytesIo = self.ioclass(buf)

        self.assertEqual(0, bytesIo.tell())
        bytesIo.seek(5)
        self.assertEqual(5, bytesIo.tell())
        # Seeking past the end is allowed and is reported by tell().
        bytesIo.seek(10000)
        self.assertEqual(10000, bytesIo.tell())
316
Guido van Rossuma9e20242007-03-08 00:43:48 +0000317
class BytesIOTest(MemorySeekTestMixin, unittest.TestCase):
    # Runs the shared in-memory stream tests against io.BytesIO.
    EOF = b""
    ioclass = io.BytesIO

    @staticmethod
    def buftype(s):
        # Test data arrives as str; encode it to UTF-8 bytes.
        return s.encode("utf-8")
324
325
class StringIOTest(MemorySeekTestMixin, unittest.TestCase):
    # Runs the shared in-memory stream tests against io.StringIO; the test
    # data is used as str directly.
    EOF = ""
    ioclass = io.StringIO
    buftype = str
330
331
class BufferedReaderTest(unittest.TestCase):
    # Tests for io.BufferedReader layered over scripted raw streams.

    def testRead(self):
        rawio = MockRawIO((b"abc", b"d", b"efg"))
        bufio = io.BufferedReader(rawio)

        self.assertEqual(b"abcdef", bufio.read(6))

    def testBuffering(self):
        data = b"abcdefghi"
        dlen = len(data)

        # Each entry: [buffer size, sizes requested from the buffered
        # reader, expected read_history of the raw stream afterwards].
        tests = [
            [ 100, [ 3, 1, 4, 8 ], [ dlen, 0 ] ],
            [ 100, [ 3, 3, 3],     [ dlen ]    ],
            [   4, [ 1, 2, 4, 2 ], [ 4, 4, 1 ] ],
        ]

        for bufsize, buf_read_sizes, raw_read_sizes in tests:
            rawio = MockFileIO(data)
            bufio = io.BufferedReader(rawio, buffer_size=bufsize)
            pos = 0
            for nbytes in buf_read_sizes:
                self.assertEqual(bufio.read(nbytes), data[pos:pos+nbytes])
                pos += nbytes
            self.assertEqual(rawio.read_history, raw_read_sizes)

    def testReadNonBlocking(self):
        # Inject some None's in there to simulate EWOULDBLOCK
        rawio = MockRawIO((b"abc", b"d", None, b"efg", None, None))
        bufio = io.BufferedReader(rawio)

        self.assertEqual(b"abcd", bufio.read(6))
        self.assertEqual(b"e", bufio.read(1))
        self.assertEqual(b"fg", bufio.read())
        self.assertTrue(bufio.read() is None)
        self.assertEqual(b"", bufio.read())

    def testReadToEof(self):
        rawio = MockRawIO((b"abc", b"d", b"efg"))
        bufio = io.BufferedReader(rawio)

        self.assertEqual(b"abcdefg", bufio.read(9000))

    def testReadNoArgs(self):
        rawio = MockRawIO((b"abc", b"d", b"efg"))
        bufio = io.BufferedReader(rawio)

        self.assertEqual(b"abcdefg", bufio.read())

    def testFileno(self):
        rawio = MockRawIO((b"abc", b"d", b"efg"))
        bufio = io.BufferedReader(rawio)

        # fileno() must be delegated to the raw stream.
        self.assertEqual(42, bufio.fileno())

    def testFilenoNoFileno(self):
        # XXX will we always have fileno() function? If so, kill
        # this test. Else, write it.
        pass
392
Guido van Rossuma9e20242007-03-08 00:43:48 +0000393
class BufferedWriterTest(unittest.TestCase):
    # Tests for io.BufferedWriter layered over recording raw streams.

    def testWrite(self):
        # Write to the buffered IO but don't overflow the buffer.
        writer = MockRawIO()
        bufio = io.BufferedWriter(writer, 8)

        bufio.write(b"abc")

        # Nothing may have reached the raw stream yet.
        self.assertFalse(writer._write_stack)

    def testWriteOverflow(self):
        writer = MockRawIO()
        bufio = io.BufferedWriter(writer, 8)

        bufio.write(b"abc")
        bufio.write(b"defghijkl")

        # Exceeding the buffer size pushes everything to the raw stream.
        self.assertEqual(b"abcdefghijkl", writer._write_stack[0])

    def testWriteNonBlocking(self):
        raw = MockNonBlockWriterIO((9, 2, 22, -6, 10, 12, 12))
        bufio = io.BufferedWriter(raw, 8, 16)

        bufio.write(b"asdf")
        bufio.write(b"asdfa")
        self.assertEqual(b"asdfasdfa", raw._write_stack[0])

        bufio.write(b"asdfasdfasdf")
        self.assertEqual(b"asdfasdfasdf", raw._write_stack[1])
        bufio.write(b"asdfasdfasdf")
        self.assertEqual(b"dfasdfasdf", raw._write_stack[2])
        self.assertEqual(b"asdfasdfasdf", raw._write_stack[3])

        bufio.write(b"asdfasdfasdf")

        # XXX I don't like this test.  It relies too heavily on how the
        # algorithm actually works, which we might change.  Refactor
        # later.

    def testFileno(self):
        rawio = MockRawIO((b"abc", b"d", b"efg"))
        bufio = io.BufferedWriter(rawio)

        # fileno() must be delegated to the raw stream.
        self.assertEqual(42, bufio.fileno())

    def testFlush(self):
        writer = MockRawIO()
        bufio = io.BufferedWriter(writer, 8)

        bufio.write(b"abc")
        bufio.flush()

        self.assertEqual(b"abc", writer._write_stack[0])
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000448
Guido van Rossuma9e20242007-03-08 00:43:48 +0000449
class BufferedRWPairTest(unittest.TestCase):
    # Placeholder: only checks that a reader/writer pair can be built.

    def testRWPair(self):
        reader = MockRawIO(())
        writer = MockRawIO()
        io.BufferedRWPair(reader, writer)

        # XXX need implementation
458
Guido van Rossuma9e20242007-03-08 00:43:48 +0000459
class BufferedRandomTest(unittest.TestCase):
    # Tests for io.BufferedRandom: interleaved reads and writes, seek/tell.

    def testReadAndWrite(self):
        raw = MockRawIO((b"asdf", b"ghjk"))
        rw = io.BufferedRandom(raw, 8, 12)

        self.assertEqual(b"as", rw.read(2))
        rw.write(b"ddd")
        rw.write(b"eee")
        self.assertFalse(raw._write_stack) # Buffer writes
        self.assertEqual(b"ghjk", rw.read()) # This read forces write flush
        self.assertEqual(b"dddeee", raw._write_stack[0])

    def testSeekAndTell(self):
        raw = io.BytesIO(b"asdfghjkl")
        rw = io.BufferedRandom(raw)

        self.assertEqual(b"as", rw.read(2))
        self.assertEqual(2, rw.tell())
        rw.seek(0, 0)
        self.assertEqual(b"asdf", rw.read(4))

        # Overwrite bytes 4..7, then re-read the whole stream.
        rw.write(b"asdf")
        rw.seek(0, 0)
        self.assertEqual(b"asdfasdfl", rw.read())
        self.assertEqual(9, rw.tell())
        rw.seek(-4, 2)
        self.assertEqual(5, rw.tell())
        rw.seek(2, 1)
        self.assertEqual(7, rw.tell())
        self.assertEqual(b"fl", rw.read(11))
        self.assertRaises(TypeError, rw.seek, 0.0)
Guido van Rossum01a27522007-03-07 01:00:12 +0000492
# To fully exercise seek/tell, the StatefulIncrementalDecoder has these
# properties:
#   - A single output character can correspond to many bytes of input.
#   - The number of input bytes to complete the character can be
#     undetermined until the last input byte is received.
#   - The number of input bytes can vary depending on previous input.
#   - A single input byte can correspond to many characters of output.
#   - The number of output characters can be undetermined until the
#     last input byte is received.
#   - The number of output characters can vary depending on previous input.
503
class StatefulIncrementalDecoder(codecs.IncrementalDecoder):
    """
    A deliberately stateful, buffering decoder for seek/tell tests.

    The input bytes are split into words.  While the input length I is
    non-zero, every I bytes form one word; while I is 0, words are
    terminated by a period and empty words (extra periods) are skipped.
    Words are interpreted as follows:
      - 'i' followed by a number sets I to that number (capped at 99;
        no number means 0).
      - 'o' followed by a number sets the output length O the same way.
      - Any other word is decoded as ASCII and emitted followed by a
        period.  When O is non-zero the word is first padded with hyphens
        or truncated so that it is exactly O characters long; when O is 0
        it is emitted verbatim.
    I and O both start at 1.  A decode() call with final=True also
    terminates a partially buffered word.
    """

    def __init__(self, errors='strict'):
        super().__init__(errors)
        self.reset()

    def __repr__(self):
        return '<SID %x>' % id(self)

    def reset(self):
        self.i = self.o = 1
        self.buffer = bytearray()

    def getstate(self):
        # I and O are XOR-ed with 1 so that the flags are 0 right after
        # reset(); both are packed into one integer as i*100 + o.
        flags = (self.i ^ 1) * 100 + (self.o ^ 1)
        return bytes(self.buffer), flags

    def setstate(self, state):
        pending, flags = state
        self.buffer = bytearray(pending)
        packed_i, packed_o = divmod(flags, 100)
        self.i = packed_i ^ 1
        self.o = packed_o ^ 1

    def decode(self, input, final=False):
        pieces = []
        period = ord('.')
        for byte in input:
            if self.i:
                # Fixed-length mode: a word ends after self.i bytes.
                self.buffer.append(byte)
                if len(self.buffer) == self.i:
                    pieces.append(self.process_word())
            elif byte != period:
                # Variable-length mode: keep collecting until a period.
                self.buffer.append(byte)
            elif self.buffer:
                # A period ends a non-empty word; extra periods are
                # ignored.
                pieces.append(self.process_word())
        if final and self.buffer:
            # EOF terminates the last word.
            pieces.append(self.process_word())
        return ''.join(pieces)

    def process_word(self):
        # Consume the buffered word: update I or O for a control word,
        # otherwise return the formatted output text.
        text = ''
        first = self.buffer[0]
        if first == ord('i'):
            self.i = min(99, int(self.buffer[1:] or 0))  # set input length
        elif first == ord('o'):
            self.o = min(99, int(self.buffer[1:] or 0))  # set output length
        else:
            text = self.buffer.decode('ascii')
            if self.o:
                # Pad with hyphens, then cut to the output length.
                text = text.ljust(self.o, '-')[:self.o]
            text += '.'
        self.buffer = bytearray()
        return text
576
class StatefulIncrementalDecoderTest(unittest.TestCase):
    """
    Make sure the StatefulIncrementalDecoder actually works.
    """

    # Each entry: (input bytes, value passed as final, expected output).
    test_cases = [
        # I=1, O=1 (fixed-length input == fixed-length output)
        (b'abcd', False, 'a.b.c.d.'),
        # I=0, O=0 (variable-length input, variable-length output)
        (b'oiabcd', True, 'abcd.'),
        # I=0, O=0 (should ignore extra periods)
        (b'oi...abcd...', True, 'abcd.'),
        # I=0, O=6 (variable-length input, fixed-length output)
        (b'i.o6.x.xyz.toolongtofit.', False, 'x-----.xyz---.toolon.'),
        # I=2, O=6 (fixed-length input < fixed-length output)
        (b'i.i2.o6xyz', True, 'xy----.z-----.'),
        # I=6, O=3 (fixed-length input > fixed-length output)
        (b'i.o3.i6.abcdefghijklmnop', True, 'abc.ghi.mno.'),
        # I=0, then 3; O=29, then 15 (with longer output)
        (b'i.o29.a.b.cde.o15.abcdefghijabcdefghij.i3.a.b.c.d.ei00k.l.m', True,
         'a----------------------------.' +
         'b----------------------------.' +
         'cde--------------------------.' +
         'abcdefghijabcde.' +
         'a.b------------.' +
         '.c.------------.' +
         'd.e------------.' +
         'k--------------.' +
         'l--------------.' +
         'm--------------.')
    ]

    def testDecoder(self):
        # Try a few one-shot test cases.
        for input, eof, output in self.test_cases:
            d = StatefulIncrementalDecoder()
            self.assertEqual(d.decode(input, eof), output)

        # Also test an unfinished decode, followed by forcing EOF.
        d = StatefulIncrementalDecoder()
        self.assertEqual(d.decode(b'oiabcd'), '')
        self.assertEqual(d.decode(b'', True), 'abcd.')
Guido van Rossum78892e42007-04-06 17:31:18 +0000619
620class TextIOWrapperTest(unittest.TestCase):
Guido van Rossum9b76da62007-04-11 01:09:03 +0000621
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +0000622 def setUp(self):
623 self.testdata = b"AAA\r\nBBB\rCCC\r\nDDD\nEEE\r\n"
624 self.normalized = b"AAA\nBBB\nCCC\nDDD\nEEE\n".decode("ascii")
Neal Norwitze7789b12008-03-24 06:18:09 +0000625 test_support.unlink(test_support.TESTFN)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +0000626
Guido van Rossumd0712812007-04-11 16:32:43 +0000627 def tearDown(self):
628 test_support.unlink(test_support.TESTFN)
Guido van Rossum9b76da62007-04-11 01:09:03 +0000629
Guido van Rossumf64db9f2007-12-06 01:04:26 +0000630 def testLineBuffering(self):
631 r = io.BytesIO()
632 b = io.BufferedWriter(r, 1000)
633 t = io.TextIOWrapper(b, newline="\n", line_buffering=True)
634 t.write("X")
635 self.assertEquals(r.getvalue(), b"") # No flush happened
636 t.write("Y\nZ")
637 self.assertEquals(r.getvalue(), b"XY\nZ") # All got flushed
638 t.write("A\rB")
639 self.assertEquals(r.getvalue(), b"XY\nZA\rB")
640
Guido van Rossume7fc50f2007-12-03 22:54:21 +0000641 def testEncodingErrorsReading(self):
642 # (1) default
643 b = io.BytesIO(b"abc\n\xff\n")
644 t = io.TextIOWrapper(b, encoding="ascii")
645 self.assertRaises(UnicodeError, t.read)
646 # (2) explicit strict
647 b = io.BytesIO(b"abc\n\xff\n")
648 t = io.TextIOWrapper(b, encoding="ascii", errors="strict")
649 self.assertRaises(UnicodeError, t.read)
650 # (3) ignore
651 b = io.BytesIO(b"abc\n\xff\n")
652 t = io.TextIOWrapper(b, encoding="ascii", errors="ignore")
653 self.assertEquals(t.read(), "abc\n\n")
654 # (4) replace
655 b = io.BytesIO(b"abc\n\xff\n")
656 t = io.TextIOWrapper(b, encoding="ascii", errors="replace")
657 self.assertEquals(t.read(), "abc\n\ufffd\n")
658
659 def testEncodingErrorsWriting(self):
660 # (1) default
661 b = io.BytesIO()
662 t = io.TextIOWrapper(b, encoding="ascii")
663 self.assertRaises(UnicodeError, t.write, "\xff")
664 # (2) explicit strict
665 b = io.BytesIO()
666 t = io.TextIOWrapper(b, encoding="ascii", errors="strict")
667 self.assertRaises(UnicodeError, t.write, "\xff")
668 # (3) ignore
669 b = io.BytesIO()
Guido van Rossumf64db9f2007-12-06 01:04:26 +0000670 t = io.TextIOWrapper(b, encoding="ascii", errors="ignore",
671 newline="\n")
Guido van Rossume7fc50f2007-12-03 22:54:21 +0000672 t.write("abc\xffdef\n")
673 t.flush()
Christian Heimesecda2612007-12-05 17:59:44 +0000674 self.assertEquals(b.getvalue(), b"abcdef\n")
Guido van Rossume7fc50f2007-12-03 22:54:21 +0000675 # (4) replace
676 b = io.BytesIO()
Guido van Rossumf64db9f2007-12-06 01:04:26 +0000677 t = io.TextIOWrapper(b, encoding="ascii", errors="replace",
678 newline="\n")
Guido van Rossume7fc50f2007-12-03 22:54:21 +0000679 t.write("abc\xffdef\n")
680 t.flush()
Christian Heimesecda2612007-12-05 17:59:44 +0000681 self.assertEquals(b.getvalue(), b"abc?def\n")
Guido van Rossume7fc50f2007-12-03 22:54:21 +0000682
Guido van Rossum8358db22007-08-18 21:39:55 +0000683 def testNewlinesInput(self):
684 testdata = b"AAA\nBBB\nCCC\rDDD\rEEE\r\nFFF\r\nGGG"
685 normalized = testdata.replace(b"\r\n", b"\n").replace(b"\r", b"\n")
686 for newline, expected in [
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +0000687 (None, normalized.decode("ascii").splitlines(True)),
688 ("", testdata.decode("ascii").splitlines(True)),
Guido van Rossum8358db22007-08-18 21:39:55 +0000689 ("\n", ["AAA\n", "BBB\n", "CCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]),
690 ("\r\n", ["AAA\nBBB\nCCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]),
691 ("\r", ["AAA\nBBB\nCCC\r", "DDD\r", "EEE\r", "\nFFF\r", "\nGGG"]),
692 ]:
693 buf = io.BytesIO(testdata)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +0000694 txt = io.TextIOWrapper(buf, encoding="ascii", newline=newline)
Guido van Rossum8358db22007-08-18 21:39:55 +0000695 self.assertEquals(txt.readlines(), expected)
696 txt.seek(0)
697 self.assertEquals(txt.read(), "".join(expected))
698
699 def testNewlinesOutput(self):
700 testdict = {
701 "": b"AAA\nBBB\nCCC\nX\rY\r\nZ",
702 "\n": b"AAA\nBBB\nCCC\nX\rY\r\nZ",
703 "\r": b"AAA\rBBB\rCCC\rX\rY\r\rZ",
704 "\r\n": b"AAA\r\nBBB\r\nCCC\r\nX\rY\r\r\nZ",
705 }
706 tests = [(None, testdict[os.linesep])] + sorted(testdict.items())
707 for newline, expected in tests:
708 buf = io.BytesIO()
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +0000709 txt = io.TextIOWrapper(buf, encoding="ascii", newline=newline)
Guido van Rossum8358db22007-08-18 21:39:55 +0000710 txt.write("AAA\nB")
711 txt.write("BB\nCCC\n")
712 txt.write("X\rY\r\nZ")
713 txt.flush()
714 self.assertEquals(buf.getvalue(), expected)
715
Guido van Rossum78892e42007-04-06 17:31:18 +0000716 def testNewlines(self):
717 input_lines = [ "unix\n", "windows\r\n", "os9\r", "last\n", "nonl" ]
718
719 tests = [
720 [ None, [ 'unix\n', 'windows\n', 'os9\n', 'last\n', 'nonl' ] ],
Guido van Rossum8358db22007-08-18 21:39:55 +0000721 [ '', input_lines ],
722 [ '\n', [ "unix\n", "windows\r\n", "os9\rlast\n", "nonl" ] ],
723 [ '\r\n', [ "unix\nwindows\r\n", "os9\rlast\nnonl" ] ],
724 [ '\r', [ "unix\nwindows\r", "\nos9\r", "last\nnonl" ] ],
Guido van Rossum78892e42007-04-06 17:31:18 +0000725 ]
726
Guido van Rossumc2f93dc2007-05-24 00:50:02 +0000727 encodings = ('utf-8', 'latin-1')
Guido van Rossum78892e42007-04-06 17:31:18 +0000728
Guido van Rossum8358db22007-08-18 21:39:55 +0000729 # Try a range of buffer sizes to test the case where \r is the last
Guido van Rossum78892e42007-04-06 17:31:18 +0000730 # character in TextIOWrapper._pending_line.
731 for encoding in encodings:
Guido van Rossum8358db22007-08-18 21:39:55 +0000732 # XXX: str.encode() should return bytes
733 data = bytes(''.join(input_lines).encode(encoding))
Guido van Rossum78892e42007-04-06 17:31:18 +0000734 for do_reads in (False, True):
Guido van Rossum8358db22007-08-18 21:39:55 +0000735 for bufsize in range(1, 10):
736 for newline, exp_lines in tests:
737 bufio = io.BufferedReader(io.BytesIO(data), bufsize)
Guido van Rossum76c5d4d2007-04-06 19:10:29 +0000738 textio = io.TextIOWrapper(bufio, newline=newline,
Guido van Rossum78892e42007-04-06 17:31:18 +0000739 encoding=encoding)
740 if do_reads:
741 got_lines = []
742 while True:
Guido van Rossum76c5d4d2007-04-06 19:10:29 +0000743 c2 = textio.read(2)
Guido van Rossum78892e42007-04-06 17:31:18 +0000744 if c2 == '':
745 break
746 self.assertEquals(len(c2), 2)
Guido van Rossum76c5d4d2007-04-06 19:10:29 +0000747 got_lines.append(c2 + textio.readline())
Guido van Rossum78892e42007-04-06 17:31:18 +0000748 else:
Guido van Rossum76c5d4d2007-04-06 19:10:29 +0000749 got_lines = list(textio)
Guido van Rossum78892e42007-04-06 17:31:18 +0000750
751 for got_line, exp_line in zip(got_lines, exp_lines):
752 self.assertEquals(got_line, exp_line)
753 self.assertEquals(len(got_lines), len(exp_lines))
754
Guido van Rossum8358db22007-08-18 21:39:55 +0000755 def testNewlinesInput(self):
756 testdata = b"AAA\nBBB\nCCC\rDDD\rEEE\r\nFFF\r\nGGG"
757 normalized = testdata.replace(b"\r\n", b"\n").replace(b"\r", b"\n")
758 for newline, expected in [
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +0000759 (None, normalized.decode("ascii").splitlines(True)),
760 ("", testdata.decode("ascii").splitlines(True)),
Guido van Rossum8358db22007-08-18 21:39:55 +0000761 ("\n", ["AAA\n", "BBB\n", "CCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]),
762 ("\r\n", ["AAA\nBBB\nCCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]),
763 ("\r", ["AAA\nBBB\nCCC\r", "DDD\r", "EEE\r", "\nFFF\r", "\nGGG"]),
764 ]:
765 buf = io.BytesIO(testdata)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +0000766 txt = io.TextIOWrapper(buf, encoding="ascii", newline=newline)
Guido van Rossum8358db22007-08-18 21:39:55 +0000767 self.assertEquals(txt.readlines(), expected)
768 txt.seek(0)
769 self.assertEquals(txt.read(), "".join(expected))
770
def testNewlinesOutput(self):
    """Check written bytes for each (os.linesep, newline) combination.

    ``os.linesep`` is temporarily replaced for each case and restored in
    the ``finally`` clause regardless of the outcome.
    """
    data = "AAA\nBBB\rCCC\n"
    data_lf = b"AAA\nBBB\rCCC\n"
    data_cr = b"AAA\rBBB\rCCC\r"
    data_crlf = b"AAA\r\nBBB\rCCC\r\n"
    # (value patched into os.linesep, newline argument, expected bytes)
    cases = (
        ("\n", None, data_lf),
        ("\r\n", None, data_crlf),
        ("\n", "", data_lf),
        ("\r\n", "", data_lf),
        ("\n", "\n", data_lf),
        ("\r\n", "\n", data_lf),
        ("\n", "\r", data_cr),
        ("\r\n", "\r", data_cr),
        ("\n", "\r\n", data_crlf),
        ("\r\n", "\r\n", data_crlf),
    )
    original_linesep = os.linesep
    try:
        for linesep, newline, expected in cases:
            os.linesep = linesep
            buf = io.BytesIO()
            txt = io.TextIOWrapper(buf, encoding="ascii", newline=newline)
            txt.write(data)
            txt.close()
            self.assertEqual(buf.getvalue(), expected)
    finally:
        os.linesep = original_linesep
Guido van Rossum8358db22007-08-18 21:39:55 +0000797
Guido van Rossum9b76da62007-04-11 01:09:03 +0000798 # Systematic tests of the text I/O API
799
def testBasicIO(self):
    """Exercise write/read/seek/tell on a text file for several chunk sizes.

    Both file objects are closed in ``finally`` clauses so that a failing
    assertion does not leave the scratch file open when tearDown unlinks it.
    """
    chunk_sizes = (1, 2, 3, 4, 5, 15, 16, 17, 31, 32, 33, 63, 64, 65)
    encodings = ("ascii", "latin1", "utf8")  # , "utf-16-be", "utf-16-le"
    for chunksize in chunk_sizes:
        for enc in encodings:
            f = io.open(test_support.TESTFN, "w+", encoding=enc)
            try:
                f._CHUNK_SIZE = chunksize
                self.assertEqual(f.write("abc"), 3)
            finally:
                f.close()

            f = io.open(test_support.TESTFN, "r+", encoding=enc)
            try:
                f._CHUNK_SIZE = chunksize
                self.assertEqual(f.tell(), 0)
                self.assertEqual(f.read(), "abc")
                # Position just past the three characters read so far.
                cookie = f.tell()
                self.assertEqual(f.seek(0), 0)
                self.assertEqual(f.read(2), "ab")
                self.assertEqual(f.read(1), "c")
                self.assertEqual(f.read(1), "")
                self.assertEqual(f.read(), "")
                self.assertEqual(f.tell(), cookie)
                self.assertEqual(f.seek(0), 0)
                self.assertEqual(f.seek(0, 2), cookie)
                self.assertEqual(f.write("def"), 3)
                self.assertEqual(f.seek(cookie), cookie)
                self.assertEqual(f.read(), "def")
                if enc.startswith("utf"):
                    self.multi_line_test(f, enc)
            finally:
                f.close()
826
def multi_line_test(self, f, enc):
    """Write lines of many lengths to *f* and read them back with tell().

    The (position, line) pairs recorded while writing must equal the pairs
    recorded while reading.  *enc* is accepted but not used here.
    """
    f.seek(0)
    f.truncate()
    sample = "s\xff\u0fff\uffff"
    written = []
    for size in (0, 1, 2, 3, 4, 5, 30, 31, 32, 33, 62, 63, 64, 65, 1000):
        # Cycle through the sample characters to build a line of *size*.
        text = "".join(sample[i % len(sample)] for i in range(size)) + "\n"
        written.append((f.tell(), text))
        f.write(text)
    f.seek(0)
    seen = []
    pos = f.tell()
    text = f.readline()
    while text:
        seen.append((pos, text))
        pos = f.tell()
        text = f.readline()
    self.assertEqual(seen, written)
848
def testTelling(self):
    """Check tell() after writes, after readline(), and during iteration."""
    f = io.open(test_support.TESTFN, "w+", encoding="utf8")
    # marks[0] is the start; marks[1] and marks[2] follow each written line.
    marks = [f.tell()]
    for _ in range(2):
        f.write("\xff\n")
        marks.append(f.tell())
    f.seek(0)
    self.assertEqual(f.tell(), marks[0])
    for mark in marks[1:]:
        self.assertEqual(f.readline(), "\xff\n")
        self.assertEqual(f.tell(), mark)
    f.seek(0)
    for line in f:
        self.assertEqual(line, "\xff\n")
        # tell() is expected to raise while the file is being iterated.
        self.assertRaises(IOError, f.tell)
    self.assertEqual(f.tell(), marks[-1])
    f.close()
868
def testSeeking(self):
    """Check read(), tell() and readline() around a multi-byte character.

    Each line is ``_CHUNK_SIZE - 2`` ASCII characters followed by U+8888
    and a newline.  Presumably this places the multi-byte character across
    a chunk boundary -- confirm against the TextIOWrapper implementation.

    Both file objects are closed in ``finally`` clauses; the reader was
    previously left open, which keeps the scratch file in use when
    tearDown tries to unlink it.
    """
    chunk_size = io.TextIOWrapper._CHUNK_SIZE
    prefix_size = chunk_size - 2
    u_prefix = "a" * prefix_size
    prefix = bytes(u_prefix.encode("utf-8"))
    # Sanity check: the prefix is one byte per character.
    self.assertEqual(len(u_prefix), len(prefix))
    u_suffix = "\u8888\n"
    suffix = bytes(u_suffix.encode("utf-8"))
    line = prefix + suffix
    f = io.open(test_support.TESTFN, "wb")
    try:
        f.write(line*2)
    finally:
        f.close()
    f = io.open(test_support.TESTFN, "r", encoding="utf-8")
    try:
        s = f.read(prefix_size)
        self.assertEqual(s, str(prefix, "ascii"))
        self.assertEqual(f.tell(), prefix_size)
        self.assertEqual(f.readline(), u_suffix)
    finally:
        f.close()
886
def testSeekingToo(self):
    """Regression test: tell() after readline() with a tiny chunk size.

    The file holds one three-byte UTF-8 sequence plus a newline and is read
    with ``_CHUNK_SIZE`` set to 2.  The test passes if neither call raises.

    Both file objects are closed in ``finally`` clauses; the reader was
    previously left open.
    """
    # Regression test for a specific bug
    data = b'\xe0\xbf\xbf\n'
    f = io.open(test_support.TESTFN, "wb")
    try:
        f.write(data)
    finally:
        f.close()
    f = io.open(test_support.TESTFN, "r", encoding="utf-8")
    try:
        f._CHUNK_SIZE  # Just test that it exists
        f._CHUNK_SIZE = 2
        f.readline()
        f.tell()
    finally:
        f.close()
898
def testSeekAndTell(self):
    """Exercise seek()/tell() with the StatefulIncrementalDecoder codec."""

    def find_test_codec(name):
        # Codec search function: only answers for 'test_decoder', and only
        # while this test has switched the codec on.
        if not (self.codecEnabled and name == 'test_decoder'):
            return None
        return codecs.CodecInfo(
            name='test_decoder', encode=None, decode=None,
            incrementalencoder=None,
            streamreader=None, streamwriter=None,
            incrementaldecoder=StatefulIncrementalDecoder)

    def check_stream(data, min_pos=0):
        """Tell/seek to various points within a data stream and ensure
        that the decoded data returned by read() is consistent."""
        f = io.open(test_support.TESTFN, 'wb')
        f.write(data)
        f.close()
        f = io.open(test_support.TESTFN, encoding='test_decoder')
        decoded = f.read()
        f.close()

        total = len(decoded)
        for i in range(min_pos, total + 1):  # seek positions
            for j in [1, 5, total - i]:  # read lengths
                f = io.open(test_support.TESTFN, encoding='test_decoder')
                self.assertEqual(f.read(i), decoded[:i])
                cookie = f.tell()
                self.assertEqual(f.read(j), decoded[i:i + j])
                f.seek(cookie)
                self.assertEqual(f.read(), decoded[i:])
                f.close()

    # Register a special incremental decoder for testing.
    codecs.register(find_test_codec)
    self.codecEnabled = 1

    # Run the tests.
    try:
        cases = StatefulIncrementalDecoderTest.test_cases

        # Try each test case.
        for payload, _, _ in cases:
            check_stream(payload)

        # Position each test case so that it crosses a chunk boundary.
        CHUNK_SIZE = io.TextIOWrapper._CHUNK_SIZE
        for payload, _, _ in cases:
            offset = CHUNK_SIZE - len(payload)//2
            padding = b'.'*offset
            # Don't bother seeking into the prefix (takes too long).
            check_stream(padding + payload, offset*2)

    # Ensure our test decoder won't interfere with subsequent tests.
    finally:
        self.codecEnabled = 0
952
Alexandre Vassalottia38f73b2008-01-07 18:30:48 +0000953 def testEncodedWrites(self):
954 data = "1234567890"
955 tests = ("utf-16",
956 "utf-16-le",
957 "utf-16-be",
958 "utf-32",
959 "utf-32-le",
960 "utf-32-be")
961 for encoding in tests:
962 buf = io.BytesIO()
963 f = io.TextIOWrapper(buf, encoding=encoding)
964 # Check if the BOM is written only once (see issue1753).
965 f.write(data)
966 f.write(data)
967 f.seek(0)
968 self.assertEquals(f.read(), data * 2)
969 self.assertEquals(buf.getvalue(), (data * 2).encode(encoding))
970
def timingTest(self):
    """Time writing and three ways of reading a text file per chunk size.

    The method name does not start with ``test``, so default unittest
    discovery does not collect it.  Results are printed only when
    ``test_support.verbose`` is true.
    """
    timer = time.time
    enc = "utf8"
    line = "\0\x0f\xff\u0fff\uffff\U000fffff\U0010ffff"*3 + "\n"
    nlines = 10000
    nchars = len(line)
    nbytes = len(line.encode(enc))
    for chunk_size in (32, 64, 128, 256):
        f = io.open(test_support.TESTFN, "w+", encoding=enc)
        f._CHUNK_SIZE = chunk_size
        # One timestamp before each phase and one after the last.
        stamps = [timer()]
        for _ in range(nlines):
            f.write(line)
        f.flush()
        stamps.append(timer())
        f.seek(0)
        for line in f:
            pass
        stamps.append(timer())
        f.seek(0)
        while f.readline():
            pass
        stamps.append(timer())
        f.seek(0)
        while f.readline():
            f.tell()
        stamps.append(timer())
        f.close()
        if not test_support.verbose:
            continue
        writing, iterating, reading, telling = [
            after - before for before, after in zip(stamps, stamps[1:])]
        print("\nTiming test: %d lines of %d characters (%d bytes)" %
              (nlines, nchars, nbytes))
        print("File chunk size:          %6s" % f._CHUNK_SIZE)
        print("Writing:                  %6.3f seconds" % writing)
        print("Reading using iteration:  %6.3f seconds" % iterating)
        print("Reading using readline(): %6.3f seconds" % reading)
        print("Using readline()+tell():  %6.3f seconds" % telling)
1007
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00001008 def testReadOneByOne(self):
1009 txt = io.TextIOWrapper(io.BytesIO(b"AA\r\nBB"))
1010 reads = ""
1011 while True:
1012 c = txt.read(1)
1013 if not c:
1014 break
1015 reads += c
1016 self.assertEquals(reads, "AA\nBB")
1017
1018 # read in amounts equal to TextIOWrapper._CHUNK_SIZE which is 128.
1019 def testReadByChunk(self):
1020 # make sure "\r\n" straddles 128 char boundary.
1021 txt = io.TextIOWrapper(io.BytesIO(b"A" * 127 + b"\r\nB"))
1022 reads = ""
1023 while True:
1024 c = txt.read(128)
1025 if not c:
1026 break
1027 reads += c
1028 self.assertEquals(reads, "A"*127+"\nB")
1029
1030 def test_issue1395_1(self):
1031 txt = io.TextIOWrapper(io.BytesIO(self.testdata), encoding="ascii")
1032
1033 # read one char at a time
1034 reads = ""
1035 while True:
1036 c = txt.read(1)
1037 if not c:
1038 break
1039 reads += c
1040 self.assertEquals(reads, self.normalized)
1041
1042 def test_issue1395_2(self):
1043 txt = io.TextIOWrapper(io.BytesIO(self.testdata), encoding="ascii")
1044 txt._CHUNK_SIZE = 4
1045
1046 reads = ""
1047 while True:
1048 c = txt.read(4)
1049 if not c:
1050 break
1051 reads += c
1052 self.assertEquals(reads, self.normalized)
1053
1054 def test_issue1395_3(self):
1055 txt = io.TextIOWrapper(io.BytesIO(self.testdata), encoding="ascii")
1056 txt._CHUNK_SIZE = 4
1057
1058 reads = txt.read(4)
1059 reads += txt.read(4)
1060 reads += txt.readline()
1061 reads += txt.readline()
1062 reads += txt.readline()
1063 self.assertEquals(reads, self.normalized)
1064
1065 def test_issue1395_4(self):
1066 txt = io.TextIOWrapper(io.BytesIO(self.testdata), encoding="ascii")
1067 txt._CHUNK_SIZE = 4
1068
1069 reads = txt.read(4)
1070 reads += txt.read()
1071 self.assertEquals(reads, self.normalized)
1072
1073 def test_issue1395_5(self):
1074 txt = io.TextIOWrapper(io.BytesIO(self.testdata), encoding="ascii")
1075 txt._CHUNK_SIZE = 4
1076
1077 reads = txt.read(4)
1078 pos = txt.tell()
1079 txt.seek(0)
1080 txt.seek(pos)
1081 self.assertEquals(txt.read(4), "BBB\n")
1082
Ka-Ping Yeeddaa7062008-03-17 20:35:15 +00001083 def test_issue2282(self):
1084 buffer = io.BytesIO(self.testdata)
1085 txt = io.TextIOWrapper(buffer, encoding="ascii")
1086
1087 self.assertEqual(buffer.seekable(), txt.seekable())
1088
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00001089 def test_newline_decoder(self):
1090 import codecs
1091 decoder = codecs.getincrementaldecoder("utf-8")()
1092 decoder = io.IncrementalNewlineDecoder(decoder, translate=True)
1093
1094 self.assertEquals(decoder.decode(b'\xe8\xa2\x88'), "\u8888")
1095
1096 self.assertEquals(decoder.decode(b'\xe8'), "")
1097 self.assertEquals(decoder.decode(b'\xa2'), "")
1098 self.assertEquals(decoder.decode(b'\x88'), "\u8888")
1099
1100 self.assertEquals(decoder.decode(b'\xe8'), "")
1101 self.assertRaises(UnicodeDecodeError, decoder.decode, b'', final=True)
1102
1103 decoder.setstate((b'', 0))
1104 self.assertEquals(decoder.decode(b'\n'), "\n")
1105 self.assertEquals(decoder.decode(b'\r'), "")
1106 self.assertEquals(decoder.decode(b'', final=True), "\n")
1107 self.assertEquals(decoder.decode(b'\r', final=True), "\n")
1108
1109 self.assertEquals(decoder.decode(b'\r'), "")
1110 self.assertEquals(decoder.decode(b'a'), "\na")
1111
1112 self.assertEquals(decoder.decode(b'\r\r\n'), "\n\n")
1113 self.assertEquals(decoder.decode(b'\r'), "")
1114 self.assertEquals(decoder.decode(b'\r'), "\n")
1115 self.assertEquals(decoder.decode(b'\na'), "\na")
1116
1117 self.assertEquals(decoder.decode(b'\xe8\xa2\x88\r\n'), "\u8888\n")
1118 self.assertEquals(decoder.decode(b'\xe8\xa2\x88'), "\u8888")
1119 self.assertEquals(decoder.decode(b'\n'), "\n")
1120 self.assertEquals(decoder.decode(b'\xe8\xa2\x88\r'), "\u8888")
1121 self.assertEquals(decoder.decode(b'\n'), "\n")
Guido van Rossum9b76da62007-04-11 01:09:03 +00001122
Alexandre Vassalotti472f07d2008-01-06 00:34:32 +00001123 decoder = codecs.getincrementaldecoder("utf-8")()
1124 decoder = io.IncrementalNewlineDecoder(decoder, translate=True)
1125 self.assertEquals(decoder.newlines, None)
1126 decoder.decode(b"abc\n\r")
1127 self.assertEquals(decoder.newlines, '\n')
1128 decoder.decode(b"\nabc")
1129 self.assertEquals(decoder.newlines, ('\n', '\r\n'))
1130 decoder.decode(b"abc\r")
1131 self.assertEquals(decoder.newlines, ('\n', '\r\n'))
1132 decoder.decode(b"abc")
1133 self.assertEquals(decoder.newlines, ('\r', '\n', '\r\n'))
1134 decoder.decode(b"abc\r")
1135 decoder.reset()
1136 self.assertEquals(decoder.decode(b"abc"), "abc")
1137 self.assertEquals(decoder.newlines, None)
1138
Guido van Rossum01a27522007-03-07 01:00:12 +00001139# XXX Tests for open()
Guido van Rossum68bbcd22007-02-27 17:19:33 +00001140
Guido van Rossum5abbf752007-08-27 17:39:33 +00001141class MiscIOTest(unittest.TestCase):
1142
1143 def testImport__all__(self):
1144 for name in io.__all__:
1145 obj = getattr(io, name, None)
1146 self.assert_(obj is not None, name)
1147 if name == "open":
1148 continue
1149 elif "error" in name.lower():
1150 self.assert_(issubclass(obj, Exception), name)
1151 else:
1152 self.assert_(issubclass(obj, io.IOBase))
1153
1154
def test_main():
    """Run every test case class in this module through test_support.

    StatefulIncrementalDecoderTest is included so that the decoder's own
    tests run as well; it was previously missing from this list even
    though TextIOWrapperTest relies on its test cases.
    """
    test_support.run_unittest(IOTest, BytesIOTest, StringIOTest,
                              BufferedReaderTest,
                              BufferedWriterTest, BufferedRWPairTest,
                              BufferedRandomTest,
                              StatefulIncrementalDecoderTest,
                              TextIOWrapperTest,
                              MiscIOTest)


if __name__ == "__main__":
    unittest.main()