blob: 644593dd2ff7f90f6b3496750ca3c0315062fc9f [file] [log] [blame]
Guido van Rossum68bbcd22007-02-27 17:19:33 +00001"""Unit tests for io.py."""
2
Guido van Rossum8358db22007-08-18 21:39:55 +00003import os
Guido van Rossum34d69e52007-04-10 20:08:41 +00004import sys
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +00005import time
Guido van Rossumd4103952007-04-12 05:44:49 +00006import array
Guido van Rossum28524c72007-02-27 05:47:44 +00007import unittest
Guido van Rossum78892e42007-04-06 17:31:18 +00008from itertools import chain
Guido van Rossum76c5d4d2007-04-06 19:10:29 +00009from test import test_support
10
Guido van Rossum53807da2007-04-10 19:01:47 +000011import io # The module under test
Guido van Rossum28524c72007-02-27 05:47:44 +000012
Guido van Rossuma9e20242007-03-08 00:43:48 +000013
class MockRawIO(io.RawIOBase):
    """Scripted in-memory raw stream used to drive the buffered I/O tests.

    Reads hand back the pre-loaded chunks one at a time; writes are recorded
    in ``_write_stack`` so a test can inspect what reached the raw layer.
    """

    def __init__(self, read_stack=()):
        # Copy the script so read() never mutates the caller's sequence.
        self._read_stack = list(read_stack)
        self._write_stack = []

    def read(self, n=None):
        """Return the next scripted chunk, or b"" once the script is empty.

        ``n`` is accepted for signature compatibility but ignored.  Scripted
        values (including ``None``) are returned exactly as supplied.
        """
        try:
            return self._read_stack.pop(0)
        except IndexError:
            # Only an exhausted script means EOF; any other error is a real
            # bug and must not be swallowed.
            return b""

    def write(self, b):
        """Record a copy of *b* and report it as completely written."""
        self._write_stack.append(b[:])
        return len(b)

    def writable(self):
        return True

    def fileno(self):
        # Arbitrary fixed value the tests compare against.
        return 42

    def readable(self):
        return True

    def seekable(self):
        return True

    def seek(self, pos, whence=0):
        """Accept and ignore any seek request (``whence`` defaults to 0)."""

    def tell(self):
        # Arbitrary fixed value; the mock keeps no real position.
        return 42
47
Guido van Rossuma9e20242007-03-08 00:43:48 +000048
class MockFileIO(io.BytesIO):
    """BytesIO that logs the size of every read() result in ``read_history``."""

    def __init__(self, data):
        self.read_history = []
        super().__init__(data)

    def read(self, n=None):
        """Read like BytesIO.read(), recording the result length (or None)."""
        chunk = super().read(n)
        size = len(chunk) if chunk is not None else None
        self.read_history.append(size)
        return chunk
59
60
class MockNonBlockWriterIO(io.RawIOBase):
    """Raw writer whose write() results are dictated by a script.

    Each write() consumes one script entry: a non-negative entry is returned
    as the byte count, a negative entry -k raises BlockingIOError reporting
    k characters written.
    """

    def __init__(self, blocking_script):
        self._blocking_script = list(blocking_script)
        self._write_stack = []

    def write(self, b):
        # Record the payload before deciding how this call should end.
        self._write_stack.append(b[:])
        outcome = self._blocking_script.pop(0)
        if outcome >= 0:
            return outcome
        raise io.BlockingIOError(0, "test blocking", -outcome)

    def writable(self):
        return True
