blob: 27814a0fda85d6e526b3c3e21b064fcf19ee5a4b [file] [log] [blame]
Guido van Rossum68bbcd22007-02-27 17:19:33 +00001"""Unit tests for io.py."""
2
Guido van Rossum8358db22007-08-18 21:39:55 +00003import os
Guido van Rossum34d69e52007-04-10 20:08:41 +00004import sys
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +00005import time
Guido van Rossumd4103952007-04-12 05:44:49 +00006import array
Guido van Rossum28524c72007-02-27 05:47:44 +00007import unittest
Guido van Rossum78892e42007-04-06 17:31:18 +00008from itertools import chain
Guido van Rossum76c5d4d2007-04-06 19:10:29 +00009from test import test_support
10
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +000011import codecs
Guido van Rossum53807da2007-04-10 19:01:47 +000012import io # The module under test
Guido van Rossum28524c72007-02-27 05:47:44 +000013
Guido van Rossuma9e20242007-03-08 00:43:48 +000014
Guido van Rossum53807da2007-04-10 19:01:47 +000015class MockRawIO(io.RawIOBase):
Guido van Rossuma9e20242007-03-08 00:43:48 +000016
Guido van Rossum76c5d4d2007-04-06 19:10:29 +000017 def __init__(self, read_stack=()):
18 self._read_stack = list(read_stack)
19 self._write_stack = []
Guido van Rossum68bbcd22007-02-27 17:19:33 +000020
21 def read(self, n=None):
22 try:
Guido van Rossum76c5d4d2007-04-06 19:10:29 +000023 return self._read_stack.pop(0)
Guido van Rossum68bbcd22007-02-27 17:19:33 +000024 except:
Guido van Rossum78892e42007-04-06 17:31:18 +000025 return b""
Guido van Rossum68bbcd22007-02-27 17:19:33 +000026
Guido van Rossum01a27522007-03-07 01:00:12 +000027 def write(self, b):
Guido van Rossum76c5d4d2007-04-06 19:10:29 +000028 self._write_stack.append(b[:])
Guido van Rossum01a27522007-03-07 01:00:12 +000029 return len(b)
30
31 def writable(self):
32 return True
33
Guido van Rossum68bbcd22007-02-27 17:19:33 +000034 def fileno(self):
35 return 42
36
37 def readable(self):
38 return True
39
Guido van Rossum01a27522007-03-07 01:00:12 +000040 def seekable(self):
Guido van Rossum68bbcd22007-02-27 17:19:33 +000041 return True
42
Guido van Rossum01a27522007-03-07 01:00:12 +000043 def seek(self, pos, whence):
44 pass
45
46 def tell(self):
Guido van Rossum68bbcd22007-02-27 17:19:33 +000047 return 42
48
Guido van Rossuma9e20242007-03-08 00:43:48 +000049
Guido van Rossum78892e42007-04-06 17:31:18 +000050class MockFileIO(io.BytesIO):
51
52 def __init__(self, data):
53 self.read_history = []
54 io.BytesIO.__init__(self, data)
55
56 def read(self, n=None):
57 res = io.BytesIO.read(self, n)
58 self.read_history.append(None if res is None else len(res))
59 return res
60
61
Guido van Rossum01a27522007-03-07 01:00:12 +000062class MockNonBlockWriterIO(io.RawIOBase):
Guido van Rossuma9e20242007-03-08 00:43:48 +000063
Guido van Rossum53807da2007-04-10 19:01:47 +000064 def __init__(self, blocking_script):
65 self._blocking_script = list(blocking_script)
Guido van Rossum01a27522007-03-07 01:00:12 +000066 self._write_stack = []
Guido van Rossuma9e20242007-03-08 00:43:48 +000067
Guido van Rossum01a27522007-03-07 01:00:12 +000068 def write(self, b):
Guido van Rossum76c5d4d2007-04-06 19:10:29 +000069 self._write_stack.append(b[:])
Guido van Rossum53807da2007-04-10 19:01:47 +000070 n = self._blocking_script.pop(0)
Guido van Rossum01a27522007-03-07 01:00:12 +000071 if (n < 0):
Guido van Rossum141f7672007-04-10 00:22:16 +000072 raise io.BlockingIOError(0, "test blocking", -n)
Guido van Rossum01a27522007-03-07 01:00:12 +000073 else:
74 return n
Guido van Rossuma9e20242007-03-08 00:43:48 +000075
Guido van Rossum01a27522007-03-07 01:00:12 +000076 def writable(self):
77 return True
Guido van Rossum68bbcd22007-02-27 17:19:33 +000078
Guido van Rossuma9e20242007-03-08 00:43:48 +000079
Guido van Rossum28524c72007-02-27 05:47:44 +000080class IOTest(unittest.TestCase):
81
Guido van Rossum4d0f5a42007-03-07 22:59:39 +000082 def tearDown(self):
83 test_support.unlink(test_support.TESTFN)
84
Guido van Rossum28524c72007-02-27 05:47:44 +000085 def write_ops(self, f):
Guido van Rossum87429772007-04-10 21:06:59 +000086 self.assertEqual(f.write(b"blah."), 5)
87 self.assertEqual(f.seek(0), 0)
88 self.assertEqual(f.write(b"Hello."), 6)
Guido van Rossum28524c72007-02-27 05:47:44 +000089 self.assertEqual(f.tell(), 6)
Guido van Rossum87429772007-04-10 21:06:59 +000090 self.assertEqual(f.seek(-1, 1), 5)
Guido van Rossum28524c72007-02-27 05:47:44 +000091 self.assertEqual(f.tell(), 5)
Guido van Rossum254348e2007-11-21 19:29:53 +000092 self.assertEqual(f.write(bytearray(b" world\n\n\n")), 9)
Guido van Rossum87429772007-04-10 21:06:59 +000093 self.assertEqual(f.seek(0), 0)
Guido van Rossum2b08b382007-05-08 20:18:39 +000094 self.assertEqual(f.write(b"h"), 1)
Guido van Rossum87429772007-04-10 21:06:59 +000095 self.assertEqual(f.seek(-1, 2), 13)
96 self.assertEqual(f.tell(), 13)
97 self.assertEqual(f.truncate(12), 12)
Guido van Rossumdc0b1a12007-04-12 22:55:07 +000098 self.assertEqual(f.tell(), 13)
Christian Heimes8e42a0a2007-11-08 18:04:45 +000099 self.assertRaises(TypeError, f.seek, 0.0)
Guido van Rossum28524c72007-02-27 05:47:44 +0000100
Guido van Rossum9b76da62007-04-11 01:09:03 +0000101 def read_ops(self, f, buffered=False):
102 data = f.read(5)
103 self.assertEqual(data, b"hello")
Guido van Rossum254348e2007-11-21 19:29:53 +0000104 data = bytearray(data)
Guido van Rossum9b76da62007-04-11 01:09:03 +0000105 self.assertEqual(f.readinto(data), 5)
106 self.assertEqual(data, b" worl")
107 self.assertEqual(f.readinto(data), 2)
108 self.assertEqual(len(data), 5)
109 self.assertEqual(data[:2], b"d\n")
110 self.assertEqual(f.seek(0), 0)
111 self.assertEqual(f.read(20), b"hello world\n")
112 self.assertEqual(f.read(1), b"")
Guido van Rossum254348e2007-11-21 19:29:53 +0000113 self.assertEqual(f.readinto(bytearray(b"x")), 0)
Guido van Rossum9b76da62007-04-11 01:09:03 +0000114 self.assertEqual(f.seek(-6, 2), 6)
115 self.assertEqual(f.read(5), b"world")
116 self.assertEqual(f.read(0), b"")
Guido van Rossum254348e2007-11-21 19:29:53 +0000117 self.assertEqual(f.readinto(bytearray()), 0)
Guido van Rossum9b76da62007-04-11 01:09:03 +0000118 self.assertEqual(f.seek(-6, 1), 5)
119 self.assertEqual(f.read(5), b" worl")
120 self.assertEqual(f.tell(), 10)
Christian Heimes8e42a0a2007-11-08 18:04:45 +0000121 self.assertRaises(TypeError, f.seek, 0.0)
Guido van Rossum9b76da62007-04-11 01:09:03 +0000122 if buffered:
123 f.seek(0)
124 self.assertEqual(f.read(), b"hello world\n")
125 f.seek(6)
126 self.assertEqual(f.read(), b"world\n")
127 self.assertEqual(f.read(), b"")
128
Guido van Rossum34d69e52007-04-10 20:08:41 +0000129 LARGE = 2**31
130
Guido van Rossum53807da2007-04-10 19:01:47 +0000131 def large_file_ops(self, f):
132 assert f.readable()
133 assert f.writable()
Guido van Rossum34d69e52007-04-10 20:08:41 +0000134 self.assertEqual(f.seek(self.LARGE), self.LARGE)
135 self.assertEqual(f.tell(), self.LARGE)
Guido van Rossum53807da2007-04-10 19:01:47 +0000136 self.assertEqual(f.write(b"xxx"), 3)
Guido van Rossum34d69e52007-04-10 20:08:41 +0000137 self.assertEqual(f.tell(), self.LARGE + 3)
138 self.assertEqual(f.seek(-1, 1), self.LARGE + 2)
Guido van Rossum87429772007-04-10 21:06:59 +0000139 self.assertEqual(f.truncate(), self.LARGE + 2)
Guido van Rossum34d69e52007-04-10 20:08:41 +0000140 self.assertEqual(f.tell(), self.LARGE + 2)
141 self.assertEqual(f.seek(0, 2), self.LARGE + 2)
Guido van Rossum87429772007-04-10 21:06:59 +0000142 self.assertEqual(f.truncate(self.LARGE + 1), self.LARGE + 1)
Guido van Rossumdc0b1a12007-04-12 22:55:07 +0000143 self.assertEqual(f.tell(), self.LARGE + 2)
Guido van Rossum34d69e52007-04-10 20:08:41 +0000144 self.assertEqual(f.seek(0, 2), self.LARGE + 1)
145 self.assertEqual(f.seek(-1, 2), self.LARGE)
Guido van Rossum53807da2007-04-10 19:01:47 +0000146 self.assertEqual(f.read(2), b"x")
147
Guido van Rossum28524c72007-02-27 05:47:44 +0000148 def test_raw_file_io(self):
149 f = io.open(test_support.TESTFN, "wb", buffering=0)
150 self.assertEqual(f.readable(), False)
151 self.assertEqual(f.writable(), True)
152 self.assertEqual(f.seekable(), True)
153 self.write_ops(f)
154 f.close()
155 f = io.open(test_support.TESTFN, "rb", buffering=0)
156 self.assertEqual(f.readable(), True)
157 self.assertEqual(f.writable(), False)
158 self.assertEqual(f.seekable(), True)
159 self.read_ops(f)
160 f.close()
161
Guido van Rossum87429772007-04-10 21:06:59 +0000162 def test_buffered_file_io(self):
163 f = io.open(test_support.TESTFN, "wb")
164 self.assertEqual(f.readable(), False)
165 self.assertEqual(f.writable(), True)
166 self.assertEqual(f.seekable(), True)
167 self.write_ops(f)
168 f.close()
169 f = io.open(test_support.TESTFN, "rb")
170 self.assertEqual(f.readable(), True)
171 self.assertEqual(f.writable(), False)
172 self.assertEqual(f.seekable(), True)
Guido van Rossum9b76da62007-04-11 01:09:03 +0000173 self.read_ops(f, True)
Guido van Rossum87429772007-04-10 21:06:59 +0000174 f.close()
175
Guido van Rossum48fc58a2007-06-07 23:45:37 +0000176 def test_readline(self):
177 f = io.open(test_support.TESTFN, "wb")
178 f.write(b"abc\ndef\nxyzzy\nfoo")
179 f.close()
180 f = io.open(test_support.TESTFN, "rb")
181 self.assertEqual(f.readline(), b"abc\n")
182 self.assertEqual(f.readline(10), b"def\n")
183 self.assertEqual(f.readline(2), b"xy")
184 self.assertEqual(f.readline(4), b"zzy\n")
185 self.assertEqual(f.readline(), b"foo")
186 f.close()
187
Guido van Rossum28524c72007-02-27 05:47:44 +0000188 def test_raw_bytes_io(self):
189 f = io.BytesIO()
190 self.write_ops(f)
191 data = f.getvalue()
192 self.assertEqual(data, b"hello world\n")
193 f = io.BytesIO(data)
Guido van Rossum9b76da62007-04-11 01:09:03 +0000194 self.read_ops(f, True)
Guido van Rossum28524c72007-02-27 05:47:44 +0000195
Guido van Rossum53807da2007-04-10 19:01:47 +0000196 def test_large_file_ops(self):
Guido van Rossum34d69e52007-04-10 20:08:41 +0000197 # On Windows and Mac OSX this test comsumes large resources; It takes
198 # a long time to build the >2GB file and takes >2GB of disk space
199 # therefore the resource must be enabled to run this test.
200 if sys.platform[:3] == 'win' or sys.platform == 'darwin':
201 if not test_support.is_resource_enabled("largefile"):
202 print("\nTesting large file ops skipped on %s." % sys.platform,
203 file=sys.stderr)
204 print("It requires %d bytes and a long time." % self.LARGE,
205 file=sys.stderr)
206 print("Use 'regrtest.py -u largefile test_io' to run it.",
207 file=sys.stderr)
208 return
Guido van Rossum87429772007-04-10 21:06:59 +0000209 f = io.open(test_support.TESTFN, "w+b", 0)
Guido van Rossum53807da2007-04-10 19:01:47 +0000210 self.large_file_ops(f)
Guido van Rossuma9e20242007-03-08 00:43:48 +0000211 f.close()
Guido van Rossum87429772007-04-10 21:06:59 +0000212 f = io.open(test_support.TESTFN, "w+b")
213 self.large_file_ops(f)
214 f.close()
215
216 def test_with_open(self):
217 for bufsize in (0, 1, 100):
218 f = None
219 with open(test_support.TESTFN, "wb", bufsize) as f:
Guido van Rossum1f2ca562007-08-27 20:44:15 +0000220 f.write(b"xxx")
Guido van Rossum87429772007-04-10 21:06:59 +0000221 self.assertEqual(f.closed, True)
222 f = None
223 try:
224 with open(test_support.TESTFN, "wb", bufsize) as f:
225 1/0
226 except ZeroDivisionError:
227 self.assertEqual(f.closed, True)
228 else:
229 self.fail("1/0 didn't raise an exception")
230
231 def test_destructor(self):
232 record = []
233 class MyFileIO(io.FileIO):
234 def __del__(self):
235 record.append(1)
236 io.FileIO.__del__(self)
237 def close(self):
238 record.append(2)
239 io.FileIO.close(self)
240 def flush(self):
241 record.append(3)
242 io.FileIO.flush(self)
243 f = MyFileIO(test_support.TESTFN, "w")
244 f.write("xxx")
245 del f
246 self.assertEqual(record, [1, 2, 3])
247
248 def test_close_flushes(self):
249 f = io.open(test_support.TESTFN, "wb")
Guido van Rossum2b08b382007-05-08 20:18:39 +0000250 f.write(b"xxx")
Guido van Rossum87429772007-04-10 21:06:59 +0000251 f.close()
252 f = io.open(test_support.TESTFN, "rb")
253 self.assertEqual(f.read(), b"xxx")
254 f.close()
Guido van Rossuma9e20242007-03-08 00:43:48 +0000255
Guido van Rossumd4103952007-04-12 05:44:49 +0000256 def test_array_writes(self):
257 a = array.array('i', range(10))
Guido van Rossumbae07c92007-10-08 02:46:15 +0000258 n = len(memoryview(a))
Guido van Rossumd4103952007-04-12 05:44:49 +0000259 f = io.open(test_support.TESTFN, "wb", 0)
260 self.assertEqual(f.write(a), n)
261 f.close()
262 f = io.open(test_support.TESTFN, "wb")
263 self.assertEqual(f.write(a), n)
264 f.close()
265
Guido van Rossum2dced8b2007-10-30 17:27:30 +0000266 def test_closefd(self):
267 self.assertRaises(ValueError, io.open, test_support.TESTFN, 'w',
268 closefd=False)
Guido van Rossuma9e20242007-03-08 00:43:48 +0000269
Guido van Rossum76c5d4d2007-04-06 19:10:29 +0000270class MemorySeekTestMixin:
Guido van Rossuma9e20242007-03-08 00:43:48 +0000271
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000272 def testInit(self):
Guido van Rossum78892e42007-04-06 17:31:18 +0000273 buf = self.buftype("1234567890")
274 bytesIo = self.ioclass(buf)
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000275
276 def testRead(self):
Guido van Rossum78892e42007-04-06 17:31:18 +0000277 buf = self.buftype("1234567890")
278 bytesIo = self.ioclass(buf)
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000279
280 self.assertEquals(buf[:1], bytesIo.read(1))
281 self.assertEquals(buf[1:5], bytesIo.read(4))
282 self.assertEquals(buf[5:], bytesIo.read(900))
Guido van Rossum78892e42007-04-06 17:31:18 +0000283 self.assertEquals(self.EOF, bytesIo.read())
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000284
285 def testReadNoArgs(self):
Guido van Rossum78892e42007-04-06 17:31:18 +0000286 buf = self.buftype("1234567890")
287 bytesIo = self.ioclass(buf)
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000288
289 self.assertEquals(buf, bytesIo.read())
Guido van Rossum78892e42007-04-06 17:31:18 +0000290 self.assertEquals(self.EOF, bytesIo.read())
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000291
292 def testSeek(self):
Guido van Rossum78892e42007-04-06 17:31:18 +0000293 buf = self.buftype("1234567890")
294 bytesIo = self.ioclass(buf)
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000295
296 bytesIo.read(5)
297 bytesIo.seek(0)
298 self.assertEquals(buf, bytesIo.read())
299
300 bytesIo.seek(3)
301 self.assertEquals(buf[3:], bytesIo.read())
Christian Heimes8e42a0a2007-11-08 18:04:45 +0000302 self.assertRaises(TypeError, bytesIo.seek, 0.0)
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000303
304 def testTell(self):
Guido van Rossum78892e42007-04-06 17:31:18 +0000305 buf = self.buftype("1234567890")
306 bytesIo = self.ioclass(buf)
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000307
308 self.assertEquals(0, bytesIo.tell())
309 bytesIo.seek(5)
310 self.assertEquals(5, bytesIo.tell())
311 bytesIo.seek(10000)
312 self.assertEquals(10000, bytesIo.tell())
313
Guido van Rossuma9e20242007-03-08 00:43:48 +0000314
Guido van Rossum76c5d4d2007-04-06 19:10:29 +0000315class BytesIOTest(MemorySeekTestMixin, unittest.TestCase):
Guido van Rossum39478e82007-08-27 17:23:59 +0000316 @staticmethod
317 def buftype(s):
318 return s.encode("utf-8")
Guido van Rossum78892e42007-04-06 17:31:18 +0000319 ioclass = io.BytesIO
320 EOF = b""
321
322
Guido van Rossum76c5d4d2007-04-06 19:10:29 +0000323class StringIOTest(MemorySeekTestMixin, unittest.TestCase):
Guido van Rossum78892e42007-04-06 17:31:18 +0000324 buftype = str
325 ioclass = io.StringIO
326 EOF = ""
327
328
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000329class BufferedReaderTest(unittest.TestCase):
Guido van Rossuma9e20242007-03-08 00:43:48 +0000330
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000331 def testRead(self):
Guido van Rossum53807da2007-04-10 19:01:47 +0000332 rawio = MockRawIO((b"abc", b"d", b"efg"))
Guido van Rossum76c5d4d2007-04-06 19:10:29 +0000333 bufio = io.BufferedReader(rawio)
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000334
Guido van Rossum76c5d4d2007-04-06 19:10:29 +0000335 self.assertEquals(b"abcdef", bufio.read(6))
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000336
Guido van Rossum78892e42007-04-06 17:31:18 +0000337 def testBuffering(self):
338 data = b"abcdefghi"
339 dlen = len(data)
340
341 tests = [
342 [ 100, [ 3, 1, 4, 8 ], [ dlen, 0 ] ],
343 [ 100, [ 3, 3, 3], [ dlen ] ],
344 [ 4, [ 1, 2, 4, 2 ], [ 4, 4, 1 ] ],
345 ]
346
347 for bufsize, buf_read_sizes, raw_read_sizes in tests:
Guido van Rossum76c5d4d2007-04-06 19:10:29 +0000348 rawio = MockFileIO(data)
349 bufio = io.BufferedReader(rawio, buffer_size=bufsize)
Guido van Rossum78892e42007-04-06 17:31:18 +0000350 pos = 0
351 for nbytes in buf_read_sizes:
Guido van Rossum76c5d4d2007-04-06 19:10:29 +0000352 self.assertEquals(bufio.read(nbytes), data[pos:pos+nbytes])
Guido van Rossum78892e42007-04-06 17:31:18 +0000353 pos += nbytes
Guido van Rossum76c5d4d2007-04-06 19:10:29 +0000354 self.assertEquals(rawio.read_history, raw_read_sizes)
Guido van Rossum78892e42007-04-06 17:31:18 +0000355
Guido van Rossum01a27522007-03-07 01:00:12 +0000356 def testReadNonBlocking(self):
357 # Inject some None's in there to simulate EWOULDBLOCK
Guido van Rossum53807da2007-04-10 19:01:47 +0000358 rawio = MockRawIO((b"abc", b"d", None, b"efg", None, None))
Guido van Rossum76c5d4d2007-04-06 19:10:29 +0000359 bufio = io.BufferedReader(rawio)
Guido van Rossum01a27522007-03-07 01:00:12 +0000360
Guido van Rossum76c5d4d2007-04-06 19:10:29 +0000361 self.assertEquals(b"abcd", bufio.read(6))
362 self.assertEquals(b"e", bufio.read(1))
363 self.assertEquals(b"fg", bufio.read())
364 self.assert_(None is bufio.read())
365 self.assertEquals(b"", bufio.read())
Guido van Rossum01a27522007-03-07 01:00:12 +0000366
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000367 def testReadToEof(self):
Guido van Rossum53807da2007-04-10 19:01:47 +0000368 rawio = MockRawIO((b"abc", b"d", b"efg"))
Guido van Rossum76c5d4d2007-04-06 19:10:29 +0000369 bufio = io.BufferedReader(rawio)
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000370
Guido van Rossum76c5d4d2007-04-06 19:10:29 +0000371 self.assertEquals(b"abcdefg", bufio.read(9000))
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000372
373 def testReadNoArgs(self):
Guido van Rossum53807da2007-04-10 19:01:47 +0000374 rawio = MockRawIO((b"abc", b"d", b"efg"))
Guido van Rossum76c5d4d2007-04-06 19:10:29 +0000375 bufio = io.BufferedReader(rawio)
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000376
Guido van Rossum76c5d4d2007-04-06 19:10:29 +0000377 self.assertEquals(b"abcdefg", bufio.read())
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000378
379 def testFileno(self):
Guido van Rossum53807da2007-04-10 19:01:47 +0000380 rawio = MockRawIO((b"abc", b"d", b"efg"))
Guido van Rossum76c5d4d2007-04-06 19:10:29 +0000381 bufio = io.BufferedReader(rawio)
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000382
Guido van Rossum76c5d4d2007-04-06 19:10:29 +0000383 self.assertEquals(42, bufio.fileno())
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000384
385 def testFilenoNoFileno(self):
Guido van Rossum01a27522007-03-07 01:00:12 +0000386 # XXX will we always have fileno() function? If so, kill
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000387 # this test. Else, write it.
388 pass
389
Guido van Rossuma9e20242007-03-08 00:43:48 +0000390
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000391class BufferedWriterTest(unittest.TestCase):
Guido van Rossuma9e20242007-03-08 00:43:48 +0000392
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000393 def testWrite(self):
394 # Write to the buffered IO but don't overflow the buffer.
Guido van Rossum53807da2007-04-10 19:01:47 +0000395 writer = MockRawIO()
Guido van Rossum76c5d4d2007-04-06 19:10:29 +0000396 bufio = io.BufferedWriter(writer, 8)
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000397
Guido van Rossum76c5d4d2007-04-06 19:10:29 +0000398 bufio.write(b"abc")
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000399
Guido van Rossum76c5d4d2007-04-06 19:10:29 +0000400 self.assertFalse(writer._write_stack)
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000401
402 def testWriteOverflow(self):
Guido van Rossum53807da2007-04-10 19:01:47 +0000403 writer = MockRawIO()
Guido van Rossum76c5d4d2007-04-06 19:10:29 +0000404 bufio = io.BufferedWriter(writer, 8)
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000405
Guido van Rossum76c5d4d2007-04-06 19:10:29 +0000406 bufio.write(b"abc")
407 bufio.write(b"defghijkl")
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000408
Guido van Rossum76c5d4d2007-04-06 19:10:29 +0000409 self.assertEquals(b"abcdefghijkl", writer._write_stack[0])
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000410
Guido van Rossum01a27522007-03-07 01:00:12 +0000411 def testWriteNonBlocking(self):
412 raw = MockNonBlockWriterIO((9, 2, 22, -6, 10, 12, 12))
Guido van Rossum76c5d4d2007-04-06 19:10:29 +0000413 bufio = io.BufferedWriter(raw, 8, 16)
Guido van Rossum01a27522007-03-07 01:00:12 +0000414
Guido van Rossum76c5d4d2007-04-06 19:10:29 +0000415 bufio.write(b"asdf")
416 bufio.write(b"asdfa")
Guido van Rossum01a27522007-03-07 01:00:12 +0000417 self.assertEquals(b"asdfasdfa", raw._write_stack[0])
418
Guido van Rossum76c5d4d2007-04-06 19:10:29 +0000419 bufio.write(b"asdfasdfasdf")
Guido van Rossum01a27522007-03-07 01:00:12 +0000420 self.assertEquals(b"asdfasdfasdf", raw._write_stack[1])
Guido van Rossum76c5d4d2007-04-06 19:10:29 +0000421 bufio.write(b"asdfasdfasdf")
Guido van Rossum01a27522007-03-07 01:00:12 +0000422 self.assertEquals(b"dfasdfasdf", raw._write_stack[2])
423 self.assertEquals(b"asdfasdfasdf", raw._write_stack[3])
424
Guido van Rossum76c5d4d2007-04-06 19:10:29 +0000425 bufio.write(b"asdfasdfasdf")
Guido van Rossum01a27522007-03-07 01:00:12 +0000426
Guido van Rossum78892e42007-04-06 17:31:18 +0000427 # XXX I don't like this test. It relies too heavily on how the
428 # algorithm actually works, which we might change. Refactor
429 # later.
Guido van Rossum01a27522007-03-07 01:00:12 +0000430
431 def testFileno(self):
Guido van Rossum53807da2007-04-10 19:01:47 +0000432 rawio = MockRawIO((b"abc", b"d", b"efg"))
Guido van Rossum76c5d4d2007-04-06 19:10:29 +0000433 bufio = io.BufferedWriter(rawio)
Guido van Rossum01a27522007-03-07 01:00:12 +0000434
Guido van Rossum76c5d4d2007-04-06 19:10:29 +0000435 self.assertEquals(42, bufio.fileno())
Guido van Rossum01a27522007-03-07 01:00:12 +0000436
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000437 def testFlush(self):
Guido van Rossum53807da2007-04-10 19:01:47 +0000438 writer = MockRawIO()
Guido van Rossum76c5d4d2007-04-06 19:10:29 +0000439 bufio = io.BufferedWriter(writer, 8)
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000440
Guido van Rossum76c5d4d2007-04-06 19:10:29 +0000441 bufio.write(b"abc")
442 bufio.flush()
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000443
Guido van Rossum76c5d4d2007-04-06 19:10:29 +0000444 self.assertEquals(b"abc", writer._write_stack[0])
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000445
Guido van Rossuma9e20242007-03-08 00:43:48 +0000446
Guido van Rossum01a27522007-03-07 01:00:12 +0000447class BufferedRWPairTest(unittest.TestCase):
Guido van Rossuma9e20242007-03-08 00:43:48 +0000448
Guido van Rossum01a27522007-03-07 01:00:12 +0000449 def testRWPair(self):
Guido van Rossum53807da2007-04-10 19:01:47 +0000450 r = MockRawIO(())
451 w = MockRawIO()
Guido van Rossum01a27522007-03-07 01:00:12 +0000452 pair = io.BufferedRWPair(r, w)
453
454 # XXX need implementation
455
Guido van Rossuma9e20242007-03-08 00:43:48 +0000456
Guido van Rossum78892e42007-04-06 17:31:18 +0000457class BufferedRandomTest(unittest.TestCase):
Guido van Rossuma9e20242007-03-08 00:43:48 +0000458
Guido van Rossum01a27522007-03-07 01:00:12 +0000459 def testReadAndWrite(self):
Guido van Rossum53807da2007-04-10 19:01:47 +0000460 raw = MockRawIO((b"asdf", b"ghjk"))
Guido van Rossum01a27522007-03-07 01:00:12 +0000461 rw = io.BufferedRandom(raw, 8, 12)
462
463 self.assertEqual(b"as", rw.read(2))
464 rw.write(b"ddd")
465 rw.write(b"eee")
Guido van Rossum76c5d4d2007-04-06 19:10:29 +0000466 self.assertFalse(raw._write_stack) # Buffer writes
Guido van Rossum01a27522007-03-07 01:00:12 +0000467 self.assertEqual(b"ghjk", rw.read()) # This read forces write flush
Guido van Rossum76c5d4d2007-04-06 19:10:29 +0000468 self.assertEquals(b"dddeee", raw._write_stack[0])
Guido van Rossum01a27522007-03-07 01:00:12 +0000469
470 def testSeekAndTell(self):
471 raw = io.BytesIO(b"asdfghjkl")
472 rw = io.BufferedRandom(raw)
473
474 self.assertEquals(b"as", rw.read(2))
475 self.assertEquals(2, rw.tell())
476 rw.seek(0, 0)
477 self.assertEquals(b"asdf", rw.read(4))
478
479 rw.write(b"asdf")
480 rw.seek(0, 0)
481 self.assertEquals(b"asdfasdfl", rw.read())
482 self.assertEquals(9, rw.tell())
483 rw.seek(-4, 2)
484 self.assertEquals(5, rw.tell())
485 rw.seek(2, 1)
486 self.assertEquals(7, rw.tell())
487 self.assertEquals(b"fl", rw.read(11))
Christian Heimes8e42a0a2007-11-08 18:04:45 +0000488 self.assertRaises(TypeError, rw.seek, 0.0)
Guido van Rossum01a27522007-03-07 01:00:12 +0000489
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +0000490# To fully exercise seek/tell, the StatefulIncrementalDecoder has these
491# properties:
492# - A single output character can correspond to many bytes of input.
493# - The number of input bytes to complete the character can be
494# undetermined until the last input byte is received.
495# - The number of input bytes can vary depending on previous input.
496# - A single input byte can correspond to many characters of output.
497# - The number of output characters can be undetermined until the
498# last input byte is received.
499# - The number of output characters can vary depending on previous input.
500
501class StatefulIncrementalDecoder(codecs.IncrementalDecoder):
502 """
503 For testing seek/tell behavior with a stateful, buffering decoder.
504
505 Input is a sequence of words. Words may be fixed-length (length set
506 by input) or variable-length (period-terminated). In variable-length
507 mode, extra periods are ignored. Possible words are:
508 - 'i' followed by a number sets the input length, I (maximum 99).
509 When I is set to 0, words are space-terminated.
510 - 'o' followed by a number sets the output length, O (maximum 99).
511 - Any other word is converted into a word followed by a period on
512 the output. The output word consists of the input word truncated
513 or padded out with hyphens to make its length equal to O. If O
514 is 0, the word is output verbatim without truncating or padding.
515 I and O are initially set to 1. When I changes, any buffered input is
516 re-scanned according to the new I. EOF also terminates the last word.
517 """
518
519 def __init__(self, errors='strict'):
520 codecs.IncrementalEncoder.__init__(self, errors)
521 self.reset()
522
523 def __repr__(self):
524 return '<SID %x>' % id(self)
525
526 def reset(self):
527 self.i = 1
528 self.o = 1
529 self.buffer = bytearray()
530
531 def getstate(self):
532 i, o = self.i ^ 1, self.o ^ 1 # so that flags = 0 after reset()
533 return bytes(self.buffer), i*100 + o
534
535 def setstate(self, state):
536 buffer, io = state
537 self.buffer = bytearray(buffer)
538 i, o = divmod(io, 100)
539 self.i, self.o = i ^ 1, o ^ 1
540
541 def decode(self, input, final=False):
542 output = ''
543 for b in input:
544 if self.i == 0: # variable-length, terminated with period
545 if b == ord('.'):
546 if self.buffer:
547 output += self.process_word()
548 else:
549 self.buffer.append(b)
550 else: # fixed-length, terminate after self.i bytes
551 self.buffer.append(b)
552 if len(self.buffer) == self.i:
553 output += self.process_word()
554 if final and self.buffer: # EOF terminates the last word
555 output += self.process_word()
556 return output
557
558 def process_word(self):
559 output = ''
560 if self.buffer[0] == ord('i'):
561 self.i = min(99, int(self.buffer[1:] or 0)) # set input length
562 elif self.buffer[0] == ord('o'):
563 self.o = min(99, int(self.buffer[1:] or 0)) # set output length
564 else:
565 output = self.buffer.decode('ascii')
566 if len(output) < self.o:
567 output += '-'*self.o # pad out with hyphens
568 if self.o:
569 output = output[:self.o] # truncate to output length
570 output += '.'
571 self.buffer = bytearray()
572 return output
573
574class StatefulIncrementalDecoderTest(unittest.TestCase):
575 """
576 Make sure the StatefulIncrementalDecoder actually works.
577 """
578
579 test_cases = [
580 # I=1 fixed-length mode
581 (b'abcd', False, 'a.b.c.d.'),
582 # I=0, O=0, variable-length mode
583 (b'oiabcd', True, 'abcd.'),
584 # I=0, O=0, variable-length mode, should ignore extra periods
585 (b'oi...abcd...', True, 'abcd.'),
586 # I=0, O=6
587 (b'i.o6.xyz.', False, 'xyz---.'),
588 # I=2, O=6
589 (b'i.i2.o6xyz', True, 'xy----.z-----.'),
590 # I=0, O=3
591 (b'i.o3.x.xyz.toolong.', False, 'x--.xyz.too.'),
592 # I=6, O=3
Ka-Ping Yee30cc8382008-03-18 07:01:49 +0000593 (b'i.o3.i6.abcdefghijklmnop', True, 'abc.ghi.mno.'),
594 # I=5, O=8 with newlines
595 (b'i.o8.i5.abc\ndef\nghy\nz', True, 'abc\nd---.ef\ngh---.y\nz-----.')
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +0000596 ]
597
598 def testDecoder(self):
599 # Try a few one-shot test cases.
600 for input, eof, output in self.test_cases:
601 d = StatefulIncrementalDecoder()
602 self.assertEquals(d.decode(input, eof), output)
603
604 # Also test an unfinished decode, followed by forcing EOF.
605 d = StatefulIncrementalDecoder()
606 self.assertEquals(d.decode(b'oiabcd'), '')
607 self.assertEquals(d.decode(b'', 1), 'abcd.')
Guido van Rossum78892e42007-04-06 17:31:18 +0000608
609class TextIOWrapperTest(unittest.TestCase):
Guido van Rossum9b76da62007-04-11 01:09:03 +0000610
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +0000611 def setUp(self):
612 self.testdata = b"AAA\r\nBBB\rCCC\r\nDDD\nEEE\r\n"
613 self.normalized = b"AAA\nBBB\nCCC\nDDD\nEEE\n".decode("ascii")
614
Guido van Rossumd0712812007-04-11 16:32:43 +0000615 def tearDown(self):
616 test_support.unlink(test_support.TESTFN)
Guido van Rossum9b76da62007-04-11 01:09:03 +0000617
Guido van Rossumf64db9f2007-12-06 01:04:26 +0000618 def testLineBuffering(self):
619 r = io.BytesIO()
620 b = io.BufferedWriter(r, 1000)
621 t = io.TextIOWrapper(b, newline="\n", line_buffering=True)
622 t.write("X")
623 self.assertEquals(r.getvalue(), b"") # No flush happened
624 t.write("Y\nZ")
625 self.assertEquals(r.getvalue(), b"XY\nZ") # All got flushed
626 t.write("A\rB")
627 self.assertEquals(r.getvalue(), b"XY\nZA\rB")
628
Guido van Rossume7fc50f2007-12-03 22:54:21 +0000629 def testEncodingErrorsReading(self):
630 # (1) default
631 b = io.BytesIO(b"abc\n\xff\n")
632 t = io.TextIOWrapper(b, encoding="ascii")
633 self.assertRaises(UnicodeError, t.read)
634 # (2) explicit strict
635 b = io.BytesIO(b"abc\n\xff\n")
636 t = io.TextIOWrapper(b, encoding="ascii", errors="strict")
637 self.assertRaises(UnicodeError, t.read)
638 # (3) ignore
639 b = io.BytesIO(b"abc\n\xff\n")
640 t = io.TextIOWrapper(b, encoding="ascii", errors="ignore")
641 self.assertEquals(t.read(), "abc\n\n")
642 # (4) replace
643 b = io.BytesIO(b"abc\n\xff\n")
644 t = io.TextIOWrapper(b, encoding="ascii", errors="replace")
645 self.assertEquals(t.read(), "abc\n\ufffd\n")
646
647 def testEncodingErrorsWriting(self):
648 # (1) default
649 b = io.BytesIO()
650 t = io.TextIOWrapper(b, encoding="ascii")
651 self.assertRaises(UnicodeError, t.write, "\xff")
652 # (2) explicit strict
653 b = io.BytesIO()
654 t = io.TextIOWrapper(b, encoding="ascii", errors="strict")
655 self.assertRaises(UnicodeError, t.write, "\xff")
656 # (3) ignore
657 b = io.BytesIO()
Guido van Rossumf64db9f2007-12-06 01:04:26 +0000658 t = io.TextIOWrapper(b, encoding="ascii", errors="ignore",
659 newline="\n")
Guido van Rossume7fc50f2007-12-03 22:54:21 +0000660 t.write("abc\xffdef\n")
661 t.flush()
Christian Heimesecda2612007-12-05 17:59:44 +0000662 self.assertEquals(b.getvalue(), b"abcdef\n")
Guido van Rossume7fc50f2007-12-03 22:54:21 +0000663 # (4) replace
664 b = io.BytesIO()
Guido van Rossumf64db9f2007-12-06 01:04:26 +0000665 t = io.TextIOWrapper(b, encoding="ascii", errors="replace",
666 newline="\n")
Guido van Rossume7fc50f2007-12-03 22:54:21 +0000667 t.write("abc\xffdef\n")
668 t.flush()
Christian Heimesecda2612007-12-05 17:59:44 +0000669 self.assertEquals(b.getvalue(), b"abc?def\n")
Guido van Rossume7fc50f2007-12-03 22:54:21 +0000670
Guido van Rossum8358db22007-08-18 21:39:55 +0000671 def testNewlinesInput(self):
672 testdata = b"AAA\nBBB\nCCC\rDDD\rEEE\r\nFFF\r\nGGG"
673 normalized = testdata.replace(b"\r\n", b"\n").replace(b"\r", b"\n")
674 for newline, expected in [
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +0000675 (None, normalized.decode("ascii").splitlines(True)),
676 ("", testdata.decode("ascii").splitlines(True)),
Guido van Rossum8358db22007-08-18 21:39:55 +0000677 ("\n", ["AAA\n", "BBB\n", "CCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]),
678 ("\r\n", ["AAA\nBBB\nCCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]),
679 ("\r", ["AAA\nBBB\nCCC\r", "DDD\r", "EEE\r", "\nFFF\r", "\nGGG"]),
680 ]:
681 buf = io.BytesIO(testdata)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +0000682 txt = io.TextIOWrapper(buf, encoding="ascii", newline=newline)
Guido van Rossum8358db22007-08-18 21:39:55 +0000683 self.assertEquals(txt.readlines(), expected)
684 txt.seek(0)
685 self.assertEquals(txt.read(), "".join(expected))
686
687 def testNewlinesOutput(self):
688 testdict = {
689 "": b"AAA\nBBB\nCCC\nX\rY\r\nZ",
690 "\n": b"AAA\nBBB\nCCC\nX\rY\r\nZ",
691 "\r": b"AAA\rBBB\rCCC\rX\rY\r\rZ",
692 "\r\n": b"AAA\r\nBBB\r\nCCC\r\nX\rY\r\r\nZ",
693 }
694 tests = [(None, testdict[os.linesep])] + sorted(testdict.items())
695 for newline, expected in tests:
696 buf = io.BytesIO()
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +0000697 txt = io.TextIOWrapper(buf, encoding="ascii", newline=newline)
Guido van Rossum8358db22007-08-18 21:39:55 +0000698 txt.write("AAA\nB")
699 txt.write("BB\nCCC\n")
700 txt.write("X\rY\r\nZ")
701 txt.flush()
702 self.assertEquals(buf.getvalue(), expected)
703
Guido van Rossum78892e42007-04-06 17:31:18 +0000704 def testNewlines(self):
705 input_lines = [ "unix\n", "windows\r\n", "os9\r", "last\n", "nonl" ]
706
707 tests = [
708 [ None, [ 'unix\n', 'windows\n', 'os9\n', 'last\n', 'nonl' ] ],
Guido van Rossum8358db22007-08-18 21:39:55 +0000709 [ '', input_lines ],
710 [ '\n', [ "unix\n", "windows\r\n", "os9\rlast\n", "nonl" ] ],
711 [ '\r\n', [ "unix\nwindows\r\n", "os9\rlast\nnonl" ] ],
712 [ '\r', [ "unix\nwindows\r", "\nos9\r", "last\nnonl" ] ],
Guido van Rossum78892e42007-04-06 17:31:18 +0000713 ]
714
Guido van Rossumc2f93dc2007-05-24 00:50:02 +0000715 encodings = ('utf-8', 'latin-1')
Guido van Rossum78892e42007-04-06 17:31:18 +0000716
Guido van Rossum8358db22007-08-18 21:39:55 +0000717 # Try a range of buffer sizes to test the case where \r is the last
Guido van Rossum78892e42007-04-06 17:31:18 +0000718 # character in TextIOWrapper._pending_line.
719 for encoding in encodings:
Guido van Rossum8358db22007-08-18 21:39:55 +0000720 # XXX: str.encode() should return bytes
721 data = bytes(''.join(input_lines).encode(encoding))
Guido van Rossum78892e42007-04-06 17:31:18 +0000722 for do_reads in (False, True):
Guido van Rossum8358db22007-08-18 21:39:55 +0000723 for bufsize in range(1, 10):
724 for newline, exp_lines in tests:
725 bufio = io.BufferedReader(io.BytesIO(data), bufsize)
Guido van Rossum76c5d4d2007-04-06 19:10:29 +0000726 textio = io.TextIOWrapper(bufio, newline=newline,
Guido van Rossum78892e42007-04-06 17:31:18 +0000727 encoding=encoding)
728 if do_reads:
729 got_lines = []
730 while True:
Guido van Rossum76c5d4d2007-04-06 19:10:29 +0000731 c2 = textio.read(2)
Guido van Rossum78892e42007-04-06 17:31:18 +0000732 if c2 == '':
733 break
734 self.assertEquals(len(c2), 2)
Guido van Rossum76c5d4d2007-04-06 19:10:29 +0000735 got_lines.append(c2 + textio.readline())
Guido van Rossum78892e42007-04-06 17:31:18 +0000736 else:
Guido van Rossum76c5d4d2007-04-06 19:10:29 +0000737 got_lines = list(textio)
Guido van Rossum78892e42007-04-06 17:31:18 +0000738
739 for got_line, exp_line in zip(got_lines, exp_lines):
740 self.assertEquals(got_line, exp_line)
741 self.assertEquals(len(got_lines), len(exp_lines))
742
Guido van Rossum8358db22007-08-18 21:39:55 +0000743 def testNewlinesInput(self):
744 testdata = b"AAA\nBBB\nCCC\rDDD\rEEE\r\nFFF\r\nGGG"
745 normalized = testdata.replace(b"\r\n", b"\n").replace(b"\r", b"\n")
746 for newline, expected in [
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +0000747 (None, normalized.decode("ascii").splitlines(True)),
748 ("", testdata.decode("ascii").splitlines(True)),
Guido van Rossum8358db22007-08-18 21:39:55 +0000749 ("\n", ["AAA\n", "BBB\n", "CCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]),
750 ("\r\n", ["AAA\nBBB\nCCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]),
751 ("\r", ["AAA\nBBB\nCCC\r", "DDD\r", "EEE\r", "\nFFF\r", "\nGGG"]),
752 ]:
753 buf = io.BytesIO(testdata)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +0000754 txt = io.TextIOWrapper(buf, encoding="ascii", newline=newline)
Guido van Rossum8358db22007-08-18 21:39:55 +0000755 self.assertEquals(txt.readlines(), expected)
756 txt.seek(0)
757 self.assertEquals(txt.read(), "".join(expected))
758
759 def testNewlinesOutput(self):
Guido van Rossum8358db22007-08-18 21:39:55 +0000760 data = "AAA\nBBB\rCCC\n"
761 data_lf = b"AAA\nBBB\rCCC\n"
762 data_cr = b"AAA\rBBB\rCCC\r"
763 data_crlf = b"AAA\r\nBBB\rCCC\r\n"
Guido van Rossum18363582007-08-18 22:05:58 +0000764 save_linesep = os.linesep
765 try:
766 for os.linesep, newline, expected in [
767 ("\n", None, data_lf),
768 ("\r\n", None, data_crlf),
769 ("\n", "", data_lf),
770 ("\r\n", "", data_lf),
771 ("\n", "\n", data_lf),
772 ("\r\n", "\n", data_lf),
773 ("\n", "\r", data_cr),
774 ("\r\n", "\r", data_cr),
775 ("\n", "\r\n", data_crlf),
776 ("\r\n", "\r\n", data_crlf),
777 ]:
778 buf = io.BytesIO()
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +0000779 txt = io.TextIOWrapper(buf, encoding="ascii", newline=newline)
Guido van Rossum18363582007-08-18 22:05:58 +0000780 txt.write(data)
781 txt.close()
782 self.assertEquals(buf.getvalue(), expected)
783 finally:
784 os.linesep = save_linesep
Guido van Rossum8358db22007-08-18 21:39:55 +0000785
Guido van Rossum9b76da62007-04-11 01:09:03 +0000786 # Systematic tests of the text I/O API
787
788 def testBasicIO(self):
789 for chunksize in (1, 2, 3, 4, 5, 15, 16, 17, 31, 32, 33, 63, 64, 65):
790 for enc in "ascii", "latin1", "utf8" :# , "utf-16-be", "utf-16-le":
791 f = io.open(test_support.TESTFN, "w+", encoding=enc)
792 f._CHUNK_SIZE = chunksize
793 self.assertEquals(f.write("abc"), 3)
794 f.close()
795 f = io.open(test_support.TESTFN, "r+", encoding=enc)
796 f._CHUNK_SIZE = chunksize
797 self.assertEquals(f.tell(), 0)
798 self.assertEquals(f.read(), "abc")
799 cookie = f.tell()
800 self.assertEquals(f.seek(0), 0)
801 self.assertEquals(f.read(2), "ab")
802 self.assertEquals(f.read(1), "c")
803 self.assertEquals(f.read(1), "")
804 self.assertEquals(f.read(), "")
805 self.assertEquals(f.tell(), cookie)
806 self.assertEquals(f.seek(0), 0)
807 self.assertEquals(f.seek(0, 2), cookie)
808 self.assertEquals(f.write("def"), 3)
809 self.assertEquals(f.seek(cookie), cookie)
810 self.assertEquals(f.read(), "def")
811 if enc.startswith("utf"):
812 self.multi_line_test(f, enc)
813 f.close()
814
815 def multi_line_test(self, f, enc):
816 f.seek(0)
817 f.truncate()
Guido van Rossumef87d6e2007-05-02 19:09:54 +0000818 sample = "s\xff\u0fff\uffff"
Guido van Rossum9b76da62007-04-11 01:09:03 +0000819 wlines = []
Guido van Rossumcba608c2007-04-11 14:19:59 +0000820 for size in (0, 1, 2, 3, 4, 5, 30, 31, 32, 33, 62, 63, 64, 65, 1000):
Guido van Rossum9b76da62007-04-11 01:09:03 +0000821 chars = []
Guido van Rossum805365e2007-05-07 22:24:25 +0000822 for i in range(size):
Guido van Rossum9b76da62007-04-11 01:09:03 +0000823 chars.append(sample[i % len(sample)])
Guido van Rossumef87d6e2007-05-02 19:09:54 +0000824 line = "".join(chars) + "\n"
Guido van Rossum9b76da62007-04-11 01:09:03 +0000825 wlines.append((f.tell(), line))
826 f.write(line)
Guido van Rossum9b76da62007-04-11 01:09:03 +0000827 f.seek(0)
828 rlines = []
829 while True:
830 pos = f.tell()
831 line = f.readline()
832 if not line:
Guido van Rossum9b76da62007-04-11 01:09:03 +0000833 break
834 rlines.append((pos, line))
Guido van Rossum9b76da62007-04-11 01:09:03 +0000835 self.assertEquals(rlines, wlines)
836
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +0000837 def testTelling(self):
838 f = io.open(test_support.TESTFN, "w+", encoding="utf8")
839 p0 = f.tell()
Guido van Rossumef87d6e2007-05-02 19:09:54 +0000840 f.write("\xff\n")
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +0000841 p1 = f.tell()
Guido van Rossumef87d6e2007-05-02 19:09:54 +0000842 f.write("\xff\n")
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +0000843 p2 = f.tell()
844 f.seek(0)
845 self.assertEquals(f.tell(), p0)
Guido van Rossumef87d6e2007-05-02 19:09:54 +0000846 self.assertEquals(f.readline(), "\xff\n")
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +0000847 self.assertEquals(f.tell(), p1)
Guido van Rossumef87d6e2007-05-02 19:09:54 +0000848 self.assertEquals(f.readline(), "\xff\n")
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +0000849 self.assertEquals(f.tell(), p2)
850 f.seek(0)
851 for line in f:
Guido van Rossumef87d6e2007-05-02 19:09:54 +0000852 self.assertEquals(line, "\xff\n")
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +0000853 self.assertRaises(IOError, f.tell)
854 self.assertEquals(f.tell(), p2)
855 f.close()
856
Guido van Rossumd76e7792007-04-17 02:38:04 +0000857 def testSeeking(self):
858 chunk_size = io.TextIOWrapper._CHUNK_SIZE
859 prefix_size = chunk_size - 2
Guido van Rossumef87d6e2007-05-02 19:09:54 +0000860 u_prefix = "a" * prefix_size
Guido van Rossumd76e7792007-04-17 02:38:04 +0000861 prefix = bytes(u_prefix.encode("utf-8"))
862 self.assertEquals(len(u_prefix), len(prefix))
Guido van Rossumef87d6e2007-05-02 19:09:54 +0000863 u_suffix = "\u8888\n"
Guido van Rossumd76e7792007-04-17 02:38:04 +0000864 suffix = bytes(u_suffix.encode("utf-8"))
865 line = prefix + suffix
866 f = io.open(test_support.TESTFN, "wb")
867 f.write(line*2)
868 f.close()
869 f = io.open(test_support.TESTFN, "r", encoding="utf-8")
870 s = f.read(prefix_size)
Guido van Rossum98297ee2007-11-06 21:34:58 +0000871 self.assertEquals(s, str(prefix, "ascii"))
Guido van Rossumd76e7792007-04-17 02:38:04 +0000872 self.assertEquals(f.tell(), prefix_size)
873 self.assertEquals(f.readline(), u_suffix)
874
875 def testSeekingToo(self):
876 # Regression test for a specific bug
877 data = b'\xe0\xbf\xbf\n'
878 f = io.open(test_support.TESTFN, "wb")
879 f.write(data)
880 f.close()
881 f = io.open(test_support.TESTFN, "r", encoding="utf-8")
882 f._CHUNK_SIZE # Just test that it exists
883 f._CHUNK_SIZE = 2
884 f.readline()
885 f.tell()
886
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +0000887 def testSeekAndTell(self):
888 """Test seek/tell using the StatefulIncrementalDecoder."""
889
890 def lookupTestDecoder(name):
891 if self.codecEnabled and name == 'test_decoder':
892 return codecs.CodecInfo(
893 name='test_decoder', encode=None, decode=None,
894 incrementalencoder=None,
Ka-Ping Yee30cc8382008-03-18 07:01:49 +0000895 incrementaldecoder=StatefulIncrementalDecoder,
896 streamreader=None, streamwriter=None)
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +0000897
898 def testSeekAndTellWithData(data, min_pos=0):
899 """Tell/seek to various points within a data stream and ensure
900 that the decoded data returned by read() is consistent."""
901 f = io.open(test_support.TESTFN, 'wb')
902 f.write(data)
903 f.close()
904 f = io.open(test_support.TESTFN, encoding='test_decoder')
905 decoded = f.read()
906 f.close()
907
Ka-Ping Yee30cc8382008-03-18 07:01:49 +0000908 # Use read() to move to various positions in the input;
909 # then tell, read some more data, and seek back.
910 for i in range(min_pos, len(decoded) + 1): # to read before tell
911 for j in [1, 5, len(decoded)]: # to read after tell
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +0000912 f = io.open(test_support.TESTFN, encoding='test_decoder')
913 self.assertEquals(f.read(i), decoded[:i])
914 cookie = f.tell()
915 self.assertEquals(f.read(j), decoded[i:i + j])
916 f.seek(cookie)
Ka-Ping Yee30cc8382008-03-18 07:01:49 +0000917 self.assertEquals(f.tell(), cookie)
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +0000918 self.assertEquals(f.read(), decoded[i:])
919 f.close()
920
Ka-Ping Yee30cc8382008-03-18 07:01:49 +0000921 lines = len(decoded.split('\n'))
922
923 # Use readline() to move to various positions in the input;
924 # then tell, read some more data, and seek back.
925 for limit in [-1, 4, 128]: # 'limit' argument for readline()
926 for j in [1, 5, len(decoded)]: # to read after tell()
927 f = io.open(test_support.TESTFN, encoding='test_decoder')
928 text = ''
929 for k in range(lines): # repeatedly call readline()
930 line = f.readline(limit=limit)
931 if limit >= 0:
932 self.assert_(len(line) <= limit)
933 text += line
934 i = len(text)
935 self.assertEquals(text, decoded[:i])
936 cookie = f.tell()
937 self.assertEquals(f.read(j), decoded[i:i + j])
938 f.seek(cookie)
939 self.assertEquals(f.tell(), cookie)
940 self.assertEquals(f.read(), decoded[i:])
941 f.seek(cookie)
942 f.close()
943
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +0000944 # Register a special incremental decoder for testing.
945 codecs.register(lookupTestDecoder)
946 self.codecEnabled = 1
947
948 # Run the tests.
949 try:
950 # Try each test case.
951 for input, _, _ in StatefulIncrementalDecoderTest.test_cases:
952 testSeekAndTellWithData(input)
953
954 # Position each test case so that it crosses a chunk boundary.
955 CHUNK_SIZE = io.TextIOWrapper._CHUNK_SIZE
956 for input, _, _ in StatefulIncrementalDecoderTest.test_cases:
957 offset = CHUNK_SIZE - len(input)//2
958 prefix = b'.'*offset
959 # Don't bother seeking into the prefix (takes too long).
960 min_pos = offset*2
961 testSeekAndTellWithData(prefix + input, min_pos)
962
963 # Ensure our test decoder won't interfere with subsequent tests.
964 finally:
965 self.codecEnabled = 0
966
Alexandre Vassalottia38f73b2008-01-07 18:30:48 +0000967 def testEncodedWrites(self):
968 data = "1234567890"
969 tests = ("utf-16",
970 "utf-16-le",
971 "utf-16-be",
972 "utf-32",
973 "utf-32-le",
974 "utf-32-be")
975 for encoding in tests:
976 buf = io.BytesIO()
977 f = io.TextIOWrapper(buf, encoding=encoding)
978 # Check if the BOM is written only once (see issue1753).
979 f.write(data)
980 f.write(data)
981 f.seek(0)
982 self.assertEquals(f.read(), data * 2)
983 self.assertEquals(buf.getvalue(), (data * 2).encode(encoding))
984
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +0000985 def timingTest(self):
986 timer = time.time
987 enc = "utf8"
Guido van Rossumef87d6e2007-05-02 19:09:54 +0000988 line = "\0\x0f\xff\u0fff\uffff\U000fffff\U0010ffff"*3 + "\n"
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +0000989 nlines = 10000
990 nchars = len(line)
991 nbytes = len(line.encode(enc))
992 for chunk_size in (32, 64, 128, 256):
993 f = io.open(test_support.TESTFN, "w+", encoding=enc)
994 f._CHUNK_SIZE = chunk_size
995 t0 = timer()
996 for i in range(nlines):
997 f.write(line)
998 f.flush()
999 t1 = timer()
1000 f.seek(0)
1001 for line in f:
1002 pass
1003 t2 = timer()
1004 f.seek(0)
1005 while f.readline():
1006 pass
1007 t3 = timer()
1008 f.seek(0)
1009 while f.readline():
1010 f.tell()
1011 t4 = timer()
1012 f.close()
1013 if test_support.verbose:
1014 print("\nTiming test: %d lines of %d characters (%d bytes)" %
1015 (nlines, nchars, nbytes))
1016 print("File chunk size: %6s" % f._CHUNK_SIZE)
1017 print("Writing: %6.3f seconds" % (t1-t0))
1018 print("Reading using iteration: %6.3f seconds" % (t2-t1))
1019 print("Reading using readline(): %6.3f seconds" % (t3-t2))
1020 print("Using readline()+tell(): %6.3f seconds" % (t4-t3))
1021
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00001022 def testReadOneByOne(self):
1023 txt = io.TextIOWrapper(io.BytesIO(b"AA\r\nBB"))
1024 reads = ""
1025 while True:
1026 c = txt.read(1)
1027 if not c:
1028 break
1029 reads += c
1030 self.assertEquals(reads, "AA\nBB")
1031
1032 # read in amounts equal to TextIOWrapper._CHUNK_SIZE which is 128.
1033 def testReadByChunk(self):
1034 # make sure "\r\n" straddles 128 char boundary.
1035 txt = io.TextIOWrapper(io.BytesIO(b"A" * 127 + b"\r\nB"))
1036 reads = ""
1037 while True:
1038 c = txt.read(128)
1039 if not c:
1040 break
1041 reads += c
1042 self.assertEquals(reads, "A"*127+"\nB")
1043
1044 def test_issue1395_1(self):
1045 txt = io.TextIOWrapper(io.BytesIO(self.testdata), encoding="ascii")
1046
1047 # read one char at a time
1048 reads = ""
1049 while True:
1050 c = txt.read(1)
1051 if not c:
1052 break
1053 reads += c
1054 self.assertEquals(reads, self.normalized)
1055
1056 def test_issue1395_2(self):
1057 txt = io.TextIOWrapper(io.BytesIO(self.testdata), encoding="ascii")
1058 txt._CHUNK_SIZE = 4
1059
1060 reads = ""
1061 while True:
1062 c = txt.read(4)
1063 if not c:
1064 break
1065 reads += c
1066 self.assertEquals(reads, self.normalized)
1067
1068 def test_issue1395_3(self):
1069 txt = io.TextIOWrapper(io.BytesIO(self.testdata), encoding="ascii")
1070 txt._CHUNK_SIZE = 4
1071
1072 reads = txt.read(4)
1073 reads += txt.read(4)
1074 reads += txt.readline()
1075 reads += txt.readline()
1076 reads += txt.readline()
1077 self.assertEquals(reads, self.normalized)
1078
1079 def test_issue1395_4(self):
1080 txt = io.TextIOWrapper(io.BytesIO(self.testdata), encoding="ascii")
1081 txt._CHUNK_SIZE = 4
1082
1083 reads = txt.read(4)
1084 reads += txt.read()
1085 self.assertEquals(reads, self.normalized)
1086
1087 def test_issue1395_5(self):
1088 txt = io.TextIOWrapper(io.BytesIO(self.testdata), encoding="ascii")
1089 txt._CHUNK_SIZE = 4
1090
1091 reads = txt.read(4)
1092 pos = txt.tell()
1093 txt.seek(0)
1094 txt.seek(pos)
1095 self.assertEquals(txt.read(4), "BBB\n")
1096
Ka-Ping Yeeddaa7062008-03-17 20:35:15 +00001097 def test_issue2282(self):
1098 buffer = io.BytesIO(self.testdata)
1099 txt = io.TextIOWrapper(buffer, encoding="ascii")
1100
1101 self.assertEqual(buffer.seekable(), txt.seekable())
1102
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00001103 def test_newline_decoder(self):
1104 import codecs
1105 decoder = codecs.getincrementaldecoder("utf-8")()
1106 decoder = io.IncrementalNewlineDecoder(decoder, translate=True)
1107
1108 self.assertEquals(decoder.decode(b'\xe8\xa2\x88'), "\u8888")
1109
1110 self.assertEquals(decoder.decode(b'\xe8'), "")
1111 self.assertEquals(decoder.decode(b'\xa2'), "")
1112 self.assertEquals(decoder.decode(b'\x88'), "\u8888")
1113
1114 self.assertEquals(decoder.decode(b'\xe8'), "")
1115 self.assertRaises(UnicodeDecodeError, decoder.decode, b'', final=True)
1116
1117 decoder.setstate((b'', 0))
1118 self.assertEquals(decoder.decode(b'\n'), "\n")
1119 self.assertEquals(decoder.decode(b'\r'), "")
1120 self.assertEquals(decoder.decode(b'', final=True), "\n")
1121 self.assertEquals(decoder.decode(b'\r', final=True), "\n")
1122
1123 self.assertEquals(decoder.decode(b'\r'), "")
1124 self.assertEquals(decoder.decode(b'a'), "\na")
1125
1126 self.assertEquals(decoder.decode(b'\r\r\n'), "\n\n")
1127 self.assertEquals(decoder.decode(b'\r'), "")
1128 self.assertEquals(decoder.decode(b'\r'), "\n")
1129 self.assertEquals(decoder.decode(b'\na'), "\na")
1130
1131 self.assertEquals(decoder.decode(b'\xe8\xa2\x88\r\n'), "\u8888\n")
1132 self.assertEquals(decoder.decode(b'\xe8\xa2\x88'), "\u8888")
1133 self.assertEquals(decoder.decode(b'\n'), "\n")
1134 self.assertEquals(decoder.decode(b'\xe8\xa2\x88\r'), "\u8888")
1135 self.assertEquals(decoder.decode(b'\n'), "\n")
Guido van Rossum9b76da62007-04-11 01:09:03 +00001136
Alexandre Vassalotti472f07d2008-01-06 00:34:32 +00001137 decoder = codecs.getincrementaldecoder("utf-8")()
1138 decoder = io.IncrementalNewlineDecoder(decoder, translate=True)
1139 self.assertEquals(decoder.newlines, None)
1140 decoder.decode(b"abc\n\r")
1141 self.assertEquals(decoder.newlines, '\n')
1142 decoder.decode(b"\nabc")
1143 self.assertEquals(decoder.newlines, ('\n', '\r\n'))
1144 decoder.decode(b"abc\r")
1145 self.assertEquals(decoder.newlines, ('\n', '\r\n'))
1146 decoder.decode(b"abc")
1147 self.assertEquals(decoder.newlines, ('\r', '\n', '\r\n'))
1148 decoder.decode(b"abc\r")
1149 decoder.reset()
1150 self.assertEquals(decoder.decode(b"abc"), "abc")
1151 self.assertEquals(decoder.newlines, None)
1152
Guido van Rossum01a27522007-03-07 01:00:12 +00001153# XXX Tests for open()
Guido van Rossum68bbcd22007-02-27 17:19:33 +00001154
Guido van Rossum5abbf752007-08-27 17:39:33 +00001155class MiscIOTest(unittest.TestCase):
1156
1157 def testImport__all__(self):
1158 for name in io.__all__:
1159 obj = getattr(io, name, None)
1160 self.assert_(obj is not None, name)
1161 if name == "open":
1162 continue
1163 elif "error" in name.lower():
1164 self.assert_(issubclass(obj, Exception), name)
1165 else:
1166 self.assert_(issubclass(obj, io.IOBase))
1167
1168
Guido van Rossum28524c72007-02-27 05:47:44 +00001169def test_main():
Guido van Rossum78892e42007-04-06 17:31:18 +00001170 test_support.run_unittest(IOTest, BytesIOTest, StringIOTest,
1171 BufferedReaderTest,
Guido van Rossum01a27522007-03-07 01:00:12 +00001172 BufferedWriterTest, BufferedRWPairTest,
Guido van Rossum5abbf752007-08-27 17:39:33 +00001173 BufferedRandomTest, TextIOWrapperTest,
1174 MiscIOTest)
Guido van Rossum28524c72007-02-27 05:47:44 +00001175
1176if __name__ == "__main__":
Guido van Rossum76c5d4d2007-04-06 19:10:29 +00001177 unittest.main()