blob: 7a2bdb2377d903ed2a3adb7c42b42b9ea9ac168d [file] [log] [blame]
Guido van Rossum68bbcd22007-02-27 17:19:33 +00001"""Unit tests for io.py."""
2
Guido van Rossum34d69e52007-04-10 20:08:41 +00003import sys
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +00004import time
Guido van Rossum28524c72007-02-27 05:47:44 +00005import unittest
Guido van Rossum78892e42007-04-06 17:31:18 +00006from itertools import chain
Guido van Rossum76c5d4d2007-04-06 19:10:29 +00007from test import test_support
8
Guido van Rossum53807da2007-04-10 19:01:47 +00009import io # The module under test
Guido van Rossum28524c72007-02-27 05:47:44 +000010
Guido van Rossuma9e20242007-03-08 00:43:48 +000011
class MockRawIO(io.RawIOBase):
    """Scripted raw stream used underneath the buffered classes in tests.

    read() hands back pre-arranged chunks one at a time; write() records a
    copy of every buffer it is given so tests can inspect what reached the
    raw layer.
    """

    def __init__(self, read_stack=()):
        # Chunks returned by successive read() calls, consumed front first.
        self._read_stack = list(read_stack)
        # Copies of the buffers passed to write(), in call order.
        self._write_stack = []

    def read(self, n=None):
        """Return the next scripted chunk (n is ignored); b"" when none remain."""
        try:
            return self._read_stack.pop(0)
        except IndexError:
            # Only an exhausted script means EOF; any other error should
            # surface instead of being silently turned into b"".
            return b""

    def write(self, b):
        """Record a copy of b and report that all of it was written."""
        self._write_stack.append(b[:])
        return len(b)

    def writable(self):
        return True

    def fileno(self):
        # Arbitrary fixed descriptor number that tests compare against.
        return 42

    def readable(self):
        return True

    def seekable(self):
        return True

    def seek(self, pos, whence=0):
        """Accept and ignore a seek request; returns None."""
        pass

    def tell(self):
        # Arbitrary fixed position; this mock does not track an offset.
        return 42
45
Guido van Rossuma9e20242007-03-08 00:43:48 +000046
class MockFileIO(io.BytesIO):
    """In-memory byte stream that logs the size of every read() result."""

    def __init__(self, data):
        # One entry per read() call: the length of the result, or None
        # when the underlying read() returned None.
        self.read_history = []
        io.BytesIO.__init__(self, data)

    def read(self, n=None):
        """Delegate to BytesIO.read() and remember how much came back."""
        chunk = io.BytesIO.read(self, n)
        if chunk is None:
            self.read_history.append(None)
        else:
            self.read_history.append(len(chunk))
        return chunk
57
58
class MockNonBlockWriterIO(io.RawIOBase):
    """Raw writer whose write() results follow a pre-arranged script.

    Each script entry drives one write() call: a non-negative value is
    returned as the number of bytes written; a negative value -k raises
    BlockingIOError with k as its third argument.
    """

    def __init__(self, blocking_script):
        self._blocking_script = list(blocking_script)
        # Copies of the buffers passed to write(), in call order.
        self._write_stack = []

    def write(self, b):
        """Record b, then return or raise according to the next script entry."""
        self._write_stack.append(b[:])
        outcome = self._blocking_script.pop(0)
        if outcome >= 0:
            return outcome
        raise io.BlockingIOError(0, "test blocking", -outcome)

    def writable(self):
        return True
