blob: 9b05aa96458469a925aa2f5609c54cc6915f2c8f [file] [log] [blame]
Guido van Rossum68bbcd22007-02-27 17:19:33 +00001"""Unit tests for io.py."""
2
Guido van Rossum34d69e52007-04-10 20:08:41 +00003import sys
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +00004import time
Guido van Rossumd4103952007-04-12 05:44:49 +00005import array
Guido van Rossum28524c72007-02-27 05:47:44 +00006import unittest
Guido van Rossum78892e42007-04-06 17:31:18 +00007from itertools import chain
Guido van Rossum76c5d4d2007-04-06 19:10:29 +00008from test import test_support
9
Guido van Rossum53807da2007-04-10 19:01:47 +000010import io # The module under test
Guido van Rossum28524c72007-02-27 05:47:44 +000011
Guido van Rossuma9e20242007-03-08 00:43:48 +000012
class MockRawIO(io.RawIOBase):
    """Scripted raw stream used to drive the buffered classes in tests.

    Reads hand out successive items of a pre-loaded script, and every
    write is recorded so a test can inspect what reached the raw layer.
    """

    def __init__(self, read_stack=()):
        # Items returned by read(), consumed front to back.
        self._read_stack = list(read_stack)
        # A copy of each chunk passed to write(), in call order.
        self._write_stack = []

    def read(self, n=None):
        """Return the next scripted item, or b"" once the script is empty.

        The size argument *n* is accepted but ignored.  Scripted items are
        returned untouched, so a None entry in the script is returned as
        None.
        """
        try:
            return self._read_stack.pop(0)
        except IndexError:
            # Only an exhausted script means end of data; any other error
            # is a genuine problem and must not be hidden.
            return b""

    def write(self, b):
        """Record a copy of *b* and report all of it as written."""
        self._write_stack.append(b[:])
        return len(b)

    def writable(self):
        return True

    def fileno(self):
        # Arbitrary fixed value that the fileno tests compare against.
        return 42

    def readable(self):
        return True

    def seekable(self):
        return True

    def seek(self, pos, whence=0):
        """Accept and ignore a seek request; nothing is moved or returned."""
        pass

    def tell(self):
        # Arbitrary fixed position; the mock does not track an offset.
        return 42
46
Guido van Rossuma9e20242007-03-08 00:43:48 +000047
class MockFileIO(io.BytesIO):
    """In-memory stream that logs the size of the result of each read().

    ``read_history`` gets one entry per read() call: the length of the
    data returned, or None when the underlying read returned None.
    """

    def __init__(self, data):
        # Set up the log before the base class is initialised.
        self.read_history = []
        io.BytesIO.__init__(self, data)

    def read(self, n=None):
        chunk = io.BytesIO.read(self, n)
        if chunk is None:
            size = None
        else:
            size = len(chunk)
        self.read_history.append(size)
        return chunk
58
59
class MockNonBlockWriterIO(io.RawIOBase):
    """Raw writer whose write() results are dictated by a script.

    Each write() consumes one script entry.  A non-negative entry is
    returned as the write result; a negative entry -k makes write() raise
    io.BlockingIOError with k as its third argument.
    """

    def __init__(self, blocking_script):
        self._blocking_script = list(blocking_script)
        # A copy of each chunk passed to write(), in call order.
        self._write_stack = []

    def write(self, b):
        # The chunk is recorded before the script entry is consulted, so
        # blocked writes show up in the log too.
        self._write_stack.append(b[:])
        outcome = self._blocking_script.pop(0)
        if outcome >= 0:
            return outcome
        raise io.BlockingIOError(0, "test blocking", -outcome)

    def writable(self):
        return True
