blob: 49404e1cbae2452f9fb65111798c830db94b2111 [file] [log] [blame]
Guido van Rossum68bbcd22007-02-27 17:19:33 +00001"""Unit tests for io.py."""
2
Guido van Rossum8358db22007-08-18 21:39:55 +00003import os
Guido van Rossum34d69e52007-04-10 20:08:41 +00004import sys
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +00005import time
Guido van Rossumd4103952007-04-12 05:44:49 +00006import array
Guido van Rossum28524c72007-02-27 05:47:44 +00007import unittest
Guido van Rossum78892e42007-04-06 17:31:18 +00008from itertools import chain
Guido van Rossum76c5d4d2007-04-06 19:10:29 +00009from test import test_support
10
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +000011import codecs
Guido van Rossum53807da2007-04-10 19:01:47 +000012import io # The module under test
Guido van Rossum28524c72007-02-27 05:47:44 +000013
Guido van Rossuma9e20242007-03-08 00:43:48 +000014
Guido van Rossum53807da2007-04-10 19:01:47 +000015class MockRawIO(io.RawIOBase):
Guido van Rossuma9e20242007-03-08 00:43:48 +000016
Guido van Rossum76c5d4d2007-04-06 19:10:29 +000017 def __init__(self, read_stack=()):
18 self._read_stack = list(read_stack)
19 self._write_stack = []
Guido van Rossum68bbcd22007-02-27 17:19:33 +000020
21 def read(self, n=None):
22 try:
Guido van Rossum76c5d4d2007-04-06 19:10:29 +000023 return self._read_stack.pop(0)
Guido van Rossum68bbcd22007-02-27 17:19:33 +000024 except:
Guido van Rossum78892e42007-04-06 17:31:18 +000025 return b""
Guido van Rossum68bbcd22007-02-27 17:19:33 +000026
Guido van Rossum01a27522007-03-07 01:00:12 +000027 def write(self, b):
Guido van Rossum76c5d4d2007-04-06 19:10:29 +000028 self._write_stack.append(b[:])
Guido van Rossum01a27522007-03-07 01:00:12 +000029 return len(b)
30
31 def writable(self):
32 return True
33
Guido van Rossum68bbcd22007-02-27 17:19:33 +000034 def fileno(self):
35 return 42
36
37 def readable(self):
38 return True
39
Guido van Rossum01a27522007-03-07 01:00:12 +000040 def seekable(self):
Guido van Rossum68bbcd22007-02-27 17:19:33 +000041 return True
42
Guido van Rossum01a27522007-03-07 01:00:12 +000043 def seek(self, pos, whence):
44 pass
45
46 def tell(self):
Guido van Rossum68bbcd22007-02-27 17:19:33 +000047 return 42
48
Guido van Rossuma9e20242007-03-08 00:43:48 +000049
Guido van Rossum78892e42007-04-06 17:31:18 +000050class MockFileIO(io.BytesIO):
51
52 def __init__(self, data):
53 self.read_history = []
54 io.BytesIO.__init__(self, data)
55
56 def read(self, n=None):
57 res = io.BytesIO.read(self, n)
58 self.read_history.append(None if res is None else len(res))
59 return res
60
61
Guido van Rossum01a27522007-03-07 01:00:12 +000062class MockNonBlockWriterIO(io.RawIOBase):
Guido van Rossuma9e20242007-03-08 00:43:48 +000063
Guido van Rossum53807da2007-04-10 19:01:47 +000064 def __init__(self, blocking_script):
65 self._blocking_script = list(blocking_script)
Guido van Rossum01a27522007-03-07 01:00:12 +000066 self._write_stack = []
Guido van Rossuma9e20242007-03-08 00:43:48 +000067
Guido van Rossum01a27522007-03-07 01:00:12 +000068 def write(self, b):
Guido van Rossum76c5d4d2007-04-06 19:10:29 +000069 self._write_stack.append(b[:])
Guido van Rossum53807da2007-04-10 19:01:47 +000070 n = self._blocking_script.pop(0)
Guido van Rossum01a27522007-03-07 01:00:12 +000071 if (n < 0):
Guido van Rossum141f7672007-04-10 00:22:16 +000072 raise io.BlockingIOError(0, "test blocking", -n)
Guido van Rossum01a27522007-03-07 01:00:12 +000073 else:
74 return n
Guido van Rossuma9e20242007-03-08 00:43:48 +000075
Guido van Rossum01a27522007-03-07 01:00:12 +000076 def writable(self):
77 return True
Guido van Rossum68bbcd22007-02-27 17:19:33 +000078
Guido van Rossuma9e20242007-03-08 00:43:48 +000079
Guido van Rossum28524c72007-02-27 05:47:44 +000080class IOTest(unittest.TestCase):
81
Guido van Rossum4d0f5a42007-03-07 22:59:39 +000082 def tearDown(self):
83 test_support.unlink(test_support.TESTFN)
84
Guido van Rossum28524c72007-02-27 05:47:44 +000085 def write_ops(self, f):
Guido van Rossum87429772007-04-10 21:06:59 +000086 self.assertEqual(f.write(b"blah."), 5)
87 self.assertEqual(f.seek(0), 0)
88 self.assertEqual(f.write(b"Hello."), 6)
Guido van Rossum28524c72007-02-27 05:47:44 +000089 self.assertEqual(f.tell(), 6)
Guido van Rossum87429772007-04-10 21:06:59 +000090 self.assertEqual(f.seek(-1, 1), 5)
Guido van Rossum28524c72007-02-27 05:47:44 +000091 self.assertEqual(f.tell(), 5)
Guido van Rossum254348e2007-11-21 19:29:53 +000092 self.assertEqual(f.write(bytearray(b" world\n\n\n")), 9)
Guido van Rossum87429772007-04-10 21:06:59 +000093 self.assertEqual(f.seek(0), 0)
Guido van Rossum2b08b382007-05-08 20:18:39 +000094 self.assertEqual(f.write(b"h"), 1)
Guido van Rossum87429772007-04-10 21:06:59 +000095 self.assertEqual(f.seek(-1, 2), 13)
96 self.assertEqual(f.tell(), 13)
97 self.assertEqual(f.truncate(12), 12)
Guido van Rossumdc0b1a12007-04-12 22:55:07 +000098 self.assertEqual(f.tell(), 13)
Christian Heimes8e42a0a2007-11-08 18:04:45 +000099 self.assertRaises(TypeError, f.seek, 0.0)
Guido van Rossum28524c72007-02-27 05:47:44 +0000100
Guido van Rossum9b76da62007-04-11 01:09:03 +0000101 def read_ops(self, f, buffered=False):
102 data = f.read(5)
103 self.assertEqual(data, b"hello")
Guido van Rossum254348e2007-11-21 19:29:53 +0000104 data = bytearray(data)
Guido van Rossum9b76da62007-04-11 01:09:03 +0000105 self.assertEqual(f.readinto(data), 5)
106 self.assertEqual(data, b" worl")
107 self.assertEqual(f.readinto(data), 2)
108 self.assertEqual(len(data), 5)
109 self.assertEqual(data[:2], b"d\n")
110 self.assertEqual(f.seek(0), 0)
111 self.assertEqual(f.read(20), b"hello world\n")
112 self.assertEqual(f.read(1), b"")
Guido van Rossum254348e2007-11-21 19:29:53 +0000113 self.assertEqual(f.readinto(bytearray(b"x")), 0)
Guido van Rossum9b76da62007-04-11 01:09:03 +0000114 self.assertEqual(f.seek(-6, 2), 6)
115 self.assertEqual(f.read(5), b"world")
116 self.assertEqual(f.read(0), b"")
Guido van Rossum254348e2007-11-21 19:29:53 +0000117 self.assertEqual(f.readinto(bytearray()), 0)
Guido van Rossum9b76da62007-04-11 01:09:03 +0000118 self.assertEqual(f.seek(-6, 1), 5)
119 self.assertEqual(f.read(5), b" worl")
120 self.assertEqual(f.tell(), 10)
Christian Heimes8e42a0a2007-11-08 18:04:45 +0000121 self.assertRaises(TypeError, f.seek, 0.0)
Guido van Rossum9b76da62007-04-11 01:09:03 +0000122 if buffered:
123 f.seek(0)
124 self.assertEqual(f.read(), b"hello world\n")
125 f.seek(6)
126 self.assertEqual(f.read(), b"world\n")
127 self.assertEqual(f.read(), b"")
128
Guido van Rossum34d69e52007-04-10 20:08:41 +0000129 LARGE = 2**31
130
Guido van Rossum53807da2007-04-10 19:01:47 +0000131 def large_file_ops(self, f):
132 assert f.readable()
133 assert f.writable()
Guido van Rossum34d69e52007-04-10 20:08:41 +0000134 self.assertEqual(f.seek(self.LARGE), self.LARGE)
135 self.assertEqual(f.tell(), self.LARGE)
Guido van Rossum53807da2007-04-10 19:01:47 +0000136 self.assertEqual(f.write(b"xxx"), 3)
Guido van Rossum34d69e52007-04-10 20:08:41 +0000137 self.assertEqual(f.tell(), self.LARGE + 3)
138 self.assertEqual(f.seek(-1, 1), self.LARGE + 2)
Guido van Rossum87429772007-04-10 21:06:59 +0000139 self.assertEqual(f.truncate(), self.LARGE + 2)
Guido van Rossum34d69e52007-04-10 20:08:41 +0000140 self.assertEqual(f.tell(), self.LARGE + 2)
141 self.assertEqual(f.seek(0, 2), self.LARGE + 2)
Guido van Rossum87429772007-04-10 21:06:59 +0000142 self.assertEqual(f.truncate(self.LARGE + 1), self.LARGE + 1)
Guido van Rossumdc0b1a12007-04-12 22:55:07 +0000143 self.assertEqual(f.tell(), self.LARGE + 2)
Guido van Rossum34d69e52007-04-10 20:08:41 +0000144 self.assertEqual(f.seek(0, 2), self.LARGE + 1)
145 self.assertEqual(f.seek(-1, 2), self.LARGE)
Guido van Rossum53807da2007-04-10 19:01:47 +0000146 self.assertEqual(f.read(2), b"x")
147
Guido van Rossum28524c72007-02-27 05:47:44 +0000148 def test_raw_file_io(self):
149 f = io.open(test_support.TESTFN, "wb", buffering=0)
150 self.assertEqual(f.readable(), False)
151 self.assertEqual(f.writable(), True)
152 self.assertEqual(f.seekable(), True)
153 self.write_ops(f)
154 f.close()
155 f = io.open(test_support.TESTFN, "rb", buffering=0)
156 self.assertEqual(f.readable(), True)
157 self.assertEqual(f.writable(), False)
158 self.assertEqual(f.seekable(), True)
159 self.read_ops(f)
160 f.close()
161
Guido van Rossum87429772007-04-10 21:06:59 +0000162 def test_buffered_file_io(self):
163 f = io.open(test_support.TESTFN, "wb")
164 self.assertEqual(f.readable(), False)
165 self.assertEqual(f.writable(), True)
166 self.assertEqual(f.seekable(), True)
167 self.write_ops(f)
168 f.close()
169 f = io.open(test_support.TESTFN, "rb")
170 self.assertEqual(f.readable(), True)
171 self.assertEqual(f.writable(), False)
172 self.assertEqual(f.seekable(), True)
Guido van Rossum9b76da62007-04-11 01:09:03 +0000173 self.read_ops(f, True)
Guido van Rossum87429772007-04-10 21:06:59 +0000174 f.close()
175
Guido van Rossum48fc58a2007-06-07 23:45:37 +0000176 def test_readline(self):
177 f = io.open(test_support.TESTFN, "wb")
178 f.write(b"abc\ndef\nxyzzy\nfoo")
179 f.close()
180 f = io.open(test_support.TESTFN, "rb")
181 self.assertEqual(f.readline(), b"abc\n")
182 self.assertEqual(f.readline(10), b"def\n")
183 self.assertEqual(f.readline(2), b"xy")
184 self.assertEqual(f.readline(4), b"zzy\n")
185 self.assertEqual(f.readline(), b"foo")
186 f.close()
187
Guido van Rossum28524c72007-02-27 05:47:44 +0000188 def test_raw_bytes_io(self):
189 f = io.BytesIO()
190 self.write_ops(f)
191 data = f.getvalue()
192 self.assertEqual(data, b"hello world\n")
193 f = io.BytesIO(data)
Guido van Rossum9b76da62007-04-11 01:09:03 +0000194 self.read_ops(f, True)
Guido van Rossum28524c72007-02-27 05:47:44 +0000195
Guido van Rossum53807da2007-04-10 19:01:47 +0000196 def test_large_file_ops(self):
Guido van Rossum34d69e52007-04-10 20:08:41 +0000197 # On Windows and Mac OSX this test comsumes large resources; It takes
198 # a long time to build the >2GB file and takes >2GB of disk space
199 # therefore the resource must be enabled to run this test.
200 if sys.platform[:3] == 'win' or sys.platform == 'darwin':
201 if not test_support.is_resource_enabled("largefile"):
202 print("\nTesting large file ops skipped on %s." % sys.platform,
203 file=sys.stderr)
204 print("It requires %d bytes and a long time." % self.LARGE,
205 file=sys.stderr)
206 print("Use 'regrtest.py -u largefile test_io' to run it.",
207 file=sys.stderr)
208 return
Guido van Rossum87429772007-04-10 21:06:59 +0000209 f = io.open(test_support.TESTFN, "w+b", 0)
Guido van Rossum53807da2007-04-10 19:01:47 +0000210 self.large_file_ops(f)
Guido van Rossuma9e20242007-03-08 00:43:48 +0000211 f.close()
Guido van Rossum87429772007-04-10 21:06:59 +0000212 f = io.open(test_support.TESTFN, "w+b")
213 self.large_file_ops(f)
214 f.close()
215
216 def test_with_open(self):
217 for bufsize in (0, 1, 100):
218 f = None
219 with open(test_support.TESTFN, "wb", bufsize) as f:
Guido van Rossum1f2ca562007-08-27 20:44:15 +0000220 f.write(b"xxx")
Guido van Rossum87429772007-04-10 21:06:59 +0000221 self.assertEqual(f.closed, True)
222 f = None
223 try:
224 with open(test_support.TESTFN, "wb", bufsize) as f:
225 1/0
226 except ZeroDivisionError:
227 self.assertEqual(f.closed, True)
228 else:
229 self.fail("1/0 didn't raise an exception")
230
231 def test_destructor(self):
232 record = []
233 class MyFileIO(io.FileIO):
234 def __del__(self):
235 record.append(1)
236 io.FileIO.__del__(self)
237 def close(self):
238 record.append(2)
239 io.FileIO.close(self)
240 def flush(self):
241 record.append(3)
242 io.FileIO.flush(self)
243 f = MyFileIO(test_support.TESTFN, "w")
244 f.write("xxx")
245 del f
246 self.assertEqual(record, [1, 2, 3])
247
248 def test_close_flushes(self):
249 f = io.open(test_support.TESTFN, "wb")
Guido van Rossum2b08b382007-05-08 20:18:39 +0000250 f.write(b"xxx")
Guido van Rossum87429772007-04-10 21:06:59 +0000251 f.close()
252 f = io.open(test_support.TESTFN, "rb")
253 self.assertEqual(f.read(), b"xxx")
254 f.close()
Guido van Rossuma9e20242007-03-08 00:43:48 +0000255
Guido van Rossumd4103952007-04-12 05:44:49 +0000256 def test_array_writes(self):
257 a = array.array('i', range(10))
Guido van Rossumbae07c92007-10-08 02:46:15 +0000258 n = len(memoryview(a))
Guido van Rossumd4103952007-04-12 05:44:49 +0000259 f = io.open(test_support.TESTFN, "wb", 0)
260 self.assertEqual(f.write(a), n)
261 f.close()
262 f = io.open(test_support.TESTFN, "wb")
263 self.assertEqual(f.write(a), n)
264 f.close()
265
Guido van Rossum2dced8b2007-10-30 17:27:30 +0000266 def test_closefd(self):
267 self.assertRaises(ValueError, io.open, test_support.TESTFN, 'w',
268 closefd=False)
Guido van Rossuma9e20242007-03-08 00:43:48 +0000269
Guido van Rossum76c5d4d2007-04-06 19:10:29 +0000270class MemorySeekTestMixin:
Guido van Rossuma9e20242007-03-08 00:43:48 +0000271
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000272 def testInit(self):
Guido van Rossum78892e42007-04-06 17:31:18 +0000273 buf = self.buftype("1234567890")
274 bytesIo = self.ioclass(buf)
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000275
276 def testRead(self):
Guido van Rossum78892e42007-04-06 17:31:18 +0000277 buf = self.buftype("1234567890")
278 bytesIo = self.ioclass(buf)
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000279
280 self.assertEquals(buf[:1], bytesIo.read(1))
281 self.assertEquals(buf[1:5], bytesIo.read(4))
282 self.assertEquals(buf[5:], bytesIo.read(900))
Guido van Rossum78892e42007-04-06 17:31:18 +0000283 self.assertEquals(self.EOF, bytesIo.read())
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000284
285 def testReadNoArgs(self):
Guido van Rossum78892e42007-04-06 17:31:18 +0000286 buf = self.buftype("1234567890")
287 bytesIo = self.ioclass(buf)
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000288
289 self.assertEquals(buf, bytesIo.read())
Guido van Rossum78892e42007-04-06 17:31:18 +0000290 self.assertEquals(self.EOF, bytesIo.read())
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000291
292 def testSeek(self):
Guido van Rossum78892e42007-04-06 17:31:18 +0000293 buf = self.buftype("1234567890")
294 bytesIo = self.ioclass(buf)
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000295
296 bytesIo.read(5)
297 bytesIo.seek(0)
298 self.assertEquals(buf, bytesIo.read())
299
300 bytesIo.seek(3)
301 self.assertEquals(buf[3:], bytesIo.read())
Christian Heimes8e42a0a2007-11-08 18:04:45 +0000302 self.assertRaises(TypeError, bytesIo.seek, 0.0)
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000303
304 def testTell(self):
Guido van Rossum78892e42007-04-06 17:31:18 +0000305 buf = self.buftype("1234567890")
306 bytesIo = self.ioclass(buf)
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000307
308 self.assertEquals(0, bytesIo.tell())
309 bytesIo.seek(5)
310 self.assertEquals(5, bytesIo.tell())
311 bytesIo.seek(10000)
312 self.assertEquals(10000, bytesIo.tell())
313
Guido van Rossuma9e20242007-03-08 00:43:48 +0000314
Guido van Rossum76c5d4d2007-04-06 19:10:29 +0000315class BytesIOTest(MemorySeekTestMixin, unittest.TestCase):
Guido van Rossum39478e82007-08-27 17:23:59 +0000316 @staticmethod
317 def buftype(s):
318 return s.encode("utf-8")
Guido van Rossum78892e42007-04-06 17:31:18 +0000319 ioclass = io.BytesIO
320 EOF = b""
321
322
Guido van Rossum76c5d4d2007-04-06 19:10:29 +0000323class StringIOTest(MemorySeekTestMixin, unittest.TestCase):
Guido van Rossum78892e42007-04-06 17:31:18 +0000324 buftype = str
325 ioclass = io.StringIO
326 EOF = ""
327
328
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000329class BufferedReaderTest(unittest.TestCase):
Guido van Rossuma9e20242007-03-08 00:43:48 +0000330
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000331 def testRead(self):
Guido van Rossum53807da2007-04-10 19:01:47 +0000332 rawio = MockRawIO((b"abc", b"d", b"efg"))
Guido van Rossum76c5d4d2007-04-06 19:10:29 +0000333 bufio = io.BufferedReader(rawio)
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000334
Guido van Rossum76c5d4d2007-04-06 19:10:29 +0000335 self.assertEquals(b"abcdef", bufio.read(6))
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000336
Guido van Rossum78892e42007-04-06 17:31:18 +0000337 def testBuffering(self):
338 data = b"abcdefghi"
339 dlen = len(data)
340
341 tests = [
342 [ 100, [ 3, 1, 4, 8 ], [ dlen, 0 ] ],
343 [ 100, [ 3, 3, 3], [ dlen ] ],
344 [ 4, [ 1, 2, 4, 2 ], [ 4, 4, 1 ] ],
345 ]
346
347 for bufsize, buf_read_sizes, raw_read_sizes in tests:
Guido van Rossum76c5d4d2007-04-06 19:10:29 +0000348 rawio = MockFileIO(data)
349 bufio = io.BufferedReader(rawio, buffer_size=bufsize)
Guido van Rossum78892e42007-04-06 17:31:18 +0000350 pos = 0
351 for nbytes in buf_read_sizes:
Guido van Rossum76c5d4d2007-04-06 19:10:29 +0000352 self.assertEquals(bufio.read(nbytes), data[pos:pos+nbytes])
Guido van Rossum78892e42007-04-06 17:31:18 +0000353 pos += nbytes
Guido van Rossum76c5d4d2007-04-06 19:10:29 +0000354 self.assertEquals(rawio.read_history, raw_read_sizes)
Guido van Rossum78892e42007-04-06 17:31:18 +0000355
Guido van Rossum01a27522007-03-07 01:00:12 +0000356 def testReadNonBlocking(self):
357 # Inject some None's in there to simulate EWOULDBLOCK
Guido van Rossum53807da2007-04-10 19:01:47 +0000358 rawio = MockRawIO((b"abc", b"d", None, b"efg", None, None))
Guido van Rossum76c5d4d2007-04-06 19:10:29 +0000359 bufio = io.BufferedReader(rawio)
Guido van Rossum01a27522007-03-07 01:00:12 +0000360
Guido van Rossum76c5d4d2007-04-06 19:10:29 +0000361 self.assertEquals(b"abcd", bufio.read(6))
362 self.assertEquals(b"e", bufio.read(1))
363 self.assertEquals(b"fg", bufio.read())
364 self.assert_(None is bufio.read())
365 self.assertEquals(b"", bufio.read())
Guido van Rossum01a27522007-03-07 01:00:12 +0000366
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000367 def testReadToEof(self):
Guido van Rossum53807da2007-04-10 19:01:47 +0000368 rawio = MockRawIO((b"abc", b"d", b"efg"))
Guido van Rossum76c5d4d2007-04-06 19:10:29 +0000369 bufio = io.BufferedReader(rawio)
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000370
Guido van Rossum76c5d4d2007-04-06 19:10:29 +0000371 self.assertEquals(b"abcdefg", bufio.read(9000))
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000372
373 def testReadNoArgs(self):
Guido van Rossum53807da2007-04-10 19:01:47 +0000374 rawio = MockRawIO((b"abc", b"d", b"efg"))
Guido van Rossum76c5d4d2007-04-06 19:10:29 +0000375 bufio = io.BufferedReader(rawio)
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000376
Guido van Rossum76c5d4d2007-04-06 19:10:29 +0000377 self.assertEquals(b"abcdefg", bufio.read())
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000378
379 def testFileno(self):
Guido van Rossum53807da2007-04-10 19:01:47 +0000380 rawio = MockRawIO((b"abc", b"d", b"efg"))
Guido van Rossum76c5d4d2007-04-06 19:10:29 +0000381 bufio = io.BufferedReader(rawio)
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000382
Guido van Rossum76c5d4d2007-04-06 19:10:29 +0000383 self.assertEquals(42, bufio.fileno())
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000384
385 def testFilenoNoFileno(self):
Guido van Rossum01a27522007-03-07 01:00:12 +0000386 # XXX will we always have fileno() function? If so, kill
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000387 # this test. Else, write it.
388 pass
389
Guido van Rossuma9e20242007-03-08 00:43:48 +0000390
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000391class BufferedWriterTest(unittest.TestCase):
Guido van Rossuma9e20242007-03-08 00:43:48 +0000392
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000393 def testWrite(self):
394 # Write to the buffered IO but don't overflow the buffer.
Guido van Rossum53807da2007-04-10 19:01:47 +0000395 writer = MockRawIO()
Guido van Rossum76c5d4d2007-04-06 19:10:29 +0000396 bufio = io.BufferedWriter(writer, 8)
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000397
Guido van Rossum76c5d4d2007-04-06 19:10:29 +0000398 bufio.write(b"abc")
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000399
Guido van Rossum76c5d4d2007-04-06 19:10:29 +0000400 self.assertFalse(writer._write_stack)
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000401
402 def testWriteOverflow(self):
Guido van Rossum53807da2007-04-10 19:01:47 +0000403 writer = MockRawIO()
Guido van Rossum76c5d4d2007-04-06 19:10:29 +0000404 bufio = io.BufferedWriter(writer, 8)
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000405
Guido van Rossum76c5d4d2007-04-06 19:10:29 +0000406 bufio.write(b"abc")
407 bufio.write(b"defghijkl")
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000408
Guido van Rossum76c5d4d2007-04-06 19:10:29 +0000409 self.assertEquals(b"abcdefghijkl", writer._write_stack[0])
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000410
Guido van Rossum01a27522007-03-07 01:00:12 +0000411 def testWriteNonBlocking(self):
412 raw = MockNonBlockWriterIO((9, 2, 22, -6, 10, 12, 12))
Guido van Rossum76c5d4d2007-04-06 19:10:29 +0000413 bufio = io.BufferedWriter(raw, 8, 16)
Guido van Rossum01a27522007-03-07 01:00:12 +0000414
Guido van Rossum76c5d4d2007-04-06 19:10:29 +0000415 bufio.write(b"asdf")
416 bufio.write(b"asdfa")
Guido van Rossum01a27522007-03-07 01:00:12 +0000417 self.assertEquals(b"asdfasdfa", raw._write_stack[0])
418
Guido van Rossum76c5d4d2007-04-06 19:10:29 +0000419 bufio.write(b"asdfasdfasdf")
Guido van Rossum01a27522007-03-07 01:00:12 +0000420 self.assertEquals(b"asdfasdfasdf", raw._write_stack[1])
Guido van Rossum76c5d4d2007-04-06 19:10:29 +0000421 bufio.write(b"asdfasdfasdf")
Guido van Rossum01a27522007-03-07 01:00:12 +0000422 self.assertEquals(b"dfasdfasdf", raw._write_stack[2])
423 self.assertEquals(b"asdfasdfasdf", raw._write_stack[3])
424
Guido van Rossum76c5d4d2007-04-06 19:10:29 +0000425 bufio.write(b"asdfasdfasdf")
Guido van Rossum01a27522007-03-07 01:00:12 +0000426
Guido van Rossum78892e42007-04-06 17:31:18 +0000427 # XXX I don't like this test. It relies too heavily on how the
428 # algorithm actually works, which we might change. Refactor
429 # later.
Guido van Rossum01a27522007-03-07 01:00:12 +0000430
431 def testFileno(self):
Guido van Rossum53807da2007-04-10 19:01:47 +0000432 rawio = MockRawIO((b"abc", b"d", b"efg"))
Guido van Rossum76c5d4d2007-04-06 19:10:29 +0000433 bufio = io.BufferedWriter(rawio)
Guido van Rossum01a27522007-03-07 01:00:12 +0000434
Guido van Rossum76c5d4d2007-04-06 19:10:29 +0000435 self.assertEquals(42, bufio.fileno())
Guido van Rossum01a27522007-03-07 01:00:12 +0000436
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000437 def testFlush(self):
Guido van Rossum53807da2007-04-10 19:01:47 +0000438 writer = MockRawIO()
Guido van Rossum76c5d4d2007-04-06 19:10:29 +0000439 bufio = io.BufferedWriter(writer, 8)
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000440
Guido van Rossum76c5d4d2007-04-06 19:10:29 +0000441 bufio.write(b"abc")
442 bufio.flush()
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000443
Guido van Rossum76c5d4d2007-04-06 19:10:29 +0000444 self.assertEquals(b"abc", writer._write_stack[0])
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000445
Guido van Rossuma9e20242007-03-08 00:43:48 +0000446
Guido van Rossum01a27522007-03-07 01:00:12 +0000447class BufferedRWPairTest(unittest.TestCase):
Guido van Rossuma9e20242007-03-08 00:43:48 +0000448
Guido van Rossum01a27522007-03-07 01:00:12 +0000449 def testRWPair(self):
Guido van Rossum53807da2007-04-10 19:01:47 +0000450 r = MockRawIO(())
451 w = MockRawIO()
Guido van Rossum01a27522007-03-07 01:00:12 +0000452 pair = io.BufferedRWPair(r, w)
453
454 # XXX need implementation
455
Guido van Rossuma9e20242007-03-08 00:43:48 +0000456
Guido van Rossum78892e42007-04-06 17:31:18 +0000457class BufferedRandomTest(unittest.TestCase):
Guido van Rossuma9e20242007-03-08 00:43:48 +0000458
Guido van Rossum01a27522007-03-07 01:00:12 +0000459 def testReadAndWrite(self):
Guido van Rossum53807da2007-04-10 19:01:47 +0000460 raw = MockRawIO((b"asdf", b"ghjk"))
Guido van Rossum01a27522007-03-07 01:00:12 +0000461 rw = io.BufferedRandom(raw, 8, 12)
462
463 self.assertEqual(b"as", rw.read(2))
464 rw.write(b"ddd")
465 rw.write(b"eee")
Guido van Rossum76c5d4d2007-04-06 19:10:29 +0000466 self.assertFalse(raw._write_stack) # Buffer writes
Guido van Rossum01a27522007-03-07 01:00:12 +0000467 self.assertEqual(b"ghjk", rw.read()) # This read forces write flush
Guido van Rossum76c5d4d2007-04-06 19:10:29 +0000468 self.assertEquals(b"dddeee", raw._write_stack[0])
Guido van Rossum01a27522007-03-07 01:00:12 +0000469
470 def testSeekAndTell(self):
471 raw = io.BytesIO(b"asdfghjkl")
472 rw = io.BufferedRandom(raw)
473
474 self.assertEquals(b"as", rw.read(2))
475 self.assertEquals(2, rw.tell())
476 rw.seek(0, 0)
477 self.assertEquals(b"asdf", rw.read(4))
478
479 rw.write(b"asdf")
480 rw.seek(0, 0)
481 self.assertEquals(b"asdfasdfl", rw.read())
482 self.assertEquals(9, rw.tell())
483 rw.seek(-4, 2)
484 self.assertEquals(5, rw.tell())
485 rw.seek(2, 1)
486 self.assertEquals(7, rw.tell())
487 self.assertEquals(b"fl", rw.read(11))
Christian Heimes8e42a0a2007-11-08 18:04:45 +0000488 self.assertRaises(TypeError, rw.seek, 0.0)
Guido van Rossum01a27522007-03-07 01:00:12 +0000489
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +0000490# To fully exercise seek/tell, the StatefulIncrementalDecoder has these
491# properties:
492# - A single output character can correspond to many bytes of input.
493# - The number of input bytes to complete the character can be
494# undetermined until the last input byte is received.
495# - The number of input bytes can vary depending on previous input.
496# - A single input byte can correspond to many characters of output.
497# - The number of output characters can be undetermined until the
498# last input byte is received.
499# - The number of output characters can vary depending on previous input.
500
501class StatefulIncrementalDecoder(codecs.IncrementalDecoder):
502 """
503 For testing seek/tell behavior with a stateful, buffering decoder.
504
505 Input is a sequence of words. Words may be fixed-length (length set
506 by input) or variable-length (period-terminated). In variable-length
507 mode, extra periods are ignored. Possible words are:
508 - 'i' followed by a number sets the input length, I (maximum 99).
509 When I is set to 0, words are space-terminated.
510 - 'o' followed by a number sets the output length, O (maximum 99).
511 - Any other word is converted into a word followed by a period on
512 the output. The output word consists of the input word truncated
513 or padded out with hyphens to make its length equal to O. If O
514 is 0, the word is output verbatim without truncating or padding.
515 I and O are initially set to 1. When I changes, any buffered input is
516 re-scanned according to the new I. EOF also terminates the last word.
517 """
518
519 def __init__(self, errors='strict'):
520 codecs.IncrementalEncoder.__init__(self, errors)
521 self.reset()
522
523 def __repr__(self):
524 return '<SID %x>' % id(self)
525
526 def reset(self):
527 self.i = 1
528 self.o = 1
529 self.buffer = bytearray()
530
531 def getstate(self):
532 i, o = self.i ^ 1, self.o ^ 1 # so that flags = 0 after reset()
533 return bytes(self.buffer), i*100 + o
534
535 def setstate(self, state):
536 buffer, io = state
537 self.buffer = bytearray(buffer)
538 i, o = divmod(io, 100)
539 self.i, self.o = i ^ 1, o ^ 1
540
541 def decode(self, input, final=False):
542 output = ''
543 for b in input:
544 if self.i == 0: # variable-length, terminated with period
545 if b == ord('.'):
546 if self.buffer:
547 output += self.process_word()
548 else:
549 self.buffer.append(b)
550 else: # fixed-length, terminate after self.i bytes
551 self.buffer.append(b)
552 if len(self.buffer) == self.i:
553 output += self.process_word()
554 if final and self.buffer: # EOF terminates the last word
555 output += self.process_word()
556 return output
557
558 def process_word(self):
559 output = ''
560 if self.buffer[0] == ord('i'):
561 self.i = min(99, int(self.buffer[1:] or 0)) # set input length
562 elif self.buffer[0] == ord('o'):
563 self.o = min(99, int(self.buffer[1:] or 0)) # set output length
564 else:
565 output = self.buffer.decode('ascii')
566 if len(output) < self.o:
567 output += '-'*self.o # pad out with hyphens
568 if self.o:
569 output = output[:self.o] # truncate to output length
570 output += '.'
571 self.buffer = bytearray()
572 return output
573
574class StatefulIncrementalDecoderTest(unittest.TestCase):
575 """
576 Make sure the StatefulIncrementalDecoder actually works.
577 """
578
579 test_cases = [
580 # I=1 fixed-length mode
581 (b'abcd', False, 'a.b.c.d.'),
582 # I=0, O=0, variable-length mode
583 (b'oiabcd', True, 'abcd.'),
584 # I=0, O=0, variable-length mode, should ignore extra periods
585 (b'oi...abcd...', True, 'abcd.'),
586 # I=0, O=6
587 (b'i.o6.xyz.', False, 'xyz---.'),
588 # I=2, O=6
589 (b'i.i2.o6xyz', True, 'xy----.z-----.'),
590 # I=0, O=3
591 (b'i.o3.x.xyz.toolong.', False, 'x--.xyz.too.'),
592 # I=6, O=3
Neal Norwitze2b07052008-03-18 19:52:05 +0000593 (b'i.o3.i6.abcdefghijklmnop', True, 'abc.ghi.mno.')
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +0000594 ]
595
596 def testDecoder(self):
597 # Try a few one-shot test cases.
598 for input, eof, output in self.test_cases:
599 d = StatefulIncrementalDecoder()
600 self.assertEquals(d.decode(input, eof), output)
601
602 # Also test an unfinished decode, followed by forcing EOF.
603 d = StatefulIncrementalDecoder()
604 self.assertEquals(d.decode(b'oiabcd'), '')
605 self.assertEquals(d.decode(b'', 1), 'abcd.')
Guido van Rossum78892e42007-04-06 17:31:18 +0000606
607class TextIOWrapperTest(unittest.TestCase):
Guido van Rossum9b76da62007-04-11 01:09:03 +0000608
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +0000609 def setUp(self):
610 self.testdata = b"AAA\r\nBBB\rCCC\r\nDDD\nEEE\r\n"
611 self.normalized = b"AAA\nBBB\nCCC\nDDD\nEEE\n".decode("ascii")
612
Guido van Rossumd0712812007-04-11 16:32:43 +0000613 def tearDown(self):
614 test_support.unlink(test_support.TESTFN)
Guido van Rossum9b76da62007-04-11 01:09:03 +0000615
Guido van Rossumf64db9f2007-12-06 01:04:26 +0000616 def testLineBuffering(self):
617 r = io.BytesIO()
618 b = io.BufferedWriter(r, 1000)
619 t = io.TextIOWrapper(b, newline="\n", line_buffering=True)
620 t.write("X")
621 self.assertEquals(r.getvalue(), b"") # No flush happened
622 t.write("Y\nZ")
623 self.assertEquals(r.getvalue(), b"XY\nZ") # All got flushed
624 t.write("A\rB")
625 self.assertEquals(r.getvalue(), b"XY\nZA\rB")
626
Guido van Rossume7fc50f2007-12-03 22:54:21 +0000627 def testEncodingErrorsReading(self):
628 # (1) default
629 b = io.BytesIO(b"abc\n\xff\n")
630 t = io.TextIOWrapper(b, encoding="ascii")
631 self.assertRaises(UnicodeError, t.read)
632 # (2) explicit strict
633 b = io.BytesIO(b"abc\n\xff\n")
634 t = io.TextIOWrapper(b, encoding="ascii", errors="strict")
635 self.assertRaises(UnicodeError, t.read)
636 # (3) ignore
637 b = io.BytesIO(b"abc\n\xff\n")
638 t = io.TextIOWrapper(b, encoding="ascii", errors="ignore")
639 self.assertEquals(t.read(), "abc\n\n")
640 # (4) replace
641 b = io.BytesIO(b"abc\n\xff\n")
642 t = io.TextIOWrapper(b, encoding="ascii", errors="replace")
643 self.assertEquals(t.read(), "abc\n\ufffd\n")
644
645 def testEncodingErrorsWriting(self):
646 # (1) default
647 b = io.BytesIO()
648 t = io.TextIOWrapper(b, encoding="ascii")
649 self.assertRaises(UnicodeError, t.write, "\xff")
650 # (2) explicit strict
651 b = io.BytesIO()
652 t = io.TextIOWrapper(b, encoding="ascii", errors="strict")
653 self.assertRaises(UnicodeError, t.write, "\xff")
654 # (3) ignore
655 b = io.BytesIO()
Guido van Rossumf64db9f2007-12-06 01:04:26 +0000656 t = io.TextIOWrapper(b, encoding="ascii", errors="ignore",
657 newline="\n")
Guido van Rossume7fc50f2007-12-03 22:54:21 +0000658 t.write("abc\xffdef\n")
659 t.flush()
Christian Heimesecda2612007-12-05 17:59:44 +0000660 self.assertEquals(b.getvalue(), b"abcdef\n")
Guido van Rossume7fc50f2007-12-03 22:54:21 +0000661 # (4) replace
662 b = io.BytesIO()
Guido van Rossumf64db9f2007-12-06 01:04:26 +0000663 t = io.TextIOWrapper(b, encoding="ascii", errors="replace",
664 newline="\n")
Guido van Rossume7fc50f2007-12-03 22:54:21 +0000665 t.write("abc\xffdef\n")
666 t.flush()
Christian Heimesecda2612007-12-05 17:59:44 +0000667 self.assertEquals(b.getvalue(), b"abc?def\n")
Guido van Rossume7fc50f2007-12-03 22:54:21 +0000668
Guido van Rossum8358db22007-08-18 21:39:55 +0000669 def testNewlinesInput(self):
670 testdata = b"AAA\nBBB\nCCC\rDDD\rEEE\r\nFFF\r\nGGG"
671 normalized = testdata.replace(b"\r\n", b"\n").replace(b"\r", b"\n")
672 for newline, expected in [
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +0000673 (None, normalized.decode("ascii").splitlines(True)),
674 ("", testdata.decode("ascii").splitlines(True)),
Guido van Rossum8358db22007-08-18 21:39:55 +0000675 ("\n", ["AAA\n", "BBB\n", "CCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]),
676 ("\r\n", ["AAA\nBBB\nCCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]),
677 ("\r", ["AAA\nBBB\nCCC\r", "DDD\r", "EEE\r", "\nFFF\r", "\nGGG"]),
678 ]:
679 buf = io.BytesIO(testdata)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +0000680 txt = io.TextIOWrapper(buf, encoding="ascii", newline=newline)
Guido van Rossum8358db22007-08-18 21:39:55 +0000681 self.assertEquals(txt.readlines(), expected)
682 txt.seek(0)
683 self.assertEquals(txt.read(), "".join(expected))
684
685 def testNewlinesOutput(self):
686 testdict = {
687 "": b"AAA\nBBB\nCCC\nX\rY\r\nZ",
688 "\n": b"AAA\nBBB\nCCC\nX\rY\r\nZ",
689 "\r": b"AAA\rBBB\rCCC\rX\rY\r\rZ",
690 "\r\n": b"AAA\r\nBBB\r\nCCC\r\nX\rY\r\r\nZ",
691 }
692 tests = [(None, testdict[os.linesep])] + sorted(testdict.items())
693 for newline, expected in tests:
694 buf = io.BytesIO()
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +0000695 txt = io.TextIOWrapper(buf, encoding="ascii", newline=newline)
Guido van Rossum8358db22007-08-18 21:39:55 +0000696 txt.write("AAA\nB")
697 txt.write("BB\nCCC\n")
698 txt.write("X\rY\r\nZ")
699 txt.flush()
700 self.assertEquals(buf.getvalue(), expected)
701
Guido van Rossum78892e42007-04-06 17:31:18 +0000702 def testNewlines(self):
703 input_lines = [ "unix\n", "windows\r\n", "os9\r", "last\n", "nonl" ]
704
705 tests = [
706 [ None, [ 'unix\n', 'windows\n', 'os9\n', 'last\n', 'nonl' ] ],
Guido van Rossum8358db22007-08-18 21:39:55 +0000707 [ '', input_lines ],
708 [ '\n', [ "unix\n", "windows\r\n", "os9\rlast\n", "nonl" ] ],
709 [ '\r\n', [ "unix\nwindows\r\n", "os9\rlast\nnonl" ] ],
710 [ '\r', [ "unix\nwindows\r", "\nos9\r", "last\nnonl" ] ],
Guido van Rossum78892e42007-04-06 17:31:18 +0000711 ]
712
Guido van Rossumc2f93dc2007-05-24 00:50:02 +0000713 encodings = ('utf-8', 'latin-1')
Guido van Rossum78892e42007-04-06 17:31:18 +0000714
Guido van Rossum8358db22007-08-18 21:39:55 +0000715 # Try a range of buffer sizes to test the case where \r is the last
Guido van Rossum78892e42007-04-06 17:31:18 +0000716 # character in TextIOWrapper._pending_line.
717 for encoding in encodings:
Guido van Rossum8358db22007-08-18 21:39:55 +0000718 # XXX: str.encode() should return bytes
719 data = bytes(''.join(input_lines).encode(encoding))
Guido van Rossum78892e42007-04-06 17:31:18 +0000720 for do_reads in (False, True):
Guido van Rossum8358db22007-08-18 21:39:55 +0000721 for bufsize in range(1, 10):
722 for newline, exp_lines in tests:
723 bufio = io.BufferedReader(io.BytesIO(data), bufsize)
Guido van Rossum76c5d4d2007-04-06 19:10:29 +0000724 textio = io.TextIOWrapper(bufio, newline=newline,
Guido van Rossum78892e42007-04-06 17:31:18 +0000725 encoding=encoding)
726 if do_reads:
727 got_lines = []
728 while True:
Guido van Rossum76c5d4d2007-04-06 19:10:29 +0000729 c2 = textio.read(2)
Guido van Rossum78892e42007-04-06 17:31:18 +0000730 if c2 == '':
731 break
732 self.assertEquals(len(c2), 2)
Guido van Rossum76c5d4d2007-04-06 19:10:29 +0000733 got_lines.append(c2 + textio.readline())
Guido van Rossum78892e42007-04-06 17:31:18 +0000734 else:
Guido van Rossum76c5d4d2007-04-06 19:10:29 +0000735 got_lines = list(textio)
Guido van Rossum78892e42007-04-06 17:31:18 +0000736
737 for got_line, exp_line in zip(got_lines, exp_lines):
738 self.assertEquals(got_line, exp_line)
739 self.assertEquals(len(got_lines), len(exp_lines))
740
Guido van Rossum8358db22007-08-18 21:39:55 +0000741 def testNewlinesInput(self):
742 testdata = b"AAA\nBBB\nCCC\rDDD\rEEE\r\nFFF\r\nGGG"
743 normalized = testdata.replace(b"\r\n", b"\n").replace(b"\r", b"\n")
744 for newline, expected in [
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +0000745 (None, normalized.decode("ascii").splitlines(True)),
746 ("", testdata.decode("ascii").splitlines(True)),
Guido van Rossum8358db22007-08-18 21:39:55 +0000747 ("\n", ["AAA\n", "BBB\n", "CCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]),
748 ("\r\n", ["AAA\nBBB\nCCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]),
749 ("\r", ["AAA\nBBB\nCCC\r", "DDD\r", "EEE\r", "\nFFF\r", "\nGGG"]),
750 ]:
751 buf = io.BytesIO(testdata)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +0000752 txt = io.TextIOWrapper(buf, encoding="ascii", newline=newline)
Guido van Rossum8358db22007-08-18 21:39:55 +0000753 self.assertEquals(txt.readlines(), expected)
754 txt.seek(0)
755 self.assertEquals(txt.read(), "".join(expected))
756
757 def testNewlinesOutput(self):
Guido van Rossum8358db22007-08-18 21:39:55 +0000758 data = "AAA\nBBB\rCCC\n"
759 data_lf = b"AAA\nBBB\rCCC\n"
760 data_cr = b"AAA\rBBB\rCCC\r"
761 data_crlf = b"AAA\r\nBBB\rCCC\r\n"
Guido van Rossum18363582007-08-18 22:05:58 +0000762 save_linesep = os.linesep
763 try:
764 for os.linesep, newline, expected in [
765 ("\n", None, data_lf),
766 ("\r\n", None, data_crlf),
767 ("\n", "", data_lf),
768 ("\r\n", "", data_lf),
769 ("\n", "\n", data_lf),
770 ("\r\n", "\n", data_lf),
771 ("\n", "\r", data_cr),
772 ("\r\n", "\r", data_cr),
773 ("\n", "\r\n", data_crlf),
774 ("\r\n", "\r\n", data_crlf),
775 ]:
776 buf = io.BytesIO()
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +0000777 txt = io.TextIOWrapper(buf, encoding="ascii", newline=newline)
Guido van Rossum18363582007-08-18 22:05:58 +0000778 txt.write(data)
779 txt.close()
780 self.assertEquals(buf.getvalue(), expected)
781 finally:
782 os.linesep = save_linesep
Guido van Rossum8358db22007-08-18 21:39:55 +0000783
Guido van Rossum9b76da62007-04-11 01:09:03 +0000784 # Systematic tests of the text I/O API
785
786 def testBasicIO(self):
787 for chunksize in (1, 2, 3, 4, 5, 15, 16, 17, 31, 32, 33, 63, 64, 65):
788 for enc in "ascii", "latin1", "utf8" :# , "utf-16-be", "utf-16-le":
789 f = io.open(test_support.TESTFN, "w+", encoding=enc)
790 f._CHUNK_SIZE = chunksize
791 self.assertEquals(f.write("abc"), 3)
792 f.close()
793 f = io.open(test_support.TESTFN, "r+", encoding=enc)
794 f._CHUNK_SIZE = chunksize
795 self.assertEquals(f.tell(), 0)
796 self.assertEquals(f.read(), "abc")
797 cookie = f.tell()
798 self.assertEquals(f.seek(0), 0)
799 self.assertEquals(f.read(2), "ab")
800 self.assertEquals(f.read(1), "c")
801 self.assertEquals(f.read(1), "")
802 self.assertEquals(f.read(), "")
803 self.assertEquals(f.tell(), cookie)
804 self.assertEquals(f.seek(0), 0)
805 self.assertEquals(f.seek(0, 2), cookie)
806 self.assertEquals(f.write("def"), 3)
807 self.assertEquals(f.seek(cookie), cookie)
808 self.assertEquals(f.read(), "def")
809 if enc.startswith("utf"):
810 self.multi_line_test(f, enc)
811 f.close()
812
813 def multi_line_test(self, f, enc):
814 f.seek(0)
815 f.truncate()
Guido van Rossumef87d6e2007-05-02 19:09:54 +0000816 sample = "s\xff\u0fff\uffff"
Guido van Rossum9b76da62007-04-11 01:09:03 +0000817 wlines = []
Guido van Rossumcba608c2007-04-11 14:19:59 +0000818 for size in (0, 1, 2, 3, 4, 5, 30, 31, 32, 33, 62, 63, 64, 65, 1000):
Guido van Rossum9b76da62007-04-11 01:09:03 +0000819 chars = []
Guido van Rossum805365e2007-05-07 22:24:25 +0000820 for i in range(size):
Guido van Rossum9b76da62007-04-11 01:09:03 +0000821 chars.append(sample[i % len(sample)])
Guido van Rossumef87d6e2007-05-02 19:09:54 +0000822 line = "".join(chars) + "\n"
Guido van Rossum9b76da62007-04-11 01:09:03 +0000823 wlines.append((f.tell(), line))
824 f.write(line)
Guido van Rossum9b76da62007-04-11 01:09:03 +0000825 f.seek(0)
826 rlines = []
827 while True:
828 pos = f.tell()
829 line = f.readline()
830 if not line:
Guido van Rossum9b76da62007-04-11 01:09:03 +0000831 break
832 rlines.append((pos, line))
Guido van Rossum9b76da62007-04-11 01:09:03 +0000833 self.assertEquals(rlines, wlines)
834
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +0000835 def testTelling(self):
836 f = io.open(test_support.TESTFN, "w+", encoding="utf8")
837 p0 = f.tell()
Guido van Rossumef87d6e2007-05-02 19:09:54 +0000838 f.write("\xff\n")
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +0000839 p1 = f.tell()
Guido van Rossumef87d6e2007-05-02 19:09:54 +0000840 f.write("\xff\n")
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +0000841 p2 = f.tell()
842 f.seek(0)
843 self.assertEquals(f.tell(), p0)
Guido van Rossumef87d6e2007-05-02 19:09:54 +0000844 self.assertEquals(f.readline(), "\xff\n")
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +0000845 self.assertEquals(f.tell(), p1)
Guido van Rossumef87d6e2007-05-02 19:09:54 +0000846 self.assertEquals(f.readline(), "\xff\n")
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +0000847 self.assertEquals(f.tell(), p2)
848 f.seek(0)
849 for line in f:
Guido van Rossumef87d6e2007-05-02 19:09:54 +0000850 self.assertEquals(line, "\xff\n")
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +0000851 self.assertRaises(IOError, f.tell)
852 self.assertEquals(f.tell(), p2)
853 f.close()
854
Guido van Rossumd76e7792007-04-17 02:38:04 +0000855 def testSeeking(self):
856 chunk_size = io.TextIOWrapper._CHUNK_SIZE
857 prefix_size = chunk_size - 2
Guido van Rossumef87d6e2007-05-02 19:09:54 +0000858 u_prefix = "a" * prefix_size
Guido van Rossumd76e7792007-04-17 02:38:04 +0000859 prefix = bytes(u_prefix.encode("utf-8"))
860 self.assertEquals(len(u_prefix), len(prefix))
Guido van Rossumef87d6e2007-05-02 19:09:54 +0000861 u_suffix = "\u8888\n"
Guido van Rossumd76e7792007-04-17 02:38:04 +0000862 suffix = bytes(u_suffix.encode("utf-8"))
863 line = prefix + suffix
864 f = io.open(test_support.TESTFN, "wb")
865 f.write(line*2)
866 f.close()
867 f = io.open(test_support.TESTFN, "r", encoding="utf-8")
868 s = f.read(prefix_size)
Guido van Rossum98297ee2007-11-06 21:34:58 +0000869 self.assertEquals(s, str(prefix, "ascii"))
Guido van Rossumd76e7792007-04-17 02:38:04 +0000870 self.assertEquals(f.tell(), prefix_size)
871 self.assertEquals(f.readline(), u_suffix)
872
873 def testSeekingToo(self):
874 # Regression test for a specific bug
875 data = b'\xe0\xbf\xbf\n'
876 f = io.open(test_support.TESTFN, "wb")
877 f.write(data)
878 f.close()
879 f = io.open(test_support.TESTFN, "r", encoding="utf-8")
880 f._CHUNK_SIZE # Just test that it exists
881 f._CHUNK_SIZE = 2
882 f.readline()
883 f.tell()
884
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +0000885 def testSeekAndTell(self):
886 """Test seek/tell using the StatefulIncrementalDecoder."""
887
888 def lookupTestDecoder(name):
889 if self.codecEnabled and name == 'test_decoder':
890 return codecs.CodecInfo(
891 name='test_decoder', encode=None, decode=None,
892 incrementalencoder=None,
Neal Norwitze2b07052008-03-18 19:52:05 +0000893 streamreader=None, streamwriter=None,
894 incrementaldecoder=StatefulIncrementalDecoder)
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +0000895
896 def testSeekAndTellWithData(data, min_pos=0):
897 """Tell/seek to various points within a data stream and ensure
898 that the decoded data returned by read() is consistent."""
899 f = io.open(test_support.TESTFN, 'wb')
900 f.write(data)
901 f.close()
902 f = io.open(test_support.TESTFN, encoding='test_decoder')
903 decoded = f.read()
904 f.close()
905
Neal Norwitze2b07052008-03-18 19:52:05 +0000906 for i in range(min_pos, len(decoded) + 1): # seek positions
907 for j in [1, 5, len(decoded) - i]: # read lengths
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +0000908 f = io.open(test_support.TESTFN, encoding='test_decoder')
909 self.assertEquals(f.read(i), decoded[:i])
910 cookie = f.tell()
911 self.assertEquals(f.read(j), decoded[i:i + j])
912 f.seek(cookie)
913 self.assertEquals(f.read(), decoded[i:])
914 f.close()
915
916 # Register a special incremental decoder for testing.
917 codecs.register(lookupTestDecoder)
918 self.codecEnabled = 1
919
920 # Run the tests.
921 try:
922 # Try each test case.
923 for input, _, _ in StatefulIncrementalDecoderTest.test_cases:
924 testSeekAndTellWithData(input)
925
926 # Position each test case so that it crosses a chunk boundary.
927 CHUNK_SIZE = io.TextIOWrapper._CHUNK_SIZE
928 for input, _, _ in StatefulIncrementalDecoderTest.test_cases:
929 offset = CHUNK_SIZE - len(input)//2
930 prefix = b'.'*offset
931 # Don't bother seeking into the prefix (takes too long).
932 min_pos = offset*2
933 testSeekAndTellWithData(prefix + input, min_pos)
934
935 # Ensure our test decoder won't interfere with subsequent tests.
936 finally:
937 self.codecEnabled = 0
938
Alexandre Vassalottia38f73b2008-01-07 18:30:48 +0000939 def testEncodedWrites(self):
940 data = "1234567890"
941 tests = ("utf-16",
942 "utf-16-le",
943 "utf-16-be",
944 "utf-32",
945 "utf-32-le",
946 "utf-32-be")
947 for encoding in tests:
948 buf = io.BytesIO()
949 f = io.TextIOWrapper(buf, encoding=encoding)
950 # Check if the BOM is written only once (see issue1753).
951 f.write(data)
952 f.write(data)
953 f.seek(0)
954 self.assertEquals(f.read(), data * 2)
955 self.assertEquals(buf.getvalue(), (data * 2).encode(encoding))
956
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +0000957 def timingTest(self):
958 timer = time.time
959 enc = "utf8"
Guido van Rossumef87d6e2007-05-02 19:09:54 +0000960 line = "\0\x0f\xff\u0fff\uffff\U000fffff\U0010ffff"*3 + "\n"
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +0000961 nlines = 10000
962 nchars = len(line)
963 nbytes = len(line.encode(enc))
964 for chunk_size in (32, 64, 128, 256):
965 f = io.open(test_support.TESTFN, "w+", encoding=enc)
966 f._CHUNK_SIZE = chunk_size
967 t0 = timer()
968 for i in range(nlines):
969 f.write(line)
970 f.flush()
971 t1 = timer()
972 f.seek(0)
973 for line in f:
974 pass
975 t2 = timer()
976 f.seek(0)
977 while f.readline():
978 pass
979 t3 = timer()
980 f.seek(0)
981 while f.readline():
982 f.tell()
983 t4 = timer()
984 f.close()
985 if test_support.verbose:
986 print("\nTiming test: %d lines of %d characters (%d bytes)" %
987 (nlines, nchars, nbytes))
988 print("File chunk size: %6s" % f._CHUNK_SIZE)
989 print("Writing: %6.3f seconds" % (t1-t0))
990 print("Reading using iteration: %6.3f seconds" % (t2-t1))
991 print("Reading using readline(): %6.3f seconds" % (t3-t2))
992 print("Using readline()+tell(): %6.3f seconds" % (t4-t3))
993
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +0000994 def testReadOneByOne(self):
995 txt = io.TextIOWrapper(io.BytesIO(b"AA\r\nBB"))
996 reads = ""
997 while True:
998 c = txt.read(1)
999 if not c:
1000 break
1001 reads += c
1002 self.assertEquals(reads, "AA\nBB")
1003
1004 # read in amounts equal to TextIOWrapper._CHUNK_SIZE which is 128.
1005 def testReadByChunk(self):
1006 # make sure "\r\n" straddles 128 char boundary.
1007 txt = io.TextIOWrapper(io.BytesIO(b"A" * 127 + b"\r\nB"))
1008 reads = ""
1009 while True:
1010 c = txt.read(128)
1011 if not c:
1012 break
1013 reads += c
1014 self.assertEquals(reads, "A"*127+"\nB")
1015
1016 def test_issue1395_1(self):
1017 txt = io.TextIOWrapper(io.BytesIO(self.testdata), encoding="ascii")
1018
1019 # read one char at a time
1020 reads = ""
1021 while True:
1022 c = txt.read(1)
1023 if not c:
1024 break
1025 reads += c
1026 self.assertEquals(reads, self.normalized)
1027
1028 def test_issue1395_2(self):
1029 txt = io.TextIOWrapper(io.BytesIO(self.testdata), encoding="ascii")
1030 txt._CHUNK_SIZE = 4
1031
1032 reads = ""
1033 while True:
1034 c = txt.read(4)
1035 if not c:
1036 break
1037 reads += c
1038 self.assertEquals(reads, self.normalized)
1039
1040 def test_issue1395_3(self):
1041 txt = io.TextIOWrapper(io.BytesIO(self.testdata), encoding="ascii")
1042 txt._CHUNK_SIZE = 4
1043
1044 reads = txt.read(4)
1045 reads += txt.read(4)
1046 reads += txt.readline()
1047 reads += txt.readline()
1048 reads += txt.readline()
1049 self.assertEquals(reads, self.normalized)
1050
1051 def test_issue1395_4(self):
1052 txt = io.TextIOWrapper(io.BytesIO(self.testdata), encoding="ascii")
1053 txt._CHUNK_SIZE = 4
1054
1055 reads = txt.read(4)
1056 reads += txt.read()
1057 self.assertEquals(reads, self.normalized)
1058
1059 def test_issue1395_5(self):
1060 txt = io.TextIOWrapper(io.BytesIO(self.testdata), encoding="ascii")
1061 txt._CHUNK_SIZE = 4
1062
1063 reads = txt.read(4)
1064 pos = txt.tell()
1065 txt.seek(0)
1066 txt.seek(pos)
1067 self.assertEquals(txt.read(4), "BBB\n")
1068
Ka-Ping Yeeddaa7062008-03-17 20:35:15 +00001069 def test_issue2282(self):
1070 buffer = io.BytesIO(self.testdata)
1071 txt = io.TextIOWrapper(buffer, encoding="ascii")
1072
1073 self.assertEqual(buffer.seekable(), txt.seekable())
1074
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00001075 def test_newline_decoder(self):
1076 import codecs
1077 decoder = codecs.getincrementaldecoder("utf-8")()
1078 decoder = io.IncrementalNewlineDecoder(decoder, translate=True)
1079
1080 self.assertEquals(decoder.decode(b'\xe8\xa2\x88'), "\u8888")
1081
1082 self.assertEquals(decoder.decode(b'\xe8'), "")
1083 self.assertEquals(decoder.decode(b'\xa2'), "")
1084 self.assertEquals(decoder.decode(b'\x88'), "\u8888")
1085
1086 self.assertEquals(decoder.decode(b'\xe8'), "")
1087 self.assertRaises(UnicodeDecodeError, decoder.decode, b'', final=True)
1088
1089 decoder.setstate((b'', 0))
1090 self.assertEquals(decoder.decode(b'\n'), "\n")
1091 self.assertEquals(decoder.decode(b'\r'), "")
1092 self.assertEquals(decoder.decode(b'', final=True), "\n")
1093 self.assertEquals(decoder.decode(b'\r', final=True), "\n")
1094
1095 self.assertEquals(decoder.decode(b'\r'), "")
1096 self.assertEquals(decoder.decode(b'a'), "\na")
1097
1098 self.assertEquals(decoder.decode(b'\r\r\n'), "\n\n")
1099 self.assertEquals(decoder.decode(b'\r'), "")
1100 self.assertEquals(decoder.decode(b'\r'), "\n")
1101 self.assertEquals(decoder.decode(b'\na'), "\na")
1102
1103 self.assertEquals(decoder.decode(b'\xe8\xa2\x88\r\n'), "\u8888\n")
1104 self.assertEquals(decoder.decode(b'\xe8\xa2\x88'), "\u8888")
1105 self.assertEquals(decoder.decode(b'\n'), "\n")
1106 self.assertEquals(decoder.decode(b'\xe8\xa2\x88\r'), "\u8888")
1107 self.assertEquals(decoder.decode(b'\n'), "\n")
Guido van Rossum9b76da62007-04-11 01:09:03 +00001108
Alexandre Vassalotti472f07d2008-01-06 00:34:32 +00001109 decoder = codecs.getincrementaldecoder("utf-8")()
1110 decoder = io.IncrementalNewlineDecoder(decoder, translate=True)
1111 self.assertEquals(decoder.newlines, None)
1112 decoder.decode(b"abc\n\r")
1113 self.assertEquals(decoder.newlines, '\n')
1114 decoder.decode(b"\nabc")
1115 self.assertEquals(decoder.newlines, ('\n', '\r\n'))
1116 decoder.decode(b"abc\r")
1117 self.assertEquals(decoder.newlines, ('\n', '\r\n'))
1118 decoder.decode(b"abc")
1119 self.assertEquals(decoder.newlines, ('\r', '\n', '\r\n'))
1120 decoder.decode(b"abc\r")
1121 decoder.reset()
1122 self.assertEquals(decoder.decode(b"abc"), "abc")
1123 self.assertEquals(decoder.newlines, None)
1124
Guido van Rossum01a27522007-03-07 01:00:12 +00001125# XXX Tests for open()
Guido van Rossum68bbcd22007-02-27 17:19:33 +00001126
Guido van Rossum5abbf752007-08-27 17:39:33 +00001127class MiscIOTest(unittest.TestCase):
1128
1129 def testImport__all__(self):
1130 for name in io.__all__:
1131 obj = getattr(io, name, None)
1132 self.assert_(obj is not None, name)
1133 if name == "open":
1134 continue
1135 elif "error" in name.lower():
1136 self.assert_(issubclass(obj, Exception), name)
1137 else:
1138 self.assert_(issubclass(obj, io.IOBase))
1139
1140
Guido van Rossum28524c72007-02-27 05:47:44 +00001141def test_main():
Guido van Rossum78892e42007-04-06 17:31:18 +00001142 test_support.run_unittest(IOTest, BytesIOTest, StringIOTest,
1143 BufferedReaderTest,
Guido van Rossum01a27522007-03-07 01:00:12 +00001144 BufferedWriterTest, BufferedRWPairTest,
Guido van Rossum5abbf752007-08-27 17:39:33 +00001145 BufferedRandomTest, TextIOWrapperTest,
1146 MiscIOTest)
Guido van Rossum28524c72007-02-27 05:47:44 +00001147
1148if __name__ == "__main__":
Guido van Rossum76c5d4d2007-04-06 19:10:29 +00001149 unittest.main()