Guido van Rossum68bbcd22007-02-27 17:19:33 +000075
Guido van Rossuma9e20242007-03-08 00:43:48 +000076
class IOTest(unittest.TestCase):
    """Exercise raw, buffered and in-memory binary streams end to end.

    Every test that touches the file system uses test_support.TESTFN, which
    tearDown() removes again.  Files are closed in ``finally`` clauses so a
    failing assertion never leaves an open handle behind for tearDown().
    """

    def tearDown(self):
        test_support.unlink(test_support.TESTFN)

    def write_ops(self, f):
        """Run the shared write/seek/tell/truncate sequence against f.

        On success the stream holds the 12 bytes "hello world\\n".
        """
        self.assertEqual(f.write(b"blah."), 5)
        self.assertEqual(f.seek(0), 0)
        self.assertEqual(f.write(b"Hello."), 6)
        self.assertEqual(f.tell(), 6)
        self.assertEqual(f.seek(-1, 1), 5)
        self.assertEqual(f.tell(), 5)
        self.assertEqual(f.write(" world\n\n\n"), 9)
        self.assertEqual(f.seek(0), 0)
        self.assertEqual(f.write("h"), 1)
        self.assertEqual(f.seek(-1, 2), 13)
        self.assertEqual(f.tell(), 13)
        self.assertEqual(f.truncate(12), 12)
        self.assertEqual(f.tell(), 12)

    def read_ops(self, f, buffered=False):
        """Run the shared read/readinto/seek sequence against f.

        f must contain what write_ops() produced.  When buffered is true the
        argument-less read() is exercised as well.
        """
        data = f.read(5)
        self.assertEqual(data, b"hello")
        self.assertEqual(f.readinto(data), 5)
        self.assertEqual(data, b" worl")
        self.assertEqual(f.readinto(data), 2)
        self.assertEqual(len(data), 5)
        self.assertEqual(data[:2], b"d\n")
        self.assertEqual(f.seek(0), 0)
        self.assertEqual(f.read(20), b"hello world\n")
        self.assertEqual(f.read(1), b"")
        self.assertEqual(f.readinto(b"x"), 0)
        self.assertEqual(f.seek(-6, 2), 6)
        self.assertEqual(f.read(5), b"world")
        self.assertEqual(f.read(0), b"")
        self.assertEqual(f.readinto(b""), 0)
        self.assertEqual(f.seek(-6, 1), 5)
        self.assertEqual(f.read(5), b" worl")
        self.assertEqual(f.tell(), 10)
        if buffered:
            f.seek(0)
            self.assertEqual(f.read(), b"hello world\n")
            f.seek(6)
            self.assertEqual(f.read(), b"world\n")
            self.assertEqual(f.read(), b"")

    # Offset just past the largest signed 32-bit value.
    LARGE = 2**31

    def large_file_ops(self, f):
        """Seek, write and truncate around the LARGE offset on f."""
        # unittest assertions instead of bare ``assert`` so the checks are
        # still performed when Python runs with -O.
        self.assertTrue(f.readable())
        self.assertTrue(f.writable())
        self.assertEqual(f.seek(self.LARGE), self.LARGE)
        self.assertEqual(f.tell(), self.LARGE)
        self.assertEqual(f.write(b"xxx"), 3)
        self.assertEqual(f.tell(), self.LARGE + 3)
        self.assertEqual(f.seek(-1, 1), self.LARGE + 2)
        self.assertEqual(f.truncate(), self.LARGE + 2)
        self.assertEqual(f.tell(), self.LARGE + 2)
        self.assertEqual(f.seek(0, 2), self.LARGE + 2)
        self.assertEqual(f.truncate(self.LARGE + 1), self.LARGE + 1)
        self.assertEqual(f.tell(), self.LARGE + 1)
        self.assertEqual(f.seek(0, 2), self.LARGE + 1)
        self.assertEqual(f.seek(-1, 2), self.LARGE)
        self.assertEqual(f.read(2), b"x")

    def test_raw_file_io(self):
        f = io.open(test_support.TESTFN, "wb", buffering=0)
        try:
            self.assertEqual(f.readable(), False)
            self.assertEqual(f.writable(), True)
            self.assertEqual(f.seekable(), True)
            self.write_ops(f)
        finally:
            f.close()
        f = io.open(test_support.TESTFN, "rb", buffering=0)
        try:
            self.assertEqual(f.readable(), True)
            self.assertEqual(f.writable(), False)
            self.assertEqual(f.seekable(), True)
            self.read_ops(f)
        finally:
            f.close()

    def test_buffered_file_io(self):
        f = io.open(test_support.TESTFN, "wb")
        try:
            self.assertEqual(f.readable(), False)
            self.assertEqual(f.writable(), True)
            self.assertEqual(f.seekable(), True)
            self.write_ops(f)
        finally:
            f.close()
        f = io.open(test_support.TESTFN, "rb")
        try:
            self.assertEqual(f.readable(), True)
            self.assertEqual(f.writable(), False)
            self.assertEqual(f.seekable(), True)
            self.read_ops(f, True)
        finally:
            f.close()

    def test_raw_bytes_io(self):
        f = io.BytesIO()
        self.write_ops(f)
        data = f.getvalue()
        self.assertEqual(data, b"hello world\n")
        f = io.BytesIO(data)
        self.read_ops(f, True)

    def test_large_file_ops(self):
        # On Windows and Mac OSX this test consumes large resources; it takes
        # a long time to build the >2GB file and takes >2GB of disk space,
        # therefore the resource must be enabled to run this test.
        if sys.platform[:3] == 'win' or sys.platform == 'darwin':
            if not test_support.is_resource_enabled("largefile"):
                print("\nTesting large file ops skipped on %s." % sys.platform,
                      file=sys.stderr)
                print("It requires %d bytes and a long time." % self.LARGE,
                      file=sys.stderr)
                print("Use 'regrtest.py -u largefile test_io' to run it.",
                      file=sys.stderr)
                return
        # First unbuffered, then with the default buffering.
        f = io.open(test_support.TESTFN, "w+b", 0)
        try:
            self.large_file_ops(f)
        finally:
            f.close()
        f = io.open(test_support.TESTFN, "w+b")
        try:
            self.large_file_ops(f)
        finally:
            f.close()

    def test_with_open(self):
        """A file used in a with statement is closed on normal and error exit."""
        # NOTE(review): this calls the builtin open() rather than io.open()
        # like the other tests here -- confirm that is intended.
        for bufsize in (0, 1, 100):
            f = None
            with open(test_support.TESTFN, "wb", bufsize) as f:
                f.write("xxx")
            self.assertEqual(f.closed, True)
            f = None
            try:
                with open(test_support.TESTFN, "wb", bufsize) as f:
                    1/0
            except ZeroDivisionError:
                self.assertEqual(f.closed, True)
            else:
                self.fail("1/0 didn't raise an exception")

    def test_destructor(self):
        """Deleting an open FileIO runs __del__, then close(), then flush()."""
        record = []

        class MyFileIO(io.FileIO):
            def __del__(self):
                record.append(1)
                io.FileIO.__del__(self)

            def close(self):
                record.append(2)
                io.FileIO.close(self)

            def flush(self):
                record.append(3)
                io.FileIO.flush(self)

        f = MyFileIO(test_support.TESTFN, "w")
        f.write("xxx")
        del f
        self.assertEqual(record, [1, 2, 3])

    def test_close_flushes(self):
        """Data written through a buffered file is on disk after close()."""
        f = io.open(test_support.TESTFN, "wb")
        try:
            f.write("xxx")
        finally:
            f.close()
        f = io.open(test_support.TESTFN, "rb")
        try:
            self.assertEqual(f.read(), b"xxx")
        finally:
            f.close()
