blob: 92e1567cffbaedf8400637a3527312c894a8eb22 [file] [log] [blame]
Guido van Rossum68bbcd22007-02-27 17:19:33 +00001"""Unit tests for io.py."""
2
Guido van Rossum8358db22007-08-18 21:39:55 +00003import os
Guido van Rossum34d69e52007-04-10 20:08:41 +00004import sys
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +00005import time
Guido van Rossumd4103952007-04-12 05:44:49 +00006import array
Guido van Rossum28524c72007-02-27 05:47:44 +00007import unittest
Guido van Rossum78892e42007-04-06 17:31:18 +00008from itertools import chain
Guido van Rossum76c5d4d2007-04-06 19:10:29 +00009from test import test_support
10
Guido van Rossum53807da2007-04-10 19:01:47 +000011import io # The module under test
Guido van Rossum28524c72007-02-27 05:47:44 +000012
Guido van Rossuma9e20242007-03-08 00:43:48 +000013
class MockRawIO(io.RawIOBase):
    """Scripted raw stream used to drive the buffered-I/O tests.

    Reads are served from a queue of canned results handed to the
    constructor; every write is recorded in ``_write_stack`` so a test can
    inspect exactly what reached the raw layer.
    """

    def __init__(self, read_stack=()):
        self._read_stack = list(read_stack)
        self._write_stack = []

    def read(self, n=None):
        """Return the next queued item, or b"" once the queue is empty.

        The requested size *n* is ignored; queued items (including any
        ``None`` placed in the script) are returned unchanged.
        """
        try:
            return self._read_stack.pop(0)
        except IndexError:
            # Only an exhausted script means EOF; anything else is a real
            # error and must not be swallowed.
            return b""

    def write(self, b):
        """Record a private copy of *b* and report all of it as written."""
        # bytes(b) always copies; slicing a memoryview would only create
        # another view onto the caller's (reusable) buffer.
        self._write_stack.append(bytes(b))
        return len(b)

    def writable(self):
        return True

    def fileno(self):
        return 42

    def readable(self):
        return True

    def seekable(self):
        return True

    def seek(self, pos, whence=0):
        """Accept and ignore any seek request (returns None)."""
        pass

    def tell(self):
        return 42
47
Guido van Rossuma9e20242007-03-08 00:43:48 +000048
class MockFileIO(io.BytesIO):
    """BytesIO that logs the size of each result returned by read().

    ``read_history`` collects one entry per read() call: the length of the
    returned data, or None when the underlying read returned None.
    """

    def __init__(self, data):
        self.read_history = []
        io.BytesIO.__init__(self, data)

    def read(self, n=None):
        chunk = io.BytesIO.read(self, n)
        if chunk is None:
            size = None
        else:
            size = len(chunk)
        self.read_history.append(size)
        return chunk
59
60
class MockNonBlockWriterIO(io.RawIOBase):
    """Write-only raw stream whose write() results follow a script.

    *blocking_script* is a sequence of integers consumed one per write()
    call: a non-negative entry is returned as the number of bytes written,
    a negative entry ``-k`` raises BlockingIOError reporting ``k`` bytes
    written.
    """

    def __init__(self, blocking_script):
        self._blocking_script = list(blocking_script)
        self._write_stack = []

    def write(self, b):
        """Record a copy of *b*, then act out the next script entry.

        Raises io.BlockingIOError for negative script entries, and
        IndexError if the script has run out.
        """
        # bytes(b) always copies; slicing a memoryview would only create
        # another view onto the caller's buffer.
        self._write_stack.append(bytes(b))
        n = self._blocking_script.pop(0)
        if n < 0:
            raise io.BlockingIOError(0, "test blocking", -n)
        return n

    def writable(self):
        return True
