blob: 1b80add9039ac045e0f1cfd4b7d1de5cd1eba066 [file] [log] [blame]
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001"""Unit tests for the io module."""
2
3# Tests of io are scattered over the test suite:
4# * test_bufio - tests file buffering
5# * test_memoryio - tests BytesIO and StringIO
6# * test_fileio - tests FileIO
7# * test_file - tests the file interface
8# * test_io - tests everything else in the io module
9# * test_univnewlines - tests universal newline support
10# * test_largefile - tests operations on a file greater than 2**32 bytes
11# (only enabled with -ulargefile)
12
13################################################################################
14# ATTENTION TEST WRITERS!!!
15################################################################################
16# When writing tests for io, it's important to test both the C and Python
17# implementations. This is usually done by writing a base test that refers to
# the type it is testing as an attribute. Then it provides custom subclasses to
19# test both implementations. This file has lots of examples.
20################################################################################
Guido van Rossum68bbcd22007-02-27 17:19:33 +000021
Guido van Rossum8358db22007-08-18 21:39:55 +000022import os
Guido van Rossum34d69e52007-04-10 20:08:41 +000023import sys
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +000024import time
Guido van Rossumd4103952007-04-12 05:44:49 +000025import array
Antoine Pitrou87695762008-08-14 22:44:29 +000026import threading
27import random
Guido van Rossum28524c72007-02-27 05:47:44 +000028import unittest
Benjamin Peterson59406a92009-03-26 17:10:29 +000029import warnings
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000030import weakref
31import gc
32import abc
33from itertools import chain, cycle, count
34from collections import deque
Benjamin Petersonee8712c2008-05-20 21:35:26 +000035from test import support
Guido van Rossum76c5d4d2007-04-06 19:10:29 +000036
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +000037import codecs
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000038import io # C implementation of io
39import _pyio as pyio # Python implementation of io
Guido van Rossum28524c72007-02-27 05:47:44 +000040
Guido van Rossuma9e20242007-03-08 00:43:48 +000041
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000042def _default_chunk_size():
43 """Get the default TextIOWrapper chunk size"""
44 with open(__file__, "r", encoding="latin1") as f:
45 return f._CHUNK_SIZE
46
47
class MockRawIO:
    """Scripted stand-in for a raw stream.

    Reads are served from a queue of pre-canned chunks and writes are
    recorded, so a test can inspect exactly what a buffered wrapper did.

    Attributes:
        _read_stack: list of chunks still to be handed out by read() and
            readinto(); a ``None`` entry is returned as-is by readinto().
        _write_stack: list of ``bytes`` objects, one per write() call.
        _reads: number of read() / readinto() calls made so far.
    """

    def __init__(self, read_stack=()):
        self._read_stack = list(read_stack)
        self._write_stack = []
        self._reads = 0

    def read(self, n=None):
        """Pop and return the next scripted chunk, ignoring *n*.

        Returns ``b""`` once the script is exhausted.
        """
        self._reads += 1
        try:
            return self._read_stack.pop(0)
        except IndexError:
            # Only an exhausted script means EOF; anything else (including
            # KeyboardInterrupt) must propagate rather than be swallowed.
            return b""

    def write(self, b):
        """Record a ``bytes`` copy of *b* and report all of it as written."""
        self._write_stack.append(bytes(b))
        return len(b)

    def writable(self):
        return True

    def fileno(self):
        return 42

    def readable(self):
        return True

    def seekable(self):
        return True

    def seek(self, pos, whence=0):
        # *whence* is optional, as in io.IOBase.seek(), so the mock can be
        # called the same way as a real stream.
        return 0   # wrong but we gotta return something

    def tell(self):
        return 0   # same comment as above

    def readinto(self, buf):
        """Copy the head of the script into *buf*.

        Returns the number of bytes stored, ``0`` when the script is
        exhausted, or ``None`` when the next scripted entry is ``None``.
        A chunk larger than *buf* is split: the remainder stays at the head
        of the script for the next call.
        """
        self._reads += 1
        max_len = len(buf)
        try:
            data = self._read_stack[0]
        except IndexError:
            return 0
        if data is None:
            del self._read_stack[0]
            return None
        n = len(data)
        if n <= max_len:
            del self._read_stack[0]
            buf[:n] = data
            return n
        # Chunk does not fit: fill the buffer and keep the tail queued.
        buf[:] = data[:max_len]
        self._read_stack[0] = data[max_len:]
        return max_len

    def truncate(self, pos=None):
        """Pretend to truncate; simply echoes *pos* back."""
        return pos
106
class CMockRawIO(MockRawIO, io.RawIOBase):
    """MockRawIO layered on ``io.RawIOBase`` (the C implementation of io)."""
109
class PyMockRawIO(MockRawIO, pyio.RawIOBase):
    """MockRawIO layered on ``_pyio.RawIOBase`` (the Python implementation)."""
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000112
Guido van Rossuma9e20242007-03-08 00:43:48 +0000113
class MisbehavedRawIO(MockRawIO):
    """MockRawIO variant whose methods return deliberately bogus results."""

    def write(self, b):
        # Report twice as many bytes as the underlying mock accepted.
        accepted = super().write(b)
        return accepted * 2

    def read(self, n=None):
        # Hand back the scripted chunk repeated twice.
        chunk = super().read(n)
        return chunk * 2

    def seek(self, pos, whence):
        # A negative position, regardless of the arguments.
        return -123

    def tell(self):
        # A negative position.
        return -456

    def readinto(self, buf):
        # Fill the buffer normally, then report five times its length.
        super().readinto(buf)
        capacity = len(buf)
        return capacity * 5
130
class CMisbehavedRawIO(MisbehavedRawIO, io.RawIOBase):
    """MisbehavedRawIO layered on ``io.RawIOBase`` (the C implementation)."""
133
class PyMisbehavedRawIO(MisbehavedRawIO, pyio.RawIOBase):
    """MisbehavedRawIO layered on ``_pyio.RawIOBase`` (Python implementation)."""
136
137
class CloseFailureIO(MockRawIO):
    """MockRawIO whose first close() raises IOError; later calls do nothing."""

    # Class-level default; close() shadows it with an instance attribute.
    closed = 0

    def close(self):
        if self.closed:
            return
        # Mark as closed *before* failing so a second close() is silent.
        self.closed = 1
        raise IOError
145
class CCloseFailureIO(CloseFailureIO, io.RawIOBase):
    """CloseFailureIO layered on ``io.RawIOBase`` (the C implementation)."""
148
class PyCloseFailureIO(CloseFailureIO, pyio.RawIOBase):
    """CloseFailureIO layered on ``_pyio.RawIOBase`` (Python implementation)."""
151
152
class MockFileIO:
    """Mixin that logs the size of every read made through it.

    It relies on the next class in the MRO to provide ``__init__(data)``,
    ``read()`` and ``readinto()``.  ``read_history`` gets one entry per call:
    the length of what read() returned (or ``None`` if it returned ``None``),
    or the value readinto() returned.
    """

    def __init__(self, data):
        # Create the log before delegating to the real constructor.
        self.read_history = []
        super().__init__(data)

    def read(self, n=None):
        chunk = super().read(n)
        if chunk is None:
            self.read_history.append(None)
        else:
            self.read_history.append(len(chunk))
        return chunk

    def readinto(self, b):
        count = super().readinto(b)
        self.read_history.append(count)
        return count
Guido van Rossum78892e42007-04-06 17:31:18 +0000168
class CMockFileIO(MockFileIO, io.BytesIO):
    """Read-logging mixin applied to ``io.BytesIO`` (the C implementation)."""
Guido van Rossuma9e20242007-03-08 00:43:48 +0000171
class PyMockFileIO(MockFileIO, pyio.BytesIO):
    """Read-logging mixin applied to ``_pyio.BytesIO`` (Python implementation)."""
174
175
class MockNonBlockWriterIO:
    """Write-recording mock that can simulate a blocking write.

    After ``block_on(char)``, the next write() whose data contains *char*
    records only the bytes before it and raises ``self.BlockingIOError``.
    That exception class is not defined here; it is looked up on the
    instance.
    """

    def __init__(self):
        self._write_stack = []
        self._blocker_char = None

    def pop_written(self):
        """Return everything recorded so far as one bytes object and reset."""
        joined = b"".join(self._write_stack)
        del self._write_stack[:]
        return joined

    def block_on(self, char):
        """Block when a given char is encountered."""
        self._blocker_char = char

    def readable(self):
        return True

    def seekable(self):
        return True

    def writable(self):
        return True

    def write(self, b):
        """Record *b* and return its length, or block at the armed char."""
        payload = bytes(b)
        blocker = self._blocker_char
        if blocker:
            cut = payload.find(blocker)
            if cut >= 0:
                # One-shot trigger: disarm, keep the prefix, then report how
                # many bytes were accepted through the exception.
                self._blocker_char = None
                self._write_stack.append(payload[:cut])
                raise self.BlockingIOError(0, "test blocking", cut)
        self._write_stack.append(payload)
        return len(payload)
214
class CMockNonBlockWriterIO(MockNonBlockWriterIO, io.RawIOBase):
    """Blocking-writer mock for ``io``; raises ``io.BlockingIOError``."""

    BlockingIOError = io.BlockingIOError
217
class PyMockNonBlockWriterIO(MockNonBlockWriterIO, pyio.RawIOBase):
    """Blocking-writer mock for ``_pyio``; raises ``_pyio.BlockingIOError``."""

    BlockingIOError = pyio.BlockingIOError