Guido van Rossuma9e20242007-03-08 00:43:48 +0000237
Guido van Rossuma9e20242007-03-08 00:43:48 +0000238
class MemorySeekTestMixin:
    """Read/seek/tell checks shared by the in-memory stream test cases.

    A concrete subclass must also derive from unittest.TestCase and define:
      buftype -- callable turning the literal "1234567890" into the buffer
      ioclass -- stream class constructed from that buffer
      EOF     -- value read() returns once the stream is exhausted
    """

    def testInit(self):
        # Construction from an initial buffer must simply not raise.
        buf = self.buftype("1234567890")
        self.ioclass(buf)

    def testRead(self):
        buf = self.buftype("1234567890")
        bytesIo = self.ioclass(buf)

        self.assertEqual(buf[:1], bytesIo.read(1))
        self.assertEqual(buf[1:5], bytesIo.read(4))
        # Asking for more than remains returns just what is left.
        self.assertEqual(buf[5:], bytesIo.read(900))
        self.assertEqual(self.EOF, bytesIo.read())

    def testReadNoArgs(self):
        buf = self.buftype("1234567890")
        bytesIo = self.ioclass(buf)

        self.assertEqual(buf, bytesIo.read())
        self.assertEqual(self.EOF, bytesIo.read())

    def testSeek(self):
        buf = self.buftype("1234567890")
        bytesIo = self.ioclass(buf)

        bytesIo.read(5)
        bytesIo.seek(0)
        self.assertEqual(buf, bytesIo.read())

        bytesIo.seek(3)
        self.assertEqual(buf[3:], bytesIo.read())

    def testTell(self):
        buf = self.buftype("1234567890")
        bytesIo = self.ioclass(buf)

        self.assertEqual(0, bytesIo.tell())
        bytesIo.seek(5)
        self.assertEqual(5, bytesIo.tell())
        # Seeking past the end is allowed and reflected by tell().
        bytesIo.seek(10000)
        self.assertEqual(10000, bytesIo.tell())