Guido van Rossum68bbcd22007-02-27 17:19:33 +000077
Guido van Rossuma9e20242007-03-08 00:43:48 +000078
class IOTest(unittest.TestCase):
    """End-to-end checks of raw, buffered and in-memory binary streams."""

    def tearDown(self):
        test_support.unlink(test_support.TESTFN)

    def write_ops(self, f):
        """Run a fixed write/seek/truncate script against writable stream *f*."""
        # After this script the stream holds b"hello world\n" (12 bytes).
        self.assertEqual(f.write(b"blah."), 5)
        self.assertEqual(f.seek(0), 0)
        self.assertEqual(f.write(b"Hello."), 6)
        self.assertEqual(f.tell(), 6)
        self.assertEqual(f.seek(-1, 1), 5)
        self.assertEqual(f.tell(), 5)
        # Plain bytes literal, like every other write in this file.
        self.assertEqual(f.write(b" world\n\n\n"), 9)
        self.assertEqual(f.seek(0), 0)
        self.assertEqual(f.write(b"h"), 1)
        self.assertEqual(f.seek(-1, 2), 13)
        self.assertEqual(f.tell(), 13)
        self.assertEqual(f.truncate(12), 12)
        # truncate() shortens the stream but is expected to leave the
        # position where it was.
        self.assertEqual(f.tell(), 13)

    def read_ops(self, f, buffered=False):
        """Run a fixed read/seek script against *f* holding b"hello world\\n".

        When *buffered* is true, argument-less read() calls are checked too.
        """
        data = f.read(5)
        self.assertEqual(data, b"hello")
        self.assertEqual(f.readinto(data), 5)
        self.assertEqual(data, b" worl")
        self.assertEqual(f.readinto(data), 2)
        self.assertEqual(len(data), 5)
        self.assertEqual(data[:2], b"d\n")
        self.assertEqual(f.seek(0), 0)
        self.assertEqual(f.read(20), b"hello world\n")
        self.assertEqual(f.read(1), b"")
        self.assertEqual(f.readinto(b"x"), 0)
        self.assertEqual(f.seek(-6, 2), 6)
        self.assertEqual(f.read(5), b"world")
        self.assertEqual(f.read(0), b"")
        self.assertEqual(f.readinto(b""), 0)
        self.assertEqual(f.seek(-6, 1), 5)
        self.assertEqual(f.read(5), b" worl")
        self.assertEqual(f.tell(), 10)
        if buffered:
            f.seek(0)
            self.assertEqual(f.read(), b"hello world\n")
            f.seek(6)
            self.assertEqual(f.read(), b"world\n")
            self.assertEqual(f.read(), b"")

    # Offset just past what a signed 32-bit file position can represent.
    LARGE = 2**31

    def large_file_ops(self, f):
        """Seek, write, truncate and read around the LARGE offset of *f*."""
        # Use TestCase assertions: bare ``assert`` is stripped under -O.
        self.assertTrue(f.readable())
        self.assertTrue(f.writable())
        self.assertEqual(f.seek(self.LARGE), self.LARGE)
        self.assertEqual(f.tell(), self.LARGE)
        self.assertEqual(f.write(b"xxx"), 3)
        self.assertEqual(f.tell(), self.LARGE + 3)
        self.assertEqual(f.seek(-1, 1), self.LARGE + 2)
        self.assertEqual(f.truncate(), self.LARGE + 2)
        self.assertEqual(f.tell(), self.LARGE + 2)
        self.assertEqual(f.seek(0, 2), self.LARGE + 2)
        self.assertEqual(f.truncate(self.LARGE + 1), self.LARGE + 1)
        self.assertEqual(f.tell(), self.LARGE + 2)
        self.assertEqual(f.seek(0, 2), self.LARGE + 1)
        self.assertEqual(f.seek(-1, 2), self.LARGE)
        self.assertEqual(f.read(2), b"x")

    def test_raw_file_io(self):
        # ``with`` guarantees the file is closed even when an assertion
        # fails, so tearDown can always unlink it.
        with io.open(test_support.TESTFN, "wb", buffering=0) as f:
            self.assertEqual(f.readable(), False)
            self.assertEqual(f.writable(), True)
            self.assertEqual(f.seekable(), True)
            self.write_ops(f)
        with io.open(test_support.TESTFN, "rb", buffering=0) as f:
            self.assertEqual(f.readable(), True)
            self.assertEqual(f.writable(), False)
            self.assertEqual(f.seekable(), True)
            self.read_ops(f)

    def test_buffered_file_io(self):
        with io.open(test_support.TESTFN, "wb") as f:
            self.assertEqual(f.readable(), False)
            self.assertEqual(f.writable(), True)
            self.assertEqual(f.seekable(), True)
            self.write_ops(f)
        with io.open(test_support.TESTFN, "rb") as f:
            self.assertEqual(f.readable(), True)
            self.assertEqual(f.writable(), False)
            self.assertEqual(f.seekable(), True)
            self.read_ops(f, True)

    def test_readline(self):
        with io.open(test_support.TESTFN, "wb") as f:
            f.write(b"abc\ndef\nxyzzy\nfoo")
        with io.open(test_support.TESTFN, "rb") as f:
            self.assertEqual(f.readline(), b"abc\n")
            self.assertEqual(f.readline(10), b"def\n")
            # A size limit may cut a line short; the next call resumes it.
            self.assertEqual(f.readline(2), b"xy")
            self.assertEqual(f.readline(4), b"zzy\n")
            self.assertEqual(f.readline(), b"foo")

    def test_raw_bytes_io(self):
        f = io.BytesIO()
        self.write_ops(f)
        data = f.getvalue()
        self.assertEqual(data, b"hello world\n")
        f = io.BytesIO(data)
        self.read_ops(f, True)

    def test_large_file_ops(self):
        # On Windows and Mac OSX this test consumes large resources; It takes
        # a long time to build the >2GB file and takes >2GB of disk space
        # therefore the resource must be enabled to run this test.
        if sys.platform[:3] == 'win' or sys.platform == 'darwin':
            if not test_support.is_resource_enabled("largefile"):
                print("\nTesting large file ops skipped on %s." % sys.platform,
                      file=sys.stderr)
                print("It requires %d bytes and a long time." % self.LARGE,
                      file=sys.stderr)
                print("Use 'regrtest.py -u largefile test_io' to run it.",
                      file=sys.stderr)
                return
        # Exercise the unbuffered stream first, then the buffered one.
        with io.open(test_support.TESTFN, "w+b", 0) as f:
            self.large_file_ops(f)
        with io.open(test_support.TESTFN, "w+b") as f:
            self.large_file_ops(f)

    def test_with_open(self):
        for bufsize in (0, 1, 100):
            f = None
            with open(test_support.TESTFN, "wb", bufsize) as f:
                f.write(b"xxx")
            self.assertEqual(f.closed, True)
            f = None
            try:
                with open(test_support.TESTFN, "wb", bufsize) as f:
                    1/0
            except ZeroDivisionError:
                # The file must be closed even when the body raises.
                self.assertEqual(f.closed, True)
            else:
                self.fail("1/0 didn't raise an exception")

    def test_destructor(self):
        # Each hook appends a marker so the call order can be asserted.
        record = []
        class MyFileIO(io.FileIO):
            def __del__(self):
                record.append(1)
                io.FileIO.__del__(self)
            def close(self):
                record.append(2)
                io.FileIO.close(self)
            def flush(self):
                record.append(3)
                io.FileIO.flush(self)
        f = MyFileIO(test_support.TESTFN, "w")
        # FileIO is a binary stream, so write bytes like the other tests do.
        f.write(b"xxx")
        del f
        self.assertEqual(record, [1, 2, 3])

    def test_close_flushes(self):
        f = io.open(test_support.TESTFN, "wb")
        f.write(b"xxx")
        # Explicit close() is the operation under test here.
        f.close()
        with io.open(test_support.TESTFN, "rb") as f:
            self.assertEqual(f.read(), b"xxx")

    def test_array_writes(self):
        a = array.array('i', range(10))
        # Expected byte count is taken from the array's buffer view.
        n = len(memoryview(a))
        with io.open(test_support.TESTFN, "wb", 0) as f:
            self.assertEqual(f.write(a), n)
        with io.open(test_support.TESTFN, "wb") as f:
            self.assertEqual(f.write(a), n)