Guido van Rossum68bbcd22007-02-27 17:19:33 +000076
Guido van Rossuma9e20242007-03-08 00:43:48 +000077
class IOTest(unittest.TestCase):
    """Tests of raw, buffered and in-memory binary streams."""

    def tearDown(self):
        # Tests here share TESTFN; remove it so they stay independent.
        test_support.unlink(test_support.TESTFN)

    def write_ops(self, f):
        """Exercise write/seek/tell/truncate on the binary stream *f*.

        On success the stream holds the 12 bytes "hello world" followed
        by a single newline.
        """
        self.assertEqual(f.write(b"blah."), 5)
        self.assertEqual(f.seek(0), 0)
        self.assertEqual(f.write(b"Hello."), 6)
        self.assertEqual(f.tell(), 6)
        self.assertEqual(f.seek(-1, 1), 5)
        self.assertEqual(f.tell(), 5)
        # Binary streams are fed bytes literals throughout.
        self.assertEqual(f.write(b" world\n\n\n"), 9)
        self.assertEqual(f.seek(0), 0)
        self.assertEqual(f.write(b"h"), 1)
        self.assertEqual(f.seek(-1, 2), 13)
        self.assertEqual(f.tell(), 13)
        self.assertEqual(f.truncate(12), 12)
        # truncate() is expected to leave the position where it was.
        self.assertEqual(f.tell(), 13)

    def read_ops(self, f, buffered=False):
        """Exercise read/readinto/seek/tell on *f*, as left by write_ops().

        With *buffered* true, argument-less read() calls are checked too.
        """
        data = f.read(5)
        self.assertEqual(data, b"hello")
        self.assertEqual(f.readinto(data), 5)
        self.assertEqual(data, b" worl")
        # Only two bytes remain, so the tail of the buffer is untouched.
        self.assertEqual(f.readinto(data), 2)
        self.assertEqual(len(data), 5)
        self.assertEqual(data[:2], b"d\n")
        self.assertEqual(f.seek(0), 0)
        self.assertEqual(f.read(20), b"hello world\n")
        self.assertEqual(f.read(1), b"")
        self.assertEqual(f.readinto(b"x"), 0)
        self.assertEqual(f.seek(-6, 2), 6)
        self.assertEqual(f.read(5), b"world")
        self.assertEqual(f.read(0), b"")
        self.assertEqual(f.readinto(b""), 0)
        self.assertEqual(f.seek(-6, 1), 5)
        self.assertEqual(f.read(5), b" worl")
        self.assertEqual(f.tell(), 10)
        if buffered:
            f.seek(0)
            self.assertEqual(f.read(), b"hello world\n")
            f.seek(6)
            self.assertEqual(f.read(), b"world\n")
            self.assertEqual(f.read(), b"")

    # Offset used by the large-file tests; 2**31 is one past the largest
    # signed 32-bit value.
    LARGE = 2**31

    def large_file_ops(self, f):
        """Exercise seek/tell/write/truncate/read around the LARGE offset."""
        # Real assertions rather than assert statements, so the checks
        # survive running under -O.
        self.assertTrue(f.readable())
        self.assertTrue(f.writable())
        self.assertEqual(f.seek(self.LARGE), self.LARGE)
        self.assertEqual(f.tell(), self.LARGE)
        self.assertEqual(f.write(b"xxx"), 3)
        self.assertEqual(f.tell(), self.LARGE + 3)
        self.assertEqual(f.seek(-1, 1), self.LARGE + 2)
        self.assertEqual(f.truncate(), self.LARGE + 2)
        self.assertEqual(f.tell(), self.LARGE + 2)
        self.assertEqual(f.seek(0, 2), self.LARGE + 2)
        self.assertEqual(f.truncate(self.LARGE + 1), self.LARGE + 1)
        self.assertEqual(f.tell(), self.LARGE + 2)
        self.assertEqual(f.seek(0, 2), self.LARGE + 1)
        self.assertEqual(f.seek(-1, 2), self.LARGE)
        self.assertEqual(f.read(2), b"x")

    def test_raw_file_io(self):
        f = io.open(test_support.TESTFN, "wb", buffering=0)
        self.assertEqual(f.readable(), False)
        self.assertEqual(f.writable(), True)
        self.assertEqual(f.seekable(), True)
        self.write_ops(f)
        f.close()
        f = io.open(test_support.TESTFN, "rb", buffering=0)
        self.assertEqual(f.readable(), True)
        self.assertEqual(f.writable(), False)
        self.assertEqual(f.seekable(), True)
        self.read_ops(f)
        f.close()

    def test_buffered_file_io(self):
        f = io.open(test_support.TESTFN, "wb")
        self.assertEqual(f.readable(), False)
        self.assertEqual(f.writable(), True)
        self.assertEqual(f.seekable(), True)
        self.write_ops(f)
        f.close()
        f = io.open(test_support.TESTFN, "rb")
        self.assertEqual(f.readable(), True)
        self.assertEqual(f.writable(), False)
        self.assertEqual(f.seekable(), True)
        self.read_ops(f, True)
        f.close()

    def test_raw_bytes_io(self):
        f = io.BytesIO()
        self.write_ops(f)
        data = f.getvalue()
        self.assertEqual(data, b"hello world\n")
        f = io.BytesIO(data)
        self.read_ops(f, True)

    def test_large_file_ops(self):
        # On Windows and Mac OSX this test consumes large resources; it
        # takes a long time to build the >2GB file and takes >2GB of disk
        # space, therefore the resource must be enabled to run this test.
        if sys.platform[:3] == 'win' or sys.platform == 'darwin':
            if not test_support.is_resource_enabled("largefile"):
                print("\nTesting large file ops skipped on %s." % sys.platform,
                      file=sys.stderr)
                print("It requires %d bytes and a long time." % self.LARGE,
                      file=sys.stderr)
                print("Use 'regrtest.py -u largefile test_io' to run it.",
                      file=sys.stderr)
                return
        # Once unbuffered, once with the default buffering.
        f = io.open(test_support.TESTFN, "w+b", 0)
        self.large_file_ops(f)
        f.close()
        f = io.open(test_support.TESTFN, "w+b")
        self.large_file_ops(f)
        f.close()

    def test_with_open(self):
        """A stream used as a context manager is closed on exit."""
        for bufsize in (0, 1, 100):
            f = None
            # io.open is called explicitly so that the module under test
            # is what gets exercised.
            with io.open(test_support.TESTFN, "wb", bufsize) as f:
                f.write(b"xxx")
            self.assertEqual(f.closed, True)
            f = None
            try:
                with io.open(test_support.TESTFN, "wb", bufsize) as f:
                    1/0
            except ZeroDivisionError:
                # The stream must be closed on the error path as well.
                self.assertEqual(f.closed, True)
            else:
                self.fail("1/0 didn't raise an exception")

    def test_destructor(self):
        """Deleting a FileIO runs __del__, then close(), then flush()."""
        record = []

        class MyFileIO(io.FileIO):
            def __del__(self):
                record.append(1)
                io.FileIO.__del__(self)

            def close(self):
                record.append(2)
                io.FileIO.close(self)

            def flush(self):
                record.append(3)
                io.FileIO.flush(self)

        f = MyFileIO(test_support.TESTFN, "w")
        f.write(b"xxx")
        # Drops the only reference; the assertion below expects the
        # destructor to have run by then.
        del f
        self.assertEqual(record, [1, 2, 3])

    def test_close_flushes(self):
        f = io.open(test_support.TESTFN, "wb")
        f.write(b"xxx")
        f.close()
        f = io.open(test_support.TESTFN, "rb")
        self.assertEqual(f.read(), b"xxx")
        f.close()

    def test_array_writes(self):
        """write() accepts an array and reports its size in bytes."""
        a = array.array('i', range(10))
        n = len(buffer(a))
        f = io.open(test_support.TESTFN, "wb", 0)
        self.assertEqual(f.write(a), n)
        f.close()
        f = io.open(test_support.TESTFN, "wb")
        self.assertEqual(f.write(a), n)
        f.close()
