blob: 9d4163edc1d9fb5ce738b2f932365b06eb6c7809 [file] [log] [blame]
Guido van Rossum68bbcd22007-02-27 17:19:33 +00001"""Unit tests for io.py."""
2
Guido van Rossum8358db22007-08-18 21:39:55 +00003import os
Guido van Rossum34d69e52007-04-10 20:08:41 +00004import sys
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +00005import time
Guido van Rossumd4103952007-04-12 05:44:49 +00006import array
Guido van Rossum28524c72007-02-27 05:47:44 +00007import unittest
Guido van Rossum78892e42007-04-06 17:31:18 +00008from itertools import chain
Guido van Rossum76c5d4d2007-04-06 19:10:29 +00009from test import test_support
10
Guido van Rossum53807da2007-04-10 19:01:47 +000011import io # The module under test
Guido van Rossum28524c72007-02-27 05:47:44 +000012
Guido van Rossuma9e20242007-03-08 00:43:48 +000013
class MockRawIO(io.RawIOBase):
    """Scripted raw stream used to drive the buffered I/O classes in tests.

    Reads hand back the queued chunks one at a time; writes are recorded
    in ``_write_stack`` so a test can inspect what reached the raw layer.
    """

    def __init__(self, read_stack=()):
        self._read_stack = list(read_stack)
        self._write_stack = []

    def read(self, n=None):
        """Return the next scripted chunk, or b"" once the script is used up.

        The requested size ``n`` is ignored.
        """
        try:
            return self._read_stack.pop(0)
        except IndexError:
            # Only an exhausted script means EOF; any other error (or a
            # KeyboardInterrupt) must not be silently turned into b"".
            return b""

    def write(self, b):
        """Record a copy of ``b`` and report all of it as written."""
        self._write_stack.append(b[:])
        return len(b)

    def writable(self):
        return True

    def fileno(self):
        return 42

    def readable(self):
        return True

    def seekable(self):
        return True

    def seek(self, pos, whence):
        pass

    def tell(self):
        return 42
47
Guido van Rossuma9e20242007-03-08 00:43:48 +000048
class MockFileIO(io.BytesIO):
    """BytesIO that logs the size of every chunk handed out by read()."""

    def __init__(self, data):
        # The log exists before the base initializer runs.
        self.read_history = []
        io.BytesIO.__init__(self, data)

    def read(self, n=None):
        chunk = io.BytesIO.read(self, n)
        if chunk is None:
            size = None
        else:
            size = len(chunk)
        self.read_history.append(size)
        return chunk
59
60
class MockNonBlockWriterIO(io.RawIOBase):
    """Raw writer whose write() results follow a pre-arranged script.

    Each write() consumes one scripted number: a non-negative value is
    returned as the byte count, a negative value -k raises BlockingIOError
    with k passed as its third argument.
    """

    def __init__(self, blocking_script):
        self._write_stack = []
        self._blocking_script = list(blocking_script)

    def writable(self):
        return True

    def write(self, b):
        # Every attempt is recorded, including the ones that end up raising.
        self._write_stack.append(b[:])
        outcome = self._blocking_script.pop(0)
        if outcome >= 0:
            return outcome
        raise io.BlockingIOError(0, "test blocking", -outcome)
