blob: 83d363dbd06d22cd4f24c8814eb7a5b1665339ec [file] [log] [blame]
Guido van Rossum68bbcd22007-02-27 17:19:33 +00001"""Unit tests for io.py."""
2
Guido van Rossum8358db22007-08-18 21:39:55 +00003import os
Guido van Rossum34d69e52007-04-10 20:08:41 +00004import sys
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +00005import time
Guido van Rossumd4103952007-04-12 05:44:49 +00006import array
Antoine Pitrou87695762008-08-14 22:44:29 +00007import threading
8import random
Guido van Rossum28524c72007-02-27 05:47:44 +00009import unittest
Antoine Pitrou87695762008-08-14 22:44:29 +000010from itertools import chain, cycle
Benjamin Petersonee8712c2008-05-20 21:35:26 +000011from test import support
Guido van Rossum76c5d4d2007-04-06 19:10:29 +000012
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +000013import codecs
Guido van Rossum53807da2007-04-10 19:01:47 +000014import io # The module under test
Guido van Rossum28524c72007-02-27 05:47:44 +000015
Guido van Rossuma9e20242007-03-08 00:43:48 +000016
class MockRawIO(io.RawIOBase):
    """Scripted raw stream used to drive the buffered I/O classes in tests.

    Reads are answered from a queue of canned results and every chunk
    handed to write() is recorded in ``_write_stack`` for inspection.
    """

    def __init__(self, read_stack=()):
        # Copy the script so read() never mutates the caller's sequence.
        self._read_stack = list(read_stack)
        self._write_stack = []

    def read(self, n=None):
        """Return the next scripted result; *n* is ignored.

        Once the script is exhausted, b"" is returned.
        """
        try:
            return self._read_stack.pop(0)
        except IndexError:
            # Only an empty script means EOF; anything else is a real error
            # and must not be swallowed.
            return b""

    def write(self, b):
        """Record a copy of *b* and report it as fully written."""
        self._write_stack.append(b[:])
        return len(b)

    def writable(self):
        return True

    def fileno(self):
        # Arbitrary sentinel value that tests compare against.
        return 42

    def readable(self):
        return True

    def seekable(self):
        return True

    def seek(self, pos, whence=0):
        """Accept and ignore any seek request."""
        pass

    def tell(self):
        # Arbitrary sentinel value; the mock tracks no real position.
        return 42
50
Guido van Rossuma9e20242007-03-08 00:43:48 +000051
class MockFileIO(io.BytesIO):
    """In-memory stream that logs the size of every read() result."""

    def __init__(self, data):
        super().__init__(data)
        # One entry per read() call: len(result), or None for a None result.
        self.read_history = []

    def read(self, n=None):
        """Read like BytesIO.read() and remember how much came back."""
        chunk = super().read(n)
        size = len(chunk) if chunk is not None else None
        self.read_history.append(size)
        return chunk
62
63
class MockNonBlockWriterIO(io.RawIOBase):
    """Raw writer whose write() results follow a predetermined script.

    Each script entry is consumed by one write() call: a non-negative
    value is returned as the byte count, a negative value -k raises
    BlockingIOError with k as its third argument.
    """

    def __init__(self, blocking_script):
        self._write_stack = []
        self._blocking_script = list(blocking_script)

    def write(self, b):
        # Record the attempt first, whether or not the call "blocks".
        self._write_stack.append(b[:])
        outcome = self._blocking_script.pop(0)
        if outcome >= 0:
            return outcome
        raise io.BlockingIOError(0, "test blocking", -outcome)

    def writable(self):
        return True