248
Guido van Rossuma9e20242007-03-08 00:43:48 +0000249
class MemorySeekTestMixin:
    """Read/seek/tell tests shared by the in-memory stream test cases.

    A concrete test case supplies three class attributes:
    ``buftype`` (callable turning the sample text into the stream's data
    type), ``ioclass`` (the stream class under test) and ``EOF`` (the
    value read() returns at end of data).
    """

    def testInit(self):
        # Passes as long as construction raises no exception.
        buf = self.buftype("1234567890")
        bytesIo = self.ioclass(buf)

    def testRead(self):
        buf = self.buftype("1234567890")
        bytesIo = self.ioclass(buf)

        self.assertEqual(buf[:1], bytesIo.read(1))
        self.assertEqual(buf[1:5], bytesIo.read(4))
        # Asking for more than remains returns just the remainder.
        self.assertEqual(buf[5:], bytesIo.read(900))
        self.assertEqual(self.EOF, bytesIo.read())

    def testReadNoArgs(self):
        buf = self.buftype("1234567890")
        bytesIo = self.ioclass(buf)

        self.assertEqual(buf, bytesIo.read())
        self.assertEqual(self.EOF, bytesIo.read())

    def testSeek(self):
        buf = self.buftype("1234567890")
        bytesIo = self.ioclass(buf)

        bytesIo.read(5)
        bytesIo.seek(0)
        self.assertEqual(buf, bytesIo.read())

        bytesIo.seek(3)
        self.assertEqual(buf[3:], bytesIo.read())

    def testTell(self):
        buf = self.buftype("1234567890")
        bytesIo = self.ioclass(buf)

        self.assertEqual(0, bytesIo.tell())
        bytesIo.seek(5)
        self.assertEqual(5, bytesIo.tell())
        # Seeking past the end of the data is allowed and is reported
        # back by tell().
        bytesIo.seek(10000)
        self.assertEqual(10000, bytesIo.tell())
