blob: 6091e89497a8557d581bc1a17b2cacf9250d6d86 [file] [log] [blame]
Guido van Rossum68bbcd22007-02-27 17:19:33 +00001"""Unit tests for io.py."""
2
Guido van Rossum8358db22007-08-18 21:39:55 +00003import os
Guido van Rossum34d69e52007-04-10 20:08:41 +00004import sys
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +00005import time
Guido van Rossumd4103952007-04-12 05:44:49 +00006import array
Guido van Rossum28524c72007-02-27 05:47:44 +00007import unittest
Guido van Rossum78892e42007-04-06 17:31:18 +00008from itertools import chain
Guido van Rossum76c5d4d2007-04-06 19:10:29 +00009from test import test_support
10
Guido van Rossum53807da2007-04-10 19:01:47 +000011import io # The module under test
Guido van Rossum28524c72007-02-27 05:47:44 +000012
Guido van Rossuma9e20242007-03-08 00:43:48 +000013
Guido van Rossum53807da2007-04-10 19:01:47 +000014class MockRawIO(io.RawIOBase):
Guido van Rossuma9e20242007-03-08 00:43:48 +000015
Guido van Rossum76c5d4d2007-04-06 19:10:29 +000016 def __init__(self, read_stack=()):
17 self._read_stack = list(read_stack)
18 self._write_stack = []
Guido van Rossum68bbcd22007-02-27 17:19:33 +000019
20 def read(self, n=None):
21 try:
Guido van Rossum76c5d4d2007-04-06 19:10:29 +000022 return self._read_stack.pop(0)
Guido van Rossum68bbcd22007-02-27 17:19:33 +000023 except:
Guido van Rossum78892e42007-04-06 17:31:18 +000024 return b""
Guido van Rossum68bbcd22007-02-27 17:19:33 +000025
Guido van Rossum01a27522007-03-07 01:00:12 +000026 def write(self, b):
Guido van Rossum76c5d4d2007-04-06 19:10:29 +000027 self._write_stack.append(b[:])
Guido van Rossum01a27522007-03-07 01:00:12 +000028 return len(b)
29
30 def writable(self):
31 return True
32
Guido van Rossum68bbcd22007-02-27 17:19:33 +000033 def fileno(self):
34 return 42
35
36 def readable(self):
37 return True
38
Guido van Rossum01a27522007-03-07 01:00:12 +000039 def seekable(self):
Guido van Rossum68bbcd22007-02-27 17:19:33 +000040 return True
41
Guido van Rossum01a27522007-03-07 01:00:12 +000042 def seek(self, pos, whence):
43 pass
44
45 def tell(self):
Guido van Rossum68bbcd22007-02-27 17:19:33 +000046 return 42
47
Guido van Rossuma9e20242007-03-08 00:43:48 +000048
Guido van Rossum78892e42007-04-06 17:31:18 +000049class MockFileIO(io.BytesIO):
50
51 def __init__(self, data):
52 self.read_history = []
53 io.BytesIO.__init__(self, data)
54
55 def read(self, n=None):
56 res = io.BytesIO.read(self, n)
57 self.read_history.append(None if res is None else len(res))
58 return res
59
60
Guido van Rossum01a27522007-03-07 01:00:12 +000061class MockNonBlockWriterIO(io.RawIOBase):
Guido van Rossuma9e20242007-03-08 00:43:48 +000062
Guido van Rossum53807da2007-04-10 19:01:47 +000063 def __init__(self, blocking_script):
64 self._blocking_script = list(blocking_script)
Guido van Rossum01a27522007-03-07 01:00:12 +000065 self._write_stack = []
Guido van Rossuma9e20242007-03-08 00:43:48 +000066
Guido van Rossum01a27522007-03-07 01:00:12 +000067 def write(self, b):
Guido van Rossum76c5d4d2007-04-06 19:10:29 +000068 self._write_stack.append(b[:])
Guido van Rossum53807da2007-04-10 19:01:47 +000069 n = self._blocking_script.pop(0)
Guido van Rossum01a27522007-03-07 01:00:12 +000070 if (n < 0):
Guido van Rossum141f7672007-04-10 00:22:16 +000071 raise io.BlockingIOError(0, "test blocking", -n)
Guido van Rossum01a27522007-03-07 01:00:12 +000072 else:
73 return n
Guido van Rossuma9e20242007-03-08 00:43:48 +000074
Guido van Rossum01a27522007-03-07 01:00:12 +000075 def writable(self):
76 return True
Guido van Rossum68bbcd22007-02-27 17:19:33 +000077
Guido van Rossuma9e20242007-03-08 00:43:48 +000078
Guido van Rossum28524c72007-02-27 05:47:44 +000079class IOTest(unittest.TestCase):
80
Guido van Rossum4d0f5a42007-03-07 22:59:39 +000081 def tearDown(self):
82 test_support.unlink(test_support.TESTFN)
83
Guido van Rossum28524c72007-02-27 05:47:44 +000084 def write_ops(self, f):
Guido van Rossum87429772007-04-10 21:06:59 +000085 self.assertEqual(f.write(b"blah."), 5)
86 self.assertEqual(f.seek(0), 0)
87 self.assertEqual(f.write(b"Hello."), 6)
Guido van Rossum28524c72007-02-27 05:47:44 +000088 self.assertEqual(f.tell(), 6)
Guido van Rossum87429772007-04-10 21:06:59 +000089 self.assertEqual(f.seek(-1, 1), 5)
Guido van Rossum28524c72007-02-27 05:47:44 +000090 self.assertEqual(f.tell(), 5)
Guido van Rossum98297ee2007-11-06 21:34:58 +000091 self.assertEqual(f.write(buffer(b" world\n\n\n")), 9)
Guido van Rossum87429772007-04-10 21:06:59 +000092 self.assertEqual(f.seek(0), 0)
Guido van Rossum2b08b382007-05-08 20:18:39 +000093 self.assertEqual(f.write(b"h"), 1)
Guido van Rossum87429772007-04-10 21:06:59 +000094 self.assertEqual(f.seek(-1, 2), 13)
95 self.assertEqual(f.tell(), 13)
96 self.assertEqual(f.truncate(12), 12)
Guido van Rossumdc0b1a12007-04-12 22:55:07 +000097 self.assertEqual(f.tell(), 13)
Guido van Rossum28524c72007-02-27 05:47:44 +000098
Guido van Rossum9b76da62007-04-11 01:09:03 +000099 def read_ops(self, f, buffered=False):
100 data = f.read(5)
101 self.assertEqual(data, b"hello")
Guido van Rossum98297ee2007-11-06 21:34:58 +0000102 data = buffer(data)
Guido van Rossum9b76da62007-04-11 01:09:03 +0000103 self.assertEqual(f.readinto(data), 5)
104 self.assertEqual(data, b" worl")
105 self.assertEqual(f.readinto(data), 2)
106 self.assertEqual(len(data), 5)
107 self.assertEqual(data[:2], b"d\n")
108 self.assertEqual(f.seek(0), 0)
109 self.assertEqual(f.read(20), b"hello world\n")
110 self.assertEqual(f.read(1), b"")
Guido van Rossum98297ee2007-11-06 21:34:58 +0000111 self.assertEqual(f.readinto(buffer(b"x")), 0)
Guido van Rossum9b76da62007-04-11 01:09:03 +0000112 self.assertEqual(f.seek(-6, 2), 6)
113 self.assertEqual(f.read(5), b"world")
114 self.assertEqual(f.read(0), b"")
Guido van Rossum98297ee2007-11-06 21:34:58 +0000115 self.assertEqual(f.readinto(buffer()), 0)
Guido van Rossum9b76da62007-04-11 01:09:03 +0000116 self.assertEqual(f.seek(-6, 1), 5)
117 self.assertEqual(f.read(5), b" worl")
118 self.assertEqual(f.tell(), 10)
119 if buffered:
120 f.seek(0)
121 self.assertEqual(f.read(), b"hello world\n")
122 f.seek(6)
123 self.assertEqual(f.read(), b"world\n")
124 self.assertEqual(f.read(), b"")
125
Guido van Rossum34d69e52007-04-10 20:08:41 +0000126 LARGE = 2**31
127
Guido van Rossum53807da2007-04-10 19:01:47 +0000128 def large_file_ops(self, f):
129 assert f.readable()
130 assert f.writable()
Guido van Rossum34d69e52007-04-10 20:08:41 +0000131 self.assertEqual(f.seek(self.LARGE), self.LARGE)
132 self.assertEqual(f.tell(), self.LARGE)
Guido van Rossum53807da2007-04-10 19:01:47 +0000133 self.assertEqual(f.write(b"xxx"), 3)
Guido van Rossum34d69e52007-04-10 20:08:41 +0000134 self.assertEqual(f.tell(), self.LARGE + 3)
135 self.assertEqual(f.seek(-1, 1), self.LARGE + 2)
Guido van Rossum87429772007-04-10 21:06:59 +0000136 self.assertEqual(f.truncate(), self.LARGE + 2)
Guido van Rossum34d69e52007-04-10 20:08:41 +0000137 self.assertEqual(f.tell(), self.LARGE + 2)
138 self.assertEqual(f.seek(0, 2), self.LARGE + 2)
Guido van Rossum87429772007-04-10 21:06:59 +0000139 self.assertEqual(f.truncate(self.LARGE + 1), self.LARGE + 1)
Guido van Rossumdc0b1a12007-04-12 22:55:07 +0000140 self.assertEqual(f.tell(), self.LARGE + 2)
Guido van Rossum34d69e52007-04-10 20:08:41 +0000141 self.assertEqual(f.seek(0, 2), self.LARGE + 1)
142 self.assertEqual(f.seek(-1, 2), self.LARGE)
Guido van Rossum53807da2007-04-10 19:01:47 +0000143 self.assertEqual(f.read(2), b"x")
144
Guido van Rossum28524c72007-02-27 05:47:44 +0000145 def test_raw_file_io(self):
146 f = io.open(test_support.TESTFN, "wb", buffering=0)
147 self.assertEqual(f.readable(), False)
148 self.assertEqual(f.writable(), True)
149 self.assertEqual(f.seekable(), True)
150 self.write_ops(f)
151 f.close()
152 f = io.open(test_support.TESTFN, "rb", buffering=0)
153 self.assertEqual(f.readable(), True)
154 self.assertEqual(f.writable(), False)
155 self.assertEqual(f.seekable(), True)
156 self.read_ops(f)
157 f.close()
158
Guido van Rossum87429772007-04-10 21:06:59 +0000159 def test_buffered_file_io(self):
160 f = io.open(test_support.TESTFN, "wb")
161 self.assertEqual(f.readable(), False)
162 self.assertEqual(f.writable(), True)
163 self.assertEqual(f.seekable(), True)
164 self.write_ops(f)
165 f.close()
166 f = io.open(test_support.TESTFN, "rb")
167 self.assertEqual(f.readable(), True)
168 self.assertEqual(f.writable(), False)
169 self.assertEqual(f.seekable(), True)
Guido van Rossum9b76da62007-04-11 01:09:03 +0000170 self.read_ops(f, True)
Guido van Rossum87429772007-04-10 21:06:59 +0000171 f.close()
172
Guido van Rossum48fc58a2007-06-07 23:45:37 +0000173 def test_readline(self):
174 f = io.open(test_support.TESTFN, "wb")
175 f.write(b"abc\ndef\nxyzzy\nfoo")
176 f.close()
177 f = io.open(test_support.TESTFN, "rb")
178 self.assertEqual(f.readline(), b"abc\n")
179 self.assertEqual(f.readline(10), b"def\n")
180 self.assertEqual(f.readline(2), b"xy")
181 self.assertEqual(f.readline(4), b"zzy\n")
182 self.assertEqual(f.readline(), b"foo")
183 f.close()
184
Guido van Rossum28524c72007-02-27 05:47:44 +0000185 def test_raw_bytes_io(self):
186 f = io.BytesIO()
187 self.write_ops(f)
188 data = f.getvalue()
189 self.assertEqual(data, b"hello world\n")
190 f = io.BytesIO(data)
Guido van Rossum9b76da62007-04-11 01:09:03 +0000191 self.read_ops(f, True)
Guido van Rossum28524c72007-02-27 05:47:44 +0000192
Guido van Rossum53807da2007-04-10 19:01:47 +0000193 def test_large_file_ops(self):
Guido van Rossum34d69e52007-04-10 20:08:41 +0000194 # On Windows and Mac OSX this test comsumes large resources; It takes
195 # a long time to build the >2GB file and takes >2GB of disk space
196 # therefore the resource must be enabled to run this test.
197 if sys.platform[:3] == 'win' or sys.platform == 'darwin':
198 if not test_support.is_resource_enabled("largefile"):
199 print("\nTesting large file ops skipped on %s." % sys.platform,
200 file=sys.stderr)
201 print("It requires %d bytes and a long time." % self.LARGE,
202 file=sys.stderr)
203 print("Use 'regrtest.py -u largefile test_io' to run it.",
204 file=sys.stderr)
205 return
Guido van Rossum87429772007-04-10 21:06:59 +0000206 f = io.open(test_support.TESTFN, "w+b", 0)
Guido van Rossum53807da2007-04-10 19:01:47 +0000207 self.large_file_ops(f)
Guido van Rossuma9e20242007-03-08 00:43:48 +0000208 f.close()
Guido van Rossum87429772007-04-10 21:06:59 +0000209 f = io.open(test_support.TESTFN, "w+b")
210 self.large_file_ops(f)
211 f.close()
212
213 def test_with_open(self):
214 for bufsize in (0, 1, 100):
215 f = None
216 with open(test_support.TESTFN, "wb", bufsize) as f:
Guido van Rossum1f2ca562007-08-27 20:44:15 +0000217 f.write(b"xxx")
Guido van Rossum87429772007-04-10 21:06:59 +0000218 self.assertEqual(f.closed, True)
219 f = None
220 try:
221 with open(test_support.TESTFN, "wb", bufsize) as f:
222 1/0
223 except ZeroDivisionError:
224 self.assertEqual(f.closed, True)
225 else:
226 self.fail("1/0 didn't raise an exception")
227
228 def test_destructor(self):
229 record = []
230 class MyFileIO(io.FileIO):
231 def __del__(self):
232 record.append(1)
233 io.FileIO.__del__(self)
234 def close(self):
235 record.append(2)
236 io.FileIO.close(self)
237 def flush(self):
238 record.append(3)
239 io.FileIO.flush(self)
240 f = MyFileIO(test_support.TESTFN, "w")
241 f.write("xxx")
242 del f
243 self.assertEqual(record, [1, 2, 3])
244
245 def test_close_flushes(self):
246 f = io.open(test_support.TESTFN, "wb")
Guido van Rossum2b08b382007-05-08 20:18:39 +0000247 f.write(b"xxx")
Guido van Rossum87429772007-04-10 21:06:59 +0000248 f.close()
249 f = io.open(test_support.TESTFN, "rb")
250 self.assertEqual(f.read(), b"xxx")
251 f.close()
Guido van Rossuma9e20242007-03-08 00:43:48 +0000252
Guido van Rossumd4103952007-04-12 05:44:49 +0000253 def test_array_writes(self):
254 a = array.array('i', range(10))
Guido van Rossumbae07c92007-10-08 02:46:15 +0000255 n = len(memoryview(a))
Guido van Rossumd4103952007-04-12 05:44:49 +0000256 f = io.open(test_support.TESTFN, "wb", 0)
257 self.assertEqual(f.write(a), n)
258 f.close()
259 f = io.open(test_support.TESTFN, "wb")
260 self.assertEqual(f.write(a), n)
261 f.close()
262
Guido van Rossum2dced8b2007-10-30 17:27:30 +0000263 def test_closefd(self):
264 self.assertRaises(ValueError, io.open, test_support.TESTFN, 'w',
265 closefd=False)
Guido van Rossuma9e20242007-03-08 00:43:48 +0000266
Guido van Rossum76c5d4d2007-04-06 19:10:29 +0000267class MemorySeekTestMixin:
Guido van Rossuma9e20242007-03-08 00:43:48 +0000268
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000269 def testInit(self):
Guido van Rossum78892e42007-04-06 17:31:18 +0000270 buf = self.buftype("1234567890")
271 bytesIo = self.ioclass(buf)
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000272
273 def testRead(self):
Guido van Rossum78892e42007-04-06 17:31:18 +0000274 buf = self.buftype("1234567890")
275 bytesIo = self.ioclass(buf)
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000276
277 self.assertEquals(buf[:1], bytesIo.read(1))
278 self.assertEquals(buf[1:5], bytesIo.read(4))
279 self.assertEquals(buf[5:], bytesIo.read(900))
Guido van Rossum78892e42007-04-06 17:31:18 +0000280 self.assertEquals(self.EOF, bytesIo.read())
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000281
282 def testReadNoArgs(self):
Guido van Rossum78892e42007-04-06 17:31:18 +0000283 buf = self.buftype("1234567890")
284 bytesIo = self.ioclass(buf)
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000285
286 self.assertEquals(buf, bytesIo.read())
Guido van Rossum78892e42007-04-06 17:31:18 +0000287 self.assertEquals(self.EOF, bytesIo.read())
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000288
289 def testSeek(self):
Guido van Rossum78892e42007-04-06 17:31:18 +0000290 buf = self.buftype("1234567890")
291 bytesIo = self.ioclass(buf)
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000292
293 bytesIo.read(5)
294 bytesIo.seek(0)
295 self.assertEquals(buf, bytesIo.read())
296
297 bytesIo.seek(3)
298 self.assertEquals(buf[3:], bytesIo.read())
299
300 def testTell(self):
Guido van Rossum78892e42007-04-06 17:31:18 +0000301 buf = self.buftype("1234567890")
302 bytesIo = self.ioclass(buf)
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000303
304 self.assertEquals(0, bytesIo.tell())
305 bytesIo.seek(5)
306 self.assertEquals(5, bytesIo.tell())
307 bytesIo.seek(10000)
308 self.assertEquals(10000, bytesIo.tell())
309
Guido van Rossuma9e20242007-03-08 00:43:48 +0000310
Guido van Rossum76c5d4d2007-04-06 19:10:29 +0000311class BytesIOTest(MemorySeekTestMixin, unittest.TestCase):
Guido van Rossum39478e82007-08-27 17:23:59 +0000312 @staticmethod
313 def buftype(s):
314 return s.encode("utf-8")
Guido van Rossum78892e42007-04-06 17:31:18 +0000315 ioclass = io.BytesIO
316 EOF = b""
317
318
Guido van Rossum76c5d4d2007-04-06 19:10:29 +0000319class StringIOTest(MemorySeekTestMixin, unittest.TestCase):
Guido van Rossum78892e42007-04-06 17:31:18 +0000320 buftype = str
321 ioclass = io.StringIO
322 EOF = ""
323
324
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000325class BufferedReaderTest(unittest.TestCase):
Guido van Rossuma9e20242007-03-08 00:43:48 +0000326
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000327 def testRead(self):
Guido van Rossum53807da2007-04-10 19:01:47 +0000328 rawio = MockRawIO((b"abc", b"d", b"efg"))
Guido van Rossum76c5d4d2007-04-06 19:10:29 +0000329 bufio = io.BufferedReader(rawio)
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000330
Guido van Rossum76c5d4d2007-04-06 19:10:29 +0000331 self.assertEquals(b"abcdef", bufio.read(6))
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000332
Guido van Rossum78892e42007-04-06 17:31:18 +0000333 def testBuffering(self):
334 data = b"abcdefghi"
335 dlen = len(data)
336
337 tests = [
338 [ 100, [ 3, 1, 4, 8 ], [ dlen, 0 ] ],
339 [ 100, [ 3, 3, 3], [ dlen ] ],
340 [ 4, [ 1, 2, 4, 2 ], [ 4, 4, 1 ] ],
341 ]
342
343 for bufsize, buf_read_sizes, raw_read_sizes in tests:
Guido van Rossum76c5d4d2007-04-06 19:10:29 +0000344 rawio = MockFileIO(data)
345 bufio = io.BufferedReader(rawio, buffer_size=bufsize)
Guido van Rossum78892e42007-04-06 17:31:18 +0000346 pos = 0
347 for nbytes in buf_read_sizes:
Guido van Rossum76c5d4d2007-04-06 19:10:29 +0000348 self.assertEquals(bufio.read(nbytes), data[pos:pos+nbytes])
Guido van Rossum78892e42007-04-06 17:31:18 +0000349 pos += nbytes
Guido van Rossum76c5d4d2007-04-06 19:10:29 +0000350 self.assertEquals(rawio.read_history, raw_read_sizes)
Guido van Rossum78892e42007-04-06 17:31:18 +0000351
Guido van Rossum01a27522007-03-07 01:00:12 +0000352 def testReadNonBlocking(self):
353 # Inject some None's in there to simulate EWOULDBLOCK
Guido van Rossum53807da2007-04-10 19:01:47 +0000354 rawio = MockRawIO((b"abc", b"d", None, b"efg", None, None))
Guido van Rossum76c5d4d2007-04-06 19:10:29 +0000355 bufio = io.BufferedReader(rawio)
Guido van Rossum01a27522007-03-07 01:00:12 +0000356
Guido van Rossum76c5d4d2007-04-06 19:10:29 +0000357 self.assertEquals(b"abcd", bufio.read(6))
358 self.assertEquals(b"e", bufio.read(1))
359 self.assertEquals(b"fg", bufio.read())
360 self.assert_(None is bufio.read())
361 self.assertEquals(b"", bufio.read())
Guido van Rossum01a27522007-03-07 01:00:12 +0000362
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000363 def testReadToEof(self):
Guido van Rossum53807da2007-04-10 19:01:47 +0000364 rawio = MockRawIO((b"abc", b"d", b"efg"))
Guido van Rossum76c5d4d2007-04-06 19:10:29 +0000365 bufio = io.BufferedReader(rawio)
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000366
Guido van Rossum76c5d4d2007-04-06 19:10:29 +0000367 self.assertEquals(b"abcdefg", bufio.read(9000))
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000368
369 def testReadNoArgs(self):
Guido van Rossum53807da2007-04-10 19:01:47 +0000370 rawio = MockRawIO((b"abc", b"d", b"efg"))
Guido van Rossum76c5d4d2007-04-06 19:10:29 +0000371 bufio = io.BufferedReader(rawio)
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000372
Guido van Rossum76c5d4d2007-04-06 19:10:29 +0000373 self.assertEquals(b"abcdefg", bufio.read())
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000374
375 def testFileno(self):
Guido van Rossum53807da2007-04-10 19:01:47 +0000376 rawio = MockRawIO((b"abc", b"d", b"efg"))
Guido van Rossum76c5d4d2007-04-06 19:10:29 +0000377 bufio = io.BufferedReader(rawio)
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000378
Guido van Rossum76c5d4d2007-04-06 19:10:29 +0000379 self.assertEquals(42, bufio.fileno())
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000380
381 def testFilenoNoFileno(self):
Guido van Rossum01a27522007-03-07 01:00:12 +0000382 # XXX will we always have fileno() function? If so, kill
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000383 # this test. Else, write it.
384 pass
385
Guido van Rossuma9e20242007-03-08 00:43:48 +0000386
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000387class BufferedWriterTest(unittest.TestCase):
Guido van Rossuma9e20242007-03-08 00:43:48 +0000388
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000389 def testWrite(self):
390 # Write to the buffered IO but don't overflow the buffer.
Guido van Rossum53807da2007-04-10 19:01:47 +0000391 writer = MockRawIO()
Guido van Rossum76c5d4d2007-04-06 19:10:29 +0000392 bufio = io.BufferedWriter(writer, 8)
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000393
Guido van Rossum76c5d4d2007-04-06 19:10:29 +0000394 bufio.write(b"abc")
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000395
Guido van Rossum76c5d4d2007-04-06 19:10:29 +0000396 self.assertFalse(writer._write_stack)
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000397
398 def testWriteOverflow(self):
Guido van Rossum53807da2007-04-10 19:01:47 +0000399 writer = MockRawIO()
Guido van Rossum76c5d4d2007-04-06 19:10:29 +0000400 bufio = io.BufferedWriter(writer, 8)
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000401
Guido van Rossum76c5d4d2007-04-06 19:10:29 +0000402 bufio.write(b"abc")
403 bufio.write(b"defghijkl")
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000404
Guido van Rossum76c5d4d2007-04-06 19:10:29 +0000405 self.assertEquals(b"abcdefghijkl", writer._write_stack[0])
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000406
Guido van Rossum01a27522007-03-07 01:00:12 +0000407 def testWriteNonBlocking(self):
408 raw = MockNonBlockWriterIO((9, 2, 22, -6, 10, 12, 12))
Guido van Rossum76c5d4d2007-04-06 19:10:29 +0000409 bufio = io.BufferedWriter(raw, 8, 16)
Guido van Rossum01a27522007-03-07 01:00:12 +0000410
Guido van Rossum76c5d4d2007-04-06 19:10:29 +0000411 bufio.write(b"asdf")
412 bufio.write(b"asdfa")
Guido van Rossum01a27522007-03-07 01:00:12 +0000413 self.assertEquals(b"asdfasdfa", raw._write_stack[0])
414
Guido van Rossum76c5d4d2007-04-06 19:10:29 +0000415 bufio.write(b"asdfasdfasdf")
Guido van Rossum01a27522007-03-07 01:00:12 +0000416 self.assertEquals(b"asdfasdfasdf", raw._write_stack[1])
Guido van Rossum76c5d4d2007-04-06 19:10:29 +0000417 bufio.write(b"asdfasdfasdf")
Guido van Rossum01a27522007-03-07 01:00:12 +0000418 self.assertEquals(b"dfasdfasdf", raw._write_stack[2])
419 self.assertEquals(b"asdfasdfasdf", raw._write_stack[3])
420
Guido van Rossum76c5d4d2007-04-06 19:10:29 +0000421 bufio.write(b"asdfasdfasdf")
Guido van Rossum01a27522007-03-07 01:00:12 +0000422
Guido van Rossum78892e42007-04-06 17:31:18 +0000423 # XXX I don't like this test. It relies too heavily on how the
424 # algorithm actually works, which we might change. Refactor
425 # later.
Guido van Rossum01a27522007-03-07 01:00:12 +0000426
427 def testFileno(self):
Guido van Rossum53807da2007-04-10 19:01:47 +0000428 rawio = MockRawIO((b"abc", b"d", b"efg"))
Guido van Rossum76c5d4d2007-04-06 19:10:29 +0000429 bufio = io.BufferedWriter(rawio)
Guido van Rossum01a27522007-03-07 01:00:12 +0000430
Guido van Rossum76c5d4d2007-04-06 19:10:29 +0000431 self.assertEquals(42, bufio.fileno())
Guido van Rossum01a27522007-03-07 01:00:12 +0000432
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000433 def testFlush(self):
Guido van Rossum53807da2007-04-10 19:01:47 +0000434 writer = MockRawIO()
Guido van Rossum76c5d4d2007-04-06 19:10:29 +0000435 bufio = io.BufferedWriter(writer, 8)
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000436
Guido van Rossum76c5d4d2007-04-06 19:10:29 +0000437 bufio.write(b"abc")
438 bufio.flush()
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000439
Guido van Rossum76c5d4d2007-04-06 19:10:29 +0000440 self.assertEquals(b"abc", writer._write_stack[0])
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000441
Guido van Rossuma9e20242007-03-08 00:43:48 +0000442
Guido van Rossum01a27522007-03-07 01:00:12 +0000443class BufferedRWPairTest(unittest.TestCase):
Guido van Rossuma9e20242007-03-08 00:43:48 +0000444
Guido van Rossum01a27522007-03-07 01:00:12 +0000445 def testRWPair(self):
Guido van Rossum53807da2007-04-10 19:01:47 +0000446 r = MockRawIO(())
447 w = MockRawIO()
Guido van Rossum01a27522007-03-07 01:00:12 +0000448 pair = io.BufferedRWPair(r, w)
449
450 # XXX need implementation
451
Guido van Rossuma9e20242007-03-08 00:43:48 +0000452
Guido van Rossum78892e42007-04-06 17:31:18 +0000453class BufferedRandomTest(unittest.TestCase):
Guido van Rossuma9e20242007-03-08 00:43:48 +0000454
Guido van Rossum01a27522007-03-07 01:00:12 +0000455 def testReadAndWrite(self):
Guido van Rossum53807da2007-04-10 19:01:47 +0000456 raw = MockRawIO((b"asdf", b"ghjk"))
Guido van Rossum01a27522007-03-07 01:00:12 +0000457 rw = io.BufferedRandom(raw, 8, 12)
458
459 self.assertEqual(b"as", rw.read(2))
460 rw.write(b"ddd")
461 rw.write(b"eee")
Guido van Rossum76c5d4d2007-04-06 19:10:29 +0000462 self.assertFalse(raw._write_stack) # Buffer writes
Guido van Rossum01a27522007-03-07 01:00:12 +0000463 self.assertEqual(b"ghjk", rw.read()) # This read forces write flush
Guido van Rossum76c5d4d2007-04-06 19:10:29 +0000464 self.assertEquals(b"dddeee", raw._write_stack[0])
Guido van Rossum01a27522007-03-07 01:00:12 +0000465
466 def testSeekAndTell(self):
467 raw = io.BytesIO(b"asdfghjkl")
468 rw = io.BufferedRandom(raw)
469
470 self.assertEquals(b"as", rw.read(2))
471 self.assertEquals(2, rw.tell())
472 rw.seek(0, 0)
473 self.assertEquals(b"asdf", rw.read(4))
474
475 rw.write(b"asdf")
476 rw.seek(0, 0)
477 self.assertEquals(b"asdfasdfl", rw.read())
478 self.assertEquals(9, rw.tell())
479 rw.seek(-4, 2)
480 self.assertEquals(5, rw.tell())
481 rw.seek(2, 1)
482 self.assertEquals(7, rw.tell())
483 self.assertEquals(b"fl", rw.read(11))
484
Guido van Rossum78892e42007-04-06 17:31:18 +0000485
486class TextIOWrapperTest(unittest.TestCase):
Guido van Rossum9b76da62007-04-11 01:09:03 +0000487
Guido van Rossumd0712812007-04-11 16:32:43 +0000488 def tearDown(self):
489 test_support.unlink(test_support.TESTFN)
Guido van Rossum9b76da62007-04-11 01:09:03 +0000490
Guido van Rossum8358db22007-08-18 21:39:55 +0000491 def testNewlinesInput(self):
492 testdata = b"AAA\nBBB\nCCC\rDDD\rEEE\r\nFFF\r\nGGG"
493 normalized = testdata.replace(b"\r\n", b"\n").replace(b"\r", b"\n")
494 for newline, expected in [
495 (None, normalized.decode("ASCII").splitlines(True)),
496 ("", testdata.decode("ASCII").splitlines(True)),
497 ("\n", ["AAA\n", "BBB\n", "CCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]),
498 ("\r\n", ["AAA\nBBB\nCCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]),
499 ("\r", ["AAA\nBBB\nCCC\r", "DDD\r", "EEE\r", "\nFFF\r", "\nGGG"]),
500 ]:
501 buf = io.BytesIO(testdata)
502 txt = io.TextIOWrapper(buf, encoding="ASCII", newline=newline)
503 self.assertEquals(txt.readlines(), expected)
504 txt.seek(0)
505 self.assertEquals(txt.read(), "".join(expected))
506
507 def testNewlinesOutput(self):
508 testdict = {
509 "": b"AAA\nBBB\nCCC\nX\rY\r\nZ",
510 "\n": b"AAA\nBBB\nCCC\nX\rY\r\nZ",
511 "\r": b"AAA\rBBB\rCCC\rX\rY\r\rZ",
512 "\r\n": b"AAA\r\nBBB\r\nCCC\r\nX\rY\r\r\nZ",
513 }
514 tests = [(None, testdict[os.linesep])] + sorted(testdict.items())
515 for newline, expected in tests:
516 buf = io.BytesIO()
517 txt = io.TextIOWrapper(buf, encoding="ASCII", newline=newline)
518 txt.write("AAA\nB")
519 txt.write("BB\nCCC\n")
520 txt.write("X\rY\r\nZ")
521 txt.flush()
522 self.assertEquals(buf.getvalue(), expected)
523
Guido van Rossum78892e42007-04-06 17:31:18 +0000524 def testNewlines(self):
525 input_lines = [ "unix\n", "windows\r\n", "os9\r", "last\n", "nonl" ]
526
527 tests = [
528 [ None, [ 'unix\n', 'windows\n', 'os9\n', 'last\n', 'nonl' ] ],
Guido van Rossum8358db22007-08-18 21:39:55 +0000529 [ '', input_lines ],
530 [ '\n', [ "unix\n", "windows\r\n", "os9\rlast\n", "nonl" ] ],
531 [ '\r\n', [ "unix\nwindows\r\n", "os9\rlast\nnonl" ] ],
532 [ '\r', [ "unix\nwindows\r", "\nos9\r", "last\nnonl" ] ],
Guido van Rossum78892e42007-04-06 17:31:18 +0000533 ]
534
Guido van Rossumc2f93dc2007-05-24 00:50:02 +0000535 encodings = ('utf-8', 'latin-1')
Guido van Rossum78892e42007-04-06 17:31:18 +0000536
Guido van Rossum8358db22007-08-18 21:39:55 +0000537 # Try a range of buffer sizes to test the case where \r is the last
Guido van Rossum78892e42007-04-06 17:31:18 +0000538 # character in TextIOWrapper._pending_line.
539 for encoding in encodings:
Guido van Rossum8358db22007-08-18 21:39:55 +0000540 # XXX: str.encode() should return bytes
541 data = bytes(''.join(input_lines).encode(encoding))
Guido van Rossum78892e42007-04-06 17:31:18 +0000542 for do_reads in (False, True):
Guido van Rossum8358db22007-08-18 21:39:55 +0000543 for bufsize in range(1, 10):
544 for newline, exp_lines in tests:
545 bufio = io.BufferedReader(io.BytesIO(data), bufsize)
Guido van Rossum76c5d4d2007-04-06 19:10:29 +0000546 textio = io.TextIOWrapper(bufio, newline=newline,
Guido van Rossum78892e42007-04-06 17:31:18 +0000547 encoding=encoding)
548 if do_reads:
549 got_lines = []
550 while True:
Guido van Rossum76c5d4d2007-04-06 19:10:29 +0000551 c2 = textio.read(2)
Guido van Rossum78892e42007-04-06 17:31:18 +0000552 if c2 == '':
553 break
554 self.assertEquals(len(c2), 2)
Guido van Rossum76c5d4d2007-04-06 19:10:29 +0000555 got_lines.append(c2 + textio.readline())
Guido van Rossum78892e42007-04-06 17:31:18 +0000556 else:
Guido van Rossum76c5d4d2007-04-06 19:10:29 +0000557 got_lines = list(textio)
Guido van Rossum78892e42007-04-06 17:31:18 +0000558
559 for got_line, exp_line in zip(got_lines, exp_lines):
560 self.assertEquals(got_line, exp_line)
561 self.assertEquals(len(got_lines), len(exp_lines))
562
Guido van Rossum8358db22007-08-18 21:39:55 +0000563 def testNewlinesInput(self):
564 testdata = b"AAA\nBBB\nCCC\rDDD\rEEE\r\nFFF\r\nGGG"
565 normalized = testdata.replace(b"\r\n", b"\n").replace(b"\r", b"\n")
566 for newline, expected in [
567 (None, normalized.decode("ASCII").splitlines(True)),
568 ("", testdata.decode("ASCII").splitlines(True)),
569 ("\n", ["AAA\n", "BBB\n", "CCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]),
570 ("\r\n", ["AAA\nBBB\nCCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]),
571 ("\r", ["AAA\nBBB\nCCC\r", "DDD\r", "EEE\r", "\nFFF\r", "\nGGG"]),
572 ]:
573 buf = io.BytesIO(testdata)
574 txt = io.TextIOWrapper(buf, encoding="ASCII", newline=newline)
575 self.assertEquals(txt.readlines(), expected)
576 txt.seek(0)
577 self.assertEquals(txt.read(), "".join(expected))
578
579 def testNewlinesOutput(self):
Guido van Rossum8358db22007-08-18 21:39:55 +0000580 data = "AAA\nBBB\rCCC\n"
581 data_lf = b"AAA\nBBB\rCCC\n"
582 data_cr = b"AAA\rBBB\rCCC\r"
583 data_crlf = b"AAA\r\nBBB\rCCC\r\n"
Guido van Rossum18363582007-08-18 22:05:58 +0000584 save_linesep = os.linesep
585 try:
586 for os.linesep, newline, expected in [
587 ("\n", None, data_lf),
588 ("\r\n", None, data_crlf),
589 ("\n", "", data_lf),
590 ("\r\n", "", data_lf),
591 ("\n", "\n", data_lf),
592 ("\r\n", "\n", data_lf),
593 ("\n", "\r", data_cr),
594 ("\r\n", "\r", data_cr),
595 ("\n", "\r\n", data_crlf),
596 ("\r\n", "\r\n", data_crlf),
597 ]:
598 buf = io.BytesIO()
599 txt = io.TextIOWrapper(buf, encoding="ASCII", newline=newline)
600 txt.write(data)
601 txt.close()
602 self.assertEquals(buf.getvalue(), expected)
603 finally:
604 os.linesep = save_linesep
Guido van Rossum8358db22007-08-18 21:39:55 +0000605
Guido van Rossum9b76da62007-04-11 01:09:03 +0000606 # Systematic tests of the text I/O API
607
608 def testBasicIO(self):
609 for chunksize in (1, 2, 3, 4, 5, 15, 16, 17, 31, 32, 33, 63, 64, 65):
610 for enc in "ascii", "latin1", "utf8" :# , "utf-16-be", "utf-16-le":
611 f = io.open(test_support.TESTFN, "w+", encoding=enc)
612 f._CHUNK_SIZE = chunksize
613 self.assertEquals(f.write("abc"), 3)
614 f.close()
615 f = io.open(test_support.TESTFN, "r+", encoding=enc)
616 f._CHUNK_SIZE = chunksize
617 self.assertEquals(f.tell(), 0)
618 self.assertEquals(f.read(), "abc")
619 cookie = f.tell()
620 self.assertEquals(f.seek(0), 0)
621 self.assertEquals(f.read(2), "ab")
622 self.assertEquals(f.read(1), "c")
623 self.assertEquals(f.read(1), "")
624 self.assertEquals(f.read(), "")
625 self.assertEquals(f.tell(), cookie)
626 self.assertEquals(f.seek(0), 0)
627 self.assertEquals(f.seek(0, 2), cookie)
628 self.assertEquals(f.write("def"), 3)
629 self.assertEquals(f.seek(cookie), cookie)
630 self.assertEquals(f.read(), "def")
631 if enc.startswith("utf"):
632 self.multi_line_test(f, enc)
633 f.close()
634
635 def multi_line_test(self, f, enc):
636 f.seek(0)
637 f.truncate()
Guido van Rossumef87d6e2007-05-02 19:09:54 +0000638 sample = "s\xff\u0fff\uffff"
Guido van Rossum9b76da62007-04-11 01:09:03 +0000639 wlines = []
Guido van Rossumcba608c2007-04-11 14:19:59 +0000640 for size in (0, 1, 2, 3, 4, 5, 30, 31, 32, 33, 62, 63, 64, 65, 1000):
Guido van Rossum9b76da62007-04-11 01:09:03 +0000641 chars = []
Guido van Rossum805365e2007-05-07 22:24:25 +0000642 for i in range(size):
Guido van Rossum9b76da62007-04-11 01:09:03 +0000643 chars.append(sample[i % len(sample)])
Guido van Rossumef87d6e2007-05-02 19:09:54 +0000644 line = "".join(chars) + "\n"
Guido van Rossum9b76da62007-04-11 01:09:03 +0000645 wlines.append((f.tell(), line))
646 f.write(line)
Guido van Rossum9b76da62007-04-11 01:09:03 +0000647 f.seek(0)
648 rlines = []
649 while True:
650 pos = f.tell()
651 line = f.readline()
652 if not line:
Guido van Rossum9b76da62007-04-11 01:09:03 +0000653 break
654 rlines.append((pos, line))
Guido van Rossum9b76da62007-04-11 01:09:03 +0000655 self.assertEquals(rlines, wlines)
656
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +0000657 def testTelling(self):
658 f = io.open(test_support.TESTFN, "w+", encoding="utf8")
659 p0 = f.tell()
Guido van Rossumef87d6e2007-05-02 19:09:54 +0000660 f.write("\xff\n")
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +0000661 p1 = f.tell()
Guido van Rossumef87d6e2007-05-02 19:09:54 +0000662 f.write("\xff\n")
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +0000663 p2 = f.tell()
664 f.seek(0)
665 self.assertEquals(f.tell(), p0)
Guido van Rossumef87d6e2007-05-02 19:09:54 +0000666 self.assertEquals(f.readline(), "\xff\n")
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +0000667 self.assertEquals(f.tell(), p1)
Guido van Rossumef87d6e2007-05-02 19:09:54 +0000668 self.assertEquals(f.readline(), "\xff\n")
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +0000669 self.assertEquals(f.tell(), p2)
670 f.seek(0)
671 for line in f:
Guido van Rossumef87d6e2007-05-02 19:09:54 +0000672 self.assertEquals(line, "\xff\n")
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +0000673 self.assertRaises(IOError, f.tell)
674 self.assertEquals(f.tell(), p2)
675 f.close()
676
Guido van Rossumd76e7792007-04-17 02:38:04 +0000677 def testSeeking(self):
678 chunk_size = io.TextIOWrapper._CHUNK_SIZE
679 prefix_size = chunk_size - 2
Guido van Rossumef87d6e2007-05-02 19:09:54 +0000680 u_prefix = "a" * prefix_size
Guido van Rossumd76e7792007-04-17 02:38:04 +0000681 prefix = bytes(u_prefix.encode("utf-8"))
682 self.assertEquals(len(u_prefix), len(prefix))
Guido van Rossumef87d6e2007-05-02 19:09:54 +0000683 u_suffix = "\u8888\n"
Guido van Rossumd76e7792007-04-17 02:38:04 +0000684 suffix = bytes(u_suffix.encode("utf-8"))
685 line = prefix + suffix
686 f = io.open(test_support.TESTFN, "wb")
687 f.write(line*2)
688 f.close()
689 f = io.open(test_support.TESTFN, "r", encoding="utf-8")
690 s = f.read(prefix_size)
Guido van Rossum98297ee2007-11-06 21:34:58 +0000691 self.assertEquals(s, str(prefix, "ascii"))
Guido van Rossumd76e7792007-04-17 02:38:04 +0000692 self.assertEquals(f.tell(), prefix_size)
693 self.assertEquals(f.readline(), u_suffix)
694
695 def testSeekingToo(self):
696 # Regression test for a specific bug
697 data = b'\xe0\xbf\xbf\n'
698 f = io.open(test_support.TESTFN, "wb")
699 f.write(data)
700 f.close()
701 f = io.open(test_support.TESTFN, "r", encoding="utf-8")
702 f._CHUNK_SIZE # Just test that it exists
703 f._CHUNK_SIZE = 2
704 f.readline()
705 f.tell()
706
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +0000707 def timingTest(self):
708 timer = time.time
709 enc = "utf8"
Guido van Rossumef87d6e2007-05-02 19:09:54 +0000710 line = "\0\x0f\xff\u0fff\uffff\U000fffff\U0010ffff"*3 + "\n"
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +0000711 nlines = 10000
712 nchars = len(line)
713 nbytes = len(line.encode(enc))
714 for chunk_size in (32, 64, 128, 256):
715 f = io.open(test_support.TESTFN, "w+", encoding=enc)
716 f._CHUNK_SIZE = chunk_size
717 t0 = timer()
718 for i in range(nlines):
719 f.write(line)
720 f.flush()
721 t1 = timer()
722 f.seek(0)
723 for line in f:
724 pass
725 t2 = timer()
726 f.seek(0)
727 while f.readline():
728 pass
729 t3 = timer()
730 f.seek(0)
731 while f.readline():
732 f.tell()
733 t4 = timer()
734 f.close()
735 if test_support.verbose:
736 print("\nTiming test: %d lines of %d characters (%d bytes)" %
737 (nlines, nchars, nbytes))
738 print("File chunk size: %6s" % f._CHUNK_SIZE)
739 print("Writing: %6.3f seconds" % (t1-t0))
740 print("Reading using iteration: %6.3f seconds" % (t2-t1))
741 print("Reading using readline(): %6.3f seconds" % (t3-t2))
742 print("Using readline()+tell(): %6.3f seconds" % (t4-t3))
743
Guido van Rossum9b76da62007-04-11 01:09:03 +0000744
Guido van Rossum01a27522007-03-07 01:00:12 +0000745# XXX Tests for open()
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000746
Guido van Rossum5abbf752007-08-27 17:39:33 +0000747class MiscIOTest(unittest.TestCase):
748
749 def testImport__all__(self):
750 for name in io.__all__:
751 obj = getattr(io, name, None)
752 self.assert_(obj is not None, name)
753 if name == "open":
754 continue
755 elif "error" in name.lower():
756 self.assert_(issubclass(obj, Exception), name)
757 else:
758 self.assert_(issubclass(obj, io.IOBase))
759
760
Guido van Rossum28524c72007-02-27 05:47:44 +0000761def test_main():
Guido van Rossum78892e42007-04-06 17:31:18 +0000762 test_support.run_unittest(IOTest, BytesIOTest, StringIOTest,
763 BufferedReaderTest,
Guido van Rossum01a27522007-03-07 01:00:12 +0000764 BufferedWriterTest, BufferedRWPairTest,
Guido van Rossum5abbf752007-08-27 17:39:33 +0000765 BufferedRandomTest, TextIOWrapperTest,
766 MiscIOTest)
Guido van Rossum28524c72007-02-27 05:47:44 +0000767
768if __name__ == "__main__":
Guido van Rossum76c5d4d2007-04-06 19:10:29 +0000769 unittest.main()