Guido van Rossum68bbcd22007-02-27 17:19:33 +000080
Guido van Rossuma9e20242007-03-08 00:43:48 +000081
class IOTest(unittest.TestCase):
    """Exercise raw, buffered and in-memory streams opened through io."""

    def setUp(self):
        support.unlink(support.TESTFN)

    def tearDown(self):
        support.unlink(support.TESTFN)

    def write_ops(self, f):
        """Run a fixed write/seek/truncate sequence against *f*.

        The stream ends up holding b"hello world\\n".
        """
        self.assertEqual(f.write(b"blah."), 5)
        self.assertEqual(f.seek(0), 0)
        self.assertEqual(f.write(b"Hello."), 6)
        self.assertEqual(f.tell(), 6)
        self.assertEqual(f.seek(-1, 1), 5)
        self.assertEqual(f.tell(), 5)
        self.assertEqual(f.write(bytearray(b" world\n\n\n")), 9)
        self.assertEqual(f.seek(0), 0)
        self.assertEqual(f.write(b"h"), 1)
        self.assertEqual(f.seek(-1, 2), 13)
        self.assertEqual(f.tell(), 13)
        self.assertEqual(f.truncate(12), 12)
        self.assertEqual(f.tell(), 12)
        self.assertRaises(TypeError, f.seek, 0.0)

    def read_ops(self, f, buffered=False):
        """Run a fixed read/readinto/seek sequence against *f*.

        *f* must contain b"hello world\\n".  With *buffered* true the
        argument-less read() is exercised as well.
        """
        data = f.read(5)
        self.assertEqual(data, b"hello")
        data = bytearray(data)
        self.assertEqual(f.readinto(data), 5)
        self.assertEqual(data, b" worl")
        self.assertEqual(f.readinto(data), 2)
        self.assertEqual(len(data), 5)
        self.assertEqual(data[:2], b"d\n")
        self.assertEqual(f.seek(0), 0)
        self.assertEqual(f.read(20), b"hello world\n")
        self.assertEqual(f.read(1), b"")
        self.assertEqual(f.readinto(bytearray(b"x")), 0)
        self.assertEqual(f.seek(-6, 2), 6)
        self.assertEqual(f.read(5), b"world")
        self.assertEqual(f.read(0), b"")
        self.assertEqual(f.readinto(bytearray()), 0)
        self.assertEqual(f.seek(-6, 1), 5)
        self.assertEqual(f.read(5), b" worl")
        self.assertEqual(f.tell(), 10)
        self.assertRaises(TypeError, f.seek, 0.0)
        if buffered:
            f.seek(0)
            self.assertEqual(f.read(), b"hello world\n")
            f.seek(6)
            self.assertEqual(f.read(), b"world\n")
            self.assertEqual(f.read(), b"")

    # Offset used by the large-file tests (2 GiB).
    LARGE = 2**31

    def large_file_ops(self, f):
        """Seek, write and truncate around the LARGE offset of *f*."""
        # assertTrue rather than a bare assert, so the checks survive -O.
        self.assertTrue(f.readable())
        self.assertTrue(f.writable())
        self.assertEqual(f.seek(self.LARGE), self.LARGE)
        self.assertEqual(f.tell(), self.LARGE)
        self.assertEqual(f.write(b"xxx"), 3)
        self.assertEqual(f.tell(), self.LARGE + 3)
        self.assertEqual(f.seek(-1, 1), self.LARGE + 2)
        self.assertEqual(f.truncate(), self.LARGE + 2)
        self.assertEqual(f.tell(), self.LARGE + 2)
        self.assertEqual(f.seek(0, 2), self.LARGE + 2)
        self.assertEqual(f.truncate(self.LARGE + 1), self.LARGE + 1)
        self.assertEqual(f.tell(), self.LARGE + 1)
        self.assertEqual(f.seek(0, 2), self.LARGE + 1)
        self.assertEqual(f.seek(-1, 2), self.LARGE)
        self.assertEqual(f.read(2), b"x")

    def test_raw_file_io(self):
        # "with" closes the file even when an assertion fails part-way.
        with io.open(support.TESTFN, "wb", buffering=0) as f:
            self.assertEqual(f.readable(), False)
            self.assertEqual(f.writable(), True)
            self.assertEqual(f.seekable(), True)
            self.write_ops(f)
        with io.open(support.TESTFN, "rb", buffering=0) as f:
            self.assertEqual(f.readable(), True)
            self.assertEqual(f.writable(), False)
            self.assertEqual(f.seekable(), True)
            self.read_ops(f)

    def test_buffered_file_io(self):
        with io.open(support.TESTFN, "wb") as f:
            self.assertEqual(f.readable(), False)
            self.assertEqual(f.writable(), True)
            self.assertEqual(f.seekable(), True)
            self.write_ops(f)
        with io.open(support.TESTFN, "rb") as f:
            self.assertEqual(f.readable(), True)
            self.assertEqual(f.writable(), False)
            self.assertEqual(f.seekable(), True)
            self.read_ops(f, True)

    def test_readline(self):
        with io.open(support.TESTFN, "wb") as f:
            f.write(b"abc\ndef\nxyzzy\nfoo")
        with io.open(support.TESTFN, "rb") as f:
            self.assertEqual(f.readline(), b"abc\n")
            self.assertEqual(f.readline(10), b"def\n")
            self.assertEqual(f.readline(2), b"xy")
            self.assertEqual(f.readline(4), b"zzy\n")
            self.assertEqual(f.readline(), b"foo")

    def test_raw_bytes_io(self):
        f = io.BytesIO()
        self.write_ops(f)
        data = f.getvalue()
        self.assertEqual(data, b"hello world\n")
        f = io.BytesIO(data)
        self.read_ops(f, True)

    def test_large_file_ops(self):
        # On Windows and Mac OSX this test consumes large resources; it takes
        # a long time to build the >2GB file and takes >2GB of disk space,
        # therefore the resource must be enabled to run this test.
        if sys.platform[:3] == 'win' or sys.platform == 'darwin':
            if not support.is_resource_enabled("largefile"):
                print("\nTesting large file ops skipped on %s." % sys.platform,
                      file=sys.stderr)
                print("It requires %d bytes and a long time." % self.LARGE,
                      file=sys.stderr)
                print("Use 'regrtest.py -u largefile test_io' to run it.",
                      file=sys.stderr)
                return
        # Once unbuffered, once with default buffering.
        with io.open(support.TESTFN, "w+b", 0) as f:
            self.large_file_ops(f)
        with io.open(support.TESTFN, "w+b") as f:
            self.large_file_ops(f)

    def test_with_open(self):
        for bufsize in (0, 1, 100):
            f = None
            with open(support.TESTFN, "wb", bufsize) as f:
                f.write(b"xxx")
            self.assertEqual(f.closed, True)
            f = None
            try:
                with open(support.TESTFN, "wb", bufsize) as f:
                    1/0
            except ZeroDivisionError:
                # The file must be closed even though the body raised.
                self.assertEqual(f.closed, True)
            else:
                self.fail("1/0 didn't raise an exception")

    def test_destructor(self):
        # record collects 1, 2, 3 as __del__, close and flush are entered.
        record = []
        class MyFileIO(io.FileIO):
            def __del__(self):
                record.append(1)
                io.FileIO.__del__(self)
            def close(self):
                record.append(2)
                io.FileIO.close(self)
            def flush(self):
                record.append(3)
                io.FileIO.flush(self)
        f = MyFileIO(support.TESTFN, "w")
        # FileIO is a raw binary stream, so it must be given bytes.
        f.write(b"xxx")
        del f
        self.assertEqual(record, [1, 2, 3])

    def test_close_flushes(self):
        with io.open(support.TESTFN, "wb") as f:
            f.write(b"xxx")
        with io.open(support.TESTFN, "rb") as f:
            self.assertEqual(f.read(), b"xxx")

    def test_array_writes(self):
        a = array.array('i', range(10))
        n = len(memoryview(a))
        # Once unbuffered, once with default buffering.
        with io.open(support.TESTFN, "wb", 0) as f:
            self.assertEqual(f.write(a), n)
        with io.open(support.TESTFN, "wb") as f:
            self.assertEqual(f.write(a), n)

    def test_closefd(self):
        self.assertRaises(ValueError, io.open, support.TESTFN, 'w',
                          closefd=False)

    def testReadClosed(self):
        with io.open(support.TESTFN, "w") as f:
            f.write("egg\n")
        with io.open(support.TESTFN, "r") as f:
            file = io.open(f.fileno(), "r", closefd=False)
            self.assertEqual(file.read(), "egg\n")
            file.seek(0)
            file.close()
            # Reading from the closed wrapper must fail.
            self.assertRaises(ValueError, file.read)

    def test_no_closefd_with_filename(self):
        # can't use closefd in combination with a file name
        self.assertRaises(ValueError, io.open, support.TESTFN, "r", closefd=False)

    def test_closefd_attr(self):
        with io.open(support.TESTFN, "wb") as f:
            f.write(b"egg\n")
        with io.open(support.TESTFN, "r") as f:
            self.assertEqual(f.buffer.raw.closefd, True)
            file = io.open(f.fileno(), "r", closefd=False)
            self.assertEqual(file.buffer.raw.closefd, False)
296
297
class MemorySeekTestMixin:
    """Read/seek/tell tests shared by the in-memory stream test cases.

    Concrete subclasses provide three class attributes:
      buftype -- callable turning a str into the stream's data type
      ioclass -- the in-memory stream class under test
      EOF     -- the value read() returns at end of stream
    """

    def testInit(self):
        # Constructing from an initial buffer must simply succeed.
        buf = self.buftype("1234567890")
        self.ioclass(buf)

    def testRead(self):
        buf = self.buftype("1234567890")
        bytesIo = self.ioclass(buf)

        self.assertEqual(buf[:1], bytesIo.read(1))
        self.assertEqual(buf[1:5], bytesIo.read(4))
        # Asking for more than remains returns just the remainder.
        self.assertEqual(buf[5:], bytesIo.read(900))
        self.assertEqual(self.EOF, bytesIo.read())

    def testReadNoArgs(self):
        buf = self.buftype("1234567890")
        bytesIo = self.ioclass(buf)

        self.assertEqual(buf, bytesIo.read())
        self.assertEqual(self.EOF, bytesIo.read())

    def testSeek(self):
        buf = self.buftype("1234567890")
        bytesIo = self.ioclass(buf)

        bytesIo.read(5)
        bytesIo.seek(0)
        self.assertEqual(buf, bytesIo.read())

        bytesIo.seek(3)
        self.assertEqual(buf[3:], bytesIo.read())
        # Float offsets are rejected.
        self.assertRaises(TypeError, bytesIo.seek, 0.0)

    def testTell(self):
        buf = self.buftype("1234567890")
        bytesIo = self.ioclass(buf)

        self.assertEqual(0, bytesIo.tell())
        bytesIo.seek(5)
        self.assertEqual(5, bytesIo.tell())
        # Seeking past the end is allowed and reported by tell().
        bytesIo.seek(10000)
        self.assertEqual(10000, bytesIo.tell())
341
Guido van Rossuma9e20242007-03-08 00:43:48 +0000342
class BytesIOTest(MemorySeekTestMixin, unittest.TestCase):
    """Run the shared memory-stream tests against io.BytesIO."""

    ioclass = io.BytesIO
    EOF = b""

    @staticmethod
    def buftype(s):
        # The mixin supplies str literals; BytesIO needs bytes.
        return bytes(s, "utf-8")
349
350
class StringIOTest(MemorySeekTestMixin, unittest.TestCase):
    """Run the shared memory-stream tests against io.StringIO."""

    ioclass = io.StringIO
    EOF = ""
    # The mixin's str literals are used as they are.
    buftype = str