Guido van Rossum68bbcd22007-02-27 17:19:33 +000077
Guido van Rossuma9e20242007-03-08 00:43:48 +000078
class IOTest(unittest.TestCase):
    """Tests for binary streams obtained from io.open() and io.BytesIO.

    Files are opened in ``with`` blocks so that a failing assertion cannot
    leave the scratch file open when tearDown() unlinks it.
    """

    def tearDown(self):
        test_support.unlink(test_support.TESTFN)

    def write_ops(self, f):
        """Drive a fixed write/seek/truncate script against stream f."""
        self.assertEqual(f.write(b"blah."), 5)
        self.assertEqual(f.seek(0), 0)
        self.assertEqual(f.write(b"Hello."), 6)
        self.assertEqual(f.tell(), 6)
        self.assertEqual(f.seek(-1, 1), 5)
        self.assertEqual(f.tell(), 5)
        self.assertEqual(f.write(str8(b" world\n\n\n")), 9)
        self.assertEqual(f.seek(0), 0)
        self.assertEqual(f.write(b"h"), 1)
        self.assertEqual(f.seek(-1, 2), 13)
        self.assertEqual(f.tell(), 13)
        self.assertEqual(f.truncate(12), 12)
        # The position is expected to be unchanged by truncate().
        self.assertEqual(f.tell(), 13)

    def read_ops(self, f, buffered=False):
        """Drive a fixed read/seek script against stream f.

        With buffered=True the argument-less read() calls are exercised too.
        """
        data = f.read(5)
        self.assertEqual(data, b"hello")
        self.assertEqual(f.readinto(data), 5)
        self.assertEqual(data, b" worl")
        self.assertEqual(f.readinto(data), 2)
        self.assertEqual(len(data), 5)
        self.assertEqual(data[:2], b"d\n")
        self.assertEqual(f.seek(0), 0)
        self.assertEqual(f.read(20), b"hello world\n")
        self.assertEqual(f.read(1), b"")
        self.assertEqual(f.readinto(b"x"), 0)
        self.assertEqual(f.seek(-6, 2), 6)
        self.assertEqual(f.read(5), b"world")
        self.assertEqual(f.read(0), b"")
        self.assertEqual(f.readinto(b""), 0)
        self.assertEqual(f.seek(-6, 1), 5)
        self.assertEqual(f.read(5), b" worl")
        self.assertEqual(f.tell(), 10)
        if buffered:
            f.seek(0)
            self.assertEqual(f.read(), b"hello world\n")
            f.seek(6)
            self.assertEqual(f.read(), b"world\n")
            self.assertEqual(f.read(), b"")

    LARGE = 2**31

    def large_file_ops(self, f):
        """Seek, write and truncate around the LARGE offset on stream f."""
        # assertTrue instead of a bare assert, so the checks survive -O.
        self.assertTrue(f.readable())
        self.assertTrue(f.writable())
        self.assertEqual(f.seek(self.LARGE), self.LARGE)
        self.assertEqual(f.tell(), self.LARGE)
        self.assertEqual(f.write(b"xxx"), 3)
        self.assertEqual(f.tell(), self.LARGE + 3)
        self.assertEqual(f.seek(-1, 1), self.LARGE + 2)
        self.assertEqual(f.truncate(), self.LARGE + 2)
        self.assertEqual(f.tell(), self.LARGE + 2)
        self.assertEqual(f.seek(0, 2), self.LARGE + 2)
        self.assertEqual(f.truncate(self.LARGE + 1), self.LARGE + 1)
        self.assertEqual(f.tell(), self.LARGE + 2)
        self.assertEqual(f.seek(0, 2), self.LARGE + 1)
        self.assertEqual(f.seek(-1, 2), self.LARGE)
        self.assertEqual(f.read(2), b"x")

    def test_raw_file_io(self):
        with io.open(test_support.TESTFN, "wb", buffering=0) as f:
            self.assertEqual(f.readable(), False)
            self.assertEqual(f.writable(), True)
            self.assertEqual(f.seekable(), True)
            self.write_ops(f)
        with io.open(test_support.TESTFN, "rb", buffering=0) as f:
            self.assertEqual(f.readable(), True)
            self.assertEqual(f.writable(), False)
            self.assertEqual(f.seekable(), True)
            self.read_ops(f)

    def test_buffered_file_io(self):
        with io.open(test_support.TESTFN, "wb") as f:
            self.assertEqual(f.readable(), False)
            self.assertEqual(f.writable(), True)
            self.assertEqual(f.seekable(), True)
            self.write_ops(f)
        with io.open(test_support.TESTFN, "rb") as f:
            self.assertEqual(f.readable(), True)
            self.assertEqual(f.writable(), False)
            self.assertEqual(f.seekable(), True)
            self.read_ops(f, True)

    def test_readline(self):
        with io.open(test_support.TESTFN, "wb") as f:
            f.write(b"abc\ndef\nxyzzy\nfoo")
        with io.open(test_support.TESTFN, "rb") as f:
            self.assertEqual(f.readline(), b"abc\n")
            self.assertEqual(f.readline(10), b"def\n")
            self.assertEqual(f.readline(2), b"xy")
            self.assertEqual(f.readline(4), b"zzy\n")
            self.assertEqual(f.readline(), b"foo")

    def test_raw_bytes_io(self):
        f = io.BytesIO()
        self.write_ops(f)
        data = f.getvalue()
        self.assertEqual(data, b"hello world\n")
        f = io.BytesIO(data)
        self.read_ops(f, True)

    def test_large_file_ops(self):
        # On Windows and Mac OSX this test consumes large resources; it takes
        # a long time to build the >2GB file and takes >2GB of disk space,
        # therefore the resource must be enabled to run this test.
        if sys.platform[:3] == 'win' or sys.platform == 'darwin':
            if not test_support.is_resource_enabled("largefile"):
                print("\nTesting large file ops skipped on %s." % sys.platform,
                      file=sys.stderr)
                print("It requires %d bytes and a long time." % self.LARGE,
                      file=sys.stderr)
                print("Use 'regrtest.py -u largefile test_io' to run it.",
                      file=sys.stderr)
                return
        # First unbuffered, then with the default buffering.
        with io.open(test_support.TESTFN, "w+b", 0) as f:
            self.large_file_ops(f)
        with io.open(test_support.TESTFN, "w+b") as f:
            self.large_file_ops(f)

    def test_with_open(self):
        for bufsize in (0, 1, 100):
            f = None
            with open(test_support.TESTFN, "wb", bufsize) as f:
                f.write(b"xxx")
            self.assertEqual(f.closed, True)
            f = None
            try:
                with open(test_support.TESTFN, "wb", bufsize) as f:
                    1/0
            except ZeroDivisionError:
                # The file must be closed even when the body raised.
                self.assertEqual(f.closed, True)
            else:
                self.fail("1/0 didn't raise an exception")

    def test_destructor(self):
        record = []
        class MyFileIO(io.FileIO):
            def __del__(self):
                record.append(1)
                io.FileIO.__del__(self)
            def close(self):
                record.append(2)
                io.FileIO.close(self)
            def flush(self):
                record.append(3)
                io.FileIO.flush(self)
        f = MyFileIO(test_support.TESTFN, "w")
        f.write("xxx")
        del f
        # Expected call order: __del__, then close(), then flush().
        self.assertEqual(record, [1, 2, 3])

    def test_close_flushes(self):
        with io.open(test_support.TESTFN, "wb") as f:
            f.write(b"xxx")
        with io.open(test_support.TESTFN, "rb") as f:
            self.assertEqual(f.read(), b"xxx")

    def test_array_writes(self):
        a = array.array('i', range(10))
        n = len(memoryview(a))
        with io.open(test_support.TESTFN, "wb", 0) as f:
            self.assertEqual(f.write(a), n)
        with io.open(test_support.TESTFN, "wb") as f:
            self.assertEqual(f.write(a), n)

    def test_closefd(self):
        self.assertRaises(ValueError, io.open, test_support.TESTFN, 'w',
                          closefd=False)
