blob: 5542a5b2bda00e91651e842e201a67122c4a7c1f [file] [log] [blame]
Guido van Rossum68bbcd22007-02-27 17:19:33 +00001"""Unit tests for io.py."""
2
Guido van Rossum34d69e52007-04-10 20:08:41 +00003import sys
Guido van Rossum28524c72007-02-27 05:47:44 +00004import unittest
Guido van Rossum78892e42007-04-06 17:31:18 +00005from itertools import chain
Guido van Rossum76c5d4d2007-04-06 19:10:29 +00006from test import test_support
7
Guido van Rossum53807da2007-04-10 19:01:47 +00008import io # The module under test
Guido van Rossum28524c72007-02-27 05:47:44 +00009
Guido van Rossuma9e20242007-03-08 00:43:48 +000010
class MockRawIO(io.RawIOBase):
    """Scripted raw stream used to drive the buffered wrappers in tests.

    Each read() hands back the next item of *read_stack*, in order; each
    write() is recorded in ``_write_stack`` so a test can inspect what
    reached the raw layer.
    """

    def __init__(self, read_stack=()):
        self._read_stack = list(read_stack)
        self._write_stack = []

    def read(self, n=None):
        """Return the next scripted item, or empty bytes once none are left.

        *n* is accepted for interface compatibility and otherwise ignored.
        Scripted items are returned unchanged, including ``None``.
        """
        try:
            return self._read_stack.pop(0)
        except IndexError:
            # Only an exhausted script means end-of-file; any other error
            # must surface instead of being reported as EOF.
            return b""

    def write(self, b):
        """Record a copy of *b* and report all of it as written."""
        # Slice so the recorded data is a snapshot, not the caller's buffer.
        self._write_stack.append(b[:])
        return len(b)

    def writable(self):
        return True

    def fileno(self):
        # Arbitrary fixed value the tests compare against.
        return 42

    def readable(self):
        return True

    def seekable(self):
        return True

    def seek(self, pos, whence=0):
        """Accept and ignore a seek request (always returns None)."""
        pass

    def tell(self):
        # Arbitrary fixed value; this mock does not track a position.
        return 42
44
Guido van Rossuma9e20242007-03-08 00:43:48 +000045
class MockFileIO(io.BytesIO):
    """In-memory byte stream that logs the size of every read() result."""

    def __init__(self, data):
        # One entry per read() call: len(result), or None for a None result.
        self.read_history = []
        io.BytesIO.__init__(self, data)

    def read(self, n=None):
        """Read like BytesIO.read() and append the result size to the log."""
        chunk = io.BytesIO.read(self, n)
        if chunk is None:
            size = None
        else:
            size = len(chunk)
        self.read_history.append(size)
        return chunk
56
57
class MockNonBlockWriterIO(io.RawIOBase):
    """Write-only raw stream whose write() outcomes follow a fixed script."""

    def __init__(self, blocking_script):
        self._blocking_script = list(blocking_script)
        self._write_stack = []

    def write(self, b):
        """Record a copy of *b*, then act out the next script entry.

        A non-negative entry is returned as the write() result.  A negative
        entry ``-k`` raises io.BlockingIOError with ``k`` as its third
        argument.
        """
        self._write_stack.append(b[:])
        outcome = self._blocking_script.pop(0)
        if outcome >= 0:
            return outcome
        raise io.BlockingIOError(0, "test blocking", -outcome)

    def writable(self):
        return True
