blob: 33b32e0a1bbac42704c2c3b52d9c1105c02246b6 [file] [log] [blame]
Guido van Rossum68bbcd22007-02-27 17:19:33 +00001"""Unit tests for io.py."""
2
Guido van Rossum8358db22007-08-18 21:39:55 +00003import os
Guido van Rossum34d69e52007-04-10 20:08:41 +00004import sys
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +00005import time
Guido van Rossumd4103952007-04-12 05:44:49 +00006import array
Guido van Rossum28524c72007-02-27 05:47:44 +00007import unittest
Guido van Rossum78892e42007-04-06 17:31:18 +00008from itertools import chain
Guido van Rossum76c5d4d2007-04-06 19:10:29 +00009from test import test_support
10
Guido van Rossum53807da2007-04-10 19:01:47 +000011import io # The module under test
Guido van Rossum28524c72007-02-27 05:47:44 +000012
Guido van Rossuma9e20242007-03-08 00:43:48 +000013
class MockRawIO(io.RawIOBase):
    """Raw stream stub that replays scripted reads and records writes.

    read() hands back the entries of *read_stack* one per call, in order;
    an entry may be None to simulate a read that would block.  Once the
    script is used up, read() returns b"".  Every buffer passed to write()
    is copied onto ``_write_stack`` so tests can inspect what reached the
    "raw" layer.
    """

    def __init__(self, read_stack=()):
        self._read_stack = list(read_stack)
        self._write_stack = []

    def read(self, n=None):
        """Return the next scripted chunk (*n* is ignored), or b"" at the end."""
        try:
            return self._read_stack.pop(0)
        except IndexError:
            # Script exhausted: behave like a stream at end of file.
            return b""

    def write(self, b):
        """Record a copy of *b* and report it as fully written."""
        self._write_stack.append(b[:])
        return len(b)

    def writable(self):
        return True

    def fileno(self):
        # Arbitrary fixed value that tests compare against.
        return 42

    def readable(self):
        return True

    def seekable(self):
        return True

    def seek(self, pos, whence=0):
        """Accept and ignore any seek request."""
        pass

    def tell(self):
        # Arbitrary fixed value; the stub does not track a position.
        return 42
47
Guido van Rossuma9e20242007-03-08 00:43:48 +000048
class MockFileIO(io.BytesIO):
    """BytesIO that logs the size of every chunk returned by read().

    ``read_history`` receives one entry per read() call: the length of
    the returned data, or None if the underlying read returned None.
    """

    def __init__(self, data):
        # The history list is created first, then the buffer is loaded.
        self.read_history = []
        super(MockFileIO, self).__init__(data)

    def read(self, n=None):
        chunk = super(MockFileIO, self).read(n)
        if chunk is None:
            size = None
        else:
            size = len(chunk)
        self.read_history.append(size)
        return chunk
59
60
class MockNonBlockWriterIO(io.RawIOBase):
    """Write-only raw stub whose write() results follow a script.

    Each write() consumes the next number from *blocking_script*: a
    non-negative value is returned as the byte count written, a negative
    value -k makes write() raise BlockingIOError reporting k characters
    written.  Every buffer passed in is copied onto ``_write_stack``.
    """

    def __init__(self, blocking_script):
        self._write_stack = []
        self._blocking_script = list(blocking_script)

    def write(self, b):
        # The data is recorded before the scripted outcome is looked up.
        self._write_stack.append(b[:])
        outcome = self._blocking_script.pop(0)
        if outcome >= 0:
            return outcome
        raise io.BlockingIOError(0, "test blocking", -outcome)

    def writable(self):
        return True
