blob: 38af35a1acd743010e01e9e4ce7bb1242cb30405 [file] [log] [blame]
Guido van Rossum68bbcd22007-02-27 17:19:33 +00001"""Unit tests for io.py."""
2
Guido van Rossum8358db22007-08-18 21:39:55 +00003import os
Guido van Rossum34d69e52007-04-10 20:08:41 +00004import sys
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +00005import time
Guido van Rossumd4103952007-04-12 05:44:49 +00006import array
Guido van Rossum28524c72007-02-27 05:47:44 +00007import unittest
Guido van Rossum78892e42007-04-06 17:31:18 +00008from itertools import chain
Guido van Rossum76c5d4d2007-04-06 19:10:29 +00009from test import test_support
10
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +000011import codecs
Guido van Rossum53807da2007-04-10 19:01:47 +000012import io # The module under test
Guido van Rossum28524c72007-02-27 05:47:44 +000013
Guido van Rossuma9e20242007-03-08 00:43:48 +000014
Guido van Rossum53807da2007-04-10 19:01:47 +000015class MockRawIO(io.RawIOBase):
Guido van Rossuma9e20242007-03-08 00:43:48 +000016
Guido van Rossum76c5d4d2007-04-06 19:10:29 +000017 def __init__(self, read_stack=()):
18 self._read_stack = list(read_stack)
19 self._write_stack = []
Guido van Rossum68bbcd22007-02-27 17:19:33 +000020
21 def read(self, n=None):
22 try:
Guido van Rossum76c5d4d2007-04-06 19:10:29 +000023 return self._read_stack.pop(0)
Guido van Rossum68bbcd22007-02-27 17:19:33 +000024 except:
Guido van Rossum78892e42007-04-06 17:31:18 +000025 return b""
Guido van Rossum68bbcd22007-02-27 17:19:33 +000026
Guido van Rossum01a27522007-03-07 01:00:12 +000027 def write(self, b):
Guido van Rossum76c5d4d2007-04-06 19:10:29 +000028 self._write_stack.append(b[:])
Guido van Rossum01a27522007-03-07 01:00:12 +000029 return len(b)
30
31 def writable(self):
32 return True
33
Guido van Rossum68bbcd22007-02-27 17:19:33 +000034 def fileno(self):
35 return 42
36
37 def readable(self):
38 return True
39
Guido van Rossum01a27522007-03-07 01:00:12 +000040 def seekable(self):
Guido van Rossum68bbcd22007-02-27 17:19:33 +000041 return True
42
Guido van Rossum01a27522007-03-07 01:00:12 +000043 def seek(self, pos, whence):
44 pass
45
46 def tell(self):
Guido van Rossum68bbcd22007-02-27 17:19:33 +000047 return 42
48
Guido van Rossuma9e20242007-03-08 00:43:48 +000049
Guido van Rossum78892e42007-04-06 17:31:18 +000050class MockFileIO(io.BytesIO):
51
52 def __init__(self, data):
53 self.read_history = []
54 io.BytesIO.__init__(self, data)
55
56 def read(self, n=None):
57 res = io.BytesIO.read(self, n)
58 self.read_history.append(None if res is None else len(res))
59 return res
60
61
Guido van Rossum01a27522007-03-07 01:00:12 +000062class MockNonBlockWriterIO(io.RawIOBase):
Guido van Rossuma9e20242007-03-08 00:43:48 +000063
Guido van Rossum53807da2007-04-10 19:01:47 +000064 def __init__(self, blocking_script):
65 self._blocking_script = list(blocking_script)
Guido van Rossum01a27522007-03-07 01:00:12 +000066 self._write_stack = []
Guido van Rossuma9e20242007-03-08 00:43:48 +000067
Guido van Rossum01a27522007-03-07 01:00:12 +000068 def write(self, b):
Guido van Rossum76c5d4d2007-04-06 19:10:29 +000069 self._write_stack.append(b[:])
Guido van Rossum53807da2007-04-10 19:01:47 +000070 n = self._blocking_script.pop(0)
Guido van Rossum01a27522007-03-07 01:00:12 +000071 if (n < 0):
Guido van Rossum141f7672007-04-10 00:22:16 +000072 raise io.BlockingIOError(0, "test blocking", -n)
Guido van Rossum01a27522007-03-07 01:00:12 +000073 else:
74 return n
Guido van Rossuma9e20242007-03-08 00:43:48 +000075
Guido van Rossum01a27522007-03-07 01:00:12 +000076 def writable(self):
77 return True
Guido van Rossum68bbcd22007-02-27 17:19:33 +000078
Guido van Rossuma9e20242007-03-08 00:43:48 +000079
Guido van Rossum28524c72007-02-27 05:47:44 +000080class IOTest(unittest.TestCase):
81
Guido van Rossum4d0f5a42007-03-07 22:59:39 +000082 def tearDown(self):
83 test_support.unlink(test_support.TESTFN)
84
Guido van Rossum28524c72007-02-27 05:47:44 +000085 def write_ops(self, f):
Guido van Rossum87429772007-04-10 21:06:59 +000086 self.assertEqual(f.write(b"blah."), 5)
87 self.assertEqual(f.seek(0), 0)
88 self.assertEqual(f.write(b"Hello."), 6)
Guido van Rossum28524c72007-02-27 05:47:44 +000089 self.assertEqual(f.tell(), 6)
Guido van Rossum87429772007-04-10 21:06:59 +000090 self.assertEqual(f.seek(-1, 1), 5)
Guido van Rossum28524c72007-02-27 05:47:44 +000091 self.assertEqual(f.tell(), 5)
Guido van Rossum254348e2007-11-21 19:29:53 +000092 self.assertEqual(f.write(bytearray(b" world\n\n\n")), 9)
Guido van Rossum87429772007-04-10 21:06:59 +000093 self.assertEqual(f.seek(0), 0)
Guido van Rossum2b08b382007-05-08 20:18:39 +000094 self.assertEqual(f.write(b"h"), 1)
Guido van Rossum87429772007-04-10 21:06:59 +000095 self.assertEqual(f.seek(-1, 2), 13)
96 self.assertEqual(f.tell(), 13)
97 self.assertEqual(f.truncate(12), 12)
Guido van Rossumdc0b1a12007-04-12 22:55:07 +000098 self.assertEqual(f.tell(), 13)
Christian Heimes8e42a0a2007-11-08 18:04:45 +000099 self.assertRaises(TypeError, f.seek, 0.0)
Guido van Rossum28524c72007-02-27 05:47:44 +0000100
Guido van Rossum9b76da62007-04-11 01:09:03 +0000101 def read_ops(self, f, buffered=False):
102 data = f.read(5)
103 self.assertEqual(data, b"hello")
Guido van Rossum254348e2007-11-21 19:29:53 +0000104 data = bytearray(data)
Guido van Rossum9b76da62007-04-11 01:09:03 +0000105 self.assertEqual(f.readinto(data), 5)
106 self.assertEqual(data, b" worl")
107 self.assertEqual(f.readinto(data), 2)
108 self.assertEqual(len(data), 5)
109 self.assertEqual(data[:2], b"d\n")
110 self.assertEqual(f.seek(0), 0)
111 self.assertEqual(f.read(20), b"hello world\n")
112 self.assertEqual(f.read(1), b"")
Guido van Rossum254348e2007-11-21 19:29:53 +0000113 self.assertEqual(f.readinto(bytearray(b"x")), 0)
Guido van Rossum9b76da62007-04-11 01:09:03 +0000114 self.assertEqual(f.seek(-6, 2), 6)
115 self.assertEqual(f.read(5), b"world")
116 self.assertEqual(f.read(0), b"")
Guido van Rossum254348e2007-11-21 19:29:53 +0000117 self.assertEqual(f.readinto(bytearray()), 0)
Guido van Rossum9b76da62007-04-11 01:09:03 +0000118 self.assertEqual(f.seek(-6, 1), 5)
119 self.assertEqual(f.read(5), b" worl")
120 self.assertEqual(f.tell(), 10)
Christian Heimes8e42a0a2007-11-08 18:04:45 +0000121 self.assertRaises(TypeError, f.seek, 0.0)
Guido van Rossum9b76da62007-04-11 01:09:03 +0000122 if buffered:
123 f.seek(0)
124 self.assertEqual(f.read(), b"hello world\n")
125 f.seek(6)
126 self.assertEqual(f.read(), b"world\n")
127 self.assertEqual(f.read(), b"")
128
Guido van Rossum34d69e52007-04-10 20:08:41 +0000129 LARGE = 2**31
130
Guido van Rossum53807da2007-04-10 19:01:47 +0000131 def large_file_ops(self, f):
132 assert f.readable()
133 assert f.writable()
Guido van Rossum34d69e52007-04-10 20:08:41 +0000134 self.assertEqual(f.seek(self.LARGE), self.LARGE)
135 self.assertEqual(f.tell(), self.LARGE)
Guido van Rossum53807da2007-04-10 19:01:47 +0000136 self.assertEqual(f.write(b"xxx"), 3)
Guido van Rossum34d69e52007-04-10 20:08:41 +0000137 self.assertEqual(f.tell(), self.LARGE + 3)
138 self.assertEqual(f.seek(-1, 1), self.LARGE + 2)
Guido van Rossum87429772007-04-10 21:06:59 +0000139 self.assertEqual(f.truncate(), self.LARGE + 2)
Guido van Rossum34d69e52007-04-10 20:08:41 +0000140 self.assertEqual(f.tell(), self.LARGE + 2)
141 self.assertEqual(f.seek(0, 2), self.LARGE + 2)
Guido van Rossum87429772007-04-10 21:06:59 +0000142 self.assertEqual(f.truncate(self.LARGE + 1), self.LARGE + 1)
Guido van Rossumdc0b1a12007-04-12 22:55:07 +0000143 self.assertEqual(f.tell(), self.LARGE + 2)
Guido van Rossum34d69e52007-04-10 20:08:41 +0000144 self.assertEqual(f.seek(0, 2), self.LARGE + 1)
145 self.assertEqual(f.seek(-1, 2), self.LARGE)
Guido van Rossum53807da2007-04-10 19:01:47 +0000146 self.assertEqual(f.read(2), b"x")
147
Guido van Rossum28524c72007-02-27 05:47:44 +0000148 def test_raw_file_io(self):
149 f = io.open(test_support.TESTFN, "wb", buffering=0)
150 self.assertEqual(f.readable(), False)
151 self.assertEqual(f.writable(), True)
152 self.assertEqual(f.seekable(), True)
153 self.write_ops(f)
154 f.close()
155 f = io.open(test_support.TESTFN, "rb", buffering=0)
156 self.assertEqual(f.readable(), True)
157 self.assertEqual(f.writable(), False)
158 self.assertEqual(f.seekable(), True)
159 self.read_ops(f)
160 f.close()
161
Guido van Rossum87429772007-04-10 21:06:59 +0000162 def test_buffered_file_io(self):
163 f = io.open(test_support.TESTFN, "wb")
164 self.assertEqual(f.readable(), False)
165 self.assertEqual(f.writable(), True)
166 self.assertEqual(f.seekable(), True)
167 self.write_ops(f)
168 f.close()
169 f = io.open(test_support.TESTFN, "rb")
170 self.assertEqual(f.readable(), True)
171 self.assertEqual(f.writable(), False)
172 self.assertEqual(f.seekable(), True)
Guido van Rossum9b76da62007-04-11 01:09:03 +0000173 self.read_ops(f, True)
Guido van Rossum87429772007-04-10 21:06:59 +0000174 f.close()
175
Guido van Rossum48fc58a2007-06-07 23:45:37 +0000176 def test_readline(self):
177 f = io.open(test_support.TESTFN, "wb")
178 f.write(b"abc\ndef\nxyzzy\nfoo")
179 f.close()
180 f = io.open(test_support.TESTFN, "rb")
181 self.assertEqual(f.readline(), b"abc\n")
182 self.assertEqual(f.readline(10), b"def\n")
183 self.assertEqual(f.readline(2), b"xy")
184 self.assertEqual(f.readline(4), b"zzy\n")
185 self.assertEqual(f.readline(), b"foo")
186 f.close()
187
Guido van Rossum28524c72007-02-27 05:47:44 +0000188 def test_raw_bytes_io(self):
189 f = io.BytesIO()
190 self.write_ops(f)
191 data = f.getvalue()
192 self.assertEqual(data, b"hello world\n")
193 f = io.BytesIO(data)
Guido van Rossum9b76da62007-04-11 01:09:03 +0000194 self.read_ops(f, True)
Guido van Rossum28524c72007-02-27 05:47:44 +0000195
Guido van Rossum53807da2007-04-10 19:01:47 +0000196 def test_large_file_ops(self):
Guido van Rossum34d69e52007-04-10 20:08:41 +0000197 # On Windows and Mac OSX this test comsumes large resources; It takes
198 # a long time to build the >2GB file and takes >2GB of disk space
199 # therefore the resource must be enabled to run this test.
200 if sys.platform[:3] == 'win' or sys.platform == 'darwin':
201 if not test_support.is_resource_enabled("largefile"):
202 print("\nTesting large file ops skipped on %s." % sys.platform,
203 file=sys.stderr)
204 print("It requires %d bytes and a long time." % self.LARGE,
205 file=sys.stderr)
206 print("Use 'regrtest.py -u largefile test_io' to run it.",
207 file=sys.stderr)
208 return
Guido van Rossum87429772007-04-10 21:06:59 +0000209 f = io.open(test_support.TESTFN, "w+b", 0)
Guido van Rossum53807da2007-04-10 19:01:47 +0000210 self.large_file_ops(f)
Guido van Rossuma9e20242007-03-08 00:43:48 +0000211 f.close()
Guido van Rossum87429772007-04-10 21:06:59 +0000212 f = io.open(test_support.TESTFN, "w+b")
213 self.large_file_ops(f)
214 f.close()
215
216 def test_with_open(self):
217 for bufsize in (0, 1, 100):
218 f = None
219 with open(test_support.TESTFN, "wb", bufsize) as f:
Guido van Rossum1f2ca562007-08-27 20:44:15 +0000220 f.write(b"xxx")
Guido van Rossum87429772007-04-10 21:06:59 +0000221 self.assertEqual(f.closed, True)
222 f = None
223 try:
224 with open(test_support.TESTFN, "wb", bufsize) as f:
225 1/0
226 except ZeroDivisionError:
227 self.assertEqual(f.closed, True)
228 else:
229 self.fail("1/0 didn't raise an exception")
230
231 def test_destructor(self):
232 record = []
233 class MyFileIO(io.FileIO):
234 def __del__(self):
235 record.append(1)
236 io.FileIO.__del__(self)
237 def close(self):
238 record.append(2)
239 io.FileIO.close(self)
240 def flush(self):
241 record.append(3)
242 io.FileIO.flush(self)
243 f = MyFileIO(test_support.TESTFN, "w")
244 f.write("xxx")
245 del f
246 self.assertEqual(record, [1, 2, 3])
247
248 def test_close_flushes(self):
249 f = io.open(test_support.TESTFN, "wb")
Guido van Rossum2b08b382007-05-08 20:18:39 +0000250 f.write(b"xxx")
Guido van Rossum87429772007-04-10 21:06:59 +0000251 f.close()
252 f = io.open(test_support.TESTFN, "rb")
253 self.assertEqual(f.read(), b"xxx")
254 f.close()
Guido van Rossuma9e20242007-03-08 00:43:48 +0000255
Guido van Rossumd4103952007-04-12 05:44:49 +0000256 def test_array_writes(self):
257 a = array.array('i', range(10))
Guido van Rossumbae07c92007-10-08 02:46:15 +0000258 n = len(memoryview(a))
Guido van Rossumd4103952007-04-12 05:44:49 +0000259 f = io.open(test_support.TESTFN, "wb", 0)
260 self.assertEqual(f.write(a), n)
261 f.close()
262 f = io.open(test_support.TESTFN, "wb")
263 self.assertEqual(f.write(a), n)
264 f.close()
265
Guido van Rossum2dced8b2007-10-30 17:27:30 +0000266 def test_closefd(self):
267 self.assertRaises(ValueError, io.open, test_support.TESTFN, 'w',
268 closefd=False)
Guido van Rossuma9e20242007-03-08 00:43:48 +0000269
Guido van Rossum76c5d4d2007-04-06 19:10:29 +0000270class MemorySeekTestMixin:
Guido van Rossuma9e20242007-03-08 00:43:48 +0000271
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000272 def testInit(self):
Guido van Rossum78892e42007-04-06 17:31:18 +0000273 buf = self.buftype("1234567890")
274 bytesIo = self.ioclass(buf)
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000275
276 def testRead(self):
Guido van Rossum78892e42007-04-06 17:31:18 +0000277 buf = self.buftype("1234567890")
278 bytesIo = self.ioclass(buf)
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000279
280 self.assertEquals(buf[:1], bytesIo.read(1))
281 self.assertEquals(buf[1:5], bytesIo.read(4))
282 self.assertEquals(buf[5:], bytesIo.read(900))
Guido van Rossum78892e42007-04-06 17:31:18 +0000283 self.assertEquals(self.EOF, bytesIo.read())
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000284
285 def testReadNoArgs(self):
Guido van Rossum78892e42007-04-06 17:31:18 +0000286 buf = self.buftype("1234567890")
287 bytesIo = self.ioclass(buf)
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000288
289 self.assertEquals(buf, bytesIo.read())
Guido van Rossum78892e42007-04-06 17:31:18 +0000290 self.assertEquals(self.EOF, bytesIo.read())
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000291
292 def testSeek(self):
Guido van Rossum78892e42007-04-06 17:31:18 +0000293 buf = self.buftype("1234567890")
294 bytesIo = self.ioclass(buf)
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000295
296 bytesIo.read(5)
297 bytesIo.seek(0)
298 self.assertEquals(buf, bytesIo.read())
299
300 bytesIo.seek(3)
301 self.assertEquals(buf[3:], bytesIo.read())
Christian Heimes8e42a0a2007-11-08 18:04:45 +0000302 self.assertRaises(TypeError, bytesIo.seek, 0.0)
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000303
304 def testTell(self):
Guido van Rossum78892e42007-04-06 17:31:18 +0000305 buf = self.buftype("1234567890")
306 bytesIo = self.ioclass(buf)
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000307
308 self.assertEquals(0, bytesIo.tell())
309 bytesIo.seek(5)
310 self.assertEquals(5, bytesIo.tell())
311 bytesIo.seek(10000)
312 self.assertEquals(10000, bytesIo.tell())
313
Guido van Rossuma9e20242007-03-08 00:43:48 +0000314
Guido van Rossum76c5d4d2007-04-06 19:10:29 +0000315class BytesIOTest(MemorySeekTestMixin, unittest.TestCase):
Guido van Rossum39478e82007-08-27 17:23:59 +0000316 @staticmethod
317 def buftype(s):
318 return s.encode("utf-8")
Guido van Rossum78892e42007-04-06 17:31:18 +0000319 ioclass = io.BytesIO
320 EOF = b""
321
322
Guido van Rossum76c5d4d2007-04-06 19:10:29 +0000323class StringIOTest(MemorySeekTestMixin, unittest.TestCase):
Guido van Rossum78892e42007-04-06 17:31:18 +0000324 buftype = str
325 ioclass = io.StringIO
326 EOF = ""
327
328
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000329class BufferedReaderTest(unittest.TestCase):
Guido van Rossuma9e20242007-03-08 00:43:48 +0000330
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000331 def testRead(self):
Guido van Rossum53807da2007-04-10 19:01:47 +0000332 rawio = MockRawIO((b"abc", b"d", b"efg"))
Guido van Rossum76c5d4d2007-04-06 19:10:29 +0000333 bufio = io.BufferedReader(rawio)
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000334
Guido van Rossum76c5d4d2007-04-06 19:10:29 +0000335 self.assertEquals(b"abcdef", bufio.read(6))
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000336
Guido van Rossum78892e42007-04-06 17:31:18 +0000337 def testBuffering(self):
338 data = b"abcdefghi"
339 dlen = len(data)
340
341 tests = [
342 [ 100, [ 3, 1, 4, 8 ], [ dlen, 0 ] ],
343 [ 100, [ 3, 3, 3], [ dlen ] ],
344 [ 4, [ 1, 2, 4, 2 ], [ 4, 4, 1 ] ],
345 ]
346
347 for bufsize, buf_read_sizes, raw_read_sizes in tests:
Guido van Rossum76c5d4d2007-04-06 19:10:29 +0000348 rawio = MockFileIO(data)
349 bufio = io.BufferedReader(rawio, buffer_size=bufsize)
Guido van Rossum78892e42007-04-06 17:31:18 +0000350 pos = 0
351 for nbytes in buf_read_sizes:
Guido van Rossum76c5d4d2007-04-06 19:10:29 +0000352 self.assertEquals(bufio.read(nbytes), data[pos:pos+nbytes])
Guido van Rossum78892e42007-04-06 17:31:18 +0000353 pos += nbytes
Guido van Rossum76c5d4d2007-04-06 19:10:29 +0000354 self.assertEquals(rawio.read_history, raw_read_sizes)
Guido van Rossum78892e42007-04-06 17:31:18 +0000355
Guido van Rossum01a27522007-03-07 01:00:12 +0000356 def testReadNonBlocking(self):
357 # Inject some None's in there to simulate EWOULDBLOCK
Guido van Rossum53807da2007-04-10 19:01:47 +0000358 rawio = MockRawIO((b"abc", b"d", None, b"efg", None, None))
Guido van Rossum76c5d4d2007-04-06 19:10:29 +0000359 bufio = io.BufferedReader(rawio)
Guido van Rossum01a27522007-03-07 01:00:12 +0000360
Guido van Rossum76c5d4d2007-04-06 19:10:29 +0000361 self.assertEquals(b"abcd", bufio.read(6))
362 self.assertEquals(b"e", bufio.read(1))
363 self.assertEquals(b"fg", bufio.read())
364 self.assert_(None is bufio.read())
365 self.assertEquals(b"", bufio.read())
Guido van Rossum01a27522007-03-07 01:00:12 +0000366
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000367 def testReadToEof(self):
Guido van Rossum53807da2007-04-10 19:01:47 +0000368 rawio = MockRawIO((b"abc", b"d", b"efg"))
Guido van Rossum76c5d4d2007-04-06 19:10:29 +0000369 bufio = io.BufferedReader(rawio)
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000370
Guido van Rossum76c5d4d2007-04-06 19:10:29 +0000371 self.assertEquals(b"abcdefg", bufio.read(9000))
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000372
373 def testReadNoArgs(self):
Guido van Rossum53807da2007-04-10 19:01:47 +0000374 rawio = MockRawIO((b"abc", b"d", b"efg"))
Guido van Rossum76c5d4d2007-04-06 19:10:29 +0000375 bufio = io.BufferedReader(rawio)
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000376
Guido van Rossum76c5d4d2007-04-06 19:10:29 +0000377 self.assertEquals(b"abcdefg", bufio.read())
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000378
379 def testFileno(self):
Guido van Rossum53807da2007-04-10 19:01:47 +0000380 rawio = MockRawIO((b"abc", b"d", b"efg"))
Guido van Rossum76c5d4d2007-04-06 19:10:29 +0000381 bufio = io.BufferedReader(rawio)
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000382
Guido van Rossum76c5d4d2007-04-06 19:10:29 +0000383 self.assertEquals(42, bufio.fileno())
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000384
385 def testFilenoNoFileno(self):
Guido van Rossum01a27522007-03-07 01:00:12 +0000386 # XXX will we always have fileno() function? If so, kill
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000387 # this test. Else, write it.
388 pass
389
Guido van Rossuma9e20242007-03-08 00:43:48 +0000390
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000391class BufferedWriterTest(unittest.TestCase):
Guido van Rossuma9e20242007-03-08 00:43:48 +0000392
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000393 def testWrite(self):
394 # Write to the buffered IO but don't overflow the buffer.
Guido van Rossum53807da2007-04-10 19:01:47 +0000395 writer = MockRawIO()
Guido van Rossum76c5d4d2007-04-06 19:10:29 +0000396 bufio = io.BufferedWriter(writer, 8)
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000397
Guido van Rossum76c5d4d2007-04-06 19:10:29 +0000398 bufio.write(b"abc")
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000399
Guido van Rossum76c5d4d2007-04-06 19:10:29 +0000400 self.assertFalse(writer._write_stack)
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000401
402 def testWriteOverflow(self):
Guido van Rossum53807da2007-04-10 19:01:47 +0000403 writer = MockRawIO()
Guido van Rossum76c5d4d2007-04-06 19:10:29 +0000404 bufio = io.BufferedWriter(writer, 8)
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000405
Guido van Rossum76c5d4d2007-04-06 19:10:29 +0000406 bufio.write(b"abc")
407 bufio.write(b"defghijkl")
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000408
Guido van Rossum76c5d4d2007-04-06 19:10:29 +0000409 self.assertEquals(b"abcdefghijkl", writer._write_stack[0])
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000410
Guido van Rossum01a27522007-03-07 01:00:12 +0000411 def testWriteNonBlocking(self):
412 raw = MockNonBlockWriterIO((9, 2, 22, -6, 10, 12, 12))
Guido van Rossum76c5d4d2007-04-06 19:10:29 +0000413 bufio = io.BufferedWriter(raw, 8, 16)
Guido van Rossum01a27522007-03-07 01:00:12 +0000414
Guido van Rossum76c5d4d2007-04-06 19:10:29 +0000415 bufio.write(b"asdf")
416 bufio.write(b"asdfa")
Guido van Rossum01a27522007-03-07 01:00:12 +0000417 self.assertEquals(b"asdfasdfa", raw._write_stack[0])
418
Guido van Rossum76c5d4d2007-04-06 19:10:29 +0000419 bufio.write(b"asdfasdfasdf")
Guido van Rossum01a27522007-03-07 01:00:12 +0000420 self.assertEquals(b"asdfasdfasdf", raw._write_stack[1])
Guido van Rossum76c5d4d2007-04-06 19:10:29 +0000421 bufio.write(b"asdfasdfasdf")
Guido van Rossum01a27522007-03-07 01:00:12 +0000422 self.assertEquals(b"dfasdfasdf", raw._write_stack[2])
423 self.assertEquals(b"asdfasdfasdf", raw._write_stack[3])
424
Guido van Rossum76c5d4d2007-04-06 19:10:29 +0000425 bufio.write(b"asdfasdfasdf")
Guido van Rossum01a27522007-03-07 01:00:12 +0000426
Guido van Rossum78892e42007-04-06 17:31:18 +0000427 # XXX I don't like this test. It relies too heavily on how the
428 # algorithm actually works, which we might change. Refactor
429 # later.
Guido van Rossum01a27522007-03-07 01:00:12 +0000430
431 def testFileno(self):
Guido van Rossum53807da2007-04-10 19:01:47 +0000432 rawio = MockRawIO((b"abc", b"d", b"efg"))
Guido van Rossum76c5d4d2007-04-06 19:10:29 +0000433 bufio = io.BufferedWriter(rawio)
Guido van Rossum01a27522007-03-07 01:00:12 +0000434
Guido van Rossum76c5d4d2007-04-06 19:10:29 +0000435 self.assertEquals(42, bufio.fileno())
Guido van Rossum01a27522007-03-07 01:00:12 +0000436
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000437 def testFlush(self):
Guido van Rossum53807da2007-04-10 19:01:47 +0000438 writer = MockRawIO()
Guido van Rossum76c5d4d2007-04-06 19:10:29 +0000439 bufio = io.BufferedWriter(writer, 8)
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000440
Guido van Rossum76c5d4d2007-04-06 19:10:29 +0000441 bufio.write(b"abc")
442 bufio.flush()
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000443
Guido van Rossum76c5d4d2007-04-06 19:10:29 +0000444 self.assertEquals(b"abc", writer._write_stack[0])
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000445
Guido van Rossuma9e20242007-03-08 00:43:48 +0000446
Guido van Rossum01a27522007-03-07 01:00:12 +0000447class BufferedRWPairTest(unittest.TestCase):
Guido van Rossuma9e20242007-03-08 00:43:48 +0000448
Guido van Rossum01a27522007-03-07 01:00:12 +0000449 def testRWPair(self):
Guido van Rossum53807da2007-04-10 19:01:47 +0000450 r = MockRawIO(())
451 w = MockRawIO()
Guido van Rossum01a27522007-03-07 01:00:12 +0000452 pair = io.BufferedRWPair(r, w)
453
454 # XXX need implementation
455
Guido van Rossuma9e20242007-03-08 00:43:48 +0000456
Guido van Rossum78892e42007-04-06 17:31:18 +0000457class BufferedRandomTest(unittest.TestCase):
Guido van Rossuma9e20242007-03-08 00:43:48 +0000458
Guido van Rossum01a27522007-03-07 01:00:12 +0000459 def testReadAndWrite(self):
Guido van Rossum53807da2007-04-10 19:01:47 +0000460 raw = MockRawIO((b"asdf", b"ghjk"))
Guido van Rossum01a27522007-03-07 01:00:12 +0000461 rw = io.BufferedRandom(raw, 8, 12)
462
463 self.assertEqual(b"as", rw.read(2))
464 rw.write(b"ddd")
465 rw.write(b"eee")
Guido van Rossum76c5d4d2007-04-06 19:10:29 +0000466 self.assertFalse(raw._write_stack) # Buffer writes
Guido van Rossum01a27522007-03-07 01:00:12 +0000467 self.assertEqual(b"ghjk", rw.read()) # This read forces write flush
Guido van Rossum76c5d4d2007-04-06 19:10:29 +0000468 self.assertEquals(b"dddeee", raw._write_stack[0])
Guido van Rossum01a27522007-03-07 01:00:12 +0000469
470 def testSeekAndTell(self):
471 raw = io.BytesIO(b"asdfghjkl")
472 rw = io.BufferedRandom(raw)
473
474 self.assertEquals(b"as", rw.read(2))
475 self.assertEquals(2, rw.tell())
476 rw.seek(0, 0)
477 self.assertEquals(b"asdf", rw.read(4))
478
479 rw.write(b"asdf")
480 rw.seek(0, 0)
481 self.assertEquals(b"asdfasdfl", rw.read())
482 self.assertEquals(9, rw.tell())
483 rw.seek(-4, 2)
484 self.assertEquals(5, rw.tell())
485 rw.seek(2, 1)
486 self.assertEquals(7, rw.tell())
487 self.assertEquals(b"fl", rw.read(11))
Christian Heimes8e42a0a2007-11-08 18:04:45 +0000488 self.assertRaises(TypeError, rw.seek, 0.0)
Guido van Rossum01a27522007-03-07 01:00:12 +0000489
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +0000490# To fully exercise seek/tell, the StatefulIncrementalDecoder has these
491# properties:
492# - A single output character can correspond to many bytes of input.
493# - The number of input bytes to complete the character can be
494# undetermined until the last input byte is received.
495# - The number of input bytes can vary depending on previous input.
496# - A single input byte can correspond to many characters of output.
497# - The number of output characters can be undetermined until the
498# last input byte is received.
499# - The number of output characters can vary depending on previous input.
500
501class StatefulIncrementalDecoder(codecs.IncrementalDecoder):
502 """
503 For testing seek/tell behavior with a stateful, buffering decoder.
504
505 Input is a sequence of words. Words may be fixed-length (length set
506 by input) or variable-length (period-terminated). In variable-length
507 mode, extra periods are ignored. Possible words are:
508 - 'i' followed by a number sets the input length, I (maximum 99).
509 When I is set to 0, words are space-terminated.
510 - 'o' followed by a number sets the output length, O (maximum 99).
511 - Any other word is converted into a word followed by a period on
512 the output. The output word consists of the input word truncated
513 or padded out with hyphens to make its length equal to O. If O
514 is 0, the word is output verbatim without truncating or padding.
515 I and O are initially set to 1. When I changes, any buffered input is
516 re-scanned according to the new I. EOF also terminates the last word.
517 """
518
519 def __init__(self, errors='strict'):
520 codecs.IncrementalEncoder.__init__(self, errors)
521 self.reset()
522
523 def __repr__(self):
524 return '<SID %x>' % id(self)
525
526 def reset(self):
527 self.i = 1
528 self.o = 1
529 self.buffer = bytearray()
530
531 def getstate(self):
532 i, o = self.i ^ 1, self.o ^ 1 # so that flags = 0 after reset()
533 return bytes(self.buffer), i*100 + o
534
535 def setstate(self, state):
536 buffer, io = state
537 self.buffer = bytearray(buffer)
538 i, o = divmod(io, 100)
539 self.i, self.o = i ^ 1, o ^ 1
540
541 def decode(self, input, final=False):
542 output = ''
543 for b in input:
544 if self.i == 0: # variable-length, terminated with period
545 if b == ord('.'):
546 if self.buffer:
547 output += self.process_word()
548 else:
549 self.buffer.append(b)
550 else: # fixed-length, terminate after self.i bytes
551 self.buffer.append(b)
552 if len(self.buffer) == self.i:
553 output += self.process_word()
554 if final and self.buffer: # EOF terminates the last word
555 output += self.process_word()
556 return output
557
558 def process_word(self):
559 output = ''
560 if self.buffer[0] == ord('i'):
561 self.i = min(99, int(self.buffer[1:] or 0)) # set input length
562 elif self.buffer[0] == ord('o'):
563 self.o = min(99, int(self.buffer[1:] or 0)) # set output length
564 else:
565 output = self.buffer.decode('ascii')
566 if len(output) < self.o:
567 output += '-'*self.o # pad out with hyphens
568 if self.o:
569 output = output[:self.o] # truncate to output length
570 output += '.'
571 self.buffer = bytearray()
572 return output
573
574class StatefulIncrementalDecoderTest(unittest.TestCase):
575 """
576 Make sure the StatefulIncrementalDecoder actually works.
577 """
578
579 test_cases = [
Ka-Ping Yeed24a5b62008-03-20 10:51:27 +0000580 # I=1, O=1 (fixed-length input == fixed-length output)
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +0000581 (b'abcd', False, 'a.b.c.d.'),
Ka-Ping Yeed24a5b62008-03-20 10:51:27 +0000582 # I=0, O=0 (variable-length input, variable-length output)
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +0000583 (b'oiabcd', True, 'abcd.'),
Ka-Ping Yeed24a5b62008-03-20 10:51:27 +0000584 # I=0, O=0 (should ignore extra periods)
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +0000585 (b'oi...abcd...', True, 'abcd.'),
Ka-Ping Yeed24a5b62008-03-20 10:51:27 +0000586 # I=0, O=6 (variable-length input, fixed-length output)
587 (b'i.o6.x.xyz.toolongtofit.', False, 'x-----.xyz---.toolon.'),
588 # I=2, O=6 (fixed-length input < fixed-length output)
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +0000589 (b'i.i2.o6xyz', True, 'xy----.z-----.'),
Ka-Ping Yeed24a5b62008-03-20 10:51:27 +0000590 # I=6, O=3 (fixed-length input > fixed-length output)
591 (b'i.o3.i6.abcdefghijklmnop', True, 'abc.ghi.mno.'),
592 # I=0, then 3; O=29, then 15 (with longer output)
593 (b'i.o29.a.b.cde.o15.abcdefghijabcdefghij.i3.a.b.c.d.ei00k.l.m', True,
594 'a----------------------------.' +
595 'b----------------------------.' +
596 'cde--------------------------.' +
597 'abcdefghijabcde.' +
598 'a.b------------.' +
599 '.c.------------.' +
600 'd.e------------.' +
601 'k--------------.' +
602 'l--------------.' +
603 'm--------------.')
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +0000604 ]
605
606 def testDecoder(self):
607 # Try a few one-shot test cases.
608 for input, eof, output in self.test_cases:
609 d = StatefulIncrementalDecoder()
610 self.assertEquals(d.decode(input, eof), output)
611
612 # Also test an unfinished decode, followed by forcing EOF.
613 d = StatefulIncrementalDecoder()
614 self.assertEquals(d.decode(b'oiabcd'), '')
615 self.assertEquals(d.decode(b'', 1), 'abcd.')
Guido van Rossum78892e42007-04-06 17:31:18 +0000616
617class TextIOWrapperTest(unittest.TestCase):
Guido van Rossum9b76da62007-04-11 01:09:03 +0000618
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +0000619 def setUp(self):
620 self.testdata = b"AAA\r\nBBB\rCCC\r\nDDD\nEEE\r\n"
621 self.normalized = b"AAA\nBBB\nCCC\nDDD\nEEE\n".decode("ascii")
622
Guido van Rossumd0712812007-04-11 16:32:43 +0000623 def tearDown(self):
624 test_support.unlink(test_support.TESTFN)
Guido van Rossum9b76da62007-04-11 01:09:03 +0000625
Guido van Rossumf64db9f2007-12-06 01:04:26 +0000626 def testLineBuffering(self):
627 r = io.BytesIO()
628 b = io.BufferedWriter(r, 1000)
629 t = io.TextIOWrapper(b, newline="\n", line_buffering=True)
630 t.write("X")
631 self.assertEquals(r.getvalue(), b"") # No flush happened
632 t.write("Y\nZ")
633 self.assertEquals(r.getvalue(), b"XY\nZ") # All got flushed
634 t.write("A\rB")
635 self.assertEquals(r.getvalue(), b"XY\nZA\rB")
636
Guido van Rossume7fc50f2007-12-03 22:54:21 +0000637 def testEncodingErrorsReading(self):
638 # (1) default
639 b = io.BytesIO(b"abc\n\xff\n")
640 t = io.TextIOWrapper(b, encoding="ascii")
641 self.assertRaises(UnicodeError, t.read)
642 # (2) explicit strict
643 b = io.BytesIO(b"abc\n\xff\n")
644 t = io.TextIOWrapper(b, encoding="ascii", errors="strict")
645 self.assertRaises(UnicodeError, t.read)
646 # (3) ignore
647 b = io.BytesIO(b"abc\n\xff\n")
648 t = io.TextIOWrapper(b, encoding="ascii", errors="ignore")
649 self.assertEquals(t.read(), "abc\n\n")
650 # (4) replace
651 b = io.BytesIO(b"abc\n\xff\n")
652 t = io.TextIOWrapper(b, encoding="ascii", errors="replace")
653 self.assertEquals(t.read(), "abc\n\ufffd\n")
654
655 def testEncodingErrorsWriting(self):
656 # (1) default
657 b = io.BytesIO()
658 t = io.TextIOWrapper(b, encoding="ascii")
659 self.assertRaises(UnicodeError, t.write, "\xff")
660 # (2) explicit strict
661 b = io.BytesIO()
662 t = io.TextIOWrapper(b, encoding="ascii", errors="strict")
663 self.assertRaises(UnicodeError, t.write, "\xff")
664 # (3) ignore
665 b = io.BytesIO()
Guido van Rossumf64db9f2007-12-06 01:04:26 +0000666 t = io.TextIOWrapper(b, encoding="ascii", errors="ignore",
667 newline="\n")
Guido van Rossume7fc50f2007-12-03 22:54:21 +0000668 t.write("abc\xffdef\n")
669 t.flush()
Christian Heimesecda2612007-12-05 17:59:44 +0000670 self.assertEquals(b.getvalue(), b"abcdef\n")
Guido van Rossume7fc50f2007-12-03 22:54:21 +0000671 # (4) replace
672 b = io.BytesIO()
Guido van Rossumf64db9f2007-12-06 01:04:26 +0000673 t = io.TextIOWrapper(b, encoding="ascii", errors="replace",
674 newline="\n")
Guido van Rossume7fc50f2007-12-03 22:54:21 +0000675 t.write("abc\xffdef\n")
676 t.flush()
Christian Heimesecda2612007-12-05 17:59:44 +0000677 self.assertEquals(b.getvalue(), b"abc?def\n")
Guido van Rossume7fc50f2007-12-03 22:54:21 +0000678
Guido van Rossum8358db22007-08-18 21:39:55 +0000679 def testNewlinesInput(self):
680 testdata = b"AAA\nBBB\nCCC\rDDD\rEEE\r\nFFF\r\nGGG"
681 normalized = testdata.replace(b"\r\n", b"\n").replace(b"\r", b"\n")
682 for newline, expected in [
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +0000683 (None, normalized.decode("ascii").splitlines(True)),
684 ("", testdata.decode("ascii").splitlines(True)),
Guido van Rossum8358db22007-08-18 21:39:55 +0000685 ("\n", ["AAA\n", "BBB\n", "CCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]),
686 ("\r\n", ["AAA\nBBB\nCCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]),
687 ("\r", ["AAA\nBBB\nCCC\r", "DDD\r", "EEE\r", "\nFFF\r", "\nGGG"]),
688 ]:
689 buf = io.BytesIO(testdata)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +0000690 txt = io.TextIOWrapper(buf, encoding="ascii", newline=newline)
Guido van Rossum8358db22007-08-18 21:39:55 +0000691 self.assertEquals(txt.readlines(), expected)
692 txt.seek(0)
693 self.assertEquals(txt.read(), "".join(expected))
694
695 def testNewlinesOutput(self):
696 testdict = {
697 "": b"AAA\nBBB\nCCC\nX\rY\r\nZ",
698 "\n": b"AAA\nBBB\nCCC\nX\rY\r\nZ",
699 "\r": b"AAA\rBBB\rCCC\rX\rY\r\rZ",
700 "\r\n": b"AAA\r\nBBB\r\nCCC\r\nX\rY\r\r\nZ",
701 }
702 tests = [(None, testdict[os.linesep])] + sorted(testdict.items())
703 for newline, expected in tests:
704 buf = io.BytesIO()
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +0000705 txt = io.TextIOWrapper(buf, encoding="ascii", newline=newline)
Guido van Rossum8358db22007-08-18 21:39:55 +0000706 txt.write("AAA\nB")
707 txt.write("BB\nCCC\n")
708 txt.write("X\rY\r\nZ")
709 txt.flush()
710 self.assertEquals(buf.getvalue(), expected)
711
Guido van Rossum78892e42007-04-06 17:31:18 +0000712 def testNewlines(self):
713 input_lines = [ "unix\n", "windows\r\n", "os9\r", "last\n", "nonl" ]
714
715 tests = [
716 [ None, [ 'unix\n', 'windows\n', 'os9\n', 'last\n', 'nonl' ] ],
Guido van Rossum8358db22007-08-18 21:39:55 +0000717 [ '', input_lines ],
718 [ '\n', [ "unix\n", "windows\r\n", "os9\rlast\n", "nonl" ] ],
719 [ '\r\n', [ "unix\nwindows\r\n", "os9\rlast\nnonl" ] ],
720 [ '\r', [ "unix\nwindows\r", "\nos9\r", "last\nnonl" ] ],
Guido van Rossum78892e42007-04-06 17:31:18 +0000721 ]
722
Guido van Rossumc2f93dc2007-05-24 00:50:02 +0000723 encodings = ('utf-8', 'latin-1')
Guido van Rossum78892e42007-04-06 17:31:18 +0000724
Guido van Rossum8358db22007-08-18 21:39:55 +0000725 # Try a range of buffer sizes to test the case where \r is the last
Guido van Rossum78892e42007-04-06 17:31:18 +0000726 # character in TextIOWrapper._pending_line.
727 for encoding in encodings:
Guido van Rossum8358db22007-08-18 21:39:55 +0000728 # XXX: str.encode() should return bytes
729 data = bytes(''.join(input_lines).encode(encoding))
Guido van Rossum78892e42007-04-06 17:31:18 +0000730 for do_reads in (False, True):
Guido van Rossum8358db22007-08-18 21:39:55 +0000731 for bufsize in range(1, 10):
732 for newline, exp_lines in tests:
733 bufio = io.BufferedReader(io.BytesIO(data), bufsize)
Guido van Rossum76c5d4d2007-04-06 19:10:29 +0000734 textio = io.TextIOWrapper(bufio, newline=newline,
Guido van Rossum78892e42007-04-06 17:31:18 +0000735 encoding=encoding)
736 if do_reads:
737 got_lines = []
738 while True:
Guido van Rossum76c5d4d2007-04-06 19:10:29 +0000739 c2 = textio.read(2)
Guido van Rossum78892e42007-04-06 17:31:18 +0000740 if c2 == '':
741 break
742 self.assertEquals(len(c2), 2)
Guido van Rossum76c5d4d2007-04-06 19:10:29 +0000743 got_lines.append(c2 + textio.readline())
Guido van Rossum78892e42007-04-06 17:31:18 +0000744 else:
Guido van Rossum76c5d4d2007-04-06 19:10:29 +0000745 got_lines = list(textio)
Guido van Rossum78892e42007-04-06 17:31:18 +0000746
747 for got_line, exp_line in zip(got_lines, exp_lines):
748 self.assertEquals(got_line, exp_line)
749 self.assertEquals(len(got_lines), len(exp_lines))
750
Guido van Rossum8358db22007-08-18 21:39:55 +0000751 def testNewlinesInput(self):
752 testdata = b"AAA\nBBB\nCCC\rDDD\rEEE\r\nFFF\r\nGGG"
753 normalized = testdata.replace(b"\r\n", b"\n").replace(b"\r", b"\n")
754 for newline, expected in [
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +0000755 (None, normalized.decode("ascii").splitlines(True)),
756 ("", testdata.decode("ascii").splitlines(True)),
Guido van Rossum8358db22007-08-18 21:39:55 +0000757 ("\n", ["AAA\n", "BBB\n", "CCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]),
758 ("\r\n", ["AAA\nBBB\nCCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]),
759 ("\r", ["AAA\nBBB\nCCC\r", "DDD\r", "EEE\r", "\nFFF\r", "\nGGG"]),
760 ]:
761 buf = io.BytesIO(testdata)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +0000762 txt = io.TextIOWrapper(buf, encoding="ascii", newline=newline)
Guido van Rossum8358db22007-08-18 21:39:55 +0000763 self.assertEquals(txt.readlines(), expected)
764 txt.seek(0)
765 self.assertEquals(txt.read(), "".join(expected))
766
767 def testNewlinesOutput(self):
Guido van Rossum8358db22007-08-18 21:39:55 +0000768 data = "AAA\nBBB\rCCC\n"
769 data_lf = b"AAA\nBBB\rCCC\n"
770 data_cr = b"AAA\rBBB\rCCC\r"
771 data_crlf = b"AAA\r\nBBB\rCCC\r\n"
Guido van Rossum18363582007-08-18 22:05:58 +0000772 save_linesep = os.linesep
773 try:
774 for os.linesep, newline, expected in [
775 ("\n", None, data_lf),
776 ("\r\n", None, data_crlf),
777 ("\n", "", data_lf),
778 ("\r\n", "", data_lf),
779 ("\n", "\n", data_lf),
780 ("\r\n", "\n", data_lf),
781 ("\n", "\r", data_cr),
782 ("\r\n", "\r", data_cr),
783 ("\n", "\r\n", data_crlf),
784 ("\r\n", "\r\n", data_crlf),
785 ]:
786 buf = io.BytesIO()
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +0000787 txt = io.TextIOWrapper(buf, encoding="ascii", newline=newline)
Guido van Rossum18363582007-08-18 22:05:58 +0000788 txt.write(data)
789 txt.close()
790 self.assertEquals(buf.getvalue(), expected)
791 finally:
792 os.linesep = save_linesep
Guido van Rossum8358db22007-08-18 21:39:55 +0000793
Guido van Rossum9b76da62007-04-11 01:09:03 +0000794 # Systematic tests of the text I/O API
795
796 def testBasicIO(self):
797 for chunksize in (1, 2, 3, 4, 5, 15, 16, 17, 31, 32, 33, 63, 64, 65):
798 for enc in "ascii", "latin1", "utf8" :# , "utf-16-be", "utf-16-le":
799 f = io.open(test_support.TESTFN, "w+", encoding=enc)
800 f._CHUNK_SIZE = chunksize
801 self.assertEquals(f.write("abc"), 3)
802 f.close()
803 f = io.open(test_support.TESTFN, "r+", encoding=enc)
804 f._CHUNK_SIZE = chunksize
805 self.assertEquals(f.tell(), 0)
806 self.assertEquals(f.read(), "abc")
807 cookie = f.tell()
808 self.assertEquals(f.seek(0), 0)
809 self.assertEquals(f.read(2), "ab")
810 self.assertEquals(f.read(1), "c")
811 self.assertEquals(f.read(1), "")
812 self.assertEquals(f.read(), "")
813 self.assertEquals(f.tell(), cookie)
814 self.assertEquals(f.seek(0), 0)
815 self.assertEquals(f.seek(0, 2), cookie)
816 self.assertEquals(f.write("def"), 3)
817 self.assertEquals(f.seek(cookie), cookie)
818 self.assertEquals(f.read(), "def")
819 if enc.startswith("utf"):
820 self.multi_line_test(f, enc)
821 f.close()
822
823 def multi_line_test(self, f, enc):
824 f.seek(0)
825 f.truncate()
Guido van Rossumef87d6e2007-05-02 19:09:54 +0000826 sample = "s\xff\u0fff\uffff"
Guido van Rossum9b76da62007-04-11 01:09:03 +0000827 wlines = []
Guido van Rossumcba608c2007-04-11 14:19:59 +0000828 for size in (0, 1, 2, 3, 4, 5, 30, 31, 32, 33, 62, 63, 64, 65, 1000):
Guido van Rossum9b76da62007-04-11 01:09:03 +0000829 chars = []
Guido van Rossum805365e2007-05-07 22:24:25 +0000830 for i in range(size):
Guido van Rossum9b76da62007-04-11 01:09:03 +0000831 chars.append(sample[i % len(sample)])
Guido van Rossumef87d6e2007-05-02 19:09:54 +0000832 line = "".join(chars) + "\n"
Guido van Rossum9b76da62007-04-11 01:09:03 +0000833 wlines.append((f.tell(), line))
834 f.write(line)
Guido van Rossum9b76da62007-04-11 01:09:03 +0000835 f.seek(0)
836 rlines = []
837 while True:
838 pos = f.tell()
839 line = f.readline()
840 if not line:
Guido van Rossum9b76da62007-04-11 01:09:03 +0000841 break
842 rlines.append((pos, line))
Guido van Rossum9b76da62007-04-11 01:09:03 +0000843 self.assertEquals(rlines, wlines)
844
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +0000845 def testTelling(self):
846 f = io.open(test_support.TESTFN, "w+", encoding="utf8")
847 p0 = f.tell()
Guido van Rossumef87d6e2007-05-02 19:09:54 +0000848 f.write("\xff\n")
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +0000849 p1 = f.tell()
Guido van Rossumef87d6e2007-05-02 19:09:54 +0000850 f.write("\xff\n")
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +0000851 p2 = f.tell()
852 f.seek(0)
853 self.assertEquals(f.tell(), p0)
Guido van Rossumef87d6e2007-05-02 19:09:54 +0000854 self.assertEquals(f.readline(), "\xff\n")
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +0000855 self.assertEquals(f.tell(), p1)
Guido van Rossumef87d6e2007-05-02 19:09:54 +0000856 self.assertEquals(f.readline(), "\xff\n")
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +0000857 self.assertEquals(f.tell(), p2)
858 f.seek(0)
859 for line in f:
Guido van Rossumef87d6e2007-05-02 19:09:54 +0000860 self.assertEquals(line, "\xff\n")
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +0000861 self.assertRaises(IOError, f.tell)
862 self.assertEquals(f.tell(), p2)
863 f.close()
864
Guido van Rossumd76e7792007-04-17 02:38:04 +0000865 def testSeeking(self):
866 chunk_size = io.TextIOWrapper._CHUNK_SIZE
867 prefix_size = chunk_size - 2
Guido van Rossumef87d6e2007-05-02 19:09:54 +0000868 u_prefix = "a" * prefix_size
Guido van Rossumd76e7792007-04-17 02:38:04 +0000869 prefix = bytes(u_prefix.encode("utf-8"))
870 self.assertEquals(len(u_prefix), len(prefix))
Guido van Rossumef87d6e2007-05-02 19:09:54 +0000871 u_suffix = "\u8888\n"
Guido van Rossumd76e7792007-04-17 02:38:04 +0000872 suffix = bytes(u_suffix.encode("utf-8"))
873 line = prefix + suffix
874 f = io.open(test_support.TESTFN, "wb")
875 f.write(line*2)
876 f.close()
877 f = io.open(test_support.TESTFN, "r", encoding="utf-8")
878 s = f.read(prefix_size)
Guido van Rossum98297ee2007-11-06 21:34:58 +0000879 self.assertEquals(s, str(prefix, "ascii"))
Guido van Rossumd76e7792007-04-17 02:38:04 +0000880 self.assertEquals(f.tell(), prefix_size)
881 self.assertEquals(f.readline(), u_suffix)
882
883 def testSeekingToo(self):
884 # Regression test for a specific bug
885 data = b'\xe0\xbf\xbf\n'
886 f = io.open(test_support.TESTFN, "wb")
887 f.write(data)
888 f.close()
889 f = io.open(test_support.TESTFN, "r", encoding="utf-8")
890 f._CHUNK_SIZE # Just test that it exists
891 f._CHUNK_SIZE = 2
892 f.readline()
893 f.tell()
894
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +0000895 def testSeekAndTell(self):
896 """Test seek/tell using the StatefulIncrementalDecoder."""
897
898 def lookupTestDecoder(name):
899 if self.codecEnabled and name == 'test_decoder':
900 return codecs.CodecInfo(
901 name='test_decoder', encode=None, decode=None,
902 incrementalencoder=None,
Neal Norwitze2b07052008-03-18 19:52:05 +0000903 streamreader=None, streamwriter=None,
904 incrementaldecoder=StatefulIncrementalDecoder)
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +0000905
906 def testSeekAndTellWithData(data, min_pos=0):
907 """Tell/seek to various points within a data stream and ensure
908 that the decoded data returned by read() is consistent."""
909 f = io.open(test_support.TESTFN, 'wb')
910 f.write(data)
911 f.close()
912 f = io.open(test_support.TESTFN, encoding='test_decoder')
913 decoded = f.read()
914 f.close()
915
Neal Norwitze2b07052008-03-18 19:52:05 +0000916 for i in range(min_pos, len(decoded) + 1): # seek positions
917 for j in [1, 5, len(decoded) - i]: # read lengths
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +0000918 f = io.open(test_support.TESTFN, encoding='test_decoder')
919 self.assertEquals(f.read(i), decoded[:i])
920 cookie = f.tell()
921 self.assertEquals(f.read(j), decoded[i:i + j])
922 f.seek(cookie)
923 self.assertEquals(f.read(), decoded[i:])
924 f.close()
925
926 # Register a special incremental decoder for testing.
927 codecs.register(lookupTestDecoder)
928 self.codecEnabled = 1
929
930 # Run the tests.
931 try:
932 # Try each test case.
933 for input, _, _ in StatefulIncrementalDecoderTest.test_cases:
934 testSeekAndTellWithData(input)
935
936 # Position each test case so that it crosses a chunk boundary.
937 CHUNK_SIZE = io.TextIOWrapper._CHUNK_SIZE
938 for input, _, _ in StatefulIncrementalDecoderTest.test_cases:
939 offset = CHUNK_SIZE - len(input)//2
940 prefix = b'.'*offset
941 # Don't bother seeking into the prefix (takes too long).
942 min_pos = offset*2
943 testSeekAndTellWithData(prefix + input, min_pos)
944
945 # Ensure our test decoder won't interfere with subsequent tests.
946 finally:
947 self.codecEnabled = 0
948
Alexandre Vassalottia38f73b2008-01-07 18:30:48 +0000949 def testEncodedWrites(self):
950 data = "1234567890"
951 tests = ("utf-16",
952 "utf-16-le",
953 "utf-16-be",
954 "utf-32",
955 "utf-32-le",
956 "utf-32-be")
957 for encoding in tests:
958 buf = io.BytesIO()
959 f = io.TextIOWrapper(buf, encoding=encoding)
960 # Check if the BOM is written only once (see issue1753).
961 f.write(data)
962 f.write(data)
963 f.seek(0)
964 self.assertEquals(f.read(), data * 2)
965 self.assertEquals(buf.getvalue(), (data * 2).encode(encoding))
966
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +0000967 def timingTest(self):
968 timer = time.time
969 enc = "utf8"
Guido van Rossumef87d6e2007-05-02 19:09:54 +0000970 line = "\0\x0f\xff\u0fff\uffff\U000fffff\U0010ffff"*3 + "\n"
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +0000971 nlines = 10000
972 nchars = len(line)
973 nbytes = len(line.encode(enc))
974 for chunk_size in (32, 64, 128, 256):
975 f = io.open(test_support.TESTFN, "w+", encoding=enc)
976 f._CHUNK_SIZE = chunk_size
977 t0 = timer()
978 for i in range(nlines):
979 f.write(line)
980 f.flush()
981 t1 = timer()
982 f.seek(0)
983 for line in f:
984 pass
985 t2 = timer()
986 f.seek(0)
987 while f.readline():
988 pass
989 t3 = timer()
990 f.seek(0)
991 while f.readline():
992 f.tell()
993 t4 = timer()
994 f.close()
995 if test_support.verbose:
996 print("\nTiming test: %d lines of %d characters (%d bytes)" %
997 (nlines, nchars, nbytes))
998 print("File chunk size: %6s" % f._CHUNK_SIZE)
999 print("Writing: %6.3f seconds" % (t1-t0))
1000 print("Reading using iteration: %6.3f seconds" % (t2-t1))
1001 print("Reading using readline(): %6.3f seconds" % (t3-t2))
1002 print("Using readline()+tell(): %6.3f seconds" % (t4-t3))
1003
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00001004 def testReadOneByOne(self):
1005 txt = io.TextIOWrapper(io.BytesIO(b"AA\r\nBB"))
1006 reads = ""
1007 while True:
1008 c = txt.read(1)
1009 if not c:
1010 break
1011 reads += c
1012 self.assertEquals(reads, "AA\nBB")
1013
1014 # read in amounts equal to TextIOWrapper._CHUNK_SIZE which is 128.
1015 def testReadByChunk(self):
1016 # make sure "\r\n" straddles 128 char boundary.
1017 txt = io.TextIOWrapper(io.BytesIO(b"A" * 127 + b"\r\nB"))
1018 reads = ""
1019 while True:
1020 c = txt.read(128)
1021 if not c:
1022 break
1023 reads += c
1024 self.assertEquals(reads, "A"*127+"\nB")
1025
1026 def test_issue1395_1(self):
1027 txt = io.TextIOWrapper(io.BytesIO(self.testdata), encoding="ascii")
1028
1029 # read one char at a time
1030 reads = ""
1031 while True:
1032 c = txt.read(1)
1033 if not c:
1034 break
1035 reads += c
1036 self.assertEquals(reads, self.normalized)
1037
1038 def test_issue1395_2(self):
1039 txt = io.TextIOWrapper(io.BytesIO(self.testdata), encoding="ascii")
1040 txt._CHUNK_SIZE = 4
1041
1042 reads = ""
1043 while True:
1044 c = txt.read(4)
1045 if not c:
1046 break
1047 reads += c
1048 self.assertEquals(reads, self.normalized)
1049
1050 def test_issue1395_3(self):
1051 txt = io.TextIOWrapper(io.BytesIO(self.testdata), encoding="ascii")
1052 txt._CHUNK_SIZE = 4
1053
1054 reads = txt.read(4)
1055 reads += txt.read(4)
1056 reads += txt.readline()
1057 reads += txt.readline()
1058 reads += txt.readline()
1059 self.assertEquals(reads, self.normalized)
1060
1061 def test_issue1395_4(self):
1062 txt = io.TextIOWrapper(io.BytesIO(self.testdata), encoding="ascii")
1063 txt._CHUNK_SIZE = 4
1064
1065 reads = txt.read(4)
1066 reads += txt.read()
1067 self.assertEquals(reads, self.normalized)
1068
1069 def test_issue1395_5(self):
1070 txt = io.TextIOWrapper(io.BytesIO(self.testdata), encoding="ascii")
1071 txt._CHUNK_SIZE = 4
1072
1073 reads = txt.read(4)
1074 pos = txt.tell()
1075 txt.seek(0)
1076 txt.seek(pos)
1077 self.assertEquals(txt.read(4), "BBB\n")
1078
Ka-Ping Yeeddaa7062008-03-17 20:35:15 +00001079 def test_issue2282(self):
1080 buffer = io.BytesIO(self.testdata)
1081 txt = io.TextIOWrapper(buffer, encoding="ascii")
1082
1083 self.assertEqual(buffer.seekable(), txt.seekable())
1084
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00001085 def test_newline_decoder(self):
1086 import codecs
1087 decoder = codecs.getincrementaldecoder("utf-8")()
1088 decoder = io.IncrementalNewlineDecoder(decoder, translate=True)
1089
1090 self.assertEquals(decoder.decode(b'\xe8\xa2\x88'), "\u8888")
1091
1092 self.assertEquals(decoder.decode(b'\xe8'), "")
1093 self.assertEquals(decoder.decode(b'\xa2'), "")
1094 self.assertEquals(decoder.decode(b'\x88'), "\u8888")
1095
1096 self.assertEquals(decoder.decode(b'\xe8'), "")
1097 self.assertRaises(UnicodeDecodeError, decoder.decode, b'', final=True)
1098
1099 decoder.setstate((b'', 0))
1100 self.assertEquals(decoder.decode(b'\n'), "\n")
1101 self.assertEquals(decoder.decode(b'\r'), "")
1102 self.assertEquals(decoder.decode(b'', final=True), "\n")
1103 self.assertEquals(decoder.decode(b'\r', final=True), "\n")
1104
1105 self.assertEquals(decoder.decode(b'\r'), "")
1106 self.assertEquals(decoder.decode(b'a'), "\na")
1107
1108 self.assertEquals(decoder.decode(b'\r\r\n'), "\n\n")
1109 self.assertEquals(decoder.decode(b'\r'), "")
1110 self.assertEquals(decoder.decode(b'\r'), "\n")
1111 self.assertEquals(decoder.decode(b'\na'), "\na")
1112
1113 self.assertEquals(decoder.decode(b'\xe8\xa2\x88\r\n'), "\u8888\n")
1114 self.assertEquals(decoder.decode(b'\xe8\xa2\x88'), "\u8888")
1115 self.assertEquals(decoder.decode(b'\n'), "\n")
1116 self.assertEquals(decoder.decode(b'\xe8\xa2\x88\r'), "\u8888")
1117 self.assertEquals(decoder.decode(b'\n'), "\n")
Guido van Rossum9b76da62007-04-11 01:09:03 +00001118
Alexandre Vassalotti472f07d2008-01-06 00:34:32 +00001119 decoder = codecs.getincrementaldecoder("utf-8")()
1120 decoder = io.IncrementalNewlineDecoder(decoder, translate=True)
1121 self.assertEquals(decoder.newlines, None)
1122 decoder.decode(b"abc\n\r")
1123 self.assertEquals(decoder.newlines, '\n')
1124 decoder.decode(b"\nabc")
1125 self.assertEquals(decoder.newlines, ('\n', '\r\n'))
1126 decoder.decode(b"abc\r")
1127 self.assertEquals(decoder.newlines, ('\n', '\r\n'))
1128 decoder.decode(b"abc")
1129 self.assertEquals(decoder.newlines, ('\r', '\n', '\r\n'))
1130 decoder.decode(b"abc\r")
1131 decoder.reset()
1132 self.assertEquals(decoder.decode(b"abc"), "abc")
1133 self.assertEquals(decoder.newlines, None)
1134
Guido van Rossum01a27522007-03-07 01:00:12 +00001135# XXX Tests for open()
Guido van Rossum68bbcd22007-02-27 17:19:33 +00001136
Guido van Rossum5abbf752007-08-27 17:39:33 +00001137class MiscIOTest(unittest.TestCase):
1138
1139 def testImport__all__(self):
1140 for name in io.__all__:
1141 obj = getattr(io, name, None)
1142 self.assert_(obj is not None, name)
1143 if name == "open":
1144 continue
1145 elif "error" in name.lower():
1146 self.assert_(issubclass(obj, Exception), name)
1147 else:
1148 self.assert_(issubclass(obj, io.IOBase))
1149
1150
Guido van Rossum28524c72007-02-27 05:47:44 +00001151def test_main():
Guido van Rossum78892e42007-04-06 17:31:18 +00001152 test_support.run_unittest(IOTest, BytesIOTest, StringIOTest,
1153 BufferedReaderTest,
Guido van Rossum01a27522007-03-07 01:00:12 +00001154 BufferedWriterTest, BufferedRWPairTest,
Guido van Rossum5abbf752007-08-27 17:39:33 +00001155 BufferedRandomTest, TextIOWrapperTest,
1156 MiscIOTest)
Guido van Rossum28524c72007-02-27 05:47:44 +00001157
1158if __name__ == "__main__":
Guido van Rossum76c5d4d2007-04-06 19:10:29 +00001159 unittest.main()