292
Guido van Rossuma9e20242007-03-08 00:43:48 +0000293
class BytesIOTest(MemorySeekTestMixin, unittest.TestCase):
    """Runs the shared in-memory stream tests against io.BytesIO."""

    ioclass = io.BytesIO
    buftype = bytes
    EOF = b""
298
299
class StringIOTest(MemorySeekTestMixin, unittest.TestCase):
    """Runs the shared in-memory stream tests against io.StringIO."""

    ioclass = io.StringIO
    buftype = str
    EOF = ""
304
305
class BufferedReaderTest(unittest.TestCase):
    """Tests of io.BufferedReader on top of scripted raw streams."""

    def testRead(self):
        raw = MockRawIO((b"abc", b"d", b"efg"))
        reader = io.BufferedReader(raw)

        self.assertEqual(b"abcdef", reader.read(6))

    def testBuffering(self):
        data = b"abcdefghi"
        dlen = len(data)

        # Each case: buffer size, sizes requested from the buffered
        # reader, and the sizes of the results the raw stream is then
        # expected to have returned.
        cases = [
            [100, [3, 1, 4, 8], [dlen, 0]],
            [100, [3, 3, 3], [dlen]],
            [4, [1, 2, 4, 2], [4, 4, 1]],
        ]

        for bufsize, buf_read_sizes, raw_read_sizes in cases:
            raw = MockFileIO(data)
            reader = io.BufferedReader(raw, buffer_size=bufsize)
            offset = 0
            for nbytes in buf_read_sizes:
                expected = data[offset:offset + nbytes]
                self.assertEqual(reader.read(nbytes), expected)
                offset += nbytes
            self.assertEqual(raw.read_history, raw_read_sizes)

    def testReadNonBlocking(self):
        # Inject some None's in there to simulate EWOULDBLOCK
        raw = MockRawIO((b"abc", b"d", None, b"efg", None, None))
        reader = io.BufferedReader(raw)

        self.assertEqual(b"abcd", reader.read(6))
        self.assertEqual(b"e", reader.read(1))
        self.assertEqual(b"fg", reader.read())
        self.assertTrue(None is reader.read())
        self.assertEqual(b"", reader.read())

    def testReadToEof(self):
        raw = MockRawIO((b"abc", b"d", b"efg"))
        reader = io.BufferedReader(raw)

        self.assertEqual(b"abcdefg", reader.read(9000))

    def testReadNoArgs(self):
        raw = MockRawIO((b"abc", b"d", b"efg"))
        reader = io.BufferedReader(raw)

        self.assertEqual(b"abcdefg", reader.read())

    def testFileno(self):
        raw = MockRawIO((b"abc", b"d", b"efg"))
        reader = io.BufferedReader(raw)

        # 42 is the fixed value MockRawIO.fileno() returns.
        self.assertEqual(42, reader.fileno())

    def testFilenoNoFileno(self):
        # XXX will we always have fileno() function? If so, kill
        # this test.  Else, write it.
        pass