220
Guido van Rossuma9e20242007-03-08 00:43:48 +0000221
class IOTest(unittest.TestCase):
    """Tests for open() and the basic io classes.

    The code under test is reached through attributes of the test instance
    (``self.open``, ``self.FileIO``, ``self.BytesIO``, ``self.IOBase``,
    ``self.RawIOBase``, ``self.BufferedIOBase``, ``self.TextIOBase``), none
    of which are defined in this class.
    NOTE(review): presumably they are attached per implementation elsewhere
    in the file -- confirm.
    """

    def setUp(self):
        support.unlink(support.TESTFN)

    def tearDown(self):
        support.unlink(support.TESTFN)

    def write_ops(self, f):
        """Drive a writable binary stream until it holds b"hello world\\n".

        NOTE(review): the tell() check after truncate(12) assumes that
        truncate() leaves the position at the new size -- confirm against
        the io implementation under test.
        """
        self.assertEqual(f.write(b"blah."), 5)
        self.assertEqual(f.seek(0), 0)
        self.assertEqual(f.write(b"Hello."), 6)
        self.assertEqual(f.tell(), 6)
        self.assertEqual(f.seek(-1, 1), 5)
        self.assertEqual(f.tell(), 5)
        self.assertEqual(f.write(bytearray(b" world\n\n\n")), 9)
        self.assertEqual(f.seek(0), 0)
        self.assertEqual(f.write(b"h"), 1)
        self.assertEqual(f.seek(-1, 2), 13)
        self.assertEqual(f.tell(), 13)
        self.assertEqual(f.truncate(12), 12)
        self.assertEqual(f.tell(), 12)
        self.assertRaises(TypeError, f.seek, 0.0)

    def read_ops(self, f, buffered=False):
        """Check reads and seeks on a stream holding b"hello world\\n".

        With *buffered* true, argument-less read() (read to EOF) is checked
        as well.
        """
        data = f.read(5)
        self.assertEqual(data, b"hello")
        data = bytearray(data)
        self.assertEqual(f.readinto(data), 5)
        self.assertEqual(data, b" worl")
        # Only two bytes are left; the rest of the buffer is untouched.
        self.assertEqual(f.readinto(data), 2)
        self.assertEqual(len(data), 5)
        self.assertEqual(data[:2], b"d\n")
        self.assertEqual(f.seek(0), 0)
        self.assertEqual(f.read(20), b"hello world\n")
        self.assertEqual(f.read(1), b"")
        self.assertEqual(f.readinto(bytearray(b"x")), 0)
        self.assertEqual(f.seek(-6, 2), 6)
        self.assertEqual(f.read(5), b"world")
        self.assertEqual(f.read(0), b"")
        self.assertEqual(f.readinto(bytearray()), 0)
        self.assertEqual(f.seek(-6, 1), 5)
        self.assertEqual(f.read(5), b" worl")
        self.assertEqual(f.tell(), 10)
        self.assertRaises(TypeError, f.seek, 0.0)
        if buffered:
            f.seek(0)
            self.assertEqual(f.read(), b"hello world\n")
            f.seek(6)
            self.assertEqual(f.read(), b"world\n")
            self.assertEqual(f.read(), b"")

    # Offset used by the large-file checks.
    LARGE = 2**31

    def large_file_ops(self, f):
        """Check seek/tell/write/truncate around the ``LARGE`` offset."""
        # Real assertions rather than ``assert`` statements, which are
        # stripped when Python runs with -O.
        self.assertTrue(f.readable())
        self.assertTrue(f.writable())
        self.assertEqual(f.seek(self.LARGE), self.LARGE)
        self.assertEqual(f.tell(), self.LARGE)
        self.assertEqual(f.write(b"xxx"), 3)
        self.assertEqual(f.tell(), self.LARGE + 3)
        self.assertEqual(f.seek(-1, 1), self.LARGE + 2)
        self.assertEqual(f.truncate(), self.LARGE + 2)
        self.assertEqual(f.tell(), self.LARGE + 2)
        self.assertEqual(f.seek(0, 2), self.LARGE + 2)
        self.assertEqual(f.truncate(self.LARGE + 1), self.LARGE + 1)
        self.assertEqual(f.tell(), self.LARGE + 1)
        self.assertEqual(f.seek(0, 2), self.LARGE + 1)
        self.assertEqual(f.seek(-1, 2), self.LARGE)
        self.assertEqual(f.read(2), b"x")

    def test_raw_file_io(self):
        # ``with`` guarantees the file is closed even if a check fails.
        with self.open(support.TESTFN, "wb", buffering=0) as f:
            self.assertEqual(f.readable(), False)
            self.assertEqual(f.writable(), True)
            self.assertEqual(f.seekable(), True)
            self.write_ops(f)
        with self.open(support.TESTFN, "rb", buffering=0) as f:
            self.assertEqual(f.readable(), True)
            self.assertEqual(f.writable(), False)
            self.assertEqual(f.seekable(), True)
            self.read_ops(f)

    def test_buffered_file_io(self):
        with self.open(support.TESTFN, "wb") as f:
            self.assertEqual(f.readable(), False)
            self.assertEqual(f.writable(), True)
            self.assertEqual(f.seekable(), True)
            self.write_ops(f)
        with self.open(support.TESTFN, "rb") as f:
            self.assertEqual(f.readable(), True)
            self.assertEqual(f.writable(), False)
            self.assertEqual(f.seekable(), True)
            self.read_ops(f, True)

    def test_readline(self):
        # The fixture is written with self.open too, so each variant of this
        # class only touches the implementation it is meant to test.
        with self.open(support.TESTFN, "wb") as f:
            f.write(b"abc\ndef\nxyzzy\nfoo\x00bar\nanother line")
        with self.open(support.TESTFN, "rb") as f:
            self.assertEqual(f.readline(), b"abc\n")
            self.assertEqual(f.readline(10), b"def\n")
            self.assertEqual(f.readline(2), b"xy")
            self.assertEqual(f.readline(4), b"zzy\n")
            self.assertEqual(f.readline(), b"foo\x00bar\n")
            self.assertEqual(f.readline(), b"another line")

    def test_raw_bytes_io(self):
        f = self.BytesIO()
        self.write_ops(f)
        data = f.getvalue()
        self.assertEqual(data, b"hello world\n")
        f = self.BytesIO(data)
        self.read_ops(f, True)

    def test_large_file_ops(self):
        # On Windows and Mac OSX this test consumes large resources; it takes
        # a long time to build the >2GB file and takes >2GB of disk space,
        # therefore the resource must be enabled to run this test.
        if sys.platform[:3] == 'win' or sys.platform == 'darwin':
            if not support.is_resource_enabled("largefile"):
                print("\nTesting large file ops skipped on %s." % sys.platform,
                      file=sys.stderr)
                print("It requires %d bytes and a long time." % self.LARGE,
                      file=sys.stderr)
                print("Use 'regrtest.py -u largefile test_io' to run it.",
                      file=sys.stderr)
                return
        with self.open(support.TESTFN, "w+b", 0) as f:
            self.large_file_ops(f)
        with self.open(support.TESTFN, "w+b") as f:
            self.large_file_ops(f)

    def test_with_open(self):
        # Uses self.open (not the builtin) so that the implementation under
        # test is the one whose context-manager support gets exercised.
        for bufsize in (0, 1, 100):
            f = None
            with self.open(support.TESTFN, "wb", bufsize) as f:
                f.write(b"xxx")
            self.assertEqual(f.closed, True)
            f = None
            try:
                with self.open(support.TESTFN, "wb", bufsize) as f:
                    1/0
            except ZeroDivisionError:
                self.assertEqual(f.closed, True)
            else:
                self.fail("1/0 didn't raise an exception")

    # issue 5008
    def test_append_mode_tell(self):
        with self.open(support.TESTFN, "wb") as f:
            f.write(b"xxx")
        with self.open(support.TESTFN, "ab", buffering=0) as f:
            self.assertEqual(f.tell(), 3)
        with self.open(support.TESTFN, "ab") as f:
            self.assertEqual(f.tell(), 3)
        with self.open(support.TESTFN, "a") as f:
            self.assertTrue(f.tell() > 0)

    def test_destructor(self):
        # Dropping the last reference must run __del__, then close(), then
        # flush(), and the written data must reach the file.
        record = []
        class MyFileIO(self.FileIO):
            def __del__(self):
                record.append(1)
                try:
                    f = super().__del__
                except AttributeError:
                    pass
                else:
                    f()
            def close(self):
                record.append(2)
                super().close()
            def flush(self):
                record.append(3)
                super().flush()
        f = MyFileIO(support.TESTFN, "wb")
        f.write(b"xxx")
        del f
        self.assertEqual(record, [1, 2, 3])
        # Read the data back without leaking the file object.
        with self.open(support.TESTFN, "rb") as f:
            self.assertEqual(f.read(), b"xxx")

    def _check_base_destructor(self, base):
        """Check the __del__/close()/flush() order for a subclass of *base*."""
        record = []
        class MyIO(base):
            def __init__(self):
                # This exercises the availability of attributes on object
                # destruction.
                # (in the C version, close() is called by the tp_dealloc
                # function, not by __del__)
                self.on_del = 1
                self.on_close = 2
                self.on_flush = 3
            def __del__(self):
                record.append(self.on_del)
                try:
                    f = super().__del__
                except AttributeError:
                    pass
                else:
                    f()
            def close(self):
                record.append(self.on_close)
                super().close()
            def flush(self):
                record.append(self.on_flush)
                super().flush()
        f = MyIO()
        del f
        self.assertEqual(record, [1, 2, 3])

    def test_IOBase_destructor(self):
        self._check_base_destructor(self.IOBase)

    def test_RawIOBase_destructor(self):
        self._check_base_destructor(self.RawIOBase)

    def test_BufferedIOBase_destructor(self):
        self._check_base_destructor(self.BufferedIOBase)

    def test_TextIOBase_destructor(self):
        self._check_base_destructor(self.TextIOBase)

    def test_close_flushes(self):
        # close() is called explicitly here because it is the call under test.
        f = self.open(support.TESTFN, "wb")
        f.write(b"xxx")
        f.close()
        with self.open(support.TESTFN, "rb") as f:
            self.assertEqual(f.read(), b"xxx")

    def test_array_writes(self):
        a = array.array('i', range(10))
        # Size of the array's buffer in bytes, computed without the
        # deprecated array.tostring().
        n = len(a) * a.itemsize
        with self.open(support.TESTFN, "wb", 0) as f:
            self.assertEqual(f.write(a), n)
        with self.open(support.TESTFN, "wb") as f:
            self.assertEqual(f.write(a), n)

    def test_closefd(self):
        self.assertRaises(ValueError, self.open, support.TESTFN, 'w',
                          closefd=False)

    def test_read_closed(self):
        with self.open(support.TESTFN, "w") as f:
            f.write("egg\n")
        with self.open(support.TESTFN, "r") as f:
            file = self.open(f.fileno(), "r", closefd=False)
            self.assertEqual(file.read(), "egg\n")
            file.seek(0)
            file.close()
            self.assertRaises(ValueError, file.read)

    def test_no_closefd_with_filename(self):
        # can't use closefd in combination with a file name
        self.assertRaises(ValueError, self.open, support.TESTFN, "r",
                          closefd=False)

    def test_closefd_attr(self):
        with self.open(support.TESTFN, "wb") as f:
            f.write(b"egg\n")
        with self.open(support.TESTFN, "r") as f:
            self.assertEqual(f.buffer.raw.closefd, True)
            file = self.open(f.fileno(), "r", closefd=False)
            self.assertEqual(file.buffer.raw.closefd, False)

    def test_garbage_collection(self):
        # FileIO objects are collected, and collecting them flushes
        # all data to disk.
        f = self.FileIO(support.TESTFN, "wb")
        f.write(b"abcxxx")
        # Make the object part of a reference cycle so that only the cyclic
        # collector can reclaim it.
        f.f = f
        wr = weakref.ref(f)
        del f
        gc.collect()
        self.assertTrue(wr() is None, wr)
        with self.open(support.TESTFN, "rb") as f:
            self.assertEqual(f.read(), b"abcxxx")
Christian Heimesecc42a22008-11-05 19:30:32 +0000507
class CIOTest(IOTest):
    """Concrete IOTest case; adds nothing of its own.

    NOTE(review): presumably the C-implementation attributes (open, FileIO,
    ...) are attached to this class elsewhere in the file -- confirm.
    """
Guido van Rossuma9e20242007-03-08 00:43:48 +0000510
class PyIOTest(IOTest):
    """Concrete IOTest case; adds nothing of its own.

    NOTE(review): presumably the Python-implementation attributes (open,
    FileIO, ...) are attached to this class elsewhere in the file -- confirm.
    """
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000513
Guido van Rossuma9e20242007-03-08 00:43:48 +0000514
class CommonBufferedTests:
    """Tests common to BufferedReader, BufferedWriter and BufferedRandom.

    This is a mixin: the concrete test class supplies ``self.tp`` (the
    buffered class under test) together with ``self.MockRawIO`` and
    ``self.CloseFailureIO`` (the raw-stream mocks to wrap).
    """

    def test_fileno(self):
        rawio = self.MockRawIO()
        bufio = self.tp(rawio)

        self.assertEqual(42, bufio.fileno())

    def test_no_fileno(self):
        # XXX will we always have fileno() function? If so, kill
        # this test. Else, write it.
        pass

    def test_invalid_args(self):
        rawio = self.MockRawIO()
        bufio = self.tp(rawio)
        # Invalid whence
        self.assertRaises(ValueError, bufio.seek, 0, -1)
        # 9 rather than 3: on platforms that define os.SEEK_DATA the value 3
        # is a legitimate whence and would not be rejected.
        self.assertRaises(ValueError, bufio.seek, 0, 9)

    def test_override_destructor(self):
        # Overridden __del__/close()/flush() must be called, in that order,
        # when the object goes away; flush() only for writable objects.
        tp = self.tp
        record = []
        class MyBufferedIO(tp):
            def __del__(self):
                record.append(1)
                try:
                    f = super().__del__
                except AttributeError:
                    pass
                else:
                    f()
            def close(self):
                record.append(2)
                super().close()
            def flush(self):
                record.append(3)
                super().flush()
        rawio = self.MockRawIO()
        bufio = MyBufferedIO(rawio)
        writable = bufio.writable()
        del bufio
        if writable:
            self.assertEqual(record, [1, 2, 3])
        else:
            self.assertEqual(record, [1, 2])

    def test_context_manager(self):
        # Test usability as a context manager
        rawio = self.MockRawIO()
        bufio = self.tp(rawio)
        def _with():
            with bufio:
                pass
        _with()
        # bufio should now be closed, and using it a second time should raise
        # a ValueError.
        self.assertRaises(ValueError, _with)

    def test_error_through_destructor(self):
        # Test that the exception state is not modified by a destructor,
        # even if close() fails.
        rawio = self.CloseFailureIO()
        def f():
            self.tp(rawio).xyzzy
        with support.captured_output("stderr") as s:
            self.assertRaises(AttributeError, f)
        s = s.getvalue().strip()
        if s:
            # The destructor *may* have printed an unraisable error, check it
            self.assertEqual(len(s.splitlines()), 1)
            self.assertTrue(s.startswith("Exception IOError: "), s)
            self.assertTrue(s.endswith(" ignored"), s)