281
Guido van Rossuma9e20242007-03-08 00:43:48 +0000282
class BytesIOTest(MemorySeekTestMixin, unittest.TestCase):
    """Run the shared memory-stream checks against io.BytesIO."""

    ioclass = io.BytesIO
    buftype = bytes
    EOF = b""
287
288
class StringIOTest(MemorySeekTestMixin, unittest.TestCase):
    """Run the shared memory-stream checks against io.StringIO."""

    ioclass = io.StringIO
    buftype = str
    EOF = ""
293
294
class BufferedReaderTest(unittest.TestCase):
    """Tests for io.BufferedReader layered over mock raw streams."""

    def testRead(self):
        rawio = MockRawIO((b"abc", b"d", b"efg"))
        bufio = io.BufferedReader(rawio)

        self.assertEqual(b"abcdef", bufio.read(6))

    def testBuffering(self):
        """Check which raw read sizes result from a series of buffered reads."""
        data = b"abcdefghi"
        dlen = len(data)

        # Each entry: [buffer size, sizes requested from the buffered reader,
        #              result lengths expected to be logged by the raw stream].
        tests = [
            [100, [3, 1, 4, 8], [dlen, 0]],
            [100, [3, 3, 3], [dlen]],
            [4, [1, 2, 4, 2], [4, 4, 1]],
        ]

        for bufsize, buf_read_sizes, raw_read_sizes in tests:
            rawio = MockFileIO(data)
            bufio = io.BufferedReader(rawio, buffer_size=bufsize)
            pos = 0
            for nbytes in buf_read_sizes:
                self.assertEqual(bufio.read(nbytes), data[pos:pos+nbytes])
                pos += nbytes
            self.assertEqual(rawio.read_history, raw_read_sizes)

    def testReadNonBlocking(self):
        # Inject some None's in there to simulate EWOULDBLOCK
        rawio = MockRawIO((b"abc", b"d", None, b"efg", None, None))
        bufio = io.BufferedReader(rawio)

        self.assertEqual(b"abcd", bufio.read(6))
        self.assertEqual(b"e", bufio.read(1))
        self.assertEqual(b"fg", bufio.read())
        self.assertTrue(None is bufio.read())
        self.assertEqual(b"", bufio.read())

    def testReadToEof(self):
        rawio = MockRawIO((b"abc", b"d", b"efg"))
        bufio = io.BufferedReader(rawio)

        self.assertEqual(b"abcdefg", bufio.read(9000))

    def testReadNoArgs(self):
        rawio = MockRawIO((b"abc", b"d", b"efg"))
        bufio = io.BufferedReader(rawio)

        self.assertEqual(b"abcdefg", bufio.read())

    def testFileno(self):
        # fileno() is forwarded to the raw stream (the mock returns 42).
        rawio = MockRawIO((b"abc", b"d", b"efg"))
        bufio = io.BufferedReader(rawio)

        self.assertEqual(42, bufio.fileno())

    def testFilenoNoFileno(self):
        # XXX will we always have fileno() function? If so, kill
        # this test.  Else, write it.
        pass
