blob: a9094d9431638ed2bc894719174497a9c5c24f43 [file] [log] [blame]
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001"""Unit tests for the io module."""
2
3# Tests of io are scattered over the test suite:
4# * test_bufio - tests file buffering
5# * test_memoryio - tests BytesIO and StringIO
6# * test_fileio - tests FileIO
7# * test_file - tests the file interface
8# * test_io - tests everything else in the io module
9# * test_univnewlines - tests universal newline support
10# * test_largefile - tests operations on a file greater than 2**32 bytes
11# (only enabled with -ulargefile)
12
13################################################################################
14# ATTENTION TEST WRITERS!!!
15################################################################################
16# When writing tests for io, it's important to test both the C and Python
17# implementations. This is usually done by writing a base test that refers to
18# the type it is testing as an attribute. Then it provides custom subclasses to
19# test both implementations. This file has lots of examples.
20################################################################################
Guido van Rossum68bbcd22007-02-27 17:19:33 +000021
Guido van Rossum8358db22007-08-18 21:39:55 +000022import os
Guido van Rossum34d69e52007-04-10 20:08:41 +000023import sys
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +000024import time
Guido van Rossumd4103952007-04-12 05:44:49 +000025import array
Antoine Pitrou87695762008-08-14 22:44:29 +000026import threading
27import random
Guido van Rossum28524c72007-02-27 05:47:44 +000028import unittest
Benjamin Peterson59406a92009-03-26 17:10:29 +000029import warnings
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000030import weakref
31import gc
32import abc
33from itertools import chain, cycle, count
34from collections import deque
Benjamin Petersonee8712c2008-05-20 21:35:26 +000035from test import support
Guido van Rossum76c5d4d2007-04-06 19:10:29 +000036
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +000037import codecs
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000038import io # C implementation of io
39import _pyio as pyio # Python implementation of io
Guido van Rossum28524c72007-02-27 05:47:44 +000040
Guido van Rossuma9e20242007-03-08 00:43:48 +000041
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000042def _default_chunk_size():
43 """Get the default TextIOWrapper chunk size"""
44 with open(__file__, "r", encoding="latin1") as f:
45 return f._CHUNK_SIZE
46
47
class MockRawIO:
    """Scripted raw stream used to drive buffered-layer tests.

    Reads are served, in order, from the chunks given to the constructor;
    a ``None`` chunk is handed back as-is (the tests use it to simulate a
    read that would block).  Every chunk written is recorded in
    ``_write_stack`` and every ``read()``/``readinto()`` call is counted in
    ``_reads`` so tests can check how often the raw layer was hit.
    """

    def __init__(self, read_stack=()):
        self._read_stack = list(read_stack)
        self._write_stack = []
        self._reads = 0

    def read(self, n=None):
        """Pop and return the next scripted chunk (``n`` is ignored).

        Returns ``b""`` once the script is exhausted.
        """
        self._reads += 1
        try:
            return self._read_stack.pop(0)
        except IndexError:
            # Only "script exhausted" means EOF; anything else is a real
            # error and must not be swallowed.
            return b""

    def write(self, b):
        """Record a bytes copy of ``b`` and report it as fully written."""
        self._write_stack.append(bytes(b))
        return len(b)

    def writable(self):
        return True

    def fileno(self):
        return 42

    def readable(self):
        return True

    def seekable(self):
        return True

    def seek(self, pos, whence):
        return 0   # wrong but we gotta return something

    def tell(self):
        return 0   # same comment as above

    def readinto(self, buf):
        """Copy the next scripted chunk into ``buf``.

        Returns the number of bytes stored, ``0`` when the script is
        exhausted, or ``None`` when the next scripted item is ``None``.
        A chunk larger than ``buf`` is consumed piecewise: the unread tail
        stays at the head of the script for the next call.
        """
        self._reads += 1
        max_len = len(buf)
        try:
            data = self._read_stack[0]
        except IndexError:
            return 0
        if data is None:
            del self._read_stack[0]
            return None
        n = len(data)
        if n <= max_len:
            del self._read_stack[0]
            buf[:n] = data
            return n
        buf[:] = data[:max_len]
        self._read_stack[0] = data[max_len:]
        return max_len

    def truncate(self, pos=None):
        """Pretend to truncate; simply echoes ``pos`` back."""
        return pos
106
class CMockRawIO(MockRawIO, io.RawIOBase):
    """MockRawIO layered on the C implementation's RawIOBase."""
109
class PyMockRawIO(MockRawIO, pyio.RawIOBase):
    """MockRawIO layered on the pure-Python implementation's RawIOBase."""
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000112
Guido van Rossuma9e20242007-03-08 00:43:48 +0000113
class MisbehavedRawIO(MockRawIO):
    """MockRawIO variant whose results are deliberately inconsistent.

    Sizes are inflated, read data is doubled and positions are negative.
    """

    def write(self, b):
        # Claim twice as many bytes as the base class recorded.
        written = super().write(b)
        return written * 2

    def read(self, n=None):
        # Hand back the scripted chunk repeated twice.
        chunk = super().read(n)
        return chunk * 2

    def seek(self, pos, whence):
        return -123

    def tell(self):
        return -456

    def readinto(self, buf):
        # Let the base class fill the buffer, then report a count five
        # times larger than the buffer itself.
        super().readinto(buf)
        capacity = len(buf)
        return capacity * 5
130
class CMisbehavedRawIO(MisbehavedRawIO, io.RawIOBase):
    """MisbehavedRawIO layered on the C implementation's RawIOBase."""
133
class PyMisbehavedRawIO(MisbehavedRawIO, pyio.RawIOBase):
    """MisbehavedRawIO layered on the pure-Python RawIOBase."""
136
137
class CloseFailureIO(MockRawIO):
    """MockRawIO whose first close() raises IOError; later calls are no-ops."""

    closed = 0

    def close(self):
        if self.closed:
            return
        # Mark the stream closed *before* failing so a second close()
        # (e.g. from a destructor) does not raise again.
        self.closed = 1
        raise IOError
145
class CCloseFailureIO(CloseFailureIO, io.RawIOBase):
    """CloseFailureIO layered on the C implementation's RawIOBase."""
148
class PyCloseFailureIO(CloseFailureIO, pyio.RawIOBase):
    """CloseFailureIO layered on the pure-Python RawIOBase."""
151
152
class MockFileIO:
    """Mixin that logs the size of every read made on the underlying stream.

    It must be combined with a concrete stream class (next in the MRO) that
    provides ``__init__(data)``, ``read`` and ``readinto``.
    """

    def __init__(self, data):
        self.read_history = []
        super().__init__(data)

    def read(self, n=None):
        # Log the length of the returned data, or None when the underlying
        # read itself returned None.
        result = super().read(n)
        if result is None:
            self.read_history.append(None)
        else:
            self.read_history.append(len(result))
        return result

    def readinto(self, b):
        # readinto() already returns a count, so it is logged unchanged.
        count = super().readinto(b)
        self.read_history.append(count)
        return count
Guido van Rossum78892e42007-04-06 17:31:18 +0000168
class CMockFileIO(MockFileIO, io.BytesIO):
    """Read-logging stream backed by the C implementation's BytesIO."""
Guido van Rossuma9e20242007-03-08 00:43:48 +0000171
class PyMockFileIO(MockFileIO, pyio.BytesIO):
    """Read-logging stream backed by the pure-Python BytesIO."""
174
175
class MockNonBlockWriterIO:
    """Raw-writer mock that can be told to "block" part-way through a write.

    Subclasses must provide a ``BlockingIOError`` class attribute naming
    the exception type raised when the blocker is hit.
    """

    def __init__(self):
        self._write_stack = []
        self._blocker_char = None

    def pop_written(self):
        """Return everything written so far and reset the record."""
        written = b"".join(self._write_stack)
        del self._write_stack[:]
        return written

    def block_on(self, char):
        """Block when a given char is encountered."""
        self._blocker_char = char

    def readable(self):
        return True

    def seekable(self):
        return True

    def writable(self):
        return True

    def write(self, b):
        """Record ``b``; raise if the armed blocker occurs inside it.

        When the blocker is found, only the bytes before it are recorded,
        the blocker is disarmed, and ``self.BlockingIOError`` is raised
        with the number of bytes recorded as its third argument.
        """
        data = bytes(b)
        blocker = self._blocker_char
        if blocker:
            try:
                cut = data.index(blocker)
            except ValueError:
                cut = None
            if cut is not None:
                self._blocker_char = None
                self._write_stack.append(data[:cut])
                raise self.BlockingIOError(0, "test blocking", cut)
        self._write_stack.append(data)
        return len(data)
214
class CMockNonBlockWriterIO(MockNonBlockWriterIO, io.RawIOBase):
    """Blocking-writer mock for the C implementation of io."""

    # Exception type raised by write() when the blocker is hit.
    BlockingIOError = io.BlockingIOError
217
class PyMockNonBlockWriterIO(MockNonBlockWriterIO, pyio.RawIOBase):
    """Blocking-writer mock for the pure-Python implementation of io."""

    # Exception type raised by write() when the blocker is hit.
    BlockingIOError = pyio.BlockingIOError