Guido van Rossum78892e42007-04-06 17:31:18 +0000589
590
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000591class BufferedReaderTest(unittest.TestCase, CommonBufferedTests):
592 read_mode = "rb"
Guido van Rossum78892e42007-04-06 17:31:18 +0000593
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000594 def test_constructor(self):
595 rawio = self.MockRawIO([b"abc"])
596 bufio = self.tp(rawio)
597 bufio.__init__(rawio)
598 bufio.__init__(rawio, buffer_size=1024)
599 bufio.__init__(rawio, buffer_size=16)
600 self.assertEquals(b"abc", bufio.read())
601 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0)
602 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16)
603 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1)
604 rawio = self.MockRawIO([b"abc"])
605 bufio.__init__(rawio)
606 self.assertEquals(b"abc", bufio.read())
Guido van Rossum78892e42007-04-06 17:31:18 +0000607
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000608 def test_read(self):
609 rawio = self.MockRawIO((b"abc", b"d", b"efg"))
610 bufio = self.tp(rawio)
Guido van Rossum76c5d4d2007-04-06 19:10:29 +0000611 self.assertEquals(b"abcdef", bufio.read(6))
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000612 # Invalid args
613 self.assertRaises(ValueError, bufio.read, -2)
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000614
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000615 def test_read1(self):
616 rawio = self.MockRawIO((b"abc", b"d", b"efg"))
617 bufio = self.tp(rawio)
618 self.assertEquals(b"a", bufio.read(1))
619 self.assertEquals(b"b", bufio.read1(1))
620 self.assertEquals(rawio._reads, 1)
621 self.assertEquals(b"c", bufio.read1(100))
622 self.assertEquals(rawio._reads, 1)
623 self.assertEquals(b"d", bufio.read1(100))
624 self.assertEquals(rawio._reads, 2)
625 self.assertEquals(b"efg", bufio.read1(100))
626 self.assertEquals(rawio._reads, 3)
627 self.assertEquals(b"", bufio.read1(100))
628 # Invalid args
629 self.assertRaises(ValueError, bufio.read1, -1)
630
631 def test_readinto(self):
632 rawio = self.MockRawIO((b"abc", b"d", b"efg"))
633 bufio = self.tp(rawio)
634 b = bytearray(2)
635 self.assertEquals(bufio.readinto(b), 2)
636 self.assertEquals(b, b"ab")
637 self.assertEquals(bufio.readinto(b), 2)
638 self.assertEquals(b, b"cd")
639 self.assertEquals(bufio.readinto(b), 2)
640 self.assertEquals(b, b"ef")
641 self.assertEquals(bufio.readinto(b), 1)
642 self.assertEquals(b, b"gf")
643 self.assertEquals(bufio.readinto(b), 0)
644 self.assertEquals(b, b"gf")
645
646 def test_buffering(self):
Guido van Rossum78892e42007-04-06 17:31:18 +0000647 data = b"abcdefghi"
648 dlen = len(data)
649
650 tests = [
651 [ 100, [ 3, 1, 4, 8 ], [ dlen, 0 ] ],
652 [ 100, [ 3, 3, 3], [ dlen ] ],
653 [ 4, [ 1, 2, 4, 2 ], [ 4, 4, 1 ] ],
654 ]
655
656 for bufsize, buf_read_sizes, raw_read_sizes in tests:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000657 rawio = self.MockFileIO(data)
658 bufio = self.tp(rawio, buffer_size=bufsize)
Guido van Rossum78892e42007-04-06 17:31:18 +0000659 pos = 0
660 for nbytes in buf_read_sizes:
Guido van Rossum76c5d4d2007-04-06 19:10:29 +0000661 self.assertEquals(bufio.read(nbytes), data[pos:pos+nbytes])
Guido van Rossum78892e42007-04-06 17:31:18 +0000662 pos += nbytes
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000663 # this is mildly implementation-dependent
Guido van Rossum76c5d4d2007-04-06 19:10:29 +0000664 self.assertEquals(rawio.read_history, raw_read_sizes)
Guido van Rossum78892e42007-04-06 17:31:18 +0000665
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000666 def test_read_non_blocking(self):
Guido van Rossum01a27522007-03-07 01:00:12 +0000667 # Inject some None's in there to simulate EWOULDBLOCK
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000668 rawio = self.MockRawIO((b"abc", b"d", None, b"efg", None, None, None))
669 bufio = self.tp(rawio)
Guido van Rossum01a27522007-03-07 01:00:12 +0000670
Guido van Rossum76c5d4d2007-04-06 19:10:29 +0000671 self.assertEquals(b"abcd", bufio.read(6))
672 self.assertEquals(b"e", bufio.read(1))
673 self.assertEquals(b"fg", bufio.read())
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000674 self.assertEquals(b"", bufio.peek(1))
Guido van Rossum76c5d4d2007-04-06 19:10:29 +0000675 self.assert_(None is bufio.read())
676 self.assertEquals(b"", bufio.read())
Guido van Rossum01a27522007-03-07 01:00:12 +0000677
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000678 def test_read_past_eof(self):
679 rawio = self.MockRawIO((b"abc", b"d", b"efg"))
680 bufio = self.tp(rawio)
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000681
Guido van Rossum76c5d4d2007-04-06 19:10:29 +0000682 self.assertEquals(b"abcdefg", bufio.read(9000))
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000683
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000684 def test_read_all(self):
685 rawio = self.MockRawIO((b"abc", b"d", b"efg"))
686 bufio = self.tp(rawio)
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000687
Guido van Rossum76c5d4d2007-04-06 19:10:29 +0000688 self.assertEquals(b"abcdefg", bufio.read())
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000689
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000690 def test_threads(self):
Antoine Pitrou87695762008-08-14 22:44:29 +0000691 try:
692 # Write out many bytes with exactly the same number of 0's,
693 # 1's... 255's. This will help us check that concurrent reading
694 # doesn't duplicate or forget contents.
695 N = 1000
696 l = list(range(256)) * N
697 random.shuffle(l)
698 s = bytes(bytearray(l))
699 with io.open(support.TESTFN, "wb") as f:
700 f.write(s)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000701 with io.open(support.TESTFN, self.read_mode, buffering=0) as raw:
702 bufio = self.tp(raw, 8)
Antoine Pitrou87695762008-08-14 22:44:29 +0000703 errors = []
704 results = []
705 def f():
706 try:
707 # Intra-buffer read then buffer-flushing read
708 for n in cycle([1, 19]):
709 s = bufio.read(n)
710 if not s:
711 break
712 # list.append() is atomic
713 results.append(s)
714 except Exception as e:
715 errors.append(e)
716 raise
717 threads = [threading.Thread(target=f) for x in range(20)]
718 for t in threads:
719 t.start()
720 time.sleep(0.02) # yield
721 for t in threads:
722 t.join()
723 self.assertFalse(errors,
724 "the following exceptions were caught: %r" % errors)
725 s = b''.join(results)
726 for i in range(256):
727 c = bytes(bytearray([i]))
728 self.assertEqual(s.count(c), N)
729 finally:
730 support.unlink(support.TESTFN)
731
def test_misbehaved_io(self):
    # Both seek() and tell() are expected to raise IOError when the
    # buffered object wraps a MisbehavedRawIO.
    bufio = self.tp(self.MisbehavedRawIO((b"abc", b"d", b"efg")))
    for method_name, args in (("seek", (0,)), ("tell", ())):
        self.assertRaises(IOError, getattr(bufio, method_name), *args)
737
class CBufferedReaderTest(BufferedReaderTest):
    # Runs the inherited BufferedReaderTest tests against io.BufferedReader
    # and adds a few checks of its own.
    tp = io.BufferedReader

    def test_constructor(self):
        BufferedReaderTest.test_constructor(self)
        # The allocation can succeed on 32-bit builds, e.g. with more
        # than 2GB RAM and a 64-bit kernel.
        if sys.maxsize > 0x7FFFFFFF:
            rawio = self.MockRawIO()
            bufio = self.tp(rawio)
            self.assertRaises((OverflowError, MemoryError, ValueError),
                bufio.__init__, rawio, sys.maxsize)

    def test_initialization(self):
        # A re-initialization rejected with ValueError must leave the
        # object unusable: the following read() has to raise as well.
        rawio = self.MockRawIO([b"abc"])
        bufio = self.tp(rawio)
        self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0)
        self.assertRaises(ValueError, bufio.read)
        self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16)
        self.assertRaises(ValueError, bufio.read)
        self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1)
        self.assertRaises(ValueError, bufio.read)

    def test_misbehaved_io_read(self):
        rawio = self.MisbehavedRawIO((b"abc", b"d", b"efg"))
        bufio = self.tp(rawio)
        # _pyio.BufferedReader seems to implement reading different, so that
        # checking this is not so easy.
        self.assertRaises(IOError, bufio.read, 10)

    def test_garbage_collection(self):
        # C BufferedReader objects are collected.
        # The Python version has __del__, so it ends into gc.garbage instead
        rawio = self.FileIO(support.TESTFN, "w+b")
        f = self.tp(rawio)
        # Create a reference cycle so that only the cyclic garbage
        # collector can reclaim the object.
        f.f = f
        wr = weakref.ref(f)
        del f
        gc.collect()
        self.assertTrue(wr() is None, wr)
778
class PyBufferedReaderTest(BufferedReaderTest):
    # Runs the inherited BufferedReaderTest tests against
    # pyio.BufferedReader.
    tp = pyio.BufferedReader