Guido van Rossum68bbcd22007-02-27 17:19:33 +000077
Guido van Rossuma9e20242007-03-08 00:43:48 +000078
class IOTest(unittest.TestCase):
    """Checks of io.open(), io.FileIO and io.BytesIO against real data.

    Tests that touch the filesystem use test_support.TESTFN, which is
    removed again in tearDown().  Files are opened through ``with`` so a
    failing assertion cannot leave a handle open behind tearDown().
    """

    def tearDown(self):
        test_support.unlink(test_support.TESTFN)

    def write_ops(self, f):
        """Run a fixed write/seek/truncate script against *f*.

        On success the stream holds b"hello world\\n".
        """
        self.assertEqual(f.write(b"blah."), 5)
        self.assertEqual(f.seek(0), 0)
        self.assertEqual(f.write(b"Hello."), 6)
        self.assertEqual(f.tell(), 6)
        self.assertEqual(f.seek(-1, 1), 5)
        self.assertEqual(f.tell(), 5)
        # NOTE(review): str8 is not defined in this file; presumably a
        # builtin of the interpreter this test targets -- confirm.
        self.assertEqual(f.write(str8(" world\n\n\n")), 9)
        self.assertEqual(f.seek(0), 0)
        self.assertEqual(f.write(b"h"), 1)
        self.assertEqual(f.seek(-1, 2), 13)
        self.assertEqual(f.tell(), 13)
        self.assertEqual(f.truncate(12), 12)
        # truncate() is expected to leave the position where it was.
        self.assertEqual(f.tell(), 13)

    def read_ops(self, f, buffered=False):
        """Run a fixed read/readinto/seek script against *f*.

        *f* must contain b"hello world\\n".  With *buffered* true the
        argument-less read() calls are checked as well.
        """
        data = f.read(5)
        self.assertEqual(data, b"hello")
        # readinto() fills the object returned by read(); this relies on
        # that object being writable in the targeted interpreter.
        self.assertEqual(f.readinto(data), 5)
        self.assertEqual(data, b" worl")
        self.assertEqual(f.readinto(data), 2)
        self.assertEqual(len(data), 5)
        self.assertEqual(data[:2], b"d\n")
        self.assertEqual(f.seek(0), 0)
        self.assertEqual(f.read(20), b"hello world\n")
        self.assertEqual(f.read(1), b"")
        self.assertEqual(f.readinto(b"x"), 0)
        self.assertEqual(f.seek(-6, 2), 6)
        self.assertEqual(f.read(5), b"world")
        self.assertEqual(f.read(0), b"")
        self.assertEqual(f.readinto(b""), 0)
        self.assertEqual(f.seek(-6, 1), 5)
        self.assertEqual(f.read(5), b" worl")
        self.assertEqual(f.tell(), 10)
        if buffered:
            f.seek(0)
            self.assertEqual(f.read(), b"hello world\n")
            f.seek(6)
            self.assertEqual(f.read(), b"world\n")
            self.assertEqual(f.read(), b"")

    # Offset used by the large-file tests: just past the signed 32-bit range.
    LARGE = 2**31

    def large_file_ops(self, f):
        """Seek, write, truncate and read around offset LARGE in *f*."""
        # Real assertions rather than bare ``assert`` so the checks are
        # not stripped when running under -O.
        self.assertTrue(f.readable())
        self.assertTrue(f.writable())
        self.assertEqual(f.seek(self.LARGE), self.LARGE)
        self.assertEqual(f.tell(), self.LARGE)
        self.assertEqual(f.write(b"xxx"), 3)
        self.assertEqual(f.tell(), self.LARGE + 3)
        self.assertEqual(f.seek(-1, 1), self.LARGE + 2)
        self.assertEqual(f.truncate(), self.LARGE + 2)
        self.assertEqual(f.tell(), self.LARGE + 2)
        self.assertEqual(f.seek(0, 2), self.LARGE + 2)
        self.assertEqual(f.truncate(self.LARGE + 1), self.LARGE + 1)
        self.assertEqual(f.tell(), self.LARGE + 2)
        self.assertEqual(f.seek(0, 2), self.LARGE + 1)
        self.assertEqual(f.seek(-1, 2), self.LARGE)
        self.assertEqual(f.read(2), b"x")

    def test_raw_file_io(self):
        with io.open(test_support.TESTFN, "wb", buffering=0) as f:
            self.assertEqual(f.readable(), False)
            self.assertEqual(f.writable(), True)
            self.assertEqual(f.seekable(), True)
            self.write_ops(f)
        with io.open(test_support.TESTFN, "rb", buffering=0) as f:
            self.assertEqual(f.readable(), True)
            self.assertEqual(f.writable(), False)
            self.assertEqual(f.seekable(), True)
            self.read_ops(f)

    def test_buffered_file_io(self):
        with io.open(test_support.TESTFN, "wb") as f:
            self.assertEqual(f.readable(), False)
            self.assertEqual(f.writable(), True)
            self.assertEqual(f.seekable(), True)
            self.write_ops(f)
        with io.open(test_support.TESTFN, "rb") as f:
            self.assertEqual(f.readable(), True)
            self.assertEqual(f.writable(), False)
            self.assertEqual(f.seekable(), True)
            self.read_ops(f, True)

    def test_readline(self):
        with io.open(test_support.TESTFN, "wb") as f:
            f.write(b"abc\ndef\nxyzzy\nfoo")
        with io.open(test_support.TESTFN, "rb") as f:
            self.assertEqual(f.readline(), b"abc\n")
            self.assertEqual(f.readline(10), b"def\n")
            self.assertEqual(f.readline(2), b"xy")
            self.assertEqual(f.readline(4), b"zzy\n")
            self.assertEqual(f.readline(), b"foo")

    def test_raw_bytes_io(self):
        f = io.BytesIO()
        self.write_ops(f)
        data = f.getvalue()
        self.assertEqual(data, b"hello world\n")
        f = io.BytesIO(data)
        self.read_ops(f, True)

    def test_large_file_ops(self):
        # On Windows and Mac OSX this test consumes large resources; it
        # takes a long time to build the >2GB file and takes >2GB of disk
        # space, therefore the resource must be enabled to run this test.
        if sys.platform[:3] == 'win' or sys.platform == 'darwin':
            if not test_support.is_resource_enabled("largefile"):
                print("\nTesting large file ops skipped on %s." % sys.platform,
                      file=sys.stderr)
                print("It requires %d bytes and a long time." % self.LARGE,
                      file=sys.stderr)
                print("Use 'regrtest.py -u largefile test_io' to run it.",
                      file=sys.stderr)
                return
        # Once unbuffered, once with default buffering.
        with io.open(test_support.TESTFN, "w+b", 0) as f:
            self.large_file_ops(f)
        with io.open(test_support.TESTFN, "w+b") as f:
            self.large_file_ops(f)

    def test_with_open(self):
        """Files opened by io.open() are closed on leaving a with block."""
        for bufsize in (0, 1, 100):
            f = None
            with io.open(test_support.TESTFN, "wb", bufsize) as f:
                f.write(b"xxx")
            self.assertEqual(f.closed, True)
            f = None
            try:
                with io.open(test_support.TESTFN, "wb", bufsize) as f:
                    1/0
            except ZeroDivisionError:
                self.assertEqual(f.closed, True)
            else:
                self.fail("1/0 didn't raise an exception")

    def test_destructor(self):
        """Deleting a FileIO runs __del__, then close(), then flush()."""
        record = []

        class MyFileIO(io.FileIO):
            def __del__(self):
                record.append(1)
                io.FileIO.__del__(self)

            def close(self):
                record.append(2)
                io.FileIO.close(self)

            def flush(self):
                record.append(3)
                io.FileIO.flush(self)

        f = MyFileIO(test_support.TESTFN, "w")
        f.write(b"xxx")
        del f
        self.assertEqual(record, [1, 2, 3])

    def test_close_flushes(self):
        with io.open(test_support.TESTFN, "wb") as f:
            f.write(b"xxx")
        with io.open(test_support.TESTFN, "rb") as f:
            self.assertEqual(f.read(), b"xxx")

    def test_array_writes(self):
        """write() accepts an array and reports its size in bytes."""
        a = array.array('i', range(10))
        # NOTE(review): buffer is not defined in this file; presumably a
        # builtin of the interpreter this test targets -- confirm.
        n = len(buffer(a))
        with io.open(test_support.TESTFN, "wb", 0) as f:
            self.assertEqual(f.write(a), n)
        with io.open(test_support.TESTFN, "wb") as f:
            self.assertEqual(f.write(a), n)