220
Guido van Rossuma9e20242007-03-08 00:43:48 +0000221
class IOTest(unittest.TestCase):
    """General open()/FileIO/BytesIO behaviour tests.

    The implementation under test is reached through attributes on ``self``
    (``open``, ``FileIO``, ``BytesIO``, ``IOBase``, ``RawIOBase``,
    ``BufferedIOBase``, ``TextIOBase``).  Those attributes are not defined
    in this class; presumably they are attached to the concrete subclasses
    elsewhere in the file -- confirm there.
    """

    def setUp(self):
        support.unlink(support.TESTFN)

    def tearDown(self):
        support.unlink(support.TESTFN)

    def write_ops(self, f):
        # Shared write/seek/tell/truncate sequence; leaves b"hello world\n"
        # in the stream.
        self.assertEqual(f.write(b"blah."), 5)
        self.assertEqual(f.seek(0), 0)
        self.assertEqual(f.write(b"Hello."), 6)
        self.assertEqual(f.tell(), 6)
        self.assertEqual(f.seek(-1, 1), 5)
        self.assertEqual(f.tell(), 5)
        self.assertEqual(f.write(bytearray(b" world\n\n\n")), 9)
        self.assertEqual(f.seek(0), 0)
        self.assertEqual(f.write(b"h"), 1)
        self.assertEqual(f.seek(-1, 2), 13)
        self.assertEqual(f.tell(), 13)
        self.assertEqual(f.truncate(12), 12)
        self.assertEqual(f.tell(), 12)
        self.assertRaises(TypeError, f.seek, 0.0)

    def read_ops(self, f, buffered=False):
        # Shared read/readinto/seek sequence over b"hello world\n".  The
        # extra read-to-EOF checks only run when buffered is true.
        data = f.read(5)
        self.assertEqual(data, b"hello")
        data = bytearray(data)
        self.assertEqual(f.readinto(data), 5)
        self.assertEqual(data, b" worl")
        self.assertEqual(f.readinto(data), 2)
        self.assertEqual(len(data), 5)
        self.assertEqual(data[:2], b"d\n")
        self.assertEqual(f.seek(0), 0)
        self.assertEqual(f.read(20), b"hello world\n")
        self.assertEqual(f.read(1), b"")
        self.assertEqual(f.readinto(bytearray(b"x")), 0)
        self.assertEqual(f.seek(-6, 2), 6)
        self.assertEqual(f.read(5), b"world")
        self.assertEqual(f.read(0), b"")
        self.assertEqual(f.readinto(bytearray()), 0)
        self.assertEqual(f.seek(-6, 1), 5)
        self.assertEqual(f.read(5), b" worl")
        self.assertEqual(f.tell(), 10)
        self.assertRaises(TypeError, f.seek, 0.0)
        if buffered:
            f.seek(0)
            self.assertEqual(f.read(), b"hello world\n")
            f.seek(6)
            self.assertEqual(f.read(), b"world\n")
            self.assertEqual(f.read(), b"")

    # Offset used by the large-file checks.
    LARGE = 2**31

    def large_file_ops(self, f):
        # Use assertTrue rather than a bare assert so the precondition is
        # still checked when Python runs with -O.
        self.assertTrue(f.readable())
        self.assertTrue(f.writable())
        self.assertEqual(f.seek(self.LARGE), self.LARGE)
        self.assertEqual(f.tell(), self.LARGE)
        self.assertEqual(f.write(b"xxx"), 3)
        self.assertEqual(f.tell(), self.LARGE + 3)
        self.assertEqual(f.seek(-1, 1), self.LARGE + 2)
        self.assertEqual(f.truncate(), self.LARGE + 2)
        self.assertEqual(f.tell(), self.LARGE + 2)
        self.assertEqual(f.seek(0, 2), self.LARGE + 2)
        self.assertEqual(f.truncate(self.LARGE + 1), self.LARGE + 1)
        self.assertEqual(f.tell(), self.LARGE + 1)
        self.assertEqual(f.seek(0, 2), self.LARGE + 1)
        self.assertEqual(f.seek(-1, 2), self.LARGE)
        self.assertEqual(f.read(2), b"x")

    def test_invalid_operations(self):
        # Try writing on a file opened in read mode and vice-versa.
        for mode in ("w", "wb"):
            with self.open(support.TESTFN, mode) as fp:
                self.assertRaises(IOError, fp.read)
                self.assertRaises(IOError, fp.readline)
        with self.open(support.TESTFN, "rb") as fp:
            self.assertRaises(IOError, fp.write, b"blah")
            self.assertRaises(IOError, fp.writelines, [b"blah\n"])
        with self.open(support.TESTFN, "r") as fp:
            self.assertRaises(IOError, fp.write, "blah")
            self.assertRaises(IOError, fp.writelines, ["blah\n"])

    def test_raw_file_io(self):
        with self.open(support.TESTFN, "wb", buffering=0) as f:
            self.assertEqual(f.readable(), False)
            self.assertEqual(f.writable(), True)
            self.assertEqual(f.seekable(), True)
            self.write_ops(f)
        with self.open(support.TESTFN, "rb", buffering=0) as f:
            self.assertEqual(f.readable(), True)
            self.assertEqual(f.writable(), False)
            self.assertEqual(f.seekable(), True)
            self.read_ops(f)

    def test_buffered_file_io(self):
        with self.open(support.TESTFN, "wb") as f:
            self.assertEqual(f.readable(), False)
            self.assertEqual(f.writable(), True)
            self.assertEqual(f.seekable(), True)
            self.write_ops(f)
        with self.open(support.TESTFN, "rb") as f:
            self.assertEqual(f.readable(), True)
            self.assertEqual(f.writable(), False)
            self.assertEqual(f.seekable(), True)
            self.read_ops(f, True)

    def test_readline(self):
        with self.open(support.TESTFN, "wb") as f:
            f.write(b"abc\ndef\nxyzzy\nfoo\x00bar\nanother line")
        with self.open(support.TESTFN, "rb") as f:
            self.assertEqual(f.readline(), b"abc\n")
            self.assertEqual(f.readline(10), b"def\n")
            self.assertEqual(f.readline(2), b"xy")
            self.assertEqual(f.readline(4), b"zzy\n")
            self.assertEqual(f.readline(), b"foo\x00bar\n")
            self.assertEqual(f.readline(), b"another line")
            self.assertRaises(TypeError, f.readline, 5.3)
        with self.open(support.TESTFN, "r") as f:
            self.assertRaises(TypeError, f.readline, 5.3)

    def test_raw_bytes_io(self):
        f = self.BytesIO()
        self.write_ops(f)
        data = f.getvalue()
        self.assertEqual(data, b"hello world\n")
        f = self.BytesIO(data)
        self.read_ops(f, True)

    def test_large_file_ops(self):
        # On Windows and Mac OSX this test consumes large resources: it
        # takes a long time to build the >2GB file and takes >2GB of disk
        # space, so the "largefile" resource must be enabled to run it.
        if sys.platform[:3] == 'win' or sys.platform == 'darwin':
            if not support.is_resource_enabled("largefile"):
                # Report a real skip instead of printing and returning,
                # which would count the test as passed.
                self.skipTest(
                    "requires %d bytes and a long time; use "
                    "'regrtest.py -u largefile test_io' to run it"
                    % self.LARGE)
        with self.open(support.TESTFN, "w+b", 0) as f:
            self.large_file_ops(f)
        with self.open(support.TESTFN, "w+b") as f:
            self.large_file_ops(f)

    def test_with_open(self):
        for bufsize in (0, 1, 100):
            f = None
            with self.open(support.TESTFN, "wb", bufsize) as f:
                f.write(b"xxx")
            self.assertEqual(f.closed, True)
            f = None
            try:
                with self.open(support.TESTFN, "wb", bufsize) as f:
                    1/0
            except ZeroDivisionError:
                # The file must be closed even when the body raised.
                self.assertEqual(f.closed, True)
            else:
                self.fail("1/0 didn't raise an exception")

    # issue 5008
    def test_append_mode_tell(self):
        with self.open(support.TESTFN, "wb") as f:
            f.write(b"xxx")
        with self.open(support.TESTFN, "ab", buffering=0) as f:
            self.assertEqual(f.tell(), 3)
        with self.open(support.TESTFN, "ab") as f:
            self.assertEqual(f.tell(), 3)
        with self.open(support.TESTFN, "a") as f:
            self.assertGreater(f.tell(), 0)

    def test_destructor(self):
        # record collects 1/2/3 as __del__/close/flush run, so the final
        # list documents the order in which they were invoked.
        record = []
        class MyFileIO(self.FileIO):
            def __del__(self):
                record.append(1)
                try:
                    f = super().__del__
                except AttributeError:
                    pass
                else:
                    f()
            def close(self):
                record.append(2)
                super().close()
            def flush(self):
                record.append(3)
                super().flush()
        f = MyFileIO(support.TESTFN, "wb")
        f.write(b"xxx")
        del f
        support.gc_collect()
        self.assertEqual(record, [1, 2, 3])
        with self.open(support.TESTFN, "rb") as f:
            self.assertEqual(f.read(), b"xxx")

    def _check_base_destructor(self, base):
        # Same ordering check as test_destructor, for an abstract base.
        record = []
        class MyIO(base):
            def __init__(self):
                # This exercises the availability of attributes on object
                # destruction.
                # (in the C version, close() is called by the tp_dealloc
                # function, not by __del__)
                self.on_del = 1
                self.on_close = 2
                self.on_flush = 3
            def __del__(self):
                record.append(self.on_del)
                try:
                    f = super().__del__
                except AttributeError:
                    pass
                else:
                    f()
            def close(self):
                record.append(self.on_close)
                super().close()
            def flush(self):
                record.append(self.on_flush)
                super().flush()
        f = MyIO()
        del f
        support.gc_collect()
        self.assertEqual(record, [1, 2, 3])

    def test_IOBase_destructor(self):
        self._check_base_destructor(self.IOBase)

    def test_RawIOBase_destructor(self):
        self._check_base_destructor(self.RawIOBase)

    def test_BufferedIOBase_destructor(self):
        self._check_base_destructor(self.BufferedIOBase)

    def test_TextIOBase_destructor(self):
        self._check_base_destructor(self.TextIOBase)

    def test_close_flushes(self):
        with self.open(support.TESTFN, "wb") as f:
            f.write(b"xxx")
        with self.open(support.TESTFN, "rb") as f:
            self.assertEqual(f.read(), b"xxx")

    def test_array_writes(self):
        a = array.array('i', range(10))
        # Byte length of the array's buffer, computed without the
        # deprecated array.tostring().
        n = len(a) * a.itemsize
        with self.open(support.TESTFN, "wb", 0) as f:
            self.assertEqual(f.write(a), n)
        with self.open(support.TESTFN, "wb") as f:
            self.assertEqual(f.write(a), n)

    def test_closefd(self):
        self.assertRaises(ValueError, self.open, support.TESTFN, 'w',
                          closefd=False)

    def test_read_closed(self):
        with self.open(support.TESTFN, "w") as f:
            f.write("egg\n")
        with self.open(support.TESTFN, "r") as f:
            file = self.open(f.fileno(), "r", closefd=False)
            self.assertEqual(file.read(), "egg\n")
            file.seek(0)
            file.close()
            self.assertRaises(ValueError, file.read)

    def test_no_closefd_with_filename(self):
        # can't use closefd in combination with a file name
        self.assertRaises(ValueError, self.open, support.TESTFN, "r",
                          closefd=False)

    def test_closefd_attr(self):
        with self.open(support.TESTFN, "wb") as f:
            f.write(b"egg\n")
        with self.open(support.TESTFN, "r") as f:
            self.assertEqual(f.buffer.raw.closefd, True)
            file = self.open(f.fileno(), "r", closefd=False)
            self.assertEqual(file.buffer.raw.closefd, False)

    def test_garbage_collection(self):
        # FileIO objects are collected, and collecting them flushes
        # all data to disk.
        f = self.FileIO(support.TESTFN, "wb")
        f.write(b"abcxxx")
        # Create a reference cycle so only the cyclic GC can free it.
        f.f = f
        wr = weakref.ref(f)
        del f
        support.gc_collect()
        self.assertIsNone(wr(), wr)
        with self.open(support.TESTFN, "rb") as f:
            self.assertEqual(f.read(), b"abcxxx")

    def test_unbounded_file(self):
        # Issue #1174606: reading from an unbounded stream such as /dev/zero.
        zero = "/dev/zero"
        if not os.path.exists(zero):
            self.skipTest("{0} does not exist".format(zero))
        if sys.maxsize > 0x7FFFFFFF:
            self.skipTest("test can only run in a 32-bit address space")
        if support.real_max_memuse < support._2G:
            self.skipTest("test requires at least 2GB of memory")
        with self.open(zero, "rb", buffering=0) as f:
            self.assertRaises(OverflowError, f.read)
        with self.open(zero, "rb") as f:
            self.assertRaises(OverflowError, f.read)
        with self.open(zero, "r") as f:
            self.assertRaises(OverflowError, f.read)
531
class CIOTest(IOTest):
    """IOTest case class intended for the C implementation of io."""
Guido van Rossuma9e20242007-03-08 00:43:48 +0000534
class PyIOTest(IOTest):
    """IOTest case class intended for the pure-Python implementation of io."""
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000537
Guido van Rossuma9e20242007-03-08 00:43:48 +0000538
class CommonBufferedTests:
    """Tests common to BufferedReader, BufferedWriter and BufferedRandom.

    Mixin for ``unittest.TestCase`` subclasses.  The host class must supply
    ``tp`` (the buffered type under test) plus ``MockRawIO`` and
    ``CloseFailureIO`` (raw-stream factories); none of them is defined here.
    """

    def test_detach(self):
        raw = self.MockRawIO()
        buf = self.tp(raw)
        self.assertIs(buf.detach(), raw)
        # A second detach has nothing left to hand back.
        self.assertRaises(ValueError, buf.detach)

    def test_fileno(self):
        rawio = self.MockRawIO()
        bufio = self.tp(rawio)

        self.assertEqual(42, bufio.fileno())

    def test_no_fileno(self):
        # XXX will we always have fileno() function? If so, kill
        # this test. Else, write it.
        pass

    def test_invalid_args(self):
        rawio = self.MockRawIO()
        bufio = self.tp(rawio)
        # Invalid whence
        self.assertRaises(ValueError, bufio.seek, 0, -1)
        self.assertRaises(ValueError, bufio.seek, 0, 3)

    def test_override_destructor(self):
        # record collects 1/2/3 as __del__/close/flush run on collection.
        tp = self.tp
        record = []
        class MyBufferedIO(tp):
            def __del__(self):
                record.append(1)
                try:
                    f = super().__del__
                except AttributeError:
                    pass
                else:
                    f()
            def close(self):
                record.append(2)
                super().close()
            def flush(self):
                record.append(3)
                super().flush()
        rawio = self.MockRawIO()
        bufio = MyBufferedIO(rawio)
        writable = bufio.writable()
        del bufio
        support.gc_collect()
        # flush() is only expected to be recorded for writable objects.
        if writable:
            self.assertEqual(record, [1, 2, 3])
        else:
            self.assertEqual(record, [1, 2])

    def test_context_manager(self):
        # Test usability as a context manager
        rawio = self.MockRawIO()
        bufio = self.tp(rawio)
        def _with():
            with bufio:
                pass
        _with()
        # bufio should now be closed, and using it a second time should raise
        # a ValueError.
        self.assertRaises(ValueError, _with)

    def test_error_through_destructor(self):
        # Test that the exception state is not modified by a destructor,
        # even if close() fails.
        rawio = self.CloseFailureIO()
        def f():
            self.tp(rawio).xyzzy
        with support.captured_output("stderr") as s:
            self.assertRaises(AttributeError, f)
        s = s.getvalue().strip()
        if s:
            # The destructor *may* have printed an unraisable error, check it
            self.assertEqual(len(s.splitlines()), 1)
            self.assertTrue(s.startswith("Exception IOError: "), s)
            self.assertTrue(s.endswith(" ignored"), s)

    def test_repr(self):
        raw = self.MockRawIO()
        b = self.tp(raw)
        clsname = "%s.%s" % (self.tp.__module__, self.tp.__name__)
        self.assertEqual(repr(b), "<%s>" % clsname)
        # Once the raw stream has a name, repr() must include it.
        raw.name = "dummy"
        self.assertEqual(repr(b), "<%s name='dummy'>" % clsname)
        raw.name = b"dummy"
        self.assertEqual(repr(b), "<%s name=b'dummy'>" % clsname)