Guido van Rossum68bbcd22007-02-27 17:19:33 +000077
Guido van Rossuma9e20242007-03-08 00:43:48 +000078
class IOTest(unittest.TestCase):
    """Run raw files, buffered files and BytesIO through shared scripts."""

    # An offset one past the largest signed 32-bit value, used by the
    # large-file checks.
    LARGE = 2**31

    def tearDown(self):
        # All tests share one scratch path; remove it after each test.
        test_support.unlink(test_support.TESTFN)

    def write_ops(self, f):
        """Run the write/seek/truncate script against the binary stream *f*.

        test_raw_bytes_io() verifies that an initially empty stream holds
        "hello world" plus one newline afterwards, which is the content
        read_ops() expects.
        """
        self.assertEqual(f.write(b"blah."), 5)
        self.assertEqual(f.seek(0), 0)
        self.assertEqual(f.write(b"Hello."), 6)
        self.assertEqual(f.tell(), 6)
        self.assertEqual(f.seek(-1, 1), 5)
        self.assertEqual(f.tell(), 5)
        self.assertEqual(f.write(bytearray(b" world\n\n\n")), 9)
        self.assertEqual(f.seek(0), 0)
        self.assertEqual(f.write(b"h"), 1)
        self.assertEqual(f.seek(-1, 2), 13)
        self.assertEqual(f.tell(), 13)
        self.assertEqual(f.truncate(12), 12)
        # Truncating is expected to leave the position where it was.
        self.assertEqual(f.tell(), 13)
        self.assertRaises(TypeError, f.seek, 0.0)

    def read_ops(self, f, buffered=False):
        """Run the read/readinto/seek script against the binary stream *f*.

        With *buffered* true, argument-less read() calls are checked too.
        """
        data = f.read(5)
        self.assertEqual(data, b"hello")
        data = bytearray(data)
        self.assertEqual(f.readinto(data), 5)
        self.assertEqual(data, b" worl")
        # Only two bytes remain; the target buffer keeps its length.
        self.assertEqual(f.readinto(data), 2)
        self.assertEqual(len(data), 5)
        self.assertEqual(data[:2], b"d\n")
        self.assertEqual(f.seek(0), 0)
        self.assertEqual(f.read(20), b"hello world\n")
        self.assertEqual(f.read(1), b"")
        self.assertEqual(f.readinto(bytearray(b"x")), 0)
        self.assertEqual(f.seek(-6, 2), 6)
        self.assertEqual(f.read(5), b"world")
        self.assertEqual(f.read(0), b"")
        self.assertEqual(f.readinto(bytearray()), 0)
        self.assertEqual(f.seek(-6, 1), 5)
        self.assertEqual(f.read(5), b" worl")
        self.assertEqual(f.tell(), 10)
        self.assertRaises(TypeError, f.seek, 0.0)
        if buffered:
            f.seek(0)
            self.assertEqual(f.read(), b"hello world\n")
            f.seek(6)
            self.assertEqual(f.read(), b"world\n")
            self.assertEqual(f.read(), b"")

    def large_file_ops(self, f):
        """Check seek/tell/truncate/read around the LARGE offset on *f*."""
        # Real assertions rather than ``assert`` so they survive ``-O``.
        self.assertTrue(f.readable())
        self.assertTrue(f.writable())
        self.assertEqual(f.seek(self.LARGE), self.LARGE)
        self.assertEqual(f.tell(), self.LARGE)
        self.assertEqual(f.write(b"xxx"), 3)
        self.assertEqual(f.tell(), self.LARGE + 3)
        self.assertEqual(f.seek(-1, 1), self.LARGE + 2)
        self.assertEqual(f.truncate(), self.LARGE + 2)
        self.assertEqual(f.tell(), self.LARGE + 2)
        self.assertEqual(f.seek(0, 2), self.LARGE + 2)
        self.assertEqual(f.truncate(self.LARGE + 1), self.LARGE + 1)
        self.assertEqual(f.tell(), self.LARGE + 2)
        self.assertEqual(f.seek(0, 2), self.LARGE + 1)
        self.assertEqual(f.seek(-1, 2), self.LARGE)
        self.assertEqual(f.read(2), b"x")

    def test_raw_file_io(self):
        # ``with`` closes the file even when an assertion fails, so
        # tearDown can always remove it.
        with io.open(test_support.TESTFN, "wb", buffering=0) as f:
            self.assertEqual(f.readable(), False)
            self.assertEqual(f.writable(), True)
            self.assertEqual(f.seekable(), True)
            self.write_ops(f)
        with io.open(test_support.TESTFN, "rb", buffering=0) as f:
            self.assertEqual(f.readable(), True)
            self.assertEqual(f.writable(), False)
            self.assertEqual(f.seekable(), True)
            self.read_ops(f)

    def test_buffered_file_io(self):
        with io.open(test_support.TESTFN, "wb") as f:
            self.assertEqual(f.readable(), False)
            self.assertEqual(f.writable(), True)
            self.assertEqual(f.seekable(), True)
            self.write_ops(f)
        with io.open(test_support.TESTFN, "rb") as f:
            self.assertEqual(f.readable(), True)
            self.assertEqual(f.writable(), False)
            self.assertEqual(f.seekable(), True)
            self.read_ops(f, True)

    def test_readline(self):
        with io.open(test_support.TESTFN, "wb") as f:
            f.write(b"abc\ndef\nxyzzy\nfoo")
        with io.open(test_support.TESTFN, "rb") as f:
            self.assertEqual(f.readline(), b"abc\n")
            self.assertEqual(f.readline(10), b"def\n")
            # A size limit may cut a line short; the rest comes next.
            self.assertEqual(f.readline(2), b"xy")
            self.assertEqual(f.readline(4), b"zzy\n")
            self.assertEqual(f.readline(), b"foo")

    def test_raw_bytes_io(self):
        f = io.BytesIO()
        self.write_ops(f)
        data = f.getvalue()
        self.assertEqual(data, b"hello world\n")
        f = io.BytesIO(data)
        self.read_ops(f, True)

    def test_large_file_ops(self):
        # On Windows and Mac OSX this test consumes large resources; It takes
        # a long time to build the >2GB file and takes >2GB of disk space
        # therefore the resource must be enabled to run this test.
        if sys.platform[:3] == 'win' or sys.platform == 'darwin':
            if not test_support.is_resource_enabled("largefile"):
                print("\nTesting large file ops skipped on %s." % sys.platform,
                      file=sys.stderr)
                print("It requires %d bytes and a long time." % self.LARGE,
                      file=sys.stderr)
                print("Use 'regrtest.py -u largefile test_io' to run it.",
                      file=sys.stderr)
                return
        # Once unbuffered, once with default buffering.
        with io.open(test_support.TESTFN, "w+b", 0) as f:
            self.large_file_ops(f)
        with io.open(test_support.TESTFN, "w+b") as f:
            self.large_file_ops(f)

    def test_with_open(self):
        for bufsize in (0, 1, 100):
            f = None
            with io.open(test_support.TESTFN, "wb", bufsize) as f:
                f.write(b"xxx")
            self.assertEqual(f.closed, True)
            f = None
            try:
                with io.open(test_support.TESTFN, "wb", bufsize) as f:
                    1/0
            except ZeroDivisionError:
                # The file must be closed even when the body raised.
                self.assertEqual(f.closed, True)
            else:
                self.fail("1/0 didn't raise an exception")

    def test_destructor(self):
        # Expected call order when the object is destroyed:
        # __del__ (1), then close (2), then flush (3).
        record = []

        class MyFileIO(io.FileIO):
            def __del__(self):
                record.append(1)
                io.FileIO.__del__(self)

            def close(self):
                record.append(2)
                io.FileIO.close(self)

            def flush(self):
                record.append(3)
                io.FileIO.flush(self)

        f = MyFileIO(test_support.TESTFN, "w")
        # FileIO is a binary stream, so it must be given bytes.
        f.write(b"xxx")
        del f
        self.assertEqual(record, [1, 2, 3])

    def test_close_flushes(self):
        with io.open(test_support.TESTFN, "wb") as f:
            f.write(b"xxx")
        with io.open(test_support.TESTFN, "rb") as f:
            self.assertEqual(f.read(), b"xxx")

    def test_array_writes(self):
        a = array.array('i', range(10))
        # write() reports bytes, so compute the array's size in bytes
        # explicitly instead of relying on len() of a memoryview.
        n = len(a) * a.itemsize
        with io.open(test_support.TESTFN, "wb", 0) as f:
            self.assertEqual(f.write(a), n)
        with io.open(test_support.TESTFN, "wb") as f:
            self.assertEqual(f.write(a), n)

    def test_closefd(self):
        # closefd=False together with a file name must be rejected.
        self.assertRaises(ValueError, io.open, test_support.TESTFN, 'w',
                          closefd=False)