Antoine Pitrou87695762008-08-14 22:44:29 +0000781
Guido van Rossuma9e20242007-03-08 00:43:48 +0000782
class BufferedWriterTest(unittest.TestCase, CommonBufferedTests):
    # Tests for a buffered writer.  The class under test is taken from
    # self.tp, which the concrete subclasses set.
    write_mode = "wb"

    def test_constructor(self):
        # __init__ may be called again on an existing object; rejected
        # buffer sizes raise ValueError and a later valid call makes the
        # object usable again.
        rawio = self.MockRawIO()
        bufio = self.tp(rawio)
        bufio.__init__(rawio)
        bufio.__init__(rawio, buffer_size=1024)
        bufio.__init__(rawio, buffer_size=16)
        self.assertEqual(3, bufio.write(b"abc"))
        bufio.flush()
        self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0)
        self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16)
        self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1)
        bufio.__init__(rawio)
        self.assertEqual(3, bufio.write(b"ghi"))
        bufio.flush()
        self.assertEqual(b"".join(rawio._write_stack), b"abcghi")

    def test_write(self):
        # Write to the buffered IO but don't overflow the buffer.
        writer = self.MockRawIO()
        bufio = self.tp(writer, 8)
        bufio.write(b"abc")
        self.assertFalse(writer._write_stack)

    def test_write_overflow(self):
        # Write 16 bytes in slices of 3 through an 8-byte buffer.
        writer = self.MockRawIO()
        bufio = self.tp(writer, 8)
        contents = b"abcdefghijklmnop"
        for n in range(0, len(contents), 3):
            bufio.write(contents[n:n+3])
        flushed = b"".join(writer._write_stack)
        # At least (total - 8) bytes were implicitly flushed, perhaps more
        # depending on the implementation.
        self.assertTrue(flushed.startswith(contents[:-8]), flushed)

    def check_writes(self, intermediate_func):
        # Lots of writes, test the flushed output is as expected.
        # intermediate_func is called with the buffered object after every
        # single write.
        contents = bytes(range(256)) * 1000
        n = 0
        writer = self.MockRawIO()
        bufio = self.tp(writer, 13)
        # Generator of write sizes: repeat each N 15 times then proceed to N+1
        def gen_sizes():
            for size in count(1):
                for i in range(15):
                    yield size
        sizes = gen_sizes()
        while n < len(contents):
            # Never write past the end of the data.
            size = min(next(sizes), len(contents) - n)
            self.assertEqual(bufio.write(contents[n:n+size]), size)
            intermediate_func(bufio)
            n += size
        bufio.flush()
        self.assertEqual(contents, b"".join(writer._write_stack))

    def test_writes(self):
        self.check_writes(lambda bufio: None)

    def test_writes_and_flushes(self):
        self.check_writes(lambda bufio: bufio.flush())

    def test_writes_and_seeks(self):
        # Seek around between writes, always returning to the original
        # position: first with absolute offsets, then with relative ones.
        def _seekabs(bufio):
            pos = bufio.tell()
            bufio.seek(pos + 1, 0)
            bufio.seek(pos - 1, 0)
            bufio.seek(pos, 0)
        self.check_writes(_seekabs)
        def _seekrel(bufio):
            pos = bufio.seek(0, 1)
            bufio.seek(+1, 1)
            bufio.seek(-1, 1)
            bufio.seek(pos, 0)
        self.check_writes(_seekrel)

    def test_writes_and_truncates(self):
        self.check_writes(lambda bufio: bufio.truncate(bufio.tell()))

    def test_write_non_blocking(self):
        raw = self.MockNonBlockWriterIO()
        bufio = self.tp(raw, 8)

        self.assertEqual(bufio.write(b"abcd"), 4)
        self.assertEqual(bufio.write(b"efghi"), 5)
        # 1 byte will be written, the rest will be buffered
        raw.block_on(b"k")
        self.assertEqual(bufio.write(b"jklmn"), 5)

        # 8 bytes will be written, 8 will be buffered and the rest will be lost
        raw.block_on(b"0")
        try:
            bufio.write(b"opqrwxyz0123456789")
        except self.BlockingIOError as e:
            written = e.characters_written
        else:
            self.fail("BlockingIOError should have been raised")
        self.assertEqual(written, 16)
        self.assertEqual(raw.pop_written(),
            b"abcdefghijklmnopqrwxyz")

        self.assertEqual(bufio.write(b"ABCDEFGHI"), 9)
        s = raw.pop_written()
        # Previously buffered bytes were flushed
        self.assertTrue(s.startswith(b"01234567A"), s)

    def test_write_and_rewind(self):
        # Overwrite the start of already written data after seeking back,
        # then continue appending at the old end position.
        raw = io.BytesIO()
        bufio = self.tp(raw, 4)
        self.assertEqual(bufio.write(b"abcdef"), 6)
        self.assertEqual(bufio.tell(), 6)
        bufio.seek(0, 0)
        self.assertEqual(bufio.write(b"XY"), 2)
        bufio.seek(6, 0)
        self.assertEqual(raw.getvalue(), b"XYcdef")
        self.assertEqual(bufio.write(b"123456"), 6)
        bufio.flush()
        self.assertEqual(raw.getvalue(), b"XYcdef123456")

    def test_flush(self):
        writer = self.MockRawIO()
        bufio = self.tp(writer, 8)
        bufio.write(b"abc")
        bufio.flush()
        self.assertEqual(b"abc", writer._write_stack[0])

    def test_destructor(self):
        # Dropping the only reference to the buffered writer is expected
        # to push the pending bytes to the raw stream.
        writer = self.MockRawIO()
        bufio = self.tp(writer, 8)
        bufio.write(b"abc")
        del bufio
        self.assertEqual(b"abc", writer._write_stack[0])

    def test_truncate(self):
        # Truncate implicitly flushes the buffer.
        try:
            with io.open(support.TESTFN, self.write_mode, buffering=0) as raw:
                bufio = self.tp(raw, 8)
                bufio.write(b"abcdef")
                self.assertEqual(bufio.truncate(3), 3)
                self.assertEqual(bufio.tell(), 3)
            with io.open(support.TESTFN, "rb", buffering=0) as f:
                self.assertEqual(f.read(), b"abc")
        finally:
            # Do not leave the scratch file behind for other tests.
            support.unlink(support.TESTFN)

    def test_threads(self):
        try:
            # Write out many bytes from many threads and test they were
            # all flushed.
            N = 1000
            contents = bytes(range(256)) * N
            sizes = cycle([1, 19])
            n = 0
            # Pre-slice the data into a queue of chunks that the worker
            # threads pop from.
            queue = deque()
            while n < len(contents):
                size = next(sizes)
                queue.append(contents[n:n+size])
                n += size
            del contents
            # We use a real file object because it allows us to
            # exercise situations where the GIL is released before
            # writing the buffer to the raw streams. This is in addition
            # to concurrency issues due to switching threads in the middle
            # of Python code.
            with io.open(support.TESTFN, self.write_mode, buffering=0) as raw:
                bufio = self.tp(raw, 8)
                errors = []
                def f():
                    try:
                        while True:
                            try:
                                s = queue.popleft()
                            except IndexError:
                                # The queue is exhausted: this worker is done.
                                return
                            bufio.write(s)
                    except Exception as e:
                        errors.append(e)
                        raise
                threads = [threading.Thread(target=f) for x in range(20)]
                for t in threads:
                    t.start()
                time.sleep(0.02) # yield
                for t in threads:
                    t.join()
                self.assertFalse(errors,
                    "the following exceptions were caught: %r" % errors)
                bufio.close()
            with io.open(support.TESTFN, "rb") as f:
                s = f.read()
            # The chunks may land in any order, so only the number of
            # occurrences of each byte value is checked.
            for i in range(256):
                self.assertEqual(s.count(bytes([i])), N)
        finally:
            support.unlink(support.TESTFN)

    def test_misbehaved_io(self):
        rawio = self.MisbehavedRawIO()
        bufio = self.tp(rawio, 5)
        self.assertRaises(IOError, bufio.seek, 0)
        self.assertRaises(IOError, bufio.tell)
        self.assertRaises(IOError, bufio.write, b"abcdef")

    def test_max_buffer_size_deprecation(self):
        # Passing a third positional argument must emit exactly one
        # DeprecationWarning with the expected message.
        with support.check_warnings() as w:
            warnings.simplefilter("always", DeprecationWarning)
            self.tp(self.MockRawIO(), 8, 12)
            self.assertEqual(len(w.warnings), 1)
            warning = w.warnings[0]
            self.assertTrue(warning.category is DeprecationWarning)
            self.assertEqual(str(warning.message),
                             "max_buffer_size is deprecated")
992
993
class CBufferedWriterTest(BufferedWriterTest):
    # Runs the inherited BufferedWriterTest tests against io.BufferedWriter
    # and adds a few checks of its own.
    tp = io.BufferedWriter

    def test_constructor(self):
        BufferedWriterTest.test_constructor(self)
        # The allocation can succeed on 32-bit builds, e.g. with more
        # than 2GB RAM and a 64-bit kernel.
        if sys.maxsize > 0x7FFFFFFF:
            rawio = self.MockRawIO()
            bufio = self.tp(rawio)
            self.assertRaises((OverflowError, MemoryError, ValueError),
                bufio.__init__, rawio, sys.maxsize)

    def test_initialization(self):
        # A re-initialization rejected with ValueError must leave the
        # object unusable: the following write() has to raise as well.
        rawio = self.MockRawIO()
        bufio = self.tp(rawio)
        self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0)
        self.assertRaises(ValueError, bufio.write, b"def")
        self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16)
        self.assertRaises(ValueError, bufio.write, b"def")
        self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1)
        self.assertRaises(ValueError, bufio.write, b"def")

    def test_garbage_collection(self):
        # C BufferedWriter objects are collected, and collecting them flushes
        # all data to disk.
        # The Python version has __del__, so it ends into gc.garbage instead
        rawio = self.FileIO(support.TESTFN, "w+b")
        f = self.tp(rawio)
        f.write(b"123xxx")
        # Create a reference cycle so that only the cyclic garbage
        # collector can reclaim the object.
        f.x = f
        wr = weakref.ref(f)
        del f
        gc.collect()
        self.assertTrue(wr() is None, wr)
        with open(support.TESTFN, "rb") as f:
            self.assertEqual(f.read(), b"123xxx")
1031
1032
class PyBufferedWriterTest(BufferedWriterTest):
    # Runs the inherited BufferedWriterTest tests against
    # pyio.BufferedWriter.
    tp = pyio.BufferedWriter
Guido van Rossuma9e20242007-03-08 00:43:48 +00001035
class BufferedRWPairTest(unittest.TestCase):
    # Tests for a buffered reader/writer pair.  The class under test is
    # taken from self.tp, which the concrete subclasses set.

    def test_basic(self):
        # A freshly built pair must not report itself as closed.
        reader = self.MockRawIO(())
        writer = self.MockRawIO()
        self.assertFalse(self.tp(reader, writer).closed)

    def test_max_buffer_size_deprecation(self):
        # Passing a fourth positional argument must emit exactly one
        # DeprecationWarning with the expected message.
        with support.check_warnings() as caught:
            warnings.simplefilter("always", DeprecationWarning)
            self.tp(self.MockRawIO(), self.MockRawIO(), 8, 12)
            self.assertEqual(len(caught.warnings), 1)
            first = caught.warnings[0]
            self.assertTrue(first.category is DeprecationWarning)
            self.assertEqual(str(first.message),
                             "max_buffer_size is deprecated")

    # XXX More Tests
Guido van Rossum01a27522007-03-07 01:00:12 +00001055
class CBufferedRWPairTest(BufferedRWPairTest):
    # Runs the inherited BufferedRWPairTest tests against io.BufferedRWPair.
    tp = io.BufferedRWPair
Guido van Rossuma9e20242007-03-08 00:43:48 +00001058
class PyBufferedRWPairTest(BufferedRWPairTest):
    # Runs the inherited BufferedRWPairTest tests against
    # pyio.BufferedRWPair.
    tp = pyio.BufferedRWPair
Guido van Rossuma9e20242007-03-08 00:43:48 +00001061
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001062
class BufferedRandomTest(BufferedReaderTest, BufferedWriterTest):
    # Tests for a buffered object that can both read and write.  It
    # inherits the reader and the writer tests and adds tests that mix
    # reads, writes, seeks and flushes on the same object.
    read_mode = "rb+"
    write_mode = "wb+"

    def test_constructor(self):
        BufferedReaderTest.test_constructor(self)
        BufferedWriterTest.test_constructor(self)

    def test_read_and_write(self):
        raw = self.MockRawIO((b"asdf", b"ghjk"))
        rw = self.tp(raw, 8)

        self.assertEqual(b"as", rw.read(2))
        rw.write(b"ddd")
        rw.write(b"eee")
        self.assertFalse(raw._write_stack) # Buffer writes
        # The read below is expected to push the buffered writes to the
        # raw stream first.
        self.assertEqual(b"ghjk", rw.read())
        self.assertEqual(b"dddeee", raw._write_stack[0])

    def test_seek_and_tell(self):
        raw = self.BytesIO(b"asdfghjkl")
        rw = self.tp(raw)

        self.assertEqual(b"as", rw.read(2))
        self.assertEqual(2, rw.tell())
        rw.seek(0, 0)
        self.assertEqual(b"asdf", rw.read(4))

        # Overwrite bytes 4..7, then re-read the whole stream.
        rw.write(b"asdf")
        rw.seek(0, 0)
        self.assertEqual(b"asdfasdfl", rw.read())
        self.assertEqual(9, rw.tell())
        rw.seek(-4, 2)
        self.assertEqual(5, rw.tell())
        rw.seek(2, 1)
        self.assertEqual(7, rw.tell())
        self.assertEqual(b"fl", rw.read(11))
        # A float is not accepted as a seek offset.
        self.assertRaises(TypeError, rw.seek, 0.0)

    def check_flush_and_read(self, read_func):
        # Shared scenario for the test_flush_and_* tests.  read_func is
        # called as read_func(bufio[, n]) and must return the bytes read.
        raw = self.BytesIO(b"abcdefghi")
        bufio = self.tp(raw)

        self.assertEqual(b"ab", read_func(bufio, 2))
        bufio.write(b"12")
        self.assertEqual(b"ef", read_func(bufio, 2))
        self.assertEqual(6, bufio.tell())
        bufio.flush()
        self.assertEqual(6, bufio.tell())
        self.assertEqual(b"ghi", read_func(bufio))
        # Change the raw stream behind the buffered object's back.
        raw.seek(0, 0)
        raw.write(b"XYZ")
        # flush() resets the read buffer
        bufio.flush()
        bufio.seek(0, 0)
        self.assertEqual(b"XYZ", read_func(bufio, 3))

    def test_flush_and_read(self):
        self.check_flush_and_read(lambda bufio, *args: bufio.read(*args))

    def test_flush_and_readinto(self):
        def _readinto(bufio, n=-1):
            # Without an explicit size, use a buffer that is larger than
            # the data used by check_flush_and_read.
            b = bytearray(n if n >= 0 else 9999)
            n = bufio.readinto(b)
            return bytes(b[:n])
        self.check_flush_and_read(_readinto)

    def test_flush_and_peek(self):
        def _peek(bufio, n=-1):
            # This relies on the fact that the buffer can contain the whole
            # raw stream, otherwise peek() can return less.
            b = bufio.peek(n)
            if n != -1:
                b = b[:n]
            # peek() does not advance the position, so do it explicitly.
            bufio.seek(len(b), 1)
            return b
        self.check_flush_and_read(_peek)

    def test_flush_and_write(self):
        raw = self.BytesIO(b"abcdefghi")
        bufio = self.tp(raw)

        bufio.write(b"123")
        bufio.flush()
        bufio.write(b"45")
        bufio.flush()
        bufio.seek(0, 0)
        self.assertEqual(b"12345fghi", raw.getvalue())
        self.assertEqual(b"12345fghi", bufio.read())

    def test_threads(self):
        BufferedReaderTest.test_threads(self)
        BufferedWriterTest.test_threads(self)

    def test_writes_and_peek(self):
        # First peek at the current position after every write, then peek
        # one byte back and restore the position.
        def _peek(bufio):
            bufio.peek(1)
        self.check_writes(_peek)
        def _peek(bufio):
            pos = bufio.tell()
            bufio.seek(-1, 1)
            bufio.peek(1)
            bufio.seek(pos, 0)
        self.check_writes(_peek)

    def test_writes_and_reads(self):
        # Re-read the last written byte after every write.
        def _read(bufio):
            bufio.seek(-1, 1)
            bufio.read(1)
        self.check_writes(_read)

    def test_writes_and_read1s(self):
        def _read1(bufio):
            bufio.seek(-1, 1)
            bufio.read1(1)
        self.check_writes(_read1)

    def test_writes_and_readintos(self):
        def _read(bufio):
            bufio.seek(-1, 1)
            bufio.readinto(bytearray(1))
        self.check_writes(_read)

    def test_misbehaved_io(self):
        BufferedReaderTest.test_misbehaved_io(self)
        BufferedWriterTest.test_misbehaved_io(self)