630
Guido van Rossum78892e42007-04-06 17:31:18 +0000631
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000632class BufferedReaderTest(unittest.TestCase, CommonBufferedTests):
633 read_mode = "rb"
Guido van Rossum78892e42007-04-06 17:31:18 +0000634
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000635 def test_constructor(self):
636 rawio = self.MockRawIO([b"abc"])
637 bufio = self.tp(rawio)
638 bufio.__init__(rawio)
639 bufio.__init__(rawio, buffer_size=1024)
640 bufio.__init__(rawio, buffer_size=16)
641 self.assertEquals(b"abc", bufio.read())
642 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0)
643 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16)
644 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1)
645 rawio = self.MockRawIO([b"abc"])
646 bufio.__init__(rawio)
647 self.assertEquals(b"abc", bufio.read())
Guido van Rossum78892e42007-04-06 17:31:18 +0000648
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000649 def test_read(self):
650 rawio = self.MockRawIO((b"abc", b"d", b"efg"))
651 bufio = self.tp(rawio)
Guido van Rossum76c5d4d2007-04-06 19:10:29 +0000652 self.assertEquals(b"abcdef", bufio.read(6))
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000653 # Invalid args
654 self.assertRaises(ValueError, bufio.read, -2)
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000655
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000656 def test_read1(self):
657 rawio = self.MockRawIO((b"abc", b"d", b"efg"))
658 bufio = self.tp(rawio)
659 self.assertEquals(b"a", bufio.read(1))
660 self.assertEquals(b"b", bufio.read1(1))
661 self.assertEquals(rawio._reads, 1)
662 self.assertEquals(b"c", bufio.read1(100))
663 self.assertEquals(rawio._reads, 1)
664 self.assertEquals(b"d", bufio.read1(100))
665 self.assertEquals(rawio._reads, 2)
666 self.assertEquals(b"efg", bufio.read1(100))
667 self.assertEquals(rawio._reads, 3)
668 self.assertEquals(b"", bufio.read1(100))
669 # Invalid args
670 self.assertRaises(ValueError, bufio.read1, -1)
671
672 def test_readinto(self):
673 rawio = self.MockRawIO((b"abc", b"d", b"efg"))
674 bufio = self.tp(rawio)
675 b = bytearray(2)
676 self.assertEquals(bufio.readinto(b), 2)
677 self.assertEquals(b, b"ab")
678 self.assertEquals(bufio.readinto(b), 2)
679 self.assertEquals(b, b"cd")
680 self.assertEquals(bufio.readinto(b), 2)
681 self.assertEquals(b, b"ef")
682 self.assertEquals(bufio.readinto(b), 1)
683 self.assertEquals(b, b"gf")
684 self.assertEquals(bufio.readinto(b), 0)
685 self.assertEquals(b, b"gf")
686
687 def test_buffering(self):
Guido van Rossum78892e42007-04-06 17:31:18 +0000688 data = b"abcdefghi"
689 dlen = len(data)
690
691 tests = [
692 [ 100, [ 3, 1, 4, 8 ], [ dlen, 0 ] ],
693 [ 100, [ 3, 3, 3], [ dlen ] ],
694 [ 4, [ 1, 2, 4, 2 ], [ 4, 4, 1 ] ],
695 ]
696
697 for bufsize, buf_read_sizes, raw_read_sizes in tests:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000698 rawio = self.MockFileIO(data)
699 bufio = self.tp(rawio, buffer_size=bufsize)
Guido van Rossum78892e42007-04-06 17:31:18 +0000700 pos = 0
701 for nbytes in buf_read_sizes:
Guido van Rossum76c5d4d2007-04-06 19:10:29 +0000702 self.assertEquals(bufio.read(nbytes), data[pos:pos+nbytes])
Guido van Rossum78892e42007-04-06 17:31:18 +0000703 pos += nbytes
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000704 # this is mildly implementation-dependent
Guido van Rossum76c5d4d2007-04-06 19:10:29 +0000705 self.assertEquals(rawio.read_history, raw_read_sizes)
Guido van Rossum78892e42007-04-06 17:31:18 +0000706
def test_read_non_blocking(self):
    """Check buffered reads when the raw stream sometimes yields None."""
    # Inject some None's in there to simulate EWOULDBLOCK
    rawio = self.MockRawIO((b"abc", b"d", None, b"efg", None, None, None))
    bufio = self.tp(rawio)

    self.assertEqual(b"abcd", bufio.read(6))
    self.assertEqual(b"e", bufio.read(1))
    self.assertEqual(b"fg", bufio.read())
    self.assertEqual(b"", bufio.peek(1))
    self.assertIsNone(bufio.read())
    self.assertEqual(b"", bufio.read())
Guido van Rossum01a27522007-03-07 01:00:12 +0000718
def test_read_past_eof(self):
    """A read larger than the stream returns just the available bytes."""
    rawio = self.MockRawIO((b"abc", b"d", b"efg"))
    bufio = self.tp(rawio)

    self.assertEqual(b"abcdefg", bufio.read(9000))
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000724
def test_read_all(self):
    """read() without a size returns every chunk of the raw stream."""
    rawio = self.MockRawIO((b"abc", b"d", b"efg"))
    bufio = self.tp(rawio)

    self.assertEqual(b"abcdefg", bufio.read())
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000730
def test_threads(self):
    """Read one file from 20 threads; no byte may be lost or duplicated."""
    try:
        # Write out many bytes with exactly the same number of 0's,
        # 1's... 255's. This will help us check that concurrent reading
        # doesn't duplicate or forget contents.
        copies = 1000
        values = list(range(256)) * copies
        random.shuffle(values)
        payload = bytes(bytearray(values))
        with self.open(support.TESTFN, "wb") as out:
            out.write(payload)
        with self.open(support.TESTFN, self.read_mode, buffering=0) as raw:
            bufio = self.tp(raw, 8)
            errors = []
            results = []

            def reader():
                try:
                    # Intra-buffer read then buffer-flushing read
                    for size in cycle([1, 19]):
                        chunk = bufio.read(size)
                        if not chunk:
                            break
                        # list.append() is atomic
                        results.append(chunk)
                except Exception as exc:
                    # Record the failure for the main thread, then let
                    # the thread die with the original exception.
                    errors.append(exc)
                    raise

            threads = [threading.Thread(target=reader) for _ in range(20)]
            for thread in threads:
                thread.start()
            time.sleep(0.02)  # yield
            for thread in threads:
                thread.join()
            self.assertFalse(errors,
                "the following exceptions were caught: %r" % errors)
            combined = b''.join(results)
            for value in range(256):
                single = bytes(bytearray([value]))
                self.assertEqual(combined.count(single), copies)
    finally:
        support.unlink(support.TESTFN)
772
def test_misbehaved_io(self):
    """seek() and tell() raise IOError on top of a MisbehavedRawIO."""
    bufio = self.tp(self.MisbehavedRawIO((b"abc", b"d", b"efg")))
    with self.assertRaises(IOError):
        bufio.seek(0)
    with self.assertRaises(IOError):
        bufio.tell()
778
class CBufferedReaderTest(BufferedReaderTest):
    """Run the shared BufferedReader tests against io.BufferedReader.

    Also holds the checks that only apply to this implementation.
    """
    tp = io.BufferedReader

    def test_constructor(self):
        """On top of the shared checks, reject an absurdly large buffer."""
        BufferedReaderTest.test_constructor(self)
        # The allocation can succeed on 32-bit builds, e.g. with more
        # than 2GB RAM and a 64-bit kernel.
        if sys.maxsize > 0x7FFFFFFF:
            rawio = self.MockRawIO()
            bufio = self.tp(rawio)
            self.assertRaises((OverflowError, MemoryError, ValueError),
                bufio.__init__, rawio, sys.maxsize)

    def test_initialization(self):
        """A failed re-initialization leaves the object unusable."""
        rawio = self.MockRawIO([b"abc"])
        bufio = self.tp(rawio)
        for bad_size in (0, -16, -1):
            self.assertRaises(ValueError, bufio.__init__, rawio,
                              buffer_size=bad_size)
            self.assertRaises(ValueError, bufio.read)

    def test_misbehaved_io_read(self):
        rawio = self.MisbehavedRawIO((b"abc", b"d", b"efg"))
        bufio = self.tp(rawio)
        # _pyio.BufferedReader seems to implement reading different, so that
        # checking this is not so easy.
        self.assertRaises(IOError, bufio.read, 10)

    def test_garbage_collection(self):
        # C BufferedReader objects are collected.
        # The Python version has __del__, so it ends into gc.garbage instead
        try:
            rawio = self.FileIO(support.TESTFN, "w+b")
            f = self.tp(rawio)
            # Create a reference cycle so only the cyclic GC can free it.
            f.f = f
            wr = weakref.ref(f)
            del f
            support.gc_collect()
            self.assertIsNone(wr(), wr)
        finally:
            # Do not leave the scratch file behind for later tests.
            support.unlink(support.TESTFN)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000819
class PyBufferedReaderTest(BufferedReaderTest):
    # Bind the shared BufferedReader tests to pyio.BufferedReader.
    tp = pyio.BufferedReader
Antoine Pitrou87695762008-08-14 22:44:29 +0000822
Guido van Rossuma9e20242007-03-08 00:43:48 +0000823
class BufferedWriterTest(unittest.TestCase, CommonBufferedTests):
    """Shared tests for buffered writers.

    The class under test is read from ``self.tp``; helper types such as
    ``MockRawIO`` and ``open`` are looked up on ``self`` as well, so they
    have to be supplied from outside this class.
    """
    write_mode = "wb"

    def test_constructor(self):
        """__init__ may be called again; invalid sizes raise ValueError."""
        rawio = self.MockRawIO()
        bufio = self.tp(rawio)
        bufio.__init__(rawio)
        bufio.__init__(rawio, buffer_size=1024)
        bufio.__init__(rawio, buffer_size=16)
        self.assertEqual(3, bufio.write(b"abc"))
        bufio.flush()
        self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0)
        self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16)
        self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1)
        # A valid re-initialization makes the object usable again.
        bufio.__init__(rawio)
        self.assertEqual(3, bufio.write(b"ghi"))
        bufio.flush()
        self.assertEqual(b"".join(rawio._write_stack), b"abcghi")

    def test_detach_flush(self):
        """detach() pushes buffered data to the raw stream."""
        raw = self.MockRawIO()
        buf = self.tp(raw)
        buf.write(b"howdy!")
        self.assertFalse(raw._write_stack)
        buf.detach()
        self.assertEqual(raw._write_stack, [b"howdy!"])

    def test_write(self):
        # Write to the buffered IO but don't overflow the buffer.
        writer = self.MockRawIO()
        bufio = self.tp(writer, 8)
        bufio.write(b"abc")
        self.assertFalse(writer._write_stack)

    def test_write_overflow(self):
        """Writing more than the buffer holds flushes implicitly."""
        writer = self.MockRawIO()
        bufio = self.tp(writer, 8)
        contents = b"abcdefghijklmnop"
        for n in range(0, len(contents), 3):
            bufio.write(contents[n:n+3])
        flushed = b"".join(writer._write_stack)
        # At least (total - 8) bytes were implicitly flushed, perhaps more
        # depending on the implementation.
        self.assertTrue(flushed.startswith(contents[:-8]), flushed)

    def check_writes(self, intermediate_func):
        """Write a large payload in growing chunks and verify the output.

        ``intermediate_func`` is called with the buffered object after
        every write; whatever it does must not change the flushed data.
        """
        # Lots of writes, test the flushed output is as expected.
        contents = bytes(range(256)) * 1000
        n = 0
        writer = self.MockRawIO()
        bufio = self.tp(writer, 13)
        # Generator of write sizes: repeat each N 15 times then proceed to N+1
        def gen_sizes():
            for size in count(1):
                for i in range(15):
                    yield size
        sizes = gen_sizes()
        while n < len(contents):
            size = min(next(sizes), len(contents) - n)
            self.assertEqual(bufio.write(contents[n:n+size]), size)
            intermediate_func(bufio)
            n += size
        bufio.flush()
        self.assertEqual(contents, b"".join(writer._write_stack))

    def test_writes(self):
        self.check_writes(lambda bufio: None)

    def test_writes_and_flushes(self):
        self.check_writes(lambda bufio: bufio.flush())

    def test_writes_and_seeks(self):
        """Seeking around and back between writes must not alter output."""
        def _seekabs(bufio):
            pos = bufio.tell()
            bufio.seek(pos + 1, 0)
            bufio.seek(pos - 1, 0)
            bufio.seek(pos, 0)
        self.check_writes(_seekabs)
        def _seekrel(bufio):
            pos = bufio.seek(0, 1)
            bufio.seek(+1, 1)
            bufio.seek(-1, 1)
            bufio.seek(pos, 0)
        self.check_writes(_seekrel)

    def test_writes_and_truncates(self):
        self.check_writes(lambda bufio: bufio.truncate(bufio.tell()))

    def test_write_non_blocking(self):
        """Check write() and BlockingIOError on a raw stream that blocks."""
        raw = self.MockNonBlockWriterIO()
        bufio = self.tp(raw, 8)

        self.assertEqual(bufio.write(b"abcd"), 4)
        self.assertEqual(bufio.write(b"efghi"), 5)
        # 1 byte will be written, the rest will be buffered
        raw.block_on(b"k")
        self.assertEqual(bufio.write(b"jklmn"), 5)

        # 8 bytes will be written, 8 will be buffered and the rest will be lost
        raw.block_on(b"0")
        try:
            bufio.write(b"opqrwxyz0123456789")
        except self.BlockingIOError as e:
            written = e.characters_written
        else:
            self.fail("BlockingIOError should have been raised")
        self.assertEqual(written, 16)
        self.assertEqual(raw.pop_written(),
            b"abcdefghijklmnopqrwxyz")

        self.assertEqual(bufio.write(b"ABCDEFGHI"), 9)
        s = raw.pop_written()
        # Previously buffered bytes were flushed
        self.assertTrue(s.startswith(b"01234567A"), s)

    def test_write_and_rewind(self):
        """Overwrite the start of the stream after seeking back to it."""
        raw = io.BytesIO()
        bufio = self.tp(raw, 4)
        self.assertEqual(bufio.write(b"abcdef"), 6)
        self.assertEqual(bufio.tell(), 6)
        bufio.seek(0, 0)
        self.assertEqual(bufio.write(b"XY"), 2)
        bufio.seek(6, 0)
        self.assertEqual(raw.getvalue(), b"XYcdef")
        self.assertEqual(bufio.write(b"123456"), 6)
        bufio.flush()
        self.assertEqual(raw.getvalue(), b"XYcdef123456")

    def test_flush(self):
        writer = self.MockRawIO()
        bufio = self.tp(writer, 8)
        bufio.write(b"abc")
        bufio.flush()
        self.assertEqual(b"abc", writer._write_stack[0])

    def test_destructor(self):
        """Dropping the last reference flushes buffered data."""
        writer = self.MockRawIO()
        bufio = self.tp(writer, 8)
        bufio.write(b"abc")
        del bufio
        support.gc_collect()
        self.assertEqual(b"abc", writer._write_stack[0])

    def test_truncate(self):
        # Truncate implicitly flushes the buffer.
        try:
            with self.open(support.TESTFN, self.write_mode, buffering=0) as raw:
                bufio = self.tp(raw, 8)
                bufio.write(b"abcdef")
                self.assertEqual(bufio.truncate(3), 3)
                self.assertEqual(bufio.tell(), 3)
            with self.open(support.TESTFN, "rb", buffering=0) as f:
                self.assertEqual(f.read(), b"abc")
        finally:
            # Do not leave the scratch file behind for later tests.
            support.unlink(support.TESTFN)

    def test_threads(self):
        """Write from 20 threads and check every byte reached the file."""
        try:
            # Write out many bytes from many threads and test they were
            # all flushed.
            N = 1000
            contents = bytes(range(256)) * N
            sizes = cycle([1, 19])
            n = 0
            queue = deque()
            while n < len(contents):
                size = next(sizes)
                queue.append(contents[n:n+size])
                n += size
            del contents
            # We use a real file object because it allows us to
            # exercise situations where the GIL is released before
            # writing the buffer to the raw streams. This is in addition
            # to concurrency issues due to switching threads in the middle
            # of Python code.
            with self.open(support.TESTFN, self.write_mode, buffering=0) as raw:
                bufio = self.tp(raw, 8)
                errors = []
                def f():
                    try:
                        while True:
                            try:
                                s = queue.popleft()
                            except IndexError:
                                # Queue drained: this worker is done.
                                return
                            bufio.write(s)
                    except Exception as e:
                        errors.append(e)
                        raise
                threads = [threading.Thread(target=f) for x in range(20)]
                for t in threads:
                    t.start()
                time.sleep(0.02) # yield
                for t in threads:
                    t.join()
                self.assertFalse(errors,
                    "the following exceptions were caught: %r" % errors)
                bufio.close()
            with self.open(support.TESTFN, "rb") as f:
                s = f.read()
            # Chunks may land in any order, so only per-value counts are
            # compared.
            for i in range(256):
                self.assertEqual(s.count(bytes([i])), N)
        finally:
            support.unlink(support.TESTFN)

    def test_misbehaved_io(self):
        rawio = self.MisbehavedRawIO()
        bufio = self.tp(rawio, 5)
        self.assertRaises(IOError, bufio.seek, 0)
        self.assertRaises(IOError, bufio.tell)
        self.assertRaises(IOError, bufio.write, b"abcdef")

    def test_max_buffer_size_deprecation(self):
        """Passing a third positional size emits one DeprecationWarning."""
        with support.check_warnings() as w:
            warnings.simplefilter("always", DeprecationWarning)
            self.tp(self.MockRawIO(), 8, 12)
            self.assertEqual(len(w.warnings), 1)
            warning = w.warnings[0]
            self.assertTrue(warning.category is DeprecationWarning)
            self.assertEqual(str(warning.message),
                             "max_buffer_size is deprecated")