Guido van Rossum68bbcd22007-02-27 17:19:33 +000074
Guido van Rossuma9e20242007-03-08 00:43:48 +000075
class IOTest(unittest.TestCase):
    """Exercise raw and buffered binary I/O on real files and on BytesIO."""

    # One past the largest signed 32-bit offset; used by the large-file test.
    LARGE = 2**31

    def tearDown(self):
        test_support.unlink(test_support.TESTFN)

    def write_ops(self, f):
        """Drive a writable, seekable binary stream through write/seek/truncate.

        Every payload is a bytes literal because *f* is a binary stream.
        """
        self.assertEqual(f.write(b"blah."), 5)
        self.assertEqual(f.seek(0), 0)
        self.assertEqual(f.write(b"Hello."), 6)
        self.assertEqual(f.tell(), 6)
        self.assertEqual(f.seek(-1, 1), 5)
        self.assertEqual(f.tell(), 5)
        self.assertEqual(f.write(b" world\n\n\n"), 9)
        self.assertEqual(f.seek(0), 0)
        self.assertEqual(f.write(b"h"), 1)
        self.assertEqual(f.seek(-1, 2), 13)
        self.assertEqual(f.tell(), 13)
        self.assertEqual(f.truncate(12), 12)
        self.assertEqual(f.tell(), 12)

    def read_ops(self, f, buffered=False):
        """Check read/readinto/seek/tell on a stream holding b"hello world\\n".

        With *buffered* true, argument-less read() calls are checked as well.
        """
        data = f.read(5)
        self.assertEqual(data, b"hello")
        # The object returned by read() is reused as the readinto() target.
        self.assertEqual(f.readinto(data), 5)
        self.assertEqual(data, b" worl")
        self.assertEqual(f.readinto(data), 2)
        self.assertEqual(len(data), 5)
        self.assertEqual(data[:2], b"d\n")
        self.assertEqual(f.seek(0), 0)
        self.assertEqual(f.read(20), b"hello world\n")
        self.assertEqual(f.read(1), b"")
        self.assertEqual(f.readinto(b"x"), 0)
        self.assertEqual(f.seek(-6, 2), 6)
        self.assertEqual(f.read(5), b"world")
        self.assertEqual(f.read(0), b"")
        self.assertEqual(f.readinto(b""), 0)
        self.assertEqual(f.seek(-6, 1), 5)
        self.assertEqual(f.read(5), b" worl")
        self.assertEqual(f.tell(), 10)
        if buffered:
            f.seek(0)
            self.assertEqual(f.read(), b"hello world\n")
            f.seek(6)
            self.assertEqual(f.read(), b"world\n")
            self.assertEqual(f.read(), b"")

    def large_file_ops(self, f):
        """Seek, write and truncate around the LARGE offset on *f*."""
        # Use unittest assertions: bare assert statements vanish under -O.
        self.assertTrue(f.readable())
        self.assertTrue(f.writable())
        self.assertEqual(f.seek(self.LARGE), self.LARGE)
        self.assertEqual(f.tell(), self.LARGE)
        self.assertEqual(f.write(b"xxx"), 3)
        self.assertEqual(f.tell(), self.LARGE + 3)
        self.assertEqual(f.seek(-1, 1), self.LARGE + 2)
        self.assertEqual(f.truncate(), self.LARGE + 2)
        self.assertEqual(f.tell(), self.LARGE + 2)
        self.assertEqual(f.seek(0, 2), self.LARGE + 2)
        self.assertEqual(f.truncate(self.LARGE + 1), self.LARGE + 1)
        self.assertEqual(f.tell(), self.LARGE + 1)
        self.assertEqual(f.seek(0, 2), self.LARGE + 1)
        self.assertEqual(f.seek(-1, 2), self.LARGE)
        self.assertEqual(f.read(2), b"x")

    def test_raw_file_io(self):
        f = io.open(test_support.TESTFN, "wb", buffering=0)
        try:
            self.assertEqual(f.readable(), False)
            self.assertEqual(f.writable(), True)
            self.assertEqual(f.seekable(), True)
            self.write_ops(f)
        finally:
            # Close even when an assertion fails so tearDown can unlink.
            f.close()
        f = io.open(test_support.TESTFN, "rb", buffering=0)
        try:
            self.assertEqual(f.readable(), True)
            self.assertEqual(f.writable(), False)
            self.assertEqual(f.seekable(), True)
            self.read_ops(f)
        finally:
            f.close()

    def test_buffered_file_io(self):
        f = io.open(test_support.TESTFN, "wb")
        try:
            self.assertEqual(f.readable(), False)
            self.assertEqual(f.writable(), True)
            self.assertEqual(f.seekable(), True)
            self.write_ops(f)
        finally:
            f.close()
        f = io.open(test_support.TESTFN, "rb")
        try:
            self.assertEqual(f.readable(), True)
            self.assertEqual(f.writable(), False)
            self.assertEqual(f.seekable(), True)
            self.read_ops(f, True)
        finally:
            f.close()

    def test_raw_bytes_io(self):
        f = io.BytesIO()
        self.write_ops(f)
        data = f.getvalue()
        self.assertEqual(data, b"hello world\n")
        f = io.BytesIO(data)
        self.read_ops(f, True)

    def test_large_file_ops(self):
        # On Windows and Mac OSX this test consumes large resources: it takes
        # a long time to build the >2GB file and takes >2GB of disk space,
        # therefore the resource must be enabled to run this test.
        if sys.platform[:3] == 'win' or sys.platform == 'darwin':
            if not test_support.is_resource_enabled("largefile"):
                print("\nTesting large file ops skipped on %s." % sys.platform,
                      file=sys.stderr)
                print("It requires %d bytes and a long time." % self.LARGE,
                      file=sys.stderr)
                print("Use 'regrtest.py -u largefile test_io' to run it.",
                      file=sys.stderr)
                return
        # Run the same sequence unbuffered first, then with default buffering.
        f = io.open(test_support.TESTFN, "w+b", 0)
        try:
            self.large_file_ops(f)
        finally:
            f.close()
        f = io.open(test_support.TESTFN, "w+b")
        try:
            self.large_file_ops(f)
        finally:
            f.close()

    def test_with_open(self):
        for bufsize in (0, 1, 100):
            f = None
            with open(test_support.TESTFN, "wb", bufsize) as f:
                f.write(b"xxx")
            self.assertEqual(f.closed, True)
            f = None
            try:
                with open(test_support.TESTFN, "wb", bufsize) as f:
                    1/0
            except ZeroDivisionError:
                # The file must be closed even when the body raises.
                self.assertEqual(f.closed, True)
            else:
                self.fail("1/0 didn't raise an exception")

    def test_destructor(self):
        # Each hook appends its own marker: 1 = __del__, 2 = close, 3 = flush.
        record = []

        class MyFileIO(io.FileIO):
            def __del__(self):
                record.append(1)
                io.FileIO.__del__(self)

            def close(self):
                record.append(2)
                io.FileIO.close(self)

            def flush(self):
                record.append(3)
                io.FileIO.flush(self)

        f = MyFileIO(test_support.TESTFN, "w")
        f.write(b"xxx")
        del f
        self.assertEqual(record, [1, 2, 3])

    def test_close_flushes(self):
        f = io.open(test_support.TESTFN, "wb")
        try:
            f.write(b"xxx")
        finally:
            f.close()
        f = io.open(test_support.TESTFN, "rb")
        try:
            self.assertEqual(f.read(), b"xxx")
        finally:
            f.close()