261
Guido van Rossuma9e20242007-03-08 00:43:48 +0000262
class MemorySeekTestMixin:
    """Read/seek/tell checks shared by the in-memory stream test cases.

    A concrete test case mixes this in and supplies three class attributes:
    ``buftype`` (callable turning the sample text into the stream's data
    type), ``ioclass`` (the stream class under test) and ``EOF`` (the value
    read() returns at end of data).
    """

    def testInit(self):
        # Construction alone must succeed.
        buf = self.buftype("1234567890")
        bytesIo = self.ioclass(buf)

    def testRead(self):
        buf = self.buftype("1234567890")
        bytesIo = self.ioclass(buf)

        self.assertEqual(buf[:1], bytesIo.read(1))
        self.assertEqual(buf[1:5], bytesIo.read(4))
        # Asking for more than remains returns just the remainder.
        self.assertEqual(buf[5:], bytesIo.read(900))
        self.assertEqual(self.EOF, bytesIo.read())

    def testReadNoArgs(self):
        buf = self.buftype("1234567890")
        bytesIo = self.ioclass(buf)

        self.assertEqual(buf, bytesIo.read())
        self.assertEqual(self.EOF, bytesIo.read())

    def testSeek(self):
        buf = self.buftype("1234567890")
        bytesIo = self.ioclass(buf)

        bytesIo.read(5)
        bytesIo.seek(0)
        self.assertEqual(buf, bytesIo.read())

        bytesIo.seek(3)
        self.assertEqual(buf[3:], bytesIo.read())

    def testTell(self):
        buf = self.buftype("1234567890")
        bytesIo = self.ioclass(buf)

        self.assertEqual(0, bytesIo.tell())
        bytesIo.seek(5)
        self.assertEqual(5, bytesIo.tell())
        # Seeking beyond the end of the data is allowed and is reported
        # back by tell().
        bytesIo.seek(10000)
        self.assertEqual(10000, bytesIo.tell())