1042
1043
class CBufferedWriterTest(BufferedWriterTest):
    """Run the shared BufferedWriter tests against io.BufferedWriter.

    Also holds the checks that only apply to this implementation.
    """
    tp = io.BufferedWriter

    def test_constructor(self):
        """On top of the shared checks, reject an absurdly large buffer."""
        BufferedWriterTest.test_constructor(self)
        # The allocation can succeed on 32-bit builds, e.g. with more
        # than 2GB RAM and a 64-bit kernel.
        if sys.maxsize > 0x7FFFFFFF:
            rawio = self.MockRawIO()
            bufio = self.tp(rawio)
            self.assertRaises((OverflowError, MemoryError, ValueError),
                bufio.__init__, rawio, sys.maxsize)

    def test_initialization(self):
        """A failed re-initialization leaves the object unusable."""
        rawio = self.MockRawIO()
        bufio = self.tp(rawio)
        for bad_size in (0, -16, -1):
            self.assertRaises(ValueError, bufio.__init__, rawio,
                              buffer_size=bad_size)
            self.assertRaises(ValueError, bufio.write, b"def")

    def test_garbage_collection(self):
        # C BufferedWriter objects are collected, and collecting them flushes
        # all data to disk.
        # The Python version has __del__, so it ends into gc.garbage instead
        try:
            rawio = self.FileIO(support.TESTFN, "w+b")
            f = self.tp(rawio)
            f.write(b"123xxx")
            # Create a reference cycle so only the cyclic GC can free it.
            f.x = f
            wr = weakref.ref(f)
            del f
            support.gc_collect()
            self.assertIsNone(wr(), wr)
            with self.open(support.TESTFN, "rb") as f:
                self.assertEqual(f.read(), b"123xxx")
        finally:
            # Do not leave the scratch file behind for later tests.
            support.unlink(support.TESTFN)
1081
1082
class PyBufferedWriterTest(BufferedWriterTest):
    # Bind the shared BufferedWriter tests to pyio.BufferedWriter.
    tp = pyio.BufferedWriter
Guido van Rossuma9e20242007-03-08 00:43:48 +00001085
class BufferedRWPairTest(unittest.TestCase):
    """Shared tests for BufferedRWPair; the tested class is ``self.tp``."""

    def test_constructor(self):
        reader, writer = self.MockRawIO(), self.MockRawIO()
        self.assertFalse(self.tp(reader, writer).closed)

    def test_detach(self):
        reader, writer = self.MockRawIO(), self.MockRawIO()
        rw_pair = self.tp(reader, writer)
        with self.assertRaises(self.UnsupportedOperation):
            rw_pair.detach()

    def test_constructor_max_buffer_size_deprecation(self):
        """Passing a fourth positional size emits one DeprecationWarning."""
        with support.check_warnings() as caught:
            warnings.simplefilter("always", DeprecationWarning)
            self.tp(self.MockRawIO(), self.MockRawIO(), 8, 12)
            self.assertEqual(len(caught.warnings), 1)
            first = caught.warnings[0]
            self.assertTrue(first.category is DeprecationWarning)
            self.assertEqual(str(first.message),
                             "max_buffer_size is deprecated")

    def test_constructor_with_not_readable(self):
        class NotReadable(MockRawIO):
            def readable(self):
                return False

        with self.assertRaises(IOError):
            self.tp(NotReadable(), self.MockRawIO())

    def test_constructor_with_not_writeable(self):
        class NotWriteable(MockRawIO):
            def writable(self):
                return False

        with self.assertRaises(IOError):
            self.tp(self.MockRawIO(), NotWriteable())

    def test_read(self):
        rw_pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())

        first = rw_pair.read(3)
        self.assertEqual(first, b"abc")
        second = rw_pair.read(1)
        self.assertEqual(second, b"d")
        remainder = rw_pair.read()
        self.assertEqual(remainder, b"ef")

    def test_read1(self):
        # .read1() is delegated to the underlying reader object, so this test
        # can be shallow.
        rw_pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())

        chunk = rw_pair.read1(3)
        self.assertEqual(chunk, b"abc")

    def test_readinto(self):
        rw_pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())

        target = bytearray(5)
        filled = rw_pair.readinto(target)
        self.assertEqual(filled, 5)
        self.assertEqual(target, b"abcde")

    def test_write(self):
        """Each write followed by flush() reaches the writer side."""
        sink = self.MockRawIO()
        rw_pair = self.tp(self.MockRawIO(), sink)

        for chunk in (b"abc", b"def"):
            rw_pair.write(chunk)
            rw_pair.flush()
        self.assertEqual(sink._write_stack, [b"abc", b"def"])

    def test_peek(self):
        rw_pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())

        peeked = rw_pair.peek(3)
        self.assertTrue(peeked.startswith(b"abc"))
        # peek() must not have consumed anything.
        self.assertEqual(rw_pair.read(3), b"abc")

    def test_readable(self):
        reader, writer = self.MockRawIO(), self.MockRawIO()
        self.assertTrue(self.tp(reader, writer).readable())

    def test_writeable(self):
        reader, writer = self.MockRawIO(), self.MockRawIO()
        self.assertTrue(self.tp(reader, writer).writable())

    def test_seekable(self):
        # BufferedRWPairs are never seekable, even if their readers and writers
        # are.
        reader, writer = self.MockRawIO(), self.MockRawIO()
        self.assertFalse(self.tp(reader, writer).seekable())

    # .flush() is delegated to the underlying writer object and has been
    # tested in the test_write method.

    def test_close_and_closed(self):
        reader, writer = self.MockRawIO(), self.MockRawIO()
        rw_pair = self.tp(reader, writer)
        self.assertFalse(rw_pair.closed)
        rw_pair.close()
        self.assertTrue(rw_pair.closed)

    def test_isatty(self):
        """The pair reports a tty as soon as either side does."""
        class SelectableIsAtty(MockRawIO):
            def __init__(self, isatty):
                MockRawIO.__init__(self)
                self._isatty = isatty

            def isatty(self):
                return self._isatty

        # (reader is a tty, writer is a tty, expected truthiness for the pair)
        expectations = (
            (False, False, False),
            (True, False, True),
            (False, True, True),
            (True, True, True),
        )
        for reader_tty, writer_tty, pair_tty in expectations:
            rw_pair = self.tp(SelectableIsAtty(reader_tty),
                              SelectableIsAtty(writer_tty))
            check = self.assertTrue if pair_tty else self.assertFalse
            check(rw_pair.isatty())
Guido van Rossum01a27522007-03-07 01:00:12 +00001200
class CBufferedRWPairTest(BufferedRWPairTest):
    # Bind the shared BufferedRWPair tests to io.BufferedRWPair.
    tp = io.BufferedRWPair
Guido van Rossuma9e20242007-03-08 00:43:48 +00001203
class PyBufferedRWPairTest(BufferedRWPairTest):
    # Bind the shared BufferedRWPair tests to pyio.BufferedRWPair.
    tp = pyio.BufferedRWPair