Guido van Rossuma9e20242007-03-08 00:43:48 +0000268
class MemorySeekTestMixin:
    """Read/seek/tell checks shared by the in-memory stream test cases.

    Concrete subclasses supply three attributes:
      buftype -- callable turning a str into the stream's own data type
      ioclass -- the stream class under test, constructed from that data
      EOF     -- the empty value read() returns at end of stream
    """

    def testInit(self):
        # Construction from initial data only has to succeed.
        buf = self.buftype("1234567890")
        self.ioclass(buf)

    def testRead(self):
        buf = self.buftype("1234567890")
        bytesIo = self.ioclass(buf)

        self.assertEqual(buf[:1], bytesIo.read(1))
        self.assertEqual(buf[1:5], bytesIo.read(4))
        # Asking for more than remains returns just the remainder.
        self.assertEqual(buf[5:], bytesIo.read(900))
        self.assertEqual(self.EOF, bytesIo.read())

    def testReadNoArgs(self):
        buf = self.buftype("1234567890")
        bytesIo = self.ioclass(buf)

        self.assertEqual(buf, bytesIo.read())
        self.assertEqual(self.EOF, bytesIo.read())

    def testSeek(self):
        buf = self.buftype("1234567890")
        bytesIo = self.ioclass(buf)

        bytesIo.read(5)
        bytesIo.seek(0)
        self.assertEqual(buf, bytesIo.read())

        bytesIo.seek(3)
        self.assertEqual(buf[3:], bytesIo.read())
        # Positions must be integers.
        self.assertRaises(TypeError, bytesIo.seek, 0.0)

    def testTell(self):
        buf = self.buftype("1234567890")
        bytesIo = self.ioclass(buf)

        self.assertEqual(0, bytesIo.tell())
        bytesIo.seek(5)
        self.assertEqual(5, bytesIo.tell())
        # Seeking beyond the end of the data is allowed and remembered.
        bytesIo.seek(10000)
        self.assertEqual(10000, bytesIo.tell())
312
Guido van Rossuma9e20242007-03-08 00:43:48 +0000313
class BytesIOTest(MemorySeekTestMixin, unittest.TestCase):
    """Runs the shared in-memory stream checks against io.BytesIO."""

    # Stream class under test and the value it returns at end of data.
    ioclass = io.BytesIO
    EOF = b""

    @staticmethod
    def buftype(s):
        # The mixin supplies str test data; BytesIO needs it as bytes.
        return s.encode("utf-8")
320
321
class StringIOTest(MemorySeekTestMixin, unittest.TestCase):
    """Runs the shared in-memory stream checks against io.StringIO."""

    # Stream class under test and the value it returns at end of data.
    ioclass = io.StringIO
    EOF = ""
    # The mixin's str test data is passed through str() as-is.
    buftype = str
326
327
class BufferedReaderTest(unittest.TestCase):
    """Tests for io.BufferedReader on top of scripted raw streams."""

    def testRead(self):
        rawio = MockRawIO((b"abc", b"d", b"efg"))
        bufio = io.BufferedReader(rawio)

        # Six bytes have to be assembled from three raw chunks.
        self.assertEqual(b"abcdef", bufio.read(6))

    def testBuffering(self):
        data = b"abcdefghi"
        dlen = len(data)

        # Each row: buffer size, sizes requested from the buffered reader,
        # sizes of the chunks the raw stream is expected to have returned.
        tests = [
            [100, [3, 1, 4, 8], [dlen, 0]],
            [100, [3, 3, 3], [dlen]],
            [4, [1, 2, 4, 2], [4, 4, 1]],
        ]

        for bufsize, buf_read_sizes, raw_read_sizes in tests:
            rawio = MockFileIO(data)
            bufio = io.BufferedReader(rawio, buffer_size=bufsize)
            pos = 0
            for nbytes in buf_read_sizes:
                self.assertEqual(bufio.read(nbytes), data[pos:pos+nbytes])
                pos += nbytes
            self.assertEqual(rawio.read_history, raw_read_sizes)

    def testReadNonBlocking(self):
        # Inject some None's in there to simulate EWOULDBLOCK
        rawio = MockRawIO((b"abc", b"d", None, b"efg", None, None))
        bufio = io.BufferedReader(rawio)

        self.assertEqual(b"abcd", bufio.read(6))
        self.assertEqual(b"e", bufio.read(1))
        self.assertEqual(b"fg", bufio.read())
        self.assertTrue(bufio.read() is None)
        self.assertEqual(b"", bufio.read())

    def testReadToEof(self):
        rawio = MockRawIO((b"abc", b"d", b"efg"))
        bufio = io.BufferedReader(rawio)

        self.assertEqual(b"abcdefg", bufio.read(9000))

    def testReadNoArgs(self):
        rawio = MockRawIO((b"abc", b"d", b"efg"))
        bufio = io.BufferedReader(rawio)

        self.assertEqual(b"abcdefg", bufio.read())

    def testFileno(self):
        rawio = MockRawIO((b"abc", b"d", b"efg"))
        bufio = io.BufferedReader(rawio)

        # fileno() must be delegated to the raw stream (the mock returns 42).
        self.assertEqual(42, bufio.fileno())

    def testFilenoNoFileno(self):
        # XXX will we always have fileno() function? If so, kill
        # this test. Else, write it.
        pass