261
Guido van Rossuma9e20242007-03-08 00:43:48 +0000262
class MemorySeekTestMixin:
    """Shared read/seek/tell checks for in-memory stream classes.

    Concrete subclasses supply ``buftype`` (builds the initial buffer from a
    str), ``ioclass`` (the stream class under test) and ``EOF`` (the empty
    value read() returns at end of stream).
    """

    def testInit(self):
        # Construction alone must succeed.
        buf = self.buftype("1234567890")
        bytesIo = self.ioclass(buf)

    def testRead(self):
        buf = self.buftype("1234567890")
        bytesIo = self.ioclass(buf)

        self.assertEqual(buf[:1], bytesIo.read(1))
        self.assertEqual(buf[1:5], bytesIo.read(4))
        # Asking for more than remains returns just the remainder.
        self.assertEqual(buf[5:], bytesIo.read(900))
        self.assertEqual(self.EOF, bytesIo.read())

    def testReadNoArgs(self):
        buf = self.buftype("1234567890")
        bytesIo = self.ioclass(buf)

        self.assertEqual(buf, bytesIo.read())
        self.assertEqual(self.EOF, bytesIo.read())

    def testSeek(self):
        buf = self.buftype("1234567890")
        bytesIo = self.ioclass(buf)

        bytesIo.read(5)
        bytesIo.seek(0)
        self.assertEqual(buf, bytesIo.read())

        bytesIo.seek(3)
        self.assertEqual(buf[3:], bytesIo.read())

    def testTell(self):
        buf = self.buftype("1234567890")
        bytesIo = self.ioclass(buf)

        self.assertEqual(0, bytesIo.tell())
        bytesIo.seek(5)
        self.assertEqual(5, bytesIo.tell())
        # Seeking beyond the end is allowed and tell() reports that offset.
        bytesIo.seek(10000)
        self.assertEqual(10000, bytesIo.tell())
305
Guido van Rossuma9e20242007-03-08 00:43:48 +0000306
class BytesIOTest(MemorySeekTestMixin, unittest.TestCase):
    """Run the shared seek/read checks against io.BytesIO."""

    ioclass = io.BytesIO
    EOF = b""

    @staticmethod
    def buftype(s):
        """Build the initial buffer by UTF-8 encoding the text *s*."""
        encoded = s.encode("utf-8")
        return encoded