Guido van Rossuma9e20242007-03-08 00:43:48 +00001206
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001207
class BufferedRandomTest(BufferedReaderTest, BufferedWriterTest):
    """Shared tests for objects that are both buffered readers and writers.

    Inherits every reader and writer test and adds checks for mixing
    reads, writes and seeks on the same object (``self.tp``).
    """
    read_mode = "rb+"
    write_mode = "wb+"

    def test_constructor(self):
        BufferedReaderTest.test_constructor(self)
        BufferedWriterTest.test_constructor(self)

    def test_read_and_write(self):
        """Writes stay buffered until a later read forces them out."""
        raw = self.MockRawIO((b"asdf", b"ghjk"))
        rw = self.tp(raw, 8)

        self.assertEqual(b"as", rw.read(2))
        rw.write(b"ddd")
        rw.write(b"eee")
        self.assertFalse(raw._write_stack) # Buffer writes
        self.assertEqual(b"ghjk", rw.read())
        self.assertEqual(b"dddeee", raw._write_stack[0])

    def test_seek_and_tell(self):
        raw = self.BytesIO(b"asdfghjkl")
        rw = self.tp(raw)

        self.assertEqual(b"as", rw.read(2))
        self.assertEqual(2, rw.tell())
        rw.seek(0, 0)
        self.assertEqual(b"asdf", rw.read(4))

        rw.write(b"asdf")
        rw.seek(0, 0)
        self.assertEqual(b"asdfasdfl", rw.read())
        self.assertEqual(9, rw.tell())
        rw.seek(-4, 2)
        self.assertEqual(5, rw.tell())
        rw.seek(2, 1)
        self.assertEqual(7, rw.tell())
        self.assertEqual(b"fl", rw.read(11))
        # Offsets must be integers.
        self.assertRaises(TypeError, rw.seek, 0.0)

    def check_flush_and_read(self, read_func):
        """Interleave reads, writes and flushes using ``read_func``.

        ``read_func(bufio, n)`` must consume and return ``n`` bytes;
        ``read_func(bufio)`` must consume and return the rest.
        """
        raw = self.BytesIO(b"abcdefghi")
        bufio = self.tp(raw)

        self.assertEqual(b"ab", read_func(bufio, 2))
        bufio.write(b"12")
        self.assertEqual(b"ef", read_func(bufio, 2))
        self.assertEqual(6, bufio.tell())
        bufio.flush()
        self.assertEqual(6, bufio.tell())
        self.assertEqual(b"ghi", read_func(bufio))
        # Modify the raw stream behind the buffered object's back.
        raw.seek(0, 0)
        raw.write(b"XYZ")
        # flush() resets the read buffer
        bufio.flush()
        bufio.seek(0, 0)
        self.assertEqual(b"XYZ", read_func(bufio, 3))

    def test_flush_and_read(self):
        self.check_flush_and_read(lambda bufio, *args: bufio.read(*args))

    def test_flush_and_readinto(self):
        def _readinto(bufio, n=-1):
            b = bytearray(n if n >= 0 else 9999)
            n = bufio.readinto(b)
            return bytes(b[:n])
        self.check_flush_and_read(_readinto)

    def test_flush_and_peek(self):
        def _peek(bufio, n=-1):
            # This relies on the fact that the buffer can contain the whole
            # raw stream, otherwise peek() can return less.
            b = bufio.peek(n)
            if n != -1:
                b = b[:n]
            # peek() does not advance the position, so do it by hand.
            bufio.seek(len(b), 1)
            return b
        self.check_flush_and_read(_peek)

    def test_flush_and_write(self):
        raw = self.BytesIO(b"abcdefghi")
        bufio = self.tp(raw)

        bufio.write(b"123")
        bufio.flush()
        bufio.write(b"45")
        bufio.flush()
        bufio.seek(0, 0)
        self.assertEqual(b"12345fghi", raw.getvalue())
        self.assertEqual(b"12345fghi", bufio.read())

    def test_threads(self):
        BufferedReaderTest.test_threads(self)
        BufferedWriterTest.test_threads(self)

    def test_writes_and_peek(self):
        """peek() between writes must not alter the flushed output."""
        def _peek(bufio):
            bufio.peek(1)
        self.check_writes(_peek)
        def _peek(bufio):
            pos = bufio.tell()
            bufio.seek(-1, 1)
            bufio.peek(1)
            bufio.seek(pos, 0)
        self.check_writes(_peek)

    def test_writes_and_reads(self):
        def _read(bufio):
            bufio.seek(-1, 1)
            bufio.read(1)
        self.check_writes(_read)

    def test_writes_and_read1s(self):
        def _read1(bufio):
            bufio.seek(-1, 1)
            bufio.read1(1)
        self.check_writes(_read1)

    def test_writes_and_readintos(self):
        def _read(bufio):
            bufio.seek(-1, 1)
            bufio.readinto(bytearray(1))
        self.check_writes(_read)

    def test_write_after_readahead(self):
        # Issue #6629: writing after the buffer was filled by readahead should
        # first rewind the raw stream.
        for overwrite_size in [1, 5]:
            raw = self.BytesIO(b"A" * 10)
            bufio = self.tp(raw, 4)
            # Trigger readahead
            self.assertEqual(bufio.read(1), b"A")
            self.assertEqual(bufio.tell(), 1)
            # Overwriting should rewind the raw stream if it needs so
            bufio.write(b"B" * overwrite_size)
            self.assertEqual(bufio.tell(), overwrite_size + 1)
            # If the write size was smaller than the buffer size, flush() and
            # check that rewind happens.
            bufio.flush()
            self.assertEqual(bufio.tell(), overwrite_size + 1)
            s = raw.getvalue()
            self.assertEqual(s,
                b"A" + b"B" * overwrite_size + b"A" * (9 - overwrite_size))

    def test_misbehaved_io(self):
        BufferedReaderTest.test_misbehaved_io(self)
        BufferedWriterTest.test_misbehaved_io(self)
1354
class CBufferedRandomTest(BufferedRandomTest):
    """Run the shared BufferedRandom tests against io.BufferedRandom."""
    tp = io.BufferedRandom

    def test_constructor(self):
        BufferedRandomTest.test_constructor(self)
        if sys.maxsize <= 0x7FFFFFFF:
            # The allocation can succeed on 32-bit builds, e.g. with more
            # than 2GB RAM and a 64-bit kernel.
            return
        rawio = self.MockRawIO()
        bufio = self.tp(rawio)
        with self.assertRaises((OverflowError, MemoryError, ValueError)):
            bufio.__init__(rawio, sys.maxsize)

    def test_garbage_collection(self):
        # Reuse both implementation-specific GC checks on this class.
        for parent in (CBufferedReaderTest, CBufferedWriterTest):
            parent.test_garbage_collection(self)
1371
class PyBufferedRandomTest(BufferedRandomTest):
    # Bind the shared BufferedRandom tests to pyio.BufferedRandom.
    tp = pyio.BufferedRandom
1374
1375
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001376# To fully exercise seek/tell, the StatefulIncrementalDecoder has these
1377# properties:
1378# - A single output character can correspond to many bytes of input.
1379# - The number of input bytes to complete the character can be
1380# undetermined until the last input byte is received.
1381# - The number of input bytes can vary depending on previous input.
1382# - A single input byte can correspond to many characters of output.
1383# - The number of output characters can be undetermined until the
1384# last input byte is received.
1385# - The number of output characters can vary depending on previous input.
1386
class StatefulIncrementalDecoder(codecs.IncrementalDecoder):
    """
    For testing seek/tell behavior with a stateful, buffering decoder.

    Input is a sequence of words.  Words may be fixed-length (length set
    by input) or variable-length (period-terminated).  In variable-length
    mode, extra periods are ignored.  Possible words are:
      - 'i' followed by a number sets the input length, I (maximum 99).
        When I is set to 0, words are period-terminated.
      - 'o' followed by a number sets the output length, O (maximum 99).
      - Any other word is converted into a word followed by a period on
        the output.  The output word consists of the input word truncated
        or padded out with hyphens to make its length equal to O.  If O
        is 0, the word is output verbatim without truncating or padding.
    I and O are initially set to 1.  EOF also terminates the last word.
    """

    def __init__(self, errors='strict'):
        codecs.IncrementalDecoder.__init__(self, errors)
        self.reset()

    def __repr__(self):
        return '<SID %x>' % id(self)

    def reset(self):
        """Return to the initial state: I = O = 1 and no buffered input."""
        self.i = 1
        self.o = 1
        self.buffer = bytearray()

    def getstate(self):
        """Return ``(buffered bytes, flags)`` with I and O packed in flags."""
        i, o = self.i ^ 1, self.o ^ 1 # so that flags = 0 after reset()
        return bytes(self.buffer), i*100 + o

    def setstate(self, state):
        """Restore a state previously returned by getstate()."""
        buffer, io = state
        self.buffer = bytearray(buffer)
        i, o = divmod(io, 100)
        self.i, self.o = i ^ 1, o ^ 1

    def decode(self, input, final=False):
        """Feed the bytes of *input* and return the text produced so far.

        If *final* is true, a word still sitting in the buffer is
        emitted as well.
        """
        pieces = []
        for b in input:
            # self.i is re-read for every byte because process_word()
            # may change it in the middle of the input.
            if self.i == 0: # variable-length, terminated with period
                if b == ord('.'):
                    if self.buffer:
                        pieces.append(self.process_word())
                else:
                    self.buffer.append(b)
            else: # fixed-length, terminate after self.i bytes
                self.buffer.append(b)
                if len(self.buffer) == self.i:
                    pieces.append(self.process_word())
        if final and self.buffer: # EOF terminates the last word
            pieces.append(self.process_word())
        return ''.join(pieces)

    def process_word(self):
        """Consume the buffered word and return its output (maybe '').

        Must only be called with a non-empty buffer.
        """
        output = ''
        if self.buffer[0] == ord('i'):
            self.i = min(99, int(self.buffer[1:] or 0)) # set input length
        elif self.buffer[0] == ord('o'):
            self.o = min(99, int(self.buffer[1:] or 0)) # set output length
        else:
            # pad out with hyphens up to the output length
            output = self.buffer.decode('ascii').ljust(self.o, '-')
            if self.o:
                output = output[:self.o] # truncate to output length
            output += '.'
        self.buffer = bytearray()
        return output

    # Flipped to True by tests that want the codec to be discoverable.
    codecEnabled = False

    @classmethod
    def lookupTestDecoder(cls, name):
        """Codec search function resolving 'test_decoder' to this class.

        Returns None unless codecEnabled is set and *name* matches.
        """
        if cls.codecEnabled and name == 'test_decoder':
            latin1 = codecs.lookup('latin-1')
            return codecs.CodecInfo(
                name='test_decoder', encode=latin1.encode, decode=None,
                incrementalencoder=None,
                streamreader=None, streamwriter=None,
                incrementaldecoder=cls)
        return None
1471
# Make the decoder above reachable through the codec registry.
# The lookup stays inert until a test sets codecEnabled.
codecs.register(StatefulIncrementalDecoder.lookupTestDecoder)
1475
1476
class StatefulIncrementalDecoderTest(unittest.TestCase):
    """
    Make sure the StatefulIncrementalDecoder actually works.
    """

    # Each entry is (input bytes, value passed as `final`, expected output).
    test_cases = [
        # I=1, O=1 (fixed-length input == fixed-length output)
        (b'abcd', False, 'a.b.c.d.'),
        # I=0, O=0 (variable-length input, variable-length output)
        (b'oiabcd', True, 'abcd.'),
        # I=0, O=0 (should ignore extra periods)
        (b'oi...abcd...', True, 'abcd.'),
        # I=0, O=6 (variable-length input, fixed-length output)
        (b'i.o6.x.xyz.toolongtofit.', False, 'x-----.xyz---.toolon.'),
        # I=2, O=6 (fixed-length input < fixed-length output)
        (b'i.i2.o6xyz', True, 'xy----.z-----.'),
        # I=6, O=3 (fixed-length input > fixed-length output)
        (b'i.o3.i6.abcdefghijklmnop', True, 'abc.ghi.mno.'),
        # I=0, then 3; O=29, then 15 (with longer output)
        (b'i.o29.a.b.cde.o15.abcdefghijabcdefghij.i3.a.b.c.d.ei00k.l.m', True,
         'a----------------------------.' +
         'b----------------------------.' +
         'cde--------------------------.' +
         'abcdefghijabcde.' +
         'a.b------------.' +
         '.c.------------.' +
         'd.e------------.' +
         'k--------------.' +
         'l--------------.' +
         'm--------------.')
    ]

    def test_decoder(self):
        # Try a few one-shot test cases.
        for input, eof, output in self.test_cases:
            d = StatefulIncrementalDecoder()
            self.assertEqual(d.decode(input, eof), output)

        # Also test an unfinished decode, followed by forcing EOF.
        d = StatefulIncrementalDecoder()
        self.assertEqual(d.decode(b'oiabcd'), '')
        self.assertEqual(d.decode(b'', 1), 'abcd.')
Guido van Rossum78892e42007-04-06 17:31:18 +00001519
1520class TextIOWrapperTest(unittest.TestCase):
Guido van Rossum9b76da62007-04-11 01:09:03 +00001521
def setUp(self):
    # Sample input mixing "\r\n", "\r" and "\n" line endings.
    raw_sample = b"AAA\r\nBBB\rCCC\r\nDDD\nEEE\r\n"
    self.testdata = raw_sample
    # The same lines with every ending written as "\n", as text.
    self.normalized = b"AAA\nBBB\nCCC\nDDD\nEEE\n".decode("ascii")
    # Start each test without a leftover scratch file.
    scratch = support.TESTFN
    support.unlink(scratch)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00001526
def tearDown(self):
    # Remove the scratch file a test may have created.
    scratch = support.TESTFN
    support.unlink(scratch)
Guido van Rossum9b76da62007-04-11 01:09:03 +00001529
def test_constructor(self):
    # Calling __init__ again on an existing wrapper must reconfigure it,
    # and invalid `newline` arguments must be rejected.
    r = self.BytesIO(b"\xc3\xa9\n\n")
    b = self.BufferedReader(r, 1000)
    t = self.TextIOWrapper(b)
    t.__init__(b, encoding="latin1", newline="\r\n")
    self.assertEqual(t.encoding, "latin1")
    self.assertEqual(t.line_buffering, False)
    t.__init__(b, encoding="utf8", line_buffering=True)
    self.assertEqual(t.encoding, "utf8")
    self.assertEqual(t.line_buffering, True)
    self.assertEqual("\xe9\n", t.readline())
    self.assertRaises(TypeError, t.__init__, b, newline=42)
    self.assertRaises(ValueError, t.__init__, b, newline='xyzzy')