Guido van Rossuma9e20242007-03-08 00:43:48 +0000236
Guido van Rossuma9e20242007-03-08 00:43:48 +0000237
class MemorySeekTestMixin:
    """Shared read/seek/tell checks for in-memory stream classes.

    A concrete test case must provide three class attributes:
    ``buftype`` (callable turning a str literal into the initial contents),
    ``ioclass`` (the stream class under test) and ``EOF`` (the value read()
    returns once the stream is exhausted).
    """

    def testInit(self):
        # Construction alone must succeed.
        buf = self.buftype("1234567890")
        self.ioclass(buf)

    def testRead(self):
        buf = self.buftype("1234567890")
        bytesIo = self.ioclass(buf)

        self.assertEqual(buf[:1], bytesIo.read(1))
        self.assertEqual(buf[1:5], bytesIo.read(4))
        # Asking for more than is left returns just the remainder.
        self.assertEqual(buf[5:], bytesIo.read(900))
        self.assertEqual(self.EOF, bytesIo.read())

    def testReadNoArgs(self):
        buf = self.buftype("1234567890")
        bytesIo = self.ioclass(buf)

        self.assertEqual(buf, bytesIo.read())
        self.assertEqual(self.EOF, bytesIo.read())

    def testSeek(self):
        buf = self.buftype("1234567890")
        bytesIo = self.ioclass(buf)

        bytesIo.read(5)
        bytesIo.seek(0)
        self.assertEqual(buf, bytesIo.read())

        bytesIo.seek(3)
        self.assertEqual(buf[3:], bytesIo.read())

    def testTell(self):
        buf = self.buftype("1234567890")
        bytesIo = self.ioclass(buf)

        self.assertEqual(0, bytesIo.tell())
        bytesIo.seek(5)
        self.assertEqual(5, bytesIo.tell())
        # Seeking past the end is allowed and is reflected by tell().
        bytesIo.seek(10000)
        self.assertEqual(10000, bytesIo.tell())
