blob: 53419f434a161cebc280a98fd9dc9f3f2b8a6e3e [file] [log] [blame]
Guido van Rossum68bbcd22007-02-27 17:19:33 +00001"""Unit tests for io.py."""
2
Guido van Rossum28524c72007-02-27 05:47:44 +00003import unittest
4from test import test_support
Guido van Rossum78892e42007-04-06 17:31:18 +00005from itertools import chain
Guido van Rossum28524c72007-02-27 05:47:44 +00006import io
7
Guido van Rossuma9e20242007-03-08 00:43:48 +00008
class MockIO(io.RawIOBase):
    """Scripted raw stream used to drive the buffered-I/O tests.

    Each read() call hands back the next item of the script given to the
    constructor, and every write() is recorded in ``_writeStack`` so a test
    can inspect what reached the raw layer.
    """

    def __init__(self, readStack=()):
        # Copy the script so the caller's sequence is never mutated.
        self._readStack = list(readStack)
        self._writeStack = []

    def read(self, n=None):
        """Return the next scripted item, or b"" once the script is used up.

        The requested size ``n`` is ignored.
        """
        try:
            return self._readStack.pop(0)
        except IndexError:
            # Only an exhausted script means EOF; any other error is a real
            # bug in the test and must not be hidden.
            return b""

    def write(self, b):
        """Record ``b`` and report it as fully written."""
        self._writeStack.append(b)
        return len(b)

    def writable(self):
        return True

    def fileno(self):
        # Arbitrary constant that the fileno tests look for.
        return 42

    def readable(self):
        return True

    def seekable(self):
        return True

    def seek(self, pos, whence=0):
        # Seeking is a no-op for the mock; ``whence`` is optional so the
        # method can also be called with a position only.
        pass

    def tell(self):
        # Arbitrary constant; the mock does not track a position.
        return 42
42
Guido van Rossuma9e20242007-03-08 00:43:48 +000043
class MockFileIO(io.BytesIO):
    """In-memory byte stream that logs the size of every read() result."""

    def __init__(self, data):
        # The log is created before the underlying buffer is initialised.
        self.read_history = []
        super(MockFileIO, self).__init__(data)

    def read(self, n=None):
        """Read like BytesIO.read() and append len(result) (or None) to the log."""
        chunk = super(MockFileIO, self).read(n)
        if chunk is None:
            size = None
        else:
            size = len(chunk)
        self.read_history.append(size)
        return chunk
54
55
class MockNonBlockWriterIO(io.RawIOBase):
    """Raw writer whose write() results follow a caller-supplied script.

    Each write() consumes one script entry: a non-negative entry is returned
    as the write result, a negative entry makes the call raise
    io.BlockingIO carrying the negated value.
    """

    def __init__(self, blockingScript):
        self._write_stack = []
        self.bs = list(blockingScript)

    def write(self, b):
        # Record the payload first so it is logged even when the call blocks.
        self._write_stack.append(b)
        outcome = self.bs.pop(0)
        if outcome >= 0:
            return outcome
        raise io.BlockingIO(0, "test blocking", -outcome)

    def writable(self):
        return True