1543
def test_detach(self):
    # detach() must hand back the wrapped buffer object.
    raw = self.BytesIO()
    buffered = self.BufferedWriter(raw)
    wrapper = self.TextIOWrapper(buffered)
    self.assertIs(wrapper.detach(), buffered)

    wrapper = self.TextIOWrapper(buffered, encoding="ascii")
    wrapper.write("howdy")
    # Nothing has reached the raw stream yet ...
    self.assertFalse(raw.getvalue())
    wrapper.detach()
    # ... but after detaching, the written text is there.
    self.assertEqual(raw.getvalue(), b"howdy")
    # A second detach() is an error.
    self.assertRaises(ValueError, wrapper.detach)
1556
def test_repr(self):
    # repr() shows the encoding, plus the raw stream's name once it has one.
    raw = self.BytesIO("hello".encode("utf-8"))
    wrapper = self.TextIOWrapper(self.BufferedReader(raw), encoding="utf-8")
    modname = self.TextIOWrapper.__module__

    expected = "<%s.TextIOWrapper encoding='utf-8'>" % modname
    self.assertEqual(repr(wrapper), expected)

    raw.name = "dummy"
    expected = "<%s.TextIOWrapper name='dummy' encoding='utf-8'>" % modname
    self.assertEqual(repr(wrapper), expected)

    raw.name = b"dummy"
    expected = "<%s.TextIOWrapper name=b'dummy' encoding='utf-8'>" % modname
    self.assertEqual(repr(wrapper), expected)
Benjamin Petersonc4c0eae2009-03-09 00:07:03 +00001570
def test_line_buffering(self):
    # With line_buffering=True, a write containing "\n" or "\r" must
    # push everything written so far through to the raw stream.
    r = self.BytesIO()
    b = self.BufferedWriter(r, 1000)
    t = self.TextIOWrapper(b, newline="\n", line_buffering=True)
    t.write("X")
    self.assertEqual(r.getvalue(), b"")  # No flush happened
    t.write("Y\nZ")
    self.assertEqual(r.getvalue(), b"XY\nZ")  # All got flushed
    t.write("A\rB")
    self.assertEqual(r.getvalue(), b"XY\nZA\rB")
1581
def test_encoding(self):
    # Check the encoding attribute is always set, and valid
    b = self.BytesIO()
    t = self.TextIOWrapper(b, encoding="utf8")
    self.assertEqual(t.encoding, "utf8")
    t = self.TextIOWrapper(b)
    self.assertIsNotNone(t.encoding)
    # lookup() raises if the default encoding name is not a known codec.
    codecs.lookup(t.encoding)
1590
def test_encoding_errors_reading(self):
    # Reading a non-ASCII byte through an ASCII wrapper under each
    # `errors` policy.
    # (1) default
    b = self.BytesIO(b"abc\n\xff\n")
    t = self.TextIOWrapper(b, encoding="ascii")
    self.assertRaises(UnicodeError, t.read)
    # (2) explicit strict
    b = self.BytesIO(b"abc\n\xff\n")
    t = self.TextIOWrapper(b, encoding="ascii", errors="strict")
    self.assertRaises(UnicodeError, t.read)
    # (3) ignore
    b = self.BytesIO(b"abc\n\xff\n")
    t = self.TextIOWrapper(b, encoding="ascii", errors="ignore")
    self.assertEqual(t.read(), "abc\n\n")
    # (4) replace
    b = self.BytesIO(b"abc\n\xff\n")
    t = self.TextIOWrapper(b, encoding="ascii", errors="replace")
    self.assertEqual(t.read(), "abc\n\ufffd\n")
1608
def test_encoding_errors_writing(self):
    # Writing a non-ASCII character through an ASCII wrapper under each
    # `errors` policy.
    # (1) default
    b = self.BytesIO()
    t = self.TextIOWrapper(b, encoding="ascii")
    self.assertRaises(UnicodeError, t.write, "\xff")
    # (2) explicit strict
    b = self.BytesIO()
    t = self.TextIOWrapper(b, encoding="ascii", errors="strict")
    self.assertRaises(UnicodeError, t.write, "\xff")
    # (3) ignore
    b = self.BytesIO()
    t = self.TextIOWrapper(b, encoding="ascii", errors="ignore",
                           newline="\n")
    t.write("abc\xffdef\n")
    t.flush()
    self.assertEqual(b.getvalue(), b"abcdef\n")
    # (4) replace
    b = self.BytesIO()
    t = self.TextIOWrapper(b, encoding="ascii", errors="replace",
                           newline="\n")
    t.write("abc\xffdef\n")
    t.flush()
    self.assertEqual(b.getvalue(), b"abc?def\n")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00001632
def test_newlines(self):
    # Read the same mixed-newline text under every `newline` mode, in
    # several encodings and with tiny buffer sizes, both by iterating the
    # wrapper and by alternating read(2) with readline().
    input_lines = [ "unix\n", "windows\r\n", "os9\r", "last\n", "nonl" ]

    # Each entry is [newline argument, lines expected back].
    tests = [
        [ None, [ 'unix\n', 'windows\n', 'os9\n', 'last\n', 'nonl' ] ],
        [ '', input_lines ],
        [ '\n', [ "unix\n", "windows\r\n", "os9\rlast\n", "nonl" ] ],
        [ '\r\n', [ "unix\nwindows\r\n", "os9\rlast\nnonl" ] ],
        [ '\r', [ "unix\nwindows\r", "\nos9\r", "last\nnonl" ] ],
    ]
    encodings = (
        'utf-8', 'latin-1',
        'utf-16', 'utf-16-le', 'utf-16-be',
        'utf-32', 'utf-32-le', 'utf-32-be',
    )

    # Try a range of buffer sizes to test the case where \r is the last
    # character in TextIOWrapper._pending_line.
    for encoding in encodings:
        # str.encode() already returns bytes; no extra conversion needed.
        data = ''.join(input_lines).encode(encoding)
        for do_reads in (False, True):
            for bufsize in range(1, 10):
                for newline, exp_lines in tests:
                    bufio = self.BufferedReader(self.BytesIO(data), bufsize)
                    textio = self.TextIOWrapper(bufio, newline=newline,
                                                encoding=encoding)
                    if do_reads:
                        got_lines = []
                        while True:
                            c2 = textio.read(2)
                            if c2 == '':
                                break
                            self.assertEqual(len(c2), 2)
                            got_lines.append(c2 + textio.readline())
                    else:
                        got_lines = list(textio)

                    for got_line, exp_line in zip(got_lines, exp_lines):
                        self.assertEqual(got_line, exp_line)
                    # zip() stops at the shorter list, so check lengths too.
                    self.assertEqual(len(got_lines), len(exp_lines))