280
Guido van Rossuma9e20242007-03-08 00:43:48 +0000281
class BytesIOTest(MemorySeekTestMixin, unittest.TestCase):
    """Run the shared in-memory stream checks against io.BytesIO."""

    ioclass = io.BytesIO
    buftype = bytes
    EOF = b""
286
287
class StringIOTest(MemorySeekTestMixin, unittest.TestCase):
    """Run the shared in-memory stream checks against io.StringIO."""

    ioclass = io.StringIO
    buftype = str
    EOF = ""
292
293
class BufferedReaderTest(unittest.TestCase):
    """Tests for io.BufferedReader on top of scripted raw streams."""

    def testRead(self):
        rawio = MockRawIO((b"abc", b"d", b"efg"))
        bufio = io.BufferedReader(rawio)

        self.assertEqual(b"abcdef", bufio.read(6))

    def testBuffering(self):
        data = b"abcdefghi"
        dlen = len(data)

        # Each entry: buffer size, sizes requested from the buffered reader,
        # and the result sizes the raw stream is then expected to have logged.
        tests = [
            [100, [3, 1, 4, 8], [dlen, 0]],
            [100, [3, 3, 3], [dlen]],
            [4, [1, 2, 4, 2], [4, 4, 1]],
        ]

        for bufsize, buf_read_sizes, raw_read_sizes in tests:
            rawio = MockFileIO(data)
            bufio = io.BufferedReader(rawio, buffer_size=bufsize)
            pos = 0
            for nbytes in buf_read_sizes:
                self.assertEqual(bufio.read(nbytes), data[pos:pos+nbytes])
                pos += nbytes
            self.assertEqual(rawio.read_history, raw_read_sizes)

    def testReadNonBlocking(self):
        # Inject some None's in there to simulate EWOULDBLOCK
        rawio = MockRawIO((b"abc", b"d", None, b"efg", None, None))
        bufio = io.BufferedReader(rawio)

        self.assertEqual(b"abcd", bufio.read(6))
        self.assertEqual(b"e", bufio.read(1))
        self.assertEqual(b"fg", bufio.read())
        self.assertTrue(bufio.read() is None)
        self.assertEqual(b"", bufio.read())

    def testReadToEof(self):
        rawio = MockRawIO((b"abc", b"d", b"efg"))
        bufio = io.BufferedReader(rawio)

        self.assertEqual(b"abcdefg", bufio.read(9000))

    def testReadNoArgs(self):
        rawio = MockRawIO((b"abc", b"d", b"efg"))
        bufio = io.BufferedReader(rawio)

        self.assertEqual(b"abcdefg", bufio.read())

    def testFileno(self):
        rawio = MockRawIO((b"abc", b"d", b"efg"))
        bufio = io.BufferedReader(rawio)

        # 42 is the fixed descriptor reported by the mock raw stream.
        self.assertEqual(42, bufio.fileno())

    def testFilenoNoFileno(self):
        # XXX will we always have fileno() function? If so, kill
        # this test.  Else, write it.
        pass