Guido van Rossum68bbcd22007-02-27 17:19:33 +000072
Guido van Rossuma9e20242007-03-08 00:43:48 +000073
class IOTest(unittest.TestCase):
    """Run one shared write script and read script against several streams."""

    def tearDown(self):
        # Remove the scratch file the file-based tests create.
        test_support.unlink(test_support.TESTFN)

    def write_ops(self, f):
        """Apply the write script; afterwards f holds b"hello world\\n"."""
        f.write(b"blah.")
        f.seek(0)
        f.write(b"Hello.")
        self.assertEqual(f.tell(), 6)
        f.seek(-1, 1)
        self.assertEqual(f.tell(), 5)
        f.write(" world\n\n\n")
        f.seek(0)
        f.write("h")
        # Cut off the last two of the three trailing newlines.
        f.seek(-2, 2)
        f.truncate()

    def read_ops(self, f):
        """Check reads, readinto and seeks against the data from write_ops."""
        data = f.read(5)
        self.assertEqual(data, b"hello")
        # readinto() reuses the buffer returned by the first read.
        n = f.readinto(data)
        self.assertEqual(n, 5)
        self.assertEqual(data, b" worl")
        n = f.readinto(data)
        self.assertEqual(n, 2)
        self.assertEqual(len(data), 5)
        self.assertEqual(data[:2], b"d\n")
        f.seek(0)
        self.assertEqual(f.read(20), b"hello world\n")
        f.seek(-6, 2)
        self.assertEqual(f.read(5), b"world")
        f.seek(-6, 1)
        self.assertEqual(f.read(5), b" worl")
        self.assertEqual(f.tell(), 10)

    def _check_file_roundtrip(self, open_file):
        """Write then re-read the scratch file through ``open_file``.

        ``open_file`` is called with "w" and then "r" and must return the
        stream to test.  Each stream is closed even when a check fails, so a
        failing assertion no longer leaks an open file.
        """
        f = open_file("w")
        try:
            self.assertEqual(f.readable(), False)
            self.assertEqual(f.writable(), True)
            self.assertEqual(f.seekable(), True)
            self.write_ops(f)
        finally:
            f.close()
        f = open_file("r")
        try:
            self.assertEqual(f.readable(), True)
            self.assertEqual(f.writable(), False)
            self.assertEqual(f.seekable(), True)
            self.read_ops(f)
        finally:
            f.close()

    def test_raw_file_io(self):
        self._check_file_roundtrip(
            lambda mode: io.open(test_support.TESTFN, mode + "b",
                                 buffering=0))

    def test_raw_bytes_io(self):
        f = io.BytesIO()
        self.write_ops(f)
        data = f.getvalue()
        self.assertEqual(data, b"hello world\n")
        f = io.BytesIO(data)
        self.read_ops(f)

    def test_fileio_FileIO(self):
        import _fileio
        self._check_file_roundtrip(
            lambda mode: _fileio._FileIO(test_support.TESTFN, mode))

    def test_PyFileIO(self):
        self._check_file_roundtrip(
            lambda mode: io._PyFileIO(test_support.TESTFN, mode))
160
161
class MemorySeekTest(unittest.TestCase):
    """Shared read/seek/tell checks for in-memory streams.

    The tests read three attributes from ``self``: ``buftype`` (builds the
    initial buffer from a string), ``ioclass`` (the stream class under test)
    and ``EOF`` (the value read() must return at end of stream).
    """

    def testInit(self):
        # Construction alone is the check here.
        buf = self.buftype("1234567890")
        bytesIo = self.ioclass(buf)

    def testRead(self):
        buf = self.buftype("1234567890")
        bytesIo = self.ioclass(buf)

        self.assertEqual(buf[:1], bytesIo.read(1))
        self.assertEqual(buf[1:5], bytesIo.read(4))
        # Asking for more than is left returns just the remainder.
        self.assertEqual(buf[5:], bytesIo.read(900))
        self.assertEqual(self.EOF, bytesIo.read())

    def testReadNoArgs(self):
        buf = self.buftype("1234567890")
        bytesIo = self.ioclass(buf)

        self.assertEqual(buf, bytesIo.read())
        self.assertEqual(self.EOF, bytesIo.read())

    def testSeek(self):
        buf = self.buftype("1234567890")
        bytesIo = self.ioclass(buf)

        bytesIo.read(5)
        bytesIo.seek(0)
        self.assertEqual(buf, bytesIo.read())

        bytesIo.seek(3)
        self.assertEqual(buf[3:], bytesIo.read())

    def testTell(self):
        buf = self.buftype("1234567890")
        bytesIo = self.ioclass(buf)

        self.assertEqual(0, bytesIo.tell())
        bytesIo.seek(5)
        self.assertEqual(5, bytesIo.tell())
        # Seeking past the end is expected to be reported as-is by tell().
        bytesIo.seek(10000)
        self.assertEqual(10000, bytesIo.tell())
204
Guido van Rossuma9e20242007-03-08 00:43:48 +0000205
class BytesIOTest(MemorySeekTest):
    """Run the MemorySeekTest checks against io.BytesIO with bytes buffers."""

    EOF = b""
    ioclass = io.BytesIO
    buftype = bytes
210
211
class StringIOTest(MemorySeekTest):
    """Run the MemorySeekTest checks against io.StringIO with str buffers."""

    EOF = ""
    ioclass = io.StringIO
    buftype = str
