blob: 36aaf14fc8cb1faddfea75ad68a07758aab279aa [file] [log] [blame]
Guido van Rossum68bbcd22007-02-27 17:19:33 +00001"""Unit tests for io.py."""
2
Guido van Rossum8358db22007-08-18 21:39:55 +00003import os
Guido van Rossum34d69e52007-04-10 20:08:41 +00004import sys
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +00005import time
Guido van Rossumd4103952007-04-12 05:44:49 +00006import array
Guido van Rossum28524c72007-02-27 05:47:44 +00007import unittest
Guido van Rossum78892e42007-04-06 17:31:18 +00008from itertools import chain
Guido van Rossum76c5d4d2007-04-06 19:10:29 +00009from test import test_support
10
Guido van Rossum53807da2007-04-10 19:01:47 +000011import io # The module under test
Guido van Rossum28524c72007-02-27 05:47:44 +000012
Guido van Rossuma9e20242007-03-08 00:43:48 +000013
Guido van Rossum53807da2007-04-10 19:01:47 +000014class MockRawIO(io.RawIOBase):
Guido van Rossuma9e20242007-03-08 00:43:48 +000015
Guido van Rossum76c5d4d2007-04-06 19:10:29 +000016 def __init__(self, read_stack=()):
17 self._read_stack = list(read_stack)
18 self._write_stack = []
Guido van Rossum68bbcd22007-02-27 17:19:33 +000019
20 def read(self, n=None):
21 try:
Guido van Rossum76c5d4d2007-04-06 19:10:29 +000022 return self._read_stack.pop(0)
Guido van Rossum68bbcd22007-02-27 17:19:33 +000023 except:
Guido van Rossum78892e42007-04-06 17:31:18 +000024 return b""
Guido van Rossum68bbcd22007-02-27 17:19:33 +000025
Guido van Rossum01a27522007-03-07 01:00:12 +000026 def write(self, b):
Guido van Rossum76c5d4d2007-04-06 19:10:29 +000027 self._write_stack.append(b[:])
Guido van Rossum01a27522007-03-07 01:00:12 +000028 return len(b)
29
30 def writable(self):
31 return True
32
Guido van Rossum68bbcd22007-02-27 17:19:33 +000033 def fileno(self):
34 return 42
35
36 def readable(self):
37 return True
38
Guido van Rossum01a27522007-03-07 01:00:12 +000039 def seekable(self):
Guido van Rossum68bbcd22007-02-27 17:19:33 +000040 return True
41
Guido van Rossum01a27522007-03-07 01:00:12 +000042 def seek(self, pos, whence):
43 pass
44
45 def tell(self):
Guido van Rossum68bbcd22007-02-27 17:19:33 +000046 return 42
47
Guido van Rossuma9e20242007-03-08 00:43:48 +000048
Guido van Rossum78892e42007-04-06 17:31:18 +000049class MockFileIO(io.BytesIO):
50
51 def __init__(self, data):
52 self.read_history = []
53 io.BytesIO.__init__(self, data)
54
55 def read(self, n=None):
56 res = io.BytesIO.read(self, n)
57 self.read_history.append(None if res is None else len(res))
58 return res
59
60
Guido van Rossum01a27522007-03-07 01:00:12 +000061class MockNonBlockWriterIO(io.RawIOBase):
Guido van Rossuma9e20242007-03-08 00:43:48 +000062
Guido van Rossum53807da2007-04-10 19:01:47 +000063 def __init__(self, blocking_script):
64 self._blocking_script = list(blocking_script)
Guido van Rossum01a27522007-03-07 01:00:12 +000065 self._write_stack = []
Guido van Rossuma9e20242007-03-08 00:43:48 +000066
Guido van Rossum01a27522007-03-07 01:00:12 +000067 def write(self, b):
Guido van Rossum76c5d4d2007-04-06 19:10:29 +000068 self._write_stack.append(b[:])
Guido van Rossum53807da2007-04-10 19:01:47 +000069 n = self._blocking_script.pop(0)
Guido van Rossum01a27522007-03-07 01:00:12 +000070 if (n < 0):
Guido van Rossum141f7672007-04-10 00:22:16 +000071 raise io.BlockingIOError(0, "test blocking", -n)
Guido van Rossum01a27522007-03-07 01:00:12 +000072 else:
73 return n
Guido van Rossuma9e20242007-03-08 00:43:48 +000074
Guido van Rossum01a27522007-03-07 01:00:12 +000075 def writable(self):
76 return True
Guido van Rossum68bbcd22007-02-27 17:19:33 +000077
Guido van Rossuma9e20242007-03-08 00:43:48 +000078
Guido van Rossum28524c72007-02-27 05:47:44 +000079class IOTest(unittest.TestCase):
80
Guido van Rossum4d0f5a42007-03-07 22:59:39 +000081 def tearDown(self):
82 test_support.unlink(test_support.TESTFN)
83
Guido van Rossum28524c72007-02-27 05:47:44 +000084 def write_ops(self, f):
Guido van Rossum87429772007-04-10 21:06:59 +000085 self.assertEqual(f.write(b"blah."), 5)
86 self.assertEqual(f.seek(0), 0)
87 self.assertEqual(f.write(b"Hello."), 6)
Guido van Rossum28524c72007-02-27 05:47:44 +000088 self.assertEqual(f.tell(), 6)
Guido van Rossum87429772007-04-10 21:06:59 +000089 self.assertEqual(f.seek(-1, 1), 5)
Guido van Rossum28524c72007-02-27 05:47:44 +000090 self.assertEqual(f.tell(), 5)
Guido van Rossum254348e2007-11-21 19:29:53 +000091 self.assertEqual(f.write(bytearray(b" world\n\n\n")), 9)
Guido van Rossum87429772007-04-10 21:06:59 +000092 self.assertEqual(f.seek(0), 0)
Guido van Rossum2b08b382007-05-08 20:18:39 +000093 self.assertEqual(f.write(b"h"), 1)
Guido van Rossum87429772007-04-10 21:06:59 +000094 self.assertEqual(f.seek(-1, 2), 13)
95 self.assertEqual(f.tell(), 13)
96 self.assertEqual(f.truncate(12), 12)
Guido van Rossumdc0b1a12007-04-12 22:55:07 +000097 self.assertEqual(f.tell(), 13)
Christian Heimes8e42a0a2007-11-08 18:04:45 +000098 self.assertRaises(TypeError, f.seek, 0.0)
Guido van Rossum28524c72007-02-27 05:47:44 +000099
Guido van Rossum9b76da62007-04-11 01:09:03 +0000100 def read_ops(self, f, buffered=False):
101 data = f.read(5)
102 self.assertEqual(data, b"hello")
Guido van Rossum254348e2007-11-21 19:29:53 +0000103 data = bytearray(data)
Guido van Rossum9b76da62007-04-11 01:09:03 +0000104 self.assertEqual(f.readinto(data), 5)
105 self.assertEqual(data, b" worl")
106 self.assertEqual(f.readinto(data), 2)
107 self.assertEqual(len(data), 5)
108 self.assertEqual(data[:2], b"d\n")
109 self.assertEqual(f.seek(0), 0)
110 self.assertEqual(f.read(20), b"hello world\n")
111 self.assertEqual(f.read(1), b"")
Guido van Rossum254348e2007-11-21 19:29:53 +0000112 self.assertEqual(f.readinto(bytearray(b"x")), 0)
Guido van Rossum9b76da62007-04-11 01:09:03 +0000113 self.assertEqual(f.seek(-6, 2), 6)
114 self.assertEqual(f.read(5), b"world")
115 self.assertEqual(f.read(0), b"")
Guido van Rossum254348e2007-11-21 19:29:53 +0000116 self.assertEqual(f.readinto(bytearray()), 0)
Guido van Rossum9b76da62007-04-11 01:09:03 +0000117 self.assertEqual(f.seek(-6, 1), 5)
118 self.assertEqual(f.read(5), b" worl")
119 self.assertEqual(f.tell(), 10)
Christian Heimes8e42a0a2007-11-08 18:04:45 +0000120 self.assertRaises(TypeError, f.seek, 0.0)
Guido van Rossum9b76da62007-04-11 01:09:03 +0000121 if buffered:
122 f.seek(0)
123 self.assertEqual(f.read(), b"hello world\n")
124 f.seek(6)
125 self.assertEqual(f.read(), b"world\n")
126 self.assertEqual(f.read(), b"")
127
Guido van Rossum34d69e52007-04-10 20:08:41 +0000128 LARGE = 2**31
129
Guido van Rossum53807da2007-04-10 19:01:47 +0000130 def large_file_ops(self, f):
131 assert f.readable()
132 assert f.writable()
Guido van Rossum34d69e52007-04-10 20:08:41 +0000133 self.assertEqual(f.seek(self.LARGE), self.LARGE)
134 self.assertEqual(f.tell(), self.LARGE)
Guido van Rossum53807da2007-04-10 19:01:47 +0000135 self.assertEqual(f.write(b"xxx"), 3)
Guido van Rossum34d69e52007-04-10 20:08:41 +0000136 self.assertEqual(f.tell(), self.LARGE + 3)
137 self.assertEqual(f.seek(-1, 1), self.LARGE + 2)
Guido van Rossum87429772007-04-10 21:06:59 +0000138 self.assertEqual(f.truncate(), self.LARGE + 2)
Guido van Rossum34d69e52007-04-10 20:08:41 +0000139 self.assertEqual(f.tell(), self.LARGE + 2)
140 self.assertEqual(f.seek(0, 2), self.LARGE + 2)
Guido van Rossum87429772007-04-10 21:06:59 +0000141 self.assertEqual(f.truncate(self.LARGE + 1), self.LARGE + 1)
Guido van Rossumdc0b1a12007-04-12 22:55:07 +0000142 self.assertEqual(f.tell(), self.LARGE + 2)
Guido van Rossum34d69e52007-04-10 20:08:41 +0000143 self.assertEqual(f.seek(0, 2), self.LARGE + 1)
144 self.assertEqual(f.seek(-1, 2), self.LARGE)
Guido van Rossum53807da2007-04-10 19:01:47 +0000145 self.assertEqual(f.read(2), b"x")
146
Guido van Rossum28524c72007-02-27 05:47:44 +0000147 def test_raw_file_io(self):
148 f = io.open(test_support.TESTFN, "wb", buffering=0)
149 self.assertEqual(f.readable(), False)
150 self.assertEqual(f.writable(), True)
151 self.assertEqual(f.seekable(), True)
152 self.write_ops(f)
153 f.close()
154 f = io.open(test_support.TESTFN, "rb", buffering=0)
155 self.assertEqual(f.readable(), True)
156 self.assertEqual(f.writable(), False)
157 self.assertEqual(f.seekable(), True)
158 self.read_ops(f)
159 f.close()
160
Guido van Rossum87429772007-04-10 21:06:59 +0000161 def test_buffered_file_io(self):
162 f = io.open(test_support.TESTFN, "wb")
163 self.assertEqual(f.readable(), False)
164 self.assertEqual(f.writable(), True)
165 self.assertEqual(f.seekable(), True)
166 self.write_ops(f)
167 f.close()
168 f = io.open(test_support.TESTFN, "rb")
169 self.assertEqual(f.readable(), True)
170 self.assertEqual(f.writable(), False)
171 self.assertEqual(f.seekable(), True)
Guido van Rossum9b76da62007-04-11 01:09:03 +0000172 self.read_ops(f, True)
Guido van Rossum87429772007-04-10 21:06:59 +0000173 f.close()
174
Guido van Rossum48fc58a2007-06-07 23:45:37 +0000175 def test_readline(self):
176 f = io.open(test_support.TESTFN, "wb")
177 f.write(b"abc\ndef\nxyzzy\nfoo")
178 f.close()
179 f = io.open(test_support.TESTFN, "rb")
180 self.assertEqual(f.readline(), b"abc\n")
181 self.assertEqual(f.readline(10), b"def\n")
182 self.assertEqual(f.readline(2), b"xy")
183 self.assertEqual(f.readline(4), b"zzy\n")
184 self.assertEqual(f.readline(), b"foo")
185 f.close()
186
Guido van Rossum28524c72007-02-27 05:47:44 +0000187 def test_raw_bytes_io(self):
188 f = io.BytesIO()
189 self.write_ops(f)
190 data = f.getvalue()
191 self.assertEqual(data, b"hello world\n")
192 f = io.BytesIO(data)
Guido van Rossum9b76da62007-04-11 01:09:03 +0000193 self.read_ops(f, True)
Guido van Rossum28524c72007-02-27 05:47:44 +0000194
Guido van Rossum53807da2007-04-10 19:01:47 +0000195 def test_large_file_ops(self):
Guido van Rossum34d69e52007-04-10 20:08:41 +0000196 # On Windows and Mac OSX this test comsumes large resources; It takes
197 # a long time to build the >2GB file and takes >2GB of disk space
198 # therefore the resource must be enabled to run this test.
199 if sys.platform[:3] == 'win' or sys.platform == 'darwin':
200 if not test_support.is_resource_enabled("largefile"):
201 print("\nTesting large file ops skipped on %s." % sys.platform,
202 file=sys.stderr)
203 print("It requires %d bytes and a long time." % self.LARGE,
204 file=sys.stderr)
205 print("Use 'regrtest.py -u largefile test_io' to run it.",
206 file=sys.stderr)
207 return
Guido van Rossum87429772007-04-10 21:06:59 +0000208 f = io.open(test_support.TESTFN, "w+b", 0)
Guido van Rossum53807da2007-04-10 19:01:47 +0000209 self.large_file_ops(f)
Guido van Rossuma9e20242007-03-08 00:43:48 +0000210 f.close()
Guido van Rossum87429772007-04-10 21:06:59 +0000211 f = io.open(test_support.TESTFN, "w+b")
212 self.large_file_ops(f)
213 f.close()
214
215 def test_with_open(self):
216 for bufsize in (0, 1, 100):
217 f = None
218 with open(test_support.TESTFN, "wb", bufsize) as f:
Guido van Rossum1f2ca562007-08-27 20:44:15 +0000219 f.write(b"xxx")
Guido van Rossum87429772007-04-10 21:06:59 +0000220 self.assertEqual(f.closed, True)
221 f = None
222 try:
223 with open(test_support.TESTFN, "wb", bufsize) as f:
224 1/0
225 except ZeroDivisionError:
226 self.assertEqual(f.closed, True)
227 else:
228 self.fail("1/0 didn't raise an exception")
229
230 def test_destructor(self):
231 record = []
232 class MyFileIO(io.FileIO):
233 def __del__(self):
234 record.append(1)
235 io.FileIO.__del__(self)
236 def close(self):
237 record.append(2)
238 io.FileIO.close(self)
239 def flush(self):
240 record.append(3)
241 io.FileIO.flush(self)
242 f = MyFileIO(test_support.TESTFN, "w")
243 f.write("xxx")
244 del f
245 self.assertEqual(record, [1, 2, 3])
246
247 def test_close_flushes(self):
248 f = io.open(test_support.TESTFN, "wb")
Guido van Rossum2b08b382007-05-08 20:18:39 +0000249 f.write(b"xxx")
Guido van Rossum87429772007-04-10 21:06:59 +0000250 f.close()
251 f = io.open(test_support.TESTFN, "rb")
252 self.assertEqual(f.read(), b"xxx")
253 f.close()
Guido van Rossuma9e20242007-03-08 00:43:48 +0000254
Guido van Rossumd4103952007-04-12 05:44:49 +0000255 def test_array_writes(self):
256 a = array.array('i', range(10))
Guido van Rossumbae07c92007-10-08 02:46:15 +0000257 n = len(memoryview(a))
Guido van Rossumd4103952007-04-12 05:44:49 +0000258 f = io.open(test_support.TESTFN, "wb", 0)
259 self.assertEqual(f.write(a), n)
260 f.close()
261 f = io.open(test_support.TESTFN, "wb")
262 self.assertEqual(f.write(a), n)
263 f.close()
264
Guido van Rossum2dced8b2007-10-30 17:27:30 +0000265 def test_closefd(self):
266 self.assertRaises(ValueError, io.open, test_support.TESTFN, 'w',
267 closefd=False)
Guido van Rossuma9e20242007-03-08 00:43:48 +0000268
Guido van Rossum76c5d4d2007-04-06 19:10:29 +0000269class MemorySeekTestMixin:
Guido van Rossuma9e20242007-03-08 00:43:48 +0000270
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000271 def testInit(self):
Guido van Rossum78892e42007-04-06 17:31:18 +0000272 buf = self.buftype("1234567890")
273 bytesIo = self.ioclass(buf)
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000274
275 def testRead(self):
Guido van Rossum78892e42007-04-06 17:31:18 +0000276 buf = self.buftype("1234567890")
277 bytesIo = self.ioclass(buf)
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000278
279 self.assertEquals(buf[:1], bytesIo.read(1))
280 self.assertEquals(buf[1:5], bytesIo.read(4))
281 self.assertEquals(buf[5:], bytesIo.read(900))
Guido van Rossum78892e42007-04-06 17:31:18 +0000282 self.assertEquals(self.EOF, bytesIo.read())
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000283
284 def testReadNoArgs(self):
Guido van Rossum78892e42007-04-06 17:31:18 +0000285 buf = self.buftype("1234567890")
286 bytesIo = self.ioclass(buf)
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000287
288 self.assertEquals(buf, bytesIo.read())
Guido van Rossum78892e42007-04-06 17:31:18 +0000289 self.assertEquals(self.EOF, bytesIo.read())
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000290
291 def testSeek(self):
Guido van Rossum78892e42007-04-06 17:31:18 +0000292 buf = self.buftype("1234567890")
293 bytesIo = self.ioclass(buf)
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000294
295 bytesIo.read(5)
296 bytesIo.seek(0)
297 self.assertEquals(buf, bytesIo.read())
298
299 bytesIo.seek(3)
300 self.assertEquals(buf[3:], bytesIo.read())
Christian Heimes8e42a0a2007-11-08 18:04:45 +0000301 self.assertRaises(TypeError, bytesIo.seek, 0.0)
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000302
303 def testTell(self):
Guido van Rossum78892e42007-04-06 17:31:18 +0000304 buf = self.buftype("1234567890")
305 bytesIo = self.ioclass(buf)
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000306
307 self.assertEquals(0, bytesIo.tell())
308 bytesIo.seek(5)
309 self.assertEquals(5, bytesIo.tell())
310 bytesIo.seek(10000)
311 self.assertEquals(10000, bytesIo.tell())
312
Guido van Rossuma9e20242007-03-08 00:43:48 +0000313
Guido van Rossum76c5d4d2007-04-06 19:10:29 +0000314class BytesIOTest(MemorySeekTestMixin, unittest.TestCase):
Guido van Rossum39478e82007-08-27 17:23:59 +0000315 @staticmethod
316 def buftype(s):
317 return s.encode("utf-8")
Guido van Rossum78892e42007-04-06 17:31:18 +0000318 ioclass = io.BytesIO
319 EOF = b""
320
321
Guido van Rossum76c5d4d2007-04-06 19:10:29 +0000322class StringIOTest(MemorySeekTestMixin, unittest.TestCase):
Guido van Rossum78892e42007-04-06 17:31:18 +0000323 buftype = str
324 ioclass = io.StringIO
325 EOF = ""
326
327
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000328class BufferedReaderTest(unittest.TestCase):
Guido van Rossuma9e20242007-03-08 00:43:48 +0000329
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000330 def testRead(self):
Guido van Rossum53807da2007-04-10 19:01:47 +0000331 rawio = MockRawIO((b"abc", b"d", b"efg"))
Guido van Rossum76c5d4d2007-04-06 19:10:29 +0000332 bufio = io.BufferedReader(rawio)
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000333
Guido van Rossum76c5d4d2007-04-06 19:10:29 +0000334 self.assertEquals(b"abcdef", bufio.read(6))
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000335
Guido van Rossum78892e42007-04-06 17:31:18 +0000336 def testBuffering(self):
337 data = b"abcdefghi"
338 dlen = len(data)
339
340 tests = [
341 [ 100, [ 3, 1, 4, 8 ], [ dlen, 0 ] ],
342 [ 100, [ 3, 3, 3], [ dlen ] ],
343 [ 4, [ 1, 2, 4, 2 ], [ 4, 4, 1 ] ],
344 ]
345
346 for bufsize, buf_read_sizes, raw_read_sizes in tests:
Guido van Rossum76c5d4d2007-04-06 19:10:29 +0000347 rawio = MockFileIO(data)
348 bufio = io.BufferedReader(rawio, buffer_size=bufsize)
Guido van Rossum78892e42007-04-06 17:31:18 +0000349 pos = 0
350 for nbytes in buf_read_sizes:
Guido van Rossum76c5d4d2007-04-06 19:10:29 +0000351 self.assertEquals(bufio.read(nbytes), data[pos:pos+nbytes])
Guido van Rossum78892e42007-04-06 17:31:18 +0000352 pos += nbytes
Guido van Rossum76c5d4d2007-04-06 19:10:29 +0000353 self.assertEquals(rawio.read_history, raw_read_sizes)
Guido van Rossum78892e42007-04-06 17:31:18 +0000354
Guido van Rossum01a27522007-03-07 01:00:12 +0000355 def testReadNonBlocking(self):
356 # Inject some None's in there to simulate EWOULDBLOCK
Guido van Rossum53807da2007-04-10 19:01:47 +0000357 rawio = MockRawIO((b"abc", b"d", None, b"efg", None, None))
Guido van Rossum76c5d4d2007-04-06 19:10:29 +0000358 bufio = io.BufferedReader(rawio)
Guido van Rossum01a27522007-03-07 01:00:12 +0000359
Guido van Rossum76c5d4d2007-04-06 19:10:29 +0000360 self.assertEquals(b"abcd", bufio.read(6))
361 self.assertEquals(b"e", bufio.read(1))
362 self.assertEquals(b"fg", bufio.read())
363 self.assert_(None is bufio.read())
364 self.assertEquals(b"", bufio.read())
Guido van Rossum01a27522007-03-07 01:00:12 +0000365
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000366 def testReadToEof(self):
Guido van Rossum53807da2007-04-10 19:01:47 +0000367 rawio = MockRawIO((b"abc", b"d", b"efg"))
Guido van Rossum76c5d4d2007-04-06 19:10:29 +0000368 bufio = io.BufferedReader(rawio)
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000369
Guido van Rossum76c5d4d2007-04-06 19:10:29 +0000370 self.assertEquals(b"abcdefg", bufio.read(9000))
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000371
372 def testReadNoArgs(self):
Guido van Rossum53807da2007-04-10 19:01:47 +0000373 rawio = MockRawIO((b"abc", b"d", b"efg"))
Guido van Rossum76c5d4d2007-04-06 19:10:29 +0000374 bufio = io.BufferedReader(rawio)
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000375
Guido van Rossum76c5d4d2007-04-06 19:10:29 +0000376 self.assertEquals(b"abcdefg", bufio.read())
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000377
378 def testFileno(self):
Guido van Rossum53807da2007-04-10 19:01:47 +0000379 rawio = MockRawIO((b"abc", b"d", b"efg"))
Guido van Rossum76c5d4d2007-04-06 19:10:29 +0000380 bufio = io.BufferedReader(rawio)
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000381
Guido van Rossum76c5d4d2007-04-06 19:10:29 +0000382 self.assertEquals(42, bufio.fileno())
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000383
384 def testFilenoNoFileno(self):
Guido van Rossum01a27522007-03-07 01:00:12 +0000385 # XXX will we always have fileno() function? If so, kill
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000386 # this test. Else, write it.
387 pass
388
Guido van Rossuma9e20242007-03-08 00:43:48 +0000389
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000390class BufferedWriterTest(unittest.TestCase):
Guido van Rossuma9e20242007-03-08 00:43:48 +0000391
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000392 def testWrite(self):
393 # Write to the buffered IO but don't overflow the buffer.
Guido van Rossum53807da2007-04-10 19:01:47 +0000394 writer = MockRawIO()
Guido van Rossum76c5d4d2007-04-06 19:10:29 +0000395 bufio = io.BufferedWriter(writer, 8)
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000396
Guido van Rossum76c5d4d2007-04-06 19:10:29 +0000397 bufio.write(b"abc")
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000398
Guido van Rossum76c5d4d2007-04-06 19:10:29 +0000399 self.assertFalse(writer._write_stack)
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000400
401 def testWriteOverflow(self):
Guido van Rossum53807da2007-04-10 19:01:47 +0000402 writer = MockRawIO()
Guido van Rossum76c5d4d2007-04-06 19:10:29 +0000403 bufio = io.BufferedWriter(writer, 8)
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000404
Guido van Rossum76c5d4d2007-04-06 19:10:29 +0000405 bufio.write(b"abc")
406 bufio.write(b"defghijkl")
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000407
Guido van Rossum76c5d4d2007-04-06 19:10:29 +0000408 self.assertEquals(b"abcdefghijkl", writer._write_stack[0])
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000409
Guido van Rossum01a27522007-03-07 01:00:12 +0000410 def testWriteNonBlocking(self):
411 raw = MockNonBlockWriterIO((9, 2, 22, -6, 10, 12, 12))
Guido van Rossum76c5d4d2007-04-06 19:10:29 +0000412 bufio = io.BufferedWriter(raw, 8, 16)
Guido van Rossum01a27522007-03-07 01:00:12 +0000413
Guido van Rossum76c5d4d2007-04-06 19:10:29 +0000414 bufio.write(b"asdf")
415 bufio.write(b"asdfa")
Guido van Rossum01a27522007-03-07 01:00:12 +0000416 self.assertEquals(b"asdfasdfa", raw._write_stack[0])
417
Guido van Rossum76c5d4d2007-04-06 19:10:29 +0000418 bufio.write(b"asdfasdfasdf")
Guido van Rossum01a27522007-03-07 01:00:12 +0000419 self.assertEquals(b"asdfasdfasdf", raw._write_stack[1])
Guido van Rossum76c5d4d2007-04-06 19:10:29 +0000420 bufio.write(b"asdfasdfasdf")
Guido van Rossum01a27522007-03-07 01:00:12 +0000421 self.assertEquals(b"dfasdfasdf", raw._write_stack[2])
422 self.assertEquals(b"asdfasdfasdf", raw._write_stack[3])
423
Guido van Rossum76c5d4d2007-04-06 19:10:29 +0000424 bufio.write(b"asdfasdfasdf")
Guido van Rossum01a27522007-03-07 01:00:12 +0000425
Guido van Rossum78892e42007-04-06 17:31:18 +0000426 # XXX I don't like this test. It relies too heavily on how the
427 # algorithm actually works, which we might change. Refactor
428 # later.
Guido van Rossum01a27522007-03-07 01:00:12 +0000429
430 def testFileno(self):
Guido van Rossum53807da2007-04-10 19:01:47 +0000431 rawio = MockRawIO((b"abc", b"d", b"efg"))
Guido van Rossum76c5d4d2007-04-06 19:10:29 +0000432 bufio = io.BufferedWriter(rawio)
Guido van Rossum01a27522007-03-07 01:00:12 +0000433
Guido van Rossum76c5d4d2007-04-06 19:10:29 +0000434 self.assertEquals(42, bufio.fileno())
Guido van Rossum01a27522007-03-07 01:00:12 +0000435
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000436 def testFlush(self):
Guido van Rossum53807da2007-04-10 19:01:47 +0000437 writer = MockRawIO()
Guido van Rossum76c5d4d2007-04-06 19:10:29 +0000438 bufio = io.BufferedWriter(writer, 8)
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000439
Guido van Rossum76c5d4d2007-04-06 19:10:29 +0000440 bufio.write(b"abc")
441 bufio.flush()
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000442
Guido van Rossum76c5d4d2007-04-06 19:10:29 +0000443 self.assertEquals(b"abc", writer._write_stack[0])
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000444
Guido van Rossuma9e20242007-03-08 00:43:48 +0000445
Guido van Rossum01a27522007-03-07 01:00:12 +0000446class BufferedRWPairTest(unittest.TestCase):
Guido van Rossuma9e20242007-03-08 00:43:48 +0000447
Guido van Rossum01a27522007-03-07 01:00:12 +0000448 def testRWPair(self):
Guido van Rossum53807da2007-04-10 19:01:47 +0000449 r = MockRawIO(())
450 w = MockRawIO()
Guido van Rossum01a27522007-03-07 01:00:12 +0000451 pair = io.BufferedRWPair(r, w)
452
453 # XXX need implementation
454
Guido van Rossuma9e20242007-03-08 00:43:48 +0000455
Guido van Rossum78892e42007-04-06 17:31:18 +0000456class BufferedRandomTest(unittest.TestCase):
Guido van Rossuma9e20242007-03-08 00:43:48 +0000457
Guido van Rossum01a27522007-03-07 01:00:12 +0000458 def testReadAndWrite(self):
Guido van Rossum53807da2007-04-10 19:01:47 +0000459 raw = MockRawIO((b"asdf", b"ghjk"))
Guido van Rossum01a27522007-03-07 01:00:12 +0000460 rw = io.BufferedRandom(raw, 8, 12)
461
462 self.assertEqual(b"as", rw.read(2))
463 rw.write(b"ddd")
464 rw.write(b"eee")
Guido van Rossum76c5d4d2007-04-06 19:10:29 +0000465 self.assertFalse(raw._write_stack) # Buffer writes
Guido van Rossum01a27522007-03-07 01:00:12 +0000466 self.assertEqual(b"ghjk", rw.read()) # This read forces write flush
Guido van Rossum76c5d4d2007-04-06 19:10:29 +0000467 self.assertEquals(b"dddeee", raw._write_stack[0])
Guido van Rossum01a27522007-03-07 01:00:12 +0000468
469 def testSeekAndTell(self):
470 raw = io.BytesIO(b"asdfghjkl")
471 rw = io.BufferedRandom(raw)
472
473 self.assertEquals(b"as", rw.read(2))
474 self.assertEquals(2, rw.tell())
475 rw.seek(0, 0)
476 self.assertEquals(b"asdf", rw.read(4))
477
478 rw.write(b"asdf")
479 rw.seek(0, 0)
480 self.assertEquals(b"asdfasdfl", rw.read())
481 self.assertEquals(9, rw.tell())
482 rw.seek(-4, 2)
483 self.assertEquals(5, rw.tell())
484 rw.seek(2, 1)
485 self.assertEquals(7, rw.tell())
486 self.assertEquals(b"fl", rw.read(11))
Christian Heimes8e42a0a2007-11-08 18:04:45 +0000487 self.assertRaises(TypeError, rw.seek, 0.0)
Guido van Rossum01a27522007-03-07 01:00:12 +0000488
Guido van Rossum78892e42007-04-06 17:31:18 +0000489
490class TextIOWrapperTest(unittest.TestCase):
Guido van Rossum9b76da62007-04-11 01:09:03 +0000491
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +0000492 def setUp(self):
493 self.testdata = b"AAA\r\nBBB\rCCC\r\nDDD\nEEE\r\n"
494 self.normalized = b"AAA\nBBB\nCCC\nDDD\nEEE\n".decode("ascii")
495
Guido van Rossumd0712812007-04-11 16:32:43 +0000496 def tearDown(self):
497 test_support.unlink(test_support.TESTFN)
Guido van Rossum9b76da62007-04-11 01:09:03 +0000498
Guido van Rossume7fc50f2007-12-03 22:54:21 +0000499 def testEncodingErrorsReading(self):
500 # (1) default
501 b = io.BytesIO(b"abc\n\xff\n")
502 t = io.TextIOWrapper(b, encoding="ascii")
503 self.assertRaises(UnicodeError, t.read)
504 # (2) explicit strict
505 b = io.BytesIO(b"abc\n\xff\n")
506 t = io.TextIOWrapper(b, encoding="ascii", errors="strict")
507 self.assertRaises(UnicodeError, t.read)
508 # (3) ignore
509 b = io.BytesIO(b"abc\n\xff\n")
510 t = io.TextIOWrapper(b, encoding="ascii", errors="ignore")
511 self.assertEquals(t.read(), "abc\n\n")
512 # (4) replace
513 b = io.BytesIO(b"abc\n\xff\n")
514 t = io.TextIOWrapper(b, encoding="ascii", errors="replace")
515 self.assertEquals(t.read(), "abc\n\ufffd\n")
516
517 def testEncodingErrorsWriting(self):
518 # (1) default
519 b = io.BytesIO()
520 t = io.TextIOWrapper(b, encoding="ascii")
521 self.assertRaises(UnicodeError, t.write, "\xff")
522 # (2) explicit strict
523 b = io.BytesIO()
524 t = io.TextIOWrapper(b, encoding="ascii", errors="strict")
525 self.assertRaises(UnicodeError, t.write, "\xff")
526 # (3) ignore
527 b = io.BytesIO()
528 t = io.TextIOWrapper(b, encoding="ascii", errors="ignore")
529 t.write("abc\xffdef\n")
530 t.flush()
531 self.assertEquals(b.getvalue(), b"abcdef\n")
532 # (4) replace
533 b = io.BytesIO()
534 t = io.TextIOWrapper(b, encoding="ascii", errors="replace")
535 t.write("abc\xffdef\n")
536 t.flush()
537 self.assertEquals(b.getvalue(), b"abc?def\n")
538
Guido van Rossum8358db22007-08-18 21:39:55 +0000539 def testNewlinesInput(self):
540 testdata = b"AAA\nBBB\nCCC\rDDD\rEEE\r\nFFF\r\nGGG"
541 normalized = testdata.replace(b"\r\n", b"\n").replace(b"\r", b"\n")
542 for newline, expected in [
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +0000543 (None, normalized.decode("ascii").splitlines(True)),
544 ("", testdata.decode("ascii").splitlines(True)),
Guido van Rossum8358db22007-08-18 21:39:55 +0000545 ("\n", ["AAA\n", "BBB\n", "CCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]),
546 ("\r\n", ["AAA\nBBB\nCCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]),
547 ("\r", ["AAA\nBBB\nCCC\r", "DDD\r", "EEE\r", "\nFFF\r", "\nGGG"]),
548 ]:
549 buf = io.BytesIO(testdata)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +0000550 txt = io.TextIOWrapper(buf, encoding="ascii", newline=newline)
Guido van Rossum8358db22007-08-18 21:39:55 +0000551 self.assertEquals(txt.readlines(), expected)
552 txt.seek(0)
553 self.assertEquals(txt.read(), "".join(expected))
554
555 def testNewlinesOutput(self):
556 testdict = {
557 "": b"AAA\nBBB\nCCC\nX\rY\r\nZ",
558 "\n": b"AAA\nBBB\nCCC\nX\rY\r\nZ",
559 "\r": b"AAA\rBBB\rCCC\rX\rY\r\rZ",
560 "\r\n": b"AAA\r\nBBB\r\nCCC\r\nX\rY\r\r\nZ",
561 }
562 tests = [(None, testdict[os.linesep])] + sorted(testdict.items())
563 for newline, expected in tests:
564 buf = io.BytesIO()
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +0000565 txt = io.TextIOWrapper(buf, encoding="ascii", newline=newline)
Guido van Rossum8358db22007-08-18 21:39:55 +0000566 txt.write("AAA\nB")
567 txt.write("BB\nCCC\n")
568 txt.write("X\rY\r\nZ")
569 txt.flush()
570 self.assertEquals(buf.getvalue(), expected)
571
Guido van Rossum78892e42007-04-06 17:31:18 +0000572 def testNewlines(self):
573 input_lines = [ "unix\n", "windows\r\n", "os9\r", "last\n", "nonl" ]
574
575 tests = [
576 [ None, [ 'unix\n', 'windows\n', 'os9\n', 'last\n', 'nonl' ] ],
Guido van Rossum8358db22007-08-18 21:39:55 +0000577 [ '', input_lines ],
578 [ '\n', [ "unix\n", "windows\r\n", "os9\rlast\n", "nonl" ] ],
579 [ '\r\n', [ "unix\nwindows\r\n", "os9\rlast\nnonl" ] ],
580 [ '\r', [ "unix\nwindows\r", "\nos9\r", "last\nnonl" ] ],
Guido van Rossum78892e42007-04-06 17:31:18 +0000581 ]
582
Guido van Rossumc2f93dc2007-05-24 00:50:02 +0000583 encodings = ('utf-8', 'latin-1')
Guido van Rossum78892e42007-04-06 17:31:18 +0000584
Guido van Rossum8358db22007-08-18 21:39:55 +0000585 # Try a range of buffer sizes to test the case where \r is the last
Guido van Rossum78892e42007-04-06 17:31:18 +0000586 # character in TextIOWrapper._pending_line.
587 for encoding in encodings:
Guido van Rossum8358db22007-08-18 21:39:55 +0000588 # XXX: str.encode() should return bytes
589 data = bytes(''.join(input_lines).encode(encoding))
Guido van Rossum78892e42007-04-06 17:31:18 +0000590 for do_reads in (False, True):
Guido van Rossum8358db22007-08-18 21:39:55 +0000591 for bufsize in range(1, 10):
592 for newline, exp_lines in tests:
593 bufio = io.BufferedReader(io.BytesIO(data), bufsize)
Guido van Rossum76c5d4d2007-04-06 19:10:29 +0000594 textio = io.TextIOWrapper(bufio, newline=newline,
Guido van Rossum78892e42007-04-06 17:31:18 +0000595 encoding=encoding)
596 if do_reads:
597 got_lines = []
598 while True:
Guido van Rossum76c5d4d2007-04-06 19:10:29 +0000599 c2 = textio.read(2)
Guido van Rossum78892e42007-04-06 17:31:18 +0000600 if c2 == '':
601 break
602 self.assertEquals(len(c2), 2)
Guido van Rossum76c5d4d2007-04-06 19:10:29 +0000603 got_lines.append(c2 + textio.readline())
Guido van Rossum78892e42007-04-06 17:31:18 +0000604 else:
Guido van Rossum76c5d4d2007-04-06 19:10:29 +0000605 got_lines = list(textio)
Guido van Rossum78892e42007-04-06 17:31:18 +0000606
607 for got_line, exp_line in zip(got_lines, exp_lines):
608 self.assertEquals(got_line, exp_line)
609 self.assertEquals(len(got_lines), len(exp_lines))
610
Guido van Rossum8358db22007-08-18 21:39:55 +0000611 def testNewlinesInput(self):
612 testdata = b"AAA\nBBB\nCCC\rDDD\rEEE\r\nFFF\r\nGGG"
613 normalized = testdata.replace(b"\r\n", b"\n").replace(b"\r", b"\n")
614 for newline, expected in [
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +0000615 (None, normalized.decode("ascii").splitlines(True)),
616 ("", testdata.decode("ascii").splitlines(True)),
Guido van Rossum8358db22007-08-18 21:39:55 +0000617 ("\n", ["AAA\n", "BBB\n", "CCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]),
618 ("\r\n", ["AAA\nBBB\nCCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]),
619 ("\r", ["AAA\nBBB\nCCC\r", "DDD\r", "EEE\r", "\nFFF\r", "\nGGG"]),
620 ]:
621 buf = io.BytesIO(testdata)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +0000622 txt = io.TextIOWrapper(buf, encoding="ascii", newline=newline)
Guido van Rossum8358db22007-08-18 21:39:55 +0000623 self.assertEquals(txt.readlines(), expected)
624 txt.seek(0)
625 self.assertEquals(txt.read(), "".join(expected))
626
627 def testNewlinesOutput(self):
Guido van Rossum8358db22007-08-18 21:39:55 +0000628 data = "AAA\nBBB\rCCC\n"
629 data_lf = b"AAA\nBBB\rCCC\n"
630 data_cr = b"AAA\rBBB\rCCC\r"
631 data_crlf = b"AAA\r\nBBB\rCCC\r\n"
Guido van Rossum18363582007-08-18 22:05:58 +0000632 save_linesep = os.linesep
633 try:
634 for os.linesep, newline, expected in [
635 ("\n", None, data_lf),
636 ("\r\n", None, data_crlf),
637 ("\n", "", data_lf),
638 ("\r\n", "", data_lf),
639 ("\n", "\n", data_lf),
640 ("\r\n", "\n", data_lf),
641 ("\n", "\r", data_cr),
642 ("\r\n", "\r", data_cr),
643 ("\n", "\r\n", data_crlf),
644 ("\r\n", "\r\n", data_crlf),
645 ]:
646 buf = io.BytesIO()
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +0000647 txt = io.TextIOWrapper(buf, encoding="ascii", newline=newline)
Guido van Rossum18363582007-08-18 22:05:58 +0000648 txt.write(data)
649 txt.close()
650 self.assertEquals(buf.getvalue(), expected)
651 finally:
652 os.linesep = save_linesep
Guido van Rossum8358db22007-08-18 21:39:55 +0000653
Guido van Rossum9b76da62007-04-11 01:09:03 +0000654 # Systematic tests of the text I/O API
655
656 def testBasicIO(self):
657 for chunksize in (1, 2, 3, 4, 5, 15, 16, 17, 31, 32, 33, 63, 64, 65):
658 for enc in "ascii", "latin1", "utf8" :# , "utf-16-be", "utf-16-le":
659 f = io.open(test_support.TESTFN, "w+", encoding=enc)
660 f._CHUNK_SIZE = chunksize
661 self.assertEquals(f.write("abc"), 3)
662 f.close()
663 f = io.open(test_support.TESTFN, "r+", encoding=enc)
664 f._CHUNK_SIZE = chunksize
665 self.assertEquals(f.tell(), 0)
666 self.assertEquals(f.read(), "abc")
667 cookie = f.tell()
668 self.assertEquals(f.seek(0), 0)
669 self.assertEquals(f.read(2), "ab")
670 self.assertEquals(f.read(1), "c")
671 self.assertEquals(f.read(1), "")
672 self.assertEquals(f.read(), "")
673 self.assertEquals(f.tell(), cookie)
674 self.assertEquals(f.seek(0), 0)
675 self.assertEquals(f.seek(0, 2), cookie)
676 self.assertEquals(f.write("def"), 3)
677 self.assertEquals(f.seek(cookie), cookie)
678 self.assertEquals(f.read(), "def")
679 if enc.startswith("utf"):
680 self.multi_line_test(f, enc)
681 f.close()
682
683 def multi_line_test(self, f, enc):
684 f.seek(0)
685 f.truncate()
Guido van Rossumef87d6e2007-05-02 19:09:54 +0000686 sample = "s\xff\u0fff\uffff"
Guido van Rossum9b76da62007-04-11 01:09:03 +0000687 wlines = []
Guido van Rossumcba608c2007-04-11 14:19:59 +0000688 for size in (0, 1, 2, 3, 4, 5, 30, 31, 32, 33, 62, 63, 64, 65, 1000):
Guido van Rossum9b76da62007-04-11 01:09:03 +0000689 chars = []
Guido van Rossum805365e2007-05-07 22:24:25 +0000690 for i in range(size):
Guido van Rossum9b76da62007-04-11 01:09:03 +0000691 chars.append(sample[i % len(sample)])
Guido van Rossumef87d6e2007-05-02 19:09:54 +0000692 line = "".join(chars) + "\n"
Guido van Rossum9b76da62007-04-11 01:09:03 +0000693 wlines.append((f.tell(), line))
694 f.write(line)
Guido van Rossum9b76da62007-04-11 01:09:03 +0000695 f.seek(0)
696 rlines = []
697 while True:
698 pos = f.tell()
699 line = f.readline()
700 if not line:
Guido van Rossum9b76da62007-04-11 01:09:03 +0000701 break
702 rlines.append((pos, line))
Guido van Rossum9b76da62007-04-11 01:09:03 +0000703 self.assertEquals(rlines, wlines)
704
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +0000705 def testTelling(self):
706 f = io.open(test_support.TESTFN, "w+", encoding="utf8")
707 p0 = f.tell()
Guido van Rossumef87d6e2007-05-02 19:09:54 +0000708 f.write("\xff\n")
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +0000709 p1 = f.tell()
Guido van Rossumef87d6e2007-05-02 19:09:54 +0000710 f.write("\xff\n")
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +0000711 p2 = f.tell()
712 f.seek(0)
713 self.assertEquals(f.tell(), p0)
Guido van Rossumef87d6e2007-05-02 19:09:54 +0000714 self.assertEquals(f.readline(), "\xff\n")
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +0000715 self.assertEquals(f.tell(), p1)
Guido van Rossumef87d6e2007-05-02 19:09:54 +0000716 self.assertEquals(f.readline(), "\xff\n")
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +0000717 self.assertEquals(f.tell(), p2)
718 f.seek(0)
719 for line in f:
Guido van Rossumef87d6e2007-05-02 19:09:54 +0000720 self.assertEquals(line, "\xff\n")
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +0000721 self.assertRaises(IOError, f.tell)
722 self.assertEquals(f.tell(), p2)
723 f.close()
724
Guido van Rossumd76e7792007-04-17 02:38:04 +0000725 def testSeeking(self):
726 chunk_size = io.TextIOWrapper._CHUNK_SIZE
727 prefix_size = chunk_size - 2
Guido van Rossumef87d6e2007-05-02 19:09:54 +0000728 u_prefix = "a" * prefix_size
Guido van Rossumd76e7792007-04-17 02:38:04 +0000729 prefix = bytes(u_prefix.encode("utf-8"))
730 self.assertEquals(len(u_prefix), len(prefix))
Guido van Rossumef87d6e2007-05-02 19:09:54 +0000731 u_suffix = "\u8888\n"
Guido van Rossumd76e7792007-04-17 02:38:04 +0000732 suffix = bytes(u_suffix.encode("utf-8"))
733 line = prefix + suffix
734 f = io.open(test_support.TESTFN, "wb")
735 f.write(line*2)
736 f.close()
737 f = io.open(test_support.TESTFN, "r", encoding="utf-8")
738 s = f.read(prefix_size)
Guido van Rossum98297ee2007-11-06 21:34:58 +0000739 self.assertEquals(s, str(prefix, "ascii"))
Guido van Rossumd76e7792007-04-17 02:38:04 +0000740 self.assertEquals(f.tell(), prefix_size)
741 self.assertEquals(f.readline(), u_suffix)
742
743 def testSeekingToo(self):
744 # Regression test for a specific bug
745 data = b'\xe0\xbf\xbf\n'
746 f = io.open(test_support.TESTFN, "wb")
747 f.write(data)
748 f.close()
749 f = io.open(test_support.TESTFN, "r", encoding="utf-8")
750 f._CHUNK_SIZE # Just test that it exists
751 f._CHUNK_SIZE = 2
752 f.readline()
753 f.tell()
754
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +0000755 def timingTest(self):
756 timer = time.time
757 enc = "utf8"
Guido van Rossumef87d6e2007-05-02 19:09:54 +0000758 line = "\0\x0f\xff\u0fff\uffff\U000fffff\U0010ffff"*3 + "\n"
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +0000759 nlines = 10000
760 nchars = len(line)
761 nbytes = len(line.encode(enc))
762 for chunk_size in (32, 64, 128, 256):
763 f = io.open(test_support.TESTFN, "w+", encoding=enc)
764 f._CHUNK_SIZE = chunk_size
765 t0 = timer()
766 for i in range(nlines):
767 f.write(line)
768 f.flush()
769 t1 = timer()
770 f.seek(0)
771 for line in f:
772 pass
773 t2 = timer()
774 f.seek(0)
775 while f.readline():
776 pass
777 t3 = timer()
778 f.seek(0)
779 while f.readline():
780 f.tell()
781 t4 = timer()
782 f.close()
783 if test_support.verbose:
784 print("\nTiming test: %d lines of %d characters (%d bytes)" %
785 (nlines, nchars, nbytes))
786 print("File chunk size: %6s" % f._CHUNK_SIZE)
787 print("Writing: %6.3f seconds" % (t1-t0))
788 print("Reading using iteration: %6.3f seconds" % (t2-t1))
789 print("Reading using readline(): %6.3f seconds" % (t3-t2))
790 print("Using readline()+tell(): %6.3f seconds" % (t4-t3))
791
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +0000792 def testReadOneByOne(self):
793 txt = io.TextIOWrapper(io.BytesIO(b"AA\r\nBB"))
794 reads = ""
795 while True:
796 c = txt.read(1)
797 if not c:
798 break
799 reads += c
800 self.assertEquals(reads, "AA\nBB")
801
802 # read in amounts equal to TextIOWrapper._CHUNK_SIZE which is 128.
803 def testReadByChunk(self):
804 # make sure "\r\n" straddles 128 char boundary.
805 txt = io.TextIOWrapper(io.BytesIO(b"A" * 127 + b"\r\nB"))
806 reads = ""
807 while True:
808 c = txt.read(128)
809 if not c:
810 break
811 reads += c
812 self.assertEquals(reads, "A"*127+"\nB")
813
814 def test_issue1395_1(self):
815 txt = io.TextIOWrapper(io.BytesIO(self.testdata), encoding="ascii")
816
817 # read one char at a time
818 reads = ""
819 while True:
820 c = txt.read(1)
821 if not c:
822 break
823 reads += c
824 self.assertEquals(reads, self.normalized)
825
826 def test_issue1395_2(self):
827 txt = io.TextIOWrapper(io.BytesIO(self.testdata), encoding="ascii")
828 txt._CHUNK_SIZE = 4
829
830 reads = ""
831 while True:
832 c = txt.read(4)
833 if not c:
834 break
835 reads += c
836 self.assertEquals(reads, self.normalized)
837
838 def test_issue1395_3(self):
839 txt = io.TextIOWrapper(io.BytesIO(self.testdata), encoding="ascii")
840 txt._CHUNK_SIZE = 4
841
842 reads = txt.read(4)
843 reads += txt.read(4)
844 reads += txt.readline()
845 reads += txt.readline()
846 reads += txt.readline()
847 self.assertEquals(reads, self.normalized)
848
849 def test_issue1395_4(self):
850 txt = io.TextIOWrapper(io.BytesIO(self.testdata), encoding="ascii")
851 txt._CHUNK_SIZE = 4
852
853 reads = txt.read(4)
854 reads += txt.read()
855 self.assertEquals(reads, self.normalized)
856
857 def test_issue1395_5(self):
858 txt = io.TextIOWrapper(io.BytesIO(self.testdata), encoding="ascii")
859 txt._CHUNK_SIZE = 4
860
861 reads = txt.read(4)
862 pos = txt.tell()
863 txt.seek(0)
864 txt.seek(pos)
865 self.assertEquals(txt.read(4), "BBB\n")
866
867 def test_newline_decoder(self):
868 import codecs
869 decoder = codecs.getincrementaldecoder("utf-8")()
870 decoder = io.IncrementalNewlineDecoder(decoder, translate=True)
871
872 self.assertEquals(decoder.decode(b'\xe8\xa2\x88'), "\u8888")
873
874 self.assertEquals(decoder.decode(b'\xe8'), "")
875 self.assertEquals(decoder.decode(b'\xa2'), "")
876 self.assertEquals(decoder.decode(b'\x88'), "\u8888")
877
878 self.assertEquals(decoder.decode(b'\xe8'), "")
879 self.assertRaises(UnicodeDecodeError, decoder.decode, b'', final=True)
880
881 decoder.setstate((b'', 0))
882 self.assertEquals(decoder.decode(b'\n'), "\n")
883 self.assertEquals(decoder.decode(b'\r'), "")
884 self.assertEquals(decoder.decode(b'', final=True), "\n")
885 self.assertEquals(decoder.decode(b'\r', final=True), "\n")
886
887 self.assertEquals(decoder.decode(b'\r'), "")
888 self.assertEquals(decoder.decode(b'a'), "\na")
889
890 self.assertEquals(decoder.decode(b'\r\r\n'), "\n\n")
891 self.assertEquals(decoder.decode(b'\r'), "")
892 self.assertEquals(decoder.decode(b'\r'), "\n")
893 self.assertEquals(decoder.decode(b'\na'), "\na")
894
895 self.assertEquals(decoder.decode(b'\xe8\xa2\x88\r\n'), "\u8888\n")
896 self.assertEquals(decoder.decode(b'\xe8\xa2\x88'), "\u8888")
897 self.assertEquals(decoder.decode(b'\n'), "\n")
898 self.assertEquals(decoder.decode(b'\xe8\xa2\x88\r'), "\u8888")
899 self.assertEquals(decoder.decode(b'\n'), "\n")
Guido van Rossum9b76da62007-04-11 01:09:03 +0000900
Guido van Rossum01a27522007-03-07 01:00:12 +0000901# XXX Tests for open()
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000902
Guido van Rossum5abbf752007-08-27 17:39:33 +0000903class MiscIOTest(unittest.TestCase):
904
905 def testImport__all__(self):
906 for name in io.__all__:
907 obj = getattr(io, name, None)
908 self.assert_(obj is not None, name)
909 if name == "open":
910 continue
911 elif "error" in name.lower():
912 self.assert_(issubclass(obj, Exception), name)
913 else:
914 self.assert_(issubclass(obj, io.IOBase))
915
916
Guido van Rossum28524c72007-02-27 05:47:44 +0000917def test_main():
Guido van Rossum78892e42007-04-06 17:31:18 +0000918 test_support.run_unittest(IOTest, BytesIOTest, StringIOTest,
919 BufferedReaderTest,
Guido van Rossum01a27522007-03-07 01:00:12 +0000920 BufferedWriterTest, BufferedRWPairTest,
Guido van Rossum5abbf752007-08-27 17:39:33 +0000921 BufferedRandomTest, TextIOWrapperTest,
922 MiscIOTest)
Guido van Rossum28524c72007-02-27 05:47:44 +0000923
924if __name__ == "__main__":
Guido van Rossum76c5d4d2007-04-06 19:10:29 +0000925 unittest.main()