366
Guido van Rossuma9e20242007-03-08 00:43:48 +0000367
class BufferedWriterTest(unittest.TestCase):
    """Tests of io.BufferedWriter on top of recording raw streams."""

    def testWrite(self):
        # Write to the buffered IO but don't overflow the buffer.
        raw = MockRawIO()
        writer = io.BufferedWriter(raw, 8)

        writer.write(b"abc")

        # Nothing may have reached the raw stream yet.
        self.assertFalse(raw._write_stack)

    def testWriteOverflow(self):
        raw = MockRawIO()
        writer = io.BufferedWriter(raw, 8)

        writer.write(b"abc")
        writer.write(b"defghijkl")

        # Both writes are expected to arrive as one raw write.
        self.assertEqual(b"abcdefghijkl", raw._write_stack[0])

    def testWriteNonBlocking(self):
        raw = MockNonBlockWriterIO((9, 2, 22, -6, 10, 12, 12))
        writer = io.BufferedWriter(raw, 8, 16)

        writer.write(b"asdf")
        writer.write(b"asdfa")
        self.assertEqual(b"asdfasdfa", raw._write_stack[0])

        writer.write(b"asdfasdfasdf")
        self.assertEqual(b"asdfasdfasdf", raw._write_stack[1])
        writer.write(b"asdfasdfasdf")
        self.assertEqual(b"dfasdfasdf", raw._write_stack[2])
        self.assertEqual(b"asdfasdfasdf", raw._write_stack[3])

        writer.write(b"asdfasdfasdf")

        # XXX I don't like this test.  It relies too heavily on how the
        # algorithm actually works, which we might change.  Refactor
        # later.

    def testFileno(self):
        raw = MockRawIO((b"abc", b"d", b"efg"))
        writer = io.BufferedWriter(raw)

        # 42 is the fixed value MockRawIO.fileno() returns.
        self.assertEqual(42, writer.fileno())

    def testFlush(self):
        raw = MockRawIO()
        writer = io.BufferedWriter(raw, 8)

        writer.write(b"abc")
        writer.flush()

        self.assertEqual(b"abc", raw._write_stack[0])
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000422
Guido van Rossuma9e20242007-03-08 00:43:48 +0000423
class BufferedRWPairTest(unittest.TestCase):
    """Placeholder tests for io.BufferedRWPair."""

    def testRWPair(self):
        # Currently this only checks that a pair can be constructed.
        reader = MockRawIO(())
        writer = MockRawIO()
        pair = io.BufferedRWPair(reader, writer)

        # XXX need implementation
432
Guido van Rossuma9e20242007-03-08 00:43:48 +0000433
class BufferedRandomTest(unittest.TestCase):
    """Tests of io.BufferedRandom: interleaved reads, writes and seeks."""

    def testReadAndWrite(self):
        raw = MockRawIO((b"asdf", b"ghjk"))
        stream = io.BufferedRandom(raw, 8, 12)

        self.assertEqual(b"as", stream.read(2))
        stream.write(b"ddd")
        stream.write(b"eee")
        self.assertFalse(raw._write_stack)  # Buffer writes
        self.assertEqual(b"ghjk", stream.read())  # This read forces write flush
        self.assertEqual(b"dddeee", raw._write_stack[0])

    def testSeekAndTell(self):
        raw = io.BytesIO(b"asdfghjkl")
        stream = io.BufferedRandom(raw)

        self.assertEqual(b"as", stream.read(2))
        self.assertEqual(2, stream.tell())
        stream.seek(0, 0)
        self.assertEqual(b"asdf", stream.read(4))

        # Overwrite the four bytes that follow the ones just read.
        stream.write(b"asdf")
        stream.seek(0, 0)
        self.assertEqual(b"asdfasdfl", stream.read())
        self.assertEqual(9, stream.tell())
        stream.seek(-4, 2)
        self.assertEqual(5, stream.tell())
        stream.seek(2, 1)
        self.assertEqual(7, stream.tell())
        self.assertEqual(b"fl", stream.read(11))