355
356
class BufferedReaderTest(unittest.TestCase):
    """Tests for io.BufferedReader on top of mock and real raw streams."""

    def testRead(self):
        rawio = MockRawIO((b"abc", b"d", b"efg"))
        bufio = io.BufferedReader(rawio)

        self.assertEqual(b"abcdef", bufio.read(6))

    def testBuffering(self):
        data = b"abcdefghi"
        dlen = len(data)

        # Each entry: [buffer size, sizes passed to bufio.read(),
        #              sizes expected in rawio.read_history].
        tests = [
            [ 100, [ 3, 1, 4, 8 ], [ dlen, 0 ] ],
            [ 100, [ 3, 3, 3],     [ dlen ]    ],
            [   4, [ 1, 2, 4, 2 ], [ 4, 4, 1 ] ],
        ]

        for bufsize, buf_read_sizes, raw_read_sizes in tests:
            rawio = MockFileIO(data)
            bufio = io.BufferedReader(rawio, buffer_size=bufsize)
            pos = 0
            for nbytes in buf_read_sizes:
                self.assertEqual(bufio.read(nbytes), data[pos:pos+nbytes])
                pos += nbytes
            self.assertEqual(rawio.read_history, raw_read_sizes)

    def testReadNonBlocking(self):
        # Inject some None's in there to simulate EWOULDBLOCK
        rawio = MockRawIO((b"abc", b"d", None, b"efg", None, None))
        bufio = io.BufferedReader(rawio)

        self.assertEqual(b"abcd", bufio.read(6))
        self.assertEqual(b"e", bufio.read(1))
        self.assertEqual(b"fg", bufio.read())
        self.assertTrue(bufio.read() is None)
        self.assertEqual(b"", bufio.read())

    def testReadToEof(self):
        rawio = MockRawIO((b"abc", b"d", b"efg"))
        bufio = io.BufferedReader(rawio)

        self.assertEqual(b"abcdefg", bufio.read(9000))

    def testReadNoArgs(self):
        rawio = MockRawIO((b"abc", b"d", b"efg"))
        bufio = io.BufferedReader(rawio)

        self.assertEqual(b"abcdefg", bufio.read())

    def testFileno(self):
        rawio = MockRawIO((b"abc", b"d", b"efg"))
        bufio = io.BufferedReader(rawio)

        # The buffered object reports the raw stream's fileno().
        self.assertEqual(42, bufio.fileno())

    def testFilenoNoFileno(self):
        # XXX will we always have fileno() function? If so, kill
        # this test.  Else, write it.
        pass

    def testThreads(self):
        try:
            # Write out many bytes with exactly the same number of 0's,
            # 1's... 255's. This will help us check that concurrent reading
            # doesn't duplicate or forget contents.
            N = 1000
            l = list(range(256)) * N
            random.shuffle(l)
            s = bytes(bytearray(l))
            with io.open(support.TESTFN, "wb") as f:
                f.write(s)
            with io.open(support.TESTFN, "rb", buffering=0) as raw:
                bufio = io.BufferedReader(raw, 8)
                errors = []
                results = []
                def reader():
                    try:
                        # Intra-buffer read then buffer-flushing read
                        for n in cycle([1, 19]):
                            chunk = bufio.read(n)
                            if not chunk:
                                break
                            # list.append() is atomic
                            results.append(chunk)
                    except Exception as e:
                        # Remember the failure for the main thread, then
                        # let the worker thread die with it.
                        errors.append(e)
                        raise
                threads = [threading.Thread(target=reader) for x in range(20)]
                for t in threads:
                    t.start()
                time.sleep(0.02) # yield
                for t in threads:
                    t.join()
                self.assertFalse(errors,
                    "the following exceptions were caught: %r" % errors)
                # Every byte value must have been read exactly N times.
                s = b''.join(results)
                for i in range(256):
                    c = bytes(bytearray([i]))
                    self.assertEqual(s.count(c), N)
        finally:
            support.unlink(support.TESTFN)
459
460
Guido van Rossuma9e20242007-03-08 00:43:48 +0000461
class BufferedWriterTest(unittest.TestCase):
    """Tests for io.BufferedWriter on top of mock and real raw streams."""

    def testWrite(self):
        # Write to the buffered IO but don't overflow the buffer.
        writer = MockRawIO()
        bufio = io.BufferedWriter(writer, 8)

        bufio.write(b"abc")

        # Nothing may have reached the raw stream yet.
        self.assertFalse(writer._write_stack)

    def testWriteOverflow(self):
        writer = MockRawIO()
        bufio = io.BufferedWriter(writer, 8)

        bufio.write(b"abc")
        bufio.write(b"defghijkl")

        self.assertEqual(b"abcdefghijkl", writer._write_stack[0])

    def testWriteNonBlocking(self):
        raw = MockNonBlockWriterIO((9, 2, 22, -6, 10, 12, 12))
        bufio = io.BufferedWriter(raw, 8, 16)

        bufio.write(b"asdf")
        bufio.write(b"asdfa")
        self.assertEqual(b"asdfasdfa", raw._write_stack[0])

        bufio.write(b"asdfasdfasdf")
        self.assertEqual(b"asdfasdfasdf", raw._write_stack[1])
        bufio.write(b"asdfasdfasdf")
        self.assertEqual(b"dfasdfasdf", raw._write_stack[2])
        self.assertEqual(b"asdfasdfasdf", raw._write_stack[3])

        bufio.write(b"asdfasdfasdf")

        # XXX I don't like this test.  It relies too heavily on how the
        # algorithm actually works, which we might change.  Refactor
        # later.

    def testFileno(self):
        rawio = MockRawIO((b"abc", b"d", b"efg"))
        bufio = io.BufferedWriter(rawio)

        # The buffered object reports the raw stream's fileno().
        self.assertEqual(42, bufio.fileno())

    def testFlush(self):
        writer = MockRawIO()
        bufio = io.BufferedWriter(writer, 8)

        bufio.write(b"abc")
        bufio.flush()

        self.assertEqual(b"abc", writer._write_stack[0])

    def testThreads(self):
        # BufferedWriter should not raise exceptions or crash
        # when called from multiple threads.
        try:
            # We use a real file object because it allows us to
            # exercise situations where the GIL is released before
            # writing the buffer to the raw streams. This is in addition
            # to concurrency issues due to switching threads in the middle
            # of Python code.
            with io.open(support.TESTFN, "wb", buffering=0) as raw:
                bufio = io.BufferedWriter(raw, 8)
                errors = []
                def f():
                    try:
                        # Write enough bytes to flush the buffer
                        s = b"a" * 19
                        for i in range(50):
                            bufio.write(s)
                    except Exception as e:
                        # Remember the failure for the main thread, then
                        # let the worker thread die with it.
                        errors.append(e)
                        raise
                threads = [threading.Thread(target=f) for x in range(20)]
                for t in threads:
                    t.start()
                time.sleep(0.02) # yield
                for t in threads:
                    t.join()
                self.assertFalse(errors,
                    "the following exceptions were caught: %r" % errors)
        finally:
            support.unlink(support.TESTFN)
548
Guido van Rossuma9e20242007-03-08 00:43:48 +0000549
class BufferedRWPairTest(unittest.TestCase):
    """Placeholder test case for io.BufferedRWPair."""

    def testRWPair(self):
        # For now this only checks that a pair can be built from two
        # mock raw streams.
        reader = MockRawIO(())
        writer = MockRawIO()
        pair = io.BufferedRWPair(reader, writer)

        # XXX need implementation