Guido van Rossuma9e20242007-03-08 00:43:48 +0000265
class MemorySeekTestMixin:
    """Read/seek/tell tests shared by the in-memory stream test cases.

    A concrete test case supplies ``buftype`` (turns a str into the
    stream's data type), ``ioclass`` (the stream class under test) and
    ``EOF`` (the value read() returns at end of stream).
    """

    def testInit(self):
        buf = self.buftype("1234567890")
        # Construction succeeding is the whole test.
        self.ioclass(buf)

    def testRead(self):
        buf = self.buftype("1234567890")
        bytesIo = self.ioclass(buf)

        self.assertEqual(buf[:1], bytesIo.read(1))
        self.assertEqual(buf[1:5], bytesIo.read(4))
        self.assertEqual(buf[5:], bytesIo.read(900))
        self.assertEqual(self.EOF, bytesIo.read())

    def testReadNoArgs(self):
        buf = self.buftype("1234567890")
        bytesIo = self.ioclass(buf)

        self.assertEqual(buf, bytesIo.read())
        self.assertEqual(self.EOF, bytesIo.read())

    def testSeek(self):
        buf = self.buftype("1234567890")
        bytesIo = self.ioclass(buf)

        bytesIo.read(5)
        bytesIo.seek(0)
        self.assertEqual(buf, bytesIo.read())

        bytesIo.seek(3)
        self.assertEqual(buf[3:], bytesIo.read())

    def testTell(self):
        buf = self.buftype("1234567890")
        bytesIo = self.ioclass(buf)

        self.assertEqual(0, bytesIo.tell())
        bytesIo.seek(5)
        self.assertEqual(5, bytesIo.tell())
        # Seeking past the end is allowed and reported back by tell().
        bytesIo.seek(10000)
        self.assertEqual(10000, bytesIo.tell())
308
Guido van Rossuma9e20242007-03-08 00:43:48 +0000309
class BytesIOTest(MemorySeekTestMixin, unittest.TestCase):
    """Run the shared seek/read tests against io.BytesIO."""

    EOF = b""
    ioclass = io.BytesIO

    @staticmethod
    def buftype(s):
        # The mixin passes str literals; the stream wants UTF-8 encoded data.
        encoded = s.encode("utf-8")
        return encoded
316
317
class StringIOTest(MemorySeekTestMixin, unittest.TestCase):
    """Run the shared seek/read tests against io.StringIO."""

    EOF = ""
    ioclass = io.StringIO
    buftype = str