305
Guido van Rossuma9e20242007-03-08 00:43:48 +0000306
class BytesIOTest(MemorySeekTestMixin, unittest.TestCase):
    """Run the shared in-memory stream checks against io.BytesIO."""

    ioclass = io.BytesIO
    buftype = bytes
    EOF = b""
311
312
class StringIOTest(MemorySeekTestMixin, unittest.TestCase):
    """Run the shared in-memory stream checks against io.StringIO."""

    ioclass = io.StringIO
    buftype = str
    EOF = ""
317
318
class BufferedReaderTest(unittest.TestCase):
    """Tests for io.BufferedReader on top of scripted raw streams."""

    def testRead(self):
        rawio = MockRawIO((b"abc", b"d", b"efg"))
        bufio = io.BufferedReader(rawio)

        self.assertEqual(b"abcdef", bufio.read(6))

    def testBuffering(self):
        """Check which raw read sizes result from given buffered reads."""
        data = b"abcdefghi"
        dlen = len(data)

        # Each entry: [buffer size, sizes passed to bufio.read(),
        #              expected sizes recorded by the raw stream].
        tests = [
            [ 100, [ 3, 1, 4, 8 ], [ dlen, 0 ] ],
            [ 100, [ 3, 3, 3],     [ dlen ]    ],
            [   4, [ 1, 2, 4, 2 ], [ 4, 4, 1 ] ],
        ]

        for bufsize, buf_read_sizes, raw_read_sizes in tests:
            rawio = MockFileIO(data)
            bufio = io.BufferedReader(rawio, buffer_size=bufsize)
            pos = 0
            for nbytes in buf_read_sizes:
                self.assertEqual(bufio.read(nbytes), data[pos:pos+nbytes])
                pos += nbytes
            self.assertEqual(rawio.read_history, raw_read_sizes)

    def testReadNonBlocking(self):
        # Inject some None's in there to simulate EWOULDBLOCK
        rawio = MockRawIO((b"abc", b"d", None, b"efg", None, None))
        bufio = io.BufferedReader(rawio)

        self.assertEqual(b"abcd", bufio.read(6))
        self.assertEqual(b"e", bufio.read(1))
        self.assertEqual(b"fg", bufio.read())
        self.assertTrue(bufio.read() is None)
        self.assertEqual(b"", bufio.read())

    def testReadToEof(self):
        rawio = MockRawIO((b"abc", b"d", b"efg"))
        bufio = io.BufferedReader(rawio)

        self.assertEqual(b"abcdefg", bufio.read(9000))

    def testReadNoArgs(self):
        rawio = MockRawIO((b"abc", b"d", b"efg"))
        bufio = io.BufferedReader(rawio)

        self.assertEqual(b"abcdefg", bufio.read())

    def testFileno(self):
        # The buffered layer reports the raw stream's descriptor.
        rawio = MockRawIO((b"abc", b"d", b"efg"))
        bufio = io.BufferedReader(rawio)

        self.assertEqual(42, bufio.fileno())

    def testFilenoNoFileno(self):
        # XXX will we always have fileno() function? If so, kill
        # this test. Else, write it.
        pass