558
Guido van Rossuma9e20242007-03-08 00:43:48 +0000559
class BufferedRandomTest(unittest.TestCase):
    """Tests for io.BufferedRandom: interleaved reads, writes and seeks."""

    def testReadAndWrite(self):
        raw = MockRawIO((b"asdf", b"ghjk"))
        rw = io.BufferedRandom(raw, 8, 12)

        self.assertEqual(b"as", rw.read(2))
        rw.write(b"ddd")
        rw.write(b"eee")
        self.assertFalse(raw._write_stack) # Buffer writes
        self.assertEqual(b"ghjk", rw.read()) # This read forces write flush
        self.assertEqual(b"dddeee", raw._write_stack[0])

    def testSeekAndTell(self):
        raw = io.BytesIO(b"asdfghjkl")
        rw = io.BufferedRandom(raw)

        self.assertEqual(b"as", rw.read(2))
        self.assertEqual(2, rw.tell())
        rw.seek(0, 0)
        self.assertEqual(b"asdf", rw.read(4))

        # Overwrite the four bytes after the current position.
        rw.write(b"asdf")
        rw.seek(0, 0)
        self.assertEqual(b"asdfasdfl", rw.read())
        self.assertEqual(9, rw.tell())
        rw.seek(-4, 2)
        self.assertEqual(5, rw.tell())
        rw.seek(2, 1)
        self.assertEqual(7, rw.tell())
        self.assertEqual(b"fl", rw.read(11))
        # Float offsets are rejected.
        self.assertRaises(TypeError, rw.seek, 0.0)
Guido van Rossum01a27522007-03-07 01:00:12 +0000592
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +0000593# To fully exercise seek/tell, the StatefulIncrementalDecoder has these
594# properties:
595# - A single output character can correspond to many bytes of input.
596# - The number of input bytes to complete the character can be
597# undetermined until the last input byte is received.
598# - The number of input bytes can vary depending on previous input.
599# - A single input byte can correspond to many characters of output.
600# - The number of output characters can be undetermined until the
601# last input byte is received.
602# - The number of output characters can vary depending on previous input.
603
class StatefulIncrementalDecoder(codecs.IncrementalDecoder):
    """
    For testing seek/tell behavior with a stateful, buffering decoder.

    Input is a sequence of words.  Words may be fixed-length (length set
    by input) or variable-length (period-terminated).  In variable-length
    mode, extra periods are ignored.  Possible words are:
      - 'i' followed by a number sets the input length, I (maximum 99).
        When I is set to 0, words are period-terminated.
      - 'o' followed by a number sets the output length, O (maximum 99).
      - Any other word is converted into a word followed by a period on
        the output.  The output word consists of the input word truncated
        or padded out with hyphens to make its length equal to O.  If O
        is 0, the word is output verbatim without truncating or padding.
    I and O are initially set to 1.  A new I or O applies to the bytes
    that follow the word that set it.  EOF also terminates the last word.
    """

    def __init__(self, errors='strict'):
        codecs.IncrementalDecoder.__init__(self, errors)
        self.reset()

    def __repr__(self):
        return '<SID %x>' % id(self)

    def reset(self):
        """Return to the initial state: I = O = 1 and nothing buffered."""
        self.i = 1
        self.o = 1
        self.buffer = bytearray()

    def getstate(self):
        """Return (buffered_bytes, flags) with I and O packed into flags."""
        i, o = self.i ^ 1, self.o ^ 1 # so that flags = 0 after reset()
        return bytes(self.buffer), i*100 + o

    def setstate(self, state):
        """Restore a state previously produced by getstate()."""
        # The packed integer is called 'flags' here so that the local name
        # does not shadow the io module this file is testing.
        buffer, flags = state
        self.buffer = bytearray(buffer)
        i, o = divmod(flags, 100)
        self.i, self.o = i ^ 1, o ^ 1

    def decode(self, input, final=False):
        """Consume the bytes in input and return the text decoded so far.

        Bytes that do not yet complete a word stay in self.buffer.  When
        final is true, a non-empty buffer is processed as the last word.
        """
        output = ''
        for b in input:
            if self.i == 0: # variable-length, terminated with period
                if b == ord('.'):
                    if self.buffer:
                        output += self.process_word()
                else:
                    self.buffer.append(b)
            else: # fixed-length, terminate after self.i bytes
                self.buffer.append(b)
                if len(self.buffer) == self.i:
                    output += self.process_word()
        if final and self.buffer: # EOF terminates the last word
            output += self.process_word()
        return output

    def process_word(self):
        """Interpret the buffered word, clear the buffer, return its output.

        'i' and 'o' words update I and O and produce no output; any other
        word is returned padded/truncated to O characters plus a period.
        """
        output = ''
        if self.buffer[0] == ord('i'):
            self.i = min(99, int(self.buffer[1:] or 0)) # set input length
        elif self.buffer[0] == ord('o'):
            self.o = min(99, int(self.buffer[1:] or 0)) # set output length
        else:
            output = self.buffer.decode('ascii')
            if len(output) < self.o:
                output += '-'*self.o # pad out with hyphens
            if self.o:
                output = output[:self.o] # truncate to output length
            output += '.'
        self.buffer = bytearray()
        return output

    # Tests flip this on only while they need the 'test_decoder' codec.
    codecEnabled = False

    @classmethod
    def lookupTestDecoder(cls, name):
        """Codec search function: resolve 'test_decoder' while enabled.

        Returns None (implicitly) for any other name, or while disabled.
        """
        if cls.codecEnabled and name == 'test_decoder':
            return codecs.CodecInfo(
                name='test_decoder', encode=None, decode=None,
                incrementalencoder=None,
                streamreader=None, streamwriter=None,
                incrementaldecoder=cls)