313
314
class StringIOTest(MemorySeekTestMixin, unittest.TestCase):
    """Run the shared seek/read checks against io.StringIO."""

    ioclass = io.StringIO
    # Text streams take the str buffer unchanged.
    buftype = str
    EOF = ""
319
320
class BufferedReaderTest(unittest.TestCase):
    """Checks of io.BufferedReader on top of scripted raw streams."""

    def testRead(self):
        rawio = MockRawIO((b"abc", b"d", b"efg"))
        bufio = io.BufferedReader(rawio)

        # One buffered read may need several raw reads to be satisfied.
        self.assertEqual(b"abcdef", bufio.read(6))

    def testBuffering(self):
        data = b"abcdefghi"
        dlen = len(data)

        # Each entry: buffer size, sizes requested from the buffered reader,
        # and the result sizes expected from the underlying raw reads.
        tests = [
            [ 100, [ 3, 1, 4, 8 ], [ dlen, 0 ] ],
            [ 100, [ 3, 3, 3],     [ dlen ]    ],
            [   4, [ 1, 2, 4, 2 ], [ 4, 4, 1 ] ],
        ]

        for bufsize, buf_read_sizes, raw_read_sizes in tests:
            rawio = MockFileIO(data)
            bufio = io.BufferedReader(rawio, buffer_size=bufsize)
            pos = 0
            for nbytes in buf_read_sizes:
                self.assertEqual(bufio.read(nbytes), data[pos:pos+nbytes])
                pos += nbytes
            self.assertEqual(rawio.read_history, raw_read_sizes)

    def testReadNonBlocking(self):
        # Inject some None's in there to simulate EWOULDBLOCK
        rawio = MockRawIO((b"abc", b"d", None, b"efg", None, None))
        bufio = io.BufferedReader(rawio)

        self.assertEqual(b"abcd", bufio.read(6))
        self.assertEqual(b"e", bufio.read(1))
        self.assertEqual(b"fg", bufio.read())
        self.assertTrue(bufio.read() is None)
        self.assertEqual(b"", bufio.read())

    def testReadToEof(self):
        rawio = MockRawIO((b"abc", b"d", b"efg"))
        bufio = io.BufferedReader(rawio)

        self.assertEqual(b"abcdefg", bufio.read(9000))

    def testReadNoArgs(self):
        rawio = MockRawIO((b"abc", b"d", b"efg"))
        bufio = io.BufferedReader(rawio)

        self.assertEqual(b"abcdefg", bufio.read())

    def testFileno(self):
        rawio = MockRawIO((b"abc", b"d", b"efg"))
        bufio = io.BufferedReader(rawio)

        # fileno() is forwarded to the raw stream (the mock returns 42).
        self.assertEqual(42, bufio.fileno())

    def testFilenoNoFileno(self):
        # XXX will we always have fileno() function? If so, kill
        # this test. Else, write it.
        pass
381
Guido van Rossuma9e20242007-03-08 00:43:48 +0000382
class BufferedWriterTest(unittest.TestCase):
    """Checks of io.BufferedWriter on top of scripted raw streams."""

    def testWrite(self):
        # Write to the buffered IO but don't overflow the buffer.
        writer = MockRawIO()
        bufio = io.BufferedWriter(writer, 8)

        bufio.write(b"abc")

        # Nothing may have reached the raw stream yet.
        self.assertFalse(writer._write_stack)

    def testWriteOverflow(self):
        writer = MockRawIO()
        bufio = io.BufferedWriter(writer, 8)

        bufio.write(b"abc")
        bufio.write(b"defghijkl")

        # Exceeding the buffer size pushes everything to the raw stream.
        self.assertEqual(b"abcdefghijkl", writer._write_stack[0])

    def testWriteNonBlocking(self):
        raw = MockNonBlockWriterIO((9, 2, 22, -6, 10, 12, 12))
        bufio = io.BufferedWriter(raw, 8, 16)

        bufio.write(b"asdf")
        bufio.write(b"asdfa")
        self.assertEqual(b"asdfasdfa", raw._write_stack[0])

        bufio.write(b"asdfasdfasdf")
        self.assertEqual(b"asdfasdfasdf", raw._write_stack[1])
        bufio.write(b"asdfasdfasdf")
        self.assertEqual(b"dfasdfasdf", raw._write_stack[2])
        self.assertEqual(b"asdfasdfasdf", raw._write_stack[3])

        bufio.write(b"asdfasdfasdf")

        # XXX I don't like this test.  It relies too heavily on how the
        # algorithm actually works, which we might change.  Refactor
        # later.

    def testFileno(self):
        rawio = MockRawIO((b"abc", b"d", b"efg"))
        bufio = io.BufferedWriter(rawio)

        # fileno() is forwarded to the raw stream (the mock returns 42).
        self.assertEqual(42, bufio.fileno())

    def testFlush(self):
        writer = MockRawIO()
        bufio = io.BufferedWriter(writer, 8)

        bufio.write(b"abc")
        bufio.flush()

        self.assertEqual(b"abc", writer._write_stack[0])
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000437
Guido van Rossuma9e20242007-03-08 00:43:48 +0000438
class BufferedRWPairTest(unittest.TestCase):
    """Placeholder coverage for io.BufferedRWPair."""

    def testRWPair(self):
        """Build a pair from two mock raw streams; nothing is asserted yet."""
        reader, writer = MockRawIO(()), MockRawIO()
        pair = io.BufferedRWPair(reader, writer)

        # XXX need implementation