354
Guido van Rossuma9e20242007-03-08 00:43:48 +0000355
class BufferedWriterTest(unittest.TestCase):
    """Tests for io.BufferedWriter on top of recording raw streams."""

    def testWrite(self):
        # Write to the buffered IO but don't overflow the buffer.
        writer = MockRawIO()
        bufio = io.BufferedWriter(writer, 8)

        bufio.write(b"abc")

        # Nothing may have reached the raw stream yet.
        self.assertFalse(writer._write_stack)

    def testWriteOverflow(self):
        writer = MockRawIO()
        bufio = io.BufferedWriter(writer, 8)

        bufio.write(b"abc")
        bufio.write(b"defghijkl")

        self.assertEqual(b"abcdefghijkl", writer._write_stack[0])

    def testWriteNonBlocking(self):
        # XXX I don't like this test.  It relies too heavily on how the
        # algorithm actually works, which we might change.  Refactor
        # later.
        raw = MockNonBlockWriterIO((9, 2, 22, -6, 10, 12, 12))
        bufio = io.BufferedWriter(raw, 8, 16)

        bufio.write(b"asdf")
        bufio.write(b"asdfa")
        self.assertEqual(b"asdfasdfa", raw._write_stack[0])

        bufio.write(b"asdfasdfasdf")
        self.assertEqual(b"asdfasdfasdf", raw._write_stack[1])
        bufio.write(b"asdfasdfasdf")
        self.assertEqual(b"dfasdfasdf", raw._write_stack[2])
        self.assertEqual(b"asdfasdfasdf", raw._write_stack[3])

        bufio.write(b"asdfasdfasdf")

    def testFileno(self):
        rawio = MockRawIO((b"abc", b"d", b"efg"))
        bufio = io.BufferedWriter(rawio)

        # 42 is the fixed descriptor reported by the mock raw stream.
        self.assertEqual(42, bufio.fileno())

    def testFlush(self):
        writer = MockRawIO()
        bufio = io.BufferedWriter(writer, 8)

        bufio.write(b"abc")
        bufio.flush()

        self.assertEqual(b"abc", writer._write_stack[0])
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000410
Guido van Rossuma9e20242007-03-08 00:43:48 +0000411
class BufferedRWPairTest(unittest.TestCase):
    """Placeholder tests for io.BufferedRWPair."""

    def testRWPair(self):
        # Only construction is exercised so far.
        reader = MockRawIO(())
        writer = MockRawIO()
        rw_pair = io.BufferedRWPair(reader, writer)

        # XXX need implementation
420
Guido van Rossuma9e20242007-03-08 00:43:48 +0000421
class BufferedRandomTest(unittest.TestCase):
    """Tests for io.BufferedRandom mixing reads, writes and seeks."""

    def testReadAndWrite(self):
        raw = MockRawIO((b"asdf", b"ghjk"))
        rw = io.BufferedRandom(raw, 8, 12)

        self.assertEqual(b"as", rw.read(2))
        rw.write(b"ddd")
        rw.write(b"eee")
        self.assertFalse(raw._write_stack)  # Buffer writes
        self.assertEqual(b"ghjk", rw.read())  # This read forces write flush
        self.assertEqual(b"dddeee", raw._write_stack[0])

    def testSeekAndTell(self):
        raw = io.BytesIO(b"asdfghjkl")
        rw = io.BufferedRandom(raw)

        self.assertEqual(b"as", rw.read(2))
        self.assertEqual(2, rw.tell())
        rw.seek(0, 0)
        self.assertEqual(b"asdf", rw.read(4))

        # Overwrite bytes 4..7, then re-read the whole stream from the start.
        rw.write(b"asdf")
        rw.seek(0, 0)
        self.assertEqual(b"asdfasdfl", rw.read())
        self.assertEqual(9, rw.tell())
        rw.seek(-4, 2)
        self.assertEqual(5, rw.tell())
        rw.seek(2, 1)
        self.assertEqual(7, rw.tell())
        self.assertEqual(b"fl", rw.read(11))