216
217
class BufferedReaderTest(unittest.TestCase):
    """Tests for io.BufferedReader layered over scripted raw streams."""

    def testRead(self):
        rawIo = MockIO((b"abc", b"d", b"efg"))
        bufIo = io.BufferedReader(rawIo)

        self.assertEqual(b"abcdef", bufIo.read(6))

    def testBuffering(self):
        data = b"abcdefghi"
        dlen = len(data)

        # Each entry: [buffer size,
        #              sizes requested from the buffered reader,
        #              expected lengths of the results of the raw reads].
        tests = [
            [100, [3, 1, 4, 8], [dlen, 0]],
            [100, [3, 3, 3], [dlen]],
            [4, [1, 2, 4, 2], [4, 4, 1]],
        ]

        for bufsize, buf_read_sizes, raw_read_sizes in tests:
            rawIo = MockFileIO(data)
            bufIo = io.BufferedReader(rawIo, buffer_size=bufsize)
            pos = 0
            for nbytes in buf_read_sizes:
                self.assertEqual(bufIo.read(nbytes), data[pos:pos+nbytes])
                pos += nbytes
            self.assertEqual(rawIo.read_history, raw_read_sizes)

    def testReadNonBlocking(self):
        # Inject some None's in there to simulate EWOULDBLOCK
        rawIo = MockIO((b"abc", b"d", None, b"efg", None, None))
        bufIo = io.BufferedReader(rawIo)

        self.assertEqual(b"abcd", bufIo.read(6))
        self.assertEqual(b"e", bufIo.read(1))
        self.assertEqual(b"fg", bufIo.read())
        self.assertTrue(None is bufIo.read())
        self.assertEqual(b"", bufIo.read())

    def testReadToEof(self):
        rawIo = MockIO((b"abc", b"d", b"efg"))
        bufIo = io.BufferedReader(rawIo)

        self.assertEqual(b"abcdefg", bufIo.read(9000))

    def testReadNoArgs(self):
        rawIo = MockIO((b"abc", b"d", b"efg"))
        bufIo = io.BufferedReader(rawIo)

        self.assertEqual(b"abcdefg", bufIo.read())

    def testFileno(self):
        rawIo = MockIO((b"abc", b"d", b"efg"))
        bufIo = io.BufferedReader(rawIo)

        # The buffered layer must forward fileno() to the raw stream.
        self.assertEqual(42, bufIo.fileno())

    def testFilenoNoFileno(self):
        # XXX will we always have fileno() function? If so, kill
        # this test.  Else, write it.
        pass
278
Guido van Rossuma9e20242007-03-08 00:43:48 +0000279
class BufferedWriterTest(unittest.TestCase):
    """Tests for io.BufferedWriter layered over recording raw streams."""

    def testWrite(self):
        # Write to the buffered IO but don't overflow the buffer.
        writer = MockIO()
        bufIo = io.BufferedWriter(writer, 8)

        bufIo.write(b"abc")

        # Nothing may have reached the raw stream yet.
        self.assertFalse(writer._writeStack)

    def testWriteOverflow(self):
        writer = MockIO()
        bufIo = io.BufferedWriter(writer, 8)

        bufIo.write(b"abc")
        bufIo.write(b"defghijkl")

        self.assertEqual(b"abcdefghijkl", writer._writeStack[0])

    def testWriteNonBlocking(self):
        raw = MockNonBlockWriterIO((9, 2, 22, -6, 10, 12, 12))
        bufIo = io.BufferedWriter(raw, 8, 16)

        bufIo.write(b"asdf")
        bufIo.write(b"asdfa")
        self.assertEqual(b"asdfasdfa", raw._write_stack[0])

        bufIo.write(b"asdfasdfasdf")
        self.assertEqual(b"asdfasdfasdf", raw._write_stack[1])
        bufIo.write(b"asdfasdfasdf")
        self.assertEqual(b"dfasdfasdf", raw._write_stack[2])
        self.assertEqual(b"asdfasdfasdf", raw._write_stack[3])

        bufIo.write(b"asdfasdfasdf")

        # XXX I don't like this test.  It relies too heavily on how the
        # algorithm actually works, which we might change.  Refactor
        # later.

    def testFileno(self):
        rawIo = MockIO((b"abc", b"d", b"efg"))
        bufIo = io.BufferedWriter(rawIo)

        self.assertEqual(42, bufIo.fileno())

    def testFlush(self):
        writer = MockIO()
        bufIo = io.BufferedWriter(writer, 8)

        bufIo.write(b"abc")
        bufIo.flush()

        self.assertEqual(b"abc", writer._writeStack[0])