465
Guido van Rossum78892e42007-04-06 17:31:18 +0000466
class TextIOWrapperTest(unittest.TestCase):
    """Tests of io.TextIOWrapper and of text-mode io.open()."""

    def tearDown(self):
        # Tests here share TESTFN; remove it so they stay independent.
        test_support.unlink(test_support.TESTFN)

    def testNewlines(self):
        """Check line splitting for each tested ``newline`` setting."""
        input_lines = [ "unix\n", "windows\r\n", "os9\r", "last\n", "nonl" ]

        # Each case: the newline argument and the lines expected back.
        tests = [
            [ None, [ 'unix\n', 'windows\n', 'os9\n', 'last\n', 'nonl' ] ],
            [ '\n', input_lines ],
            [ '\r\n', input_lines ],
        ]

        encodings = ('utf-8', 'bz2')

        # Try a range of pad sizes to test the case where \r is the last
        # character in TextIOWrapper._pending_line.
        for encoding in encodings:
            for do_reads in (False, True):
                for padlen in chain(range(10), range(50, 60)):
                    pad = '.' * padlen
                    data_lines = [ pad + line for line in input_lines ]
                    # XXX: str.encode() should return bytes
                    data = bytes(''.join(data_lines).encode(encoding))

                    for newline, exp_line_ends in tests:
                        exp_lines = [ pad + line for line in exp_line_ends ]
                        bufio = io.BufferedReader(io.BytesIO(data))
                        textio = io.TextIOWrapper(bufio, newline=newline,
                                                  encoding=encoding)
                        if do_reads:
                            # Mix read() and readline(): take two
                            # characters, then the rest of the line.
                            got_lines = []
                            while True:
                                c2 = textio.read(2)
                                if c2 == '':
                                    break
                                self.assertEqual(len(c2), 2)
                                got_lines.append(c2 + textio.readline())
                        else:
                            got_lines = list(textio)

                        for got_line, exp_line in zip(got_lines, exp_lines):
                            self.assertEqual(got_line, exp_line)
                        self.assertEqual(len(got_lines), len(exp_lines))

    # Systematic tests of the text I/O API

    def testBasicIO(self):
        for chunksize in (1, 2, 3, 4, 5, 15, 16, 17, 31, 32, 33, 63, 64, 65):
            for enc in "ascii", "latin1", "utf8" :# , "utf-16-be", "utf-16-le":
                f = io.open(test_support.TESTFN, "w+", encoding=enc)
                f._CHUNK_SIZE = chunksize
                self.assertEqual(f.write("abc"), 3)
                f.close()
                f = io.open(test_support.TESTFN, "r+", encoding=enc)
                f._CHUNK_SIZE = chunksize
                self.assertEqual(f.tell(), 0)
                self.assertEqual(f.read(), "abc")
                # Position at end of file, in whatever form tell() gives.
                cookie = f.tell()
                self.assertEqual(f.seek(0), 0)
                self.assertEqual(f.read(2), "ab")
                self.assertEqual(f.read(1), "c")
                self.assertEqual(f.read(1), "")
                self.assertEqual(f.read(), "")
                self.assertEqual(f.tell(), cookie)
                self.assertEqual(f.seek(0), 0)
                self.assertEqual(f.seek(0, 2), cookie)
                self.assertEqual(f.write("def"), 3)
                self.assertEqual(f.seek(cookie), cookie)
                self.assertEqual(f.read(), "def")
                if enc.startswith("utf"):
                    self.multi_line_test(f, enc)
                f.close()

    def multi_line_test(self, f, enc):
        """Write lines of many lengths to *f* and read them back.

        Each line is recorded with the tell() value at its start; the
        positions and lines read back must match what was written.
        *enc* is accepted but not used here.
        """
        f.seek(0)
        f.truncate()
        sample = "s\xff\u0fff\uffff"
        wlines = []
        for size in (0, 1, 2, 3, 4, 5, 30, 31, 32, 33, 62, 63, 64, 65, 1000):
            # Cycle through the sample characters to reach the length.
            chars = [sample[i % len(sample)] for i in range(size)]
            line = "".join(chars) + "\n"
            wlines.append((f.tell(), line))
            f.write(line)
        f.seek(0)
        rlines = []
        while True:
            pos = f.tell()
            line = f.readline()
            if not line:
                break
            rlines.append((pos, line))
        self.assertEqual(rlines, wlines)

    def testTelling(self):
        f = io.open(test_support.TESTFN, "w+", encoding="utf8")
        p0 = f.tell()
        f.write("\xff\n")
        p1 = f.tell()
        f.write("\xff\n")
        p2 = f.tell()
        f.seek(0)
        self.assertEqual(f.tell(), p0)
        self.assertEqual(f.readline(), "\xff\n")
        self.assertEqual(f.tell(), p1)
        self.assertEqual(f.readline(), "\xff\n")
        self.assertEqual(f.tell(), p2)
        f.seek(0)
        for line in f:
            self.assertEqual(line, "\xff\n")
            # tell() is expected to fail while iteration is in progress.
            self.assertRaises(IOError, f.tell)
        self.assertEqual(f.tell(), p2)
        f.close()

    def testSeeking(self):
        """tell() and readline() agree near the end of a decode chunk."""
        chunk_size = io.TextIOWrapper._CHUNK_SIZE
        # Places the non-ASCII character two bytes before the end of the
        # first chunk.
        prefix_size = chunk_size - 2
        u_prefix = "a" * prefix_size
        prefix = bytes(u_prefix.encode("utf-8"))
        self.assertEqual(len(u_prefix), len(prefix))
        u_suffix = "\u8888\n"
        suffix = bytes(u_suffix.encode("utf-8"))
        line = prefix + suffix
        f = io.open(test_support.TESTFN, "wb")
        f.write(line*2)
        f.close()
        f = io.open(test_support.TESTFN, "r", encoding="utf-8")
        try:
            s = f.read(prefix_size)
            # Decoded text is compared with text, not the encoded bytes.
            self.assertEqual(s, u_prefix)
            self.assertEqual(f.tell(), prefix_size)
            self.assertEqual(f.readline(), u_suffix)
        finally:
            # Close before tearDown() unlinks the file.
            f.close()

    def testSeekingToo(self):
        # Regression test for a specific bug
        data = b'\xe0\xbf\xbf\n'
        f = io.open(test_support.TESTFN, "wb")
        f.write(data)
        f.close()
        f = io.open(test_support.TESTFN, "r", encoding="utf-8")
        try:
            f._CHUNK_SIZE  # Just test that it exists
            f._CHUNK_SIZE = 2
            f.readline()
            f.tell()
        finally:
            # Close before tearDown() unlinks the file.
            f.close()

    def timingTest(self):
        """Time writing and three ways of reading a text file.

        The name does not start with "test", so default unittest
        collection does not pick this method up.
        """
        timer = time.time
        enc = "utf8"
        line = "\0\x0f\xff\u0fff\uffff\U000fffff\U0010ffff"*3 + "\n"
        nlines = 10000
        nchars = len(line)
        nbytes = len(line.encode(enc))
        for chunk_size in (32, 64, 128, 256):
            f = io.open(test_support.TESTFN, "w+", encoding=enc)
            f._CHUNK_SIZE = chunk_size
            t0 = timer()
            for i in range(nlines):
                f.write(line)
            f.flush()
            t1 = timer()
            f.seek(0)
            # A separate name keeps `line` intact for the next chunk size.
            for _read_line in f:
                pass
            t2 = timer()
            f.seek(0)
            while f.readline():
                pass
            t3 = timer()
            f.seek(0)
            while f.readline():
                f.tell()
            t4 = timer()
            f.close()
            if test_support.verbose:
                print("\nTiming test: %d lines of %d characters (%d bytes)" %
                      (nlines, nchars, nbytes))
                print("File chunk size:          %6s" % f._CHUNK_SIZE)
                print("Writing:                  %6.3f seconds" % (t1-t0))
                print("Reading using iteration:  %6.3f seconds" % (t2-t1))
                print("Reading using readline(): %6.3f seconds" % (t3-t2))
                print("Using readline()+tell():  %6.3f seconds" % (t4-t3))
650
Guido van Rossum9b76da62007-04-11 01:09:03 +0000651
Guido van Rossum01a27522007-03-07 01:00:12 +0000652# XXX Tests for open()
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000653
def test_main():
    """Run every test case class of this module via test_support."""
    test_classes = (
        IOTest,
        BytesIOTest,
        StringIOTest,
        BufferedReaderTest,
        BufferedWriterTest,
        BufferedRWPairTest,
        BufferedRandomTest,
        TextIOWrapperTest,
    )
    test_support.run_unittest(*test_classes)
Guido van Rossum28524c72007-02-27 05:47:44 +0000659
# Script entry point.  This runs unittest's own collection of the module's
# test cases; it does not go through test_main().
if __name__ == "__main__":
    unittest.main()