355
Guido van Rossuma9e20242007-03-08 00:43:48 +0000356
class BufferedWriterTest(unittest.TestCase):
    """Tests for io.BufferedWriter layered over mock raw streams."""

    def testWrite(self):
        # Write to the buffered IO but don't overflow the buffer.
        writer = MockRawIO()
        bufio = io.BufferedWriter(writer, 8)

        bufio.write(b"abc")

        # Nothing may have reached the raw stream yet.
        self.assertFalse(writer._write_stack)

    def testWriteOverflow(self):
        """Exceeding the buffer size pushes all pending data to the raw stream."""
        writer = MockRawIO()
        bufio = io.BufferedWriter(writer, 8)

        bufio.write(b"abc")
        bufio.write(b"defghijkl")

        self.assertEqual(b"abcdefghijkl", writer._write_stack[0])

    def testWriteNonBlocking(self):
        # The script gives the result of each successive raw write() call;
        # the negative entry makes that call raise BlockingIOError.
        raw = MockNonBlockWriterIO((9, 2, 22, -6, 10, 12, 12))
        bufio = io.BufferedWriter(raw, 8, 16)

        bufio.write(b"asdf")
        bufio.write(b"asdfa")
        self.assertEqual(b"asdfasdfa", raw._write_stack[0])

        bufio.write(b"asdfasdfasdf")
        self.assertEqual(b"asdfasdfasdf", raw._write_stack[1])
        bufio.write(b"asdfasdfasdf")
        self.assertEqual(b"dfasdfasdf", raw._write_stack[2])
        self.assertEqual(b"asdfasdfasdf", raw._write_stack[3])

        bufio.write(b"asdfasdfasdf")

        # XXX I don't like this test.  It relies too heavily on how the
        # algorithm actually works, which we might change.  Refactor
        # later.

    def testFileno(self):
        # fileno() is forwarded to the raw stream (the mock returns 42).
        rawio = MockRawIO((b"abc", b"d", b"efg"))
        bufio = io.BufferedWriter(rawio)

        self.assertEqual(42, bufio.fileno())

    def testFlush(self):
        """flush() hands buffered data to the raw stream."""
        writer = MockRawIO()
        bufio = io.BufferedWriter(writer, 8)

        bufio.write(b"abc")
        bufio.flush()

        self.assertEqual(b"abc", writer._write_stack[0])
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000411
Guido van Rossuma9e20242007-03-08 00:43:48 +0000412
class BufferedRWPairTest(unittest.TestCase):
    """Placeholder tests for io.BufferedRWPair."""

    def testRWPair(self):
        # For now this only checks that a pair can be constructed from
        # separate reader and writer raw streams.
        reader = MockRawIO(())
        writer = MockRawIO()
        pair = io.BufferedRWPair(reader, writer)

        # XXX need implementation
421
Guido van Rossuma9e20242007-03-08 00:43:48 +0000422
class BufferedRandomTest(unittest.TestCase):
    """Tests for io.BufferedRandom (interleaved reads, writes and seeks)."""

    def testReadAndWrite(self):
        raw = MockRawIO((b"asdf", b"ghjk"))
        rw = io.BufferedRandom(raw, 8, 12)

        self.assertEqual(b"as", rw.read(2))
        rw.write(b"ddd")
        rw.write(b"eee")
        self.assertFalse(raw._write_stack)  # Buffer writes
        self.assertEqual(b"ghjk", rw.read())  # This read forces write flush
        self.assertEqual(b"dddeee", raw._write_stack[0])

    def testSeekAndTell(self):
        raw = io.BytesIO(b"asdfghjkl")
        rw = io.BufferedRandom(raw)

        self.assertEqual(b"as", rw.read(2))
        self.assertEqual(2, rw.tell())
        rw.seek(0, 0)
        self.assertEqual(b"asdf", rw.read(4))

        # Overwrite bytes 4..7, then re-read everything from the start.
        rw.write(b"asdf")
        rw.seek(0, 0)
        self.assertEqual(b"asdfasdfl", rw.read())
        self.assertEqual(9, rw.tell())
        rw.seek(-4, 2)
        self.assertEqual(5, rw.tell())
        rw.seek(2, 1)
        self.assertEqual(7, rw.tell())
        self.assertEqual(b"fl", rw.read(11))
