blob: c5556231e82cea4fc3e5b1b38f48404622ce8174 [file] [log] [blame]
Guido van Rossum68bbcd22007-02-27 17:19:33 +00001"""Unit tests for io.py."""
2
Guido van Rossum34d69e52007-04-10 20:08:41 +00003import sys
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +00004import time
Guido van Rossumd4103952007-04-12 05:44:49 +00005import array
Guido van Rossum28524c72007-02-27 05:47:44 +00006import unittest
Guido van Rossum78892e42007-04-06 17:31:18 +00007from itertools import chain
Guido van Rossum76c5d4d2007-04-06 19:10:29 +00008from test import test_support
9
Guido van Rossum53807da2007-04-10 19:01:47 +000010import io # The module under test
Guido van Rossum28524c72007-02-27 05:47:44 +000011
Guido van Rossuma9e20242007-03-08 00:43:48 +000012
Guido van Rossum53807da2007-04-10 19:01:47 +000013class MockRawIO(io.RawIOBase):
Guido van Rossuma9e20242007-03-08 00:43:48 +000014
Guido van Rossum76c5d4d2007-04-06 19:10:29 +000015 def __init__(self, read_stack=()):
16 self._read_stack = list(read_stack)
17 self._write_stack = []
Guido van Rossum68bbcd22007-02-27 17:19:33 +000018
19 def read(self, n=None):
20 try:
Guido van Rossum76c5d4d2007-04-06 19:10:29 +000021 return self._read_stack.pop(0)
Guido van Rossum68bbcd22007-02-27 17:19:33 +000022 except:
Guido van Rossum78892e42007-04-06 17:31:18 +000023 return b""
Guido van Rossum68bbcd22007-02-27 17:19:33 +000024
Guido van Rossum01a27522007-03-07 01:00:12 +000025 def write(self, b):
Guido van Rossum76c5d4d2007-04-06 19:10:29 +000026 self._write_stack.append(b[:])
Guido van Rossum01a27522007-03-07 01:00:12 +000027 return len(b)
28
29 def writable(self):
30 return True
31
Guido van Rossum68bbcd22007-02-27 17:19:33 +000032 def fileno(self):
33 return 42
34
35 def readable(self):
36 return True
37
Guido van Rossum01a27522007-03-07 01:00:12 +000038 def seekable(self):
Guido van Rossum68bbcd22007-02-27 17:19:33 +000039 return True
40
Guido van Rossum01a27522007-03-07 01:00:12 +000041 def seek(self, pos, whence):
42 pass
43
44 def tell(self):
Guido van Rossum68bbcd22007-02-27 17:19:33 +000045 return 42
46
Guido van Rossuma9e20242007-03-08 00:43:48 +000047
Guido van Rossum78892e42007-04-06 17:31:18 +000048class MockFileIO(io.BytesIO):
49
50 def __init__(self, data):
51 self.read_history = []
52 io.BytesIO.__init__(self, data)
53
54 def read(self, n=None):
55 res = io.BytesIO.read(self, n)
56 self.read_history.append(None if res is None else len(res))
57 return res
58
59
Guido van Rossum01a27522007-03-07 01:00:12 +000060class MockNonBlockWriterIO(io.RawIOBase):
Guido van Rossuma9e20242007-03-08 00:43:48 +000061
Guido van Rossum53807da2007-04-10 19:01:47 +000062 def __init__(self, blocking_script):
63 self._blocking_script = list(blocking_script)
Guido van Rossum01a27522007-03-07 01:00:12 +000064 self._write_stack = []
Guido van Rossuma9e20242007-03-08 00:43:48 +000065
Guido van Rossum01a27522007-03-07 01:00:12 +000066 def write(self, b):
Guido van Rossum76c5d4d2007-04-06 19:10:29 +000067 self._write_stack.append(b[:])
Guido van Rossum53807da2007-04-10 19:01:47 +000068 n = self._blocking_script.pop(0)
Guido van Rossum01a27522007-03-07 01:00:12 +000069 if (n < 0):
Guido van Rossum141f7672007-04-10 00:22:16 +000070 raise io.BlockingIOError(0, "test blocking", -n)
Guido van Rossum01a27522007-03-07 01:00:12 +000071 else:
72 return n
Guido van Rossuma9e20242007-03-08 00:43:48 +000073
Guido van Rossum01a27522007-03-07 01:00:12 +000074 def writable(self):
75 return True
Guido van Rossum68bbcd22007-02-27 17:19:33 +000076
Guido van Rossuma9e20242007-03-08 00:43:48 +000077
Guido van Rossum28524c72007-02-27 05:47:44 +000078class IOTest(unittest.TestCase):
79
Guido van Rossum4d0f5a42007-03-07 22:59:39 +000080 def tearDown(self):
81 test_support.unlink(test_support.TESTFN)
82
Guido van Rossum28524c72007-02-27 05:47:44 +000083 def write_ops(self, f):
Guido van Rossum87429772007-04-10 21:06:59 +000084 self.assertEqual(f.write(b"blah."), 5)
85 self.assertEqual(f.seek(0), 0)
86 self.assertEqual(f.write(b"Hello."), 6)
Guido van Rossum28524c72007-02-27 05:47:44 +000087 self.assertEqual(f.tell(), 6)
Guido van Rossum87429772007-04-10 21:06:59 +000088 self.assertEqual(f.seek(-1, 1), 5)
Guido van Rossum28524c72007-02-27 05:47:44 +000089 self.assertEqual(f.tell(), 5)
Guido van Rossum2b08b382007-05-08 20:18:39 +000090 self.assertEqual(f.write(str8(" world\n\n\n")), 9)
Guido van Rossum87429772007-04-10 21:06:59 +000091 self.assertEqual(f.seek(0), 0)
Guido van Rossum2b08b382007-05-08 20:18:39 +000092 self.assertEqual(f.write(b"h"), 1)
Guido van Rossum87429772007-04-10 21:06:59 +000093 self.assertEqual(f.seek(-1, 2), 13)
94 self.assertEqual(f.tell(), 13)
95 self.assertEqual(f.truncate(12), 12)
Guido van Rossumdc0b1a12007-04-12 22:55:07 +000096 self.assertEqual(f.tell(), 13)
Guido van Rossum28524c72007-02-27 05:47:44 +000097
Guido van Rossum9b76da62007-04-11 01:09:03 +000098 def read_ops(self, f, buffered=False):
99 data = f.read(5)
100 self.assertEqual(data, b"hello")
101 self.assertEqual(f.readinto(data), 5)
102 self.assertEqual(data, b" worl")
103 self.assertEqual(f.readinto(data), 2)
104 self.assertEqual(len(data), 5)
105 self.assertEqual(data[:2], b"d\n")
106 self.assertEqual(f.seek(0), 0)
107 self.assertEqual(f.read(20), b"hello world\n")
108 self.assertEqual(f.read(1), b"")
109 self.assertEqual(f.readinto(b"x"), 0)
110 self.assertEqual(f.seek(-6, 2), 6)
111 self.assertEqual(f.read(5), b"world")
112 self.assertEqual(f.read(0), b"")
113 self.assertEqual(f.readinto(b""), 0)
114 self.assertEqual(f.seek(-6, 1), 5)
115 self.assertEqual(f.read(5), b" worl")
116 self.assertEqual(f.tell(), 10)
117 if buffered:
118 f.seek(0)
119 self.assertEqual(f.read(), b"hello world\n")
120 f.seek(6)
121 self.assertEqual(f.read(), b"world\n")
122 self.assertEqual(f.read(), b"")
123
Guido van Rossum34d69e52007-04-10 20:08:41 +0000124 LARGE = 2**31
125
Guido van Rossum53807da2007-04-10 19:01:47 +0000126 def large_file_ops(self, f):
127 assert f.readable()
128 assert f.writable()
Guido van Rossum34d69e52007-04-10 20:08:41 +0000129 self.assertEqual(f.seek(self.LARGE), self.LARGE)
130 self.assertEqual(f.tell(), self.LARGE)
Guido van Rossum53807da2007-04-10 19:01:47 +0000131 self.assertEqual(f.write(b"xxx"), 3)
Guido van Rossum34d69e52007-04-10 20:08:41 +0000132 self.assertEqual(f.tell(), self.LARGE + 3)
133 self.assertEqual(f.seek(-1, 1), self.LARGE + 2)
Guido van Rossum87429772007-04-10 21:06:59 +0000134 self.assertEqual(f.truncate(), self.LARGE + 2)
Guido van Rossum34d69e52007-04-10 20:08:41 +0000135 self.assertEqual(f.tell(), self.LARGE + 2)
136 self.assertEqual(f.seek(0, 2), self.LARGE + 2)
Guido van Rossum87429772007-04-10 21:06:59 +0000137 self.assertEqual(f.truncate(self.LARGE + 1), self.LARGE + 1)
Guido van Rossumdc0b1a12007-04-12 22:55:07 +0000138 self.assertEqual(f.tell(), self.LARGE + 2)
Guido van Rossum34d69e52007-04-10 20:08:41 +0000139 self.assertEqual(f.seek(0, 2), self.LARGE + 1)
140 self.assertEqual(f.seek(-1, 2), self.LARGE)
Guido van Rossum53807da2007-04-10 19:01:47 +0000141 self.assertEqual(f.read(2), b"x")
142
Guido van Rossum28524c72007-02-27 05:47:44 +0000143 def test_raw_file_io(self):
144 f = io.open(test_support.TESTFN, "wb", buffering=0)
145 self.assertEqual(f.readable(), False)
146 self.assertEqual(f.writable(), True)
147 self.assertEqual(f.seekable(), True)
148 self.write_ops(f)
149 f.close()
150 f = io.open(test_support.TESTFN, "rb", buffering=0)
151 self.assertEqual(f.readable(), True)
152 self.assertEqual(f.writable(), False)
153 self.assertEqual(f.seekable(), True)
154 self.read_ops(f)
155 f.close()
156
Guido van Rossum87429772007-04-10 21:06:59 +0000157 def test_buffered_file_io(self):
158 f = io.open(test_support.TESTFN, "wb")
159 self.assertEqual(f.readable(), False)
160 self.assertEqual(f.writable(), True)
161 self.assertEqual(f.seekable(), True)
162 self.write_ops(f)
163 f.close()
164 f = io.open(test_support.TESTFN, "rb")
165 self.assertEqual(f.readable(), True)
166 self.assertEqual(f.writable(), False)
167 self.assertEqual(f.seekable(), True)
Guido van Rossum9b76da62007-04-11 01:09:03 +0000168 self.read_ops(f, True)
Guido van Rossum87429772007-04-10 21:06:59 +0000169 f.close()
170
Guido van Rossum48fc58a2007-06-07 23:45:37 +0000171 def test_readline(self):
172 f = io.open(test_support.TESTFN, "wb")
173 f.write(b"abc\ndef\nxyzzy\nfoo")
174 f.close()
175 f = io.open(test_support.TESTFN, "rb")
176 self.assertEqual(f.readline(), b"abc\n")
177 self.assertEqual(f.readline(10), b"def\n")
178 self.assertEqual(f.readline(2), b"xy")
179 self.assertEqual(f.readline(4), b"zzy\n")
180 self.assertEqual(f.readline(), b"foo")
181 f.close()
182
Guido van Rossum28524c72007-02-27 05:47:44 +0000183 def test_raw_bytes_io(self):
184 f = io.BytesIO()
185 self.write_ops(f)
186 data = f.getvalue()
187 self.assertEqual(data, b"hello world\n")
188 f = io.BytesIO(data)
Guido van Rossum9b76da62007-04-11 01:09:03 +0000189 self.read_ops(f, True)
Guido van Rossum28524c72007-02-27 05:47:44 +0000190
Guido van Rossum53807da2007-04-10 19:01:47 +0000191 def test_large_file_ops(self):
Guido van Rossum34d69e52007-04-10 20:08:41 +0000192 # On Windows and Mac OSX this test comsumes large resources; It takes
193 # a long time to build the >2GB file and takes >2GB of disk space
194 # therefore the resource must be enabled to run this test.
195 if sys.platform[:3] == 'win' or sys.platform == 'darwin':
196 if not test_support.is_resource_enabled("largefile"):
197 print("\nTesting large file ops skipped on %s." % sys.platform,
198 file=sys.stderr)
199 print("It requires %d bytes and a long time." % self.LARGE,
200 file=sys.stderr)
201 print("Use 'regrtest.py -u largefile test_io' to run it.",
202 file=sys.stderr)
203 return
Guido van Rossum87429772007-04-10 21:06:59 +0000204 f = io.open(test_support.TESTFN, "w+b", 0)
Guido van Rossum53807da2007-04-10 19:01:47 +0000205 self.large_file_ops(f)
Guido van Rossuma9e20242007-03-08 00:43:48 +0000206 f.close()
Guido van Rossum87429772007-04-10 21:06:59 +0000207 f = io.open(test_support.TESTFN, "w+b")
208 self.large_file_ops(f)
209 f.close()
210
211 def test_with_open(self):
212 for bufsize in (0, 1, 100):
213 f = None
214 with open(test_support.TESTFN, "wb", bufsize) as f:
215 f.write("xxx")
216 self.assertEqual(f.closed, True)
217 f = None
218 try:
219 with open(test_support.TESTFN, "wb", bufsize) as f:
220 1/0
221 except ZeroDivisionError:
222 self.assertEqual(f.closed, True)
223 else:
224 self.fail("1/0 didn't raise an exception")
225
226 def test_destructor(self):
227 record = []
228 class MyFileIO(io.FileIO):
229 def __del__(self):
230 record.append(1)
231 io.FileIO.__del__(self)
232 def close(self):
233 record.append(2)
234 io.FileIO.close(self)
235 def flush(self):
236 record.append(3)
237 io.FileIO.flush(self)
238 f = MyFileIO(test_support.TESTFN, "w")
239 f.write("xxx")
240 del f
241 self.assertEqual(record, [1, 2, 3])
242
243 def test_close_flushes(self):
244 f = io.open(test_support.TESTFN, "wb")
Guido van Rossum2b08b382007-05-08 20:18:39 +0000245 f.write(b"xxx")
Guido van Rossum87429772007-04-10 21:06:59 +0000246 f.close()
247 f = io.open(test_support.TESTFN, "rb")
248 self.assertEqual(f.read(), b"xxx")
249 f.close()
Guido van Rossuma9e20242007-03-08 00:43:48 +0000250
Guido van Rossumd4103952007-04-12 05:44:49 +0000251 def test_array_writes(self):
252 a = array.array('i', range(10))
253 n = len(buffer(a))
254 f = io.open(test_support.TESTFN, "wb", 0)
255 self.assertEqual(f.write(a), n)
256 f.close()
257 f = io.open(test_support.TESTFN, "wb")
258 self.assertEqual(f.write(a), n)
259 f.close()
260
Guido van Rossuma9e20242007-03-08 00:43:48 +0000261
Guido van Rossum76c5d4d2007-04-06 19:10:29 +0000262class MemorySeekTestMixin:
Guido van Rossuma9e20242007-03-08 00:43:48 +0000263
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000264 def testInit(self):
Guido van Rossum78892e42007-04-06 17:31:18 +0000265 buf = self.buftype("1234567890")
266 bytesIo = self.ioclass(buf)
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000267
268 def testRead(self):
Guido van Rossum78892e42007-04-06 17:31:18 +0000269 buf = self.buftype("1234567890")
270 bytesIo = self.ioclass(buf)
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000271
272 self.assertEquals(buf[:1], bytesIo.read(1))
273 self.assertEquals(buf[1:5], bytesIo.read(4))
274 self.assertEquals(buf[5:], bytesIo.read(900))
Guido van Rossum78892e42007-04-06 17:31:18 +0000275 self.assertEquals(self.EOF, bytesIo.read())
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000276
277 def testReadNoArgs(self):
Guido van Rossum78892e42007-04-06 17:31:18 +0000278 buf = self.buftype("1234567890")
279 bytesIo = self.ioclass(buf)
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000280
281 self.assertEquals(buf, bytesIo.read())
Guido van Rossum78892e42007-04-06 17:31:18 +0000282 self.assertEquals(self.EOF, bytesIo.read())
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000283
284 def testSeek(self):
Guido van Rossum78892e42007-04-06 17:31:18 +0000285 buf = self.buftype("1234567890")
286 bytesIo = self.ioclass(buf)
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000287
288 bytesIo.read(5)
289 bytesIo.seek(0)
290 self.assertEquals(buf, bytesIo.read())
291
292 bytesIo.seek(3)
293 self.assertEquals(buf[3:], bytesIo.read())
294
295 def testTell(self):
Guido van Rossum78892e42007-04-06 17:31:18 +0000296 buf = self.buftype("1234567890")
297 bytesIo = self.ioclass(buf)
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000298
299 self.assertEquals(0, bytesIo.tell())
300 bytesIo.seek(5)
301 self.assertEquals(5, bytesIo.tell())
302 bytesIo.seek(10000)
303 self.assertEquals(10000, bytesIo.tell())
304
Guido van Rossuma9e20242007-03-08 00:43:48 +0000305
Guido van Rossum76c5d4d2007-04-06 19:10:29 +0000306class BytesIOTest(MemorySeekTestMixin, unittest.TestCase):
Guido van Rossum78892e42007-04-06 17:31:18 +0000307 buftype = bytes
308 ioclass = io.BytesIO
309 EOF = b""
310
311
Guido van Rossum76c5d4d2007-04-06 19:10:29 +0000312class StringIOTest(MemorySeekTestMixin, unittest.TestCase):
Guido van Rossum78892e42007-04-06 17:31:18 +0000313 buftype = str
314 ioclass = io.StringIO
315 EOF = ""
316
317
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000318class BufferedReaderTest(unittest.TestCase):
Guido van Rossuma9e20242007-03-08 00:43:48 +0000319
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000320 def testRead(self):
Guido van Rossum53807da2007-04-10 19:01:47 +0000321 rawio = MockRawIO((b"abc", b"d", b"efg"))
Guido van Rossum76c5d4d2007-04-06 19:10:29 +0000322 bufio = io.BufferedReader(rawio)
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000323
Guido van Rossum76c5d4d2007-04-06 19:10:29 +0000324 self.assertEquals(b"abcdef", bufio.read(6))
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000325
Guido van Rossum78892e42007-04-06 17:31:18 +0000326 def testBuffering(self):
327 data = b"abcdefghi"
328 dlen = len(data)
329
330 tests = [
331 [ 100, [ 3, 1, 4, 8 ], [ dlen, 0 ] ],
332 [ 100, [ 3, 3, 3], [ dlen ] ],
333 [ 4, [ 1, 2, 4, 2 ], [ 4, 4, 1 ] ],
334 ]
335
336 for bufsize, buf_read_sizes, raw_read_sizes in tests:
Guido van Rossum76c5d4d2007-04-06 19:10:29 +0000337 rawio = MockFileIO(data)
338 bufio = io.BufferedReader(rawio, buffer_size=bufsize)
Guido van Rossum78892e42007-04-06 17:31:18 +0000339 pos = 0
340 for nbytes in buf_read_sizes:
Guido van Rossum76c5d4d2007-04-06 19:10:29 +0000341 self.assertEquals(bufio.read(nbytes), data[pos:pos+nbytes])
Guido van Rossum78892e42007-04-06 17:31:18 +0000342 pos += nbytes
Guido van Rossum76c5d4d2007-04-06 19:10:29 +0000343 self.assertEquals(rawio.read_history, raw_read_sizes)
Guido van Rossum78892e42007-04-06 17:31:18 +0000344
Guido van Rossum01a27522007-03-07 01:00:12 +0000345 def testReadNonBlocking(self):
346 # Inject some None's in there to simulate EWOULDBLOCK
Guido van Rossum53807da2007-04-10 19:01:47 +0000347 rawio = MockRawIO((b"abc", b"d", None, b"efg", None, None))
Guido van Rossum76c5d4d2007-04-06 19:10:29 +0000348 bufio = io.BufferedReader(rawio)
Guido van Rossum01a27522007-03-07 01:00:12 +0000349
Guido van Rossum76c5d4d2007-04-06 19:10:29 +0000350 self.assertEquals(b"abcd", bufio.read(6))
351 self.assertEquals(b"e", bufio.read(1))
352 self.assertEquals(b"fg", bufio.read())
353 self.assert_(None is bufio.read())
354 self.assertEquals(b"", bufio.read())
Guido van Rossum01a27522007-03-07 01:00:12 +0000355
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000356 def testReadToEof(self):
Guido van Rossum53807da2007-04-10 19:01:47 +0000357 rawio = MockRawIO((b"abc", b"d", b"efg"))
Guido van Rossum76c5d4d2007-04-06 19:10:29 +0000358 bufio = io.BufferedReader(rawio)
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000359
Guido van Rossum76c5d4d2007-04-06 19:10:29 +0000360 self.assertEquals(b"abcdefg", bufio.read(9000))
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000361
362 def testReadNoArgs(self):
Guido van Rossum53807da2007-04-10 19:01:47 +0000363 rawio = MockRawIO((b"abc", b"d", b"efg"))
Guido van Rossum76c5d4d2007-04-06 19:10:29 +0000364 bufio = io.BufferedReader(rawio)
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000365
Guido van Rossum76c5d4d2007-04-06 19:10:29 +0000366 self.assertEquals(b"abcdefg", bufio.read())
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000367
368 def testFileno(self):
Guido van Rossum53807da2007-04-10 19:01:47 +0000369 rawio = MockRawIO((b"abc", b"d", b"efg"))
Guido van Rossum76c5d4d2007-04-06 19:10:29 +0000370 bufio = io.BufferedReader(rawio)
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000371
Guido van Rossum76c5d4d2007-04-06 19:10:29 +0000372 self.assertEquals(42, bufio.fileno())
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000373
374 def testFilenoNoFileno(self):
Guido van Rossum01a27522007-03-07 01:00:12 +0000375 # XXX will we always have fileno() function? If so, kill
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000376 # this test. Else, write it.
377 pass
378
Guido van Rossuma9e20242007-03-08 00:43:48 +0000379
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000380class BufferedWriterTest(unittest.TestCase):
Guido van Rossuma9e20242007-03-08 00:43:48 +0000381
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000382 def testWrite(self):
383 # Write to the buffered IO but don't overflow the buffer.
Guido van Rossum53807da2007-04-10 19:01:47 +0000384 writer = MockRawIO()
Guido van Rossum76c5d4d2007-04-06 19:10:29 +0000385 bufio = io.BufferedWriter(writer, 8)
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000386
Guido van Rossum76c5d4d2007-04-06 19:10:29 +0000387 bufio.write(b"abc")
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000388
Guido van Rossum76c5d4d2007-04-06 19:10:29 +0000389 self.assertFalse(writer._write_stack)
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000390
391 def testWriteOverflow(self):
Guido van Rossum53807da2007-04-10 19:01:47 +0000392 writer = MockRawIO()
Guido van Rossum76c5d4d2007-04-06 19:10:29 +0000393 bufio = io.BufferedWriter(writer, 8)
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000394
Guido van Rossum76c5d4d2007-04-06 19:10:29 +0000395 bufio.write(b"abc")
396 bufio.write(b"defghijkl")
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000397
Guido van Rossum76c5d4d2007-04-06 19:10:29 +0000398 self.assertEquals(b"abcdefghijkl", writer._write_stack[0])
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000399
Guido van Rossum01a27522007-03-07 01:00:12 +0000400 def testWriteNonBlocking(self):
401 raw = MockNonBlockWriterIO((9, 2, 22, -6, 10, 12, 12))
Guido van Rossum76c5d4d2007-04-06 19:10:29 +0000402 bufio = io.BufferedWriter(raw, 8, 16)
Guido van Rossum01a27522007-03-07 01:00:12 +0000403
Guido van Rossum76c5d4d2007-04-06 19:10:29 +0000404 bufio.write(b"asdf")
405 bufio.write(b"asdfa")
Guido van Rossum01a27522007-03-07 01:00:12 +0000406 self.assertEquals(b"asdfasdfa", raw._write_stack[0])
407
Guido van Rossum76c5d4d2007-04-06 19:10:29 +0000408 bufio.write(b"asdfasdfasdf")
Guido van Rossum01a27522007-03-07 01:00:12 +0000409 self.assertEquals(b"asdfasdfasdf", raw._write_stack[1])
Guido van Rossum76c5d4d2007-04-06 19:10:29 +0000410 bufio.write(b"asdfasdfasdf")
Guido van Rossum01a27522007-03-07 01:00:12 +0000411 self.assertEquals(b"dfasdfasdf", raw._write_stack[2])
412 self.assertEquals(b"asdfasdfasdf", raw._write_stack[3])
413
Guido van Rossum76c5d4d2007-04-06 19:10:29 +0000414 bufio.write(b"asdfasdfasdf")
Guido van Rossum01a27522007-03-07 01:00:12 +0000415
Guido van Rossum78892e42007-04-06 17:31:18 +0000416 # XXX I don't like this test. It relies too heavily on how the
417 # algorithm actually works, which we might change. Refactor
418 # later.
Guido van Rossum01a27522007-03-07 01:00:12 +0000419
420 def testFileno(self):
Guido van Rossum53807da2007-04-10 19:01:47 +0000421 rawio = MockRawIO((b"abc", b"d", b"efg"))
Guido van Rossum76c5d4d2007-04-06 19:10:29 +0000422 bufio = io.BufferedWriter(rawio)
Guido van Rossum01a27522007-03-07 01:00:12 +0000423
Guido van Rossum76c5d4d2007-04-06 19:10:29 +0000424 self.assertEquals(42, bufio.fileno())
Guido van Rossum01a27522007-03-07 01:00:12 +0000425
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000426 def testFlush(self):
Guido van Rossum53807da2007-04-10 19:01:47 +0000427 writer = MockRawIO()
Guido van Rossum76c5d4d2007-04-06 19:10:29 +0000428 bufio = io.BufferedWriter(writer, 8)
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000429
Guido van Rossum76c5d4d2007-04-06 19:10:29 +0000430 bufio.write(b"abc")
431 bufio.flush()
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000432
Guido van Rossum76c5d4d2007-04-06 19:10:29 +0000433 self.assertEquals(b"abc", writer._write_stack[0])
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000434
Guido van Rossuma9e20242007-03-08 00:43:48 +0000435
Guido van Rossum01a27522007-03-07 01:00:12 +0000436class BufferedRWPairTest(unittest.TestCase):
Guido van Rossuma9e20242007-03-08 00:43:48 +0000437
Guido van Rossum01a27522007-03-07 01:00:12 +0000438 def testRWPair(self):
Guido van Rossum53807da2007-04-10 19:01:47 +0000439 r = MockRawIO(())
440 w = MockRawIO()
Guido van Rossum01a27522007-03-07 01:00:12 +0000441 pair = io.BufferedRWPair(r, w)
442
443 # XXX need implementation
444
Guido van Rossuma9e20242007-03-08 00:43:48 +0000445
Guido van Rossum78892e42007-04-06 17:31:18 +0000446class BufferedRandomTest(unittest.TestCase):
Guido van Rossuma9e20242007-03-08 00:43:48 +0000447
Guido van Rossum01a27522007-03-07 01:00:12 +0000448 def testReadAndWrite(self):
Guido van Rossum53807da2007-04-10 19:01:47 +0000449 raw = MockRawIO((b"asdf", b"ghjk"))
Guido van Rossum01a27522007-03-07 01:00:12 +0000450 rw = io.BufferedRandom(raw, 8, 12)
451
452 self.assertEqual(b"as", rw.read(2))
453 rw.write(b"ddd")
454 rw.write(b"eee")
Guido van Rossum76c5d4d2007-04-06 19:10:29 +0000455 self.assertFalse(raw._write_stack) # Buffer writes
Guido van Rossum01a27522007-03-07 01:00:12 +0000456 self.assertEqual(b"ghjk", rw.read()) # This read forces write flush
Guido van Rossum76c5d4d2007-04-06 19:10:29 +0000457 self.assertEquals(b"dddeee", raw._write_stack[0])
Guido van Rossum01a27522007-03-07 01:00:12 +0000458
459 def testSeekAndTell(self):
460 raw = io.BytesIO(b"asdfghjkl")
461 rw = io.BufferedRandom(raw)
462
463 self.assertEquals(b"as", rw.read(2))
464 self.assertEquals(2, rw.tell())
465 rw.seek(0, 0)
466 self.assertEquals(b"asdf", rw.read(4))
467
468 rw.write(b"asdf")
469 rw.seek(0, 0)
470 self.assertEquals(b"asdfasdfl", rw.read())
471 self.assertEquals(9, rw.tell())
472 rw.seek(-4, 2)
473 self.assertEquals(5, rw.tell())
474 rw.seek(2, 1)
475 self.assertEquals(7, rw.tell())
476 self.assertEquals(b"fl", rw.read(11))
477
Guido van Rossum78892e42007-04-06 17:31:18 +0000478
479class TextIOWrapperTest(unittest.TestCase):
Guido van Rossum9b76da62007-04-11 01:09:03 +0000480
Guido van Rossumd0712812007-04-11 16:32:43 +0000481 def tearDown(self):
482 test_support.unlink(test_support.TESTFN)
Guido van Rossum9b76da62007-04-11 01:09:03 +0000483
Guido van Rossum78892e42007-04-06 17:31:18 +0000484 def testNewlines(self):
485 input_lines = [ "unix\n", "windows\r\n", "os9\r", "last\n", "nonl" ]
486
487 tests = [
488 [ None, [ 'unix\n', 'windows\n', 'os9\n', 'last\n', 'nonl' ] ],
489 [ '\n', input_lines ],
490 [ '\r\n', input_lines ],
491 ]
492
Guido van Rossumc2f93dc2007-05-24 00:50:02 +0000493 encodings = ('utf-8', 'latin-1')
Guido van Rossum78892e42007-04-06 17:31:18 +0000494
495 # Try a range of pad sizes to test the case where \r is the last
496 # character in TextIOWrapper._pending_line.
497 for encoding in encodings:
498 for do_reads in (False, True):
499 for padlen in chain(range(10), range(50, 60)):
500 pad = '.' * padlen
501 data_lines = [ pad + line for line in input_lines ]
502 # XXX: str.encode() should return bytes
503 data = bytes(''.join(data_lines).encode(encoding))
504
505 for newline, exp_line_ends in tests:
506 exp_lines = [ pad + line for line in exp_line_ends ]
Guido van Rossum76c5d4d2007-04-06 19:10:29 +0000507 bufio = io.BufferedReader(io.BytesIO(data))
508 textio = io.TextIOWrapper(bufio, newline=newline,
Guido van Rossum78892e42007-04-06 17:31:18 +0000509 encoding=encoding)
510 if do_reads:
511 got_lines = []
512 while True:
Guido van Rossum76c5d4d2007-04-06 19:10:29 +0000513 c2 = textio.read(2)
Guido van Rossum78892e42007-04-06 17:31:18 +0000514 if c2 == '':
515 break
516 self.assertEquals(len(c2), 2)
Guido van Rossum76c5d4d2007-04-06 19:10:29 +0000517 got_lines.append(c2 + textio.readline())
Guido van Rossum78892e42007-04-06 17:31:18 +0000518 else:
Guido van Rossum76c5d4d2007-04-06 19:10:29 +0000519 got_lines = list(textio)
Guido van Rossum78892e42007-04-06 17:31:18 +0000520
521 for got_line, exp_line in zip(got_lines, exp_lines):
522 self.assertEquals(got_line, exp_line)
523 self.assertEquals(len(got_lines), len(exp_lines))
524
Guido van Rossum9b76da62007-04-11 01:09:03 +0000525 # Systematic tests of the text I/O API
526
527 def testBasicIO(self):
528 for chunksize in (1, 2, 3, 4, 5, 15, 16, 17, 31, 32, 33, 63, 64, 65):
529 for enc in "ascii", "latin1", "utf8" :# , "utf-16-be", "utf-16-le":
530 f = io.open(test_support.TESTFN, "w+", encoding=enc)
531 f._CHUNK_SIZE = chunksize
532 self.assertEquals(f.write("abc"), 3)
533 f.close()
534 f = io.open(test_support.TESTFN, "r+", encoding=enc)
535 f._CHUNK_SIZE = chunksize
536 self.assertEquals(f.tell(), 0)
537 self.assertEquals(f.read(), "abc")
538 cookie = f.tell()
539 self.assertEquals(f.seek(0), 0)
540 self.assertEquals(f.read(2), "ab")
541 self.assertEquals(f.read(1), "c")
542 self.assertEquals(f.read(1), "")
543 self.assertEquals(f.read(), "")
544 self.assertEquals(f.tell(), cookie)
545 self.assertEquals(f.seek(0), 0)
546 self.assertEquals(f.seek(0, 2), cookie)
547 self.assertEquals(f.write("def"), 3)
548 self.assertEquals(f.seek(cookie), cookie)
549 self.assertEquals(f.read(), "def")
550 if enc.startswith("utf"):
551 self.multi_line_test(f, enc)
552 f.close()
553
554 def multi_line_test(self, f, enc):
555 f.seek(0)
556 f.truncate()
Guido van Rossumef87d6e2007-05-02 19:09:54 +0000557 sample = "s\xff\u0fff\uffff"
Guido van Rossum9b76da62007-04-11 01:09:03 +0000558 wlines = []
Guido van Rossumcba608c2007-04-11 14:19:59 +0000559 for size in (0, 1, 2, 3, 4, 5, 30, 31, 32, 33, 62, 63, 64, 65, 1000):
Guido van Rossum9b76da62007-04-11 01:09:03 +0000560 chars = []
Guido van Rossum805365e2007-05-07 22:24:25 +0000561 for i in range(size):
Guido van Rossum9b76da62007-04-11 01:09:03 +0000562 chars.append(sample[i % len(sample)])
Guido van Rossumef87d6e2007-05-02 19:09:54 +0000563 line = "".join(chars) + "\n"
Guido van Rossum9b76da62007-04-11 01:09:03 +0000564 wlines.append((f.tell(), line))
565 f.write(line)
Guido van Rossum9b76da62007-04-11 01:09:03 +0000566 f.seek(0)
567 rlines = []
568 while True:
569 pos = f.tell()
570 line = f.readline()
571 if not line:
Guido van Rossum9b76da62007-04-11 01:09:03 +0000572 break
573 rlines.append((pos, line))
Guido van Rossum9b76da62007-04-11 01:09:03 +0000574 self.assertEquals(rlines, wlines)
575
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +0000576 def testTelling(self):
577 f = io.open(test_support.TESTFN, "w+", encoding="utf8")
578 p0 = f.tell()
Guido van Rossumef87d6e2007-05-02 19:09:54 +0000579 f.write("\xff\n")
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +0000580 p1 = f.tell()
Guido van Rossumef87d6e2007-05-02 19:09:54 +0000581 f.write("\xff\n")
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +0000582 p2 = f.tell()
583 f.seek(0)
584 self.assertEquals(f.tell(), p0)
Guido van Rossumef87d6e2007-05-02 19:09:54 +0000585 self.assertEquals(f.readline(), "\xff\n")
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +0000586 self.assertEquals(f.tell(), p1)
Guido van Rossumef87d6e2007-05-02 19:09:54 +0000587 self.assertEquals(f.readline(), "\xff\n")
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +0000588 self.assertEquals(f.tell(), p2)
589 f.seek(0)
590 for line in f:
Guido van Rossumef87d6e2007-05-02 19:09:54 +0000591 self.assertEquals(line, "\xff\n")
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +0000592 self.assertRaises(IOError, f.tell)
593 self.assertEquals(f.tell(), p2)
594 f.close()
595
Guido van Rossumd76e7792007-04-17 02:38:04 +0000596 def testSeeking(self):
597 chunk_size = io.TextIOWrapper._CHUNK_SIZE
598 prefix_size = chunk_size - 2
Guido van Rossumef87d6e2007-05-02 19:09:54 +0000599 u_prefix = "a" * prefix_size
Guido van Rossumd76e7792007-04-17 02:38:04 +0000600 prefix = bytes(u_prefix.encode("utf-8"))
601 self.assertEquals(len(u_prefix), len(prefix))
Guido van Rossumef87d6e2007-05-02 19:09:54 +0000602 u_suffix = "\u8888\n"
Guido van Rossumd76e7792007-04-17 02:38:04 +0000603 suffix = bytes(u_suffix.encode("utf-8"))
604 line = prefix + suffix
605 f = io.open(test_support.TESTFN, "wb")
606 f.write(line*2)
607 f.close()
608 f = io.open(test_support.TESTFN, "r", encoding="utf-8")
609 s = f.read(prefix_size)
Guido van Rossum2b08b382007-05-08 20:18:39 +0000610 self.assertEquals(s, str(prefix))
Guido van Rossumd76e7792007-04-17 02:38:04 +0000611 self.assertEquals(f.tell(), prefix_size)
612 self.assertEquals(f.readline(), u_suffix)
613
614 def testSeekingToo(self):
615 # Regression test for a specific bug
616 data = b'\xe0\xbf\xbf\n'
617 f = io.open(test_support.TESTFN, "wb")
618 f.write(data)
619 f.close()
620 f = io.open(test_support.TESTFN, "r", encoding="utf-8")
621 f._CHUNK_SIZE # Just test that it exists
622 f._CHUNK_SIZE = 2
623 f.readline()
624 f.tell()
625
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +0000626 def timingTest(self):
627 timer = time.time
628 enc = "utf8"
Guido van Rossumef87d6e2007-05-02 19:09:54 +0000629 line = "\0\x0f\xff\u0fff\uffff\U000fffff\U0010ffff"*3 + "\n"
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +0000630 nlines = 10000
631 nchars = len(line)
632 nbytes = len(line.encode(enc))
633 for chunk_size in (32, 64, 128, 256):
634 f = io.open(test_support.TESTFN, "w+", encoding=enc)
635 f._CHUNK_SIZE = chunk_size
636 t0 = timer()
637 for i in range(nlines):
638 f.write(line)
639 f.flush()
640 t1 = timer()
641 f.seek(0)
642 for line in f:
643 pass
644 t2 = timer()
645 f.seek(0)
646 while f.readline():
647 pass
648 t3 = timer()
649 f.seek(0)
650 while f.readline():
651 f.tell()
652 t4 = timer()
653 f.close()
654 if test_support.verbose:
655 print("\nTiming test: %d lines of %d characters (%d bytes)" %
656 (nlines, nchars, nbytes))
657 print("File chunk size: %6s" % f._CHUNK_SIZE)
658 print("Writing: %6.3f seconds" % (t1-t0))
659 print("Reading using iteration: %6.3f seconds" % (t2-t1))
660 print("Reading using readline(): %6.3f seconds" % (t3-t2))
661 print("Using readline()+tell(): %6.3f seconds" % (t4-t3))
662
Guido van Rossum9b76da62007-04-11 01:09:03 +0000663
Guido van Rossum01a27522007-03-07 01:00:12 +0000664# XXX Tests for open()
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000665
Guido van Rossum28524c72007-02-27 05:47:44 +0000666def test_main():
Guido van Rossum78892e42007-04-06 17:31:18 +0000667 test_support.run_unittest(IOTest, BytesIOTest, StringIOTest,
668 BufferedReaderTest,
Guido van Rossum01a27522007-03-07 01:00:12 +0000669 BufferedWriterTest, BufferedRWPairTest,
Guido van Rossum78892e42007-04-06 17:31:18 +0000670 BufferedRandomTest, TextIOWrapperTest)
Guido van Rossum28524c72007-02-27 05:47:44 +0000671
672if __name__ == "__main__":
Guido van Rossum76c5d4d2007-04-06 19:10:29 +0000673 unittest.main()