# Register the previous decoder for testing.
# Disabled by default, tests will enable it.
codecs.register(StatefulIncrementalDecoder.lookupTestDecoder)
691
692
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +0000693class StatefulIncrementalDecoderTest(unittest.TestCase):
694 """
695 Make sure the StatefulIncrementalDecoder actually works.
696 """
697
698 test_cases = [
Ka-Ping Yeed24a5b62008-03-20 10:51:27 +0000699 # I=1, O=1 (fixed-length input == fixed-length output)
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +0000700 (b'abcd', False, 'a.b.c.d.'),
Ka-Ping Yeed24a5b62008-03-20 10:51:27 +0000701 # I=0, O=0 (variable-length input, variable-length output)
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +0000702 (b'oiabcd', True, 'abcd.'),
Ka-Ping Yeed24a5b62008-03-20 10:51:27 +0000703 # I=0, O=0 (should ignore extra periods)
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +0000704 (b'oi...abcd...', True, 'abcd.'),
Ka-Ping Yeed24a5b62008-03-20 10:51:27 +0000705 # I=0, O=6 (variable-length input, fixed-length output)
706 (b'i.o6.x.xyz.toolongtofit.', False, 'x-----.xyz---.toolon.'),
707 # I=2, O=6 (fixed-length input < fixed-length output)
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +0000708 (b'i.i2.o6xyz', True, 'xy----.z-----.'),
Ka-Ping Yeed24a5b62008-03-20 10:51:27 +0000709 # I=6, O=3 (fixed-length input > fixed-length output)
710 (b'i.o3.i6.abcdefghijklmnop', True, 'abc.ghi.mno.'),
711 # I=0, then 3; O=29, then 15 (with longer output)
712 (b'i.o29.a.b.cde.o15.abcdefghijabcdefghij.i3.a.b.c.d.ei00k.l.m', True,
713 'a----------------------------.' +
714 'b----------------------------.' +
715 'cde--------------------------.' +
716 'abcdefghijabcde.' +
717 'a.b------------.' +
718 '.c.------------.' +
719 'd.e------------.' +
720 'k--------------.' +
721 'l--------------.' +
722 'm--------------.')
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +0000723 ]
724
725 def testDecoder(self):
726 # Try a few one-shot test cases.
727 for input, eof, output in self.test_cases:
728 d = StatefulIncrementalDecoder()
729 self.assertEquals(d.decode(input, eof), output)
730
731 # Also test an unfinished decode, followed by forcing EOF.
732 d = StatefulIncrementalDecoder()
733 self.assertEquals(d.decode(b'oiabcd'), '')
734 self.assertEquals(d.decode(b'', 1), 'abcd.')
Guido van Rossum78892e42007-04-06 17:31:18 +0000735
736class TextIOWrapperTest(unittest.TestCase):
Guido van Rossum9b76da62007-04-11 01:09:03 +0000737
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +0000738 def setUp(self):
739 self.testdata = b"AAA\r\nBBB\rCCC\r\nDDD\nEEE\r\n"
740 self.normalized = b"AAA\nBBB\nCCC\nDDD\nEEE\n".decode("ascii")
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000741 support.unlink(support.TESTFN)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +0000742
Guido van Rossumd0712812007-04-11 16:32:43 +0000743 def tearDown(self):
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000744 support.unlink(support.TESTFN)
Guido van Rossum9b76da62007-04-11 01:09:03 +0000745
Guido van Rossumf64db9f2007-12-06 01:04:26 +0000746 def testLineBuffering(self):
747 r = io.BytesIO()
748 b = io.BufferedWriter(r, 1000)
749 t = io.TextIOWrapper(b, newline="\n", line_buffering=True)
750 t.write("X")
751 self.assertEquals(r.getvalue(), b"") # No flush happened
752 t.write("Y\nZ")
753 self.assertEquals(r.getvalue(), b"XY\nZ") # All got flushed
754 t.write("A\rB")
755 self.assertEquals(r.getvalue(), b"XY\nZA\rB")
756
Guido van Rossume7fc50f2007-12-03 22:54:21 +0000757 def testEncodingErrorsReading(self):
758 # (1) default
759 b = io.BytesIO(b"abc\n\xff\n")
760 t = io.TextIOWrapper(b, encoding="ascii")
761 self.assertRaises(UnicodeError, t.read)
762 # (2) explicit strict
763 b = io.BytesIO(b"abc\n\xff\n")
764 t = io.TextIOWrapper(b, encoding="ascii", errors="strict")
765 self.assertRaises(UnicodeError, t.read)
766 # (3) ignore
767 b = io.BytesIO(b"abc\n\xff\n")
768 t = io.TextIOWrapper(b, encoding="ascii", errors="ignore")
769 self.assertEquals(t.read(), "abc\n\n")
770 # (4) replace
771 b = io.BytesIO(b"abc\n\xff\n")
772 t = io.TextIOWrapper(b, encoding="ascii", errors="replace")
773 self.assertEquals(t.read(), "abc\n\ufffd\n")
774
775 def testEncodingErrorsWriting(self):
776 # (1) default
777 b = io.BytesIO()
778 t = io.TextIOWrapper(b, encoding="ascii")
779 self.assertRaises(UnicodeError, t.write, "\xff")
780 # (2) explicit strict
781 b = io.BytesIO()
782 t = io.TextIOWrapper(b, encoding="ascii", errors="strict")
783 self.assertRaises(UnicodeError, t.write, "\xff")
784 # (3) ignore
785 b = io.BytesIO()
Guido van Rossumf64db9f2007-12-06 01:04:26 +0000786 t = io.TextIOWrapper(b, encoding="ascii", errors="ignore",
787 newline="\n")
Guido van Rossume7fc50f2007-12-03 22:54:21 +0000788 t.write("abc\xffdef\n")
789 t.flush()
Christian Heimesecda2612007-12-05 17:59:44 +0000790 self.assertEquals(b.getvalue(), b"abcdef\n")
Guido van Rossume7fc50f2007-12-03 22:54:21 +0000791 # (4) replace
792 b = io.BytesIO()
Guido van Rossumf64db9f2007-12-06 01:04:26 +0000793 t = io.TextIOWrapper(b, encoding="ascii", errors="replace",
794 newline="\n")
Guido van Rossume7fc50f2007-12-03 22:54:21 +0000795 t.write("abc\xffdef\n")
796 t.flush()
Christian Heimesecda2612007-12-05 17:59:44 +0000797 self.assertEquals(b.getvalue(), b"abc?def\n")
Guido van Rossume7fc50f2007-12-03 22:54:21 +0000798
Guido van Rossum8358db22007-08-18 21:39:55 +0000799 def testNewlinesInput(self):
800 testdata = b"AAA\nBBB\nCCC\rDDD\rEEE\r\nFFF\r\nGGG"
801 normalized = testdata.replace(b"\r\n", b"\n").replace(b"\r", b"\n")
802 for newline, expected in [
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +0000803 (None, normalized.decode("ascii").splitlines(True)),
804 ("", testdata.decode("ascii").splitlines(True)),
Guido van Rossum8358db22007-08-18 21:39:55 +0000805 ("\n", ["AAA\n", "BBB\n", "CCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]),
806 ("\r\n", ["AAA\nBBB\nCCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]),
807 ("\r", ["AAA\nBBB\nCCC\r", "DDD\r", "EEE\r", "\nFFF\r", "\nGGG"]),
808 ]:
809 buf = io.BytesIO(testdata)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +0000810 txt = io.TextIOWrapper(buf, encoding="ascii", newline=newline)
Guido van Rossum8358db22007-08-18 21:39:55 +0000811 self.assertEquals(txt.readlines(), expected)
812 txt.seek(0)
813 self.assertEquals(txt.read(), "".join(expected))
814
815 def testNewlinesOutput(self):
816 testdict = {
817 "": b"AAA\nBBB\nCCC\nX\rY\r\nZ",
818 "\n": b"AAA\nBBB\nCCC\nX\rY\r\nZ",
819 "\r": b"AAA\rBBB\rCCC\rX\rY\r\rZ",
820 "\r\n": b"AAA\r\nBBB\r\nCCC\r\nX\rY\r\r\nZ",
821 }
822 tests = [(None, testdict[os.linesep])] + sorted(testdict.items())
823 for newline, expected in tests:
824 buf = io.BytesIO()
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +0000825 txt = io.TextIOWrapper(buf, encoding="ascii", newline=newline)
Guido van Rossum8358db22007-08-18 21:39:55 +0000826 txt.write("AAA\nB")
827 txt.write("BB\nCCC\n")
828 txt.write("X\rY\r\nZ")
829 txt.flush()
Alexandre Vassalotti77250f42008-05-06 19:48:38 +0000830 self.assertEquals(buf.closed, False)
Guido van Rossum8358db22007-08-18 21:39:55 +0000831 self.assertEquals(buf.getvalue(), expected)
832
Guido van Rossum78892e42007-04-06 17:31:18 +0000833 def testNewlines(self):
834 input_lines = [ "unix\n", "windows\r\n", "os9\r", "last\n", "nonl" ]
835
836 tests = [
837 [ None, [ 'unix\n', 'windows\n', 'os9\n', 'last\n', 'nonl' ] ],
Guido van Rossum8358db22007-08-18 21:39:55 +0000838 [ '', input_lines ],
839 [ '\n', [ "unix\n", "windows\r\n", "os9\rlast\n", "nonl" ] ],
840 [ '\r\n', [ "unix\nwindows\r\n", "os9\rlast\nnonl" ] ],
841 [ '\r', [ "unix\nwindows\r", "\nos9\r", "last\nnonl" ] ],
Guido van Rossum78892e42007-04-06 17:31:18 +0000842 ]
843
Guido van Rossumc2f93dc2007-05-24 00:50:02 +0000844 encodings = ('utf-8', 'latin-1')
Guido van Rossum78892e42007-04-06 17:31:18 +0000845
Guido van Rossum8358db22007-08-18 21:39:55 +0000846 # Try a range of buffer sizes to test the case where \r is the last
Guido van Rossum78892e42007-04-06 17:31:18 +0000847 # character in TextIOWrapper._pending_line.
848 for encoding in encodings:
Guido van Rossum8358db22007-08-18 21:39:55 +0000849 # XXX: str.encode() should return bytes
850 data = bytes(''.join(input_lines).encode(encoding))
Guido van Rossum78892e42007-04-06 17:31:18 +0000851 for do_reads in (False, True):
Guido van Rossum8358db22007-08-18 21:39:55 +0000852 for bufsize in range(1, 10):
853 for newline, exp_lines in tests:
854 bufio = io.BufferedReader(io.BytesIO(data), bufsize)
Guido van Rossum76c5d4d2007-04-06 19:10:29 +0000855 textio = io.TextIOWrapper(bufio, newline=newline,
Guido van Rossum78892e42007-04-06 17:31:18 +0000856 encoding=encoding)
857 if do_reads:
858 got_lines = []
859 while True:
Guido van Rossum76c5d4d2007-04-06 19:10:29 +0000860 c2 = textio.read(2)
Guido van Rossum78892e42007-04-06 17:31:18 +0000861 if c2 == '':
862 break
863 self.assertEquals(len(c2), 2)
Guido van Rossum76c5d4d2007-04-06 19:10:29 +0000864 got_lines.append(c2 + textio.readline())
Guido van Rossum78892e42007-04-06 17:31:18 +0000865 else:
Guido van Rossum76c5d4d2007-04-06 19:10:29 +0000866 got_lines = list(textio)
Guido van Rossum78892e42007-04-06 17:31:18 +0000867
868 for got_line, exp_line in zip(got_lines, exp_lines):
869 self.assertEquals(got_line, exp_line)
870 self.assertEquals(len(got_lines), len(exp_lines))
871
Guido van Rossum8358db22007-08-18 21:39:55 +0000872 def testNewlinesInput(self):
873 testdata = b"AAA\nBBB\nCCC\rDDD\rEEE\r\nFFF\r\nGGG"
874 normalized = testdata.replace(b"\r\n", b"\n").replace(b"\r", b"\n")
875 for newline, expected in [
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +0000876 (None, normalized.decode("ascii").splitlines(True)),
877 ("", testdata.decode("ascii").splitlines(True)),
Guido van Rossum8358db22007-08-18 21:39:55 +0000878 ("\n", ["AAA\n", "BBB\n", "CCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]),
879 ("\r\n", ["AAA\nBBB\nCCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]),
880 ("\r", ["AAA\nBBB\nCCC\r", "DDD\r", "EEE\r", "\nFFF\r", "\nGGG"]),
881 ]:
882 buf = io.BytesIO(testdata)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +0000883 txt = io.TextIOWrapper(buf, encoding="ascii", newline=newline)
Guido van Rossum8358db22007-08-18 21:39:55 +0000884 self.assertEquals(txt.readlines(), expected)
885 txt.seek(0)
886 self.assertEquals(txt.read(), "".join(expected))
887
888 def testNewlinesOutput(self):
Guido van Rossum8358db22007-08-18 21:39:55 +0000889 data = "AAA\nBBB\rCCC\n"
890 data_lf = b"AAA\nBBB\rCCC\n"
891 data_cr = b"AAA\rBBB\rCCC\r"
892 data_crlf = b"AAA\r\nBBB\rCCC\r\n"
Guido van Rossum18363582007-08-18 22:05:58 +0000893 save_linesep = os.linesep
894 try:
895 for os.linesep, newline, expected in [
896 ("\n", None, data_lf),
897 ("\r\n", None, data_crlf),
898 ("\n", "", data_lf),
899 ("\r\n", "", data_lf),
900 ("\n", "\n", data_lf),
901 ("\r\n", "\n", data_lf),
902 ("\n", "\r", data_cr),
903 ("\r\n", "\r", data_cr),
904 ("\n", "\r\n", data_crlf),
905 ("\r\n", "\r\n", data_crlf),
906 ]:
907 buf = io.BytesIO()
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +0000908 txt = io.TextIOWrapper(buf, encoding="ascii", newline=newline)
Guido van Rossum18363582007-08-18 22:05:58 +0000909 txt.write(data)
910 txt.close()
Alexandre Vassalotti77250f42008-05-06 19:48:38 +0000911 self.assertEquals(buf.closed, True)
912 self.assertRaises(ValueError, buf.getvalue)
Guido van Rossum18363582007-08-18 22:05:58 +0000913 finally:
914 os.linesep = save_linesep
Guido van Rossum8358db22007-08-18 21:39:55 +0000915
Guido van Rossum9b76da62007-04-11 01:09:03 +0000916 # Systematic tests of the text I/O API
917
918 def testBasicIO(self):
919 for chunksize in (1, 2, 3, 4, 5, 15, 16, 17, 31, 32, 33, 63, 64, 65):
920 for enc in "ascii", "latin1", "utf8" :# , "utf-16-be", "utf-16-le":
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000921 f = io.open(support.TESTFN, "w+", encoding=enc)
Guido van Rossum9b76da62007-04-11 01:09:03 +0000922 f._CHUNK_SIZE = chunksize
923 self.assertEquals(f.write("abc"), 3)
924 f.close()
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000925 f = io.open(support.TESTFN, "r+", encoding=enc)
Guido van Rossum9b76da62007-04-11 01:09:03 +0000926 f._CHUNK_SIZE = chunksize
927 self.assertEquals(f.tell(), 0)
928 self.assertEquals(f.read(), "abc")
929 cookie = f.tell()
930 self.assertEquals(f.seek(0), 0)
931 self.assertEquals(f.read(2), "ab")
932 self.assertEquals(f.read(1), "c")
933 self.assertEquals(f.read(1), "")
934 self.assertEquals(f.read(), "")
935 self.assertEquals(f.tell(), cookie)
936 self.assertEquals(f.seek(0), 0)
937 self.assertEquals(f.seek(0, 2), cookie)
938 self.assertEquals(f.write("def"), 3)
939 self.assertEquals(f.seek(cookie), cookie)
940 self.assertEquals(f.read(), "def")
941 if enc.startswith("utf"):
942 self.multi_line_test(f, enc)
943 f.close()
944
945 def multi_line_test(self, f, enc):
946 f.seek(0)
947 f.truncate()
Guido van Rossumef87d6e2007-05-02 19:09:54 +0000948 sample = "s\xff\u0fff\uffff"
Guido van Rossum9b76da62007-04-11 01:09:03 +0000949 wlines = []
Guido van Rossumcba608c2007-04-11 14:19:59 +0000950 for size in (0, 1, 2, 3, 4, 5, 30, 31, 32, 33, 62, 63, 64, 65, 1000):
Guido van Rossum9b76da62007-04-11 01:09:03 +0000951 chars = []
Guido van Rossum805365e2007-05-07 22:24:25 +0000952 for i in range(size):
Guido van Rossum9b76da62007-04-11 01:09:03 +0000953 chars.append(sample[i % len(sample)])
Guido van Rossumef87d6e2007-05-02 19:09:54 +0000954 line = "".join(chars) + "\n"
Guido van Rossum9b76da62007-04-11 01:09:03 +0000955 wlines.append((f.tell(), line))
956 f.write(line)
Guido van Rossum9b76da62007-04-11 01:09:03 +0000957 f.seek(0)
958 rlines = []
959 while True:
960 pos = f.tell()
961 line = f.readline()
962 if not line:
Guido van Rossum9b76da62007-04-11 01:09:03 +0000963 break
964 rlines.append((pos, line))
Guido van Rossum9b76da62007-04-11 01:09:03 +0000965 self.assertEquals(rlines, wlines)
966
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +0000967 def testTelling(self):
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000968 f = io.open(support.TESTFN, "w+", encoding="utf8")
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +0000969 p0 = f.tell()
Guido van Rossumef87d6e2007-05-02 19:09:54 +0000970 f.write("\xff\n")
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +0000971 p1 = f.tell()
Guido van Rossumef87d6e2007-05-02 19:09:54 +0000972 f.write("\xff\n")
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +0000973 p2 = f.tell()
974 f.seek(0)
975 self.assertEquals(f.tell(), p0)
Guido van Rossumef87d6e2007-05-02 19:09:54 +0000976 self.assertEquals(f.readline(), "\xff\n")
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +0000977 self.assertEquals(f.tell(), p1)
Guido van Rossumef87d6e2007-05-02 19:09:54 +0000978 self.assertEquals(f.readline(), "\xff\n")
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +0000979 self.assertEquals(f.tell(), p2)
980 f.seek(0)
981 for line in f:
Guido van Rossumef87d6e2007-05-02 19:09:54 +0000982 self.assertEquals(line, "\xff\n")
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +0000983 self.assertRaises(IOError, f.tell)
984 self.assertEquals(f.tell(), p2)
985 f.close()
986
Guido van Rossumd76e7792007-04-17 02:38:04 +0000987 def testSeeking(self):
988 chunk_size = io.TextIOWrapper._CHUNK_SIZE
989 prefix_size = chunk_size - 2
Guido van Rossumef87d6e2007-05-02 19:09:54 +0000990 u_prefix = "a" * prefix_size
Guido van Rossumd76e7792007-04-17 02:38:04 +0000991 prefix = bytes(u_prefix.encode("utf-8"))
992 self.assertEquals(len(u_prefix), len(prefix))
Guido van Rossumef87d6e2007-05-02 19:09:54 +0000993 u_suffix = "\u8888\n"
Guido van Rossumd76e7792007-04-17 02:38:04 +0000994 suffix = bytes(u_suffix.encode("utf-8"))
995 line = prefix + suffix
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000996 f = io.open(support.TESTFN, "wb")
Guido van Rossumd76e7792007-04-17 02:38:04 +0000997 f.write(line*2)
998 f.close()
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000999 f = io.open(support.TESTFN, "r", encoding="utf-8")
Guido van Rossumd76e7792007-04-17 02:38:04 +00001000 s = f.read(prefix_size)
Guido van Rossum98297ee2007-11-06 21:34:58 +00001001 self.assertEquals(s, str(prefix, "ascii"))
Guido van Rossumd76e7792007-04-17 02:38:04 +00001002 self.assertEquals(f.tell(), prefix_size)
1003 self.assertEquals(f.readline(), u_suffix)
1004
1005 def testSeekingToo(self):
1006 # Regression test for a specific bug
1007 data = b'\xe0\xbf\xbf\n'
Benjamin Petersonee8712c2008-05-20 21:35:26 +00001008 f = io.open(support.TESTFN, "wb")
Guido van Rossumd76e7792007-04-17 02:38:04 +00001009 f.write(data)
1010 f.close()
Benjamin Petersonee8712c2008-05-20 21:35:26 +00001011 f = io.open(support.TESTFN, "r", encoding="utf-8")
Guido van Rossumd76e7792007-04-17 02:38:04 +00001012 f._CHUNK_SIZE # Just test that it exists
1013 f._CHUNK_SIZE = 2
1014 f.readline()
1015 f.tell()
1016
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001017 def testSeekAndTell(self):
1018 """Test seek/tell using the StatefulIncrementalDecoder."""
1019
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001020 def testSeekAndTellWithData(data, min_pos=0):
1021 """Tell/seek to various points within a data stream and ensure
1022 that the decoded data returned by read() is consistent."""
Benjamin Petersonee8712c2008-05-20 21:35:26 +00001023 f = io.open(support.TESTFN, 'wb')
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001024 f.write(data)
1025 f.close()
Benjamin Petersonee8712c2008-05-20 21:35:26 +00001026 f = io.open(support.TESTFN, encoding='test_decoder')
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001027 decoded = f.read()
1028 f.close()
1029
Neal Norwitze2b07052008-03-18 19:52:05 +00001030 for i in range(min_pos, len(decoded) + 1): # seek positions
1031 for j in [1, 5, len(decoded) - i]: # read lengths
Benjamin Petersonee8712c2008-05-20 21:35:26 +00001032 f = io.open(support.TESTFN, encoding='test_decoder')
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001033 self.assertEquals(f.read(i), decoded[:i])
1034 cookie = f.tell()
1035 self.assertEquals(f.read(j), decoded[i:i + j])
1036 f.seek(cookie)
1037 self.assertEquals(f.read(), decoded[i:])
1038 f.close()
1039
Benjamin Petersonad9d48d2008-04-02 21:49:44 +00001040 # Enable the test decoder.
1041 StatefulIncrementalDecoder.codecEnabled = 1
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001042
1043 # Run the tests.
1044 try:
1045 # Try each test case.
1046 for input, _, _ in StatefulIncrementalDecoderTest.test_cases:
1047 testSeekAndTellWithData(input)
1048
1049 # Position each test case so that it crosses a chunk boundary.
1050 CHUNK_SIZE = io.TextIOWrapper._CHUNK_SIZE
1051 for input, _, _ in StatefulIncrementalDecoderTest.test_cases:
1052 offset = CHUNK_SIZE - len(input)//2
1053 prefix = b'.'*offset
1054 # Don't bother seeking into the prefix (takes too long).
1055 min_pos = offset*2
1056 testSeekAndTellWithData(prefix + input, min_pos)
1057
1058 # Ensure our test decoder won't interfere with subsequent tests.
1059 finally:
Benjamin Petersonad9d48d2008-04-02 21:49:44 +00001060 StatefulIncrementalDecoder.codecEnabled = 0
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001061
Alexandre Vassalottia38f73b2008-01-07 18:30:48 +00001062 def testEncodedWrites(self):
1063 data = "1234567890"
1064 tests = ("utf-16",
1065 "utf-16-le",
1066 "utf-16-be",
1067 "utf-32",
1068 "utf-32-le",
1069 "utf-32-be")
1070 for encoding in tests:
1071 buf = io.BytesIO()
1072 f = io.TextIOWrapper(buf, encoding=encoding)
1073 # Check if the BOM is written only once (see issue1753).
1074 f.write(data)
1075 f.write(data)
1076 f.seek(0)
1077 self.assertEquals(f.read(), data * 2)
1078 self.assertEquals(buf.getvalue(), (data * 2).encode(encoding))
1079
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +00001080 def timingTest(self):
1081 timer = time.time
1082 enc = "utf8"
Guido van Rossumef87d6e2007-05-02 19:09:54 +00001083 line = "\0\x0f\xff\u0fff\uffff\U000fffff\U0010ffff"*3 + "\n"
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +00001084 nlines = 10000
1085 nchars = len(line)
1086 nbytes = len(line.encode(enc))
1087 for chunk_size in (32, 64, 128, 256):
Benjamin Petersonee8712c2008-05-20 21:35:26 +00001088 f = io.open(support.TESTFN, "w+", encoding=enc)
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +00001089 f._CHUNK_SIZE = chunk_size
1090 t0 = timer()
1091 for i in range(nlines):
1092 f.write(line)
1093 f.flush()
1094 t1 = timer()
1095 f.seek(0)
1096 for line in f:
1097 pass
1098 t2 = timer()
1099 f.seek(0)
1100 while f.readline():
1101 pass
1102 t3 = timer()
1103 f.seek(0)
1104 while f.readline():
1105 f.tell()
1106 t4 = timer()
1107 f.close()
Benjamin Petersonee8712c2008-05-20 21:35:26 +00001108 if support.verbose:
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +00001109 print("\nTiming test: %d lines of %d characters (%d bytes)" %
1110 (nlines, nchars, nbytes))
1111 print("File chunk size: %6s" % f._CHUNK_SIZE)
1112 print("Writing: %6.3f seconds" % (t1-t0))
1113 print("Reading using iteration: %6.3f seconds" % (t2-t1))
1114 print("Reading using readline(): %6.3f seconds" % (t3-t2))
1115 print("Using readline()+tell(): %6.3f seconds" % (t4-t3))
1116
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00001117 def testReadOneByOne(self):
1118 txt = io.TextIOWrapper(io.BytesIO(b"AA\r\nBB"))
1119 reads = ""
1120 while True:
1121 c = txt.read(1)
1122 if not c:
1123 break
1124 reads += c
1125 self.assertEquals(reads, "AA\nBB")
1126
1127 # read in amounts equal to TextIOWrapper._CHUNK_SIZE which is 128.
1128 def testReadByChunk(self):
1129 # make sure "\r\n" straddles 128 char boundary.
1130 txt = io.TextIOWrapper(io.BytesIO(b"A" * 127 + b"\r\nB"))
1131 reads = ""
1132 while True:
1133 c = txt.read(128)
1134 if not c:
1135 break
1136 reads += c
1137 self.assertEquals(reads, "A"*127+"\nB")
1138
1139 def test_issue1395_1(self):
1140 txt = io.TextIOWrapper(io.BytesIO(self.testdata), encoding="ascii")
1141
1142 # read one char at a time
1143 reads = ""
1144 while True:
1145 c = txt.read(1)
1146 if not c:
1147 break
1148 reads += c
1149 self.assertEquals(reads, self.normalized)
1150
1151 def test_issue1395_2(self):
1152 txt = io.TextIOWrapper(io.BytesIO(self.testdata), encoding="ascii")
1153 txt._CHUNK_SIZE = 4
1154
1155 reads = ""
1156 while True:
1157 c = txt.read(4)
1158 if not c:
1159 break
1160 reads += c
1161 self.assertEquals(reads, self.normalized)
1162
1163 def test_issue1395_3(self):
1164 txt = io.TextIOWrapper(io.BytesIO(self.testdata), encoding="ascii")
1165 txt._CHUNK_SIZE = 4
1166
1167 reads = txt.read(4)
1168 reads += txt.read(4)
1169 reads += txt.readline()
1170 reads += txt.readline()
1171 reads += txt.readline()
1172 self.assertEquals(reads, self.normalized)
1173
1174 def test_issue1395_4(self):
1175 txt = io.TextIOWrapper(io.BytesIO(self.testdata), encoding="ascii")
1176 txt._CHUNK_SIZE = 4
1177
1178 reads = txt.read(4)
1179 reads += txt.read()
1180 self.assertEquals(reads, self.normalized)
1181
1182 def test_issue1395_5(self):
1183 txt = io.TextIOWrapper(io.BytesIO(self.testdata), encoding="ascii")
1184 txt._CHUNK_SIZE = 4
1185
1186 reads = txt.read(4)
1187 pos = txt.tell()
1188 txt.seek(0)
1189 txt.seek(pos)
1190 self.assertEquals(txt.read(4), "BBB\n")
1191
Ka-Ping Yeeddaa7062008-03-17 20:35:15 +00001192 def test_issue2282(self):
1193 buffer = io.BytesIO(self.testdata)
1194 txt = io.TextIOWrapper(buffer, encoding="ascii")
1195
1196 self.assertEqual(buffer.seekable(), txt.seekable())
1197
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00001198 def test_newline_decoder(self):
1199 import codecs
1200 decoder = codecs.getincrementaldecoder("utf-8")()
1201 decoder = io.IncrementalNewlineDecoder(decoder, translate=True)
1202
1203 self.assertEquals(decoder.decode(b'\xe8\xa2\x88'), "\u8888")
1204
1205 self.assertEquals(decoder.decode(b'\xe8'), "")
1206 self.assertEquals(decoder.decode(b'\xa2'), "")
1207 self.assertEquals(decoder.decode(b'\x88'), "\u8888")
1208
1209 self.assertEquals(decoder.decode(b'\xe8'), "")
1210 self.assertRaises(UnicodeDecodeError, decoder.decode, b'', final=True)
1211
1212 decoder.setstate((b'', 0))
1213 self.assertEquals(decoder.decode(b'\n'), "\n")
1214 self.assertEquals(decoder.decode(b'\r'), "")
1215 self.assertEquals(decoder.decode(b'', final=True), "\n")
1216 self.assertEquals(decoder.decode(b'\r', final=True), "\n")
1217
1218 self.assertEquals(decoder.decode(b'\r'), "")
1219 self.assertEquals(decoder.decode(b'a'), "\na")
1220
1221 self.assertEquals(decoder.decode(b'\r\r\n'), "\n\n")
1222 self.assertEquals(decoder.decode(b'\r'), "")
1223 self.assertEquals(decoder.decode(b'\r'), "\n")
1224 self.assertEquals(decoder.decode(b'\na'), "\na")
1225
1226 self.assertEquals(decoder.decode(b'\xe8\xa2\x88\r\n'), "\u8888\n")
1227 self.assertEquals(decoder.decode(b'\xe8\xa2\x88'), "\u8888")
1228 self.assertEquals(decoder.decode(b'\n'), "\n")
1229 self.assertEquals(decoder.decode(b'\xe8\xa2\x88\r'), "\u8888")
1230 self.assertEquals(decoder.decode(b'\n'), "\n")
Guido van Rossum9b76da62007-04-11 01:09:03 +00001231
Alexandre Vassalotti472f07d2008-01-06 00:34:32 +00001232 decoder = codecs.getincrementaldecoder("utf-8")()
1233 decoder = io.IncrementalNewlineDecoder(decoder, translate=True)
1234 self.assertEquals(decoder.newlines, None)
1235 decoder.decode(b"abc\n\r")
1236 self.assertEquals(decoder.newlines, '\n')
1237 decoder.decode(b"\nabc")
1238 self.assertEquals(decoder.newlines, ('\n', '\r\n'))
1239 decoder.decode(b"abc\r")
1240 self.assertEquals(decoder.newlines, ('\n', '\r\n'))
1241 decoder.decode(b"abc")
1242 self.assertEquals(decoder.newlines, ('\r', '\n', '\r\n'))
1243 decoder.decode(b"abc\r")
1244 decoder.reset()
1245 self.assertEquals(decoder.decode(b"abc"), "abc")
1246 self.assertEquals(decoder.newlines, None)
1247
# XXX Tests for open()
Guido van Rossum68bbcd22007-02-27 17:19:33 +00001249
Guido van Rossum5abbf752007-08-27 17:39:33 +00001250class MiscIOTest(unittest.TestCase):
1251
1252 def testImport__all__(self):
1253 for name in io.__all__:
1254 obj = getattr(io, name, None)
1255 self.assert_(obj is not None, name)
1256 if name == "open":
1257 continue
1258 elif "error" in name.lower():
1259 self.assert_(issubclass(obj, Exception), name)
1260 else:
1261 self.assert_(issubclass(obj, io.IOBase))
1262
Benjamin Peterson65676e42008-11-05 21:42:45 +00001263
def test_main():
    """Run every test class in this module through support.run_unittest()."""
    support.run_unittest(IOTest, BytesIOTest, StringIOTest,
                         BufferedReaderTest, BufferedWriterTest,
                         BufferedRWPairTest, BufferedRandomTest,
                         StatefulIncrementalDecoderTest,
                         TextIOWrapperTest, MiscIOTest)

if __name__ == "__main__":
    unittest.main()