379
Guido van Rossuma9e20242007-03-08 00:43:48 +0000380
class BufferedWriterTest(unittest.TestCase):
    """Tests for io.BufferedWriter on top of scripted raw streams."""

    def testWrite(self):
        # Write to the buffered IO but don't overflow the buffer.
        writer = MockRawIO()
        bufio = io.BufferedWriter(writer, 8)

        bufio.write(b"abc")

        # Nothing may have reached the raw stream yet.
        self.assertFalse(writer._write_stack)

    def testWriteOverflow(self):
        writer = MockRawIO()
        bufio = io.BufferedWriter(writer, 8)

        bufio.write(b"abc")
        bufio.write(b"defghijkl")

        # Exceeding the 8-byte buffer pushes everything out in one write.
        self.assertEqual(b"abcdefghijkl", writer._write_stack[0])

    def testWriteNonBlocking(self):
        raw = MockNonBlockWriterIO((9, 2, 22, -6, 10, 12, 12))
        bufio = io.BufferedWriter(raw, 8, 16)

        bufio.write(b"asdf")
        bufio.write(b"asdfa")
        self.assertEqual(b"asdfasdfa", raw._write_stack[0])

        bufio.write(b"asdfasdfasdf")
        self.assertEqual(b"asdfasdfasdf", raw._write_stack[1])
        bufio.write(b"asdfasdfasdf")
        self.assertEqual(b"dfasdfasdf", raw._write_stack[2])
        self.assertEqual(b"asdfasdfasdf", raw._write_stack[3])

        bufio.write(b"asdfasdfasdf")

        # XXX I don't like this test. It relies too heavily on how the
        # algorithm actually works, which we might change. Refactor
        # later.

    def testFileno(self):
        # The buffered layer reports the raw stream's descriptor.
        rawio = MockRawIO((b"abc", b"d", b"efg"))
        bufio = io.BufferedWriter(rawio)

        self.assertEqual(42, bufio.fileno())

    def testFlush(self):
        writer = MockRawIO()
        bufio = io.BufferedWriter(writer, 8)

        bufio.write(b"abc")
        bufio.flush()

        self.assertEqual(b"abc", writer._write_stack[0])
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000435
Guido van Rossuma9e20242007-03-08 00:43:48 +0000436
class BufferedRWPairTest(unittest.TestCase):
    """Placeholder for io.BufferedRWPair tests; only construction is run."""

    def testRWPair(self):
        reader, writer = MockRawIO(()), MockRawIO()
        pair = io.BufferedRWPair(reader, writer)

        # XXX need implementation
445
Guido van Rossuma9e20242007-03-08 00:43:48 +0000446
class BufferedRandomTest(unittest.TestCase):
    """Tests for io.BufferedRandom: interleaved reads, writes and seeks."""

    def testReadAndWrite(self):
        raw = MockRawIO((b"asdf", b"ghjk"))
        rw = io.BufferedRandom(raw, 8, 12)

        self.assertEqual(b"as", rw.read(2))
        rw.write(b"ddd")
        rw.write(b"eee")
        self.assertFalse(raw._write_stack)  # Buffer writes
        self.assertEqual(b"ghjk", rw.read())  # This read forces write flush
        self.assertEqual(b"dddeee", raw._write_stack[0])

    def testSeekAndTell(self):
        raw = io.BytesIO(b"asdfghjkl")
        rw = io.BufferedRandom(raw)

        self.assertEqual(b"as", rw.read(2))
        self.assertEqual(2, rw.tell())
        rw.seek(0, 0)
        self.assertEqual(b"asdf", rw.read(4))

        # Overwrite bytes 4..7, then re-read the whole stream.
        rw.write(b"asdf")
        rw.seek(0, 0)
        self.assertEqual(b"asdfasdfl", rw.read())
        self.assertEqual(9, rw.tell())
        rw.seek(-4, 2)
        self.assertEqual(5, rw.tell())
        rw.seek(2, 1)
        self.assertEqual(7, rw.tell())
        self.assertEqual(b"fl", rw.read(11))