1189
class CBufferedRandomTest(BufferedRandomTest):
    # Runs the inherited BufferedRandomTest tests against io.BufferedRandom.
    tp = io.BufferedRandom

    def test_constructor(self):
        BufferedRandomTest.test_constructor(self)
        # The allocation can succeed on 32-bit builds, e.g. with more
        # than 2GB RAM and a 64-bit kernel.
        if sys.maxsize <= 0x7FFFFFFF:
            return
        rawio = self.MockRawIO()
        reinit = self.tp(rawio).__init__
        expected = (OverflowError, MemoryError, ValueError)
        self.assertRaises(expected, reinit, rawio, sys.maxsize)

    def test_garbage_collection(self):
        # Run the reader check first, then the writer check, both on this
        # test instance.
        for parent in (CBufferedReaderTest, CBufferedWriterTest):
            parent.test_garbage_collection(self)
1206
class PyBufferedRandomTest(BufferedRandomTest):
    # Runs the inherited BufferedRandomTest tests against
    # pyio.BufferedRandom.
    tp = pyio.BufferedRandom
1209
1210
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001211# To fully exercise seek/tell, the StatefulIncrementalDecoder has these
1212# properties:
1213# - A single output character can correspond to many bytes of input.
1214# - The number of input bytes to complete the character can be
1215# undetermined until the last input byte is received.
1216# - The number of input bytes can vary depending on previous input.
1217# - A single input byte can correspond to many characters of output.
1218# - The number of output characters can be undetermined until the
1219# last input byte is received.
1220# - The number of output characters can vary depending on previous input.
1221
class StatefulIncrementalDecoder(codecs.IncrementalDecoder):
    """
    Stateful, buffering decoder for testing seek/tell behavior.

    The input is a sequence of words.  A word is either fixed-length (the
    length I is set by the input itself) or, when I is 0, variable-length
    and terminated by a period; extra periods are ignored in that mode.
    Words are interpreted as follows:
      - 'i' followed by a number sets the input length I (maximum 99).
        A missing number counts as 0.
      - 'o' followed by a number sets the output length O (maximum 99).
        A missing number counts as 0.
      - Any other word is emitted followed by a period.  If O is non-zero
        the word is first padded with hyphens or truncated so that its
        length equals O; if O is 0 it is emitted verbatim.
    I and O both start at 1.  EOF (final=True) terminates the last word.
    """

    # The codec lookup function below only answers while this is true.
    codecEnabled = False

    def __init__(self, errors='strict'):
        codecs.IncrementalDecoder.__init__(self, errors)
        self.reset()

    def __repr__(self):
        return '<SID %x>' % id(self)

    def reset(self):
        """Return to the initial state: I = O = 1 and nothing buffered."""
        self.buffer = bytearray()
        self.i = self.o = 1

    def getstate(self):
        """Return (buffered bytes, flags) with flags packing I and O."""
        # XOR with 1 so that flags == 0 right after reset().
        flags = (self.i ^ 1) * 100 + (self.o ^ 1)
        return bytes(self.buffer), flags

    def setstate(self, state):
        """Restore a state previously produced by getstate()."""
        pending, flags = state
        self.buffer = bytearray(pending)
        in_flag, out_flag = divmod(flags, 100)
        self.i = in_flag ^ 1
        self.o = out_flag ^ 1

    def decode(self, input, final=False):
        """Feed the bytes of *input* and return the text decoded so far."""
        pieces = []
        period = ord('.')
        for byte in input:
            # self.i can change whenever a word is processed, so it is
            # re-examined for every byte.
            if self.i != 0:
                # fixed-length, terminate after self.i bytes
                self.buffer.append(byte)
                if len(self.buffer) == self.i:
                    pieces.append(self.process_word())
            elif byte != period:
                # variable-length: keep collecting until a period shows up
                self.buffer.append(byte)
            elif self.buffer:
                # a period ends a non-empty word; stray periods are dropped
                pieces.append(self.process_word())
        if final and self.buffer:
            # EOF terminates the last word
            pieces.append(self.process_word())
        return ''.join(pieces)

    def process_word(self):
        """Interpret the buffered word, clear the buffer, return its output."""
        first = self.buffer[0]
        result = ''
        if first == ord('i'):
            # set input length
            self.i = min(99, int(self.buffer[1:] or 0))
        elif first == ord('o'):
            # set output length
            self.o = min(99, int(self.buffer[1:] or 0))
        else:
            text = self.buffer.decode('ascii')
            if len(text) < self.o:
                # pad out with hyphens
                text += '-' * self.o
            if self.o:
                # truncate to output length
                text = text[:self.o]
            result = text + '.'
        self.buffer = bytearray()
        return result

    @classmethod
    def lookupTestDecoder(cls, name):
        """Codec search function: answer only for 'test_decoder' when enabled."""
        if not cls.codecEnabled or name != 'test_decoder':
            return None
        latin1 = codecs.lookup('latin-1')
        return codecs.CodecInfo(
            name='test_decoder',
            encode=latin1.encode,
            decode=None,
            incrementalencoder=None,
            streamreader=None,
            streamwriter=None,
            incrementaldecoder=cls)
1306
# Register the lookup function of the decoder defined above as a codec
# search function.
# Disabled by default (codecEnabled is False), tests will enable it.
codecs.register(StatefulIncrementalDecoder.lookupTestDecoder)
1310
1311
class StatefulIncrementalDecoderTest(unittest.TestCase):
    """
    Make sure the StatefulIncrementalDecoder actually works.
    """

    # Each entry is (input bytes, final flag, expected decoded text).
    test_cases = [
        # I=1, O=1 (fixed-length input == fixed-length output)
        (b'abcd', False, 'a.b.c.d.'),
        # I=0, O=0 (variable-length input, variable-length output)
        (b'oiabcd', True, 'abcd.'),
        # I=0, O=0 (should ignore extra periods)
        (b'oi...abcd...', True, 'abcd.'),
        # I=0, O=6 (variable-length input, fixed-length output)
        (b'i.o6.x.xyz.toolongtofit.', False, 'x-----.xyz---.toolon.'),
        # I=2, O=6 (fixed-length input < fixed-length output)
        (b'i.i2.o6xyz', True, 'xy----.z-----.'),
        # I=6, O=3 (fixed-length input > fixed-length output)
        (b'i.o3.i6.abcdefghijklmnop', True, 'abc.ghi.mno.'),
        # I=0, then 3; O=29, then 15 (with longer output)
        (b'i.o29.a.b.cde.o15.abcdefghijabcdefghij.i3.a.b.c.d.ei00k.l.m', True,
         'a----------------------------.' +
         'b----------------------------.' +
         'cde--------------------------.' +
         'abcdefghijabcde.' +
         'a.b------------.' +
         '.c.------------.' +
         'd.e------------.' +
         'k--------------.' +
         'l--------------.' +
         'm--------------.')
    ]

    def test_decoder(self):
        # Try a few one-shot test cases.
        for input, eof, output in self.test_cases:
            d = StatefulIncrementalDecoder()
            self.assertEqual(d.decode(input, eof), output)

        # Also test an unfinished decode, followed by forcing EOF.
        d = StatefulIncrementalDecoder()
        self.assertEqual(d.decode(b'oiabcd'), '')
        self.assertEqual(d.decode(b'', 1), 'abcd.')
Guido van Rossum78892e42007-04-06 17:31:18 +00001354
1355class TextIOWrapperTest(unittest.TestCase):
Guido van Rossum9b76da62007-04-11 01:09:03 +00001356
def setUp(self):
    # Sample data mixing "\r\n", "\r" and "\n" line endings, together with
    # the same text using "\n" only.
    self.testdata = b"AAA\r\nBBB\rCCC\r\nDDD\nEEE\r\n"
    normalized_bytes = b"AAA\nBBB\nCCC\nDDD\nEEE\n"
    self.normalized = normalized_bytes.decode("ascii")
    # Start every test without a leftover scratch file.
    support.unlink(support.TESTFN)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00001361
def tearDown(self):
    # Remove the scratch file a test may have created.
    support.unlink(support.TESTFN)
Guido van Rossum9b76da62007-04-11 01:09:03 +00001364
def test_constructor(self):
    # __init__ may be called again on an existing wrapper; each call must
    # update encoding and line_buffering, and invalid newline arguments
    # must be rejected.
    r = self.BytesIO(b"\xc3\xa9\n\n")
    b = self.BufferedReader(r, 1000)
    t = self.TextIOWrapper(b)
    t.__init__(b, encoding="latin1", newline="\r\n")
    self.assertEqual(t.encoding, "latin1")
    self.assertEqual(t.line_buffering, False)
    t.__init__(b, encoding="utf8", line_buffering=True)
    self.assertEqual(t.encoding, "utf8")
    self.assertEqual(t.line_buffering, True)
    # The two leading bytes are the UTF-8 encoding of U+00E9.
    self.assertEqual("\xe9\n", t.readline())
    self.assertRaises(TypeError, t.__init__, b, newline=42)
    self.assertRaises(ValueError, t.__init__, b, newline='xyzzy')
1378
def test_repr(self):
    # The repr of a text wrapper must name its encoding.
    payload = "hello".encode("utf-8")
    buffered = self.BufferedReader(self.BytesIO(payload))
    wrapper = self.TextIOWrapper(buffered, encoding="utf-8")
    expected = "<TextIOWrapper encoding=utf-8>"
    self.assertEqual(repr(wrapper), expected)
1384
def test_line_buffering(self):
    # With line_buffering=True, a write containing "\n" or "\r" must
    # reach the underlying BytesIO immediately.
    r = self.BytesIO()
    b = self.BufferedWriter(r, 1000)
    t = self.TextIOWrapper(b, newline="\n", line_buffering=True)
    t.write("X")
    self.assertEqual(r.getvalue(), b"")  # No flush happened
    t.write("Y\nZ")
    self.assertEqual(r.getvalue(), b"XY\nZ")  # All got flushed
    t.write("A\rB")
    self.assertEqual(r.getvalue(), b"XY\nZA\rB")