322
323
class BufferedReaderTest(unittest.TestCase):
    """Tests for io.BufferedReader on top of the mock raw streams."""

    def testRead(self):
        rawio = MockRawIO((b"abc", b"d", b"efg"))
        bufio = io.BufferedReader(rawio)

        self.assertEqual(b"abcdef", bufio.read(6))

    def testBuffering(self):
        data = b"abcdefghi"
        dlen = len(data)

        # Each entry: [buffer size,
        #              sizes requested from the BufferedReader,
        #              read sizes expected in the raw stream's history].
        tests = [
            [ 100, [ 3, 1, 4, 8 ], [ dlen, 0 ] ],
            [ 100, [ 3, 3, 3],     [ dlen ]    ],
            [   4, [ 1, 2, 4, 2 ], [ 4, 4, 1 ] ],
        ]

        for bufsize, buf_read_sizes, raw_read_sizes in tests:
            rawio = MockFileIO(data)
            bufio = io.BufferedReader(rawio, buffer_size=bufsize)
            pos = 0
            for nbytes in buf_read_sizes:
                self.assertEqual(bufio.read(nbytes), data[pos:pos+nbytes])
                pos += nbytes
            self.assertEqual(rawio.read_history, raw_read_sizes)

    def testReadNonBlocking(self):
        # Inject some None's in there to simulate EWOULDBLOCK
        rawio = MockRawIO((b"abc", b"d", None, b"efg", None, None))
        bufio = io.BufferedReader(rawio)

        self.assertEqual(b"abcd", bufio.read(6))
        self.assertEqual(b"e", bufio.read(1))
        self.assertEqual(b"fg", bufio.read())
        self.assertTrue(bufio.read() is None)
        self.assertEqual(b"", bufio.read())

    def testReadToEof(self):
        rawio = MockRawIO((b"abc", b"d", b"efg"))
        bufio = io.BufferedReader(rawio)

        self.assertEqual(b"abcdefg", bufio.read(9000))

    def testReadNoArgs(self):
        rawio = MockRawIO((b"abc", b"d", b"efg"))
        bufio = io.BufferedReader(rawio)

        self.assertEqual(b"abcdefg", bufio.read())

    def testFileno(self):
        rawio = MockRawIO((b"abc", b"d", b"efg"))
        bufio = io.BufferedReader(rawio)

        # 42 is the value MockRawIO.fileno() returns.
        self.assertEqual(42, bufio.fileno())

    def testFilenoNoFileno(self):
        # XXX will we always have fileno() function? If so, kill
        # this test. Else, write it.
        pass
384
Guido van Rossuma9e20242007-03-08 00:43:48 +0000385
class BufferedWriterTest(unittest.TestCase):
    """Tests for io.BufferedWriter on top of the mock raw streams."""

    def testWrite(self):
        # Write to the buffered IO but don't overflow the buffer.
        writer = MockRawIO()
        bufio = io.BufferedWriter(writer, 8)

        bufio.write(b"abc")

        # Nothing may have reached the raw stream yet.
        self.assertFalse(writer._write_stack)

    def testWriteOverflow(self):
        writer = MockRawIO()
        bufio = io.BufferedWriter(writer, 8)

        bufio.write(b"abc")
        bufio.write(b"defghijkl")

        self.assertEqual(b"abcdefghijkl", writer._write_stack[0])

    def testWriteNonBlocking(self):
        raw = MockNonBlockWriterIO((9, 2, 22, -6, 10, 12, 12))
        bufio = io.BufferedWriter(raw, 8, 16)

        bufio.write(b"asdf")
        bufio.write(b"asdfa")
        self.assertEqual(b"asdfasdfa", raw._write_stack[0])

        bufio.write(b"asdfasdfasdf")
        self.assertEqual(b"asdfasdfasdf", raw._write_stack[1])
        bufio.write(b"asdfasdfasdf")
        self.assertEqual(b"dfasdfasdf", raw._write_stack[2])
        self.assertEqual(b"asdfasdfasdf", raw._write_stack[3])

        bufio.write(b"asdfasdfasdf")

        # XXX I don't like this test.  It relies too heavily on how the
        # algorithm actually works, which we might change.  Refactor
        # later.

    def testFileno(self):
        rawio = MockRawIO((b"abc", b"d", b"efg"))
        bufio = io.BufferedWriter(rawio)

        # 42 is the value MockRawIO.fileno() returns.
        self.assertEqual(42, bufio.fileno())

    def testFlush(self):
        writer = MockRawIO()
        bufio = io.BufferedWriter(writer, 8)

        bufio.write(b"abc")
        bufio.flush()

        self.assertEqual(b"abc", writer._write_stack[0])
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000440
Guido van Rossuma9e20242007-03-08 00:43:48 +0000441
class BufferedRWPairTest(unittest.TestCase):
    """Placeholder coverage for io.BufferedRWPair."""

    def testRWPair(self):
        reader = MockRawIO(())
        writer = MockRawIO()
        # Only construction is exercised for now.
        pair = io.BufferedRWPair(reader, writer)

        # XXX need implementation