1674
def test_newlines_input(self):
    # For every `newline` mode, readlines() must split the sample as
    # listed and read() from the start must return the same text joined.
    testdata = b"AAA\nBB\x00B\nCCC\rDDD\rEEE\r\nFFF\r\nGGG"
    normalized = testdata.replace(b"\r\n", b"\n").replace(b"\r", b"\n")
    for newline, expected in [
        (None, normalized.decode("ascii").splitlines(True)),
        ("", testdata.decode("ascii").splitlines(True)),
        ("\n", ["AAA\n", "BB\x00B\n", "CCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]),
        ("\r\n", ["AAA\nBB\x00B\nCCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]),
        ("\r", ["AAA\nBB\x00B\nCCC\r", "DDD\r", "EEE\r", "\nFFF\r", "\nGGG"]),
        ]:
        buf = self.BytesIO(testdata)
        txt = self.TextIOWrapper(buf, encoding="ascii", newline=newline)
        self.assertEqual(txt.readlines(), expected)
        txt.seek(0)
        self.assertEqual(txt.read(), "".join(expected))
1690
def test_newlines_output(self):
    # Maps each `newline` argument to the bytes expected after writing
    # the same text through the wrapper.
    testdict = {
        "": b"AAA\nBBB\nCCC\nX\rY\r\nZ",
        "\n": b"AAA\nBBB\nCCC\nX\rY\r\nZ",
        "\r": b"AAA\rBBB\rCCC\rX\rY\r\rZ",
        "\r\n": b"AAA\r\nBBB\r\nCCC\r\nX\rY\r\r\nZ",
        }
    # newline=None is expected to behave like the platform's os.linesep.
    tests = [(None, testdict[os.linesep])] + sorted(testdict.items())
    for newline, expected in tests:
        buf = self.BytesIO()
        txt = self.TextIOWrapper(buf, encoding="ascii", newline=newline)
        txt.write("AAA\nB")
        txt.write("BB\nCCC\n")
        txt.write("X\rY\r\nZ")
        txt.flush()
        self.assertEqual(buf.closed, False)
        self.assertEqual(buf.getvalue(), expected)
1708
def test_destructor(self):
    # Dropping the last reference to the wrapper must close the underlying
    # stream with the pending text already written to it.
    l = []
    base = self.BytesIO
    class MyBytesIO(base):
        def close(self):
            # Record what had been written by the time close() runs.
            l.append(self.getvalue())
            base.close(self)
    b = MyBytesIO()
    t = self.TextIOWrapper(b, encoding="ascii")
    t.write("abc")
    del t
    support.gc_collect()
    self.assertEqual([b"abc"], l)
1722
def test_override_destructor(self):
    # Destroying a subclass instance must run the overridden __del__,
    # close() and flush(), in that order.
    events = []
    class MyTextIO(self.TextIOWrapper):
        def __del__(self):
            events.append(1)
            # The base class may or may not define __del__.
            parent_del = getattr(super(), "__del__", None)
            if parent_del is not None:
                parent_del()
        def close(self):
            events.append(2)
            super().close()
        def flush(self):
            events.append(3)
            super().flush()
    raw = self.BytesIO()
    wrapper = MyTextIO(raw, encoding="ascii")
    del wrapper
    support.gc_collect()
    self.assertEqual(events, [1, 2, 3])
1745
def test_error_through_destructor(self):
    # Test that the exception state is not modified by a destructor,
    # even if close() fails.
    rawio = self.CloseFailureIO()
    def touch_missing_attribute():
        self.TextIOWrapper(rawio).xyzzy
    with support.captured_output("stderr") as captured:
        self.assertRaises(AttributeError, touch_missing_attribute)
    message = captured.getvalue().strip()
    if not message:
        return
    # The destructor *may* have printed an unraisable error, check it
    self.assertEqual(len(message.splitlines()), 1)
    self.assertTrue(message.startswith("Exception IOError: "), message)
    self.assertTrue(message.endswith(" ignored"), message)
Guido van Rossum8358db22007-08-18 21:39:55 +00001760
Guido van Rossum9b76da62007-04-11 01:09:03 +00001761 # Systematic tests of the text I/O API
1762
def test_basic_io(self):
    # Write, read, seek and tell on a real file for several chunk sizes
    # and encodings.  The files are opened with `with` so they are closed
    # even when an assertion fails.
    for chunksize in (1, 2, 3, 4, 5, 15, 16, 17, 31, 32, 33, 63, 64, 65):
        for enc in "ascii", "latin1", "utf8" :# , "utf-16-be", "utf-16-le":
            with self.open(support.TESTFN, "w+", encoding=enc) as f:
                f._CHUNK_SIZE = chunksize
                self.assertEqual(f.write("abc"), 3)
            with self.open(support.TESTFN, "r+", encoding=enc) as f:
                f._CHUNK_SIZE = chunksize
                self.assertEqual(f.tell(), 0)
                self.assertEqual(f.read(), "abc")
                cookie = f.tell()
                self.assertEqual(f.seek(0), 0)
                self.assertEqual(f.read(2), "ab")
                self.assertEqual(f.read(1), "c")
                self.assertEqual(f.read(1), "")
                self.assertEqual(f.read(), "")
                self.assertEqual(f.tell(), cookie)
                self.assertEqual(f.seek(0), 0)
                self.assertEqual(f.seek(0, 2), cookie)
                self.assertEqual(f.write("def"), 3)
                self.assertEqual(f.seek(cookie), cookie)
                self.assertEqual(f.read(), "def")
                if enc.startswith("utf"):
                    self.multi_line_test(f, enc)
1789
def multi_line_test(self, f, enc):
    # Helper: empty the text file *f*, write lines of various lengths
    # while recording tell() before each one, then read them back with
    # readline() and check that the (position, line) pairs are identical.
    # *enc* is accepted for the caller's convenience but is not used here.
    f.seek(0)
    f.truncate()
    sample = "s\xff\u0fff\uffff"
    wlines = []
    for size in (0, 1, 2, 3, 4, 5, 30, 31, 32, 33, 62, 63, 64, 65, 1000):
        # Cycle through the sample characters to build a line of `size` chars.
        line = "".join(sample[i % len(sample)] for i in range(size)) + "\n"
        wlines.append((f.tell(), line))
        f.write(line)
    f.seek(0)
    rlines = []
    while True:
        pos = f.tell()
        line = f.readline()
        if not line:
            break
        rlines.append((pos, line))
    self.assertEqual(rlines, wlines)
1811
def test_telling(self):
    # tell() after each write must match tell() after reading the same
    # line back.  The file is opened with `with` so it is closed even
    # when an assertion fails.
    with self.open(support.TESTFN, "w+", encoding="utf8") as f:
        p0 = f.tell()
        f.write("\xff\n")
        p1 = f.tell()
        f.write("\xff\n")
        p2 = f.tell()
        f.seek(0)
        self.assertEqual(f.tell(), p0)
        self.assertEqual(f.readline(), "\xff\n")
        self.assertEqual(f.tell(), p1)
        self.assertEqual(f.readline(), "\xff\n")
        self.assertEqual(f.tell(), p2)
        f.seek(0)
        for line in f:
            self.assertEqual(line, "\xff\n")
            # tell() is expected to fail while iteration is in progress.
            self.assertRaises(IOError, f.tell)
        self.assertEqual(f.tell(), p2)
1831
def test_seeking(self):
    # Build a line whose multi-byte character starts two bytes before the
    # default chunk size, then read across that boundary.
    chunk_size = _default_chunk_size()
    prefix_size = chunk_size - 2
    u_prefix = "a" * prefix_size
    prefix = u_prefix.encode("utf-8")
    self.assertEqual(len(u_prefix), len(prefix))
    u_suffix = "\u8888\n"
    suffix = u_suffix.encode("utf-8")
    line = prefix + suffix
    with self.open(support.TESTFN, "wb") as f:
        f.write(line*2)
    # Use `with` so the read handle is closed too (it used to be leaked).
    with self.open(support.TESTFN, "r", encoding="utf-8") as f:
        s = f.read(prefix_size)
        self.assertEqual(s, str(prefix, "ascii"))
        self.assertEqual(f.tell(), prefix_size)
        self.assertEqual(f.readline(), u_suffix)
1849
def test_seeking_too(self):
    # Regression test for a specific bug
    data = b'\xe0\xbf\xbf\n'
    with self.open(support.TESTFN, "wb") as f:
        f.write(data)
    # Use `with` so the read handle is closed too (it used to be leaked).
    with self.open(support.TESTFN, "r", encoding="utf-8") as f:
        f._CHUNK_SIZE  # Just test that it exists
        f._CHUNK_SIZE = 2
        f.readline()
        # The test passes as long as tell() does not raise here.
        f.tell()
1861
def test_seek_and_tell(self):
    # Test seek/tell using the StatefulIncrementalDecoder.
    # Make test faster by doing smaller seeks
    CHUNK_SIZE = 128

    def test_seek_and_tell_with_data(data, min_pos=0):
        """Tell/seek to various points within a data stream and ensure
        that the decoded data returned by read() is consistent."""
        with self.open(support.TESTFN, 'wb') as f:
            f.write(data)
        with self.open(support.TESTFN, encoding='test_decoder') as f:
            f._CHUNK_SIZE = CHUNK_SIZE
            decoded = f.read()

        for i in range(min_pos, len(decoded) + 1): # seek positions
            for j in [1, 5, len(decoded) - i]: # read lengths
                # `with` closes the file even when an assertion fails.
                with self.open(support.TESTFN, encoding='test_decoder') as f:
                    self.assertEqual(f.read(i), decoded[:i])
                    cookie = f.tell()
                    self.assertEqual(f.read(j), decoded[i:i + j])
                    f.seek(cookie)
                    self.assertEqual(f.read(), decoded[i:])

    # Enable the test decoder.
    StatefulIncrementalDecoder.codecEnabled = True

    # Run the tests.
    try:
        # Try each test case.
        for input, _, _ in StatefulIncrementalDecoderTest.test_cases:
            test_seek_and_tell_with_data(input)

        # Position each test case so that it crosses a chunk boundary.
        for input, _, _ in StatefulIncrementalDecoderTest.test_cases:
            offset = CHUNK_SIZE - len(input)//2
            prefix = b'.'*offset
            # Don't bother seeking into the prefix (takes too long).
            min_pos = offset*2
            test_seek_and_tell_with_data(prefix + input, min_pos)

    # Ensure our test decoder won't interfere with subsequent tests.
    finally:
        StatefulIncrementalDecoder.codecEnabled = False
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00001908
def test_encoded_writes(self):
    # Two consecutive writes in a UTF-16/UTF-32 encoding must read back
    # as the concatenated text and match a one-shot encode of it.
    data = "1234567890"
    tests = ("utf-16",
             "utf-16-le",
             "utf-16-be",
             "utf-32",
             "utf-32-le",
             "utf-32-be")
    for encoding in tests:
        buf = self.BytesIO()
        f = self.TextIOWrapper(buf, encoding=encoding)
        # Check if the BOM is written only once (see issue1753).
        f.write(data)
        f.write(data)
        f.seek(0)
        self.assertEqual(f.read(), data * 2)
        # Rewinding and reading a second time must give the same text.
        f.seek(0)
        self.assertEqual(f.read(), data * 2)
        self.assertEqual(buf.getvalue(), (data * 2).encode(encoding))
1928
def test_unreadable(self):
    # read() must raise IOError when the wrapped stream reports that it
    # is not readable.
    class NonReadableBytesIO(self.BytesIO):
        def readable(self):
            return False
    wrapper = self.TextIOWrapper(NonReadableBytesIO())
    self.assertRaises(IOError, wrapper.read)
1935
def test_read_one_by_one(self):
    # Reading one character at a time must still turn "\r\n" into "\n".
    txt = self.TextIOWrapper(self.BytesIO(b"AA\r\nBB"))
    reads = ""
    while True:
        c = txt.read(1)
        if not c:
            break
        reads += c
    self.assertEqual(reads, "AA\nBB")
1945
# read in amounts equal to TextIOWrapper._CHUNK_SIZE which is 128.
def test_read_by_chunk(self):
    # make sure "\r\n" straddles 128 char boundary.
    txt = self.TextIOWrapper(self.BytesIO(b"A" * 127 + b"\r\nB"))
    reads = ""
    while True:
        c = txt.read(128)
        if not c:
            break
        reads += c
    self.assertEqual(reads, "A"*127+"\nB")
1957
def test_issue1395_1(self):
    # Reading the mixed-newline sample one character at a time must give
    # the normalized text.
    txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")

    # read one char at a time
    reads = ""
    while True:
        c = txt.read(1)
        if not c:
            break
        reads += c
    self.assertEqual(reads, self.normalized)
1969
def test_issue1395_2(self):
    # Same as above, reading four characters at a time with a chunk size
    # of four.
    txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
    txt._CHUNK_SIZE = 4

    reads = ""
    while True:
        c = txt.read(4)
        if not c:
            break
        reads += c
    self.assertEqual(reads, self.normalized)
1981
def test_issue1395_3(self):
    # Mixing read(4) and readline() with a chunk size of four must still
    # give the normalized text.
    txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
    txt._CHUNK_SIZE = 4

    reads = txt.read(4)
    reads += txt.read(4)
    reads += txt.readline()
    reads += txt.readline()
    reads += txt.readline()
    self.assertEqual(reads, self.normalized)
1992
def test_issue1395_4(self):
    # read(4) followed by an unbounded read() must give the normalized
    # text.
    txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
    txt._CHUNK_SIZE = 4

    reads = txt.read(4)
    reads += txt.read()
    self.assertEqual(reads, self.normalized)
2000
def test_issue1395_5(self):
    # A position obtained from tell() after a partial read must still be
    # valid after seeking away and back.
    txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
    txt._CHUNK_SIZE = 4

    # Only the stream position matters here; the text read is discarded.
    txt.read(4)
    pos = txt.tell()
    txt.seek(0)
    txt.seek(pos)
    self.assertEqual(txt.read(4), "BBB\n")
2010
def test_issue2282(self):
    # The wrapper must report the same seekable() value as the stream it
    # wraps.
    raw = self.BytesIO(self.testdata)
    wrapper = self.TextIOWrapper(raw, encoding="ascii")

    expected = raw.seekable()
    self.assertEqual(expected, wrapper.seekable())
2016
def test_append_bom(self):
    # The BOM is not written again when appending to a non-empty file
    filename = support.TESTFN
    for charset in ('utf-8-sig', 'utf-16', 'utf-32'):
        with self.open(filename, 'w', encoding=charset) as f:
            f.write('aaa')
            # Result unused; the call is kept so that the write-then-tell
            # sequence is still exercised.
            f.tell()
        with self.open(filename, 'rb') as f:
            self.assertEqual(f.read(), 'aaa'.encode(charset))

        with self.open(filename, 'a', encoding=charset) as f:
            f.write('xxx')
        with self.open(filename, 'rb') as f:
            self.assertEqual(f.read(), 'aaaxxx'.encode(charset))
2031
def test_seek_bom(self):
    # The BOM must not be written again when seeking manually to the end
    # of existing data and writing there, and rewriting from position 0
    # must still leave a single BOM at the start.
    filename = support.TESTFN
    for charset in ('utf-8-sig', 'utf-16', 'utf-32'):
        with self.open(filename, 'w', encoding=charset) as f:
            f.write('aaa')
            pos = f.tell()
        with self.open(filename, 'r+', encoding=charset) as f:
            f.seek(pos)
            f.write('zzz')
            f.seek(0)
            f.write('bbb')
        with self.open(filename, 'rb') as f:
            self.assertEqual(f.read(), 'bbbzzz'.encode(charset))
2046
def test_errors_property(self):
    # The `errors` attribute reports "strict" when nothing is passed and
    # otherwise the value given to open().
    stream = self.open(support.TESTFN, "w")
    with stream as f:
        self.assertEqual(f.errors, "strict")
    stream = self.open(support.TESTFN, "w", errors="replace")
    with stream as f:
        self.assertEqual(f.errors, "replace")
2052
Antoine Pitroue4501852009-05-14 18:55:55 +00002053
class CTextIOWrapperTest(TextIOWrapperTest):

    def test_initialization(self):
        # After a failed re-initialization the wrapper must refuse to read.
        r = self.BytesIO(b"\xc3\xa9\n\n")
        b = self.BufferedReader(r, 1000)
        t = self.TextIOWrapper(b)
        self.assertRaises(TypeError, t.__init__, b, newline=42)
        self.assertRaises(ValueError, t.read)
        self.assertRaises(ValueError, t.__init__, b, newline='xyzzy')
        self.assertRaises(ValueError, t.read)

    def test_garbage_collection(self):
        # C TextIOWrapper objects are collected, and collecting them flushes
        # all data to disk.
        # The Python version has __del__, so it ends in gc.garbage instead.
        rawio = io.FileIO(support.TESTFN, "wb")
        b = self.BufferedWriter(rawio)
        t = self.TextIOWrapper(b, encoding="ascii")
        t.write("456def")
        # Create a reference cycle so only the garbage collector can free it.
        t.x = t
        wr = weakref.ref(t)
        del t
        support.gc_collect()
        self.assertIsNone(wr(), wr)
        with self.open(support.TESTFN, "rb") as f:
            self.assertEqual(f.read(), b"456def")
2080
# Runs the shared TextIOWrapperTest cases unchanged; this subclass adds no
# tests of its own.
class PyTextIOWrapperTest(TextIOWrapperTest):
    pass
2083
2084
2085class IncrementalNewlineDecoderTest(unittest.TestCase):
2086
def check_newline_decoding_utf8(self, decoder):
    # UTF-8 specific tests for a newline decoder
    def _check_decode(b, s, **kwargs):
        # We exercise getstate() / setstate() as well as decode():
        # decoding again from the saved state must give the same result.
        state = decoder.getstate()
        self.assertEqual(decoder.decode(b, **kwargs), s)
        decoder.setstate(state)
        self.assertEqual(decoder.decode(b, **kwargs), s)

    _check_decode(b'\xe8\xa2\x88', "\u8888")

    # A multi-byte character fed one byte at a time.
    _check_decode(b'\xe8', "")
    _check_decode(b'\xa2', "")
    _check_decode(b'\x88', "\u8888")

    _check_decode(b'\xe8', "")
    _check_decode(b'\xa2', "")
    _check_decode(b'\x88', "\u8888")

    # A truncated character at EOF is an error.
    _check_decode(b'\xe8', "")
    self.assertRaises(UnicodeDecodeError, decoder.decode, b'', final=True)

    decoder.reset()
    _check_decode(b'\n', "\n")
    # A lone "\r" is held back until the next input or EOF decides it.
    _check_decode(b'\r', "")
    _check_decode(b'', "\n", final=True)
    _check_decode(b'\r', "\n", final=True)

    _check_decode(b'\r', "")
    _check_decode(b'a', "\na")

    _check_decode(b'\r\r\n', "\n\n")
    _check_decode(b'\r', "")
    _check_decode(b'\r', "\n")
    _check_decode(b'\na', "\na")

    _check_decode(b'\xe8\xa2\x88\r\n', "\u8888\n")
    _check_decode(b'\xe8\xa2\x88', "\u8888")
    _check_decode(b'\n', "\n")
    _check_decode(b'\xe8\xa2\x88\r', "\u8888")
    _check_decode(b'\n', "\n")
Guido van Rossum9b76da62007-04-11 01:09:03 +00002128
def check_newline_decoding(self, decoder, encoding):
    # Feed text to *decoder* one unit at a time and check both the
    # translated output and the `newlines` attribute as it evolves.
    # With an *encoding*, the text is encoded and fed byte by byte;
    # with encoding=None it is fed character by character as str.
    result = []
    if encoding is not None:
        encoder = codecs.getincrementalencoder(encoding)()
        def _decode_bytewise(s):
            # Decode one byte at a time
            for b in encoder.encode(s):
                result.append(decoder.decode(bytes([b])))
    else:
        encoder = None
        def _decode_bytewise(s):
            # Decode one char at a time
            for c in s:
                result.append(decoder.decode(c))
    self.assertEqual(decoder.newlines, None)
    _decode_bytewise("abc\n\r")
    self.assertEqual(decoder.newlines, '\n')
    _decode_bytewise("\nabc")
    self.assertEqual(decoder.newlines, ('\n', '\r\n'))
    _decode_bytewise("abc\r")
    self.assertEqual(decoder.newlines, ('\n', '\r\n'))
    _decode_bytewise("abc")
    self.assertEqual(decoder.newlines, ('\r', '\n', '\r\n'))
    _decode_bytewise("abc\r")
    self.assertEqual("".join(result), "abc\n\nabcabc\nabcabc")
    # reset() must clear both the pending "\r" and the seen-newlines record.
    decoder.reset()
    input = "abc"
    if encoder is not None:
        encoder.reset()
        input = encoder.encode(input)
    self.assertEqual(decoder.decode(input), "abc")
    self.assertEqual(decoder.newlines, None)
2161
def test_newline_decoder(self):
    """Run the generic newline checks for several encodings, then the
    UTF-8 specific checks."""
    # None means the IncrementalNewlineDecoder takes unicode input
    # rather than bytes input.
    encodings = (
        None, 'utf-8', 'latin-1',
        'utf-16', 'utf-16-le', 'utf-16-be',
        'utf-32', 'utf-32-le', 'utf-32-be',
    )
    for enc in encodings:
        if enc:
            inner = codecs.getincrementaldecoder(enc)()
        else:
            inner = enc
        wrapped = self.IncrementalNewlineDecoder(inner, translate=True)
        self.check_newline_decoding(wrapped, enc)

    utf8_decoder = self.IncrementalNewlineDecoder(
        codecs.getincrementaldecoder("utf-8")(), translate=True)
    self.check_newline_decoding_utf8(utf8_decoder)
2177
def test_newline_bytes(self):
    """Characters U+0D00 and U+0A00 must not be mistaken for newlines."""
    # Issue 5433: Excessive optimization in IncrementalNewlineDecoder
    def _check(dec):
        # Neither character is a line ending, so both must pass through
        # unchanged and leave `newlines` untouched.
        self.assertEqual(dec.newlines, None)
        self.assertEqual(dec.decode("\u0D00"), "\u0D00")
        self.assertEqual(dec.newlines, None)
        self.assertEqual(dec.decode("\u0A00"), "\u0A00")
        self.assertEqual(dec.newlines, None)

    # Same expectations with and without newline translation.
    for translate in (False, True):
        _check(self.IncrementalNewlineDecoder(None, translate=translate))
2190
class CIncrementalNewlineDecoderTest(IncrementalNewlineDecoderTest):
    """IncrementalNewlineDecoderTest variant whose name starts with "C";
    it defines nothing of its own."""
2193
class PyIncrementalNewlineDecoderTest(IncrementalNewlineDecoderTest):
    """IncrementalNewlineDecoderTest variant whose name starts with "Py";
    it defines nothing of its own."""
Antoine Pitrou180a3362008-12-14 16:36:46 +00002196
Alexandre Vassalotti472f07d2008-01-06 00:34:32 +00002197
Guido van Rossum01a27522007-03-07 01:00:12 +00002198# XXX Tests for open()
Guido van Rossum68bbcd22007-02-27 17:19:33 +00002199
class MiscIOTest(unittest.TestCase):
    """Miscellaneous checks of an io namespace, open() and BlockingIOError.

    The tests read ``io``, ``open``, ``IOBase``, ``BlockingIOError`` and the
    other io names through ``self``; those attributes are not defined here
    and must be provided on the concrete test class before it is run.
    """

    def tearDown(self):
        # Every test that creates a file uses support.TESTFN.
        support.unlink(support.TESTFN)

    def test___all__(self):
        """Every public name exists and has the expected kind."""
        for name in self.io.__all__:
            obj = getattr(self.io, name, None)
            self.assertTrue(obj is not None, name)
            if name == "open":
                continue
            elif "error" in name.lower() or name == "UnsupportedOperation":
                self.assertTrue(issubclass(obj, Exception), name)
            elif not name.startswith("SEEK_"):
                self.assertTrue(issubclass(obj, self.IOBase), name)

    def test_attributes(self):
        """Check ``name`` and ``mode`` on each layer of the objects
        returned by open()."""
        # Context managers make sure the files are closed even when an
        # assertion fails, so tearDown() can always remove TESTFN.
        with self.open(support.TESTFN, "wb", buffering=0) as f:
            self.assertEqual(f.mode, "wb")

        with self.open(support.TESTFN, "U") as f:
            self.assertEqual(f.name, support.TESTFN)
            self.assertEqual(f.buffer.name, support.TESTFN)
            self.assertEqual(f.buffer.raw.name, support.TESTFN)
            self.assertEqual(f.mode, "U")
            self.assertEqual(f.buffer.mode, "rb")
            self.assertEqual(f.buffer.raw.mode, "rb")

        with self.open(support.TESTFN, "w+") as f:
            self.assertEqual(f.mode, "w+")
            self.assertEqual(f.buffer.mode, "rb+")  # Does it really matter?
            self.assertEqual(f.buffer.raw.mode, "rb+")

            # g borrows f's descriptor (closefd=False); it is closed first,
            # while the descriptor is still valid.
            with self.open(f.fileno(), "wb", closefd=False) as g:
                self.assertEqual(g.mode, "wb")
                self.assertEqual(g.raw.mode, "wb")
                self.assertEqual(g.name, f.fileno())
                self.assertEqual(g.raw.name, f.fileno())

    def test_io_after_close(self):
        """Every I/O method of a closed file must raise ValueError."""
        for kwargs in [
                {"mode": "w"},
                {"mode": "wb"},
                {"mode": "w", "buffering": 1},
                {"mode": "w", "buffering": 2},
                {"mode": "wb", "buffering": 0},
                {"mode": "r"},
                {"mode": "rb"},
                {"mode": "r", "buffering": 1},
                {"mode": "r", "buffering": 2},
                {"mode": "rb", "buffering": 0},
                {"mode": "w+"},
                {"mode": "w+b"},
                {"mode": "w+", "buffering": 1},
                {"mode": "w+", "buffering": 2},
                {"mode": "w+b", "buffering": 0},
            ]:
            f = self.open(support.TESTFN, **kwargs)
            f.close()
            self.assertRaises(ValueError, f.flush)
            self.assertRaises(ValueError, f.fileno)
            self.assertRaises(ValueError, f.isatty)
            self.assertRaises(ValueError, f.__iter__)
            # peek/read1/readinto only exist on some of the object types.
            if hasattr(f, "peek"):
                self.assertRaises(ValueError, f.peek, 1)
            self.assertRaises(ValueError, f.read)
            if hasattr(f, "read1"):
                self.assertRaises(ValueError, f.read1, 1024)
            if hasattr(f, "readinto"):
                self.assertRaises(ValueError, f.readinto, bytearray(1024))
            self.assertRaises(ValueError, f.readline)
            self.assertRaises(ValueError, f.readlines)
            self.assertRaises(ValueError, f.seek, 0)
            self.assertRaises(ValueError, f.tell)
            self.assertRaises(ValueError, f.truncate)
            # Pass the argument type matching the file's mode.
            self.assertRaises(ValueError, f.write,
                              b"" if "b" in kwargs['mode'] else "")
            self.assertRaises(ValueError, f.writelines, [])
            self.assertRaises(ValueError, next, f)

    def test_blockingioerror(self):
        # Various BlockingIOError issues
        self.assertRaises(TypeError, self.BlockingIOError)
        self.assertRaises(TypeError, self.BlockingIOError, 1)
        self.assertRaises(TypeError, self.BlockingIOError, 1, 2, 3, 4)
        self.assertRaises(TypeError, self.BlockingIOError, 1, "", None)
        b = self.BlockingIOError(1, "")
        self.assertEqual(b.characters_written, 0)

        class C(str):
            pass

        # Build a reference cycle between the exception and its argument
        # and check that collecting garbage frees it.
        c = C("")
        b = self.BlockingIOError(1, c)
        c.b = b
        b.c = c
        wr = weakref.ref(c)
        del c, b
        support.gc_collect()
        self.assertTrue(wr() is None, wr)

    def test_abcs(self):
        # Test the visible base classes are ABCs.
        self.assertTrue(isinstance(self.IOBase, abc.ABCMeta))
        self.assertTrue(isinstance(self.RawIOBase, abc.ABCMeta))
        self.assertTrue(isinstance(self.BufferedIOBase, abc.ABCMeta))
        self.assertTrue(isinstance(self.TextIOBase, abc.ABCMeta))

    def _check_abc_inheritance(self, abcmodule):
        """Check which of *abcmodule*'s base classes the raw, buffered and
        text files returned by open() are instances of."""
        with self.open(support.TESTFN, "wb", buffering=0) as f:
            self.assertTrue(isinstance(f, abcmodule.IOBase))
            self.assertTrue(isinstance(f, abcmodule.RawIOBase))
            self.assertFalse(isinstance(f, abcmodule.BufferedIOBase))
            self.assertFalse(isinstance(f, abcmodule.TextIOBase))
        with self.open(support.TESTFN, "wb") as f:
            self.assertTrue(isinstance(f, abcmodule.IOBase))
            self.assertFalse(isinstance(f, abcmodule.RawIOBase))
            self.assertTrue(isinstance(f, abcmodule.BufferedIOBase))
            self.assertFalse(isinstance(f, abcmodule.TextIOBase))
        with self.open(support.TESTFN, "w") as f:
            self.assertTrue(isinstance(f, abcmodule.IOBase))
            self.assertFalse(isinstance(f, abcmodule.RawIOBase))
            self.assertFalse(isinstance(f, abcmodule.BufferedIOBase))
            self.assertTrue(isinstance(f, abcmodule.TextIOBase))

    def test_abc_inheritance(self):
        # Test implementations inherit from their respective ABCs
        self._check_abc_inheritance(self)

    def test_abc_inheritance_official(self):
        # Test implementations inherit from the official ABCs of the
        # baseline "io" module.
        self._check_abc_inheritance(io)
2335
class CMiscIOTest(MiscIOTest):
    """MiscIOTest bound to the ``io`` module."""
    io = io
2338
class PyMiscIOTest(MiscIOTest):
    """MiscIOTest bound to the ``pyio`` module."""
    io = pyio
Barry Warsaw40e82462008-11-20 20:14:50 +00002341
def test_main():
    """Inject the io namespaces into the test classes and run them.

    Classes whose name starts with "C" receive the members of ``io``,
    classes whose name starts with "Py" receive those of ``pyio``; any
    other class is left untouched.
    """
    tests = (CIOTest, PyIOTest,
             CBufferedReaderTest, PyBufferedReaderTest,
             CBufferedWriterTest, PyBufferedWriterTest,
             CBufferedRWPairTest, PyBufferedRWPairTest,
             CBufferedRandomTest, PyBufferedRandomTest,
             StatefulIncrementalDecoderTest,
             CIncrementalNewlineDecoderTest, PyIncrementalNewlineDecoderTest,
             CTextIOWrapperTest, PyTextIOWrapperTest,
             CMiscIOTest, PyMiscIOTest,)

    # Put the namespaces of the IO module we are testing and some useful mock
    # classes in the __dict__ of each test.
    mocks = (MockRawIO, MisbehavedRawIO, MockFileIO, CloseFailureIO,
             MockNonBlockWriterIO)
    all_members = io.__all__ + ["IncrementalNewlineDecoder"]
    c_io_ns = {}
    py_io_ns = {}
    for member in all_members:
        c_io_ns[member] = getattr(io, member)
    for member in all_members:
        py_io_ns[member] = getattr(pyio, member)

    # Each mock is registered under its plain name but resolved to the
    # "C"- or "Py"-prefixed global of the same name.
    module_globals = globals()
    for mock in mocks:
        c_io_ns[mock.__name__] = module_globals["C" + mock.__name__]
    for mock in mocks:
        py_io_ns[mock.__name__] = module_globals["Py" + mock.__name__]

    # Avoid turning open into a bound method.
    py_io_ns["open"] = pyio.OpenWrapper

    for test in tests:
        if test.__name__.startswith("C"):
            namespace = c_io_ns
        elif test.__name__.startswith("Py"):
            namespace = py_io_ns
        else:
            continue
        for attr_name, attr_value in namespace.items():
            setattr(test, attr_name, attr_value)

    support.run_unittest(*tests)
Guido van Rossum28524c72007-02-27 05:47:44 +00002374
2375if __name__ == "__main__":
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002376 test_main()