1395
def test_encoding(self):
    # Check the encoding attribute is always set, and valid
    b = self.BytesIO()
    t = self.TextIOWrapper(b, encoding="utf8")
    self.assertEqual(t.encoding, "utf8")
    t = self.TextIOWrapper(b)
    self.assertTrue(t.encoding is not None)
    # Raises LookupError if the default encoding is not a known codec.
    codecs.lookup(t.encoding)
1404
def test_encoding_errors_reading(self):
    # Decode a byte that is invalid in ASCII under each error handler.
    # (1) default
    b = self.BytesIO(b"abc\n\xff\n")
    t = self.TextIOWrapper(b, encoding="ascii")
    self.assertRaises(UnicodeError, t.read)
    # (2) explicit strict
    b = self.BytesIO(b"abc\n\xff\n")
    t = self.TextIOWrapper(b, encoding="ascii", errors="strict")
    self.assertRaises(UnicodeError, t.read)
    # (3) ignore
    b = self.BytesIO(b"abc\n\xff\n")
    t = self.TextIOWrapper(b, encoding="ascii", errors="ignore")
    self.assertEqual(t.read(), "abc\n\n")
    # (4) replace
    b = self.BytesIO(b"abc\n\xff\n")
    t = self.TextIOWrapper(b, encoding="ascii", errors="replace")
    self.assertEqual(t.read(), "abc\n\ufffd\n")
1422
def test_encoding_errors_writing(self):
    # Encode a character that does not exist in ASCII under each error
    # handler.
    # (1) default
    b = self.BytesIO()
    t = self.TextIOWrapper(b, encoding="ascii")
    self.assertRaises(UnicodeError, t.write, "\xff")
    # (2) explicit strict
    b = self.BytesIO()
    t = self.TextIOWrapper(b, encoding="ascii", errors="strict")
    self.assertRaises(UnicodeError, t.write, "\xff")
    # (3) ignore
    b = self.BytesIO()
    t = self.TextIOWrapper(b, encoding="ascii", errors="ignore",
                           newline="\n")
    t.write("abc\xffdef\n")
    t.flush()
    self.assertEqual(b.getvalue(), b"abcdef\n")
    # (4) replace
    b = self.BytesIO()
    t = self.TextIOWrapper(b, encoding="ascii", errors="replace",
                           newline="\n")
    t.write("abc\xffdef\n")
    t.flush()
    self.assertEqual(b.getvalue(), b"abc?def\n")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00001446
def test_newlines(self):
    # Read the same text through every combination of encoding, buffer
    # size, newline mode and reading style, and compare the lines obtained
    # with the ones expected for that newline mode.
    input_lines = [ "unix\n", "windows\r\n", "os9\r", "last\n", "nonl" ]

    # Each entry is [newline argument, expected lines].
    tests = [
        [ None, [ 'unix\n', 'windows\n', 'os9\n', 'last\n', 'nonl' ] ],
        [ '', input_lines ],
        [ '\n', [ "unix\n", "windows\r\n", "os9\rlast\n", "nonl" ] ],
        [ '\r\n', [ "unix\nwindows\r\n", "os9\rlast\nnonl" ] ],
        [ '\r', [ "unix\nwindows\r", "\nos9\r", "last\nnonl" ] ],
    ]
    encodings = (
        'utf-8', 'latin-1',
        'utf-16', 'utf-16-le', 'utf-16-be',
        'utf-32', 'utf-32-le', 'utf-32-be',
    )

    # Try a range of buffer sizes to test the case where \r is the last
    # character in TextIOWrapper._pending_line.
    for encoding in encodings:
        data = ''.join(input_lines).encode(encoding)
        for do_reads in (False, True):
            for bufsize in range(1, 10):
                for newline, exp_lines in tests:
                    bufio = self.BufferedReader(self.BytesIO(data), bufsize)
                    textio = self.TextIOWrapper(bufio, newline=newline,
                                                encoding=encoding)
                    if do_reads:
                        # Mix read(2) with readline(): every line is
                        # rebuilt from its first two characters plus the
                        # rest of the line.
                        got_lines = []
                        while True:
                            c2 = textio.read(2)
                            if c2 == '':
                                break
                            self.assertEqual(len(c2), 2)
                            got_lines.append(c2 + textio.readline())
                    else:
                        got_lines = list(textio)

                    for got_line, exp_line in zip(got_lines, exp_lines):
                        self.assertEqual(got_line, exp_line)
                    # zip() stops at the shorter list, so the lengths are
                    # compared separately.
                    self.assertEqual(len(got_lines), len(exp_lines))