388
Guido van Rossuma9e20242007-03-08 00:43:48 +0000389
class BufferedWriterTest(unittest.TestCase):
    """Tests for io.BufferedWriter on top of recording raw streams."""

    def testWrite(self):
        # Write to the buffered IO but don't overflow the buffer.
        writer = MockRawIO()
        bufio = io.BufferedWriter(writer, 8)

        bufio.write(b"abc")

        self.assertFalse(writer._write_stack)

    def testWriteOverflow(self):
        writer = MockRawIO()
        bufio = io.BufferedWriter(writer, 8)

        bufio.write(b"abc")
        bufio.write(b"defghijkl")

        # Exceeding the buffer size pushes everything to the raw stream.
        self.assertEqual(b"abcdefghijkl", writer._write_stack[0])

    def testWriteNonBlocking(self):
        # The script gives the result of each successive raw write();
        # the negative entry makes that raw write raise BlockingIOError.
        raw = MockNonBlockWriterIO((9, 2, 22, -6, 10, 12, 12))
        bufio = io.BufferedWriter(raw, 8, 16)

        bufio.write(b"asdf")
        bufio.write(b"asdfa")
        self.assertEqual(b"asdfasdfa", raw._write_stack[0])

        bufio.write(b"asdfasdfasdf")
        self.assertEqual(b"asdfasdfasdf", raw._write_stack[1])
        bufio.write(b"asdfasdfasdf")
        self.assertEqual(b"dfasdfasdf", raw._write_stack[2])
        self.assertEqual(b"asdfasdfasdf", raw._write_stack[3])

        bufio.write(b"asdfasdfasdf")

        # XXX I don't like this test. It relies too heavily on how the
        # algorithm actually works, which we might change. Refactor
        # later.

    def testFileno(self):
        rawio = MockRawIO((b"abc", b"d", b"efg"))
        bufio = io.BufferedWriter(rawio)

        # fileno() must be delegated to the raw stream (the mock returns 42).
        self.assertEqual(42, bufio.fileno())

    def testFlush(self):
        writer = MockRawIO()
        bufio = io.BufferedWriter(writer, 8)

        bufio.write(b"abc")
        bufio.flush()

        self.assertEqual(b"abc", writer._write_stack[0])
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000444
Guido van Rossuma9e20242007-03-08 00:43:48 +0000445
class BufferedRWPairTest(unittest.TestCase):
    """Placeholder tests for io.BufferedRWPair."""

    def testRWPair(self):
        # For now this only checks that a pair can be built from a
        # reader stub and a writer stub.
        reader = MockRawIO(())
        writer = MockRawIO()
        rw_pair = io.BufferedRWPair(reader, writer)

        # XXX need implementation
454
Guido van Rossuma9e20242007-03-08 00:43:48 +0000455
class BufferedRandomTest(unittest.TestCase):
    """Tests for io.BufferedRandom: interleaved reads, writes and seeks."""

    def testReadAndWrite(self):
        raw = MockRawIO((b"asdf", b"ghjk"))
        rw = io.BufferedRandom(raw, 8, 12)

        self.assertEqual(b"as", rw.read(2))
        rw.write(b"ddd")
        rw.write(b"eee")
        self.assertFalse(raw._write_stack)  # Buffer writes
        self.assertEqual(b"ghjk", rw.read())  # This read forces write flush
        self.assertEqual(b"dddeee", raw._write_stack[0])

    def testSeekAndTell(self):
        raw = io.BytesIO(b"asdfghjkl")
        rw = io.BufferedRandom(raw)

        self.assertEqual(b"as", rw.read(2))
        self.assertEqual(2, rw.tell())
        rw.seek(0, 0)
        self.assertEqual(b"asdf", rw.read(4))

        # Overwrite the four bytes that follow the ones just read.
        rw.write(b"asdf")
        rw.seek(0, 0)
        self.assertEqual(b"asdfasdfl", rw.read())
        self.assertEqual(9, rw.tell())
        rw.seek(-4, 2)
        self.assertEqual(5, rw.tell())
        rw.seek(2, 1)
        self.assertEqual(7, rw.tell())
        self.assertEqual(b"fl", rw.read(11))
        self.assertRaises(TypeError, rw.seek, 0.0)