450
Guido van Rossuma9e20242007-03-08 00:43:48 +0000451
class BufferedRandomTest(unittest.TestCase):
    """Tests for io.BufferedRandom: interleaved reads/writes and seeking."""

    def testReadAndWrite(self):
        raw = MockRawIO((b"asdf", b"ghjk"))
        rw = io.BufferedRandom(raw, 8, 12)

        self.assertEqual(b"as", rw.read(2))
        rw.write(b"ddd")
        rw.write(b"eee")
        self.assertFalse(raw._write_stack) # Buffer writes
        self.assertEqual(b"ghjk", rw.read()) # This read forces write flush
        self.assertEqual(b"dddeee", raw._write_stack[0])

    def testSeekAndTell(self):
        raw = io.BytesIO(b"asdfghjkl")
        rw = io.BufferedRandom(raw)

        self.assertEqual(b"as", rw.read(2))
        self.assertEqual(2, rw.tell())
        rw.seek(0, 0)
        self.assertEqual(b"asdf", rw.read(4))

        # Overwrite bytes 4..7, then re-read the whole stream.
        rw.write(b"asdf")
        rw.seek(0, 0)
        self.assertEqual(b"asdfasdfl", rw.read())
        self.assertEqual(9, rw.tell())
        rw.seek(-4, 2)
        self.assertEqual(5, rw.tell())
        rw.seek(2, 1)
        self.assertEqual(7, rw.tell())
        self.assertEqual(b"fl", rw.read(11))