1488
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001489 def test_newlines_input(self):
1490 testdata = b"AAA\nBB\x00B\nCCC\rDDD\rEEE\r\nFFF\r\nGGG"
Guido van Rossum8358db22007-08-18 21:39:55 +00001491 normalized = testdata.replace(b"\r\n", b"\n").replace(b"\r", b"\n")
1492 for newline, expected in [
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00001493 (None, normalized.decode("ascii").splitlines(True)),
1494 ("", testdata.decode("ascii").splitlines(True)),
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001495 ("\n", ["AAA\n", "BB\x00B\n", "CCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]),
1496 ("\r\n", ["AAA\nBB\x00B\nCCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]),
1497 ("\r", ["AAA\nBB\x00B\nCCC\r", "DDD\r", "EEE\r", "\nFFF\r", "\nGGG"]),
Guido van Rossum8358db22007-08-18 21:39:55 +00001498 ]:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001499 buf = self.BytesIO(testdata)
1500 txt = self.TextIOWrapper(buf, encoding="ascii", newline=newline)
Guido van Rossum8358db22007-08-18 21:39:55 +00001501 self.assertEquals(txt.readlines(), expected)
1502 txt.seek(0)
1503 self.assertEquals(txt.read(), "".join(expected))
1504
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001505 def test_newlines_output(self):
1506 testdict = {
1507 "": b"AAA\nBBB\nCCC\nX\rY\r\nZ",
1508 "\n": b"AAA\nBBB\nCCC\nX\rY\r\nZ",
1509 "\r": b"AAA\rBBB\rCCC\rX\rY\r\rZ",
1510 "\r\n": b"AAA\r\nBBB\r\nCCC\r\nX\rY\r\r\nZ",
1511 }
1512 tests = [(None, testdict[os.linesep])] + sorted(testdict.items())
1513 for newline, expected in tests:
1514 buf = self.BytesIO()
1515 txt = self.TextIOWrapper(buf, encoding="ascii", newline=newline)
1516 txt.write("AAA\nB")
1517 txt.write("BB\nCCC\n")
1518 txt.write("X\rY\r\nZ")
1519 txt.flush()
1520 self.assertEquals(buf.closed, False)
1521 self.assertEquals(buf.getvalue(), expected)
1522
1523 def test_destructor(self):
1524 l = []
1525 base = self.BytesIO
1526 class MyBytesIO(base):
1527 def close(self):
1528 l.append(self.getvalue())
1529 base.close(self)
1530 b = MyBytesIO()
1531 t = self.TextIOWrapper(b, encoding="ascii")
1532 t.write("abc")
1533 del t
1534 self.assertEquals([b"abc"], l)
1535
1536 def test_override_destructor(self):
1537 record = []
1538 class MyTextIO(self.TextIOWrapper):
1539 def __del__(self):
1540 record.append(1)
1541 try:
1542 f = super().__del__
1543 except AttributeError:
1544 pass
1545 else:
1546 f()
1547 def close(self):
1548 record.append(2)
1549 super().close()
1550 def flush(self):
1551 record.append(3)
1552 super().flush()
1553 b = self.BytesIO()
1554 t = MyTextIO(b, encoding="ascii")
1555 del t
1556 self.assertEqual(record, [1, 2, 3])
1557
def test_error_through_destructor(self):
    # Test that the exception state is not modified by a destructor,
    # even if close() fails.
    rawio = self.CloseFailureIO()

    def f():
        # The temporary wrapper is destroyed while the AttributeError
        # raised by the bogus attribute lookup is propagating.
        self.TextIOWrapper(rawio).xyzzy

    with support.captured_output("stderr") as stderr:
        self.assertRaises(AttributeError, f)
    output = stderr.getvalue().strip()
    if output:
        # The destructor *may* have printed an unraisable error, check it
        self.assertEqual(len(output.splitlines()), 1)
        self.assertTrue(output.startswith("Exception IOError: "), output)
        self.assertTrue(output.endswith(" ignored"), output)
Guido van Rossum8358db22007-08-18 21:39:55 +00001572
Guido van Rossum9b76da62007-04-11 01:09:03 +00001573 # Systematic tests of the text I/O API
1574
def test_basic_io(self):
    # Write, read, seek and tell on a small text file for a range of
    # chunk sizes and encodings.
    # NOTE: "utf-16-be" and "utf-16-le" are currently not exercised here.
    for chunksize in (1, 2, 3, 4, 5, 15, 16, 17, 31, 32, 33, 63, 64, 65):
        for enc in "ascii", "latin1", "utf8":
            # `with` guarantees the file is closed even if an assertion
            # fails part-way through.
            with self.open(support.TESTFN, "w+", encoding=enc) as f:
                f._CHUNK_SIZE = chunksize
                self.assertEqual(f.write("abc"), 3)
            with self.open(support.TESTFN, "r+", encoding=enc) as f:
                f._CHUNK_SIZE = chunksize
                self.assertEqual(f.tell(), 0)
                self.assertEqual(f.read(), "abc")
                cookie = f.tell()
                self.assertEqual(f.seek(0), 0)
                self.assertEqual(f.read(2), "ab")
                self.assertEqual(f.read(1), "c")
                self.assertEqual(f.read(1), "")
                self.assertEqual(f.read(), "")
                self.assertEqual(f.tell(), cookie)
                self.assertEqual(f.seek(0), 0)
                self.assertEqual(f.seek(0, 2), cookie)
                self.assertEqual(f.write("def"), 3)
                self.assertEqual(f.seek(cookie), cookie)
                self.assertEqual(f.read(), "def")
                if enc.startswith("utf"):
                    self.multi_line_test(f, enc)
1601
def multi_line_test(self, f, enc):
    # Helper: empty `f`, write lines of assorted lengths while recording
    # tell() before each one, then read the lines back and check that
    # both the positions and the contents match.
    # `enc` is accepted for the caller's benefit but not used here.
    f.seek(0)
    f.truncate()
    sample = "s\xff\u0fff\uffff"
    wlines = []
    for size in (0, 1, 2, 3, 4, 5, 30, 31, 32, 33, 62, 63, 64, 65, 1000):
        # Cycle through `sample` to build a line of `size` characters.
        line = "".join(sample[i % len(sample)] for i in range(size)) + "\n"
        wlines.append((f.tell(), line))
        f.write(line)
    f.seek(0)
    rlines = []
    while True:
        pos = f.tell()
        line = f.readline()
        if not line:
            break
        rlines.append((pos, line))
    self.assertEqual(rlines, wlines)
1623
def test_telling(self):
    # tell() must return consistent positions across writes and
    # readline() calls, and must be refused while iterating.
    with self.open(support.TESTFN, "w+", encoding="utf8") as f:
        p0 = f.tell()
        f.write("\xff\n")
        p1 = f.tell()
        f.write("\xff\n")
        p2 = f.tell()
        f.seek(0)
        self.assertEqual(f.tell(), p0)
        self.assertEqual(f.readline(), "\xff\n")
        self.assertEqual(f.tell(), p1)
        self.assertEqual(f.readline(), "\xff\n")
        self.assertEqual(f.tell(), p2)
        f.seek(0)
        for line in f:
            self.assertEqual(line, "\xff\n")
            # tell() is expected to fail during iteration.
            self.assertRaises(IOError, f.tell)
        # Once iteration is finished tell() works again.
        self.assertEqual(f.tell(), p2)
1643
def test_seeking(self):
    # Place a multi-byte character two characters before the default
    # chunk size and check read(), tell() and readline() around it.
    chunk_size = _default_chunk_size()
    prefix_size = chunk_size - 2
    u_prefix = "a" * prefix_size
    prefix = u_prefix.encode("utf-8")
    self.assertEqual(len(u_prefix), len(prefix))
    u_suffix = "\u8888\n"
    suffix = u_suffix.encode("utf-8")
    line = prefix + suffix
    with self.open(support.TESTFN, "wb") as f:
        f.write(line*2)
    # The read handle is closed by the `with` block as well.
    with self.open(support.TESTFN, "r", encoding="utf-8") as f:
        s = f.read(prefix_size)
        self.assertEqual(s, str(prefix, "ascii"))
        self.assertEqual(f.tell(), prefix_size)
        self.assertEqual(f.readline(), u_suffix)
1661
def test_seeking_too(self):
    # Regression test for a specific bug
    data = b'\xe0\xbf\xbf\n'
    with self.open(support.TESTFN, "wb") as f:
        f.write(data)
    # The read handle is closed by the `with` block as well.
    with self.open(support.TESTFN, "r", encoding="utf-8") as f:
        f._CHUNK_SIZE  # Just test that it exists
        f._CHUNK_SIZE = 2
        # readline() followed by tell() must simply not raise.
        f.readline()
        f.tell()
1673
def test_seek_and_tell(self):
    # Test seek/tell using the StatefulIncrementalDecoder.
    # Make test faster by doing smaller seeks
    CHUNK_SIZE = 128

    def test_seek_and_tell_with_data(data, min_pos=0):
        """Tell/seek to various points within a data stream and ensure
        that the decoded data returned by read() is consistent."""
        with self.open(support.TESTFN, 'wb') as f:
            f.write(data)
        with self.open(support.TESTFN, encoding='test_decoder') as f:
            f._CHUNK_SIZE = CHUNK_SIZE
            decoded = f.read()

        for i in range(min_pos, len(decoded) + 1):  # seek positions
            for j in [1, 5, len(decoded) - i]:  # read lengths
                with self.open(support.TESTFN,
                               encoding='test_decoder') as f:
                    self.assertEqual(f.read(i), decoded[:i])
                    cookie = f.tell()
                    self.assertEqual(f.read(j), decoded[i:i + j])
                    # Seeking back to the cookie must reproduce the tail.
                    f.seek(cookie)
                    self.assertEqual(f.read(), decoded[i:])

    # Enable the test decoder.
    StatefulIncrementalDecoder.codecEnabled = 1

    # Run the tests.
    try:
        # Try each test case.
        for payload, _, _ in StatefulIncrementalDecoderTest.test_cases:
            test_seek_and_tell_with_data(payload)

        # Position each test case so that it crosses a chunk boundary.
        for payload, _, _ in StatefulIncrementalDecoderTest.test_cases:
            offset = CHUNK_SIZE - len(payload)//2
            prefix = b'.'*offset
            # Don't bother seeking into the prefix (takes too long).
            min_pos = offset*2
            test_seek_and_tell_with_data(prefix + payload, min_pos)

    # Ensure our test decoder won't interfere with subsequent tests.
    finally:
        StatefulIncrementalDecoder.codecEnabled = 0
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001720
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001721 def test_encoded_writes(self):
Alexandre Vassalottia38f73b2008-01-07 18:30:48 +00001722 data = "1234567890"
1723 tests = ("utf-16",
1724 "utf-16-le",
1725 "utf-16-be",
1726 "utf-32",
1727 "utf-32-le",
1728 "utf-32-be")
1729 for encoding in tests:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001730 buf = self.BytesIO()
1731 f = self.TextIOWrapper(buf, encoding=encoding)
Alexandre Vassalottia38f73b2008-01-07 18:30:48 +00001732 # Check if the BOM is written only once (see issue1753).
1733 f.write(data)
1734 f.write(data)
1735 f.seek(0)
1736 self.assertEquals(f.read(), data * 2)
Benjamin Peterson9363a652009-03-05 00:42:09 +00001737 f.seek(0)
1738 self.assertEquals(f.read(), data * 2)
Alexandre Vassalottia38f73b2008-01-07 18:30:48 +00001739 self.assertEquals(buf.getvalue(), (data * 2).encode(encoding))
1740
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001741 def test_read_one_by_one(self):
1742 txt = self.TextIOWrapper(self.BytesIO(b"AA\r\nBB"))
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00001743 reads = ""
1744 while True:
1745 c = txt.read(1)
1746 if not c:
1747 break
1748 reads += c
1749 self.assertEquals(reads, "AA\nBB")
1750
1751 # read in amounts equal to TextIOWrapper._CHUNK_SIZE which is 128.
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001752 def test_read_by_chunk(self):
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00001753 # make sure "\r\n" straddles 128 char boundary.
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001754 txt = self.TextIOWrapper(self.BytesIO(b"A" * 127 + b"\r\nB"))
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00001755 reads = ""
1756 while True:
1757 c = txt.read(128)
1758 if not c:
1759 break
1760 reads += c
1761 self.assertEquals(reads, "A"*127+"\nB")
1762
1763 def test_issue1395_1(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001764 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00001765
1766 # read one char at a time
1767 reads = ""
1768 while True:
1769 c = txt.read(1)
1770 if not c:
1771 break
1772 reads += c
1773 self.assertEquals(reads, self.normalized)
1774
1775 def test_issue1395_2(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001776 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00001777 txt._CHUNK_SIZE = 4
1778
1779 reads = ""
1780 while True:
1781 c = txt.read(4)
1782 if not c:
1783 break
1784 reads += c
1785 self.assertEquals(reads, self.normalized)
1786
1787 def test_issue1395_3(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001788 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00001789 txt._CHUNK_SIZE = 4
1790
1791 reads = txt.read(4)
1792 reads += txt.read(4)
1793 reads += txt.readline()
1794 reads += txt.readline()
1795 reads += txt.readline()
1796 self.assertEquals(reads, self.normalized)
1797
1798 def test_issue1395_4(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001799 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00001800 txt._CHUNK_SIZE = 4
1801
1802 reads = txt.read(4)
1803 reads += txt.read()
1804 self.assertEquals(reads, self.normalized)
1805
1806 def test_issue1395_5(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001807 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00001808 txt._CHUNK_SIZE = 4
1809
1810 reads = txt.read(4)
1811 pos = txt.tell()
1812 txt.seek(0)
1813 txt.seek(pos)
1814 self.assertEquals(txt.read(4), "BBB\n")
1815
Ka-Ping Yeeddaa7062008-03-17 20:35:15 +00001816 def test_issue2282(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001817 buffer = self.BytesIO(self.testdata)
1818 txt = self.TextIOWrapper(buffer, encoding="ascii")
Ka-Ping Yeeddaa7062008-03-17 20:35:15 +00001819
1820 self.assertEqual(buffer.seekable(), txt.seekable())
1821
class CTextIOWrapperTest(TextIOWrapperTest):
    # Runs the shared TextIOWrapper tests plus two tests that only this
    # class defines.

    def test_initialization(self):
        # A failed re-initialization must leave the wrapper unusable:
        # read() raises ValueError after each rejected __init__ call.
        r = self.BytesIO(b"\xc3\xa9\n\n")
        b = self.BufferedReader(r, 1000)
        t = self.TextIOWrapper(b)
        self.assertRaises(TypeError, t.__init__, b, newline=42)
        self.assertRaises(ValueError, t.read)
        self.assertRaises(ValueError, t.__init__, b, newline='xyzzy')
        self.assertRaises(ValueError, t.read)

    def test_garbage_collection(self):
        # C TextIOWrapper objects are collected, and collecting them flushes
        # all data to disk.
        # The Python version has __del__, so it ends in gc.garbage instead.
        rawio = io.FileIO(support.TESTFN, "wb")
        b = self.BufferedWriter(rawio)
        t = self.TextIOWrapper(b, encoding="ascii")
        t.write("456def")
        # Create a reference cycle so only the cyclic GC can free `t`.
        t.x = t
        wr = weakref.ref(t)
        del t
        gc.collect()
        self.assertTrue(wr() is None, wr)
        with open(support.TESTFN, "rb") as f:
            self.assertEqual(f.read(), b"456def")
1848
class PyTextIOWrapperTest(TextIOWrapperTest):
    """Run the shared TextIOWrapper tests unchanged; adds no tests of its own."""
1851
1852
1853class IncrementalNewlineDecoderTest(unittest.TestCase):
1854
1855 def check_newline_decoding_utf8(self, decoder):
Antoine Pitrou180a3362008-12-14 16:36:46 +00001856 # UTF-8 specific tests for a newline decoder
1857 def _check_decode(b, s, **kwargs):
1858 # We exercise getstate() / setstate() as well as decode()
1859 state = decoder.getstate()
1860 self.assertEquals(decoder.decode(b, **kwargs), s)
1861 decoder.setstate(state)
1862 self.assertEquals(decoder.decode(b, **kwargs), s)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00001863
Antoine Pitrou180a3362008-12-14 16:36:46 +00001864 _check_decode(b'\xe8\xa2\x88', "\u8888")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00001865
Antoine Pitrou180a3362008-12-14 16:36:46 +00001866 _check_decode(b'\xe8', "")
1867 _check_decode(b'\xa2', "")
1868 _check_decode(b'\x88', "\u8888")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00001869
Antoine Pitrou180a3362008-12-14 16:36:46 +00001870 _check_decode(b'\xe8', "")
1871 _check_decode(b'\xa2', "")
1872 _check_decode(b'\x88', "\u8888")
1873
1874 _check_decode(b'\xe8', "")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00001875 self.assertRaises(UnicodeDecodeError, decoder.decode, b'', final=True)
1876
Antoine Pitrou180a3362008-12-14 16:36:46 +00001877 decoder.reset()
1878 _check_decode(b'\n', "\n")
1879 _check_decode(b'\r', "")
1880 _check_decode(b'', "\n", final=True)
1881 _check_decode(b'\r', "\n", final=True)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00001882
Antoine Pitrou180a3362008-12-14 16:36:46 +00001883 _check_decode(b'\r', "")
1884 _check_decode(b'a', "\na")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00001885
Antoine Pitrou180a3362008-12-14 16:36:46 +00001886 _check_decode(b'\r\r\n', "\n\n")
1887 _check_decode(b'\r', "")
1888 _check_decode(b'\r', "\n")
1889 _check_decode(b'\na', "\na")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00001890
Antoine Pitrou180a3362008-12-14 16:36:46 +00001891 _check_decode(b'\xe8\xa2\x88\r\n', "\u8888\n")
1892 _check_decode(b'\xe8\xa2\x88', "\u8888")
1893 _check_decode(b'\n', "\n")
1894 _check_decode(b'\xe8\xa2\x88\r', "\u8888")
1895 _check_decode(b'\n', "\n")
Guido van Rossum9b76da62007-04-11 01:09:03 +00001896
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001897 def check_newline_decoding(self, decoder, encoding):
Antoine Pitrou180a3362008-12-14 16:36:46 +00001898 result = []
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001899 if encoding is not None:
1900 encoder = codecs.getincrementalencoder(encoding)()
1901 def _decode_bytewise(s):
1902 # Decode one byte at a time
1903 for b in encoder.encode(s):
1904 result.append(decoder.decode(bytes([b])))
1905 else:
1906 encoder = None
1907 def _decode_bytewise(s):
1908 # Decode one char at a time
1909 for c in s:
1910 result.append(decoder.decode(c))
Antoine Pitrou180a3362008-12-14 16:36:46 +00001911 self.assertEquals(decoder.newlines, None)
1912 _decode_bytewise("abc\n\r")
1913 self.assertEquals(decoder.newlines, '\n')
1914 _decode_bytewise("\nabc")
1915 self.assertEquals(decoder.newlines, ('\n', '\r\n'))
1916 _decode_bytewise("abc\r")
1917 self.assertEquals(decoder.newlines, ('\n', '\r\n'))
1918 _decode_bytewise("abc")
1919 self.assertEquals(decoder.newlines, ('\r', '\n', '\r\n'))
1920 _decode_bytewise("abc\r")
1921 self.assertEquals("".join(result), "abc\n\nabcabc\nabcabc")
1922 decoder.reset()
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001923 input = "abc"
1924 if encoder is not None:
1925 encoder.reset()
1926 input = encoder.encode(input)
1927 self.assertEquals(decoder.decode(input), "abc")
Antoine Pitrou180a3362008-12-14 16:36:46 +00001928 self.assertEquals(decoder.newlines, None)
1929
1930 def test_newline_decoder(self):
1931 encodings = (
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001932 # None meaning the IncrementalNewlineDecoder takes unicode input
1933 # rather than bytes input
1934 None, 'utf-8', 'latin-1',
Antoine Pitrou180a3362008-12-14 16:36:46 +00001935 'utf-16', 'utf-16-le', 'utf-16-be',
1936 'utf-32', 'utf-32-le', 'utf-32-be',
1937 )
1938 for enc in encodings:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001939 decoder = enc and codecs.getincrementaldecoder(enc)()
1940 decoder = self.IncrementalNewlineDecoder(decoder, translate=True)
1941 self.check_newline_decoding(decoder, enc)
Alexandre Vassalotti472f07d2008-01-06 00:34:32 +00001942 decoder = codecs.getincrementaldecoder("utf-8")()
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001943 decoder = self.IncrementalNewlineDecoder(decoder, translate=True)
1944 self.check_newline_decoding_utf8(decoder)
1945
Antoine Pitrou66913e22009-03-06 23:40:56 +00001946 def test_newline_bytes(self):
1947 # Issue 5433: Excessive optimization in IncrementalNewlineDecoder
1948 def _check(dec):
1949 self.assertEquals(dec.newlines, None)
1950 self.assertEquals(dec.decode("\u0D00"), "\u0D00")
1951 self.assertEquals(dec.newlines, None)
1952 self.assertEquals(dec.decode("\u0A00"), "\u0A00")
1953 self.assertEquals(dec.newlines, None)
1954 dec = self.IncrementalNewlineDecoder(None, translate=False)
1955 _check(dec)
1956 dec = self.IncrementalNewlineDecoder(None, translate=True)
1957 _check(dec)
1958
class CIncrementalNewlineDecoderTest(IncrementalNewlineDecoderTest):
    """Run the shared newline-decoder tests unchanged; adds no tests of its own."""
1961
class PyIncrementalNewlineDecoderTest(IncrementalNewlineDecoderTest):
    """Run the shared newline-decoder tests unchanged; adds no tests of its own."""
Antoine Pitrou180a3362008-12-14 16:36:46 +00001964
Alexandre Vassalotti472f07d2008-01-06 00:34:32 +00001965
Guido van Rossum01a27522007-03-07 01:00:12 +00001966# XXX Tests for open()
Guido van Rossum68bbcd22007-02-27 17:19:33 +00001967
Guido van Rossum5abbf752007-08-27 17:39:33 +00001968class MiscIOTest(unittest.TestCase):
1969
Barry Warsaw40e82462008-11-20 20:14:50 +00001970 def tearDown(self):
1971 support.unlink(support.TESTFN)
1972
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001973 def test___all__(self):
1974 for name in self.io.__all__:
1975 obj = getattr(self.io, name, None)
Guido van Rossum5abbf752007-08-27 17:39:33 +00001976 self.assert_(obj is not None, name)
1977 if name == "open":
1978 continue
1979 elif "error" in name.lower():
1980 self.assert_(issubclass(obj, Exception), name)
1981 else:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001982 self.assert_(issubclass(obj, self.IOBase), name)
Benjamin Peterson65676e42008-11-05 21:42:45 +00001983
Barry Warsaw40e82462008-11-20 20:14:50 +00001984 def test_attributes(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001985 f = self.open(support.TESTFN, "wb", buffering=0)
Benjamin Peterson44309e62008-11-22 00:41:45 +00001986 self.assertEquals(f.mode, "wb")
Barry Warsaw40e82462008-11-20 20:14:50 +00001987 f.close()
1988
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001989 f = self.open(support.TESTFN, "U")
Barry Warsaw40e82462008-11-20 20:14:50 +00001990 self.assertEquals(f.name, support.TESTFN)
1991 self.assertEquals(f.buffer.name, support.TESTFN)
1992 self.assertEquals(f.buffer.raw.name, support.TESTFN)
1993 self.assertEquals(f.mode, "U")
Benjamin Peterson44309e62008-11-22 00:41:45 +00001994 self.assertEquals(f.buffer.mode, "rb")
1995 self.assertEquals(f.buffer.raw.mode, "rb")
Barry Warsaw40e82462008-11-20 20:14:50 +00001996 f.close()
1997
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001998 f = self.open(support.TESTFN, "w+")
Barry Warsaw40e82462008-11-20 20:14:50 +00001999 self.assertEquals(f.mode, "w+")
Benjamin Peterson44309e62008-11-22 00:41:45 +00002000 self.assertEquals(f.buffer.mode, "rb+") # Does it really matter?
2001 self.assertEquals(f.buffer.raw.mode, "rb+")
Barry Warsaw40e82462008-11-20 20:14:50 +00002002
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002003 g = self.open(f.fileno(), "wb", closefd=False)
Benjamin Peterson44309e62008-11-22 00:41:45 +00002004 self.assertEquals(g.mode, "wb")
2005 self.assertEquals(g.raw.mode, "wb")
Barry Warsaw40e82462008-11-20 20:14:50 +00002006 self.assertEquals(g.name, f.fileno())
2007 self.assertEquals(g.raw.name, f.fileno())
2008 f.close()
2009 g.close()
2010
Antoine Pitrou8043cf82009-01-09 19:54:29 +00002011 def test_io_after_close(self):
2012 for kwargs in [
2013 {"mode": "w"},
2014 {"mode": "wb"},
2015 {"mode": "w", "buffering": 1},
2016 {"mode": "w", "buffering": 2},
2017 {"mode": "wb", "buffering": 0},
2018 {"mode": "r"},
2019 {"mode": "rb"},
2020 {"mode": "r", "buffering": 1},
2021 {"mode": "r", "buffering": 2},
2022 {"mode": "rb", "buffering": 0},
2023 {"mode": "w+"},
2024 {"mode": "w+b"},
2025 {"mode": "w+", "buffering": 1},
2026 {"mode": "w+", "buffering": 2},
2027 {"mode": "w+b", "buffering": 0},
2028 ]:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002029 f = self.open(support.TESTFN, **kwargs)
Antoine Pitrou8043cf82009-01-09 19:54:29 +00002030 f.close()
2031 self.assertRaises(ValueError, f.flush)
2032 self.assertRaises(ValueError, f.fileno)
2033 self.assertRaises(ValueError, f.isatty)
2034 self.assertRaises(ValueError, f.__iter__)
2035 if hasattr(f, "peek"):
2036 self.assertRaises(ValueError, f.peek, 1)
2037 self.assertRaises(ValueError, f.read)
2038 if hasattr(f, "read1"):
2039 self.assertRaises(ValueError, f.read1, 1024)
2040 if hasattr(f, "readinto"):
2041 self.assertRaises(ValueError, f.readinto, bytearray(1024))
2042 self.assertRaises(ValueError, f.readline)
2043 self.assertRaises(ValueError, f.readlines)
2044 self.assertRaises(ValueError, f.seek, 0)
2045 self.assertRaises(ValueError, f.tell)
2046 self.assertRaises(ValueError, f.truncate)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002047 self.assertRaises(ValueError, f.write,
2048 b"" if "b" in kwargs['mode'] else "")
Antoine Pitrou8043cf82009-01-09 19:54:29 +00002049 self.assertRaises(ValueError, f.writelines, [])
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002050 self.assertRaises(ValueError, next, f)
Antoine Pitrou8043cf82009-01-09 19:54:29 +00002051
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002052 def test_blockingioerror(self):
2053 # Various BlockingIOError issues
2054 self.assertRaises(TypeError, self.BlockingIOError)
2055 self.assertRaises(TypeError, self.BlockingIOError, 1)
2056 self.assertRaises(TypeError, self.BlockingIOError, 1, 2, 3, 4)
2057 self.assertRaises(TypeError, self.BlockingIOError, 1, "", None)
2058 b = self.BlockingIOError(1, "")
2059 self.assertEqual(b.characters_written, 0)
2060 class C(str):
2061 pass
2062 c = C("")
2063 b = self.BlockingIOError(1, c)
2064 c.b = b
2065 b.c = c
2066 wr = weakref.ref(c)
2067 del c, b
2068 gc.collect()
2069 self.assert_(wr() is None, wr)
2070
2071 def test_abcs(self):
2072 # Test the visible base classes are ABCs.
2073 self.assertTrue(isinstance(self.IOBase, abc.ABCMeta))
2074 self.assertTrue(isinstance(self.RawIOBase, abc.ABCMeta))
2075 self.assertTrue(isinstance(self.BufferedIOBase, abc.ABCMeta))
2076 self.assertTrue(isinstance(self.TextIOBase, abc.ABCMeta))
2077
2078 def _check_abc_inheritance(self, abcmodule):
2079 with self.open(support.TESTFN, "wb", buffering=0) as f:
2080 self.assertTrue(isinstance(f, abcmodule.IOBase))
2081 self.assertTrue(isinstance(f, abcmodule.RawIOBase))
2082 self.assertFalse(isinstance(f, abcmodule.BufferedIOBase))
2083 self.assertFalse(isinstance(f, abcmodule.TextIOBase))
2084 with self.open(support.TESTFN, "wb") as f:
2085 self.assertTrue(isinstance(f, abcmodule.IOBase))
2086 self.assertFalse(isinstance(f, abcmodule.RawIOBase))
2087 self.assertTrue(isinstance(f, abcmodule.BufferedIOBase))
2088 self.assertFalse(isinstance(f, abcmodule.TextIOBase))
2089 with self.open(support.TESTFN, "w") as f:
2090 self.assertTrue(isinstance(f, abcmodule.IOBase))
2091 self.assertFalse(isinstance(f, abcmodule.RawIOBase))
2092 self.assertFalse(isinstance(f, abcmodule.BufferedIOBase))
2093 self.assertTrue(isinstance(f, abcmodule.TextIOBase))
2094
2095 def test_abc_inheritance(self):
2096 # Test implementations inherit from their respective ABCs
2097 self._check_abc_inheritance(self)
2098
2099 def test_abc_inheritance_official(self):
2100 # Test implementations inherit from the official ABCs of the
2101 # baseline "io" module.
2102 self._check_abc_inheritance(io)
2103
class CMiscIOTest(MiscIOTest):
    """Run the MiscIOTest checks with the module-level ``io`` as ``self.io``."""

    io = io
2106
class PyMiscIOTest(MiscIOTest):
    """Run the MiscIOTest checks with the module-level ``pyio`` as ``self.io``."""

    io = pyio
Barry Warsaw40e82462008-11-20 20:14:50 +00002109
def test_main():
    """Attach the io namespaces to the test classes and run them all.

    Classes whose name starts with "C" receive the names taken from ``io``,
    classes whose name starts with "Py" receive those taken from ``pyio``,
    and any other class is left untouched.
    """
    tests = (CIOTest, PyIOTest,
             CBufferedReaderTest, PyBufferedReaderTest,
             CBufferedWriterTest, PyBufferedWriterTest,
             CBufferedRWPairTest, PyBufferedRWPairTest,
             CBufferedRandomTest, PyBufferedRandomTest,
             StatefulIncrementalDecoderTest,
             CIncrementalNewlineDecoderTest, PyIncrementalNewlineDecoderTest,
             CTextIOWrapperTest, PyTextIOWrapperTest,
             CMiscIOTest, PyMiscIOTest,)

    # Put the namespaces of the IO module we are testing and some useful mock
    # classes in the __dict__ of each test.
    mocks = (MockRawIO, MisbehavedRawIO, MockFileIO, CloseFailureIO,
             MockNonBlockWriterIO)
    all_members = io.__all__ + ["IncrementalNewlineDecoder"]
    c_io_ns = dict((member, getattr(io, member)) for member in all_members)
    py_io_ns = dict((member, getattr(pyio, member)) for member in all_members)

    # Each mock has a "C"-prefixed and a "Py"-prefixed variant at module
    # level; publish each under the unprefixed name.
    module_globals = globals()
    for mock in mocks:
        c_io_ns[mock.__name__] = module_globals["C" + mock.__name__]
    for mock in mocks:
        py_io_ns[mock.__name__] = module_globals["Py" + mock.__name__]

    # Avoid turning open into a bound method.
    py_io_ns["open"] = pyio.OpenWrapper

    for test in tests:
        class_name = test.__name__
        if class_name.startswith("C"):
            namespace = c_io_ns
        elif class_name.startswith("Py"):
            namespace = py_io_ns
        else:
            # Implementation-independent test class: nothing to inject.
            continue
        for attr_name, attr_value in namespace.items():
            setattr(test, attr_name, attr_value)

    support.run_unittest(*tests)
Guido van Rossum28524c72007-02-27 05:47:44 +00002142
# Run the whole suite when this file is executed as a script.
if __name__ == "__main__":
    test_main()