447
Guido van Rossuma9e20242007-03-08 00:43:48 +0000448
class BufferedRandomTest(unittest.TestCase):
    """Checks of io.BufferedRandom mixing reads, writes and seeks."""

    def testReadAndWrite(self):
        raw = MockRawIO((b"asdf", b"ghjk"))
        rw = io.BufferedRandom(raw, 8, 12)

        self.assertEqual(b"as", rw.read(2))
        rw.write(b"ddd")
        rw.write(b"eee")
        self.assertFalse(raw._write_stack) # Buffer writes
        self.assertEqual(b"ghjk", rw.read()) # This read forces write flush
        self.assertEqual(b"dddeee", raw._write_stack[0])

    def testSeekAndTell(self):
        raw = io.BytesIO(b"asdfghjkl")
        rw = io.BufferedRandom(raw)

        self.assertEqual(b"as", rw.read(2))
        self.assertEqual(2, rw.tell())
        rw.seek(0, 0)
        self.assertEqual(b"asdf", rw.read(4))

        # Overwrite bytes 4..7, then re-read the whole stream from the start.
        rw.write(b"asdf")
        rw.seek(0, 0)
        self.assertEqual(b"asdfasdfl", rw.read())
        self.assertEqual(9, rw.tell())
        rw.seek(-4, 2)
        self.assertEqual(5, rw.tell())
        rw.seek(2, 1)
        self.assertEqual(7, rw.tell())
        self.assertEqual(b"fl", rw.read(11))