478
Guido van Rossum78892e42007-04-06 17:31:18 +0000479
class TextIOWrapperTest(unittest.TestCase):
    """Tests for io.TextIOWrapper: newline handling, tell/seek, basic I/O.

    Tests that touch the filesystem use test_support.TESTFN, which is
    removed again in tearDown().
    """

    def tearDown(self):
        test_support.unlink(test_support.TESTFN)

    def testNewlinesInput(self):
        """readlines()/read() honour each supported *newline* setting."""
        testdata = b"AAA\nBBB\nCCC\rDDD\rEEE\r\nFFF\r\nGGG"
        normalized = testdata.replace(b"\r\n", b"\n").replace(b"\r", b"\n")
        for newline, expected in [
            (None, normalized.decode("ASCII").splitlines(True)),
            ("", testdata.decode("ASCII").splitlines(True)),
            ("\n", ["AAA\n", "BBB\n", "CCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]),
            ("\r\n", ["AAA\nBBB\nCCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]),
            ("\r", ["AAA\nBBB\nCCC\r", "DDD\r", "EEE\r", "\nFFF\r", "\nGGG"]),
            ]:
            buf = io.BytesIO(testdata)
            txt = io.TextIOWrapper(buf, encoding="ASCII", newline=newline)
            self.assertEqual(txt.readlines(), expected)
            txt.seek(0)
            self.assertEqual(txt.read(), "".join(expected))

    # This method used to be called testNewlinesOutput as well, but a later
    # definition of the same name in this class replaced it, so it never
    # ran.  It now has a name of its own.
    def testNewlinesOutputMultiWrite(self):
        """Text written in several pieces is translated per *newline*."""
        testdict = {
            "": b"AAA\nBBB\nCCC\nX\rY\r\nZ",
            "\n": b"AAA\nBBB\nCCC\nX\rY\r\nZ",
            "\r": b"AAA\rBBB\rCCC\rX\rY\r\rZ",
            "\r\n": b"AAA\r\nBBB\r\nCCC\r\nX\rY\r\r\nZ",
            }
        # newline=None is expected to behave like the platform separator.
        tests = [(None, testdict[os.linesep])] + sorted(testdict.items())
        for newline, expected in tests:
            buf = io.BytesIO()
            txt = io.TextIOWrapper(buf, encoding="ASCII", newline=newline)
            txt.write("AAA\nB")
            txt.write("BB\nCCC\n")
            txt.write("X\rY\r\nZ")
            txt.flush()
            self.assertEqual(buf.getvalue(), expected)

    def testNewlines(self):
        """Line iteration and read()+readline() agree for all buffer sizes."""
        input_lines = [ "unix\n", "windows\r\n", "os9\r", "last\n", "nonl" ]

        tests = [
            [ None, [ 'unix\n', 'windows\n', 'os9\n', 'last\n', 'nonl' ] ],
            [ '', input_lines ],
            [ '\n', [ "unix\n", "windows\r\n", "os9\rlast\n", "nonl" ] ],
            [ '\r\n', [ "unix\nwindows\r\n", "os9\rlast\nnonl" ] ],
            [ '\r', [ "unix\nwindows\r", "\nos9\r", "last\nnonl" ] ],
        ]

        encodings = ('utf-8', 'latin-1')

        # Try a range of buffer sizes to test the case where \r is the last
        # character in TextIOWrapper._pending_line.
        for encoding in encodings:
            # XXX: str.encode() should return bytes
            data = bytes(''.join(input_lines).encode(encoding))
            for do_reads in (False, True):
                for bufsize in range(1, 10):
                    for newline, exp_lines in tests:
                        bufio = io.BufferedReader(io.BytesIO(data), bufsize)
                        textio = io.TextIOWrapper(bufio, newline=newline,
                                                  encoding=encoding)
                        if do_reads:
                            # Mix a two-character read() with readline().
                            got_lines = []
                            while True:
                                c2 = textio.read(2)
                                if c2 == '':
                                    break
                                self.assertEqual(len(c2), 2)
                                got_lines.append(c2 + textio.readline())
                        else:
                            got_lines = list(textio)

                        for got_line, exp_line in zip(got_lines, exp_lines):
                            self.assertEqual(got_line, exp_line)
                        self.assertEqual(len(got_lines), len(exp_lines))

    def testNewlinesOutput(self):
        """write() translates "\\n" per *newline* under both os.linesep values."""
        orig_linesep = os.linesep
        data = "AAA\nBBB\rCCC\n"
        data_lf = b"AAA\nBBB\rCCC\n"
        data_cr = b"AAA\rBBB\rCCC\r"
        data_crlf = b"AAA\r\nBBB\rCCC\r\n"
        cases = [
            ("\n", None, data_lf),
            ("\r\n", None, data_crlf),
            ("\n", "", data_lf),
            ("\r\n", "", data_lf),
            ("\n", "\n", data_lf),
            ("\r\n", "\n", data_lf),
            ("\n", "\r", data_cr),
            ("\r\n", "\r", data_cr),
            ("\n", "\r\n", data_crlf),
            ("\r\n", "\r\n", data_crlf),
            ]
        try:
            for linesep, newline, expected in cases:
                # Pretend to be a platform with this line separator.
                os.linesep = linesep
                buf = io.BytesIO()
                txt = io.TextIOWrapper(buf, encoding="ASCII", newline=newline)
                txt.write(data)
                txt.close()
                self.assertEqual(buf.getvalue(), expected)
        finally:
            # os.linesep is process-wide state; always put it back.
            os.linesep = orig_linesep

    # Systematic tests of the text I/O API

    def testBasicIO(self):
        for chunksize in (1, 2, 3, 4, 5, 15, 16, 17, 31, 32, 33, 63, 64, 65):
            # "utf-16-be" and "utf-16-le" are not exercised here.
            for enc in "ascii", "latin1", "utf8":
                with io.open(test_support.TESTFN, "w+", encoding=enc) as f:
                    f._CHUNK_SIZE = chunksize
                    self.assertEqual(f.write("abc"), 3)
                with io.open(test_support.TESTFN, "r+", encoding=enc) as f:
                    f._CHUNK_SIZE = chunksize
                    self.assertEqual(f.tell(), 0)
                    self.assertEqual(f.read(), "abc")
                    cookie = f.tell()
                    self.assertEqual(f.seek(0), 0)
                    self.assertEqual(f.read(2), "ab")
                    self.assertEqual(f.read(1), "c")
                    self.assertEqual(f.read(1), "")
                    self.assertEqual(f.read(), "")
                    self.assertEqual(f.tell(), cookie)
                    self.assertEqual(f.seek(0), 0)
                    self.assertEqual(f.seek(0, 2), cookie)
                    self.assertEqual(f.write("def"), 3)
                    self.assertEqual(f.seek(cookie), cookie)
                    self.assertEqual(f.read(), "def")
                    if enc.startswith("utf"):
                        self.multi_line_test(f, enc)

    def multi_line_test(self, f, enc):
        """Write lines of many lengths to *f* and re-read them with tell().

        Each line is recorded with the tell() value taken just before it
        was written; reading back must give the same (position, line)
        pairs.  *enc* is accepted but not used.
        """
        f.seek(0)
        f.truncate()
        sample = "s\xff\u0fff\uffff"
        wlines = []
        for size in (0, 1, 2, 3, 4, 5, 30, 31, 32, 33, 62, 63, 64, 65, 1000):
            line = "".join(sample[i % len(sample)] for i in range(size)) + "\n"
            wlines.append((f.tell(), line))
            f.write(line)
        f.seek(0)
        rlines = []
        while True:
            pos = f.tell()
            line = f.readline()
            if not line:
                break
            rlines.append((pos, line))
        self.assertEqual(rlines, wlines)

    def testTelling(self):
        with io.open(test_support.TESTFN, "w+", encoding="utf8") as f:
            p0 = f.tell()
            f.write("\xff\n")
            p1 = f.tell()
            f.write("\xff\n")
            p2 = f.tell()
            f.seek(0)
            self.assertEqual(f.tell(), p0)
            self.assertEqual(f.readline(), "\xff\n")
            self.assertEqual(f.tell(), p1)
            self.assertEqual(f.readline(), "\xff\n")
            self.assertEqual(f.tell(), p2)
            f.seek(0)
            for line in f:
                self.assertEqual(line, "\xff\n")
                # tell() is expected to fail while iteration is under way.
                self.assertRaises(IOError, f.tell)
            self.assertEqual(f.tell(), p2)

    def testSeeking(self):
        """A multi-byte character straddling the chunk boundary reads back."""
        chunk_size = io.TextIOWrapper._CHUNK_SIZE
        prefix_size = chunk_size - 2
        u_prefix = "a" * prefix_size
        prefix = bytes(u_prefix.encode("utf-8"))
        self.assertEqual(len(u_prefix), len(prefix))
        u_suffix = "\u8888\n"
        suffix = bytes(u_suffix.encode("utf-8"))
        line = prefix + suffix
        with io.open(test_support.TESTFN, "wb") as f:
            f.write(line*2)
        with io.open(test_support.TESTFN, "r", encoding="utf-8") as f:
            s = f.read(prefix_size)
            # Compare against the text the prefix bytes were encoded from.
            self.assertEqual(s, u_prefix)
            self.assertEqual(f.tell(), prefix_size)
            self.assertEqual(f.readline(), u_suffix)

    def testSeekingToo(self):
        # Regression test for a specific bug
        data = b'\xe0\xbf\xbf\n'
        with io.open(test_support.TESTFN, "wb") as f:
            f.write(data)
        with io.open(test_support.TESTFN, "r", encoding="utf-8") as f:
            f._CHUNK_SIZE  # Just test that it exists
            f._CHUNK_SIZE = 2
            f.readline()
            f.tell()

    def timingTest(self):
        """Time writing and three ways of reading for several chunk sizes.

        Results are printed only when test_support.verbose is set.  The
        method name has no "test" prefix, so it is presumably meant to be
        invoked by hand rather than collected automatically.
        """
        timer = time.time
        enc = "utf8"
        line = "\0\x0f\xff\u0fff\uffff\U000fffff\U0010ffff"*3 + "\n"
        nlines = 10000
        nchars = len(line)
        nbytes = len(line.encode(enc))
        for chunk_size in (32, 64, 128, 256):
            with io.open(test_support.TESTFN, "w+", encoding=enc) as f:
                f._CHUNK_SIZE = chunk_size
                t0 = timer()
                for i in range(nlines):
                    f.write(line)
                f.flush()
                t1 = timer()
                f.seek(0)
                # Separate loop name so the sample ``line`` written above
                # is not rebound for the next chunk size.
                for _read_line in f:
                    pass
                t2 = timer()
                f.seek(0)
                while f.readline():
                    pass
                t3 = timer()
                f.seek(0)
                while f.readline():
                    f.tell()
                t4 = timer()
            if test_support.verbose:
                print("\nTiming test: %d lines of %d characters (%d bytes)" %
                      (nlines, nchars, nbytes))
                print("File chunk size:          %6s" % f._CHUNK_SIZE)
                print("Writing:                  %6.3f seconds" % (t1-t0))
                print("Reading using iteration:  %6.3f seconds" % (t2-t1))
                print("Reading using readline(): %6.3f seconds" % (t3-t2))
                print("Using readline()+tell():  %6.3f seconds" % (t4-t3))
735
Guido van Rossum9b76da62007-04-11 01:09:03 +0000736
Guido van Rossum01a27522007-03-07 01:00:12 +0000737# XXX Tests for open()
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000738
def test_main():
    """Run every test case class of this module through test_support."""
    test_cases = (
        IOTest, BytesIOTest, StringIOTest,
        BufferedReaderTest,
        BufferedWriterTest, BufferedRWPairTest,
        BufferedRandomTest, TextIOWrapperTest,
    )
    test_support.run_unittest(*test_cases)


if __name__ == "__main__":
    unittest.main()