Guido van Rossum01a27522007-03-07 01:00:12 +0000488
Guido van Rossum78892e42007-04-06 17:31:18 +0000489
490class TextIOWrapperTest(unittest.TestCase):
Guido van Rossum9b76da62007-04-11 01:09:03 +0000491
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +0000492 def setUp(self):
493 self.testdata = b"AAA\r\nBBB\rCCC\r\nDDD\nEEE\r\n"
494 self.normalized = b"AAA\nBBB\nCCC\nDDD\nEEE\n".decode("ascii")
495
Guido van Rossumd0712812007-04-11 16:32:43 +0000496 def tearDown(self):
497 test_support.unlink(test_support.TESTFN)
Guido van Rossum9b76da62007-04-11 01:09:03 +0000498
Guido van Rossumf64db9f2007-12-06 01:04:26 +0000499 def testLineBuffering(self):
500 r = io.BytesIO()
501 b = io.BufferedWriter(r, 1000)
502 t = io.TextIOWrapper(b, newline="\n", line_buffering=True)
503 t.write("X")
504 self.assertEquals(r.getvalue(), b"") # No flush happened
505 t.write("Y\nZ")
506 self.assertEquals(r.getvalue(), b"XY\nZ") # All got flushed
507 t.write("A\rB")
508 self.assertEquals(r.getvalue(), b"XY\nZA\rB")
509
Guido van Rossume7fc50f2007-12-03 22:54:21 +0000510 def testEncodingErrorsReading(self):
511 # (1) default
512 b = io.BytesIO(b"abc\n\xff\n")
513 t = io.TextIOWrapper(b, encoding="ascii")
514 self.assertRaises(UnicodeError, t.read)
515 # (2) explicit strict
516 b = io.BytesIO(b"abc\n\xff\n")
517 t = io.TextIOWrapper(b, encoding="ascii", errors="strict")
518 self.assertRaises(UnicodeError, t.read)
519 # (3) ignore
520 b = io.BytesIO(b"abc\n\xff\n")
521 t = io.TextIOWrapper(b, encoding="ascii", errors="ignore")
522 self.assertEquals(t.read(), "abc\n\n")
523 # (4) replace
524 b = io.BytesIO(b"abc\n\xff\n")
525 t = io.TextIOWrapper(b, encoding="ascii", errors="replace")
526 self.assertEquals(t.read(), "abc\n\ufffd\n")
527
528 def testEncodingErrorsWriting(self):
529 # (1) default
530 b = io.BytesIO()
531 t = io.TextIOWrapper(b, encoding="ascii")
532 self.assertRaises(UnicodeError, t.write, "\xff")
533 # (2) explicit strict
534 b = io.BytesIO()
535 t = io.TextIOWrapper(b, encoding="ascii", errors="strict")
536 self.assertRaises(UnicodeError, t.write, "\xff")
537 # (3) ignore
538 b = io.BytesIO()
Guido van Rossumf64db9f2007-12-06 01:04:26 +0000539 t = io.TextIOWrapper(b, encoding="ascii", errors="ignore",
540 newline="\n")
Guido van Rossume7fc50f2007-12-03 22:54:21 +0000541 t.write("abc\xffdef\n")
542 t.flush()
Christian Heimesecda2612007-12-05 17:59:44 +0000543 self.assertEquals(b.getvalue(), b"abcdef\n")
Guido van Rossume7fc50f2007-12-03 22:54:21 +0000544 # (4) replace
545 b = io.BytesIO()
Guido van Rossumf64db9f2007-12-06 01:04:26 +0000546 t = io.TextIOWrapper(b, encoding="ascii", errors="replace",
547 newline="\n")
Guido van Rossume7fc50f2007-12-03 22:54:21 +0000548 t.write("abc\xffdef\n")
549 t.flush()
Christian Heimesecda2612007-12-05 17:59:44 +0000550 self.assertEquals(b.getvalue(), b"abc?def\n")
Guido van Rossume7fc50f2007-12-03 22:54:21 +0000551
Guido van Rossum8358db22007-08-18 21:39:55 +0000552 def testNewlinesInput(self):
553 testdata = b"AAA\nBBB\nCCC\rDDD\rEEE\r\nFFF\r\nGGG"
554 normalized = testdata.replace(b"\r\n", b"\n").replace(b"\r", b"\n")
555 for newline, expected in [
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +0000556 (None, normalized.decode("ascii").splitlines(True)),
557 ("", testdata.decode("ascii").splitlines(True)),
Guido van Rossum8358db22007-08-18 21:39:55 +0000558 ("\n", ["AAA\n", "BBB\n", "CCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]),
559 ("\r\n", ["AAA\nBBB\nCCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]),
560 ("\r", ["AAA\nBBB\nCCC\r", "DDD\r", "EEE\r", "\nFFF\r", "\nGGG"]),
561 ]:
562 buf = io.BytesIO(testdata)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +0000563 txt = io.TextIOWrapper(buf, encoding="ascii", newline=newline)
Guido van Rossum8358db22007-08-18 21:39:55 +0000564 self.assertEquals(txt.readlines(), expected)
565 txt.seek(0)
566 self.assertEquals(txt.read(), "".join(expected))
567
568 def testNewlinesOutput(self):
569 testdict = {
570 "": b"AAA\nBBB\nCCC\nX\rY\r\nZ",
571 "\n": b"AAA\nBBB\nCCC\nX\rY\r\nZ",
572 "\r": b"AAA\rBBB\rCCC\rX\rY\r\rZ",
573 "\r\n": b"AAA\r\nBBB\r\nCCC\r\nX\rY\r\r\nZ",
574 }
575 tests = [(None, testdict[os.linesep])] + sorted(testdict.items())
576 for newline, expected in tests:
577 buf = io.BytesIO()
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +0000578 txt = io.TextIOWrapper(buf, encoding="ascii", newline=newline)
Guido van Rossum8358db22007-08-18 21:39:55 +0000579 txt.write("AAA\nB")
580 txt.write("BB\nCCC\n")
581 txt.write("X\rY\r\nZ")
582 txt.flush()
583 self.assertEquals(buf.getvalue(), expected)
584
Guido van Rossum78892e42007-04-06 17:31:18 +0000585 def testNewlines(self):
586 input_lines = [ "unix\n", "windows\r\n", "os9\r", "last\n", "nonl" ]
587
588 tests = [
589 [ None, [ 'unix\n', 'windows\n', 'os9\n', 'last\n', 'nonl' ] ],
Guido van Rossum8358db22007-08-18 21:39:55 +0000590 [ '', input_lines ],
591 [ '\n', [ "unix\n", "windows\r\n", "os9\rlast\n", "nonl" ] ],
592 [ '\r\n', [ "unix\nwindows\r\n", "os9\rlast\nnonl" ] ],
593 [ '\r', [ "unix\nwindows\r", "\nos9\r", "last\nnonl" ] ],
Guido van Rossum78892e42007-04-06 17:31:18 +0000594 ]
595
Guido van Rossumc2f93dc2007-05-24 00:50:02 +0000596 encodings = ('utf-8', 'latin-1')
Guido van Rossum78892e42007-04-06 17:31:18 +0000597
Guido van Rossum8358db22007-08-18 21:39:55 +0000598 # Try a range of buffer sizes to test the case where \r is the last
Guido van Rossum78892e42007-04-06 17:31:18 +0000599 # character in TextIOWrapper._pending_line.
600 for encoding in encodings:
Guido van Rossum8358db22007-08-18 21:39:55 +0000601 # XXX: str.encode() should return bytes
602 data = bytes(''.join(input_lines).encode(encoding))
Guido van Rossum78892e42007-04-06 17:31:18 +0000603 for do_reads in (False, True):
Guido van Rossum8358db22007-08-18 21:39:55 +0000604 for bufsize in range(1, 10):
605 for newline, exp_lines in tests:
606 bufio = io.BufferedReader(io.BytesIO(data), bufsize)
Guido van Rossum76c5d4d2007-04-06 19:10:29 +0000607 textio = io.TextIOWrapper(bufio, newline=newline,
Guido van Rossum78892e42007-04-06 17:31:18 +0000608 encoding=encoding)
609 if do_reads:
610 got_lines = []
611 while True:
Guido van Rossum76c5d4d2007-04-06 19:10:29 +0000612 c2 = textio.read(2)
Guido van Rossum78892e42007-04-06 17:31:18 +0000613 if c2 == '':
614 break
615 self.assertEquals(len(c2), 2)
Guido van Rossum76c5d4d2007-04-06 19:10:29 +0000616 got_lines.append(c2 + textio.readline())
Guido van Rossum78892e42007-04-06 17:31:18 +0000617 else:
Guido van Rossum76c5d4d2007-04-06 19:10:29 +0000618 got_lines = list(textio)
Guido van Rossum78892e42007-04-06 17:31:18 +0000619
620 for got_line, exp_line in zip(got_lines, exp_lines):
621 self.assertEquals(got_line, exp_line)
622 self.assertEquals(len(got_lines), len(exp_lines))
623
Guido van Rossum8358db22007-08-18 21:39:55 +0000624 def testNewlinesInput(self):
625 testdata = b"AAA\nBBB\nCCC\rDDD\rEEE\r\nFFF\r\nGGG"
626 normalized = testdata.replace(b"\r\n", b"\n").replace(b"\r", b"\n")
627 for newline, expected in [
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +0000628 (None, normalized.decode("ascii").splitlines(True)),
629 ("", testdata.decode("ascii").splitlines(True)),
Guido van Rossum8358db22007-08-18 21:39:55 +0000630 ("\n", ["AAA\n", "BBB\n", "CCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]),
631 ("\r\n", ["AAA\nBBB\nCCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]),
632 ("\r", ["AAA\nBBB\nCCC\r", "DDD\r", "EEE\r", "\nFFF\r", "\nGGG"]),
633 ]:
634 buf = io.BytesIO(testdata)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +0000635 txt = io.TextIOWrapper(buf, encoding="ascii", newline=newline)
Guido van Rossum8358db22007-08-18 21:39:55 +0000636 self.assertEquals(txt.readlines(), expected)
637 txt.seek(0)
638 self.assertEquals(txt.read(), "".join(expected))
639
640 def testNewlinesOutput(self):
Guido van Rossum8358db22007-08-18 21:39:55 +0000641 data = "AAA\nBBB\rCCC\n"
642 data_lf = b"AAA\nBBB\rCCC\n"
643 data_cr = b"AAA\rBBB\rCCC\r"
644 data_crlf = b"AAA\r\nBBB\rCCC\r\n"
Guido van Rossum18363582007-08-18 22:05:58 +0000645 save_linesep = os.linesep
646 try:
647 for os.linesep, newline, expected in [
648 ("\n", None, data_lf),
649 ("\r\n", None, data_crlf),
650 ("\n", "", data_lf),
651 ("\r\n", "", data_lf),
652 ("\n", "\n", data_lf),
653 ("\r\n", "\n", data_lf),
654 ("\n", "\r", data_cr),
655 ("\r\n", "\r", data_cr),
656 ("\n", "\r\n", data_crlf),
657 ("\r\n", "\r\n", data_crlf),
658 ]:
659 buf = io.BytesIO()
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +0000660 txt = io.TextIOWrapper(buf, encoding="ascii", newline=newline)
Guido van Rossum18363582007-08-18 22:05:58 +0000661 txt.write(data)
662 txt.close()
663 self.assertEquals(buf.getvalue(), expected)
664 finally:
665 os.linesep = save_linesep
Guido van Rossum8358db22007-08-18 21:39:55 +0000666
Guido van Rossum9b76da62007-04-11 01:09:03 +0000667 # Systematic tests of the text I/O API
668
def testBasicIO(self):
    """Exercise write/read/seek/tell on a text file for several encodings.

    Runs once per (chunk size, encoding) pair against the shared scratch
    file.  For UTF encodings the multi-line round-trip check is run on
    the same open file before it is closed.
    """
    chunk_sizes = (1, 2, 3, 4, 5, 15, 16, 17, 31, 32, 33, 63, 64, 65)
    # "utf-16-be" and "utf-16-le" are not exercised here.
    encodings = ("ascii", "latin1", "utf8")
    for chunk in chunk_sizes:
        for codec in encodings:
            # Create the file with three characters.
            f = io.open(test_support.TESTFN, "w+", encoding=codec)
            f._CHUNK_SIZE = chunk
            self.assertEquals(f.write("abc"), 3)
            f.close()

            # Reopen for update and walk through it.
            f = io.open(test_support.TESTFN, "r+", encoding=codec)
            f._CHUNK_SIZE = chunk
            self.assertEquals(f.tell(), 0)
            self.assertEquals(f.read(), "abc")
            end = f.tell()
            self.assertEquals(f.seek(0), 0)
            for count, text in ((2, "ab"), (1, "c"), (1, "")):
                self.assertEquals(f.read(count), text)
            self.assertEquals(f.read(), "")
            self.assertEquals(f.tell(), end)
            self.assertEquals(f.seek(0), 0)
            self.assertEquals(f.seek(0, 2), end)

            # Append and read back what was appended.
            self.assertEquals(f.write("def"), 3)
            self.assertEquals(f.seek(end), end)
            self.assertEquals(f.read(), "def")
            if codec.startswith("utf"):
                self.multi_line_test(f, codec)
            f.close()
695
def multi_line_test(self, f, enc):
    """Round-trip lines of many lengths through *f*, checking tell().

    The file is emptied, then lines of increasing length are written
    while recording the position before each write.  Reading back with
    readline() must yield the same (position, line) pairs.  *enc* is
    accepted for the caller's convenience and is not used here.
    """
    f.seek(0)
    f.truncate()
    alphabet = "s\xff\u0fff\uffff"
    period = len(alphabet)
    lengths = (0, 1, 2, 3, 4, 5, 30, 31, 32, 33, 62, 63, 64, 65, 1000)

    written = []
    for length in lengths:
        text = "".join(alphabet[k % period] for k in range(length)) + "\n"
        written.append((f.tell(), text))
        f.write(text)

    f.seek(0)
    seen = []
    offset = f.tell()
    text = f.readline()
    while text:
        seen.append((offset, text))
        offset = f.tell()
        text = f.readline()
    self.assertEquals(seen, written)
717
def testTelling(self):
    """Check tell() before and after writes, readline() and iteration.

    Positions recorded while writing two identical lines must be seen
    again when reading them back with readline().  While iterating over
    the file tell() must raise IOError; once iteration has finished it
    must report the end position again.
    """
    row = "\xff\n"
    f = io.open(test_support.TESTFN, "w+", encoding="utf8")
    start = f.tell()
    f.write(row)
    after_first = f.tell()
    f.write(row)
    after_second = f.tell()

    f.seek(0)
    self.assertEquals(f.tell(), start)
    for mark in (after_first, after_second):
        self.assertEquals(f.readline(), row)
        self.assertEquals(f.tell(), mark)

    f.seek(0)
    for got in f:
        self.assertEquals(got, row)
        self.assertRaises(IOError, f.tell)
    self.assertEquals(f.tell(), after_second)
    f.close()
737
def testSeeking(self):
    """Check read()/tell()/readline() around a chunk boundary.

    Each line is an ASCII prefix two characters shorter than the default
    chunk size followed by a multi-byte character and a newline.  After
    reading exactly the prefix, tell() must equal the prefix length and
    readline() must return the suffix intact.
    """
    chunk_size = io.TextIOWrapper._CHUNK_SIZE
    prefix_size = chunk_size - 2
    u_prefix = "a" * prefix_size
    prefix = bytes(u_prefix.encode("utf-8"))
    # Pure ASCII, so the character count equals the byte count.
    self.assertEquals(len(u_prefix), len(prefix))
    u_suffix = "\u8888\n"
    suffix = bytes(u_suffix.encode("utf-8"))
    line = prefix + suffix

    f = io.open(test_support.TESTFN, "wb")
    try:
        f.write(line*2)
    finally:
        f.close()

    f = io.open(test_support.TESTFN, "r", encoding="utf-8")
    try:
        s = f.read(prefix_size)
        self.assertEquals(s, str(prefix, "ascii"))
        self.assertEquals(f.tell(), prefix_size)
        self.assertEquals(f.readline(), u_suffix)
    finally:
        # Always release the handle, even when an assertion fails, so the
        # scratch file is not left open for later tests.
        f.close()
755
def testSeekingToo(self):
    """Regression test: tell() after readline() with a tiny chunk size.

    The file holds one three-byte UTF-8 character and a newline; with
    ``_CHUNK_SIZE`` set to 2 the character is split across chunks.  The
    test only requires that readline() and tell() complete.
    """
    data = b'\xe0\xbf\xbf\n'
    f = io.open(test_support.TESTFN, "wb")
    try:
        f.write(data)
    finally:
        f.close()

    f = io.open(test_support.TESTFN, "r", encoding="utf-8")
    try:
        f._CHUNK_SIZE  # Just test that it exists
        f._CHUNK_SIZE = 2
        f.readline()
        f.tell()
    finally:
        # Close the handle even if readline()/tell() raise.
        f.close()
767
def timingTest(self):
    """Time writing and three ways of reading a text file.

    Not collected automatically (the name does not start with "test").
    For each chunk size it writes ``nlines`` copies of a line containing
    characters of every UTF-8 width, then times iteration, readline(),
    and readline() combined with tell().  Results are printed only when
    ``test_support.verbose`` is set.
    """
    timer = time.time
    enc = "utf8"
    line = "\0\x0f\xff\u0fff\uffff\U000fffff\U0010ffff"*3 + "\n"
    nlines = 10000
    nchars = len(line)
    nbytes = len(line.encode(enc))
    for chunk_size in (32, 64, 128, 256):
        f = io.open(test_support.TESTFN, "w+", encoding=enc)
        try:
            f._CHUNK_SIZE = chunk_size
            t0 = timer()
            for i in range(nlines):
                f.write(line)
            f.flush()
            t1 = timer()
            f.seek(0)
            # Use a separate name: iterating into ``line`` would replace
            # the template that the next chunk size writes.
            for _unused in f:
                pass
            t2 = timer()
            f.seek(0)
            while f.readline():
                pass
            t3 = timer()
            f.seek(0)
            while f.readline():
                f.tell()
            t4 = timer()
        finally:
            f.close()
        if test_support.verbose:
            print("\nTiming test: %d lines of %d characters (%d bytes)" %
                  (nlines, nchars, nbytes))
            print("File chunk size:          %6s" % f._CHUNK_SIZE)
            print("Writing:                  %6.3f seconds" % (t1-t0))
            print("Reading using iteration:  %6.3f seconds" % (t2-t1))
            print("Reading using readline(): %6.3f seconds" % (t3-t2))
            print("Using readline()+tell():  %6.3f seconds" % (t4-t3))
804
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +0000805 def testReadOneByOne(self):
806 txt = io.TextIOWrapper(io.BytesIO(b"AA\r\nBB"))
807 reads = ""
808 while True:
809 c = txt.read(1)
810 if not c:
811 break
812 reads += c
813 self.assertEquals(reads, "AA\nBB")
814
815 # read in amounts equal to TextIOWrapper._CHUNK_SIZE which is 128.
816 def testReadByChunk(self):
817 # make sure "\r\n" straddles 128 char boundary.
818 txt = io.TextIOWrapper(io.BytesIO(b"A" * 127 + b"\r\nB"))
819 reads = ""
820 while True:
821 c = txt.read(128)
822 if not c:
823 break
824 reads += c
825 self.assertEquals(reads, "A"*127+"\nB")
826
827 def test_issue1395_1(self):
828 txt = io.TextIOWrapper(io.BytesIO(self.testdata), encoding="ascii")
829
830 # read one char at a time
831 reads = ""
832 while True:
833 c = txt.read(1)
834 if not c:
835 break
836 reads += c
837 self.assertEquals(reads, self.normalized)
838
839 def test_issue1395_2(self):
840 txt = io.TextIOWrapper(io.BytesIO(self.testdata), encoding="ascii")
841 txt._CHUNK_SIZE = 4
842
843 reads = ""
844 while True:
845 c = txt.read(4)
846 if not c:
847 break
848 reads += c
849 self.assertEquals(reads, self.normalized)
850
851 def test_issue1395_3(self):
852 txt = io.TextIOWrapper(io.BytesIO(self.testdata), encoding="ascii")
853 txt._CHUNK_SIZE = 4
854
855 reads = txt.read(4)
856 reads += txt.read(4)
857 reads += txt.readline()
858 reads += txt.readline()
859 reads += txt.readline()
860 self.assertEquals(reads, self.normalized)
861
862 def test_issue1395_4(self):
863 txt = io.TextIOWrapper(io.BytesIO(self.testdata), encoding="ascii")
864 txt._CHUNK_SIZE = 4
865
866 reads = txt.read(4)
867 reads += txt.read()
868 self.assertEquals(reads, self.normalized)
869
870 def test_issue1395_5(self):
871 txt = io.TextIOWrapper(io.BytesIO(self.testdata), encoding="ascii")
872 txt._CHUNK_SIZE = 4
873
874 reads = txt.read(4)
875 pos = txt.tell()
876 txt.seek(0)
877 txt.seek(pos)
878 self.assertEquals(txt.read(4), "BBB\n")
879
880 def test_newline_decoder(self):
881 import codecs
882 decoder = codecs.getincrementaldecoder("utf-8")()
883 decoder = io.IncrementalNewlineDecoder(decoder, translate=True)
884
885 self.assertEquals(decoder.decode(b'\xe8\xa2\x88'), "\u8888")
886
887 self.assertEquals(decoder.decode(b'\xe8'), "")
888 self.assertEquals(decoder.decode(b'\xa2'), "")
889 self.assertEquals(decoder.decode(b'\x88'), "\u8888")
890
891 self.assertEquals(decoder.decode(b'\xe8'), "")
892 self.assertRaises(UnicodeDecodeError, decoder.decode, b'', final=True)
893
894 decoder.setstate((b'', 0))
895 self.assertEquals(decoder.decode(b'\n'), "\n")
896 self.assertEquals(decoder.decode(b'\r'), "")
897 self.assertEquals(decoder.decode(b'', final=True), "\n")
898 self.assertEquals(decoder.decode(b'\r', final=True), "\n")
899
900 self.assertEquals(decoder.decode(b'\r'), "")
901 self.assertEquals(decoder.decode(b'a'), "\na")
902
903 self.assertEquals(decoder.decode(b'\r\r\n'), "\n\n")
904 self.assertEquals(decoder.decode(b'\r'), "")
905 self.assertEquals(decoder.decode(b'\r'), "\n")
906 self.assertEquals(decoder.decode(b'\na'), "\na")
907
908 self.assertEquals(decoder.decode(b'\xe8\xa2\x88\r\n'), "\u8888\n")
909 self.assertEquals(decoder.decode(b'\xe8\xa2\x88'), "\u8888")
910 self.assertEquals(decoder.decode(b'\n'), "\n")
911 self.assertEquals(decoder.decode(b'\xe8\xa2\x88\r'), "\u8888")
912 self.assertEquals(decoder.decode(b'\n'), "\n")
Guido van Rossum9b76da62007-04-11 01:09:03 +0000913
Alexandre Vassalotti472f07d2008-01-06 00:34:32 +0000914 decoder = codecs.getincrementaldecoder("utf-8")()
915 decoder = io.IncrementalNewlineDecoder(decoder, translate=True)
916 self.assertEquals(decoder.newlines, None)
917 decoder.decode(b"abc\n\r")
918 self.assertEquals(decoder.newlines, '\n')
919 decoder.decode(b"\nabc")
920 self.assertEquals(decoder.newlines, ('\n', '\r\n'))
921 decoder.decode(b"abc\r")
922 self.assertEquals(decoder.newlines, ('\n', '\r\n'))
923 decoder.decode(b"abc")
924 self.assertEquals(decoder.newlines, ('\r', '\n', '\r\n'))
925 decoder.decode(b"abc\r")
926 decoder.reset()
927 self.assertEquals(decoder.decode(b"abc"), "abc")
928 self.assertEquals(decoder.newlines, None)
929
Guido van Rossum01a27522007-03-07 01:00:12 +0000930# XXX Tests for open()
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000931
class MiscIOTest(unittest.TestCase):
    """Checks on the io module as a whole rather than on one class."""

    def testImport__all__(self):
        """Every name in io.__all__ must exist and have the expected kind.

        "open" only needs to exist.  Names containing "error" (in any
        case) must be Exception subclasses; everything else must be a
        subclass of io.IOBase.
        """
        for name in io.__all__:
            exported = getattr(io, name, None)
            self.assert_(exported is not None, name)
            if name == "open":
                continue
            if "error" in name.lower():
                self.assert_(issubclass(exported, Exception), name)
                continue
            self.assert_(issubclass(exported, io.IOBase))
944
945
def test_main():
    """Run every test case class of this module via test_support."""
    cases = (
        IOTest,
        BytesIOTest,
        StringIOTest,
        BufferedReaderTest,
        BufferedWriterTest,
        BufferedRWPairTest,
        BufferedRandomTest,
        TextIOWrapperTest,
        MiscIOTest,
    )
    test_support.run_unittest(*cases)
Guido van Rossum28524c72007-02-27 05:47:44 +0000952
# When executed as a script, let unittest discover and run the tests.
if __name__ == "__main__":
    unittest.main()