480
Guido van Rossum78892e42007-04-06 17:31:18 +0000481
482class TextIOWrapperTest(unittest.TestCase):
Guido van Rossum9b76da62007-04-11 01:09:03 +0000483
Guido van Rossumd0712812007-04-11 16:32:43 +0000484 def tearDown(self):
485 test_support.unlink(test_support.TESTFN)
Guido van Rossum9b76da62007-04-11 01:09:03 +0000486
Guido van Rossum8358db22007-08-18 21:39:55 +0000487 def testNewlinesInput(self):
488 testdata = b"AAA\nBBB\nCCC\rDDD\rEEE\r\nFFF\r\nGGG"
489 normalized = testdata.replace(b"\r\n", b"\n").replace(b"\r", b"\n")
490 for newline, expected in [
491 (None, normalized.decode("ASCII").splitlines(True)),
492 ("", testdata.decode("ASCII").splitlines(True)),
493 ("\n", ["AAA\n", "BBB\n", "CCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]),
494 ("\r\n", ["AAA\nBBB\nCCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]),
495 ("\r", ["AAA\nBBB\nCCC\r", "DDD\r", "EEE\r", "\nFFF\r", "\nGGG"]),
496 ]:
497 buf = io.BytesIO(testdata)
498 txt = io.TextIOWrapper(buf, encoding="ASCII", newline=newline)
499 self.assertEquals(txt.readlines(), expected)
500 txt.seek(0)
501 self.assertEquals(txt.read(), "".join(expected))
502
503 def testNewlinesOutput(self):
504 testdict = {
505 "": b"AAA\nBBB\nCCC\nX\rY\r\nZ",
506 "\n": b"AAA\nBBB\nCCC\nX\rY\r\nZ",
507 "\r": b"AAA\rBBB\rCCC\rX\rY\r\rZ",
508 "\r\n": b"AAA\r\nBBB\r\nCCC\r\nX\rY\r\r\nZ",
509 }
510 tests = [(None, testdict[os.linesep])] + sorted(testdict.items())
511 for newline, expected in tests:
512 buf = io.BytesIO()
513 txt = io.TextIOWrapper(buf, encoding="ASCII", newline=newline)
514 txt.write("AAA\nB")
515 txt.write("BB\nCCC\n")
516 txt.write("X\rY\r\nZ")
517 txt.flush()
518 self.assertEquals(buf.getvalue(), expected)
519
Guido van Rossum78892e42007-04-06 17:31:18 +0000520 def testNewlines(self):
521 input_lines = [ "unix\n", "windows\r\n", "os9\r", "last\n", "nonl" ]
522
523 tests = [
524 [ None, [ 'unix\n', 'windows\n', 'os9\n', 'last\n', 'nonl' ] ],
Guido van Rossum8358db22007-08-18 21:39:55 +0000525 [ '', input_lines ],
526 [ '\n', [ "unix\n", "windows\r\n", "os9\rlast\n", "nonl" ] ],
527 [ '\r\n', [ "unix\nwindows\r\n", "os9\rlast\nnonl" ] ],
528 [ '\r', [ "unix\nwindows\r", "\nos9\r", "last\nnonl" ] ],
Guido van Rossum78892e42007-04-06 17:31:18 +0000529 ]
530
Guido van Rossumc2f93dc2007-05-24 00:50:02 +0000531 encodings = ('utf-8', 'latin-1')
Guido van Rossum78892e42007-04-06 17:31:18 +0000532
Guido van Rossum8358db22007-08-18 21:39:55 +0000533 # Try a range of buffer sizes to test the case where \r is the last
Guido van Rossum78892e42007-04-06 17:31:18 +0000534 # character in TextIOWrapper._pending_line.
535 for encoding in encodings:
Guido van Rossum8358db22007-08-18 21:39:55 +0000536 # XXX: str.encode() should return bytes
537 data = bytes(''.join(input_lines).encode(encoding))
Guido van Rossum78892e42007-04-06 17:31:18 +0000538 for do_reads in (False, True):
Guido van Rossum8358db22007-08-18 21:39:55 +0000539 for bufsize in range(1, 10):
540 for newline, exp_lines in tests:
541 bufio = io.BufferedReader(io.BytesIO(data), bufsize)
Guido van Rossum76c5d4d2007-04-06 19:10:29 +0000542 textio = io.TextIOWrapper(bufio, newline=newline,
Guido van Rossum78892e42007-04-06 17:31:18 +0000543 encoding=encoding)
544 if do_reads:
545 got_lines = []
546 while True:
Guido van Rossum76c5d4d2007-04-06 19:10:29 +0000547 c2 = textio.read(2)
Guido van Rossum78892e42007-04-06 17:31:18 +0000548 if c2 == '':
549 break
550 self.assertEquals(len(c2), 2)
Guido van Rossum76c5d4d2007-04-06 19:10:29 +0000551 got_lines.append(c2 + textio.readline())
Guido van Rossum78892e42007-04-06 17:31:18 +0000552 else:
Guido van Rossum76c5d4d2007-04-06 19:10:29 +0000553 got_lines = list(textio)
Guido van Rossum78892e42007-04-06 17:31:18 +0000554
555 for got_line, exp_line in zip(got_lines, exp_lines):
556 self.assertEquals(got_line, exp_line)
557 self.assertEquals(len(got_lines), len(exp_lines))
558
Guido van Rossum8358db22007-08-18 21:39:55 +0000559 def testNewlinesInput(self):
560 testdata = b"AAA\nBBB\nCCC\rDDD\rEEE\r\nFFF\r\nGGG"
561 normalized = testdata.replace(b"\r\n", b"\n").replace(b"\r", b"\n")
562 for newline, expected in [
563 (None, normalized.decode("ASCII").splitlines(True)),
564 ("", testdata.decode("ASCII").splitlines(True)),
565 ("\n", ["AAA\n", "BBB\n", "CCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]),
566 ("\r\n", ["AAA\nBBB\nCCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]),
567 ("\r", ["AAA\nBBB\nCCC\r", "DDD\r", "EEE\r", "\nFFF\r", "\nGGG"]),
568 ]:
569 buf = io.BytesIO(testdata)
570 txt = io.TextIOWrapper(buf, encoding="ASCII", newline=newline)
571 self.assertEquals(txt.readlines(), expected)
572 txt.seek(0)
573 self.assertEquals(txt.read(), "".join(expected))
574
575 def testNewlinesOutput(self):
Guido van Rossum8358db22007-08-18 21:39:55 +0000576 data = "AAA\nBBB\rCCC\n"
577 data_lf = b"AAA\nBBB\rCCC\n"
578 data_cr = b"AAA\rBBB\rCCC\r"
579 data_crlf = b"AAA\r\nBBB\rCCC\r\n"
Guido van Rossum18363582007-08-18 22:05:58 +0000580 save_linesep = os.linesep
581 try:
582 for os.linesep, newline, expected in [
583 ("\n", None, data_lf),
584 ("\r\n", None, data_crlf),
585 ("\n", "", data_lf),
586 ("\r\n", "", data_lf),
587 ("\n", "\n", data_lf),
588 ("\r\n", "\n", data_lf),
589 ("\n", "\r", data_cr),
590 ("\r\n", "\r", data_cr),
591 ("\n", "\r\n", data_crlf),
592 ("\r\n", "\r\n", data_crlf),
593 ]:
594 buf = io.BytesIO()
595 txt = io.TextIOWrapper(buf, encoding="ASCII", newline=newline)
596 txt.write(data)
597 txt.close()
598 self.assertEquals(buf.getvalue(), expected)
599 finally:
600 os.linesep = save_linesep
Guido van Rossum8358db22007-08-18 21:39:55 +0000601
Guido van Rossum9b76da62007-04-11 01:09:03 +0000602 # Systematic tests of the text I/O API
603
604 def testBasicIO(self):
605 for chunksize in (1, 2, 3, 4, 5, 15, 16, 17, 31, 32, 33, 63, 64, 65):
606 for enc in "ascii", "latin1", "utf8" :# , "utf-16-be", "utf-16-le":
607 f = io.open(test_support.TESTFN, "w+", encoding=enc)
608 f._CHUNK_SIZE = chunksize
609 self.assertEquals(f.write("abc"), 3)
610 f.close()
611 f = io.open(test_support.TESTFN, "r+", encoding=enc)
612 f._CHUNK_SIZE = chunksize
613 self.assertEquals(f.tell(), 0)
614 self.assertEquals(f.read(), "abc")
615 cookie = f.tell()
616 self.assertEquals(f.seek(0), 0)
617 self.assertEquals(f.read(2), "ab")
618 self.assertEquals(f.read(1), "c")
619 self.assertEquals(f.read(1), "")
620 self.assertEquals(f.read(), "")
621 self.assertEquals(f.tell(), cookie)
622 self.assertEquals(f.seek(0), 0)
623 self.assertEquals(f.seek(0, 2), cookie)
624 self.assertEquals(f.write("def"), 3)
625 self.assertEquals(f.seek(cookie), cookie)
626 self.assertEquals(f.read(), "def")
627 if enc.startswith("utf"):
628 self.multi_line_test(f, enc)
629 f.close()
630
631 def multi_line_test(self, f, enc):
632 f.seek(0)
633 f.truncate()
Guido van Rossumef87d6e2007-05-02 19:09:54 +0000634 sample = "s\xff\u0fff\uffff"
Guido van Rossum9b76da62007-04-11 01:09:03 +0000635 wlines = []
Guido van Rossumcba608c2007-04-11 14:19:59 +0000636 for size in (0, 1, 2, 3, 4, 5, 30, 31, 32, 33, 62, 63, 64, 65, 1000):
Guido van Rossum9b76da62007-04-11 01:09:03 +0000637 chars = []
Guido van Rossum805365e2007-05-07 22:24:25 +0000638 for i in range(size):
Guido van Rossum9b76da62007-04-11 01:09:03 +0000639 chars.append(sample[i % len(sample)])
Guido van Rossumef87d6e2007-05-02 19:09:54 +0000640 line = "".join(chars) + "\n"
Guido van Rossum9b76da62007-04-11 01:09:03 +0000641 wlines.append((f.tell(), line))
642 f.write(line)
Guido van Rossum9b76da62007-04-11 01:09:03 +0000643 f.seek(0)
644 rlines = []
645 while True:
646 pos = f.tell()
647 line = f.readline()
648 if not line:
Guido van Rossum9b76da62007-04-11 01:09:03 +0000649 break
650 rlines.append((pos, line))
Guido van Rossum9b76da62007-04-11 01:09:03 +0000651 self.assertEquals(rlines, wlines)
652
def testTelling(self):
    # Positions recorded by tell() while writing two non-ASCII lines must
    # be reported again when the lines are read back with readline().
    f = io.open(test_support.TESTFN, "w+", encoding="utf8")
    try:
        p0 = f.tell()
        f.write("\xff\n")
        p1 = f.tell()
        f.write("\xff\n")
        p2 = f.tell()
        f.seek(0)
        self.assertEquals(f.tell(), p0)
        self.assertEquals(f.readline(), "\xff\n")
        self.assertEquals(f.tell(), p1)
        self.assertEquals(f.readline(), "\xff\n")
        self.assertEquals(f.tell(), p2)
        f.seek(0)
        for line in f:
            self.assertEquals(line, "\xff\n")
            # tell() is expected to fail while the file is being iterated.
            self.assertRaises(IOError, f.tell)
        # Once iteration has finished, tell() must work again.
        self.assertEquals(f.tell(), p2)
    finally:
        # Close even when an assertion fails so the handle is not leaked.
        f.close()
672
def testSeeking(self):
    # Each line is (_CHUNK_SIZE - 2) ASCII characters followed by a
    # multi-byte UTF-8 character and a newline -- presumably so that the
    # multi-byte character straddles a read-chunk boundary (confirm
    # against TextIOWrapper's chunking).
    chunk_size = io.TextIOWrapper._CHUNK_SIZE
    prefix_size = chunk_size - 2
    u_prefix = "a" * prefix_size
    prefix = bytes(u_prefix.encode("utf-8"))
    # The prefix is pure ASCII, so character and byte counts must agree.
    self.assertEquals(len(u_prefix), len(prefix))
    u_suffix = "\u8888\n"
    suffix = bytes(u_suffix.encode("utf-8"))
    line = prefix + suffix
    f = io.open(test_support.TESTFN, "wb")
    try:
        f.write(line*2)
    finally:
        f.close()
    f = io.open(test_support.TESTFN, "r", encoding="utf-8")
    try:
        s = f.read(prefix_size)
        # Compare with the text form directly instead of str(prefix),
        # which depends on how bytes are converted to text.
        self.assertEquals(s, u_prefix)
        self.assertEquals(f.tell(), prefix_size)
        self.assertEquals(f.readline(), u_suffix)
    finally:
        # The reader was previously never closed.
        f.close()
690
def testSeekingToo(self):
    # Regression test for a specific bug: readline() followed by tell()
    # with a chunk size smaller than the single multi-byte character in
    # the file.  The test passes if neither call raises.
    data = b'\xe0\xbf\xbf\n'
    f = io.open(test_support.TESTFN, "wb")
    try:
        f.write(data)
    finally:
        f.close()
    f = io.open(test_support.TESTFN, "r", encoding="utf-8")
    try:
        f._CHUNK_SIZE  # Just test that it exists
        f._CHUNK_SIZE = 2
        f.readline()
        f.tell()
    finally:
        # The reader was previously never closed.
        f.close()
702
def timingTest(self):
    # Times writing a text file and three ways of reading it back
    # (iteration, readline(), readline()+tell()) for several chunk sizes.
    # Results are only printed when test_support.verbose is set.
    timer = time.time
    enc = "utf8"
    line = "\0\x0f\xff\u0fff\uffff\U000fffff\U0010ffff"*3 + "\n"
    nlines = 10000
    nchars = len(line)
    nbytes = len(line.encode(enc))
    for chunk_size in (32, 64, 128, 256):
        f = io.open(test_support.TESTFN, "w+", encoding=enc)
        try:
            f._CHUNK_SIZE = chunk_size
            t0 = timer()
            for i in range(nlines):
                f.write(line)
            f.flush()
            t1 = timer()
            f.seek(0)
            # Use a separate loop variable: reusing "line" here would
            # replace the sample line written on the next chunk size.
            for read_line in f:
                pass
            t2 = timer()
            f.seek(0)
            while f.readline():
                pass
            t3 = timer()
            f.seek(0)
            while f.readline():
                f.tell()
            t4 = timer()
        finally:
            # Close even if a read or write fails part-way through.
            f.close()
        if test_support.verbose:
            print("\nTiming test: %d lines of %d characters (%d bytes)" %
                  (nlines, nchars, nbytes))
            print("File chunk size: %6s" % f._CHUNK_SIZE)
            print("Writing: %6.3f seconds" % (t1-t0))
            print("Reading using iteration: %6.3f seconds" % (t2-t1))
            print("Reading using readline(): %6.3f seconds" % (t3-t2))
            print("Using readline()+tell(): %6.3f seconds" % (t4-t3))
739
Guido van Rossum9b76da62007-04-11 01:09:03 +0000740
Guido van Rossum01a27522007-03-07 01:00:12 +0000741# XXX Tests for open()
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000742
Guido van Rossum5abbf752007-08-27 17:39:33 +0000743class MiscIOTest(unittest.TestCase):
744
745 def testImport__all__(self):
746 for name in io.__all__:
747 obj = getattr(io, name, None)
748 self.assert_(obj is not None, name)
749 if name == "open":
750 continue
751 elif "error" in name.lower():
752 self.assert_(issubclass(obj, Exception), name)
753 else:
754 self.assert_(issubclass(obj, io.IOBase))
755
756
def test_main():
    # Hand the TestCase classes listed below to test_support.run_unittest,
    # in this order.
    test_classes = (
        IOTest,
        BytesIOTest,
        StringIOTest,
        BufferedReaderTest,
        BufferedWriterTest,
        BufferedRWPairTest,
        BufferedRandomTest,
        TextIOWrapperTest,
        MiscIOTest,
    )
    test_support.run_unittest(*test_classes)
Guido van Rossum28524c72007-02-27 05:47:44 +0000763
# When executed as a script, hand control to unittest's own command-line
# entry point (note: this does not go through test_main() above).
if __name__ == "__main__":
    unittest.main()