454
Guido van Rossum78892e42007-04-06 17:31:18 +0000455
class TextIOWrapperTest(unittest.TestCase):
    """Tests for io.TextIOWrapper and text-mode io.open()."""

    def tearDown(self):
        # Several tests create test_support.TESTFN; remove it afterwards,
        # just as IOTest.tearDown() does.
        test_support.unlink(test_support.TESTFN)

    def testNewlines(self):
        input_lines = ["unix\n", "windows\r\n", "os9\r", "last\n", "nonl"]

        # Each entry: [newline argument, line endings expected when reading].
        tests = [
            [None, ['unix\n', 'windows\n', 'os9\n', 'last\n', 'nonl']],
            ['\n', input_lines],
            ['\r\n', input_lines],
        ]

        encodings = ('utf-8', 'bz2')

        # Try a range of pad sizes to test the case where \r is the last
        # character in TextIOWrapper._pending_line.
        for encoding in encodings:
            for do_reads in (False, True):
                for padlen in chain(range(10), range(50, 60)):
                    pad = '.' * padlen
                    data_lines = [pad + line for line in input_lines]
                    # XXX: str.encode() should return bytes
                    data = bytes(''.join(data_lines).encode(encoding))

                    for newline, exp_line_ends in tests:
                        exp_lines = [pad + line for line in exp_line_ends]
                        bufio = io.BufferedReader(io.BytesIO(data))
                        textio = io.TextIOWrapper(bufio, newline=newline,
                                                  encoding=encoding)
                        if do_reads:
                            # Mix read(2) with readline() on every line.
                            got_lines = []
                            while True:
                                c2 = textio.read(2)
                                if c2 == '':
                                    break
                                self.assertEqual(len(c2), 2)
                                got_lines.append(c2 + textio.readline())
                        else:
                            got_lines = list(textio)

                        for got_line, exp_line in zip(got_lines, exp_lines):
                            self.assertEqual(got_line, exp_line)
                        self.assertEqual(len(got_lines), len(exp_lines))

    # Systematic tests of the text I/O API

    def testBasicIO(self):
        for chunksize in (1, 2, 3, 4, 5, 15, 16, 17, 31, 32, 33, 63, 64, 65):
            for enc in "ascii", "latin1", "utf8":  # , "utf-16-be", "utf-16-le":
                f = io.open(test_support.TESTFN, "w+", encoding=enc)
                try:
                    f._CHUNK_SIZE = chunksize
                    self.assertEqual(f.write("abc"), 3)
                finally:
                    f.close()
                f = io.open(test_support.TESTFN, "r+", encoding=enc)
                try:
                    f._CHUNK_SIZE = chunksize
                    self.assertEqual(f.tell(), 0)
                    self.assertEqual(f.read(), "abc")
                    # Position at end of file, reused for the checks below.
                    cookie = f.tell()
                    self.assertEqual(f.seek(0), 0)
                    self.assertEqual(f.read(2), "ab")
                    self.assertEqual(f.read(1), "c")
                    self.assertEqual(f.read(1), "")
                    self.assertEqual(f.read(), "")
                    self.assertEqual(f.tell(), cookie)
                    self.assertEqual(f.seek(0), 0)
                    self.assertEqual(f.seek(0, 2), cookie)
                    self.assertEqual(f.write("def"), 3)
                    self.assertEqual(f.seek(cookie), cookie)
                    self.assertEqual(f.read(), "def")
                    if enc.startswith("utf"):
                        self.multi_line_test(f, enc)
                finally:
                    f.close()

    def multi_line_test(self, f, enc):
        """Write lines of many lengths to f, then verify readline()/tell().

        f is truncated first.  enc is accepted but not used here.
        """
        f.seek(0)
        f.truncate()
        sample = u"s\xff\u0fff\uffff"
        wlines = []
        for size in (0, 1, 2, 3, 4, 5, 30, 31, 32, 33, 62, 63, 64, 65, 1000):
            chars = [sample[i % len(sample)] for i in range(size)]
            line = u"".join(chars) + "\n"
            # Remember where each line starts so tell() can be cross-checked.
            wlines.append((f.tell(), line))
            f.write(line)
        f.seek(0)
        rlines = []
        while True:
            pos = f.tell()
            line = f.readline()
            if not line:
                break
            rlines.append((pos, line))
        self.assertEqual(rlines, wlines)

    def testTelling(self):
        f = io.open(test_support.TESTFN, "w+", encoding="utf8")
        try:
            p0 = f.tell()
            f.write(u"\xff\n")
            p1 = f.tell()
            f.write(u"\xff\n")
            p2 = f.tell()
            f.seek(0)
            self.assertEqual(f.tell(), p0)
            self.assertEqual(f.readline(), u"\xff\n")
            self.assertEqual(f.tell(), p1)
            self.assertEqual(f.readline(), u"\xff\n")
            self.assertEqual(f.tell(), p2)
            f.seek(0)
            for line in f:
                self.assertEqual(line, u"\xff\n")
                # tell() is expected to fail while iteration is in progress.
                self.assertRaises(IOError, f.tell)
            self.assertEqual(f.tell(), p2)
        finally:
            f.close()

    def timingTest(self):
        """Time writing and three ways of reading for several chunk sizes.

        The name lacks the "test" prefix, so the default unittest loader
        does not pick it up; results are printed only in verbose mode.
        """
        timer = time.time
        enc = "utf8"
        line = u"\0\x0f\xff\u0fff\uffff\U000fffff\U0010ffff"*3 + "\n"
        nlines = 10000
        nchars = len(line)
        nbytes = len(line.encode(enc))
        for chunk_size in (32, 64, 128, 256):
            f = io.open(test_support.TESTFN, "w+", encoding=enc)
            try:
                f._CHUNK_SIZE = chunk_size
                t0 = timer()
                for i in range(nlines):
                    f.write(line)
                f.flush()
                t1 = timer()
                f.seek(0)
                # Separate loop variable so `line`, which is written again
                # on the next chunk size, is never rebound.
                for _read_line in f:
                    pass
                t2 = timer()
                f.seek(0)
                while f.readline():
                    pass
                t3 = timer()
                f.seek(0)
                while f.readline():
                    f.tell()
                t4 = timer()
            finally:
                f.close()
            if test_support.verbose:
                print("\nTiming test: %d lines of %d characters (%d bytes)" %
                      (nlines, nchars, nbytes))
                print("File chunk size:          %6s" % f._CHUNK_SIZE)
                print("Writing:                  %6.3f seconds" % (t1-t0))
                print("Reading using iteration:  %6.3f seconds" % (t2-t1))
                print("Reading using readline(): %6.3f seconds" % (t3-t2))
                print("Using readline()+tell():  %6.3f seconds" % (t4-t3))
609
Guido van Rossum9b76da62007-04-11 01:09:03 +0000610
Guido van Rossum01a27522007-03-07 01:00:12 +0000611# XXX Tests for open()
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000612
def test_main():
    """Hand every test case class in this module to test_support.run_unittest()."""
    cases = (
        IOTest,
        BytesIOTest,
        StringIOTest,
        BufferedReaderTest,
        BufferedWriterTest,
        BufferedRWPairTest,
        BufferedRandomTest,
        TextIOWrapperTest,
    )
    test_support.run_unittest(*cases)


# When executed directly, let unittest find and run the tests itself.
if __name__ == "__main__":
    unittest.main()