blob: 697f69e5b2efc6dc2f6884b02103166bf1b7d1a7 [file] [log] [blame]
Guido van Rossum68bbcd22007-02-27 17:19:33 +00001"""Unit tests for io.py."""
2
Guido van Rossum8358db22007-08-18 21:39:55 +00003import os
Guido van Rossum34d69e52007-04-10 20:08:41 +00004import sys
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +00005import time
Guido van Rossumd4103952007-04-12 05:44:49 +00006import array
Guido van Rossum28524c72007-02-27 05:47:44 +00007import unittest
Guido van Rossum78892e42007-04-06 17:31:18 +00008from itertools import chain
Guido van Rossum76c5d4d2007-04-06 19:10:29 +00009from test import test_support
10
Guido van Rossum53807da2007-04-10 19:01:47 +000011import io # The module under test
Guido van Rossum28524c72007-02-27 05:47:44 +000012
Guido van Rossuma9e20242007-03-08 00:43:48 +000013
Guido van Rossum53807da2007-04-10 19:01:47 +000014class MockRawIO(io.RawIOBase):
Guido van Rossuma9e20242007-03-08 00:43:48 +000015
Guido van Rossum76c5d4d2007-04-06 19:10:29 +000016 def __init__(self, read_stack=()):
17 self._read_stack = list(read_stack)
18 self._write_stack = []
Guido van Rossum68bbcd22007-02-27 17:19:33 +000019
20 def read(self, n=None):
21 try:
Guido van Rossum76c5d4d2007-04-06 19:10:29 +000022 return self._read_stack.pop(0)
Guido van Rossum68bbcd22007-02-27 17:19:33 +000023 except:
Guido van Rossum78892e42007-04-06 17:31:18 +000024 return b""
Guido van Rossum68bbcd22007-02-27 17:19:33 +000025
Guido van Rossum01a27522007-03-07 01:00:12 +000026 def write(self, b):
Guido van Rossum76c5d4d2007-04-06 19:10:29 +000027 self._write_stack.append(b[:])
Guido van Rossum01a27522007-03-07 01:00:12 +000028 return len(b)
29
30 def writable(self):
31 return True
32
Guido van Rossum68bbcd22007-02-27 17:19:33 +000033 def fileno(self):
34 return 42
35
36 def readable(self):
37 return True
38
Guido van Rossum01a27522007-03-07 01:00:12 +000039 def seekable(self):
Guido van Rossum68bbcd22007-02-27 17:19:33 +000040 return True
41
Guido van Rossum01a27522007-03-07 01:00:12 +000042 def seek(self, pos, whence):
43 pass
44
45 def tell(self):
Guido van Rossum68bbcd22007-02-27 17:19:33 +000046 return 42
47
Guido van Rossuma9e20242007-03-08 00:43:48 +000048
Guido van Rossum78892e42007-04-06 17:31:18 +000049class MockFileIO(io.BytesIO):
50
51 def __init__(self, data):
52 self.read_history = []
53 io.BytesIO.__init__(self, data)
54
55 def read(self, n=None):
56 res = io.BytesIO.read(self, n)
57 self.read_history.append(None if res is None else len(res))
58 return res
59
60
Guido van Rossum01a27522007-03-07 01:00:12 +000061class MockNonBlockWriterIO(io.RawIOBase):
Guido van Rossuma9e20242007-03-08 00:43:48 +000062
Guido van Rossum53807da2007-04-10 19:01:47 +000063 def __init__(self, blocking_script):
64 self._blocking_script = list(blocking_script)
Guido van Rossum01a27522007-03-07 01:00:12 +000065 self._write_stack = []
Guido van Rossuma9e20242007-03-08 00:43:48 +000066
Guido van Rossum01a27522007-03-07 01:00:12 +000067 def write(self, b):
Guido van Rossum76c5d4d2007-04-06 19:10:29 +000068 self._write_stack.append(b[:])
Guido van Rossum53807da2007-04-10 19:01:47 +000069 n = self._blocking_script.pop(0)
Guido van Rossum01a27522007-03-07 01:00:12 +000070 if (n < 0):
Guido van Rossum141f7672007-04-10 00:22:16 +000071 raise io.BlockingIOError(0, "test blocking", -n)
Guido van Rossum01a27522007-03-07 01:00:12 +000072 else:
73 return n
Guido van Rossuma9e20242007-03-08 00:43:48 +000074
Guido van Rossum01a27522007-03-07 01:00:12 +000075 def writable(self):
76 return True
Guido van Rossum68bbcd22007-02-27 17:19:33 +000077
Guido van Rossuma9e20242007-03-08 00:43:48 +000078
Guido van Rossum28524c72007-02-27 05:47:44 +000079class IOTest(unittest.TestCase):
80
Guido van Rossum4d0f5a42007-03-07 22:59:39 +000081 def tearDown(self):
82 test_support.unlink(test_support.TESTFN)
83
Guido van Rossum28524c72007-02-27 05:47:44 +000084 def write_ops(self, f):
Guido van Rossum87429772007-04-10 21:06:59 +000085 self.assertEqual(f.write(b"blah."), 5)
86 self.assertEqual(f.seek(0), 0)
87 self.assertEqual(f.write(b"Hello."), 6)
Guido van Rossum28524c72007-02-27 05:47:44 +000088 self.assertEqual(f.tell(), 6)
Guido van Rossum87429772007-04-10 21:06:59 +000089 self.assertEqual(f.seek(-1, 1), 5)
Guido van Rossum28524c72007-02-27 05:47:44 +000090 self.assertEqual(f.tell(), 5)
Guido van Rossum98297ee2007-11-06 21:34:58 +000091 self.assertEqual(f.write(buffer(b" world\n\n\n")), 9)
Guido van Rossum87429772007-04-10 21:06:59 +000092 self.assertEqual(f.seek(0), 0)
Guido van Rossum2b08b382007-05-08 20:18:39 +000093 self.assertEqual(f.write(b"h"), 1)
Guido van Rossum87429772007-04-10 21:06:59 +000094 self.assertEqual(f.seek(-1, 2), 13)
95 self.assertEqual(f.tell(), 13)
96 self.assertEqual(f.truncate(12), 12)
Guido van Rossumdc0b1a12007-04-12 22:55:07 +000097 self.assertEqual(f.tell(), 13)
Christian Heimes8e42a0a2007-11-08 18:04:45 +000098 self.assertRaises(TypeError, f.seek, 0.0)
Guido van Rossum28524c72007-02-27 05:47:44 +000099
Guido van Rossum9b76da62007-04-11 01:09:03 +0000100 def read_ops(self, f, buffered=False):
101 data = f.read(5)
102 self.assertEqual(data, b"hello")
Guido van Rossum98297ee2007-11-06 21:34:58 +0000103 data = buffer(data)
Guido van Rossum9b76da62007-04-11 01:09:03 +0000104 self.assertEqual(f.readinto(data), 5)
105 self.assertEqual(data, b" worl")
106 self.assertEqual(f.readinto(data), 2)
107 self.assertEqual(len(data), 5)
108 self.assertEqual(data[:2], b"d\n")
109 self.assertEqual(f.seek(0), 0)
110 self.assertEqual(f.read(20), b"hello world\n")
111 self.assertEqual(f.read(1), b"")
Guido van Rossum98297ee2007-11-06 21:34:58 +0000112 self.assertEqual(f.readinto(buffer(b"x")), 0)
Guido van Rossum9b76da62007-04-11 01:09:03 +0000113 self.assertEqual(f.seek(-6, 2), 6)
114 self.assertEqual(f.read(5), b"world")
115 self.assertEqual(f.read(0), b"")
Guido van Rossum98297ee2007-11-06 21:34:58 +0000116 self.assertEqual(f.readinto(buffer()), 0)
Guido van Rossum9b76da62007-04-11 01:09:03 +0000117 self.assertEqual(f.seek(-6, 1), 5)
118 self.assertEqual(f.read(5), b" worl")
119 self.assertEqual(f.tell(), 10)
Christian Heimes8e42a0a2007-11-08 18:04:45 +0000120 self.assertRaises(TypeError, f.seek, 0.0)
Guido van Rossum9b76da62007-04-11 01:09:03 +0000121 if buffered:
122 f.seek(0)
123 self.assertEqual(f.read(), b"hello world\n")
124 f.seek(6)
125 self.assertEqual(f.read(), b"world\n")
126 self.assertEqual(f.read(), b"")
127
Guido van Rossum34d69e52007-04-10 20:08:41 +0000128 LARGE = 2**31
129
Guido van Rossum53807da2007-04-10 19:01:47 +0000130 def large_file_ops(self, f):
131 assert f.readable()
132 assert f.writable()
Guido van Rossum34d69e52007-04-10 20:08:41 +0000133 self.assertEqual(f.seek(self.LARGE), self.LARGE)
134 self.assertEqual(f.tell(), self.LARGE)
Guido van Rossum53807da2007-04-10 19:01:47 +0000135 self.assertEqual(f.write(b"xxx"), 3)
Guido van Rossum34d69e52007-04-10 20:08:41 +0000136 self.assertEqual(f.tell(), self.LARGE + 3)
137 self.assertEqual(f.seek(-1, 1), self.LARGE + 2)
Guido van Rossum87429772007-04-10 21:06:59 +0000138 self.assertEqual(f.truncate(), self.LARGE + 2)
Guido van Rossum34d69e52007-04-10 20:08:41 +0000139 self.assertEqual(f.tell(), self.LARGE + 2)
140 self.assertEqual(f.seek(0, 2), self.LARGE + 2)
Guido van Rossum87429772007-04-10 21:06:59 +0000141 self.assertEqual(f.truncate(self.LARGE + 1), self.LARGE + 1)
Guido van Rossumdc0b1a12007-04-12 22:55:07 +0000142 self.assertEqual(f.tell(), self.LARGE + 2)
Guido van Rossum34d69e52007-04-10 20:08:41 +0000143 self.assertEqual(f.seek(0, 2), self.LARGE + 1)
144 self.assertEqual(f.seek(-1, 2), self.LARGE)
Guido van Rossum53807da2007-04-10 19:01:47 +0000145 self.assertEqual(f.read(2), b"x")
146
Guido van Rossum28524c72007-02-27 05:47:44 +0000147 def test_raw_file_io(self):
148 f = io.open(test_support.TESTFN, "wb", buffering=0)
149 self.assertEqual(f.readable(), False)
150 self.assertEqual(f.writable(), True)
151 self.assertEqual(f.seekable(), True)
152 self.write_ops(f)
153 f.close()
154 f = io.open(test_support.TESTFN, "rb", buffering=0)
155 self.assertEqual(f.readable(), True)
156 self.assertEqual(f.writable(), False)
157 self.assertEqual(f.seekable(), True)
158 self.read_ops(f)
159 f.close()
160
Guido van Rossum87429772007-04-10 21:06:59 +0000161 def test_buffered_file_io(self):
162 f = io.open(test_support.TESTFN, "wb")
163 self.assertEqual(f.readable(), False)
164 self.assertEqual(f.writable(), True)
165 self.assertEqual(f.seekable(), True)
166 self.write_ops(f)
167 f.close()
168 f = io.open(test_support.TESTFN, "rb")
169 self.assertEqual(f.readable(), True)
170 self.assertEqual(f.writable(), False)
171 self.assertEqual(f.seekable(), True)
Guido van Rossum9b76da62007-04-11 01:09:03 +0000172 self.read_ops(f, True)
Guido van Rossum87429772007-04-10 21:06:59 +0000173 f.close()
174
Guido van Rossum48fc58a2007-06-07 23:45:37 +0000175 def test_readline(self):
176 f = io.open(test_support.TESTFN, "wb")
177 f.write(b"abc\ndef\nxyzzy\nfoo")
178 f.close()
179 f = io.open(test_support.TESTFN, "rb")
180 self.assertEqual(f.readline(), b"abc\n")
181 self.assertEqual(f.readline(10), b"def\n")
182 self.assertEqual(f.readline(2), b"xy")
183 self.assertEqual(f.readline(4), b"zzy\n")
184 self.assertEqual(f.readline(), b"foo")
185 f.close()
186
Guido van Rossum28524c72007-02-27 05:47:44 +0000187 def test_raw_bytes_io(self):
188 f = io.BytesIO()
189 self.write_ops(f)
190 data = f.getvalue()
191 self.assertEqual(data, b"hello world\n")
192 f = io.BytesIO(data)
Guido van Rossum9b76da62007-04-11 01:09:03 +0000193 self.read_ops(f, True)
Guido van Rossum28524c72007-02-27 05:47:44 +0000194
Guido van Rossum53807da2007-04-10 19:01:47 +0000195 def test_large_file_ops(self):
Guido van Rossum34d69e52007-04-10 20:08:41 +0000196 # On Windows and Mac OSX this test comsumes large resources; It takes
197 # a long time to build the >2GB file and takes >2GB of disk space
198 # therefore the resource must be enabled to run this test.
199 if sys.platform[:3] == 'win' or sys.platform == 'darwin':
200 if not test_support.is_resource_enabled("largefile"):
201 print("\nTesting large file ops skipped on %s." % sys.platform,
202 file=sys.stderr)
203 print("It requires %d bytes and a long time." % self.LARGE,
204 file=sys.stderr)
205 print("Use 'regrtest.py -u largefile test_io' to run it.",
206 file=sys.stderr)
207 return
Guido van Rossum87429772007-04-10 21:06:59 +0000208 f = io.open(test_support.TESTFN, "w+b", 0)
Guido van Rossum53807da2007-04-10 19:01:47 +0000209 self.large_file_ops(f)
Guido van Rossuma9e20242007-03-08 00:43:48 +0000210 f.close()
Guido van Rossum87429772007-04-10 21:06:59 +0000211 f = io.open(test_support.TESTFN, "w+b")
212 self.large_file_ops(f)
213 f.close()
214
215 def test_with_open(self):
216 for bufsize in (0, 1, 100):
217 f = None
218 with open(test_support.TESTFN, "wb", bufsize) as f:
Guido van Rossum1f2ca562007-08-27 20:44:15 +0000219 f.write(b"xxx")
Guido van Rossum87429772007-04-10 21:06:59 +0000220 self.assertEqual(f.closed, True)
221 f = None
222 try:
223 with open(test_support.TESTFN, "wb", bufsize) as f:
224 1/0
225 except ZeroDivisionError:
226 self.assertEqual(f.closed, True)
227 else:
228 self.fail("1/0 didn't raise an exception")
229
230 def test_destructor(self):
231 record = []
232 class MyFileIO(io.FileIO):
233 def __del__(self):
234 record.append(1)
235 io.FileIO.__del__(self)
236 def close(self):
237 record.append(2)
238 io.FileIO.close(self)
239 def flush(self):
240 record.append(3)
241 io.FileIO.flush(self)
242 f = MyFileIO(test_support.TESTFN, "w")
243 f.write("xxx")
244 del f
245 self.assertEqual(record, [1, 2, 3])
246
247 def test_close_flushes(self):
248 f = io.open(test_support.TESTFN, "wb")
Guido van Rossum2b08b382007-05-08 20:18:39 +0000249 f.write(b"xxx")
Guido van Rossum87429772007-04-10 21:06:59 +0000250 f.close()
251 f = io.open(test_support.TESTFN, "rb")
252 self.assertEqual(f.read(), b"xxx")
253 f.close()
Guido van Rossuma9e20242007-03-08 00:43:48 +0000254
Guido van Rossumd4103952007-04-12 05:44:49 +0000255 def test_array_writes(self):
256 a = array.array('i', range(10))
Guido van Rossumbae07c92007-10-08 02:46:15 +0000257 n = len(memoryview(a))
Guido van Rossumd4103952007-04-12 05:44:49 +0000258 f = io.open(test_support.TESTFN, "wb", 0)
259 self.assertEqual(f.write(a), n)
260 f.close()
261 f = io.open(test_support.TESTFN, "wb")
262 self.assertEqual(f.write(a), n)
263 f.close()
264
Guido van Rossum2dced8b2007-10-30 17:27:30 +0000265 def test_closefd(self):
266 self.assertRaises(ValueError, io.open, test_support.TESTFN, 'w',
267 closefd=False)
Guido van Rossuma9e20242007-03-08 00:43:48 +0000268
Guido van Rossum76c5d4d2007-04-06 19:10:29 +0000269class MemorySeekTestMixin:
Guido van Rossuma9e20242007-03-08 00:43:48 +0000270
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000271 def testInit(self):
Guido van Rossum78892e42007-04-06 17:31:18 +0000272 buf = self.buftype("1234567890")
273 bytesIo = self.ioclass(buf)
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000274
275 def testRead(self):
Guido van Rossum78892e42007-04-06 17:31:18 +0000276 buf = self.buftype("1234567890")
277 bytesIo = self.ioclass(buf)
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000278
279 self.assertEquals(buf[:1], bytesIo.read(1))
280 self.assertEquals(buf[1:5], bytesIo.read(4))
281 self.assertEquals(buf[5:], bytesIo.read(900))
Guido van Rossum78892e42007-04-06 17:31:18 +0000282 self.assertEquals(self.EOF, bytesIo.read())
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000283
284 def testReadNoArgs(self):
Guido van Rossum78892e42007-04-06 17:31:18 +0000285 buf = self.buftype("1234567890")
286 bytesIo = self.ioclass(buf)
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000287
288 self.assertEquals(buf, bytesIo.read())
Guido van Rossum78892e42007-04-06 17:31:18 +0000289 self.assertEquals(self.EOF, bytesIo.read())
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000290
291 def testSeek(self):
Guido van Rossum78892e42007-04-06 17:31:18 +0000292 buf = self.buftype("1234567890")
293 bytesIo = self.ioclass(buf)
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000294
295 bytesIo.read(5)
296 bytesIo.seek(0)
297 self.assertEquals(buf, bytesIo.read())
298
299 bytesIo.seek(3)
300 self.assertEquals(buf[3:], bytesIo.read())
Christian Heimes8e42a0a2007-11-08 18:04:45 +0000301 self.assertRaises(TypeError, bytesIo.seek, 0.0)
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000302
303 def testTell(self):
Guido van Rossum78892e42007-04-06 17:31:18 +0000304 buf = self.buftype("1234567890")
305 bytesIo = self.ioclass(buf)
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000306
307 self.assertEquals(0, bytesIo.tell())
308 bytesIo.seek(5)
309 self.assertEquals(5, bytesIo.tell())
310 bytesIo.seek(10000)
311 self.assertEquals(10000, bytesIo.tell())
312
Guido van Rossuma9e20242007-03-08 00:43:48 +0000313
Guido van Rossum76c5d4d2007-04-06 19:10:29 +0000314class BytesIOTest(MemorySeekTestMixin, unittest.TestCase):
Guido van Rossum39478e82007-08-27 17:23:59 +0000315 @staticmethod
316 def buftype(s):
317 return s.encode("utf-8")
Guido van Rossum78892e42007-04-06 17:31:18 +0000318 ioclass = io.BytesIO
319 EOF = b""
320
321
Guido van Rossum76c5d4d2007-04-06 19:10:29 +0000322class StringIOTest(MemorySeekTestMixin, unittest.TestCase):
Guido van Rossum78892e42007-04-06 17:31:18 +0000323 buftype = str
324 ioclass = io.StringIO
325 EOF = ""
326
327
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000328class BufferedReaderTest(unittest.TestCase):
Guido van Rossuma9e20242007-03-08 00:43:48 +0000329
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000330 def testRead(self):
Guido van Rossum53807da2007-04-10 19:01:47 +0000331 rawio = MockRawIO((b"abc", b"d", b"efg"))
Guido van Rossum76c5d4d2007-04-06 19:10:29 +0000332 bufio = io.BufferedReader(rawio)
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000333
Guido van Rossum76c5d4d2007-04-06 19:10:29 +0000334 self.assertEquals(b"abcdef", bufio.read(6))
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000335
Guido van Rossum78892e42007-04-06 17:31:18 +0000336 def testBuffering(self):
337 data = b"abcdefghi"
338 dlen = len(data)
339
340 tests = [
341 [ 100, [ 3, 1, 4, 8 ], [ dlen, 0 ] ],
342 [ 100, [ 3, 3, 3], [ dlen ] ],
343 [ 4, [ 1, 2, 4, 2 ], [ 4, 4, 1 ] ],
344 ]
345
346 for bufsize, buf_read_sizes, raw_read_sizes in tests:
Guido van Rossum76c5d4d2007-04-06 19:10:29 +0000347 rawio = MockFileIO(data)
348 bufio = io.BufferedReader(rawio, buffer_size=bufsize)
Guido van Rossum78892e42007-04-06 17:31:18 +0000349 pos = 0
350 for nbytes in buf_read_sizes:
Guido van Rossum76c5d4d2007-04-06 19:10:29 +0000351 self.assertEquals(bufio.read(nbytes), data[pos:pos+nbytes])
Guido van Rossum78892e42007-04-06 17:31:18 +0000352 pos += nbytes
Guido van Rossum76c5d4d2007-04-06 19:10:29 +0000353 self.assertEquals(rawio.read_history, raw_read_sizes)
Guido van Rossum78892e42007-04-06 17:31:18 +0000354
Guido van Rossum01a27522007-03-07 01:00:12 +0000355 def testReadNonBlocking(self):
356 # Inject some None's in there to simulate EWOULDBLOCK
Guido van Rossum53807da2007-04-10 19:01:47 +0000357 rawio = MockRawIO((b"abc", b"d", None, b"efg", None, None))
Guido van Rossum76c5d4d2007-04-06 19:10:29 +0000358 bufio = io.BufferedReader(rawio)
Guido van Rossum01a27522007-03-07 01:00:12 +0000359
Guido van Rossum76c5d4d2007-04-06 19:10:29 +0000360 self.assertEquals(b"abcd", bufio.read(6))
361 self.assertEquals(b"e", bufio.read(1))
362 self.assertEquals(b"fg", bufio.read())
363 self.assert_(None is bufio.read())
364 self.assertEquals(b"", bufio.read())
Guido van Rossum01a27522007-03-07 01:00:12 +0000365
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000366 def testReadToEof(self):
Guido van Rossum53807da2007-04-10 19:01:47 +0000367 rawio = MockRawIO((b"abc", b"d", b"efg"))
Guido van Rossum76c5d4d2007-04-06 19:10:29 +0000368 bufio = io.BufferedReader(rawio)
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000369
Guido van Rossum76c5d4d2007-04-06 19:10:29 +0000370 self.assertEquals(b"abcdefg", bufio.read(9000))
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000371
372 def testReadNoArgs(self):
Guido van Rossum53807da2007-04-10 19:01:47 +0000373 rawio = MockRawIO((b"abc", b"d", b"efg"))
Guido van Rossum76c5d4d2007-04-06 19:10:29 +0000374 bufio = io.BufferedReader(rawio)
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000375
Guido van Rossum76c5d4d2007-04-06 19:10:29 +0000376 self.assertEquals(b"abcdefg", bufio.read())
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000377
378 def testFileno(self):
Guido van Rossum53807da2007-04-10 19:01:47 +0000379 rawio = MockRawIO((b"abc", b"d", b"efg"))
Guido van Rossum76c5d4d2007-04-06 19:10:29 +0000380 bufio = io.BufferedReader(rawio)
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000381
Guido van Rossum76c5d4d2007-04-06 19:10:29 +0000382 self.assertEquals(42, bufio.fileno())
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000383
384 def testFilenoNoFileno(self):
Guido van Rossum01a27522007-03-07 01:00:12 +0000385 # XXX will we always have fileno() function? If so, kill
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000386 # this test. Else, write it.
387 pass
388
Guido van Rossuma9e20242007-03-08 00:43:48 +0000389
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000390class BufferedWriterTest(unittest.TestCase):
Guido van Rossuma9e20242007-03-08 00:43:48 +0000391
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000392 def testWrite(self):
393 # Write to the buffered IO but don't overflow the buffer.
Guido van Rossum53807da2007-04-10 19:01:47 +0000394 writer = MockRawIO()
Guido van Rossum76c5d4d2007-04-06 19:10:29 +0000395 bufio = io.BufferedWriter(writer, 8)
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000396
Guido van Rossum76c5d4d2007-04-06 19:10:29 +0000397 bufio.write(b"abc")
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000398
Guido van Rossum76c5d4d2007-04-06 19:10:29 +0000399 self.assertFalse(writer._write_stack)
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000400
401 def testWriteOverflow(self):
Guido van Rossum53807da2007-04-10 19:01:47 +0000402 writer = MockRawIO()
Guido van Rossum76c5d4d2007-04-06 19:10:29 +0000403 bufio = io.BufferedWriter(writer, 8)
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000404
Guido van Rossum76c5d4d2007-04-06 19:10:29 +0000405 bufio.write(b"abc")
406 bufio.write(b"defghijkl")
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000407
Guido van Rossum76c5d4d2007-04-06 19:10:29 +0000408 self.assertEquals(b"abcdefghijkl", writer._write_stack[0])
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000409
Guido van Rossum01a27522007-03-07 01:00:12 +0000410 def testWriteNonBlocking(self):
411 raw = MockNonBlockWriterIO((9, 2, 22, -6, 10, 12, 12))
Guido van Rossum76c5d4d2007-04-06 19:10:29 +0000412 bufio = io.BufferedWriter(raw, 8, 16)
Guido van Rossum01a27522007-03-07 01:00:12 +0000413
Guido van Rossum76c5d4d2007-04-06 19:10:29 +0000414 bufio.write(b"asdf")
415 bufio.write(b"asdfa")
Guido van Rossum01a27522007-03-07 01:00:12 +0000416 self.assertEquals(b"asdfasdfa", raw._write_stack[0])
417
Guido van Rossum76c5d4d2007-04-06 19:10:29 +0000418 bufio.write(b"asdfasdfasdf")
Guido van Rossum01a27522007-03-07 01:00:12 +0000419 self.assertEquals(b"asdfasdfasdf", raw._write_stack[1])
Guido van Rossum76c5d4d2007-04-06 19:10:29 +0000420 bufio.write(b"asdfasdfasdf")
Guido van Rossum01a27522007-03-07 01:00:12 +0000421 self.assertEquals(b"dfasdfasdf", raw._write_stack[2])
422 self.assertEquals(b"asdfasdfasdf", raw._write_stack[3])
423
Guido van Rossum76c5d4d2007-04-06 19:10:29 +0000424 bufio.write(b"asdfasdfasdf")
Guido van Rossum01a27522007-03-07 01:00:12 +0000425
Guido van Rossum78892e42007-04-06 17:31:18 +0000426 # XXX I don't like this test. It relies too heavily on how the
427 # algorithm actually works, which we might change. Refactor
428 # later.
Guido van Rossum01a27522007-03-07 01:00:12 +0000429
430 def testFileno(self):
Guido van Rossum53807da2007-04-10 19:01:47 +0000431 rawio = MockRawIO((b"abc", b"d", b"efg"))
Guido van Rossum76c5d4d2007-04-06 19:10:29 +0000432 bufio = io.BufferedWriter(rawio)
Guido van Rossum01a27522007-03-07 01:00:12 +0000433
Guido van Rossum76c5d4d2007-04-06 19:10:29 +0000434 self.assertEquals(42, bufio.fileno())
Guido van Rossum01a27522007-03-07 01:00:12 +0000435
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000436 def testFlush(self):
Guido van Rossum53807da2007-04-10 19:01:47 +0000437 writer = MockRawIO()
Guido van Rossum76c5d4d2007-04-06 19:10:29 +0000438 bufio = io.BufferedWriter(writer, 8)
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000439
Guido van Rossum76c5d4d2007-04-06 19:10:29 +0000440 bufio.write(b"abc")
441 bufio.flush()
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000442
Guido van Rossum76c5d4d2007-04-06 19:10:29 +0000443 self.assertEquals(b"abc", writer._write_stack[0])
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000444
Guido van Rossuma9e20242007-03-08 00:43:48 +0000445
Guido van Rossum01a27522007-03-07 01:00:12 +0000446class BufferedRWPairTest(unittest.TestCase):
Guido van Rossuma9e20242007-03-08 00:43:48 +0000447
Guido van Rossum01a27522007-03-07 01:00:12 +0000448 def testRWPair(self):
Guido van Rossum53807da2007-04-10 19:01:47 +0000449 r = MockRawIO(())
450 w = MockRawIO()
Guido van Rossum01a27522007-03-07 01:00:12 +0000451 pair = io.BufferedRWPair(r, w)
452
453 # XXX need implementation
454
Guido van Rossuma9e20242007-03-08 00:43:48 +0000455
Guido van Rossum78892e42007-04-06 17:31:18 +0000456class BufferedRandomTest(unittest.TestCase):
Guido van Rossuma9e20242007-03-08 00:43:48 +0000457
Guido van Rossum01a27522007-03-07 01:00:12 +0000458 def testReadAndWrite(self):
Guido van Rossum53807da2007-04-10 19:01:47 +0000459 raw = MockRawIO((b"asdf", b"ghjk"))
Guido van Rossum01a27522007-03-07 01:00:12 +0000460 rw = io.BufferedRandom(raw, 8, 12)
461
462 self.assertEqual(b"as", rw.read(2))
463 rw.write(b"ddd")
464 rw.write(b"eee")
Guido van Rossum76c5d4d2007-04-06 19:10:29 +0000465 self.assertFalse(raw._write_stack) # Buffer writes
Guido van Rossum01a27522007-03-07 01:00:12 +0000466 self.assertEqual(b"ghjk", rw.read()) # This read forces write flush
Guido van Rossum76c5d4d2007-04-06 19:10:29 +0000467 self.assertEquals(b"dddeee", raw._write_stack[0])
Guido van Rossum01a27522007-03-07 01:00:12 +0000468
469 def testSeekAndTell(self):
470 raw = io.BytesIO(b"asdfghjkl")
471 rw = io.BufferedRandom(raw)
472
473 self.assertEquals(b"as", rw.read(2))
474 self.assertEquals(2, rw.tell())
475 rw.seek(0, 0)
476 self.assertEquals(b"asdf", rw.read(4))
477
478 rw.write(b"asdf")
479 rw.seek(0, 0)
480 self.assertEquals(b"asdfasdfl", rw.read())
481 self.assertEquals(9, rw.tell())
482 rw.seek(-4, 2)
483 self.assertEquals(5, rw.tell())
484 rw.seek(2, 1)
485 self.assertEquals(7, rw.tell())
486 self.assertEquals(b"fl", rw.read(11))
Christian Heimes8e42a0a2007-11-08 18:04:45 +0000487 self.assertRaises(TypeError, rw.seek, 0.0)
Guido van Rossum01a27522007-03-07 01:00:12 +0000488
Guido van Rossum78892e42007-04-06 17:31:18 +0000489
490class TextIOWrapperTest(unittest.TestCase):
Guido van Rossum9b76da62007-04-11 01:09:03 +0000491
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +0000492 def setUp(self):
493 self.testdata = b"AAA\r\nBBB\rCCC\r\nDDD\nEEE\r\n"
494 self.normalized = b"AAA\nBBB\nCCC\nDDD\nEEE\n".decode("ascii")
495
Guido van Rossumd0712812007-04-11 16:32:43 +0000496 def tearDown(self):
497 test_support.unlink(test_support.TESTFN)
Guido van Rossum9b76da62007-04-11 01:09:03 +0000498
Guido van Rossum8358db22007-08-18 21:39:55 +0000499 def testNewlinesInput(self):
500 testdata = b"AAA\nBBB\nCCC\rDDD\rEEE\r\nFFF\r\nGGG"
501 normalized = testdata.replace(b"\r\n", b"\n").replace(b"\r", b"\n")
502 for newline, expected in [
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +0000503 (None, normalized.decode("ascii").splitlines(True)),
504 ("", testdata.decode("ascii").splitlines(True)),
Guido van Rossum8358db22007-08-18 21:39:55 +0000505 ("\n", ["AAA\n", "BBB\n", "CCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]),
506 ("\r\n", ["AAA\nBBB\nCCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]),
507 ("\r", ["AAA\nBBB\nCCC\r", "DDD\r", "EEE\r", "\nFFF\r", "\nGGG"]),
508 ]:
509 buf = io.BytesIO(testdata)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +0000510 txt = io.TextIOWrapper(buf, encoding="ascii", newline=newline)
Guido van Rossum8358db22007-08-18 21:39:55 +0000511 self.assertEquals(txt.readlines(), expected)
512 txt.seek(0)
513 self.assertEquals(txt.read(), "".join(expected))
514
515 def testNewlinesOutput(self):
516 testdict = {
517 "": b"AAA\nBBB\nCCC\nX\rY\r\nZ",
518 "\n": b"AAA\nBBB\nCCC\nX\rY\r\nZ",
519 "\r": b"AAA\rBBB\rCCC\rX\rY\r\rZ",
520 "\r\n": b"AAA\r\nBBB\r\nCCC\r\nX\rY\r\r\nZ",
521 }
522 tests = [(None, testdict[os.linesep])] + sorted(testdict.items())
523 for newline, expected in tests:
524 buf = io.BytesIO()
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +0000525 txt = io.TextIOWrapper(buf, encoding="ascii", newline=newline)
Guido van Rossum8358db22007-08-18 21:39:55 +0000526 txt.write("AAA\nB")
527 txt.write("BB\nCCC\n")
528 txt.write("X\rY\r\nZ")
529 txt.flush()
530 self.assertEquals(buf.getvalue(), expected)
531
Guido van Rossum78892e42007-04-06 17:31:18 +0000532 def testNewlines(self):
533 input_lines = [ "unix\n", "windows\r\n", "os9\r", "last\n", "nonl" ]
534
535 tests = [
536 [ None, [ 'unix\n', 'windows\n', 'os9\n', 'last\n', 'nonl' ] ],
Guido van Rossum8358db22007-08-18 21:39:55 +0000537 [ '', input_lines ],
538 [ '\n', [ "unix\n", "windows\r\n", "os9\rlast\n", "nonl" ] ],
539 [ '\r\n', [ "unix\nwindows\r\n", "os9\rlast\nnonl" ] ],
540 [ '\r', [ "unix\nwindows\r", "\nos9\r", "last\nnonl" ] ],
Guido van Rossum78892e42007-04-06 17:31:18 +0000541 ]
542
Guido van Rossumc2f93dc2007-05-24 00:50:02 +0000543 encodings = ('utf-8', 'latin-1')
Guido van Rossum78892e42007-04-06 17:31:18 +0000544
Guido van Rossum8358db22007-08-18 21:39:55 +0000545 # Try a range of buffer sizes to test the case where \r is the last
Guido van Rossum78892e42007-04-06 17:31:18 +0000546 # character in TextIOWrapper._pending_line.
547 for encoding in encodings:
Guido van Rossum8358db22007-08-18 21:39:55 +0000548 # XXX: str.encode() should return bytes
549 data = bytes(''.join(input_lines).encode(encoding))
Guido van Rossum78892e42007-04-06 17:31:18 +0000550 for do_reads in (False, True):
Guido van Rossum8358db22007-08-18 21:39:55 +0000551 for bufsize in range(1, 10):
552 for newline, exp_lines in tests:
553 bufio = io.BufferedReader(io.BytesIO(data), bufsize)
Guido van Rossum76c5d4d2007-04-06 19:10:29 +0000554 textio = io.TextIOWrapper(bufio, newline=newline,
Guido van Rossum78892e42007-04-06 17:31:18 +0000555 encoding=encoding)
556 if do_reads:
557 got_lines = []
558 while True:
Guido van Rossum76c5d4d2007-04-06 19:10:29 +0000559 c2 = textio.read(2)
Guido van Rossum78892e42007-04-06 17:31:18 +0000560 if c2 == '':
561 break
562 self.assertEquals(len(c2), 2)
Guido van Rossum76c5d4d2007-04-06 19:10:29 +0000563 got_lines.append(c2 + textio.readline())
Guido van Rossum78892e42007-04-06 17:31:18 +0000564 else:
Guido van Rossum76c5d4d2007-04-06 19:10:29 +0000565 got_lines = list(textio)
Guido van Rossum78892e42007-04-06 17:31:18 +0000566
567 for got_line, exp_line in zip(got_lines, exp_lines):
568 self.assertEquals(got_line, exp_line)
569 self.assertEquals(len(got_lines), len(exp_lines))
570
Guido van Rossum8358db22007-08-18 21:39:55 +0000571 def testNewlinesInput(self):
572 testdata = b"AAA\nBBB\nCCC\rDDD\rEEE\r\nFFF\r\nGGG"
573 normalized = testdata.replace(b"\r\n", b"\n").replace(b"\r", b"\n")
574 for newline, expected in [
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +0000575 (None, normalized.decode("ascii").splitlines(True)),
576 ("", testdata.decode("ascii").splitlines(True)),
Guido van Rossum8358db22007-08-18 21:39:55 +0000577 ("\n", ["AAA\n", "BBB\n", "CCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]),
578 ("\r\n", ["AAA\nBBB\nCCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]),
579 ("\r", ["AAA\nBBB\nCCC\r", "DDD\r", "EEE\r", "\nFFF\r", "\nGGG"]),
580 ]:
581 buf = io.BytesIO(testdata)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +0000582 txt = io.TextIOWrapper(buf, encoding="ascii", newline=newline)
Guido van Rossum8358db22007-08-18 21:39:55 +0000583 self.assertEquals(txt.readlines(), expected)
584 txt.seek(0)
585 self.assertEquals(txt.read(), "".join(expected))
586
587 def testNewlinesOutput(self):
Guido van Rossum8358db22007-08-18 21:39:55 +0000588 data = "AAA\nBBB\rCCC\n"
589 data_lf = b"AAA\nBBB\rCCC\n"
590 data_cr = b"AAA\rBBB\rCCC\r"
591 data_crlf = b"AAA\r\nBBB\rCCC\r\n"
Guido van Rossum18363582007-08-18 22:05:58 +0000592 save_linesep = os.linesep
593 try:
594 for os.linesep, newline, expected in [
595 ("\n", None, data_lf),
596 ("\r\n", None, data_crlf),
597 ("\n", "", data_lf),
598 ("\r\n", "", data_lf),
599 ("\n", "\n", data_lf),
600 ("\r\n", "\n", data_lf),
601 ("\n", "\r", data_cr),
602 ("\r\n", "\r", data_cr),
603 ("\n", "\r\n", data_crlf),
604 ("\r\n", "\r\n", data_crlf),
605 ]:
606 buf = io.BytesIO()
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +0000607 txt = io.TextIOWrapper(buf, encoding="ascii", newline=newline)
Guido van Rossum18363582007-08-18 22:05:58 +0000608 txt.write(data)
609 txt.close()
610 self.assertEquals(buf.getvalue(), expected)
611 finally:
612 os.linesep = save_linesep
Guido van Rossum8358db22007-08-18 21:39:55 +0000613
Guido van Rossum9b76da62007-04-11 01:09:03 +0000614 # Systematic tests of the text I/O API
615
616 def testBasicIO(self):
617 for chunksize in (1, 2, 3, 4, 5, 15, 16, 17, 31, 32, 33, 63, 64, 65):
618 for enc in "ascii", "latin1", "utf8" :# , "utf-16-be", "utf-16-le":
619 f = io.open(test_support.TESTFN, "w+", encoding=enc)
620 f._CHUNK_SIZE = chunksize
621 self.assertEquals(f.write("abc"), 3)
622 f.close()
623 f = io.open(test_support.TESTFN, "r+", encoding=enc)
624 f._CHUNK_SIZE = chunksize
625 self.assertEquals(f.tell(), 0)
626 self.assertEquals(f.read(), "abc")
627 cookie = f.tell()
628 self.assertEquals(f.seek(0), 0)
629 self.assertEquals(f.read(2), "ab")
630 self.assertEquals(f.read(1), "c")
631 self.assertEquals(f.read(1), "")
632 self.assertEquals(f.read(), "")
633 self.assertEquals(f.tell(), cookie)
634 self.assertEquals(f.seek(0), 0)
635 self.assertEquals(f.seek(0, 2), cookie)
636 self.assertEquals(f.write("def"), 3)
637 self.assertEquals(f.seek(cookie), cookie)
638 self.assertEquals(f.read(), "def")
639 if enc.startswith("utf"):
640 self.multi_line_test(f, enc)
641 f.close()
642
643 def multi_line_test(self, f, enc):
644 f.seek(0)
645 f.truncate()
Guido van Rossumef87d6e2007-05-02 19:09:54 +0000646 sample = "s\xff\u0fff\uffff"
Guido van Rossum9b76da62007-04-11 01:09:03 +0000647 wlines = []
Guido van Rossumcba608c2007-04-11 14:19:59 +0000648 for size in (0, 1, 2, 3, 4, 5, 30, 31, 32, 33, 62, 63, 64, 65, 1000):
Guido van Rossum9b76da62007-04-11 01:09:03 +0000649 chars = []
Guido van Rossum805365e2007-05-07 22:24:25 +0000650 for i in range(size):
Guido van Rossum9b76da62007-04-11 01:09:03 +0000651 chars.append(sample[i % len(sample)])
Guido van Rossumef87d6e2007-05-02 19:09:54 +0000652 line = "".join(chars) + "\n"
Guido van Rossum9b76da62007-04-11 01:09:03 +0000653 wlines.append((f.tell(), line))
654 f.write(line)
Guido van Rossum9b76da62007-04-11 01:09:03 +0000655 f.seek(0)
656 rlines = []
657 while True:
658 pos = f.tell()
659 line = f.readline()
660 if not line:
Guido van Rossum9b76da62007-04-11 01:09:03 +0000661 break
662 rlines.append((pos, line))
Guido van Rossum9b76da62007-04-11 01:09:03 +0000663 self.assertEquals(rlines, wlines)
664
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +0000665 def testTelling(self):
666 f = io.open(test_support.TESTFN, "w+", encoding="utf8")
667 p0 = f.tell()
Guido van Rossumef87d6e2007-05-02 19:09:54 +0000668 f.write("\xff\n")
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +0000669 p1 = f.tell()
Guido van Rossumef87d6e2007-05-02 19:09:54 +0000670 f.write("\xff\n")
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +0000671 p2 = f.tell()
672 f.seek(0)
673 self.assertEquals(f.tell(), p0)
Guido van Rossumef87d6e2007-05-02 19:09:54 +0000674 self.assertEquals(f.readline(), "\xff\n")
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +0000675 self.assertEquals(f.tell(), p1)
Guido van Rossumef87d6e2007-05-02 19:09:54 +0000676 self.assertEquals(f.readline(), "\xff\n")
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +0000677 self.assertEquals(f.tell(), p2)
678 f.seek(0)
679 for line in f:
Guido van Rossumef87d6e2007-05-02 19:09:54 +0000680 self.assertEquals(line, "\xff\n")
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +0000681 self.assertRaises(IOError, f.tell)
682 self.assertEquals(f.tell(), p2)
683 f.close()
684
Guido van Rossumd76e7792007-04-17 02:38:04 +0000685 def testSeeking(self):
686 chunk_size = io.TextIOWrapper._CHUNK_SIZE
687 prefix_size = chunk_size - 2
Guido van Rossumef87d6e2007-05-02 19:09:54 +0000688 u_prefix = "a" * prefix_size
Guido van Rossumd76e7792007-04-17 02:38:04 +0000689 prefix = bytes(u_prefix.encode("utf-8"))
690 self.assertEquals(len(u_prefix), len(prefix))
Guido van Rossumef87d6e2007-05-02 19:09:54 +0000691 u_suffix = "\u8888\n"
Guido van Rossumd76e7792007-04-17 02:38:04 +0000692 suffix = bytes(u_suffix.encode("utf-8"))
693 line = prefix + suffix
694 f = io.open(test_support.TESTFN, "wb")
695 f.write(line*2)
696 f.close()
697 f = io.open(test_support.TESTFN, "r", encoding="utf-8")
698 s = f.read(prefix_size)
Guido van Rossum98297ee2007-11-06 21:34:58 +0000699 self.assertEquals(s, str(prefix, "ascii"))
Guido van Rossumd76e7792007-04-17 02:38:04 +0000700 self.assertEquals(f.tell(), prefix_size)
701 self.assertEquals(f.readline(), u_suffix)
702
703 def testSeekingToo(self):
704 # Regression test for a specific bug
705 data = b'\xe0\xbf\xbf\n'
706 f = io.open(test_support.TESTFN, "wb")
707 f.write(data)
708 f.close()
709 f = io.open(test_support.TESTFN, "r", encoding="utf-8")
710 f._CHUNK_SIZE # Just test that it exists
711 f._CHUNK_SIZE = 2
712 f.readline()
713 f.tell()
714
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +0000715 def timingTest(self):
716 timer = time.time
717 enc = "utf8"
Guido van Rossumef87d6e2007-05-02 19:09:54 +0000718 line = "\0\x0f\xff\u0fff\uffff\U000fffff\U0010ffff"*3 + "\n"
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +0000719 nlines = 10000
720 nchars = len(line)
721 nbytes = len(line.encode(enc))
722 for chunk_size in (32, 64, 128, 256):
723 f = io.open(test_support.TESTFN, "w+", encoding=enc)
724 f._CHUNK_SIZE = chunk_size
725 t0 = timer()
726 for i in range(nlines):
727 f.write(line)
728 f.flush()
729 t1 = timer()
730 f.seek(0)
731 for line in f:
732 pass
733 t2 = timer()
734 f.seek(0)
735 while f.readline():
736 pass
737 t3 = timer()
738 f.seek(0)
739 while f.readline():
740 f.tell()
741 t4 = timer()
742 f.close()
743 if test_support.verbose:
744 print("\nTiming test: %d lines of %d characters (%d bytes)" %
745 (nlines, nchars, nbytes))
746 print("File chunk size: %6s" % f._CHUNK_SIZE)
747 print("Writing: %6.3f seconds" % (t1-t0))
748 print("Reading using iteration: %6.3f seconds" % (t2-t1))
749 print("Reading using readline(): %6.3f seconds" % (t3-t2))
750 print("Using readline()+tell(): %6.3f seconds" % (t4-t3))
751
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +0000752 def testReadOneByOne(self):
753 txt = io.TextIOWrapper(io.BytesIO(b"AA\r\nBB"))
754 reads = ""
755 while True:
756 c = txt.read(1)
757 if not c:
758 break
759 reads += c
760 self.assertEquals(reads, "AA\nBB")
761
762 # read in amounts equal to TextIOWrapper._CHUNK_SIZE which is 128.
763 def testReadByChunk(self):
764 # make sure "\r\n" straddles 128 char boundary.
765 txt = io.TextIOWrapper(io.BytesIO(b"A" * 127 + b"\r\nB"))
766 reads = ""
767 while True:
768 c = txt.read(128)
769 if not c:
770 break
771 reads += c
772 self.assertEquals(reads, "A"*127+"\nB")
773
774 def test_issue1395_1(self):
775 txt = io.TextIOWrapper(io.BytesIO(self.testdata), encoding="ascii")
776
777 # read one char at a time
778 reads = ""
779 while True:
780 c = txt.read(1)
781 if not c:
782 break
783 reads += c
784 self.assertEquals(reads, self.normalized)
785
786 def test_issue1395_2(self):
787 txt = io.TextIOWrapper(io.BytesIO(self.testdata), encoding="ascii")
788 txt._CHUNK_SIZE = 4
789
790 reads = ""
791 while True:
792 c = txt.read(4)
793 if not c:
794 break
795 reads += c
796 self.assertEquals(reads, self.normalized)
797
798 def test_issue1395_3(self):
799 txt = io.TextIOWrapper(io.BytesIO(self.testdata), encoding="ascii")
800 txt._CHUNK_SIZE = 4
801
802 reads = txt.read(4)
803 reads += txt.read(4)
804 reads += txt.readline()
805 reads += txt.readline()
806 reads += txt.readline()
807 self.assertEquals(reads, self.normalized)
808
809 def test_issue1395_4(self):
810 txt = io.TextIOWrapper(io.BytesIO(self.testdata), encoding="ascii")
811 txt._CHUNK_SIZE = 4
812
813 reads = txt.read(4)
814 reads += txt.read()
815 self.assertEquals(reads, self.normalized)
816
817 def test_issue1395_5(self):
818 txt = io.TextIOWrapper(io.BytesIO(self.testdata), encoding="ascii")
819 txt._CHUNK_SIZE = 4
820
821 reads = txt.read(4)
822 pos = txt.tell()
823 txt.seek(0)
824 txt.seek(pos)
825 self.assertEquals(txt.read(4), "BBB\n")
826
827 def test_newline_decoder(self):
828 import codecs
829 decoder = codecs.getincrementaldecoder("utf-8")()
830 decoder = io.IncrementalNewlineDecoder(decoder, translate=True)
831
832 self.assertEquals(decoder.decode(b'\xe8\xa2\x88'), "\u8888")
833
834 self.assertEquals(decoder.decode(b'\xe8'), "")
835 self.assertEquals(decoder.decode(b'\xa2'), "")
836 self.assertEquals(decoder.decode(b'\x88'), "\u8888")
837
838 self.assertEquals(decoder.decode(b'\xe8'), "")
839 self.assertRaises(UnicodeDecodeError, decoder.decode, b'', final=True)
840
841 decoder.setstate((b'', 0))
842 self.assertEquals(decoder.decode(b'\n'), "\n")
843 self.assertEquals(decoder.decode(b'\r'), "")
844 self.assertEquals(decoder.decode(b'', final=True), "\n")
845 self.assertEquals(decoder.decode(b'\r', final=True), "\n")
846
847 self.assertEquals(decoder.decode(b'\r'), "")
848 self.assertEquals(decoder.decode(b'a'), "\na")
849
850 self.assertEquals(decoder.decode(b'\r\r\n'), "\n\n")
851 self.assertEquals(decoder.decode(b'\r'), "")
852 self.assertEquals(decoder.decode(b'\r'), "\n")
853 self.assertEquals(decoder.decode(b'\na'), "\na")
854
855 self.assertEquals(decoder.decode(b'\xe8\xa2\x88\r\n'), "\u8888\n")
856 self.assertEquals(decoder.decode(b'\xe8\xa2\x88'), "\u8888")
857 self.assertEquals(decoder.decode(b'\n'), "\n")
858 self.assertEquals(decoder.decode(b'\xe8\xa2\x88\r'), "\u8888")
859 self.assertEquals(decoder.decode(b'\n'), "\n")
Guido van Rossum9b76da62007-04-11 01:09:03 +0000860
Guido van Rossum01a27522007-03-07 01:00:12 +0000861# XXX Tests for open()
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000862
Guido van Rossum5abbf752007-08-27 17:39:33 +0000863class MiscIOTest(unittest.TestCase):
864
865 def testImport__all__(self):
866 for name in io.__all__:
867 obj = getattr(io, name, None)
868 self.assert_(obj is not None, name)
869 if name == "open":
870 continue
871 elif "error" in name.lower():
872 self.assert_(issubclass(obj, Exception), name)
873 else:
874 self.assert_(issubclass(obj, io.IOBase))
875
876
Guido van Rossum28524c72007-02-27 05:47:44 +0000877def test_main():
Guido van Rossum78892e42007-04-06 17:31:18 +0000878 test_support.run_unittest(IOTest, BytesIOTest, StringIOTest,
879 BufferedReaderTest,
Guido van Rossum01a27522007-03-07 01:00:12 +0000880 BufferedWriterTest, BufferedRWPairTest,
Guido van Rossum5abbf752007-08-27 17:39:33 +0000881 BufferedRandomTest, TextIOWrapperTest,
882 MiscIOTest)
Guido van Rossum28524c72007-02-27 05:47:44 +0000883
884if __name__ == "__main__":
Guido van Rossum76c5d4d2007-04-06 19:10:29 +0000885 unittest.main()