334
Guido van Rossuma9e20242007-03-08 00:43:48 +0000335
class BufferedRWPairTest(unittest.TestCase):
    """Placeholder test case for io.BufferedRWPair."""

    def testRWPair(self):
        # Only construction is exercised so far.
        reader = MockIO(())
        writer = MockIO()
        pair = io.BufferedRWPair(reader, writer)

        # XXX need implementation
344
Guido van Rossuma9e20242007-03-08 00:43:48 +0000345
class BufferedRandomTest(unittest.TestCase):
    """Tests for io.BufferedRandom mixing reads, writes and seeks."""

    def testReadAndWrite(self):
        raw = MockIO((b"asdf", b"ghjk"))
        rw = io.BufferedRandom(raw, 8, 12)

        self.assertEqual(b"as", rw.read(2))
        rw.write(b"ddd")
        rw.write(b"eee")
        self.assertFalse(raw._writeStack)  # Buffer writes
        self.assertEqual(b"ghjk", rw.read())  # This read forces write flush
        self.assertEqual(b"dddeee", raw._writeStack[0])

    def testSeekAndTell(self):
        raw = io.BytesIO(b"asdfghjkl")
        rw = io.BufferedRandom(raw)

        self.assertEqual(b"as", rw.read(2))
        self.assertEqual(2, rw.tell())
        rw.seek(0, 0)
        self.assertEqual(b"asdf", rw.read(4))

        # Overwrites the four bytes following the ones just read.
        rw.write(b"asdf")
        rw.seek(0, 0)
        self.assertEqual(b"asdfasdfl", rw.read())
        self.assertEqual(9, rw.tell())
        rw.seek(-4, 2)
        self.assertEqual(5, rw.tell())
        rw.seek(2, 1)
        self.assertEqual(7, rw.tell())
        self.assertEqual(b"fl", rw.read(11))
377
Guido van Rossum78892e42007-04-06 17:31:18 +0000378
class TextIOWrapperTest(unittest.TestCase):
    """Newline-handling tests for io.TextIOWrapper."""

    def _read_lines(self, textIo, do_reads):
        """Collect all lines from ``textIo`` using one of two strategies.

        With ``do_reads`` false the stream is simply iterated.  Otherwise
        each line is fetched as a two-character read() followed by
        readline(), so that read() and readline() are interleaved.
        """
        if not do_reads:
            return list(textIo)
        got_lines = []
        while True:
            c2 = textIo.read(2)
            if c2 == '':
                break
            self.assertEqual(len(c2), 2)
            got_lines.append(c2 + textIo.readline())
        return got_lines

    def testNewlines(self):
        input_lines = ["unix\n", "windows\r\n", "os9\r", "last\n", "nonl"]

        # Each entry: [newline argument, expected lines for input_lines].
        tests = [
            [None, ['unix\n', 'windows\n', 'os9\n', 'last\n', 'nonl']],
            ['\n', input_lines],
            ['\r\n', input_lines],
        ]

        encodings = ('utf-8', 'bz2')

        # Try a range of pad sizes to test the case where \r is the last
        # character in TextIOWrapper._pending_line.
        for encoding in encodings:
            for do_reads in (False, True):
                for padlen in chain(range(10), range(50, 60)):
                    pad = '.' * padlen
                    data_lines = [pad + line for line in input_lines]
                    # XXX: str.encode() should return bytes
                    data = bytes(''.join(data_lines).encode(encoding))

                    for newline, exp_line_ends in tests:
                        exp_lines = [pad + line for line in exp_line_ends]
                        bufIo = io.BufferedReader(io.BytesIO(data))
                        textIo = io.TextIOWrapper(bufIo, newline=newline,
                                                  encoding=encoding)
                        got_lines = self._read_lines(textIo, do_reads)

                        for got_line, exp_line in zip(got_lines, exp_lines):
                            self.assertEqual(got_line, exp_line)
                        # zip() stops at the shorter list, so compare the
                        # lengths as well.
                        self.assertEqual(len(got_lines), len(exp_lines))
420
Guido van Rossum01a27522007-03-07 01:00:12 +0000421# XXX Tests for open()
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000422
def test_main():
    """Hand the test case classes listed below to test_support.run_unittest."""
    cases = (
        IOTest,
        BytesIOTest,
        StringIOTest,
        BufferedReaderTest,
        BufferedWriterTest,
        BufferedRWPairTest,
        BufferedRandomTest,
        TextIOWrapperTest,
    )
    test_support.run_unittest(*cases)


if __name__ == "__main__":
    test_main()