483
Guido van Rossum78892e42007-04-06 17:31:18 +0000484
485class TextIOWrapperTest(unittest.TestCase):
Guido van Rossum9b76da62007-04-11 01:09:03 +0000486
Guido van Rossumd0712812007-04-11 16:32:43 +0000487 def tearDown(self):
488 test_support.unlink(test_support.TESTFN)
Guido van Rossum9b76da62007-04-11 01:09:03 +0000489
Guido van Rossum8358db22007-08-18 21:39:55 +0000490 def testNewlinesInput(self):
491 testdata = b"AAA\nBBB\nCCC\rDDD\rEEE\r\nFFF\r\nGGG"
492 normalized = testdata.replace(b"\r\n", b"\n").replace(b"\r", b"\n")
493 for newline, expected in [
494 (None, normalized.decode("ASCII").splitlines(True)),
495 ("", testdata.decode("ASCII").splitlines(True)),
496 ("\n", ["AAA\n", "BBB\n", "CCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]),
497 ("\r\n", ["AAA\nBBB\nCCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]),
498 ("\r", ["AAA\nBBB\nCCC\r", "DDD\r", "EEE\r", "\nFFF\r", "\nGGG"]),
499 ]:
500 buf = io.BytesIO(testdata)
501 txt = io.TextIOWrapper(buf, encoding="ASCII", newline=newline)
502 self.assertEquals(txt.readlines(), expected)
503 txt.seek(0)
504 self.assertEquals(txt.read(), "".join(expected))
505
506 def testNewlinesOutput(self):
507 testdict = {
508 "": b"AAA\nBBB\nCCC\nX\rY\r\nZ",
509 "\n": b"AAA\nBBB\nCCC\nX\rY\r\nZ",
510 "\r": b"AAA\rBBB\rCCC\rX\rY\r\rZ",
511 "\r\n": b"AAA\r\nBBB\r\nCCC\r\nX\rY\r\r\nZ",
512 }
513 tests = [(None, testdict[os.linesep])] + sorted(testdict.items())
514 for newline, expected in tests:
515 buf = io.BytesIO()
516 txt = io.TextIOWrapper(buf, encoding="ASCII", newline=newline)
517 txt.write("AAA\nB")
518 txt.write("BB\nCCC\n")
519 txt.write("X\rY\r\nZ")
520 txt.flush()
521 self.assertEquals(buf.getvalue(), expected)
522
Guido van Rossum78892e42007-04-06 17:31:18 +0000523 def testNewlines(self):
524 input_lines = [ "unix\n", "windows\r\n", "os9\r", "last\n", "nonl" ]
525
526 tests = [
527 [ None, [ 'unix\n', 'windows\n', 'os9\n', 'last\n', 'nonl' ] ],
Guido van Rossum8358db22007-08-18 21:39:55 +0000528 [ '', input_lines ],
529 [ '\n', [ "unix\n", "windows\r\n", "os9\rlast\n", "nonl" ] ],
530 [ '\r\n', [ "unix\nwindows\r\n", "os9\rlast\nnonl" ] ],
531 [ '\r', [ "unix\nwindows\r", "\nos9\r", "last\nnonl" ] ],
Guido van Rossum78892e42007-04-06 17:31:18 +0000532 ]
533
Guido van Rossumc2f93dc2007-05-24 00:50:02 +0000534 encodings = ('utf-8', 'latin-1')
Guido van Rossum78892e42007-04-06 17:31:18 +0000535
Guido van Rossum8358db22007-08-18 21:39:55 +0000536 # Try a range of buffer sizes to test the case where \r is the last
Guido van Rossum78892e42007-04-06 17:31:18 +0000537 # character in TextIOWrapper._pending_line.
538 for encoding in encodings:
Guido van Rossum8358db22007-08-18 21:39:55 +0000539 # XXX: str.encode() should return bytes
540 data = bytes(''.join(input_lines).encode(encoding))
Guido van Rossum78892e42007-04-06 17:31:18 +0000541 for do_reads in (False, True):
Guido van Rossum8358db22007-08-18 21:39:55 +0000542 for bufsize in range(1, 10):
543 for newline, exp_lines in tests:
544 bufio = io.BufferedReader(io.BytesIO(data), bufsize)
Guido van Rossum76c5d4d2007-04-06 19:10:29 +0000545 textio = io.TextIOWrapper(bufio, newline=newline,
Guido van Rossum78892e42007-04-06 17:31:18 +0000546 encoding=encoding)
547 if do_reads:
548 got_lines = []
549 while True:
Guido van Rossum76c5d4d2007-04-06 19:10:29 +0000550 c2 = textio.read(2)
Guido van Rossum78892e42007-04-06 17:31:18 +0000551 if c2 == '':
552 break
553 self.assertEquals(len(c2), 2)
Guido van Rossum76c5d4d2007-04-06 19:10:29 +0000554 got_lines.append(c2 + textio.readline())
Guido van Rossum78892e42007-04-06 17:31:18 +0000555 else:
Guido van Rossum76c5d4d2007-04-06 19:10:29 +0000556 got_lines = list(textio)
Guido van Rossum78892e42007-04-06 17:31:18 +0000557
558 for got_line, exp_line in zip(got_lines, exp_lines):
559 self.assertEquals(got_line, exp_line)
560 self.assertEquals(len(got_lines), len(exp_lines))
561
Guido van Rossum8358db22007-08-18 21:39:55 +0000562 def testNewlinesInput(self):
563 testdata = b"AAA\nBBB\nCCC\rDDD\rEEE\r\nFFF\r\nGGG"
564 normalized = testdata.replace(b"\r\n", b"\n").replace(b"\r", b"\n")
565 for newline, expected in [
566 (None, normalized.decode("ASCII").splitlines(True)),
567 ("", testdata.decode("ASCII").splitlines(True)),
568 ("\n", ["AAA\n", "BBB\n", "CCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]),
569 ("\r\n", ["AAA\nBBB\nCCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]),
570 ("\r", ["AAA\nBBB\nCCC\r", "DDD\r", "EEE\r", "\nFFF\r", "\nGGG"]),
571 ]:
572 buf = io.BytesIO(testdata)
573 txt = io.TextIOWrapper(buf, encoding="ASCII", newline=newline)
574 self.assertEquals(txt.readlines(), expected)
575 txt.seek(0)
576 self.assertEquals(txt.read(), "".join(expected))
577
    def testNewlinesOutput(self):
        # Checks the bytes written for each (os.linesep, newline) pairing.
        #
        # NOTE(review): a different method with this same name is defined
        # earlier in this class; this definition replaces it, so the earlier
        # body is not run -- confirm which of the two is intended.
        data = "AAA\nBBB\rCCC\n"
        data_lf = b"AAA\nBBB\rCCC\n"
        data_cr = b"AAA\rBBB\rCCC\r"
        data_crlf = b"AAA\r\nBBB\rCCC\r\n"
        # os.linesep is rebound by the loop below; keep the real value so the
        # finally clause can put it back.
        save_linesep = os.linesep
        try:
            # The first loop target is os.linesep itself: every iteration
            # assigns the tuple's first item directly to the module attribute.
            for os.linesep, newline, expected in [
                ("\n", None, data_lf),
                ("\r\n", None, data_crlf),
                ("\n", "", data_lf),
                ("\r\n", "", data_lf),
                ("\n", "\n", data_lf),
                ("\r\n", "\n", data_lf),
                ("\n", "\r", data_cr),
                ("\r\n", "\r", data_cr),
                ("\n", "\r\n", data_crlf),
                ("\r\n", "\r\n", data_crlf),
                ]:
                buf = io.BytesIO()
                txt = io.TextIOWrapper(buf, encoding="ASCII", newline=newline)
                txt.write(data)
                txt.close()
                # NOTE(review): this reads the BytesIO after the wrapper was
                # closed -- presumably the buffer must stay readable then;
                # confirm against the io implementation in use.
                self.assertEquals(buf.getvalue(), expected)
        finally:
            os.linesep = save_linesep
Guido van Rossum8358db22007-08-18 21:39:55 +0000604
Guido van Rossum9b76da62007-04-11 01:09:03 +0000605 # Systematic tests of the text I/O API
606
607 def testBasicIO(self):
608 for chunksize in (1, 2, 3, 4, 5, 15, 16, 17, 31, 32, 33, 63, 64, 65):
609 for enc in "ascii", "latin1", "utf8" :# , "utf-16-be", "utf-16-le":
610 f = io.open(test_support.TESTFN, "w+", encoding=enc)
611 f._CHUNK_SIZE = chunksize
612 self.assertEquals(f.write("abc"), 3)
613 f.close()
614 f = io.open(test_support.TESTFN, "r+", encoding=enc)
615 f._CHUNK_SIZE = chunksize
616 self.assertEquals(f.tell(), 0)
617 self.assertEquals(f.read(), "abc")
618 cookie = f.tell()
619 self.assertEquals(f.seek(0), 0)
620 self.assertEquals(f.read(2), "ab")
621 self.assertEquals(f.read(1), "c")
622 self.assertEquals(f.read(1), "")
623 self.assertEquals(f.read(), "")
624 self.assertEquals(f.tell(), cookie)
625 self.assertEquals(f.seek(0), 0)
626 self.assertEquals(f.seek(0, 2), cookie)
627 self.assertEquals(f.write("def"), 3)
628 self.assertEquals(f.seek(cookie), cookie)
629 self.assertEquals(f.read(), "def")
630 if enc.startswith("utf"):
631 self.multi_line_test(f, enc)
632 f.close()
633
634 def multi_line_test(self, f, enc):
635 f.seek(0)
636 f.truncate()
Guido van Rossumef87d6e2007-05-02 19:09:54 +0000637 sample = "s\xff\u0fff\uffff"
Guido van Rossum9b76da62007-04-11 01:09:03 +0000638 wlines = []
Guido van Rossumcba608c2007-04-11 14:19:59 +0000639 for size in (0, 1, 2, 3, 4, 5, 30, 31, 32, 33, 62, 63, 64, 65, 1000):
Guido van Rossum9b76da62007-04-11 01:09:03 +0000640 chars = []
Guido van Rossum805365e2007-05-07 22:24:25 +0000641 for i in range(size):
Guido van Rossum9b76da62007-04-11 01:09:03 +0000642 chars.append(sample[i % len(sample)])
Guido van Rossumef87d6e2007-05-02 19:09:54 +0000643 line = "".join(chars) + "\n"
Guido van Rossum9b76da62007-04-11 01:09:03 +0000644 wlines.append((f.tell(), line))
645 f.write(line)
Guido van Rossum9b76da62007-04-11 01:09:03 +0000646 f.seek(0)
647 rlines = []
648 while True:
649 pos = f.tell()
650 line = f.readline()
651 if not line:
Guido van Rossum9b76da62007-04-11 01:09:03 +0000652 break
653 rlines.append((pos, line))
Guido van Rossum9b76da62007-04-11 01:09:03 +0000654 self.assertEquals(rlines, wlines)
655
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +0000656 def testTelling(self):
657 f = io.open(test_support.TESTFN, "w+", encoding="utf8")
658 p0 = f.tell()
Guido van Rossumef87d6e2007-05-02 19:09:54 +0000659 f.write("\xff\n")
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +0000660 p1 = f.tell()
Guido van Rossumef87d6e2007-05-02 19:09:54 +0000661 f.write("\xff\n")
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +0000662 p2 = f.tell()
663 f.seek(0)
664 self.assertEquals(f.tell(), p0)
Guido van Rossumef87d6e2007-05-02 19:09:54 +0000665 self.assertEquals(f.readline(), "\xff\n")
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +0000666 self.assertEquals(f.tell(), p1)
Guido van Rossumef87d6e2007-05-02 19:09:54 +0000667 self.assertEquals(f.readline(), "\xff\n")
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +0000668 self.assertEquals(f.tell(), p2)
669 f.seek(0)
670 for line in f:
Guido van Rossumef87d6e2007-05-02 19:09:54 +0000671 self.assertEquals(line, "\xff\n")
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +0000672 self.assertRaises(IOError, f.tell)
673 self.assertEquals(f.tell(), p2)
674 f.close()
675
Guido van Rossumd76e7792007-04-17 02:38:04 +0000676 def testSeeking(self):
677 chunk_size = io.TextIOWrapper._CHUNK_SIZE
678 prefix_size = chunk_size - 2
Guido van Rossumef87d6e2007-05-02 19:09:54 +0000679 u_prefix = "a" * prefix_size
Guido van Rossumd76e7792007-04-17 02:38:04 +0000680 prefix = bytes(u_prefix.encode("utf-8"))
681 self.assertEquals(len(u_prefix), len(prefix))
Guido van Rossumef87d6e2007-05-02 19:09:54 +0000682 u_suffix = "\u8888\n"
Guido van Rossumd76e7792007-04-17 02:38:04 +0000683 suffix = bytes(u_suffix.encode("utf-8"))
684 line = prefix + suffix
685 f = io.open(test_support.TESTFN, "wb")
686 f.write(line*2)
687 f.close()
688 f = io.open(test_support.TESTFN, "r", encoding="utf-8")
689 s = f.read(prefix_size)
Guido van Rossum2b08b382007-05-08 20:18:39 +0000690 self.assertEquals(s, str(prefix))
Guido van Rossumd76e7792007-04-17 02:38:04 +0000691 self.assertEquals(f.tell(), prefix_size)
692 self.assertEquals(f.readline(), u_suffix)
693
def testSeekingToo(self):
    """Regression test: readline() then tell() with a tiny chunk size.

    The file holds a single 3-byte UTF-8 character followed by a newline,
    and the reader's _CHUNK_SIZE is forced to 2.  The test passes as long
    as readline() and tell() complete without raising.
    """
    # Regression test for a specific bug
    data = b'\xe0\xbf\xbf\n'
    # Both handles are closed by their with-blocks, even on failure.
    with io.open(test_support.TESTFN, "wb") as f:
        f.write(data)
    with io.open(test_support.TESTFN, "r", encoding="utf-8") as f:
        f._CHUNK_SIZE  # Just test that it exists
        f._CHUNK_SIZE = 2
        f.readline()
        f.tell()
705
def timingTest(self):
    """Informal timing of text-file writing and three ways of reading.

    For each of several _CHUNK_SIZE values, 10000 identical lines are
    written and then read back by iteration, by readline(), and by
    readline() combined with tell().  The timings are printed only when
    test_support.verbose is set; nothing is asserted.

    The method name does not start with "test", so unittest's default
    loader does not collect it; it has to be invoked explicitly.
    """
    timer = time.time
    enc = "utf8"
    line = "\0\x0f\xff\u0fff\uffff\U000fffff\U0010ffff"*3 + "\n"
    nlines = 10000
    nchars = len(line)
    nbytes = len(line.encode(enc))
    for chunk_size in (32, 64, 128, 256):
        # The with-block closes the file even if an I/O call fails.
        with io.open(test_support.TESTFN, "w+", encoding=enc) as f:
            f._CHUNK_SIZE = chunk_size
            t0 = timer()
            for _ in range(nlines):
                f.write(line)
            f.flush()
            t1 = timer()
            f.seek(0)
            # A throwaway loop variable keeps `line` intact for the next
            # chunk size's write phase.
            for _ in f:
                pass
            t2 = timer()
            f.seek(0)
            while f.readline():
                pass
            t3 = timer()
            f.seek(0)
            while f.readline():
                f.tell()
            t4 = timer()
        if test_support.verbose:
            print("\nTiming test: %d lines of %d characters (%d bytes)" %
                  (nlines, nchars, nbytes))
            # Report the value that was configured instead of reading an
            # attribute from the already-closed file object.
            print("File chunk size: %6s" % chunk_size)
            print("Writing: %6.3f seconds" % (t1-t0))
            print("Reading using iteration: %6.3f seconds" % (t2-t1))
            print("Reading using readline(): %6.3f seconds" % (t3-t2))
            print("Using readline()+tell(): %6.3f seconds" % (t4-t3))
742
Guido van Rossum9b76da62007-04-11 01:09:03 +0000743
# XXX TODO: tests for open() still need to be added.
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000745
Guido van Rossum5abbf752007-08-27 17:39:33 +0000746class MiscIOTest(unittest.TestCase):
747
748 def testImport__all__(self):
749 for name in io.__all__:
750 obj = getattr(io, name, None)
751 self.assert_(obj is not None, name)
752 if name == "open":
753 continue
754 elif "error" in name.lower():
755 self.assert_(issubclass(obj, Exception), name)
756 else:
757 self.assert_(issubclass(obj, io.IOBase))
758
759
def test_main():
    """Run the listed test case classes through test_support.run_unittest."""
    test_cases = (
        IOTest, BytesIOTest, StringIOTest,
        BufferedReaderTest,
        BufferedWriterTest, BufferedRWPairTest,
        BufferedRandomTest, TextIOWrapperTest,
        MiscIOTest,
    )
    test_support.run_unittest(*test_cases)
Guido van Rossum28524c72007-02-27 05:47:44 +0000766
# When this file is executed directly as a script, hand control to
# unittest's command-line runner.
if __name__ == "__main__":
    unittest.main()