453
Guido van Rossum78892e42007-04-06 17:31:18 +0000454
class TextIOWrapperTest(unittest.TestCase):
    """Tests for io.TextIOWrapper: newline handling and basic text I/O."""

    def tearDown(self):
        # testBasicIO creates TESTFN; remove it so no file is left behind.
        test_support.unlink(test_support.TESTFN)

    def testNewlines(self):
        input_lines = ["unix\n", "windows\r\n", "os9\r", "last\n", "nonl"]

        # Each entry: newline argument, expected lines (before padding).
        tests = [
            [None, ['unix\n', 'windows\n', 'os9\n', 'last\n', 'nonl']],
            ['\n', input_lines],
            ['\r\n', input_lines],
        ]

        encodings = ('utf-8', 'bz2')

        # Try a range of pad sizes to test the case where \r is the last
        # character in TextIOWrapper._pending_line.
        for encoding in encodings:
            for do_reads in (False, True):
                for padlen in chain(range(10), range(50, 60)):
                    pad = '.' * padlen
                    data_lines = [pad + line for line in input_lines]
                    # XXX: str.encode() should return bytes
                    data = bytes(''.join(data_lines).encode(encoding))

                    for newline, exp_line_ends in tests:
                        exp_lines = [pad + line for line in exp_line_ends]
                        bufio = io.BufferedReader(io.BytesIO(data))
                        textio = io.TextIOWrapper(bufio, newline=newline,
                                                  encoding=encoding)
                        if do_reads:
                            # Mix read(2) with readline() on the same stream.
                            got_lines = []
                            while True:
                                c2 = textio.read(2)
                                if c2 == '':
                                    break
                                self.assertEqual(len(c2), 2)
                                got_lines.append(c2 + textio.readline())
                        else:
                            got_lines = list(textio)

                        for got_line, exp_line in zip(got_lines, exp_lines):
                            self.assertEqual(got_line, exp_line)
                        self.assertEqual(len(got_lines), len(exp_lines))

    # Systematic tests of the text I/O API

    def testBasicIO(self):
        for chunksize in (1, 2, 3, 4, 5, 15, 16, 17, 31, 32, 33, 63, 64, 65):
            for enc in "ascii", "latin1", "utf8":  # , "utf-16-be", "utf-16-le":
                f = io.open(test_support.TESTFN, "w+", encoding=enc)
                try:
                    f._CHUNK_SIZE = chunksize
                    self.assertEqual(f.write("abc"), 3)
                finally:
                    # Close even when an assertion fails.
                    f.close()
                f = io.open(test_support.TESTFN, "r+", encoding=enc)
                try:
                    f._CHUNK_SIZE = chunksize
                    self.assertEqual(f.tell(), 0)
                    self.assertEqual(f.read(), "abc")
                    # Position cookie for the end of the initial contents.
                    cookie = f.tell()
                    self.assertEqual(f.seek(0), 0)
                    self.assertEqual(f.read(2), "ab")
                    self.assertEqual(f.read(1), "c")
                    self.assertEqual(f.read(1), "")
                    self.assertEqual(f.read(), "")
                    self.assertEqual(f.tell(), cookie)
                    self.assertEqual(f.seek(0), 0)
                    self.assertEqual(f.seek(0, 2), cookie)
                    self.assertEqual(f.write("def"), 3)
                    self.assertEqual(f.seek(cookie), cookie)
                    self.assertEqual(f.read(), "def")
                    if enc.startswith("utf"):
                        self.multi_line_test(f, enc)
                finally:
                    f.close()

    def multi_line_test(self, f, enc):
        """Write lines of many lengths to *f* and read them back with tell().

        The (position, line) pairs seen while reading must equal the pairs
        recorded while writing.  *enc* is accepted but not used here.
        """
        f.seek(0)
        f.truncate()
        sample = u"s\xff\u0fff\uffff"
        wlines = []
        for size in (0, 1, 2, 3, 4, 5, 30, 31, 32, 33, 62, 63, 64, 65, 1000):
            # Cycle through the sample characters to build a line of `size`.
            chars = [sample[i % len(sample)] for i in range(size)]
            line = u"".join(chars) + "\n"
            wlines.append((f.tell(), line))
            f.write(line)
        f.seek(0)
        rlines = []
        while True:
            pos = f.tell()
            line = f.readline()
            if not line:
                break
            rlines.append((pos, line))
        self.assertEqual(rlines, wlines)
551
552
Guido van Rossum01a27522007-03-07 01:00:12 +0000553# XXX Tests for open()
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000554
def test_main():
    """Run every test case class of this module via test_support."""
    case_classes = (
        IOTest, BytesIOTest, StringIOTest,
        BufferedReaderTest,
        BufferedWriterTest, BufferedRWPairTest,
        BufferedRandomTest, TextIOWrapperTest,
    )
    test_support.run_unittest(*case_classes)


if __name__ == "__main__":
    unittest.main()