blob: 85fac30e300a659cff924cddae46c4f8469f8398 [file] [log] [blame]
"""Unit tests for the io module."""

# Tests of io are scattered over the test suite:
# * test_bufio - tests file buffering
# * test_memoryio - tests BytesIO and StringIO
# * test_fileio - tests FileIO
# * test_file - tests the file interface
# * test_io - tests everything else in the io module
# * test_univnewlines - tests universal newline support
# * test_largefile - tests operations on a file greater than 2**32 bytes
#     (only enabled with -ulargefile)

################################################################################
# ATTENTION TEST WRITERS!!!
################################################################################
# When writing tests for io, it's important to test both the C and Python
# implementations. This is usually done by writing a base test that refers to
# the type it is testing as an attribute. Then it provides custom subclasses to
# test both implementations. This file has lots of examples.
################################################################################
Guido van Rossum68bbcd22007-02-27 17:19:33 +000021
Victor Stinnerf86a5e82012-06-05 13:43:22 +020022import abc
23import array
24import errno
25import locale
Guido van Rossum8358db22007-08-18 21:39:55 +000026import os
Victor Stinnerf86a5e82012-06-05 13:43:22 +020027import pickle
28import random
29import signal
Guido van Rossum34d69e52007-04-10 20:08:41 +000030import sys
Gregory P. Smithe5796c42018-12-30 20:17:57 -080031import sysconfig
Victor Stinner22eb6892019-06-26 00:51:05 +020032import textwrap
Antoine Pitroua6a4dc82017-09-07 18:56:24 +020033import threading
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +000034import time
Guido van Rossum28524c72007-02-27 05:47:44 +000035import unittest
Antoine Pitroue033e062010-10-29 10:38:18 +000036import warnings
Victor Stinnerf86a5e82012-06-05 13:43:22 +020037import weakref
Antoine Pitrou131a4892012-10-16 22:57:11 +020038from collections import deque, UserList
Victor Stinnerf86a5e82012-06-05 13:43:22 +020039from itertools import cycle, count
Benjamin Petersonee8712c2008-05-20 21:35:26 +000040from test import support
Victor Stinner22eb6892019-06-26 00:51:05 +020041from test.support.script_helper import (
42 assert_python_ok, assert_python_failure, run_python_until_end)
Hai Shi883bc632020-07-06 17:12:49 +080043from test.support import import_helper
44from test.support import os_helper
Hai Shie80697d2020-05-28 06:10:27 +080045from test.support import threading_helper
Hai Shi883bc632020-07-06 17:12:49 +080046from test.support import warnings_helper
47from test.support.os_helper import FakePath
Guido van Rossum76c5d4d2007-04-06 19:10:29 +000048
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +000049import codecs
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000050import io # C implementation of io
51import _pyio as pyio # Python implementation of io
Guido van Rossuma9e20242007-03-08 00:43:48 +000052
try:
    import ctypes
except ImportError:
    # ctypes is unavailable: hand back a signed-char array instead.
    def byteslike(*pos, **kw):
        """Create a bytes-like object (an array.array) from bytes(*pos, **kw)"""
        payload = bytes(*pos, **kw)
        return array.array("b", payload)
else:
    class EmptyStruct(ctypes.Structure):
        """Field-less ctypes structure that byteslike() resizes to fit."""

    def byteslike(*pos, **kw):
        """Create a bytes-like object having no string or sequence methods"""
        payload = bytes(*pos, **kw)
        holder = EmptyStruct()
        # Grow the structure's buffer to the payload size, then copy the
        # payload in through a byte-wise memoryview.
        ctypes.resize(holder, len(payload))
        memoryview(holder).cast("B")[:] = payload
        return holder
68
_cflags = sysconfig.get_config_var('CFLAGS') or ''
_config_args = sysconfig.get_config_var('CONFIG_ARGS') or ''
# True when either the configure arguments or the compiler flags of this
# build mention the memory sanitizer.
MEMORY_SANITIZER = (
    '--with-memory-sanitizer' in _config_args or
    '-fsanitize=memory' in _cflags
)
75
# Does io.IOBase finalizer log the exception if the close() method fails?
# The exception is ignored silently by default in release build.
IOBASE_EMITS_UNRAISABLE = (
    sys.flags.dev_mode or hasattr(sys, "gettotalrefcount")
)
Victor Stinner44235042019-04-12 17:06:47 +020079
80
def _default_chunk_size():
    """Get the default TextIOWrapper chunk size

    This file itself is opened in text mode (latin-1) only to obtain a
    text stream whose ``_CHUNK_SIZE`` can be queried; nothing is read.
    """
    with open(__file__, "r", encoding="latin-1") as probe:
        chunk_size = probe._CHUNK_SIZE
    return chunk_size
85
86
class MockRawIOWithoutRead:
    """A RawIO implementation without read(), so as to exercise the default
    RawIO.read() which calls readinto().

    Reads are served from a queue of chunks given to the constructor (a None
    chunk makes readinto() return None); writes are recorded in
    ``_write_stack``.
    """

    def __init__(self, read_stack=()):
        self._extraneous_reads = 0
        self._reads = 0
        self._write_stack = []
        self._read_stack = list(read_stack)

    def write(self, b):
        # Record an immutable copy, but report the caller's own length.
        snapshot = bytes(b)
        self._write_stack.append(snapshot)
        return len(b)

    def writable(self):
        return True

    def fileno(self):
        return 42

    def readable(self):
        return True

    def seekable(self):
        return True

    def seek(self, pos, whence):
        return 0   # wrong but we gotta return something

    def tell(self):
        return 0   # same comment as above

    def readinto(self, buf):
        """Copy the front queued chunk (or as much as fits) into *buf*.

        Returns the number of bytes copied, None when the front chunk is
        None, or 0 (counted as an extraneous read) when the queue is empty.
        """
        self._reads += 1
        capacity = len(buf)
        pending = self._read_stack
        if not pending:
            self._extraneous_reads += 1
            return 0
        chunk = pending[0]
        if chunk is None:
            del pending[0]
            return None
        size = len(chunk)
        if size > capacity:
            # Chunk does not fit: fill buf and keep the remainder queued.
            buf[:] = chunk[:capacity]
            pending[0] = chunk[capacity:]
            return capacity
        del pending[0]
        buf[:size] = chunk
        return size

    def truncate(self, pos=None):
        return pos
142
class CMockRawIOWithoutRead(MockRawIOWithoutRead, io.RawIOBase):
    """MockRawIOWithoutRead on top of the C implementation's RawIOBase."""
145
class PyMockRawIOWithoutRead(MockRawIOWithoutRead, pyio.RawIOBase):
    """MockRawIOWithoutRead on top of the Python implementation's RawIOBase."""
148
149
class MockRawIO(MockRawIOWithoutRead):
    """MockRawIOWithoutRead plus a read() that hands out queued chunks."""

    def read(self, n=None):
        """Return the next queued chunk, or b"" when the queue is empty.

        *n* is accepted but ignored: the front chunk is returned whole,
        whatever it is (it may be None).  Every call increments ``_reads``;
        a call on an empty queue also increments ``_extraneous_reads``.
        """
        self._reads += 1
        try:
            return self._read_stack.pop(0)
        except IndexError:
            # Only an exhausted queue is expected here.  Anything else
            # (KeyboardInterrupt included) must not be swallowed.
            self._extraneous_reads += 1
            return b""
159
class CMockRawIO(MockRawIO, io.RawIOBase):
    """MockRawIO on top of the C implementation's RawIOBase."""
162
class PyMockRawIO(MockRawIO, pyio.RawIOBase):
    """MockRawIO on top of the Python implementation's RawIOBase."""
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000165
Guido van Rossuma9e20242007-03-08 00:43:48 +0000166
class MisbehavedRawIO(MockRawIO):
    """MockRawIO whose methods deliberately report wrong results."""

    def write(self, b):
        # Claim twice as many bytes as the parent reported.
        honest_count = super().write(b)
        return honest_count * 2

    def read(self, n=None):
        # Hand back the parent's chunk repeated twice.
        chunk = super().read(n)
        return chunk * 2

    def seek(self, pos, whence):
        return -123

    def tell(self):
        return -456

    def readinto(self, buf):
        # Let the parent fill buf, then report five times its length.
        super().readinto(buf)
        return 5 * len(buf)
183
class CMisbehavedRawIO(MisbehavedRawIO, io.RawIOBase):
    """MisbehavedRawIO on top of the C implementation's RawIOBase."""
186
class PyMisbehavedRawIO(MisbehavedRawIO, pyio.RawIOBase):
    """MisbehavedRawIO on top of the Python implementation's RawIOBase."""
189
190
class SlowFlushRawIO(MockRawIO):
    """MockRawIO whose flush() signals ``in_flush`` and then sleeps."""

    def __init__(self):
        super().__init__()
        # Set as soon as flush() has been entered.
        self.in_flush = threading.Event()

    def flush(self):
        """Set the in_flush event, then sleep for a quarter of a second."""
        self.in_flush.set()
        time.sleep(0.25)
199
class CSlowFlushRawIO(SlowFlushRawIO, io.RawIOBase):
    """SlowFlushRawIO on top of the C implementation's RawIOBase."""
202
class PySlowFlushRawIO(SlowFlushRawIO, pyio.RawIOBase):
    """SlowFlushRawIO on top of the Python implementation's RawIOBase."""
205
206
class CloseFailureIO(MockRawIO):
    """MockRawIO whose first close() call raises OSError."""

    closed = 0

    def close(self):
        """Raise OSError on the first call; later calls do nothing."""
        if self.closed:
            return
        # Mark as closed before raising so a second close() stays quiet.
        self.closed = 1
        raise OSError
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000214
class CCloseFailureIO(CloseFailureIO, io.RawIOBase):
    """CloseFailureIO on top of the C implementation's RawIOBase."""
217
class PyCloseFailureIO(CloseFailureIO, pyio.RawIOBase):
    """CloseFailureIO on top of the Python implementation's RawIOBase."""
220
221
class MockFileIO:
    """Mixin recording the result size of every read in ``read_history``.

    Meant to be combined with a stream class whose constructor takes the
    initial data; read() and readinto() are forwarded to it via super().
    """

    def __init__(self, data):
        self.read_history = []
        super().__init__(data)

    def read(self, n=None):
        chunk = super().read(n)
        # A None result is logged as None, anything else by its length.
        if chunk is None:
            self.read_history.append(None)
        else:
            self.read_history.append(len(chunk))
        return chunk

    def readinto(self, b):
        count = super().readinto(b)
        self.read_history.append(count)
        return count
Guido van Rossum78892e42007-04-06 17:31:18 +0000237
class CMockFileIO(MockFileIO, io.BytesIO):
    """MockFileIO backed by the C implementation's BytesIO."""
Guido van Rossuma9e20242007-03-08 00:43:48 +0000240
class PyMockFileIO(MockFileIO, pyio.BytesIO):
    """MockFileIO backed by the Python implementation's BytesIO."""
243
244
class MockUnseekableIO:
    """Mixin that makes a stream report and behave as not seekable.

    seek(), tell() and truncate() all raise ``self.UnsupportedOperation``,
    which the concrete subclass is expected to provide.
    """

    def seekable(self):
        return False

    def seek(self, *args):
        error = self.UnsupportedOperation("not seekable")
        raise error

    def tell(self, *args):
        error = self.UnsupportedOperation("not seekable")
        raise error

    def truncate(self, *args):
        error = self.UnsupportedOperation("not seekable")
        raise error
257
class CMockUnseekableIO(MockUnseekableIO, io.BytesIO):
    """MockUnseekableIO backed by the C implementation's BytesIO."""

    UnsupportedOperation = io.UnsupportedOperation
260
class PyMockUnseekableIO(MockUnseekableIO, pyio.BytesIO):
    """MockUnseekableIO backed by the Python implementation's BytesIO."""

    UnsupportedOperation = pyio.UnsupportedOperation
263
264
class MockNonBlockWriterIO:
    """Raw writer mock that can pretend a write would block.

    Written data accumulates in ``_write_stack``.  After block_on(char),
    a write stops short just before the first occurrence of *char*; a write
    starting with *char* cancels the blocker and returns None.
    """

    def __init__(self):
        self._blocker_char = None
        self._write_stack = []

    def pop_written(self):
        """Return everything written so far and empty the record."""
        written = b"".join(self._write_stack)
        self._write_stack.clear()
        return written

    def block_on(self, char):
        """Block when a given char is encountered."""
        self._blocker_char = char

    def readable(self):
        return True

    def seekable(self):
        return True

    def seek(self, pos, whence=0):
        # naive implementation, enough for tests
        return 0

    def writable(self):
        return True

    def write(self, b):
        payload = bytes(b)
        if self._blocker_char:
            try:
                cut = payload.index(self._blocker_char)
            except ValueError:
                # Blocker absent from this payload: write it all below.
                cut = -1
            if cut > 0:
                # write data up to the first blocker
                self._write_stack.append(payload[:cut])
                return cut
            if cut == 0:
                # cancel blocker and indicate would block
                self._blocker_char = None
                return None
        self._write_stack.append(payload)
        return len(payload)
312
class CMockNonBlockWriterIO(MockNonBlockWriterIO, io.RawIOBase):
    """MockNonBlockWriterIO on top of the C implementation's RawIOBase."""

    BlockingIOError = io.BlockingIOError
315
class PyMockNonBlockWriterIO(MockNonBlockWriterIO, pyio.RawIOBase):
    """MockNonBlockWriterIO on top of the Python implementation's RawIOBase."""

    BlockingIOError = pyio.BlockingIOError
318
Guido van Rossuma9e20242007-03-08 00:43:48 +0000319
Guido van Rossum28524c72007-02-27 05:47:44 +0000320class IOTest(unittest.TestCase):
321
def setUp(self):
    """Call os_helper.unlink() on the shared TESTFN path before a test."""
    scratch_path = os_helper.TESTFN
    os_helper.unlink(scratch_path)
Neal Norwitze7789b12008-03-24 06:18:09 +0000324
def tearDown(self):
    """Call os_helper.unlink() on the shared TESTFN path after a test."""
    scratch_path = os_helper.TESTFN
    os_helper.unlink(scratch_path)
Guido van Rossum4d0f5a42007-03-07 22:59:39 +0000327
def write_ops(self, f):
    """Run a fixed write/seek/tell/truncate scenario against stream *f*.

    *f* must be a writable, seekable binary stream.  Each return value is
    checked along the way.
    """
    eq = self.assertEqual

    eq(f.write(b"blah."), 5)
    # Truncating must not move the stream position.
    f.truncate(0)
    eq(f.tell(), 5)
    f.seek(0)

    eq(f.write(b"blah."), 5)
    eq(f.seek(0), 0)
    eq(f.write(b"Hello."), 6)
    eq(f.tell(), 6)
    eq(f.seek(-1, 1), 5)
    eq(f.tell(), 5)
    scratch = bytearray(b" world\n\n\n")
    eq(f.write(scratch), 9)
    scratch[:] = b"*" * 9  # Overwrite our copy of the data
    eq(f.seek(0), 0)
    eq(f.write(b"h"), 1)
    eq(f.seek(-1, 2), 13)
    eq(f.tell(), 13)

    eq(f.truncate(12), 12)
    eq(f.tell(), 13)
    # A float offset must be rejected.
    self.assertRaises(TypeError, f.seek, 0.0)
Guido van Rossum28524c72007-02-27 05:47:44 +0000351
def read_ops(self, f, buffered=False):
    """Run a fixed read/readinto/seek scenario against stream *f*.

    *f* must be a readable, seekable binary stream positioned at the start
    of the content b"hello world\\n".  With *buffered* true, unbounded
    read() and readinto1() are exercised as well.
    """
    eq = self.assertEqual

    head = f.read(5)
    eq(head, b"hello")
    target = byteslike(head)
    eq(f.readinto(target), 5)
    eq(bytes(target), b" worl")
    target = bytearray(5)
    # Only two bytes are left; the buffer itself must keep its size.
    eq(f.readinto(target), 2)
    eq(len(target), 5)
    eq(target[:2], b"d\n")
    eq(f.seek(0), 0)
    eq(f.read(20), b"hello world\n")
    eq(f.read(1), b"")
    eq(f.readinto(byteslike(b"x")), 0)
    eq(f.seek(-6, 2), 6)
    eq(f.read(5), b"world")
    eq(f.read(0), b"")
    eq(f.readinto(byteslike()), 0)
    eq(f.seek(-6, 1), 5)
    eq(f.read(5), b" worl")
    eq(f.tell(), 10)
    self.assertRaises(TypeError, f.seek, 0.0)
    if not buffered:
        return
    f.seek(0)
    eq(f.read(), b"hello world\n")
    f.seek(6)
    eq(f.read(), b"world\n")
    eq(f.read(), b"")
    f.seek(0)
    target = byteslike(5)
    eq(f.readinto1(target), 5)
    eq(bytes(target), b"hello")
Guido van Rossum9b76da62007-04-11 01:09:03 +0000384
# Seek offset used by large_file_ops(): one past the largest signed
# 32-bit integer.
LARGE = 2**31
386
def large_file_ops(self, f):
    """Exercise seek/tell/write/truncate/read around offset self.LARGE.

    *f* must be readable and writable.  The test is skipped when seeking
    to self.LARGE raises OverflowError or ValueError.
    """
    # unittest assertions instead of bare ``assert`` statements, so the
    # preconditions are still enforced when running under ``python -O``.
    self.assertTrue(f.readable())
    self.assertTrue(f.writable())
    try:
        self.assertEqual(f.seek(self.LARGE), self.LARGE)
    except (OverflowError, ValueError):
        self.skipTest("no largefile support")
    self.assertEqual(f.tell(), self.LARGE)
    self.assertEqual(f.write(b"xxx"), 3)
    self.assertEqual(f.tell(), self.LARGE + 3)
    self.assertEqual(f.seek(-1, 1), self.LARGE + 2)
    # truncate() without argument cuts at the current position.
    self.assertEqual(f.truncate(), self.LARGE + 2)
    self.assertEqual(f.tell(), self.LARGE + 2)
    self.assertEqual(f.seek(0, 2), self.LARGE + 2)
    # Truncating to an explicit size must leave the position untouched.
    self.assertEqual(f.truncate(self.LARGE + 1), self.LARGE + 1)
    self.assertEqual(f.tell(), self.LARGE + 2)
    self.assertEqual(f.seek(0, 2), self.LARGE + 1)
    self.assertEqual(f.seek(-1, 2), self.LARGE)
    self.assertEqual(f.read(2), b"x")
406
def test_invalid_operations(self):
    # Try writing on a file opened in read mode and vice-versa.
    unsupported = self.UnsupportedOperation
    path = os_helper.TESTFN

    # Write-only streams (text, buffered binary, raw binary) must refuse
    # to read.
    for mode in ("w", "wb"):
        with self.open(path, mode) as stream:
            self.assertRaises(unsupported, stream.read)
            self.assertRaises(unsupported, stream.readline)
    with self.open(path, "wb", buffering=0) as stream:
        self.assertRaises(unsupported, stream.read)
        self.assertRaises(unsupported, stream.readline)

    # Read-only binary streams (raw, then buffered) must refuse to write.
    with self.open(path, "rb", buffering=0) as stream:
        self.assertRaises(unsupported, stream.write, b"blah")
        self.assertRaises(unsupported, stream.writelines, [b"blah\n"])
    with self.open(path, "rb") as stream:
        self.assertRaises(unsupported, stream.write, b"blah")
        self.assertRaises(unsupported, stream.writelines, [b"blah\n"])

    with self.open(path, "r") as stream:
        self.assertRaises(unsupported, stream.write, "blah")
        self.assertRaises(unsupported, stream.writelines, ["blah\n"])
        # Non-zero seeking from current or end pos
        self.assertRaises(unsupported, stream.seek, 1, self.SEEK_CUR)
        self.assertRaises(unsupported, stream.seek, -1, self.SEEK_END)
Benjamin Peterson81971ea2009-05-14 22:01:31 +0000429
def test_optional_abilities(self):
    # Test for OSError when optional APIs are not supported
    # The purpose of this test is to try fileno(), reading, writing and
    # seeking operations with various objects that indicate they do not
    # support these operations.

    # Each helper below is a zero-argument factory returning one stream
    # object to probe.

    def pipe_reader():
        [r, w] = os.pipe()
        os.close(w)  # So that read() is harmless
        return self.FileIO(r, "r")

    def pipe_writer():
        [r, w] = os.pipe()
        self.addCleanup(os.close, r)
        # Guarantee that we can write into the pipe without blocking
        thread = threading.Thread(target=os.read, args=(r, 100))
        thread.start()
        self.addCleanup(thread.join)
        return self.FileIO(w, "w")

    def buffered_reader():
        return self.BufferedReader(self.MockUnseekableIO())

    def buffered_writer():
        return self.BufferedWriter(self.MockUnseekableIO())

    def buffered_random():
        return self.BufferedRandom(self.BytesIO())

    def buffered_rw_pair():
        return self.BufferedRWPair(self.MockUnseekableIO(),
            self.MockUnseekableIO())

    def text_reader():
        # Take writable()/write() from BufferedIOBase instead of the
        # ones MockUnseekableIO would otherwise provide.
        class UnseekableReader(self.MockUnseekableIO):
            writable = self.BufferedIOBase.writable
            write = self.BufferedIOBase.write
        return self.TextIOWrapper(UnseekableReader(), "ascii")

    def text_writer():
        # Take readable()/read() from BufferedIOBase instead of the
        # ones MockUnseekableIO would otherwise provide.
        class UnseekableWriter(self.MockUnseekableIO):
            readable = self.BufferedIOBase.readable
            read = self.BufferedIOBase.read
        return self.TextIOWrapper(UnseekableWriter(), "ascii")

    # (factory, abilities) pairs.  In the abilities string, as used by the
    # loop below: "f" = fileno() works, "r" = readable, "w" = writable,
    # "s" = seekable.
    tests = (
        (pipe_reader, "fr"), (pipe_writer, "fw"),
        (buffered_reader, "r"), (buffered_writer, "w"),
        (buffered_random, "rws"), (buffered_rw_pair, "rw"),
        (text_reader, "r"), (text_writer, "w"),
        (self.BytesIO, "rws"), (self.StringIO, "rws"),
    )
    for [test, abilities] in tests:
        with self.subTest(test), test() as obj:
            readable = "r" in abilities
            self.assertEqual(obj.readable(), readable)
            writable = "w" in abilities
            self.assertEqual(obj.writable(), writable)

            # Pick str or bytes test data to match the kind of stream.
            if isinstance(obj, self.TextIOBase):
                data = "3"
            elif isinstance(obj, (self.BufferedIOBase, self.RawIOBase)):
                data = b"3"
            else:
                self.fail("Unknown base class")

            if "f" in abilities:
                obj.fileno()
            else:
                self.assertRaises(OSError, obj.fileno)

            if readable:
                obj.read(1)
                obj.read()
            else:
                self.assertRaises(OSError, obj.read, 1)
                self.assertRaises(OSError, obj.read)

            if writable:
                obj.write(data)
            else:
                self.assertRaises(OSError, obj.write, data)

            if sys.platform.startswith("win") and test in (
                    pipe_reader, pipe_writer):
                # Pipes seem to appear as seekable on Windows
                continue
            seekable = "s" in abilities
            self.assertEqual(obj.seekable(), seekable)

            if seekable:
                obj.tell()
                obj.seek(0)
            else:
                self.assertRaises(OSError, obj.tell)
                self.assertRaises(OSError, obj.seek, 0)

            # truncate() is only expected to work when the object is
            # both writable and seekable.
            if writable and seekable:
                obj.truncate()
                obj.truncate(0)
            else:
                self.assertRaises(OSError, obj.truncate)
                self.assertRaises(OSError, obj.truncate, 0)
533
Antoine Pitrou13348842012-01-29 18:36:34 +0100534 def test_open_handles_NUL_chars(self):
535 fn_with_NUL = 'foo\0bar'
Serhiy Storchakad8a14472014-09-06 20:07:17 +0300536 self.assertRaises(ValueError, self.open, fn_with_NUL, 'w')
Victor Stinner47b45572016-03-25 09:07:07 +0100537
538 bytes_fn = bytes(fn_with_NUL, 'ascii')
539 with warnings.catch_warnings():
540 warnings.simplefilter("ignore", DeprecationWarning)
541 self.assertRaises(ValueError, self.open, bytes_fn, 'w')
Antoine Pitrou13348842012-01-29 18:36:34 +0100542
def test_raw_file_io(self):
    """Run write_ops() then read_ops() on an unbuffered file."""
    path = os_helper.TESTFN
    with self.open(path, "wb", buffering=0) as writer:
        self.assertEqual(writer.readable(), False)
        self.assertEqual(writer.writable(), True)
        self.assertEqual(writer.seekable(), True)
        self.write_ops(writer)
    with self.open(path, "rb", buffering=0) as reader:
        self.assertEqual(reader.readable(), True)
        self.assertEqual(reader.writable(), False)
        self.assertEqual(reader.seekable(), True)
        self.read_ops(reader)
Guido van Rossum28524c72007-02-27 05:47:44 +0000554
def test_buffered_file_io(self):
    """Run write_ops() then read_ops(buffered=True) on a buffered file."""
    path = os_helper.TESTFN
    with self.open(path, "wb") as writer:
        self.assertEqual(writer.readable(), False)
        self.assertEqual(writer.writable(), True)
        self.assertEqual(writer.seekable(), True)
        self.write_ops(writer)
    with self.open(path, "rb") as reader:
        self.assertEqual(reader.readable(), True)
        self.assertEqual(reader.writable(), False)
        self.assertEqual(reader.seekable(), True)
        self.read_ops(reader, True)
Guido van Rossum87429772007-04-10 21:06:59 +0000566
def test_readline(self):
    """Check readline() with no, positive and None size limits."""
    path = os_helper.TESTFN
    with self.open(path, "wb") as f:
        f.write(b"abc\ndef\nxyzzy\nfoo\x00bar\nanother line")
    # (positional arguments for readline, expected result), in file order.
    steps = (
        ((), b"abc\n"),
        ((10,), b"def\n"),
        ((2,), b"xy"),
        ((4,), b"zzy\n"),
        ((), b"foo\x00bar\n"),
        ((None,), b"another line"),
    )
    with self.open(path, "rb") as f:
        for args, expected in steps:
            self.assertEqual(f.readline(*args), expected)
        self.assertRaises(TypeError, f.readline, 5.3)
    with self.open(path, "r") as f:
        self.assertRaises(TypeError, f.readline, 5.3)
Guido van Rossum48fc58a2007-06-07 23:45:37 +0000580
Serhiy Storchakabf623ae2017-04-19 20:03:52 +0300581 def test_readline_nonsizeable(self):
582 # Issue #30061
583 # Crash when readline() returns an object without __len__
584 class R(self.IOBase):
585 def readline(self):
586 return None
587 self.assertRaises((TypeError, StopIteration), next, R())
588
589 def test_next_nonsizeable(self):
590 # Issue #30061
591 # Crash when __next__() returns an object without __len__
592 class R(self.IOBase):
593 def __next__(self):
594 return None
595 self.assertRaises(TypeError, R().readlines, 1)
596
def test_raw_bytes_io(self):
    """Run write_ops() and read_ops(buffered=True) on BytesIO objects."""
    sink = self.BytesIO()
    self.write_ops(sink)
    written = sink.getvalue()
    self.assertEqual(written, b"hello world\n")
    # Read the result back through a fresh BytesIO.
    source = self.BytesIO(written)
    self.read_ops(source, True)
Guido van Rossum28524c72007-02-27 05:47:44 +0000604
def test_large_file_ops(self):
    # On Windows and Mac OSX this test consumes large resources; It takes
    # a long time to build the >2 GiB file and takes >2 GiB of disk space
    # therefore the resource must be enabled to run this test.
    costly_platform = (sys.platform.startswith('win')
                       or sys.platform == 'darwin')
    if costly_platform:
        support.requires(
            'largefile',
            'test requires %s bytes and a long time to run' % self.LARGE)
    # Unbuffered first, then with default buffering.
    with self.open(os_helper.TESTFN, "w+b", 0) as unbuffered:
        self.large_file_ops(unbuffered)
    with self.open(os_helper.TESTFN, "w+b") as buffered:
        self.large_file_ops(buffered)
Guido van Rossum87429772007-04-10 21:06:59 +0000617
def test_with_open(self):
    """Files must be closed on leaving a with block, even via an exception."""
    path = os_helper.TESTFN
    for bufsize in (0, 100):
        # Normal exit from the with block.
        stream = None
        with self.open(path, "wb", bufsize) as stream:
            stream.write(b"xxx")
        self.assertEqual(stream.closed, True)

        # Exit through an exception raised inside the with block.
        stream = None
        try:
            with self.open(path, "wb", bufsize) as stream:
                1/0
        except ZeroDivisionError:
            self.assertEqual(stream.closed, True)
        else:
            self.fail("1/0 didn't raise an exception")
632
# issue 5008
def test_append_mode_tell(self):
    """tell() on a file opened for appending must reflect existing data."""
    path = os_helper.TESTFN
    with self.open(path, "wb") as f:
        f.write(b"xxx")
    # Binary append: unbuffered first, then with default buffering.
    for extra in ({"buffering": 0}, {}):
        with self.open(path, "ab", **extra) as f:
            self.assertEqual(f.tell(), 3)
    with self.open(path, "a") as f:
        self.assertGreater(f.tell(), 0)
Antoine Pitrou08838b62009-01-21 00:55:13 +0000643
def test_destructor(self):
    """Dropping an open FileIO must run __del__, close() and flush() in
    that order, and the written data must reach the file."""
    events = []

    class MyFileIO(self.FileIO):
        def __del__(self):
            events.append(1)
            try:
                inherited = super().__del__
            except AttributeError:
                return
            inherited()

        def close(self):
            events.append(2)
            super().close()

        def flush(self):
            events.append(3)
            super().flush()

    with warnings_helper.check_warnings(('', ResourceWarning)):
        f = MyFileIO(os_helper.TESTFN, "wb")
        f.write(b"xxx")
        del f
        support.gc_collect()
        self.assertEqual(events, [1, 2, 3])
        with self.open(os_helper.TESTFN, "rb") as f:
            self.assertEqual(f.read(), b"xxx")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000669
def _check_base_destructor(self, base):
    """Check that dropping an instance of a *base* subclass runs __del__,
    close() and flush(), in that order."""
    events = []

    class MyIO(base):
        def __init__(self):
            # This exercises the availability of attributes on object
            # destruction.
            # (in the C version, close() is called by the tp_dealloc
            # function, not by __del__)
            self.on_del = 1
            self.on_close = 2
            self.on_flush = 3

        def __del__(self):
            events.append(self.on_del)
            try:
                inherited = super().__del__
            except AttributeError:
                return
            inherited()

        def close(self):
            events.append(self.on_close)
            super().close()

        def flush(self):
            events.append(self.on_flush)
            super().flush()

    instance = MyIO()
    del instance
    support.gc_collect()
    self.assertEqual(events, [1, 2, 3])
699
def test_IOBase_destructor(self):
    """Run the destructor check against IOBase."""
    base = self.IOBase
    self._check_base_destructor(base)
702
def test_RawIOBase_destructor(self):
    """Run the destructor check against RawIOBase."""
    base = self.RawIOBase
    self._check_base_destructor(base)
705
def test_BufferedIOBase_destructor(self):
    """Run the destructor check against BufferedIOBase."""
    base = self.BufferedIOBase
    self._check_base_destructor(base)
708
def test_TextIOBase_destructor(self):
    """Run the destructor check against TextIOBase."""
    base = self.TextIOBase
    self._check_base_destructor(base)
711
def test_close_flushes(self):
    """Data written before the with block closes the file must be readable."""
    path = os_helper.TESTFN
    with self.open(path, "wb") as writer:
        writer.write(b"xxx")
    with self.open(path, "rb") as reader:
        content = reader.read()
        self.assertEqual(content, b"xxx")
Guido van Rossuma9e20242007-03-08 00:43:48 +0000717
def test_array_writes(self):
    # write() must accept an array.array and report the number of *bytes*
    # written; writelines() must accept a sequence containing one.
    values = array.array('i', range(10))
    nbytes = len(values.tobytes())

    # Each stream is created right before it is checked, so at most one
    # is open at any time.
    stream_factories = (
        lambda: self.BytesIO(),
        lambda: self.FileIO(os_helper.TESTFN, "w"),
        lambda: self.BufferedWriter(self.MockRawIO()),
        lambda: self.BufferedRandom(self.MockRawIO()),
        lambda: self.BufferedRWPair(self.MockRawIO(), self.MockRawIO()),
    )
    for make_stream in stream_factories:
        stream = make_stream()
        with stream:
            self.assertEqual(stream.write(values), nbytes)
            stream.writelines((values,))
Guido van Rossumd4103952007-04-12 05:44:49 +0000730
def test_closefd(self):
    # Opening by file name with closefd=False must raise ValueError.
    with self.assertRaises(ValueError):
        self.open(os_helper.TESTFN, 'w', closefd=False)
Guido van Rossuma9e20242007-03-08 00:43:48 +0000734
def test_read_closed(self):
    # A stream opened on another file's descriptor with closefd=False
    # must refuse to read once it has been closed, in both text and
    # binary mode.
    with self.open(os_helper.TESTFN, "w") as writer:
        writer.write("egg\n")

    with self.open(os_helper.TESTFN, "r") as owner:
        borrowed = self.open(owner.fileno(), "r", closefd=False)
        self.assertEqual(borrowed.read(), "egg\n")
        borrowed.seek(0)
        borrowed.close()
        with self.assertRaises(ValueError):
            borrowed.read()

    with self.open(os_helper.TESTFN, "rb") as owner:
        borrowed = self.open(owner.fileno(), "rb", closefd=False)
        # Only the first three bytes are compared.
        self.assertEqual(borrowed.read()[:3], b"egg")
        borrowed.close()
        with self.assertRaises(ValueError):
            borrowed.readinto(bytearray(1))
Christian Heimesecc42a22008-11-05 19:30:32 +0000749
def test_no_closefd_with_filename(self):
    # can't use closefd in combination with a file name
    with self.assertRaises(ValueError):
        self.open(os_helper.TESTFN, "r", closefd=False)
Christian Heimesecc42a22008-11-05 19:30:32 +0000753
def test_closefd_attr(self):
    # The raw layer's ``closefd`` attribute must reflect how the stream
    # was opened: True when opened by name, False when opened on an
    # existing descriptor with closefd=False.
    with self.open(os_helper.TESTFN, "wb") as writer:
        writer.write(b"egg\n")
    with self.open(os_helper.TESTFN, "r") as owner:
        self.assertEqual(owner.buffer.raw.closefd, True)
        borrowed = self.open(owner.fileno(), "r", closefd=False)
        self.assertEqual(borrowed.buffer.raw.closefd, False)
761
def test_garbage_collection(self):
    # FileIO objects are collected, and collecting them flushes
    # all data to disk.
    with warnings_helper.check_warnings(('', ResourceWarning)):
        raw = self.FileIO(os_helper.TESTFN, "wb")
        raw.write(b"abcxxx")
        # Make the object reference itself so it is part of a cycle.
        raw.f = raw
        ref = weakref.ref(raw)
        del raw
        support.gc_collect()
    # The weak reference must be dead, i.e. the object was collected.
    self.assertIsNone(ref(), ref)
    with self.open(os_helper.TESTFN, "rb") as reader:
        self.assertEqual(reader.read(), b"abcxxx")
Christian Heimesecc42a22008-11-05 19:30:32 +0000775
def test_unbounded_file(self):
    # Issue #1174606: reading from an unbounded stream such as /dev/zero.
    zero = "/dev/zero"
    if not os.path.exists(zero):
        self.skipTest("{0} does not exist".format(zero))
    if sys.maxsize > 0x7FFFFFFF:
        self.skipTest("test can only run in a 32-bit address space")
    if support.real_max_memuse < support._2G:
        self.skipTest("test requires at least 2 GiB of memory")

    # An unbounded read() must raise OverflowError when the stream is
    # opened unbuffered-binary, buffered-binary and text, in that order.
    open_variants = (
        ("rb", {"buffering": 0}),
        ("rb", {}),
        ("r", {}),
    )
    for mode, extra in open_variants:
        with self.open(zero, mode, **extra) as stream:
            with self.assertRaises(OverflowError):
                stream.read()
791
def check_flush_error_on_close(self, *args, **kwargs):
    # Test that the file is closed despite failed flush
    # and that flush() is called before file closed.
    # *args / **kwargs are forwarded unchanged to self.open().
    stream = self.open(*args, **kwargs)
    seen_closed = []

    def failing_flush():
        # Record whether the stream was already closed when flush() ran.
        seen_closed[:] = [stream.closed]
        raise OSError()

    stream.flush = failing_flush
    with self.assertRaises(OSError):    # exception not swallowed
        stream.close()
    self.assertTrue(stream.closed)
    self.assertTrue(seen_closed)        # flush() called
    self.assertFalse(seen_closed[0])    # flush() called before file closed
    stream.flush = lambda: None         # break reference loop
Serhiy Storchakaa3712a92015-02-21 00:35:09 +0200806
def test_flush_error_on_close(self):
    # Issue #5700: io.FileIO calls flush() after file closed
    #
    # For each layer, run the shared check three ways: opened by name,
    # opened on a descriptor the stream owns, and opened on a descriptor
    # it does not own (closefd=False), which is closed here afterwards.
    layers = (
        ('wb', {'buffering': 0}),   # raw file
        ('wb', {}),                 # buffered io
        ('w', {}),                  # text io
    )
    open_flags = os.O_WRONLY|os.O_CREAT
    for mode, extra in layers:
        self.check_flush_error_on_close(os_helper.TESTFN, mode, **extra)

        fd = os.open(os_helper.TESTFN, open_flags)
        self.check_flush_error_on_close(fd, mode, **extra)

        fd = os.open(os_helper.TESTFN, open_flags)
        self.check_flush_error_on_close(fd, mode, closefd=False, **extra)
        os.close(fd)
Antoine Pitrou6be88762010-05-03 16:48:20 +0000830
def test_multi_close(self):
    # close() may be called repeatedly; flush() on the closed file must
    # then raise ValueError.
    raw = self.open(os_helper.TESTFN, "wb", buffering=0)
    for _ in range(3):
        raw.close()
    with self.assertRaises(ValueError):
        raw.flush()
837
def test_RawIOBase_read(self):
    # Exercise the default limited RawIOBase.read(n) implementation (which
    # calls readinto() internally).
    rawio = self.MockRawIOWithoutRead((b"abc", b"d", None, b"efg", None))
    # Expected result of each successive read(2) call, in order.
    expected_reads = (b"ab", b"c", b"d", None, b"ef", b"g", None, b"")
    for expected in expected_reads:
        self.assertEqual(rawio.read(2), expected)
850
def test_types_have_dict(self):
    # Instances of these io types must all have a __dict__.
    classes = (
        self.IOBase,
        self.RawIOBase,
        self.TextIOBase,
        self.StringIO,
        self.BytesIO,
    )
    instances = tuple([cls() for cls in classes])
    for instance in instances:
        self.assertTrue(hasattr(instance, "__dict__"))
861
def test_opener(self):
    # open() must read through the descriptor returned by a custom
    # opener; the path passed to open() here does not name the test file.
    with self.open(os_helper.TESTFN, "w") as writer:
        writer.write("egg\n")
    fd = os.open(os_helper.TESTFN, os.O_RDONLY)
    # The opener ignores its arguments and hands back the prepared fd.
    with self.open("non-existent", "r",
                   opener=lambda path, flags: fd) as reader:
        self.assertEqual(reader.read(), "egg\n")
870
def test_bad_opener_negative_1(self):
    # Issue #27066.
    # An opener returning -1 must produce ValueError('opener returned -1').
    # NOTE(review): this calls the builtin open(), not self.open like the
    # neighbouring tests -- confirm that is intended.
    with self.assertRaises(ValueError) as caught:
        open('non-existent', 'r', opener=lambda fname, flags: -1)
    self.assertEqual(str(caught.exception), 'opener returned -1')
878
def test_bad_opener_other_negative(self):
    # Issue #27066.
    # An opener returning -2 must produce ValueError('opener returned -2').
    # NOTE(review): this calls the builtin open(), not self.open like the
    # neighbouring tests -- confirm that is intended.
    with self.assertRaises(ValueError) as caught:
        open('non-existent', 'r', opener=lambda fname, flags: -2)
    self.assertEqual(str(caught.exception), 'opener returned -2')
886
def test_fileio_closefd(self):
    # Issue #4841
    # A FileIO created with closefd=False must leave the descriptors it
    # wraps open, both when re-initialised and when closed.
    with self.open(__file__, 'rb') as first:
        with self.open(__file__, 'rb') as second:
            fileio = self.FileIO(first.fileno(), closefd=False)
            # .__init__() must not close f1
            fileio.__init__(second.fileno(), closefd=False)
            first.readline()
            # .close() must not close f2
            fileio.close()
            second.readline()
898
def test_nonbuffered_textio(self):
    # Text mode with buffering=0 must raise ValueError, and the failed
    # open() must not emit a ResourceWarning.
    no_warning = warnings_helper.check_no_resource_warning(self)
    with no_warning, self.assertRaises(ValueError):
        self.open(os_helper.TESTFN, 'w', buffering=0)
Serhiy Storchakaf10063e2014-06-09 13:32:34 +0300903
def test_invalid_newline(self):
    # An invalid ``newline`` argument must raise ValueError, and the
    # failed open() must not emit a ResourceWarning.
    no_warning = warnings_helper.check_no_resource_warning(self)
    with no_warning, self.assertRaises(ValueError):
        self.open(os_helper.TESTFN, 'w', newline='invalid')
Serhiy Storchakaf10063e2014-06-09 13:32:34 +0300908
def test_buffered_readinto_mixin(self):
    # Test the implementation provided by BufferedIOBase
    class Stream(self.BufferedIOBase):
        # read() and read1() both return the same five bytes each call.
        def read(self, size):
            return b"12345"
        read1 = read

    stream = Stream()
    for method_name in ("readinto", "readinto1"):
        with self.subTest(method_name):
            target = byteslike(5)
            fill = getattr(stream, method_name)
            self.assertEqual(fill(target), 5)
            self.assertEqual(bytes(target), b"12345")
921
def test_fspath_support(self):
    # open() must accept FakePath wrappers around str and bytes paths
    # and reject (or propagate errors from) the bad wrappers below.
    def check_path_succeeds(path):
        # Round-trip a short text through *path*.
        with self.open(path, "w") as writer:
            writer.write("egg\n")

        with self.open(path, "r") as reader:
            self.assertEqual(reader.read(), "egg\n")

    for good_path in (FakePath(os_helper.TESTFN),
                      FakePath(os.fsencode(os_helper.TESTFN))):
        check_path_succeeds(good_path)

    # A FakePath wrapping a file descriptor number is rejected.
    with self.open(os_helper.TESTFN, "w") as f:
        fd_path = FakePath(f.fileno())
        with self.assertRaises(TypeError):
            self.open(fd_path, 'w')

    # A FakePath wrapping None is rejected.
    none_path = FakePath(None)
    with self.assertRaises(TypeError):
        self.open(none_path, 'w')

    # A FakePath wrapping an exception class: that exception is expected
    # to surface from open().
    raising_path = FakePath(FloatingPointError)
    with self.assertRaises(FloatingPointError):
        self.open(raising_path, 'w')

    # ensure that refcounting is correct with some error conditions
    with self.assertRaisesRegex(ValueError, 'read/write/append mode'):
        self.open(FakePath(os_helper.TESTFN), 'rwxa')
Ethan Furmand62548a2016-06-04 14:38:43 -0700949
def test_RawIOBase_readall(self):
    # Exercise the default unlimited RawIOBase.read() and readall()
    # implementations.
    # A fresh mock is built for each method.
    for method_name in ("read", "readall"):
        rawio = self.MockRawIOWithoutRead((b"abc", b"d", b"efg"))
        read_everything = getattr(rawio, method_name)
        self.assertEqual(read_everything(), b"abcdefg")
957
def test_BufferedIOBase_readinto(self):
    # Exercise the default BufferedIOBase.readinto() and readinto1()
    # implementations (which call read() or read1() internally).
    class Reader(self.BufferedIOBase):
        def __init__(self, avail):
            self.avail = avail
        def read(self, size):
            # Hand out the first *size* bytes and keep the remainder.
            head, self.avail = self.avail[:size], self.avail[size:]
            return head
        def read1(self, size):
            """Returns no more than 5 bytes at once"""
            return self.read(min(size, 5))

    cases = (
        # (test method, total data available, read buffer size, expected
        #     read size)
        ("readinto", 10, 5, 5),
        ("readinto", 10, 6, 6),  # More than read1() can return
        ("readinto", 5, 6, 5),  # Buffer larger than total available
        ("readinto", 6, 7, 6),
        ("readinto", 10, 0, 0),  # Empty buffer
        ("readinto1", 10, 5, 5),  # Result limited to single read1() call
        ("readinto1", 10, 6, 5),  # Buffer larger than read1() can return
        ("readinto1", 5, 6, 5),  # Buffer larger than total available
        ("readinto1", 6, 7, 5),
        ("readinto1", 10, 0, 0),  # Empty buffer
    )
    # Filler value: it lies outside range(avail) for every case above, so
    # untouched buffer positions can be told apart from read data.
    UNUSED_BYTE = 0x81
    for case in cases:
        with self.subTest(case):
            method_name, avail, request, result = case
            reader = Reader(bytes(range(avail)))
            buffer = bytearray((UNUSED_BYTE,) * request)
            fill = getattr(reader, method_name)
            self.assertEqual(fill(buffer), result)
            # The buffer keeps its size; only its prefix is overwritten.
            self.assertEqual(len(buffer), request)
            self.assertSequenceEqual(buffer[:result], range(result))
            untouched = (UNUSED_BYTE,) * (request - result)
            self.assertSequenceEqual(buffer[result:], untouched)
            # Exactly ``result`` bytes were consumed from the source.
            self.assertEqual(len(reader.avail), avail - result)
998
def test_close_assert(self):
    # close() on an object whose __setattr__ silently drops every
    # assignment and whose flush() fails must still raise OSError.
    class R(self.IOBase):
        def __setattr__(self, name, value):
            pass
        def flush(self):
            raise OSError()

    stream = R()
    # This would cause an assertion failure.
    with self.assertRaises(OSError):
        stream.close()

    # Silence destructor error
    R.flush = lambda self: None
1011
Hynek Schlawack2cc71562012-05-25 10:05:53 +02001012
class CIOTest(IOTest):
    # IOTest plus one additional finalization regression test.

    def test_IOBase_finalize(self):
        # Issue #12149: segmentation fault on _PyIOBase_finalize when both a
        # class which inherits IOBase and an object of this class are caught
        # in a reference cycle and close() is already in the method cache.
        class MyIO(self.IOBase):
            def close(self):
                pass

        # create an instance to populate the method cache
        MyIO()
        instance = MyIO()
        # Self-reference puts the instance in a cycle.
        instance.obj = instance
        ref = weakref.ref(instance)
        del MyIO
        del instance
        support.gc_collect()
        self.assertIsNone(ref(), ref)
Guido van Rossuma9e20242007-03-08 00:43:48 +00001032
class PyIOTest(IOTest):
    # Adds no tests of its own; everything is inherited from IOTest.
    pass
Guido van Rossum68bbcd22007-02-27 17:19:33 +00001035
Guido van Rossuma9e20242007-03-08 00:43:48 +00001036
@support.cpython_only
class APIMismatchTest(unittest.TestCase):
    # Compares the RawIOBase classes of ``io`` and ``pyio`` in both
    # directions using support.detect_api_mismatch().

    def test_RawIOBase_io_in_pyio_match(self):
        """Test that pyio RawIOBase class has all c RawIOBase methods"""
        missing = support.detect_api_mismatch(
            pyio.RawIOBase,
            io.RawIOBase,
            ignore=('__weakref__',),
        )
        self.assertEqual(
            missing, set(),
            msg='Python RawIOBase does not have all C RawIOBase methods')

    def test_RawIOBase_pyio_in_io_match(self):
        """Test that c RawIOBase class has all pyio RawIOBase methods"""
        missing = support.detect_api_mismatch(io.RawIOBase, pyio.RawIOBase)
        self.assertEqual(
            missing, set(),
            msg='C RawIOBase does not have all Python RawIOBase methods')
1050
1051
class CommonBufferedTests:
    # Tests common to BufferedReader, BufferedWriter and BufferedRandom
    #
    # The tests read ``self.tp`` (the buffered class under test) and the
    # mock raw-stream classes (``self.MockRawIO`` etc.) from the instance;
    # none of them are defined in this class.

    def test_detach(self):
        # detach() returns the raw stream; a second detach() fails.
        raw = self.MockRawIO()
        buffered = self.tp(raw)
        self.assertIs(buffered.detach(), raw)
        with self.assertRaises(ValueError):
            buffered.detach()

        repr(buffered)  # Should still work

    def test_fileno(self):
        # fileno() must yield 42 for a stream built on MockRawIO.
        raw = self.MockRawIO()
        buffered = self.tp(raw)

        self.assertEqual(42, buffered.fileno())

    def test_invalid_args(self):
        raw = self.MockRawIO()
        buffered = self.tp(raw)
        # Invalid whence
        for bad_whence in (-1, 9):
            with self.assertRaises(ValueError):
                buffered.seek(0, bad_whence)

    def test_override_destructor(self):
        # Dropping an instance of a subclass must call the overridden
        # __del__, close() and flush() in that order.
        calls = []

        class MyBufferedIO(self.tp):
            def __del__(self):
                calls.append(1)
                # Chain to the parent's __del__ only when it defines one.
                try:
                    parent_del = super().__del__
                except AttributeError:
                    return
                parent_del()

            def close(self):
                calls.append(2)
                super().close()

            def flush(self):
                calls.append(3)
                super().flush()

        raw = self.MockRawIO()
        buffered = MyBufferedIO(raw)
        del buffered
        support.gc_collect()
        self.assertEqual(calls, [1, 2, 3])

    def test_context_manager(self):
        # Test usability as a context manager
        raw = self.MockRawIO()
        buffered = self.tp(raw)

        def enter_and_exit():
            with buffered:
                pass

        enter_and_exit()
        # bufio should now be closed, and using it a second time should raise
        # a ValueError.
        with self.assertRaises(ValueError):
            enter_and_exit()

    def test_error_through_destructor(self):
        # Test that the exception state is not modified by a destructor,
        # even if close() fails.
        raw = self.CloseFailureIO()
        with support.catch_unraisable_exception() as cm:
            with self.assertRaises(AttributeError):
                self.tp(raw).xyzzy

            # Checked while the catch_unraisable_exception() block is
            # still active.
            if not IOBASE_EMITS_UNRAISABLE:
                self.assertIsNone(cm.unraisable)
            elif cm.unraisable is not None:
                self.assertEqual(cm.unraisable.exc_type, OSError)

    def test_repr(self):
        # repr() shows the class name (optionally module-qualified) and,
        # once set, the raw stream's name.
        raw = self.MockRawIO()
        buffered = self.tp(raw)
        clsname = r"(%s\.)?%s" % (self.tp.__module__, self.tp.__qualname__)
        self.assertRegex(repr(buffered), "<%s>" % clsname)
        raw.name = "dummy"
        self.assertRegex(repr(buffered), "<%s name='dummy'>" % clsname)
        raw.name = b"dummy"
        self.assertRegex(repr(buffered), "<%s name=b'dummy'>" % clsname)

    def test_recursive_repr(self):
        # Issue #25455
        raw = self.MockRawIO()
        buffered = self.tp(raw)
        # Make the raw stream's name the buffered object itself.
        with support.swap_attr(raw, 'name', buffered):
            try:
                repr(buffered)  # Should not crash
            except RuntimeError:
                # RuntimeError is acceptable here.
                pass

    def test_flush_error_on_close(self):
        # Test that buffered file is closed despite failed flush
        # and that flush() is called before file closed.
        raw = self.MockRawIO()
        seen_closed = []

        def failing_flush():
            # ``buffered`` is bound below, before this can be called.
            seen_closed[:] = [buffered.closed, raw.closed]
            raise OSError()

        raw.flush = failing_flush
        buffered = self.tp(raw)
        with self.assertRaises(OSError):    # exception not swallowed
            buffered.close()
        self.assertTrue(buffered.closed)
        self.assertTrue(raw.closed)
        self.assertTrue(seen_closed)        # flush() called
        self.assertFalse(seen_closed[0])    # flush() called before file closed
        self.assertFalse(seen_closed[1])
        raw.flush = lambda: None            # break reference loop

    def test_close_error_on_close(self):
        # When both flush() and the raw close() fail, the close error is
        # raised with the flush error as its __context__.
        raw = self.MockRawIO()

        def failing_flush():
            raise OSError('flush')

        def failing_close():
            raise OSError('close')

        raw.close = failing_close
        buffered = self.tp(raw)
        buffered.flush = failing_flush
        with self.assertRaises(OSError) as caught:  # exception not swallowed
            buffered.close()
        error = caught.exception
        self.assertEqual(error.args, ('close',))
        self.assertIsInstance(error.__context__, OSError)
        self.assertEqual(error.__context__.args, ('flush',))
        self.assertFalse(buffered.closed)

        # Silence destructor error
        raw.close = lambda: None
        buffered.flush = lambda: None

    def test_nonnormalized_close_error_on_close(self):
        # Issue #21677
        raw = self.MockRawIO()

        # Both helpers raise NameError by referring to an undefined name.
        def failing_flush():
            raise non_existing_flush

        def failing_close():
            raise non_existing_close

        raw.close = failing_close
        buffered = self.tp(raw)
        buffered.flush = failing_flush
        with self.assertRaises(NameError) as caught:  # exception not swallowed
            buffered.close()
        error = caught.exception
        self.assertIn('non_existing_close', str(error))
        self.assertIsInstance(error.__context__, NameError)
        self.assertIn('non_existing_flush', str(error.__context__))
        self.assertFalse(buffered.closed)

        # Silence destructor error
        buffered.flush = lambda: None
        raw.close = lambda: None

    def test_multi_close(self):
        # close() may be called repeatedly; flush() afterwards must raise.
        raw = self.MockRawIO()
        buffered = self.tp(raw)
        for _ in range(3):
            buffered.close()
        with self.assertRaises(ValueError):
            buffered.flush()

    def test_unseekable(self):
        # tell() and seek() on an unseekable raw stream are unsupported.
        buffered = self.tp(self.MockUnseekableIO(b"A" * 10))
        with self.assertRaises(self.UnsupportedOperation):
            buffered.tell()
        with self.assertRaises(self.UnsupportedOperation):
            buffered.seek(0)

    def test_readonly_attributes(self):
        # The ``raw`` attribute cannot be reassigned.
        raw = self.MockRawIO()
        buffered = self.tp(raw)
        replacement = self.MockRawIO()
        with self.assertRaises(AttributeError):
            buffered.raw = replacement
1223
Guido van Rossum78892e42007-04-06 17:31:18 +00001224
class SizeofTest:
    # sys.getsizeof() checks; ``self.tp`` and ``self.MockRawIO`` are read
    # from the instance and are not defined in this class.

    @support.cpython_only
    def test_sizeof(self):
        # getsizeof() must grow by exactly the buffer size: measure the
        # size net of a 4096-byte buffer, then compare with an 8192 one.
        small, large = 4096, 8192
        raw = self.MockRawIO()
        buffered = self.tp(raw, buffer_size=small)
        overhead = sys.getsizeof(buffered) - small
        raw = self.MockRawIO()
        buffered = self.tp(raw, buffer_size=large)
        self.assertEqual(sys.getsizeof(buffered), overhead + large)

    @support.cpython_only
    def test_buffer_freeing(self):
        # After close(), getsizeof() must no longer include the buffer.
        bufsize = 4096
        raw = self.MockRawIO()
        buffered = self.tp(raw, buffer_size=bufsize)
        overhead = sys.getsizeof(buffered) - bufsize
        buffered.close()
        self.assertEqual(sys.getsizeof(buffered), overhead)
Antoine Pitrou10f0c502012-07-29 19:02:46 +02001246
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001247class BufferedReaderTest(unittest.TestCase, CommonBufferedTests):
1248 read_mode = "rb"
Guido van Rossum78892e42007-04-06 17:31:18 +00001249
def test_constructor(self):
    # __init__() may be called again on an existing object with valid
    # buffer sizes; non-positive sizes raise ValueError, after which the
    # object can still be re-initialised and read from.
    rawio = self.MockRawIO([b"abc"])
    bufio = self.tp(rawio)
    bufio.__init__(rawio)
    for valid_size in (1024, 16):
        bufio.__init__(rawio, buffer_size=valid_size)
    self.assertEqual(b"abc", bufio.read())

    for invalid_size in (0, -16, -1):
        with self.assertRaises(ValueError):
            bufio.__init__(rawio, buffer_size=invalid_size)

    rawio = self.MockRawIO([b"abc"])
    bufio.__init__(rawio)
    self.assertEqual(b"abc", bufio.read())
Guido van Rossum78892e42007-04-06 17:31:18 +00001263
def test_uninitialized(self):
    # An object created with __new__() only can be deleted, raises a
    # clean error when read, and becomes usable once __init__() is run.
    cls = self.tp
    bufio = cls.__new__(cls)
    del bufio
    bufio = cls.__new__(cls)
    self.assertRaisesRegex(
        (ValueError, AttributeError),
        'uninitialized|has no attribute',
        bufio.read, 0,
    )
    bufio.__init__(self.MockRawIO())
    self.assertEqual(bufio.read(0), b'')
1273
def test_read(self):
    # read(None) and read(7) must both return all seven bytes.
    for size in (None, 7):
        rawio = self.MockRawIO((b"abc", b"d", b"efg"))
        bufio = self.tp(rawio)
        self.assertEqual(b"abcdefg", bufio.read(size))
    # Invalid args
    # (checked on the reader left over from the last loop iteration)
    with self.assertRaises(ValueError):
        bufio.read(-2)
Guido van Rossum68bbcd22007-02-27 17:19:33 +00001281
def test_read1(self):
    # read1() must return buffered data without a further raw read, and
    # perform one raw read per call once the buffer is empty.
    # ``raw._reads`` is the mock's read counter.
    raw = self.MockRawIO((b"abc", b"d", b"efg"))
    reader = self.tp(raw)
    self.assertEqual(b"a", reader.read(1))
    self.assertEqual(b"b", reader.read1(1))
    self.assertEqual(raw._reads, 1)
    self.assertEqual(b"", reader.read1(0))
    self.assertEqual(b"c", reader.read1(100))
    self.assertEqual(raw._reads, 1)
    # From here on each call is expected to bump the counter by one.
    for expected_data, expected_reads in ((b"d", 2), (b"efg", 3), (b"", 4)):
        self.assertEqual(expected_data, reader.read1(100))
        self.assertEqual(raw._reads, expected_reads)
Martin Panterccb2c0e2016-10-20 23:48:14 +00001297
def test_read1_arbitrary(self):
    # read1() with no argument or with -1 returns whatever one step
    # yields: the rest of the buffer, or a single raw chunk.
    raw = self.MockRawIO((b"abc", b"d", b"efg"))
    reader = self.tp(raw)
    self.assertEqual(b"a", reader.read(1))
    self.assertEqual(b"bc", reader.read1())
    self.assertEqual(b"d", reader.read1())
    self.assertEqual(b"efg", reader.read1(-1))
    self.assertEqual(raw._reads, 3)
    self.assertEqual(b"", reader.read1())
    self.assertEqual(raw._reads, 4)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001308
def test_readinto(self):
    # readinto() fills a 2-byte buffer across raw chunk boundaries; a
    # short read overwrites only the bytes actually read.
    target = bytearray(2)

    raw = self.MockRawIO((b"abc", b"d", b"efg"))
    reader = self.tp(raw)
    # (expected byte count, expected buffer contents) per call, in order.
    steps = ((2, b"ab"), (2, b"cd"), (2, b"ef"), (1, b"gf"), (0, b"gf"))
    for count, contents in steps:
        self.assertEqual(reader.readinto(target), count)
        self.assertEqual(target, contents)

    # Same buffer, new source whose second chunk is None.
    raw = self.MockRawIO((b"abc", None))
    reader = self.tp(raw)
    for count, contents in ((2, b"ab"), (1, b"cb")):
        self.assertEqual(reader.readinto(target), count)
        self.assertEqual(target, contents)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001329
def test_readinto1(self):
    # readinto1() must be served from the buffer where possible;
    # ``raw._reads`` (the mock's read counter) is checked after each step.
    buffer_size = 10
    raw = self.MockRawIO((b"abc", b"de", b"fgh", b"jkl"))
    reader = self.tp(raw, buffer_size=buffer_size)

    # Small destination buffer.
    target = bytearray(2)
    self.assertEqual(reader.peek(3), b'abc')
    self.assertEqual(raw._reads, 1)
    self.assertEqual(reader.readinto1(target), 2)
    self.assertEqual(target, b"ab")
    self.assertEqual(raw._reads, 1)
    self.assertEqual(reader.readinto1(target), 1)
    self.assertEqual(target[:1], b"c")
    self.assertEqual(raw._reads, 1)
    self.assertEqual(reader.readinto1(target), 2)
    self.assertEqual(target, b"de")
    self.assertEqual(raw._reads, 2)

    # Destination twice as large as the internal buffer.
    target = bytearray(2*buffer_size)
    self.assertEqual(reader.peek(3), b'fgh')
    self.assertEqual(raw._reads, 3)
    self.assertEqual(reader.readinto1(target), 6)
    self.assertEqual(target[:6], b"fghjkl")
    self.assertEqual(raw._reads, 4)
1352
def test_readinto_array(self):
    # readinto() on an array with multi-byte elements must count and fill
    # *bytes*, not elements, and must leave the rest of the array alone.
    buffer_size = 60
    data = b"a" * 26
    rawio = self.MockRawIO((data,))
    bufio = self.tp(rawio, buffer_size=buffer_size)

    # Create an array with element size > 1 byte
    b = array.array('i', b'x' * 32)
    # Precondition for the byte-vs-element comparison below.  Checked
    # with a unittest assertion (not a bare ``assert``) so that it is
    # still enforced when Python runs with -O, and expressed directly in
    # terms of the element size the comment above requires.
    self.assertGreater(b.itemsize, 1)

    # Read into it. We should get as many *bytes* as we can fit into b
    # (which is more than the number of elements)
    n = bufio.readinto(b)
    self.assertGreater(n, len(b))

    # Check that old contents of b are preserved
    bm = memoryview(b).cast('B')
    self.assertLess(n, len(bm))
    self.assertEqual(bm[:n], data[:n])
    self.assertEqual(bm[n:], b'x' * (len(bm[n:])))
1373
def test_readinto1_array(self):
    # readinto1() on an array with multi-byte elements must count and
    # fill *bytes*, not elements, and must leave the rest of the array
    # alone.
    buffer_size = 60
    data = b"a" * 26
    rawio = self.MockRawIO((data,))
    bufio = self.tp(rawio, buffer_size=buffer_size)

    # Create an array with element size > 1 byte
    b = array.array('i', b'x' * 32)
    # Precondition for the byte-vs-element comparison below.  Checked
    # with a unittest assertion (not a bare ``assert``) so that it is
    # still enforced when Python runs with -O, and expressed directly in
    # terms of the element size the comment above requires.
    self.assertGreater(b.itemsize, 1)

    # Read into it. We should get as many *bytes* as we can fit into b
    # (which is more than the number of elements)
    n = bufio.readinto1(b)
    self.assertGreater(n, len(b))

    # Check that old contents of b are preserved
    bm = memoryview(b).cast('B')
    self.assertLess(n, len(bm))
    self.assertEqual(bm[:n], data[:n])
    self.assertEqual(bm[n:], b'x' * (len(bm[n:])))
1394
Benjamin Petersonbf5ff762009-12-13 19:25:34 +00001395 def test_readlines(self):
1396 def bufio():
1397 rawio = self.MockRawIO((b"abc\n", b"d\n", b"ef"))
1398 return self.tp(rawio)
Ezio Melottib3aedd42010-11-20 19:04:17 +00001399 self.assertEqual(bufio().readlines(), [b"abc\n", b"d\n", b"ef"])
1400 self.assertEqual(bufio().readlines(5), [b"abc\n", b"d\n"])
1401 self.assertEqual(bufio().readlines(None), [b"abc\n", b"d\n", b"ef"])
Benjamin Petersonbf5ff762009-12-13 19:25:34 +00001402
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001403 def test_buffering(self):
Guido van Rossum78892e42007-04-06 17:31:18 +00001404 data = b"abcdefghi"
1405 dlen = len(data)
1406
1407 tests = [
1408 [ 100, [ 3, 1, 4, 8 ], [ dlen, 0 ] ],
1409 [ 100, [ 3, 3, 3], [ dlen ] ],
1410 [ 4, [ 1, 2, 4, 2 ], [ 4, 4, 1 ] ],
1411 ]
1412
1413 for bufsize, buf_read_sizes, raw_read_sizes in tests:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001414 rawio = self.MockFileIO(data)
1415 bufio = self.tp(rawio, buffer_size=bufsize)
Guido van Rossum78892e42007-04-06 17:31:18 +00001416 pos = 0
1417 for nbytes in buf_read_sizes:
Ezio Melottib3aedd42010-11-20 19:04:17 +00001418 self.assertEqual(bufio.read(nbytes), data[pos:pos+nbytes])
Guido van Rossum78892e42007-04-06 17:31:18 +00001419 pos += nbytes
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001420 # this is mildly implementation-dependent
Ezio Melottib3aedd42010-11-20 19:04:17 +00001421 self.assertEqual(rawio.read_history, raw_read_sizes)
Guido van Rossum78892e42007-04-06 17:31:18 +00001422
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001423 def test_read_non_blocking(self):
Guido van Rossum01a27522007-03-07 01:00:12 +00001424 # Inject some None's in there to simulate EWOULDBLOCK
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001425 rawio = self.MockRawIO((b"abc", b"d", None, b"efg", None, None, None))
1426 bufio = self.tp(rawio)
Ezio Melottib3aedd42010-11-20 19:04:17 +00001427 self.assertEqual(b"abcd", bufio.read(6))
1428 self.assertEqual(b"e", bufio.read(1))
1429 self.assertEqual(b"fg", bufio.read())
1430 self.assertEqual(b"", bufio.peek(1))
Victor Stinnera80987f2011-05-25 22:47:16 +02001431 self.assertIsNone(bufio.read())
Ezio Melottib3aedd42010-11-20 19:04:17 +00001432 self.assertEqual(b"", bufio.read())
Guido van Rossum01a27522007-03-07 01:00:12 +00001433
Victor Stinnera80987f2011-05-25 22:47:16 +02001434 rawio = self.MockRawIO((b"a", None, None))
1435 self.assertEqual(b"a", rawio.readall())
1436 self.assertIsNone(rawio.readall())
1437
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001438 def test_read_past_eof(self):
1439 rawio = self.MockRawIO((b"abc", b"d", b"efg"))
1440 bufio = self.tp(rawio)
Guido van Rossum68bbcd22007-02-27 17:19:33 +00001441
Ezio Melottib3aedd42010-11-20 19:04:17 +00001442 self.assertEqual(b"abcdefg", bufio.read(9000))
Guido van Rossum68bbcd22007-02-27 17:19:33 +00001443
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001444 def test_read_all(self):
1445 rawio = self.MockRawIO((b"abc", b"d", b"efg"))
1446 bufio = self.tp(rawio)
Guido van Rossum68bbcd22007-02-27 17:19:33 +00001447
Ezio Melottib3aedd42010-11-20 19:04:17 +00001448 self.assertEqual(b"abcdefg", bufio.read())
Guido van Rossum68bbcd22007-02-27 17:19:33 +00001449
Antoine Pitrou5bc4fa72010-10-14 15:34:31 +00001450 @support.requires_resource('cpu')
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001451 def test_threads(self):
Antoine Pitrou87695762008-08-14 22:44:29 +00001452 try:
1453 # Write out many bytes with exactly the same number of 0's,
1454 # 1's... 255's. This will help us check that concurrent reading
1455 # doesn't duplicate or forget contents.
1456 N = 1000
1457 l = list(range(256)) * N
1458 random.shuffle(l)
1459 s = bytes(bytearray(l))
Hai Shi883bc632020-07-06 17:12:49 +08001460 with self.open(os_helper.TESTFN, "wb") as f:
Antoine Pitrou87695762008-08-14 22:44:29 +00001461 f.write(s)
Hai Shi883bc632020-07-06 17:12:49 +08001462 with self.open(os_helper.TESTFN, self.read_mode, buffering=0) as raw:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001463 bufio = self.tp(raw, 8)
Antoine Pitrou87695762008-08-14 22:44:29 +00001464 errors = []
1465 results = []
1466 def f():
1467 try:
1468 # Intra-buffer read then buffer-flushing read
1469 for n in cycle([1, 19]):
1470 s = bufio.read(n)
1471 if not s:
1472 break
1473 # list.append() is atomic
1474 results.append(s)
1475 except Exception as e:
1476 errors.append(e)
1477 raise
1478 threads = [threading.Thread(target=f) for x in range(20)]
Hai Shie80697d2020-05-28 06:10:27 +08001479 with threading_helper.start_threads(threads):
Serhiy Storchaka263dcd22015-04-01 13:01:14 +03001480 time.sleep(0.02) # yield
Antoine Pitrou87695762008-08-14 22:44:29 +00001481 self.assertFalse(errors,
1482 "the following exceptions were caught: %r" % errors)
1483 s = b''.join(results)
1484 for i in range(256):
1485 c = bytes(bytearray([i]))
1486 self.assertEqual(s.count(c), N)
1487 finally:
Hai Shi883bc632020-07-06 17:12:49 +08001488 os_helper.unlink(os_helper.TESTFN)
Antoine Pitrou87695762008-08-14 22:44:29 +00001489
Antoine Pitrou1e44fec2011-10-04 12:26:20 +02001490 def test_unseekable(self):
1491 bufio = self.tp(self.MockUnseekableIO(b"A" * 10))
1492 self.assertRaises(self.UnsupportedOperation, bufio.tell)
1493 self.assertRaises(self.UnsupportedOperation, bufio.seek, 0)
1494 bufio.read(1)
1495 self.assertRaises(self.UnsupportedOperation, bufio.seek, 0)
1496 self.assertRaises(self.UnsupportedOperation, bufio.tell)
1497
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001498 def test_misbehaved_io(self):
1499 rawio = self.MisbehavedRawIO((b"abc", b"d", b"efg"))
1500 bufio = self.tp(rawio)
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02001501 self.assertRaises(OSError, bufio.seek, 0)
1502 self.assertRaises(OSError, bufio.tell)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001503
Victor Stinnerb589cef2019-06-11 03:10:59 +02001504 # Silence destructor error
1505 bufio.close = lambda: None
1506
Antoine Pitrou32cfede2010-08-11 13:31:33 +00001507 def test_no_extraneous_read(self):
1508 # Issue #9550; when the raw IO object has satisfied the read request,
1509 # we should not issue any additional reads, otherwise it may block
1510 # (e.g. socket).
1511 bufsize = 16
1512 for n in (2, bufsize - 1, bufsize, bufsize + 1, bufsize * 2):
1513 rawio = self.MockRawIO([b"x" * n])
1514 bufio = self.tp(rawio, bufsize)
1515 self.assertEqual(bufio.read(n), b"x" * n)
1516 # Simple case: one raw read is enough to satisfy the request.
1517 self.assertEqual(rawio._extraneous_reads, 0,
1518 "failed for {}: {} != 0".format(n, rawio._extraneous_reads))
1519 # A more complex case where two raw reads are needed to satisfy
1520 # the request.
1521 rawio = self.MockRawIO([b"x" * (n - 1), b"x"])
1522 bufio = self.tp(rawio, bufsize)
1523 self.assertEqual(bufio.read(n), b"x" * n)
1524 self.assertEqual(rawio._extraneous_reads, 0,
1525 "failed for {}: {} != 0".format(n, rawio._extraneous_reads))
1526
Berker Peksagd10d6ae2015-05-12 17:01:05 +03001527 def test_read_on_closed(self):
1528 # Issue #23796
1529 b = io.BufferedReader(io.BytesIO(b"12"))
1530 b.read(1)
1531 b.close()
1532 self.assertRaises(ValueError, b.peek)
1533 self.assertRaises(ValueError, b.read1, 1)
1534
Berker Peksagfd5116c2020-02-21 20:57:26 +03001535 def test_truncate_on_read_only(self):
1536 rawio = self.MockFileIO(b"abc")
1537 bufio = self.tp(rawio)
1538 self.assertFalse(bufio.writable())
1539 self.assertRaises(self.UnsupportedOperation, bufio.truncate)
1540 self.assertRaises(self.UnsupportedOperation, bufio.truncate, 0)
1541
Antoine Pitrou32cfede2010-08-11 13:31:33 +00001542
class CBufferedReaderTest(BufferedReaderTest, SizeofTest):
    # Runs the inherited tests against io.BufferedReader and adds further
    # tests that are defined only on this class.
    tp = io.BufferedReader

    @unittest.skipIf(MEMORY_SANITIZER, "MSan defaults to crashing "
                     "instead of returning NULL for malloc failure.")
    def test_constructor(self):
        BufferedReaderTest.test_constructor(self)
        # The allocation can succeed on 32-bit builds, e.g. with more
        # than 2 GiB RAM and a 64-bit kernel.
        if sys.maxsize <= 0x7FFFFFFF:
            return
        raw = self.MockRawIO()
        reader = self.tp(raw)
        self.assertRaises((OverflowError, MemoryError, ValueError),
            reader.__init__, raw, sys.maxsize)

    def test_initialization(self):
        # After every rejected __init__ the object must refuse to read.
        raw = self.MockRawIO([b"abc"])
        reader = self.tp(raw)
        for bad_size in (0, -16, -1):
            self.assertRaises(ValueError, reader.__init__, raw,
                              buffer_size=bad_size)
            self.assertRaises(ValueError, reader.read)

    def test_misbehaved_io_read(self):
        raw = self.MisbehavedRawIO((b"abc", b"d", b"efg"))
        reader = self.tp(raw)
        # _pyio.BufferedReader seems to implement reading different, so that
        # checking this is not so easy.
        self.assertRaises(OSError, reader.read, 10)

    def test_garbage_collection(self):
        # C BufferedReader objects are collected.
        # The Python version has __del__, so it ends into gc.garbage instead
        self.addCleanup(os_helper.unlink, os_helper.TESTFN)
        with warnings_helper.check_warnings(('', ResourceWarning)):
            rawio = self.FileIO(os_helper.TESTFN, "w+b")
            reader = self.tp(rawio)
            # Self-reference: only the cycle collector can reclaim it.
            reader.f = reader
            wr = weakref.ref(reader)
            del reader
            support.gc_collect()
        self.assertIsNone(wr(), wr)

    def test_args_error(self):
        # Issue #17275
        with self.assertRaisesRegex(TypeError, "BufferedReader"):
            self.tp(io.BytesIO(), 1024, 1024, 1024)

    def test_bad_readinto_value(self):
        # A raw readinto() returning -1 must produce an OSError that has no
        # __cause__ attached.
        inner = io.BufferedReader(io.BytesIO(b"12"))
        inner.readinto = lambda buf: -1
        reader = self.tp(inner)
        with self.assertRaises(OSError) as cm:
            reader.readline()
        self.assertIsNone(cm.exception.__cause__)

    def test_bad_readinto_type(self):
        # A raw readinto() returning a non-integer must produce an OSError
        # whose __cause__ is a TypeError.
        inner = io.BufferedReader(io.BytesIO(b"12"))
        inner.readinto = lambda buf: b''
        reader = self.tp(inner)
        with self.assertRaises(OSError) as cm:
            reader.readline()
        self.assertIsInstance(cm.exception.__cause__, TypeError)
1608
R David Murray67bfe802013-02-23 21:51:05 -05001609
class PyBufferedReaderTest(BufferedReaderTest):
    # Runs the inherited BufferedReaderTest tests against pyio.BufferedReader.
    tp = pyio.BufferedReader
Antoine Pitrou87695762008-08-14 22:44:29 +00001612
Guido van Rossuma9e20242007-03-08 00:43:48 +00001613
class BufferedWriterTest(unittest.TestCase, CommonBufferedTests):
    # Tests for buffered writers.  The class under test is read from the
    # ``tp`` attribute, which this class itself does not define.
    write_mode = "wb"

    def test_constructor(self):
        # __init__ may be re-run with valid sizes, rejects non-positive
        # sizes, and can be re-run successfully after a rejection.
        raw = self.MockRawIO()
        writer = self.tp(raw)
        writer.__init__(raw)
        writer.__init__(raw, buffer_size=1024)
        writer.__init__(raw, buffer_size=16)
        self.assertEqual(3, writer.write(b"abc"))
        writer.flush()
        for bad_size in (0, -16, -1):
            self.assertRaises(ValueError, writer.__init__, raw,
                              buffer_size=bad_size)
        writer.__init__(raw)
        self.assertEqual(3, writer.write(b"ghi"))
        writer.flush()
        self.assertEqual(b"".join(raw._write_stack), b"abcghi")

    def test_uninitialized(self):
        # An object made with __new__ alone can be dropped, rejects write()
        # until __init__ has run, and works afterwards.
        blank = self.tp.__new__(self.tp)
        del blank
        blank = self.tp.__new__(self.tp)
        self.assertRaisesRegex((ValueError, AttributeError),
                               'uninitialized|has no attribute',
                               blank.write, b'')
        blank.__init__(self.MockRawIO())
        self.assertEqual(blank.write(b''), 0)

    def test_detach_flush(self):
        # detach() must push pending data to the raw stream.
        raw = self.MockRawIO()
        writer = self.tp(raw)
        writer.write(b"howdy!")
        self.assertFalse(raw._write_stack)
        writer.detach()
        self.assertEqual(raw._write_stack, [b"howdy!"])

    def test_write(self):
        # Write to the buffered IO but don't overflow the buffer.
        raw = self.MockRawIO()
        writer = self.tp(raw, 8)
        writer.write(b"abc")
        self.assertFalse(raw._write_stack)
        scratch = bytearray(b"def")
        writer.write(scratch)
        scratch[:] = b"***" # Overwrite our copy of the data
        writer.flush()
        self.assertEqual(b"".join(raw._write_stack), b"abcdef")

    def test_write_overflow(self):
        raw = self.MockRawIO()
        writer = self.tp(raw, 8)
        contents = b"abcdefghijklmnop"
        for start in range(0, len(contents), 3):
            writer.write(contents[start:start + 3])
        flushed = b"".join(raw._write_stack)
        # At least (total - 8) bytes were implicitly flushed, perhaps more
        # depending on the implementation.
        self.assertTrue(flushed.startswith(contents[:-8]), flushed)

    def check_writes(self, intermediate_func):
        # Lots of writes, test the flushed output is as expected.
        # intermediate_func(bufio) is invoked after every single write.
        contents = bytes(range(256)) * 1000
        raw = self.MockRawIO()
        bufio = self.tp(raw, 13)
        # Generator of write sizes: repeat each N 15 times then proceed to N+1
        sizes = (size for size in count(1) for _ in range(15))
        offset = 0
        while offset < len(contents):
            size = min(next(sizes), len(contents) - offset)
            written = bufio.write(contents[offset:offset + size])
            self.assertEqual(written, size)
            intermediate_func(bufio)
            offset += size
        bufio.flush()
        self.assertEqual(contents, b"".join(raw._write_stack))

    def test_writes(self):
        self.check_writes(lambda bufio: None)

    def test_writes_and_flushes(self):
        self.check_writes(lambda bufio: bufio.flush())

    def test_writes_and_seeks(self):
        # Interleave writes with seeks that end up back at the same position,
        # first using absolute and then relative offsets.
        def seek_absolute(bufio):
            here = bufio.tell()
            bufio.seek(here + 1, 0)
            bufio.seek(here - 1, 0)
            bufio.seek(here, 0)
        self.check_writes(seek_absolute)

        def seek_relative(bufio):
            here = bufio.seek(0, 1)
            bufio.seek(+1, 1)
            bufio.seek(-1, 1)
            bufio.seek(here, 0)
        self.check_writes(seek_relative)

    def test_writes_and_truncates(self):
        self.check_writes(lambda bufio: bufio.truncate(bufio.tell()))

    def test_write_non_blocking(self):
        raw = self.MockNonBlockWriterIO()
        writer = self.tp(raw, 8)

        self.assertEqual(writer.write(b"abcd"), 4)
        self.assertEqual(writer.write(b"efghi"), 5)
        # 1 byte will be written, the rest will be buffered
        raw.block_on(b"k")
        self.assertEqual(writer.write(b"jklmn"), 5)

        # 8 bytes will be written, 8 will be buffered and the rest will be lost
        raw.block_on(b"0")
        try:
            writer.write(b"opqrwxyz0123456789")
        except self.BlockingIOError as exc:
            written = exc.characters_written
        else:
            self.fail("BlockingIOError should have been raised")
        self.assertEqual(written, 16)
        self.assertEqual(raw.pop_written(),
            b"abcdefghijklmnopqrwxyz")

        self.assertEqual(writer.write(b"ABCDEFGHI"), 9)
        flushed = raw.pop_written()
        # Previously buffered bytes were flushed
        self.assertTrue(flushed.startswith(b"01234567A"), flushed)

    def test_write_and_rewind(self):
        # Overwrite the start of already written data after seeking back,
        # then continue writing at the old end position.
        raw = io.BytesIO()
        writer = self.tp(raw, 4)
        self.assertEqual(writer.write(b"abcdef"), 6)
        self.assertEqual(writer.tell(), 6)
        writer.seek(0, 0)
        self.assertEqual(writer.write(b"XY"), 2)
        writer.seek(6, 0)
        self.assertEqual(raw.getvalue(), b"XYcdef")
        self.assertEqual(writer.write(b"123456"), 6)
        writer.flush()
        self.assertEqual(raw.getvalue(), b"XYcdef123456")

    def test_flush(self):
        raw = self.MockRawIO()
        writer = self.tp(raw, 8)
        writer.write(b"abc")
        writer.flush()
        self.assertEqual(b"abc", raw._write_stack[0])

    def test_writelines(self):
        lines = [b'ab', b'cd', b'ef']
        raw = self.MockRawIO()
        writer = self.tp(raw, 8)
        writer.writelines(lines)
        writer.flush()
        self.assertEqual(b''.join(raw._write_stack), b'abcdef')

    def test_writelines_userlist(self):
        # Same as test_writelines, with a UserList instead of a list.
        lines = UserList([b'ab', b'cd', b'ef'])
        raw = self.MockRawIO()
        writer = self.tp(raw, 8)
        writer.writelines(lines)
        writer.flush()
        self.assertEqual(b''.join(raw._write_stack), b'abcdef')

    def test_writelines_error(self):
        # Non-bytes items, None and a str are all rejected with TypeError.
        writer = self.tp(self.MockRawIO(), 8)
        for bad_lines in ([1, 2, 3], None, 'abc'):
            self.assertRaises(TypeError, writer.writelines, bad_lines)

    def test_destructor(self):
        # Dropping the last reference must flush pending data.
        raw = self.MockRawIO()
        writer = self.tp(raw, 8)
        writer.write(b"abc")
        del writer
        support.gc_collect()
        self.assertEqual(b"abc", raw._write_stack[0])

    def test_truncate(self):
        # Truncate implicitly flushes the buffer.
        self.addCleanup(os_helper.unlink, os_helper.TESTFN)
        with self.open(os_helper.TESTFN, self.write_mode, buffering=0) as raw:
            writer = self.tp(raw, 8)
            writer.write(b"abcdef")
            self.assertEqual(writer.truncate(3), 3)
            self.assertEqual(writer.tell(), 6)
        with self.open(os_helper.TESTFN, "rb", buffering=0) as check:
            self.assertEqual(check.read(), b"abc")

    def test_truncate_after_write(self):
        # Ensure that truncate preserves the file position after
        # writes longer than the buffer size.
        # Issue: https://bugs.python.org/issue32228
        self.addCleanup(os_helper.unlink, os_helper.TESTFN)
        with self.open(os_helper.TESTFN, "wb") as f:
            # Fill with some buffer
            f.write(b'\x00' * 10000)
        for buffer_size in (8192, 4096, 200):
            with self.open(os_helper.TESTFN, "r+b", buffering=buffer_size) as f:
                f.write(b'\x00' * (buffer_size + 1))
                # After write write_pos and write_end are set to 0
                f.read(1)
                # read operation makes sure that pos != raw_pos
                f.truncate()
                self.assertEqual(f.tell(), buffer_size + 2)

    @support.requires_resource('cpu')
    def test_threads(self):
        try:
            # Write out many bytes from many threads and test they were
            # all flushed.
            N = 1000
            contents = bytes(range(256)) * N
            sizes = cycle([1, 19])
            queue = deque()
            offset = 0
            while offset < len(contents):
                size = next(sizes)
                queue.append(contents[offset:offset + size])
                offset += size
            del contents
            # We use a real file object because it allows us to
            # exercise situations where the GIL is released before
            # writing the buffer to the raw streams. This is in addition
            # to concurrency issues due to switching threads in the middle
            # of Python code.
            with self.open(os_helper.TESTFN, self.write_mode, buffering=0) as raw:
                bufio = self.tp(raw, 8)
                errors = []

                def worker():
                    try:
                        while True:
                            try:
                                piece = queue.popleft()
                            except IndexError:
                                # Queue drained: this thread is done.
                                return
                            bufio.write(piece)
                    except Exception as exc:
                        # Record for the main thread, then let it propagate.
                        errors.append(exc)
                        raise

                threads = [threading.Thread(target=worker) for _ in range(20)]
                with threading_helper.start_threads(threads):
                    time.sleep(0.02) # yield
                self.assertFalse(errors,
                    "the following exceptions were caught: %r" % errors)
                bufio.close()
            with self.open(os_helper.TESTFN, "rb") as f:
                written = f.read()
            for value in range(256):
                self.assertEqual(written.count(bytes([value])), N)
        finally:
            os_helper.unlink(os_helper.TESTFN)

    def test_misbehaved_io(self):
        # seek(), tell() and an overflowing write() on top of a misbehaving
        # raw object must surface as OSError.
        raw = self.MisbehavedRawIO()
        writer = self.tp(raw, 5)
        self.assertRaises(OSError, writer.seek, 0)
        self.assertRaises(OSError, writer.tell)
        self.assertRaises(OSError, writer.write, b"abcdef")

        # Silence destructor error
        writer.close = lambda: None

    def test_max_buffer_size_removal(self):
        # A third positional argument is rejected with TypeError.
        with self.assertRaises(TypeError):
            self.tp(self.MockRawIO(), 8, 12)

    def test_write_error_on_close(self):
        raw = self.MockRawIO()

        def bad_write(b):
            raise OSError()

        raw.write = bad_write
        writer = self.tp(raw)
        writer.write(b'spam')
        self.assertRaises(OSError, writer.close) # exception not swallowed
        self.assertTrue(writer.closed)

    def test_slow_close_from_thread(self):
        # Issue #31976
        raw = self.SlowFlushRawIO()
        writer = self.tp(raw, 8)
        closer = threading.Thread(target=writer.close)
        closer.start()
        # Wait on the mock's in_flush event before writing from this thread.
        raw.in_flush.wait()
        self.assertRaises(ValueError, writer.write, b'spam')
        self.assertTrue(writer.closed)
        closer.join()
1905
1906
Benjamin Peterson59406a92009-03-26 17:10:29 +00001907
class CBufferedWriterTest(BufferedWriterTest, SizeofTest):
    # Runs the inherited tests against io.BufferedWriter and adds further
    # tests that are defined only on this class.
    tp = io.BufferedWriter

    @unittest.skipIf(MEMORY_SANITIZER, "MSan defaults to crashing "
                     "instead of returning NULL for malloc failure.")
    def test_constructor(self):
        BufferedWriterTest.test_constructor(self)
        # The allocation can succeed on 32-bit builds, e.g. with more
        # than 2 GiB RAM and a 64-bit kernel.
        if sys.maxsize <= 0x7FFFFFFF:
            return
        raw = self.MockRawIO()
        writer = self.tp(raw)
        self.assertRaises((OverflowError, MemoryError, ValueError),
            writer.__init__, raw, sys.maxsize)

    def test_initialization(self):
        # After every rejected __init__ the object must refuse to write.
        raw = self.MockRawIO()
        writer = self.tp(raw)
        for bad_size in (0, -16, -1):
            self.assertRaises(ValueError, writer.__init__, raw,
                              buffer_size=bad_size)
            self.assertRaises(ValueError, writer.write, b"def")

    def test_garbage_collection(self):
        # C BufferedWriter objects are collected, and collecting them flushes
        # all data to disk.
        # The Python version has __del__, so it ends into gc.garbage instead
        self.addCleanup(os_helper.unlink, os_helper.TESTFN)
        with warnings_helper.check_warnings(('', ResourceWarning)):
            rawio = self.FileIO(os_helper.TESTFN, "w+b")
            writer = self.tp(rawio)
            writer.write(b"123xxx")
            # Self-reference: only the cycle collector can reclaim it.
            writer.x = writer
            wr = weakref.ref(writer)
            del writer
            support.gc_collect()
        self.assertIsNone(wr(), wr)
        with self.open(os_helper.TESTFN, "rb") as f:
            self.assertEqual(f.read(), b"123xxx")

    def test_args_error(self):
        # Issue #17275
        with self.assertRaisesRegex(TypeError, "BufferedWriter"):
            self.tp(io.BytesIO(), 1024, 1024, 1024)
1954
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001955
class PyBufferedWriterTest(BufferedWriterTest):
    # Runs the inherited BufferedWriterTest tests against pyio.BufferedWriter.
    tp = pyio.BufferedWriter
Guido van Rossuma9e20242007-03-08 00:43:48 +00001958
Guido van Rossum01a27522007-03-07 01:00:12 +00001959class BufferedRWPairTest(unittest.TestCase):
Guido van Rossuma9e20242007-03-08 00:43:48 +00001960
Antoine Pitroucf4c7492009-04-19 00:09:36 +00001961 def test_constructor(self):
1962 pair = self.tp(self.MockRawIO(), self.MockRawIO())
Benjamin Peterson92035012008-12-27 16:00:54 +00001963 self.assertFalse(pair.closed)
Guido van Rossum01a27522007-03-07 01:00:12 +00001964
Serhiy Storchaka61e24932014-02-12 10:52:35 +02001965 def test_uninitialized(self):
1966 pair = self.tp.__new__(self.tp)
1967 del pair
1968 pair = self.tp.__new__(self.tp)
1969 self.assertRaisesRegex((ValueError, AttributeError),
1970 'uninitialized|has no attribute',
1971 pair.read, 0)
1972 self.assertRaisesRegex((ValueError, AttributeError),
1973 'uninitialized|has no attribute',
1974 pair.write, b'')
1975 pair.__init__(self.MockRawIO(), self.MockRawIO())
1976 self.assertEqual(pair.read(0), b'')
1977 self.assertEqual(pair.write(b''), 0)
1978
Benjamin Petersond2e0c792009-05-01 20:40:59 +00001979 def test_detach(self):
1980 pair = self.tp(self.MockRawIO(), self.MockRawIO())
1981 self.assertRaises(self.UnsupportedOperation, pair.detach)
1982
Florent Xicluna109d5732012-07-07 17:03:22 +02001983 def test_constructor_max_buffer_size_removal(self):
1984 with self.assertRaises(TypeError):
Benjamin Peterson59406a92009-03-26 17:10:29 +00001985 self.tp(self.MockRawIO(), self.MockRawIO(), 8, 12)
Benjamin Peterson59406a92009-03-26 17:10:29 +00001986
Antoine Pitroucf4c7492009-04-19 00:09:36 +00001987 def test_constructor_with_not_readable(self):
1988 class NotReadable(MockRawIO):
1989 def readable(self):
1990 return False
1991
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02001992 self.assertRaises(OSError, self.tp, NotReadable(), self.MockRawIO())
Antoine Pitroucf4c7492009-04-19 00:09:36 +00001993
1994 def test_constructor_with_not_writeable(self):
1995 class NotWriteable(MockRawIO):
1996 def writable(self):
1997 return False
1998
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02001999 self.assertRaises(OSError, self.tp, self.MockRawIO(), NotWriteable())
Antoine Pitroucf4c7492009-04-19 00:09:36 +00002000
2001 def test_read(self):
2002 pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())
2003
2004 self.assertEqual(pair.read(3), b"abc")
2005 self.assertEqual(pair.read(1), b"d")
2006 self.assertEqual(pair.read(), b"ef")
Benjamin Petersonbf5ff762009-12-13 19:25:34 +00002007 pair = self.tp(self.BytesIO(b"abc"), self.MockRawIO())
2008 self.assertEqual(pair.read(None), b"abc")
2009
2010 def test_readlines(self):
2011 pair = lambda: self.tp(self.BytesIO(b"abc\ndef\nh"), self.MockRawIO())
2012 self.assertEqual(pair().readlines(), [b"abc\n", b"def\n", b"h"])
2013 self.assertEqual(pair().readlines(), [b"abc\n", b"def\n", b"h"])
2014 self.assertEqual(pair().readlines(5), [b"abc\n", b"def\n"])
Antoine Pitroucf4c7492009-04-19 00:09:36 +00002015
2016 def test_read1(self):
2017 # .read1() is delegated to the underlying reader object, so this test
2018 # can be shallow.
2019 pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())
2020
2021 self.assertEqual(pair.read1(3), b"abc")
Martin Panterccb2c0e2016-10-20 23:48:14 +00002022 self.assertEqual(pair.read1(), b"def")
Antoine Pitroucf4c7492009-04-19 00:09:36 +00002023
def test_readinto(self):
    # Both in-place read variants must fill the caller-supplied buffer and
    # report how many bytes they stored.
    for name in ("readinto", "readinto1"):
        with self.subTest(name):
            rw = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())

            target = byteslike(b'\0' * 5)
            fill = getattr(rw, name)
            self.assertEqual(fill(target), 5)
            self.assertEqual(bytes(target), b"abcde")
Antoine Pitroucf4c7492009-04-19 00:09:36 +00002032
def test_write(self):
    sink = self.MockRawIO()
    rw = self.tp(self.MockRawIO(), sink)

    rw.write(b"abc")
    rw.flush()

    scratch = bytearray(b"def")
    rw.write(scratch)
    # Clobber our copy before flushing: the bytes that reach the raw
    # writer must still be the ones that were passed to write().
    scratch[:] = b"***"
    rw.flush()

    self.assertEqual(sink._write_stack, [b"abc", b"def"])
2044
def test_peek(self):
    rw = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())

    # Only the prefix of the peeked data is checked; the read that follows
    # shows that peeking did not consume anything.
    peeked = rw.peek(3)
    self.assertTrue(peeked.startswith(b"abc"))
    self.assertEqual(rw.read(3), b"abc")
2050
def test_readable(self):
    # A pair built from two mock raw streams reports itself readable.
    reader, writer = self.MockRawIO(), self.MockRawIO()
    rw = self.tp(reader, writer)
    self.assertTrue(rw.readable())
2054
def test_writeable(self):
    # A pair built from two mock raw streams reports itself writable.
    reader, writer = self.MockRawIO(), self.MockRawIO()
    rw = self.tp(reader, writer)
    self.assertTrue(rw.writable())
2058
def test_seekable(self):
    # A BufferedRWPair is never seekable, even when the reader and writer
    # it wraps are.
    reader, writer = self.MockRawIO(), self.MockRawIO()
    rw = self.tp(reader, writer)
    self.assertFalse(rw.seekable())
2064
2065 # .flush() is delegated to the underlying writer object and has been
2066 # tested in the test_write method.
2067
def test_close_and_closed(self):
    # The ``closed`` flag flips from false to true across close().
    rw = self.tp(self.MockRawIO(), self.MockRawIO())
    was_closed = rw.closed
    self.assertFalse(was_closed)
    rw.close()
    self.assertTrue(rw.closed)
2073
def test_reader_close_error_on_close(self):
    def failing_close():
        # Looking up an undefined name raises NameError mentioning it.
        reader_non_existing

    reader = self.MockRawIO()
    reader.close = failing_close
    writer = self.MockRawIO()
    rw = self.tp(reader, writer)

    with self.assertRaises(NameError) as caught:
        rw.close()
    self.assertIn('reader_non_existing', str(caught.exception))

    # The reader's failure is reported, yet the writer was still closed.
    self.assertTrue(rw.closed)
    self.assertFalse(reader.closed)
    self.assertTrue(writer.closed)

    # Silence destructor error
    reader.close = lambda: None
2090
def test_writer_close_error_on_close(self):
    def failing_close():
        # Looking up an undefined name raises NameError mentioning it.
        writer_non_existing

    reader = self.MockRawIO()
    writer = self.MockRawIO()
    writer.close = failing_close
    pair = self.tp(reader, writer)

    with self.assertRaises(NameError) as caught:
        pair.close()
    self.assertIn('writer_non_existing', str(caught.exception))

    # The writer's failure is reported, yet the reader was still closed.
    self.assertFalse(pair.closed)
    self.assertTrue(reader.closed)
    self.assertFalse(writer.closed)

    # Silence destructor error
    writer.close = lambda: None
    writer = None

    # The references are dropped in a fixed order; keep the statements
    # below exactly in this sequence.
    # Outer guard: ignore the BufferedWriter (of the BufferedRWPair)
    # unraisable exception.
    with support.catch_unraisable_exception():
        # Inner guard: ignore the BufferedRWPair unraisable exception.
        with support.catch_unraisable_exception():
            pair = None
            support.gc_collect()
        support.gc_collect()
Victor Stinner472f7942019-04-12 21:58:24 +02002116
def test_reader_writer_close_error_on_close(self):
    def failing_reader_close():
        reader_non_existing

    def failing_writer_close():
        writer_non_existing

    reader = self.MockRawIO()
    reader.close = failing_reader_close
    writer = self.MockRawIO()
    writer.close = failing_writer_close
    rw = self.tp(reader, writer)

    with self.assertRaises(NameError) as caught:
        rw.close()

    # The reader's error is the one raised; the writer's error is chained
    # to it as __context__.
    raised = caught.exception
    self.assertIn('reader_non_existing', str(raised))
    self.assertIsInstance(raised.__context__, NameError)
    self.assertIn('writer_non_existing', str(raised.__context__))

    self.assertFalse(rw.closed)
    self.assertFalse(reader.closed)
    self.assertFalse(writer.closed)

    # Silence destructor error
    reader.close = lambda: None
    writer.close = lambda: None
2139
def test_isatty(self):
    class SelectableIsAtty(MockRawIO):
        # Raw stream whose isatty() answer is chosen at construction time.
        def __init__(self, isatty):
            MockRawIO.__init__(self)
            self._isatty = isatty

        def isatty(self):
            return self._isatty

    # (reader is a tty, writer is a tty, pair should report a tty):
    # the pair is a tty as soon as either side is.
    cases = (
        (False, False, False),
        (True, False, True),
        (False, True, True),
        (True, True, True),
    )
    for reader_tty, writer_tty, expect_tty in cases:
        pair = self.tp(SelectableIsAtty(reader_tty),
                       SelectableIsAtty(writer_tty))
        check = self.assertTrue if expect_tty else self.assertFalse
        check(pair.isatty())
Guido van Rossum01a27522007-03-07 01:00:12 +00002160
def test_weakref_clearing(self):
    # Drop the pair first and the weak reference to it second; the second
    # step must not crash the interpreter.
    pair = self.tp(self.MockRawIO(), self.MockRawIO())
    watcher = weakref.ref(pair)
    pair = None
    watcher = None  # Shouldn't segfault.
2166
class CBufferedRWPairTest(BufferedRWPairTest):
    # Run the shared BufferedRWPair checks against io.BufferedRWPair.
    tp = io.BufferedRWPair
Guido van Rossuma9e20242007-03-08 00:43:48 +00002169
class PyBufferedRWPairTest(BufferedRWPairTest):
    # Run the shared BufferedRWPair checks against pyio.BufferedRWPair.
    tp = pyio.BufferedRWPair
Guido van Rossuma9e20242007-03-08 00:43:48 +00002172
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002173
class BufferedRandomTest(BufferedReaderTest, BufferedWriterTest):
    # Shared checks for a buffered stream that is readable and writable at
    # the same time.  ``tp`` names the implementation under test.
    read_mode = "rb+"
    write_mode = "wb+"

    def test_constructor(self):
        # Run both inherited variants explicitly, reader first.
        for base in (BufferedReaderTest, BufferedWriterTest):
            base.test_constructor(self)

    def test_uninitialized(self):
        for base in (BufferedReaderTest, BufferedWriterTest):
            base.test_uninitialized(self)

    def test_read_and_write(self):
        raw = self.MockRawIO((b"asdf", b"ghjk"))
        bufio = self.tp(raw, 8)

        self.assertEqual(b"as", bufio.read(2))
        for chunk in (b"ddd", b"eee"):
            bufio.write(chunk)
        # The writes are still buffered: nothing reached the raw stream.
        self.assertFalse(raw._write_stack)
        self.assertEqual(b"ghjk", bufio.read())
        self.assertEqual(b"dddeee", raw._write_stack[0])

    def test_seek_and_tell(self):
        raw = self.BytesIO(b"asdfghjkl")
        bufio = self.tp(raw)

        self.assertEqual(b"as", bufio.read(2))
        self.assertEqual(2, bufio.tell())
        bufio.seek(0, 0)
        self.assertEqual(b"asdf", bufio.read(4))

        # Overwrite the four bytes that follow, then re-read from the start.
        bufio.write(b"123f")
        bufio.seek(0, 0)
        self.assertEqual(b"asdf123fl", bufio.read())
        self.assertEqual(9, bufio.tell())

        # Seek relative to the end, then relative to the current position.
        bufio.seek(-4, 2)
        self.assertEqual(5, bufio.tell())
        bufio.seek(2, 1)
        self.assertEqual(7, bufio.tell())
        self.assertEqual(b"fl", bufio.read(11))

        bufio.flush()
        self.assertEqual(b"asdf123fl", raw.getvalue())

        # A float offset is rejected.
        with self.assertRaises(TypeError):
            bufio.seek(0.0)

    def check_flush_and_read(self, read_func):
        # Shared scenario: ``read_func(bufio[, size])`` is the read-like
        # operation being exercised around writes and flushes.
        raw = self.BytesIO(b"abcdefghi")
        bufio = self.tp(raw)

        self.assertEqual(b"ab", read_func(bufio, 2))
        bufio.write(b"12")
        self.assertEqual(b"ef", read_func(bufio, 2))
        self.assertEqual(6, bufio.tell())
        bufio.flush()
        self.assertEqual(6, bufio.tell())
        self.assertEqual(b"ghi", read_func(bufio))

        # Change the raw stream behind the buffer's back.
        raw.seek(0, 0)
        raw.write(b"XYZ")
        # flush() resets the read buffer
        bufio.flush()
        bufio.seek(0, 0)
        self.assertEqual(b"XYZ", read_func(bufio, 3))

    def test_flush_and_read(self):
        def plain_read(stream, *args):
            return stream.read(*args)
        self.check_flush_and_read(plain_read)

    def test_flush_and_readinto(self):
        def read_via_readinto(stream, size=-1):
            # A negative size means "everything": use a generous buffer.
            capacity = size if size >= 0 else 9999
            scratch = bytearray(capacity)
            filled = stream.readinto(scratch)
            return bytes(scratch[:filled])
        self.check_flush_and_read(read_via_readinto)

    def test_flush_and_peek(self):
        def read_via_peek(stream, size=-1):
            # This relies on the fact that the buffer can contain the whole
            # raw stream, otherwise peek() can return less.
            data = stream.peek(size)
            if size != -1:
                data = data[:size]
            # Advance manually, since peek() does not move the position.
            stream.seek(len(data), 1)
            return data
        self.check_flush_and_read(read_via_peek)

    def test_flush_and_write(self):
        raw = self.BytesIO(b"abcdefghi")
        bufio = self.tp(raw)

        for chunk in (b"123", b"45"):
            bufio.write(chunk)
            bufio.flush()
        bufio.seek(0, 0)

        expected = b"12345fghi"
        self.assertEqual(expected, raw.getvalue())
        self.assertEqual(expected, bufio.read())

    def test_threads(self):
        for base in (BufferedReaderTest, BufferedWriterTest):
            base.test_threads(self)

    def test_writes_and_peek(self):
        def peek_in_place(stream):
            stream.peek(1)
        self.check_writes(peek_in_place)

        def peek_one_byte_back(stream):
            # Step back one byte, peek, then restore the position.
            saved = stream.tell()
            stream.seek(-1, 1)
            stream.peek(1)
            stream.seek(saved, 0)
        self.check_writes(peek_one_byte_back)

    def test_writes_and_reads(self):
        def reread_last_byte(stream):
            stream.seek(-1, 1)
            stream.read(1)
        self.check_writes(reread_last_byte)

    def test_writes_and_read1s(self):
        def reread_last_byte(stream):
            stream.seek(-1, 1)
            stream.read1(1)
        self.check_writes(reread_last_byte)

    def test_writes_and_readintos(self):
        def reread_last_byte(stream):
            stream.seek(-1, 1)
            stream.readinto(bytearray(1))
        self.check_writes(reread_last_byte)

    def test_write_after_readahead(self):
        # Issue #6629: writing after the buffer was filled by readahead should
        # first rewind the raw stream.
        for overwrite_size in (1, 5):
            raw = self.BytesIO(b"A" * 10)
            bufio = self.tp(raw, 4)
            end_pos = overwrite_size + 1
            # Trigger readahead
            self.assertEqual(bufio.read(1), b"A")
            self.assertEqual(bufio.tell(), 1)
            # Overwriting should rewind the raw stream if it needs so
            bufio.write(b"B" * overwrite_size)
            self.assertEqual(bufio.tell(), end_pos)
            # If the write size was smaller than the buffer size, flush() and
            # check that rewind happens.
            bufio.flush()
            self.assertEqual(bufio.tell(), end_pos)
            written = raw.getvalue()
            self.assertEqual(
                written,
                b"A" + b"B" * overwrite_size + b"A" * (9 - overwrite_size))

    def test_write_rewind_write(self):
        # Various combinations of reading / writing / seeking backwards /
        # writing again
        def mutate(stream, start, stop):
            assert stop >= start
            # Fill the buffer
            stream.seek(start)
            stream.read(stop - start)
            stream.write(b'\x02')
            # This writes earlier than the previous write, but still inside
            # the buffer.
            stream.seek(start)
            stream.write(b'\x01')

        payload = b"\x80\x81\x82\x83\x84"
        size = len(payload)
        for i in range(size):
            for j in range(i, size):
                raw = self.BytesIO(payload)
                bufio = self.tp(raw, 100)
                mutate(bufio, i, j)
                bufio.flush()
                # Same order as the writes: when i == j the 1 wins.
                expected = bytearray(payload)
                expected[j] = 2
                expected[i] = 1
                self.assertEqual(raw.getvalue(), expected,
                                 "failed result for i=%d, j=%d" % (i, j))

    def test_truncate_after_read_or_write(self):
        raw = self.BytesIO(b"A" * 10)
        bufio = self.tp(raw, 100)
        # the read buffer gets filled
        self.assertEqual(bufio.read(2), b"AA")
        self.assertEqual(bufio.truncate(), 2)
        # the write buffer increases
        self.assertEqual(bufio.write(b"BB"), 2)
        self.assertEqual(bufio.truncate(), 4)

    def test_misbehaved_io(self):
        for base in (BufferedReaderTest, BufferedWriterTest):
            base.test_misbehaved_io(self)

    def test_interleaved_read_write(self):
        # Test for issue #12213
        with self.BytesIO(b'abcdefgh') as raw, self.tp(raw, 100) as f:
            f.write(b"1")
            self.assertEqual(f.read(1), b'b')
            f.write(b'2')
            self.assertEqual(f.read1(1), b'd')
            f.write(b'3')
            one_byte = bytearray(1)
            f.readinto(one_byte)
            self.assertEqual(one_byte, b'f')
            f.write(b'4')
            self.assertEqual(f.peek(1), b'h')
            f.flush()
            self.assertEqual(raw.getvalue(), b'1b2d3f4h')

        with self.BytesIO(b'abc') as raw, self.tp(raw, 100) as f:
            self.assertEqual(f.read(1), b'a')
            f.write(b"2")
            self.assertEqual(f.read(1), b'c')
            f.flush()
            self.assertEqual(raw.getvalue(), b'a2c')

    def test_interleaved_readline_write(self):
        # Alternate a one-byte write with a readline().
        steps = (
            (b'1', b'b\n'),
            (b'2', b'def\n'),
            (b'3', b'\n'),
        )
        with self.BytesIO(b'ab\ncdef\ng\n') as raw, self.tp(raw) as f:
            for written, line in steps:
                f.write(written)
                self.assertEqual(f.readline(), line)
            f.flush()
            self.assertEqual(raw.getvalue(), b'1b\n2def\n3\n')

    # You can't construct a BufferedRandom over a non-seekable stream.
    test_unseekable = None

    # writable() returns True, so there's no point in running the
    # read-only truncate test on a writable stream.
    test_truncate_on_read_only = None
2405
R David Murray67bfe802013-02-23 21:51:05 -05002406
class CBufferedRandomTest(BufferedRandomTest, SizeofTest):
    # Run the shared BufferedRandom checks against io.BufferedRandom.
    tp = io.BufferedRandom

    @unittest.skipIf(MEMORY_SANITIZER,
                     "MSan defaults to crashing instead of returning NULL "
                     "for malloc failure.")
    def test_constructor(self):
        BufferedRandomTest.test_constructor(self)
        # The allocation can succeed on 32-bit builds, e.g. with more
        # than 2 GiB RAM and a 64-bit kernel.
        if sys.maxsize <= 0x7FFFFFFF:
            return
        rawio = self.MockRawIO()
        bufio = self.tp(rawio)
        # Re-initialising with a buffer of sys.maxsize bytes must fail.
        oversize_errors = (OverflowError, MemoryError, ValueError)
        self.assertRaises(oversize_errors,
                          bufio.__init__, rawio, sys.maxsize)

    def test_garbage_collection(self):
        for base in (CBufferedReaderTest, CBufferedWriterTest):
            base.test_garbage_collection(self)

    def test_args_error(self):
        # Issue #17275
        # The TypeError raised for surplus positional arguments must
        # mention "BufferedRandom".
        with self.assertRaisesRegex(TypeError, "BufferedRandom"):
            self.tp(io.BytesIO(), 1024, 1024, 1024)
2430
2431
class PyBufferedRandomTest(BufferedRandomTest):
    # Run the shared BufferedRandom checks against pyio.BufferedRandom.
    tp = pyio.BufferedRandom
2434
2435
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002436# To fully exercise seek/tell, the StatefulIncrementalDecoder has these
2437# properties:
2438# - A single output character can correspond to many bytes of input.
2439# - The number of input bytes to complete the character can be
2440# undetermined until the last input byte is received.
2441# - The number of input bytes can vary depending on previous input.
2442# - A single input byte can correspond to many characters of output.
2443# - The number of output characters can be undetermined until the
2444# last input byte is received.
2445# - The number of output characters can vary depending on previous input.
2446
class StatefulIncrementalDecoder(codecs.IncrementalDecoder):
    """
    Stateful, buffering decoder used to exercise seek/tell behavior.

    The input is a stream of words.  The input length I selects how words
    are delimited: when I is 0 a word runs up to the next period (empty
    words between consecutive periods are skipped); otherwise a word is
    exactly I bytes long.  Each completed word is handled as follows:
      - A word starting with 'i' sets I to the number that follows
        (no digits means 0; capped at 99).
      - A word starting with 'o' sets the output length O the same way.
      - Any other word is emitted followed by a period.  When O is
        non-zero the word is first padded with hyphens or truncated so
        that it is exactly O characters long; when O is 0 it is emitted
        verbatim.
    I and O both start at 1.  When decode() is called with final=True, any
    bytes still buffered are processed as one last word.
    """

    def __init__(self, errors='strict'):
        codecs.IncrementalDecoder.__init__(self, errors)
        self.reset()

    def __repr__(self):
        return '<SID %x>' % id(self)

    def reset(self):
        # Back to the initial configuration: I = O = 1, nothing buffered.
        self.i = 1
        self.o = 1
        self.buffer = bytearray()

    def getstate(self):
        # Both lengths are XOR-ed with 1 so that the packed flags value is
        # 0 right after reset().
        flags = (self.i ^ 1) * 100 + (self.o ^ 1)
        return bytes(self.buffer), flags

    def setstate(self, state):
        # Inverse of getstate(): unpack the two lengths and undo the XOR.
        pending, flags = state
        self.buffer = bytearray(pending)
        packed_i, packed_o = divmod(flags, 100)
        self.i = packed_i ^ 1
        self.o = packed_o ^ 1

    def decode(self, input, final=False):
        pieces = []
        period = ord('.')
        for byte in input:
            # self.i is re-read for every byte because process_word() may
            # change it in the middle of the input.
            if self.i == 0:
                # variable-length, terminated with period
                if byte != period:
                    self.buffer.append(byte)
                elif self.buffer:
                    pieces.append(self.process_word())
            else:
                # fixed-length, terminate after self.i bytes
                self.buffer.append(byte)
                if len(self.buffer) == self.i:
                    pieces.append(self.process_word())
        if final and self.buffer:
            # EOF terminates the last word
            pieces.append(self.process_word())
        return ''.join(pieces)

    def process_word(self):
        # Consume the buffered word: either update I/O (returning '') or
        # return the formatted output for an ordinary word.
        word = self.buffer
        lead = word[0]
        if lead == ord('i'):
            self.i = min(99, int(word[1:] or 0))  # set input length
            output = ''
        elif lead == ord('o'):
            self.o = min(99, int(word[1:] or 0))  # set output length
            output = ''
        else:
            output = word.decode('ascii')
            if len(output) < self.o:
                output += '-' * self.o  # pad out with hyphens
            if self.o:
                output = output[:self.o]  # truncate to output length
            output += '.'
        self.buffer = bytearray()
        return output

    # The lookup hook below only answers while this flag is true.
    codecEnabled = False

    @classmethod
    def lookupTestDecoder(cls, name):
        # Codec search function: resolves 'test_decoder' to this class as
        # incremental decoder (encoding borrowed from latin-1); every other
        # name, or a disabled codec, yields None.
        if not (cls.codecEnabled and name == 'test_decoder'):
            return None
        latin1 = codecs.lookup('latin-1')
        return codecs.CodecInfo(
            name='test_decoder', encode=latin1.encode, decode=None,
            incrementalencoder=None,
            streamreader=None, streamwriter=None,
            incrementaldecoder=cls)
2531
# Install the lookup hook of the decoder defined above.  The hook only
# answers while StatefulIncrementalDecoder.codecEnabled is true, so it stays
# inert until a test switches it on.
codecs.register(StatefulIncrementalDecoder.lookupTestDecoder)
2535
2536
class StatefulIncrementalDecoderTest(unittest.TestCase):
    """
    Sanity checks for the StatefulIncrementalDecoder test helper itself.
    """

    # Each entry is (input bytes, final flag, expected decoded text).
    test_cases = [
        # I=1, O=1 (fixed-length input == fixed-length output)
        (b'abcd', False, 'a.b.c.d.'),
        # I=0, O=0 (variable-length input, variable-length output)
        (b'oiabcd', True, 'abcd.'),
        # I=0, O=0 (should ignore extra periods)
        (b'oi...abcd...', True, 'abcd.'),
        # I=0, O=6 (variable-length input, fixed-length output)
        (b'i.o6.x.xyz.toolongtofit.', False, 'x-----.xyz---.toolon.'),
        # I=2, O=6 (fixed-length input < fixed-length output)
        (b'i.i2.o6xyz', True, 'xy----.z-----.'),
        # I=6, O=3 (fixed-length input > fixed-length output)
        (b'i.o3.i6.abcdefghijklmnop', True, 'abc.ghi.mno.'),
        # I=0, then 3; O=29, then 15 (with longer output)
        (b'i.o29.a.b.cde.o15.abcdefghijabcdefghij.i3.a.b.c.d.ei00k.l.m', True,
         'a----------------------------.' +
         'b----------------------------.' +
         'cde--------------------------.' +
         'abcdefghijabcde.' +
         'a.b------------.' +
         '.c.------------.' +
         'd.e------------.' +
         'k--------------.' +
         'l--------------.' +
         'm--------------.')
    ]

    def test_decoder(self):
        # Try a few one-shot test cases.
        for data, final, expected in self.test_cases:
            decoder = StatefulIncrementalDecoder()
            self.assertEqual(decoder.decode(data, final), expected)

        # Also test an unfinished decode, followed by forcing EOF.
        decoder = StatefulIncrementalDecoder()
        self.assertEqual(decoder.decode(b'oiabcd'), '')
        self.assertEqual(decoder.decode(b'', 1), 'abcd.')
Guido van Rossum78892e42007-04-06 17:31:18 +00002579
2580class TextIOWrapperTest(unittest.TestCase):
Guido van Rossum9b76da62007-04-11 01:09:03 +00002581
def setUp(self):
    # Sample text with mixed newline conventions, plus the same text with
    # every line ending written as "\n".
    mixed_newlines = b"AAA\r\nBBB\rCCC\r\nDDD\nEEE\r\n"
    unix_newlines = b"AAA\nBBB\nCCC\nDDD\nEEE\n"
    self.testdata = mixed_newlines
    self.normalized = unix_newlines.decode("ascii")
    # Make sure no stale scratch file is present before the test runs.
    os_helper.unlink(os_helper.TESTFN)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002586
def tearDown(self):
    # Remove the scratch file a test may have created.
    scratch_path = os_helper.TESTFN
    os_helper.unlink(scratch_path)
Guido van Rossum9b76da62007-04-11 01:09:03 +00002589
def test_constructor(self):
    raw = self.BytesIO(b"\xc3\xa9\n\n")
    buffered = self.BufferedReader(raw, 1000)
    text = self.TextIOWrapper(buffered)

    # Re-running __init__ on the same object replaces its configuration.
    text.__init__(buffered, encoding="latin-1", newline="\r\n")
    self.assertEqual(text.encoding, "latin-1")
    self.assertEqual(text.line_buffering, False)

    text.__init__(buffered, encoding="utf-8", line_buffering=True)
    self.assertEqual(text.encoding, "utf-8")
    self.assertEqual(text.line_buffering, True)
    self.assertEqual("\xe9\n", text.readline())

    # A newline argument of the wrong type, or with an unsupported value,
    # is rejected.
    with self.assertRaises(TypeError):
        text.__init__(buffered, newline=42)
    with self.assertRaises(ValueError):
        text.__init__(buffered, newline='xyzzy')
2603
def test_uninitialized(self):
    wrapper_type = self.TextIOWrapper

    # Creating and immediately discarding an uninitialized object must be
    # harmless.
    text = wrapper_type.__new__(wrapper_type)
    del text

    # Using an uninitialized object must fail with an exception.
    text = wrapper_type.__new__(wrapper_type)
    self.assertRaises(Exception, repr, text)
    self.assertRaisesRegex((ValueError, AttributeError),
                           'uninitialized|has no attribute',
                           text.read, 0)

    # Once initialized, the object works.
    text.__init__(self.MockRawIO())
    self.assertEqual(text.read(0), '')
2614
def test_non_text_encoding_codecs_are_rejected(self):
    # The constructor must complain when handed a codec that is not
    # marked as a text encoding.
    # http://bugs.python.org/issue20404
    raw = self.BytesIO()
    buffered = self.BufferedWriter(raw)
    expected_message = "is not a text encoding"
    with self.assertRaisesRegex(LookupError, expected_message):
        self.TextIOWrapper(buffered, encoding="hex")
2623
def test_detach(self):
    raw = self.BytesIO()
    buffered = self.BufferedWriter(raw)

    # detach() hands back the very buffer object that was wrapped.
    t = self.TextIOWrapper(buffered)
    self.assertIs(t.detach(), buffered)

    # Text that has not reached the raw stream yet does so on detach();
    # detaching a second time is an error.
    t = self.TextIOWrapper(buffered, encoding="ascii")
    t.write("howdy")
    self.assertFalse(raw.getvalue())
    t.detach()
    self.assertEqual(raw.getvalue(), b"howdy")
    with self.assertRaises(ValueError):
        t.detach()

    # Operations independent of the detached stream should still work
    repr(t)
    self.assertEqual(t.encoding, "ascii")
    self.assertEqual(t.errors, "strict")
    for flag in (t.line_buffering, t.write_through):
        self.assertFalse(flag)
Benjamin Peterson10e76b62014-12-21 20:51:50 -06002643
def test_repr(self):
    raw = self.BytesIO("hello".encode("utf-8"))
    buffered = self.BufferedReader(raw)
    text = self.TextIOWrapper(buffered, encoding="utf-8")
    modname = self.TextIOWrapper.__module__

    def check(pattern):
        # The module prefix is optional in the repr.
        self.assertRegex(repr(text), pattern % modname)

    check(r"<(%s\.)?TextIOWrapper encoding='utf-8'>")
    # A name set on the raw stream shows up in the repr...
    raw.name = "dummy"
    check(r"<(%s\.)?TextIOWrapper name='dummy' encoding='utf-8'>")
    # ...and so does a mode set on the wrapper.
    text.mode = "r"
    check(r"<(%s\.)?TextIOWrapper name='dummy' mode='r' encoding='utf-8'>")
    raw.name = b"dummy"
    check(r"<(%s\.)?TextIOWrapper name=b'dummy' mode='r' encoding='utf-8'>")

    text.buffer.detach()
    repr(text)  # Should not raise an exception
2663
def test_recursive_repr(self):
    # Issue #25455
    # Make the raw stream's name point back at the wrapper, so computing
    # the repr recurses into itself.
    raw = self.BytesIO()
    text = self.TextIOWrapper(raw)
    with support.swap_attr(raw, 'name', text):
        try:
            repr(text)  # Should not crash
        except RuntimeError:
            # A RuntimeError is an acceptable outcome.
            pass
2673
def test_line_buffering(self):
    raw = self.BytesIO()
    buffered = self.BufferedWriter(raw, 1000)
    text = self.TextIOWrapper(buffered, newline="\n", line_buffering=True)

    # (text written, bytes expected in the raw stream afterwards)
    steps = (
        ("X", b""),                 # No flush happened
        ("Y\nZ", b"XY\nZ"),         # All got flushed
        ("A\rB", b"XY\nZA\rB"),
    )
    for chunk, flushed in steps:
        text.write(chunk)
        self.assertEqual(raw.getvalue(), flushed)
Guido van Rossumf64db9f2007-12-06 01:04:26 +00002684
def test_reconfigure_line_buffering(self):
    raw = self.BytesIO()
    buffered = self.BufferedWriter(raw, 1000)
    text = self.TextIOWrapper(buffered, newline="\n", line_buffering=False)

    def check_raw(expected):
        self.assertEqual(raw.getvalue(), expected)

    text.write("AB\nC")
    check_raw(b"")

    text.reconfigure(line_buffering=True)   # implicit flush
    check_raw(b"AB\nC")
    text.write("DEF\nG")
    check_raw(b"AB\nCDEF\nG")
    text.write("H")
    check_raw(b"AB\nCDEF\nG")
    text.reconfigure(line_buffering=False)  # implicit flush
    check_raw(b"AB\nCDEF\nGH")
    text.write("IJ")
    check_raw(b"AB\nCDEF\nGH")

    # Keeping default value: neither a bare reconfigure() nor an explicit
    # None changes the current setting.
    text.reconfigure()
    text.reconfigure(line_buffering=None)
    self.assertEqual(text.line_buffering, False)
    text.reconfigure(line_buffering=True)
    text.reconfigure()
    text.reconfigure(line_buffering=None)
    self.assertEqual(text.line_buffering, True)
2711
@unittest.skipIf(sys.flags.utf8_mode, "utf-8 mode is enabled")
def test_default_encoding(self):
    saved_environ = dict(os.environ)
    try:
        # try to get a user preferred encoding different than the current
        # locale encoding to check that TextIOWrapper() uses the current
        # locale encoding and not the user preferred encoding
        for name in ('LC_ALL', 'LANG', 'LC_CTYPE'):
            os.environ.pop(name, None)

        expected = locale.getpreferredencoding(False)
        raw = self.BytesIO()
        text = self.TextIOWrapper(raw)
        self.assertEqual(text.encoding, expected)
    finally:
        # Put the environment back exactly as it was found.
        os.environ.clear()
        os.environ.update(saved_environ)
2730
@support.cpython_only
@unittest.skipIf(sys.flags.utf8_mode, "utf-8 mode is enabled")
def test_device_encoding(self):
    # Issue 15989
    import _testcapi
    raw = self.BytesIO()
    # A fileno() result just past INT_MAX, and then just past UINT_MAX,
    # must make the constructor raise OverflowError.
    for limit in (_testcapi.INT_MAX, _testcapi.UINT_MAX):
        too_big = limit + 1
        raw.fileno = lambda too_big=too_big: too_big
        self.assertRaises(OverflowError, self.TextIOWrapper, raw)
2741
def test_encoding(self):
    # Check the encoding attribute is always set, and valid
    raw = self.BytesIO()

    # An explicit encoding is reported back as given.
    t = self.TextIOWrapper(raw, encoding="utf-8")
    self.assertEqual(t.encoding, "utf-8")

    # Without one, the attribute is still set and names a codec that
    # codecs.lookup() accepts.
    t = self.TextIOWrapper(raw)
    chosen = t.encoding
    self.assertIsNotNone(chosen)
    codecs.lookup(t.encoding)
2750
def test_encoding_errors_reading(self):
    # Decode a payload holding one non-ASCII byte under each error policy.
    payload = b"abc\n\xff\n"

    # (1) default and (2) explicit strict: read() must raise.
    for extra in ({}, {"errors": "strict"}):
        t = self.TextIOWrapper(self.BytesIO(payload), encoding="ascii",
                               **extra)
        with self.assertRaises(UnicodeError):
            t.read()

    # (3) ignore and (4) replace: read() succeeds with the text shown.
    lenient_cases = (
        ("ignore", "abc\n\n"),
        ("replace", "abc\n\ufffd\n"),
    )
    for policy, expected in lenient_cases:
        t = self.TextIOWrapper(self.BytesIO(payload), encoding="ascii",
                               errors=policy)
        self.assertEqual(t.read(), expected)
Guido van Rossume7fc50f2007-12-03 22:54:21 +00002768
def test_encoding_errors_writing(self):
    # Encode text holding one non-ASCII character under each error policy.

    # (1) default and (2) explicit strict: write() must raise.
    for extra in ({}, {"errors": "strict"}):
        t = self.TextIOWrapper(self.BytesIO(), encoding="ascii", **extra)
        with self.assertRaises(UnicodeError):
            t.write("\xff")

    # (3) ignore and (4) replace: the write succeeds and the buffer holds
    # the bytes shown once the wrapper has been flushed.
    lenient_cases = (
        ("ignore", b"abcdef\n"),
        ("replace", b"abc?def\n"),
    )
    for policy, expected in lenient_cases:
        sink = self.BytesIO()
        t = self.TextIOWrapper(sink, encoding="ascii", errors=policy,
                               newline="\n")
        t.write("abc\xffdef\n")
        t.flush()
        self.assertEqual(sink.getvalue(), expected)
Guido van Rossume7fc50f2007-12-03 22:54:21 +00002792
def test_newlines(self):
    # Decode one mixed-newline text under every newline mode, for several
    # encodings and tiny buffer sizes, both by iterating over the wrapper
    # and by alternating read(2) with readline().
    input_lines = ["unix\n", "windows\r\n", "os9\r", "last\n", "nonl"]

    # (newline argument, lines expected back from the wrapper)
    tests = [
        (None, ['unix\n', 'windows\n', 'os9\n', 'last\n', 'nonl']),
        ('', input_lines),
        ('\n', ["unix\n", "windows\r\n", "os9\rlast\n", "nonl"]),
        ('\r\n', ["unix\nwindows\r\n", "os9\rlast\nnonl"]),
        ('\r', ["unix\nwindows\r", "\nos9\r", "last\nnonl"]),
    ]
    encodings = (
        'utf-8', 'latin-1',
        'utf-16', 'utf-16-le', 'utf-16-be',
        'utf-32', 'utf-32-le', 'utf-32-be',
    )
    text = ''.join(input_lines)

    # Try a range of buffer sizes to test the case where \r is the last
    # character in TextIOWrapper._pending_line.
    for encoding in encodings:
        data = text.encode(encoding)
        for do_reads in (False, True):
            for bufsize in range(1, 10):
                for newline, exp_lines in tests:
                    bufio = self.BufferedReader(self.BytesIO(data), bufsize)
                    textio = self.TextIOWrapper(bufio, newline=newline,
                                                encoding=encoding)
                    if not do_reads:
                        got_lines = list(textio)
                    else:
                        got_lines = []
                        # Two characters via read(), the rest of the line
                        # via readline(), until read() returns ''.
                        for c2 in iter(lambda: textio.read(2), ''):
                            self.assertEqual(len(c2), 2)
                            got_lines.append(c2 + textio.readline())

                    for got_line, exp_line in zip(got_lines, exp_lines):
                        self.assertEqual(got_line, exp_line)
                    self.assertEqual(len(got_lines), len(exp_lines))
Guido van Rossum78892e42007-04-06 17:31:18 +00002834
def test_newlines_input(self):
    # For every newline mode, readlines() must split the input as listed
    # and, after seek(0), read() must return those lines joined.
    testdata = b"AAA\nBB\x00B\nCCC\rDDD\rEEE\r\nFFF\r\nGGG"
    normalized = testdata.replace(b"\r\n", b"\n").replace(b"\r", b"\n")
    cases = [
        (None, normalized.decode("ascii").splitlines(keepends=True)),
        ("", testdata.decode("ascii").splitlines(keepends=True)),
        ("\n", ["AAA\n", "BB\x00B\n", "CCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]),
        ("\r\n", ["AAA\nBB\x00B\nCCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]),
        ("\r", ["AAA\nBB\x00B\nCCC\r", "DDD\r", "EEE\r", "\nFFF\r", "\nGGG"]),
    ]
    for newline, expected in cases:
        txt = self.TextIOWrapper(self.BytesIO(testdata), encoding="ascii",
                                 newline=newline)
        self.assertEqual(txt.readlines(), expected)
        txt.seek(0)
        self.assertEqual(txt.read(), "".join(expected))
Guido van Rossum8358db22007-08-18 21:39:55 +00002850
def test_newlines_output(self):
    # Bytes expected in the buffer, keyed by the newline argument, after
    # writing text that contains "\n", "\r" and "\r\n".
    testdict = {
        "": b"AAA\nBBB\nCCC\nX\rY\r\nZ",
        "\n": b"AAA\nBBB\nCCC\nX\rY\r\nZ",
        "\r": b"AAA\rBBB\rCCC\rX\rY\r\rZ",
        "\r\n": b"AAA\r\nBBB\r\nCCC\r\nX\rY\r\r\nZ",
    }
    # newline=None is checked against the entry for os.linesep.
    tests = [(None, testdict[os.linesep]), *sorted(testdict.items())]
    pieces = ("AAA\nB", "BB\nCCC\n", "X\rY\r\nZ")
    for newline, expected in tests:
        buf = self.BytesIO()
        txt = self.TextIOWrapper(buf, encoding="ascii", newline=newline)
        for piece in pieces:
            txt.write(piece)
        txt.flush()
        # Flushing the wrapper must not close the underlying buffer.
        self.assertEqual(buf.closed, False)
        self.assertEqual(buf.getvalue(), expected)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002868
def test_destructor(self):
    # Dropping the last reference to a wrapper must close the underlying
    # buffer, and the written text must be in the buffer by then.
    seen_at_close = []
    base = self.BytesIO

    class MyBytesIO(base):
        def close(self):
            # Record the buffer content as it is when close() runs.
            seen_at_close.append(self.getvalue())
            base.close(self)

    raw = MyBytesIO()
    wrapper = self.TextIOWrapper(raw, encoding="ascii")
    wrapper.write("abc")
    del wrapper
    support.gc_collect()
    self.assertEqual([b"abc"], seen_at_close)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002882
def test_override_destructor(self):
    # With __del__, close() and flush() all overridden, finalizing the
    # wrapper must call them in that order: 1 (__del__), 2 (close),
    # 3 (flush).
    record = []

    class MyTextIO(self.TextIOWrapper):
        def __del__(self):
            record.append(1)
            # Chain to the parent finalizer only if there is one.
            parent_del = getattr(super(), "__del__", None)
            if parent_del is not None:
                parent_del()

        def close(self):
            record.append(2)
            super().close()

        def flush(self):
            record.append(3)
            super().flush()

    raw = self.BytesIO()
    wrapper = MyTextIO(raw, encoding="ascii")
    del wrapper
    support.gc_collect()
    self.assertEqual(record, [1, 2, 3])
2905
def test_error_through_destructor(self):
    # Test that the exception state is not modified by a destructor,
    # even if close() fails.
    rawio = self.CloseFailureIO()
    with support.catch_unraisable_exception() as cm:
        # The temporary wrapper is finalized while the AttributeError
        # from the missing attribute is propagating.
        with self.assertRaises(AttributeError):
            self.TextIOWrapper(rawio).xyzzy

        # NOTE(review): nesting reconstructed; cm.unraisable is assumed to
        # be readable only while the outer context is active.
        if IOBASE_EMITS_UNRAISABLE:
            if cm.unraisable is not None:
                self.assertEqual(cm.unraisable.exc_type, OSError)
        else:
            self.assertIsNone(cm.unraisable)
Guido van Rossum8358db22007-08-18 21:39:55 +00002918
Guido van Rossum9b76da62007-04-11 01:09:03 +00002919 # Systematic tests of the text I/O API
2920
def test_basic_io(self):
    # Round-trip a short text through a real file for several chunk sizes
    # and encodings, checking the result of every write/read/seek/tell.
    for chunksize in (1, 2, 3, 4, 5, 15, 16, 17, 31, 32, 33, 63, 64, 65):
        # The utf-16-be / utf-16-le encodings are not exercised here.
        for enc in "ascii", "latin-1", "utf-8":
            # Context managers guarantee the file is closed even when one
            # of the assertions below fails.
            with self.open(os_helper.TESTFN, "w+", encoding=enc) as f:
                f._CHUNK_SIZE = chunksize
                self.assertEqual(f.write("abc"), 3)
            with self.open(os_helper.TESTFN, "r+", encoding=enc) as f:
                f._CHUNK_SIZE = chunksize
                self.assertEqual(f.tell(), 0)
                self.assertEqual(f.read(), "abc")
                # Position just past the initial "abc".
                cookie = f.tell()
                self.assertEqual(f.seek(0), 0)
                self.assertEqual(f.read(None), "abc")
                f.seek(0)
                self.assertEqual(f.read(2), "ab")
                self.assertEqual(f.read(1), "c")
                self.assertEqual(f.read(1), "")
                self.assertEqual(f.read(), "")
                self.assertEqual(f.tell(), cookie)
                self.assertEqual(f.seek(0), 0)
                self.assertEqual(f.seek(0, 2), cookie)
                self.assertEqual(f.write("def"), 3)
                self.assertEqual(f.seek(cookie), cookie)
                self.assertEqual(f.read(), "def")
                if enc.startswith("utf"):
                    self.multi_line_test(f, enc)
2949
def multi_line_test(self, f, enc):
    # Rewrite f with lines of many different lengths, then read them back
    # one by one: each (tell() position, line) pair seen while reading
    # must equal the pair recorded while writing.
    f.seek(0)
    f.truncate()
    sample = "s\xff\u0fff\uffff"
    sizes = (0, 1, 2, 3, 4, 5, 30, 31, 32, 33, 62, 63, 64, 65, 1000)

    wlines = []
    for size in sizes:
        # Cycle through the sample characters to build `size` characters.
        line = "".join(sample[i % len(sample)] for i in range(size)) + "\n"
        wlines.append((f.tell(), line))
        f.write(line)

    f.seek(0)
    rlines = []
    pos, line = f.tell(), f.readline()
    while line:
        rlines.append((pos, line))
        pos, line = f.tell(), f.readline()
    self.assertEqual(rlines, wlines)
Guido van Rossum9b76da62007-04-11 01:09:03 +00002971
def test_telling(self):
    # tell() positions recorded while writing must match those seen while
    # reading the same lines back; tell() must raise OSError while the
    # file is being iterated and work again once iteration has finished.
    # The context manager closes the file even if an assertion fails.
    with self.open(os_helper.TESTFN, "w+", encoding="utf-8") as f:
        p0 = f.tell()
        f.write("\xff\n")
        p1 = f.tell()
        f.write("\xff\n")
        p2 = f.tell()
        f.seek(0)
        self.assertEqual(f.tell(), p0)
        self.assertEqual(f.readline(), "\xff\n")
        self.assertEqual(f.tell(), p1)
        self.assertEqual(f.readline(), "\xff\n")
        self.assertEqual(f.tell(), p2)
        f.seek(0)
        for line in f:
            self.assertEqual(line, "\xff\n")
            self.assertRaises(OSError, f.tell)
        self.assertEqual(f.tell(), p2)
2991
def test_seeking(self):
    # Write two lines whose ASCII prefix is two characters shorter than
    # the default chunk size and which end in a multi-byte character,
    # then check read()/tell()/readline() around the end of the prefix.
    prefix_size = _default_chunk_size() - 2
    u_prefix = "a" * prefix_size
    prefix = u_prefix.encode("utf-8")
    # The prefix is pure ASCII: one byte per character.
    self.assertEqual(len(u_prefix), len(prefix))
    u_suffix = "\u8888\n"
    line = prefix + u_suffix.encode("utf-8")
    with self.open(os_helper.TESTFN, "wb") as f:
        f.write(line*2)
    with self.open(os_helper.TESTFN, "r", encoding="utf-8") as f:
        head = f.read(prefix_size)
        self.assertEqual(head, prefix.decode("ascii"))
        self.assertEqual(f.tell(), prefix_size)
        self.assertEqual(f.readline(), u_suffix)
Guido van Rossumd76e7792007-04-17 02:38:04 +00003008
def test_seeking_too(self):
    # Regression test for a specific bug
    # readline() followed by tell() must not raise when the chunk size is
    # smaller than the three-byte character at the start of the file.
    payload = b'\xe0\xbf\xbf\n'
    with self.open(os_helper.TESTFN, "wb") as out:
        out.write(payload)
    with self.open(os_helper.TESTFN, "r", encoding="utf-8") as f:
        f._CHUNK_SIZE  # Just test that it exists
        f._CHUNK_SIZE = 2
        f.readline()
        f.tell()
Guido van Rossumd76e7792007-04-17 02:38:04 +00003019
def test_seek_and_tell(self):
    # Test seek/tell using the StatefulIncrementalDecoder.
    # Make test faster by doing smaller seeks
    CHUNK_SIZE = 128

    def test_seek_and_tell_with_data(data, min_pos=0):
        """Tell/seek to various points within a data stream and ensure
        that the decoded data returned by read() is consistent."""
        # Every open() is wrapped in a context manager so that a failing
        # assertion cannot leave a file handle open.
        with self.open(os_helper.TESTFN, 'wb') as f:
            f.write(data)
        with self.open(os_helper.TESTFN, encoding='test_decoder') as f:
            f._CHUNK_SIZE = CHUNK_SIZE
            decoded = f.read()

        for i in range(min_pos, len(decoded) + 1):  # seek positions
            for j in [1, 5, len(decoded) - i]:  # read lengths
                with self.open(os_helper.TESTFN,
                               encoding='test_decoder') as f:
                    self.assertEqual(f.read(i), decoded[:i])
                    cookie = f.tell()
                    self.assertEqual(f.read(j), decoded[i:i + j])
                    # Seeking back to the cookie must replay the rest.
                    f.seek(cookie)
                    self.assertEqual(f.read(), decoded[i:])

    # Enable the test decoder.
    StatefulIncrementalDecoder.codecEnabled = 1

    # Run the tests.
    try:
        # Try each test case.  (`raw` instead of `input`: do not shadow
        # the builtin.)
        for raw, _, _ in StatefulIncrementalDecoderTest.test_cases:
            test_seek_and_tell_with_data(raw)

        # Position each test case so that it crosses a chunk boundary.
        for raw, _, _ in StatefulIncrementalDecoderTest.test_cases:
            offset = CHUNK_SIZE - len(raw)//2
            prefix = b'.'*offset
            # Don't bother seeking into the prefix (takes too long).
            min_pos = offset*2
            test_seek_and_tell_with_data(prefix + raw, min_pos)

    # Ensure our test decoder won't interfere with subsequent tests.
    finally:
        StatefulIncrementalDecoder.codecEnabled = 0
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00003066
def test_multibyte_seek_and_tell(self):
    # With a multibyte encoding (euc_jp), seeking back to a position
    # returned by tell() must replay the same line and end at the same
    # position.  Context managers close the file even if a check fails.
    with self.open(os_helper.TESTFN, "w", encoding="euc_jp") as f:
        f.write("AB\n\u3046\u3048\n")

    with self.open(os_helper.TESTFN, "r", encoding="euc_jp") as f:
        self.assertEqual(f.readline(), "AB\n")
        p0 = f.tell()
        self.assertEqual(f.readline(), "\u3046\u3048\n")
        p1 = f.tell()
        f.seek(p0)
        self.assertEqual(f.readline(), "\u3046\u3048\n")
        self.assertEqual(f.tell(), p1)
3081
def test_seek_with_encoder_state(self):
    # Write with euc_jis_2004, seek back to a position taken after the
    # first write, write again, and check what the file reads back as.
    # Context managers close the file even if the write or check fails.
    with self.open(os_helper.TESTFN, "w", encoding="euc_jis_2004") as f:
        f.write("\u00e6\u0300")
        p0 = f.tell()
        f.write("\u00e6")
        f.seek(p0)
        f.write("\u0300")

    with self.open(os_helper.TESTFN, "r", encoding="euc_jis_2004") as f:
        self.assertEqual(f.readline(), "\u00e6\u0300\u0300")
3094
def test_encoded_writes(self):
    # Two successive writes in a UTF-16/UTF-32 encoding must read back as
    # the doubled text and leave exactly its one-shot encoding in the
    # buffer.
    data = "1234567890"
    doubled = data * 2
    tests = ("utf-16",
             "utf-16-le",
             "utf-16-be",
             "utf-32",
             "utf-32-le",
             "utf-32-be")
    for encoding in tests:
        buf = self.BytesIO()
        f = self.TextIOWrapper(buf, encoding=encoding)
        # Check if the BOM is written only once (see issue1753).
        for _ in range(2):
            f.write(data)
        # Reading from the start must give the same text both times.
        for _ in range(2):
            f.seek(0)
            self.assertEqual(f.read(), doubled)
        self.assertEqual(buf.getvalue(), doubled.encode(encoding))
Alexandre Vassalottia38f73b2008-01-07 18:30:48 +00003114
def test_unreadable(self):
    # read() on a wrapper whose buffer reports readable() == False must
    # raise OSError.
    class UnReadable(self.BytesIO):
        def readable(self):
            return False

    txt = self.TextIOWrapper(UnReadable())
    with self.assertRaises(OSError):
        txt.read()
Benjamin Petersona1b49012009-03-31 23:11:32 +00003121
def test_read_one_by_one(self):
    # Reading one character at a time must turn "\r\n" into a single "\n".
    txt = self.TextIOWrapper(self.BytesIO(b"AA\r\nBB"))
    # Keep calling read(1) until it returns the empty string.
    reads = "".join(iter(lambda: txt.read(1), ""))
    self.assertEqual(reads, "AA\nBB")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00003131
def test_readlines(self):
    txt = self.TextIOWrapper(self.BytesIO(b"AA\nBB\nCC"))
    every_line = ["AA\n", "BB\n", "CC"]
    # No hint and an explicit None hint both return every line.
    self.assertEqual(txt.readlines(), every_line)
    txt.seek(0)
    self.assertEqual(txt.readlines(None), every_line)
    txt.seek(0)
    # With a hint of 5 only the first two lines come back.
    self.assertEqual(txt.readlines(5), every_line[:2])
3139
# read in amounts equal to TextIOWrapper._CHUNK_SIZE which is 128.
def test_read_by_chunk(self):
    # make sure "\r\n" straddles 128 char boundary.
    txt = self.TextIOWrapper(self.BytesIO(b"A" * 127 + b"\r\nB"))
    # Keep calling read(128) until it returns the empty string.
    reads = "".join(iter(lambda: txt.read(128), ""))
    self.assertEqual(reads, "A"*127+"\nB")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00003151
def test_writelines(self):
    # writelines() with a list of strings writes them back to back.
    sink = self.BytesIO()
    txt = self.TextIOWrapper(sink)
    txt.writelines(['ab', 'cd', 'ef'])
    txt.flush()
    self.assertEqual(sink.getvalue(), b'abcdef')
3159
def test_writelines_userlist(self):
    # writelines() must accept a UserList just like a plain list.
    lines = UserList(['ab', 'cd', 'ef'])
    sink = self.BytesIO()
    txt = self.TextIOWrapper(sink)
    txt.writelines(lines)
    txt.flush()
    self.assertEqual(sink.getvalue(), b'abcdef')
3167
def test_writelines_error(self):
    # writelines() must raise TypeError for a list of ints, for None and
    # for a bytes object.
    txt = self.TextIOWrapper(self.BytesIO())
    for bad_lines in ([1, 2, 3], None, b'abc'):
        with self.assertRaises(TypeError):
            txt.writelines(bad_lines)
3173
def test_issue1395_1(self):
    txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")

    # read one char at a time, until read(1) returns the empty string
    reads = "".join(iter(lambda: txt.read(1), ""))
    self.assertEqual(reads, self.normalized)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00003185
def test_issue1395_2(self):
    # Read four characters at a time with a chunk size of four; the
    # pieces joined together must equal self.normalized.
    txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
    txt._CHUNK_SIZE = 4

    reads = "".join(iter(lambda: txt.read(4), ""))
    self.assertEqual(reads, self.normalized)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00003197
def test_issue1395_3(self):
    # Two read(4) calls followed by three readline() calls, with a chunk
    # size of four, must together return self.normalized.
    txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
    txt._CHUNK_SIZE = 4

    pieces = [txt.read(4), txt.read(4)]
    pieces.extend(txt.readline() for _ in range(3))
    self.assertEqual("".join(pieces), self.normalized)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00003208
def test_issue1395_4(self):
    # read(4) followed by an unbounded read(), with a chunk size of four,
    # must together return self.normalized.
    txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
    txt._CHUNK_SIZE = 4

    head = txt.read(4)
    tail = txt.read()
    self.assertEqual(head + tail, self.normalized)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00003216
def test_issue1395_5(self):
    # After read(4), seeking to the start and back to the position that
    # tell() returned must make the next read(4) return "BBB\n".
    txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
    txt._CHUNK_SIZE = 4

    # Only the position after this read matters; its text is not checked.
    txt.read(4)
    pos = txt.tell()
    txt.seek(0)
    txt.seek(pos)
    self.assertEqual(txt.read(4), "BBB\n")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00003226
def test_issue2282(self):
    # The wrapper must report the same seekable() value as its buffer.
    raw = self.BytesIO(self.testdata)
    txt = self.TextIOWrapper(raw, encoding="ascii")

    self.assertEqual(raw.seekable(), txt.seekable())
3232
def test_append_bom(self):
    # The BOM is not written again when appending to a non-empty file
    filename = os_helper.TESTFN
    for charset in ('utf-8-sig', 'utf-16', 'utf-32'):
        with self.open(filename, 'w', encoding=charset) as f:
            f.write('aaa')
            # tell() is still called as before, but its result is no
            # longer bound: it was never used in this test.
            f.tell()
        with self.open(filename, 'rb') as f:
            self.assertEqual(f.read(), 'aaa'.encode(charset))

        with self.open(filename, 'a', encoding=charset) as f:
            f.write('xxx')
        with self.open(filename, 'rb') as f:
            # Encoding the whole text at once yields a single BOM.
            self.assertEqual(f.read(), 'aaaxxx'.encode(charset))
Antoine Pitroue4501852009-05-14 18:55:55 +00003247
def test_seek_bom(self):
    # Same test, but when seeking manually
    path = os_helper.TESTFN
    for charset in ('utf-8-sig', 'utf-16', 'utf-32'):
        with self.open(path, 'w', encoding=charset) as writer:
            writer.write('aaa')
            end_of_text = writer.tell()
        with self.open(path, 'r+', encoding=charset) as updater:
            # Append at the recorded end, then overwrite from the start.
            updater.seek(end_of_text)
            updater.write('zzz')
            updater.seek(0)
            updater.write('bbb')
        with self.open(path, 'rb') as reader:
            # The file must equal the one-shot encoding of the final text.
            self.assertEqual(reader.read(), 'bbbzzz'.encode(charset))
Antoine Pitroue4501852009-05-14 18:55:55 +00003262
def test_seek_append_bom(self):
    # Same test, but first seek to the start and then to the end
    path = os_helper.TESTFN
    for charset in ('utf-8-sig', 'utf-16', 'utf-32'):
        with self.open(path, 'w', encoding=charset) as writer:
            writer.write('aaa')
        with self.open(path, 'a', encoding=charset) as appender:
            appender.seek(0)
            appender.seek(0, self.SEEK_END)
            appender.write('xxx')
        with self.open(path, 'rb') as reader:
            # The file must equal the one-shot encoding of the whole text.
            self.assertEqual(reader.read(), 'aaaxxx'.encode(charset))
3275
def test_errors_property(self):
    # The errors attribute is "strict" when no errors argument is given
    # and otherwise echoes the argument.
    with self.open(os_helper.TESTFN, "w") as default_file:
        self.assertEqual(default_file.errors, "strict")
    with self.open(os_helper.TESTFN, "w", errors="replace") as lenient_file:
        self.assertEqual(lenient_file.errors, "replace")
3281
@support.no_tracing
def test_threads_write(self):
    # Issue6750: concurrent writes could duplicate data
    go = threading.Event()
    with self.open(os_helper.TESTFN, "w", buffering=1) as f:
        def run(n):
            # Build the line first, then wait for the shared event before
            # writing it.
            text = "Thread%03d\n" % n
            go.wait()
            f.write(text)

        workers = [threading.Thread(target=run, args=(index,))
                   for index in range(20)]
        with threading_helper.start_threads(workers, go.set):
            time.sleep(0.02)
    with self.open(os_helper.TESTFN) as f:
        content = f.read()
        # Every thread's line must appear exactly once.
        for n in range(20):
            self.assertEqual(content.count("Thread%03d\n" % n), 1)
Amaury Forgeot d'Arcccd686a2009-08-29 23:00:38 +00003299
def test_flush_error_on_close(self):
    # Test that text file is closed despite failed flush
    # and that flush() is called before file closed.
    txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
    # Filled in by bad_flush() with [txt.closed, txt.buffer.closed] as
    # they are at the moment flush() runs.
    closed = []

    def bad_flush():
        closed[:] = [txt.closed, txt.buffer.closed]
        raise OSError()

    txt.flush = bad_flush
    with self.assertRaises(OSError):  # exception not swallowed
        txt.close()
    self.assertTrue(txt.closed)
    self.assertTrue(txt.buffer.closed)
    self.assertTrue(closed)      # flush() called
    self.assertFalse(closed[0])  # flush() called before file closed
    self.assertFalse(closed[1])
    txt.flush = lambda: None  # break reference loop
Antoine Pitrou6be88762010-05-03 16:48:20 +00003316
def test_close_error_on_close(self):
    # When both flush() and the buffer's close() fail, close() must raise
    # the close error with the flush error as its __context__, and the
    # wrapper must not be reported as closed.
    raw = self.BytesIO(self.testdata)

    def bad_flush():
        raise OSError('flush')

    def bad_close():
        raise OSError('close')

    raw.close = bad_close
    txt = self.TextIOWrapper(raw, encoding="ascii")
    txt.flush = bad_flush
    with self.assertRaises(OSError) as err:  # exception not swallowed
        txt.close()
    raised = err.exception
    self.assertEqual(raised.args, ('close',))
    self.assertIsInstance(raised.__context__, OSError)
    self.assertEqual(raised.__context__.args, ('flush',))
    self.assertFalse(txt.closed)

    # Silence destructor error
    raw.close = lambda: None
    txt.flush = lambda: None
3336
def test_nonnormalized_close_error_on_close(self):
    # Issue #21677
    # Same as the close-error test, except both failures are NameErrors
    # triggered by referencing names that do not exist.
    raw = self.BytesIO(self.testdata)

    def bad_flush():
        raise non_existing_flush

    def bad_close():
        raise non_existing_close

    raw.close = bad_close
    txt = self.TextIOWrapper(raw, encoding="ascii")
    txt.flush = bad_flush
    with self.assertRaises(NameError) as err:  # exception not swallowed
        txt.close()
    raised = err.exception
    self.assertIn('non_existing_close', str(raised))
    self.assertIsInstance(raised.__context__, NameError)
    self.assertIn('non_existing_flush', str(raised.__context__))
    self.assertFalse(txt.closed)

    # Silence destructor error
    raw.close = lambda: None
    txt.flush = lambda: None
3357
Antoine Pitrou6be88762010-05-03 16:48:20 +00003358 def test_multi_close(self):
3359 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
3360 txt.close()
3361 txt.close()
3362 txt.close()
3363 self.assertRaises(ValueError, txt.flush)
3364
def test_unseekable(self):
    # tell() and seek() must both raise UnsupportedOperation when the
    # wrapper sits on top of a MockUnseekableIO stream.
    wrapper = self.TextIOWrapper(self.MockUnseekableIO(self.testdata))
    with self.assertRaises(self.UnsupportedOperation):
        wrapper.tell()
    with self.assertRaises(self.UnsupportedOperation):
        wrapper.seek(0)
3369
Antoine Pitrou7f8f4182010-12-21 21:20:59 +00003370 def test_readonly_attributes(self):
3371 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
3372 buf = self.BytesIO(self.testdata)
3373 with self.assertRaises(AttributeError):
3374 txt.buffer = buf
3375
def test_rawio(self):
    # Issue #12591: TextIOWrapper must work with raw I/O objects, so
    # that subprocess.Popen() can have the required unbuffered
    # semantics with universal_newlines=True.
    chunks = [b'abc', b'def', b'ghi\njkl\nopq\n']
    wrapper = self.TextIOWrapper(self.MockRawIO(chunks),
                                 encoding='ascii', newline='\n')
    # Reads may span several raw chunks.
    self.assertEqual(wrapper.read(4), 'abcd')
    self.assertEqual(wrapper.readline(), 'efghi\n')
    remaining_lines = list(wrapper)
    self.assertEqual(remaining_lines, ['jkl\n', 'opq\n'])
3386
def test_rawio_write_through(self):
    # Issue #12591: with write_through=True, writes don't need a flush
    raw = self.MockRawIO([b'abc', b'def', b'ghi\njkl\nopq\n'])
    wrapper = self.TextIOWrapper(raw, encoding='ascii', newline='\n',
                                 write_through=True)
    for piece in ('1', '23\n4', '5'):
        wrapper.write(piece)
    # Everything must already have reached the raw object's write stack.
    written = b''.join(raw._write_stack)
    self.assertEqual(written, b'123\n45')
3396
Antoine Pitrouc644e7c2014-05-09 00:24:50 +02003397 def test_bufio_write_through(self):
3398 # Issue #21396: write_through=True doesn't force a flush()
3399 # on the underlying binary buffered object.
3400 flush_called, write_called = [], []
3401 class BufferedWriter(self.BufferedWriter):
3402 def flush(self, *args, **kwargs):
3403 flush_called.append(True)
3404 return super().flush(*args, **kwargs)
3405 def write(self, *args, **kwargs):
3406 write_called.append(True)
3407 return super().write(*args, **kwargs)
3408
3409 rawio = self.BytesIO()
3410 data = b"a"
3411 bufio = BufferedWriter(rawio, len(data)*2)
3412 textio = self.TextIOWrapper(bufio, encoding='ascii',
3413 write_through=True)
3414 # write to the buffered io but don't overflow the buffer
3415 text = data.decode('ascii')
3416 textio.write(text)
3417
3418 # buffer.flush is not called with write_through=True
3419 self.assertFalse(flush_called)
3420 # buffer.write *is* called with write_through=True
3421 self.assertTrue(write_called)
3422 self.assertEqual(rawio.getvalue(), b"") # no flush
3423
3424 write_called = [] # reset
3425 textio.write(text * 10) # total content is larger than bufio buffer
3426 self.assertTrue(write_called)
3427 self.assertEqual(rawio.getvalue(), data * 11) # all flushed
3428
def test_reconfigure_write_through(self):
    # reconfigure(write_through=...) toggles the setting; switching it on
    # pushes out text that was pending.
    raw = self.MockRawIO([])
    wrapper = self.TextIOWrapper(raw, encoding='ascii', newline='\n')

    def written():
        # Bytes that have reached the raw object so far.
        return b''.join(raw._write_stack)

    wrapper.write('1')
    wrapper.reconfigure(write_through=True)  # implied flush
    self.assertEqual(wrapper.write_through, True)
    self.assertEqual(written(), b'1')
    wrapper.write('23')
    self.assertEqual(written(), b'123')

    wrapper.reconfigure(write_through=False)
    self.assertEqual(wrapper.write_through, False)
    wrapper.write('45')
    wrapper.flush()
    self.assertEqual(written(), b'12345')

    # Keeping default value
    wrapper.reconfigure()
    wrapper.reconfigure(write_through=None)
    self.assertEqual(wrapper.write_through, False)
    wrapper.reconfigure(write_through=True)
    wrapper.reconfigure()
    wrapper.reconfigure(write_through=None)
    self.assertEqual(wrapper.write_through, True)
3451
Serhiy Storchaka94dc6732013-02-03 17:03:31 +02003452 def test_read_nonbytes(self):
3453 # Issue #17106
3454 # Crash when underlying read() returns non-bytes
3455 t = self.TextIOWrapper(self.StringIO('a'))
3456 self.assertRaises(TypeError, t.read, 1)
3457 t = self.TextIOWrapper(self.StringIO('a'))
3458 self.assertRaises(TypeError, t.readline)
3459 t = self.TextIOWrapper(self.StringIO('a'))
3460 self.assertRaises(TypeError, t.read)
3461
Oren Milmana5b4ea12017-08-25 21:14:54 +03003462 def test_illegal_encoder(self):
3463 # Issue 31271: Calling write() while the return value of encoder's
3464 # encode() is invalid shouldn't cause an assertion failure.
3465 rot13 = codecs.lookup("rot13")
3466 with support.swap_attr(rot13, '_is_text_encoding', True):
3467 t = io.TextIOWrapper(io.BytesIO(b'foo'), encoding="rot13")
3468 self.assertRaises(TypeError, t.write, 'bar')
3469
Serhiy Storchaka94dc6732013-02-03 17:03:31 +02003470 def test_illegal_decoder(self):
3471 # Issue #17106
Nick Coghlana9b15242014-02-04 22:11:18 +10003472 # Bypass the early encoding check added in issue 20404
3473 def _make_illegal_wrapper():
3474 quopri = codecs.lookup("quopri")
3475 quopri._is_text_encoding = True
3476 try:
3477 t = self.TextIOWrapper(self.BytesIO(b'aaaaaa'),
3478 newline='\n', encoding="quopri")
3479 finally:
3480 quopri._is_text_encoding = False
3481 return t
Serhiy Storchaka94dc6732013-02-03 17:03:31 +02003482 # Crash when decoder returns non-string
Nick Coghlana9b15242014-02-04 22:11:18 +10003483 t = _make_illegal_wrapper()
Serhiy Storchaka94dc6732013-02-03 17:03:31 +02003484 self.assertRaises(TypeError, t.read, 1)
Nick Coghlana9b15242014-02-04 22:11:18 +10003485 t = _make_illegal_wrapper()
Serhiy Storchaka94dc6732013-02-03 17:03:31 +02003486 self.assertRaises(TypeError, t.readline)
Nick Coghlana9b15242014-02-04 22:11:18 +10003487 t = _make_illegal_wrapper()
Serhiy Storchaka94dc6732013-02-03 17:03:31 +02003488 self.assertRaises(TypeError, t.read)
3489
Oren Milmanba7d7362017-08-29 11:58:27 +03003490 # Issue 31243: calling read() while the return value of decoder's
3491 # getstate() is invalid should neither crash the interpreter nor
3492 # raise a SystemError.
3493 def _make_very_illegal_wrapper(getstate_ret_val):
3494 class BadDecoder:
3495 def getstate(self):
3496 return getstate_ret_val
3497 def _get_bad_decoder(dummy):
3498 return BadDecoder()
3499 quopri = codecs.lookup("quopri")
3500 with support.swap_attr(quopri, 'incrementaldecoder',
3501 _get_bad_decoder):
3502 return _make_illegal_wrapper()
3503 t = _make_very_illegal_wrapper(42)
3504 self.assertRaises(TypeError, t.read, 42)
3505 t = _make_very_illegal_wrapper(())
3506 self.assertRaises(TypeError, t.read, 42)
3507 t = _make_very_illegal_wrapper((1, 2))
3508 self.assertRaises(TypeError, t.read, 42)
3509
def _check_create_at_shutdown(self, **kwargs):
    # Issue #20037: creating a TextIOWrapper at shutdown
    # shouldn't crash the interpreter.
    # The child script builds the wrapper inside a __del__ method of an
    # object that is still alive when the script ends; *kwargs* are
    # spliced in as the wrapper's keyword arguments.  Returns whatever
    # assert_python_ok() returns.
    script = """if 1:
            import codecs
            import {iomod} as io

            # Avoid looking up codecs at shutdown
            codecs.lookup('utf-8')

            class C:
                def __init__(self):
                    self.buf = io.BytesIO()
                def __del__(self):
                    io.TextIOWrapper(self.buf, **{kwargs})
                    print("ok")
            c = C()
            """.format(iomod=self.io.__name__, kwargs=kwargs)
    return assert_python_ok("-c", script)
3530
3531 def test_create_at_shutdown_without_encoding(self):
3532 rc, out, err = self._check_create_at_shutdown()
3533 if err:
3534 # Can error out with a RuntimeError if the module state
3535 # isn't found.
Nick Coghlana9b15242014-02-04 22:11:18 +10003536 self.assertIn(self.shutdown_error, err.decode())
Antoine Pitrou712cb732013-12-21 15:51:54 +01003537 else:
3538 self.assertEqual("ok", out.decode().strip())
3539
3540 def test_create_at_shutdown_with_encoding(self):
3541 rc, out, err = self._check_create_at_shutdown(encoding='utf-8',
3542 errors='strict')
3543 self.assertFalse(err)
3544 self.assertEqual("ok", out.decode().strip())
3545
def test_read_byteslike(self):
    # The wrapper must cope with a buffer whose reads return memoryview
    # objects instead of bytes.
    source = MemviewBytesIO(b'Just some random string\n')
    wrapper = self.TextIOWrapper(source, 'utf-8')

    # TextIOwrapper will not read the full string, because
    # we truncate it to a multiple of the native int size
    # so that we can construct a more complex memoryview.
    expected = _to_memoryview(source.getvalue()).tobytes().decode('utf-8')

    self.assertEqual(wrapper.read(200), expected)
3556
Benjamin Peterson6c14f232014-11-12 10:19:46 -05003557 def test_issue22849(self):
3558 class F(object):
3559 def readable(self): return True
3560 def writable(self): return True
3561 def seekable(self): return True
3562
3563 for i in range(10):
3564 try:
3565 self.TextIOWrapper(F(), encoding='utf-8')
3566 except Exception:
3567 pass
3568
3569 F.tell = lambda x: 0
3570 t = self.TextIOWrapper(F(), encoding='utf-8')
3571
INADA Naoki507434f2017-12-21 09:59:53 +09003572 def test_reconfigure_encoding_read(self):
3573 # latin1 -> utf8
3574 # (latin1 can decode utf-8 encoded string)
3575 data = 'abc\xe9\n'.encode('latin1') + 'd\xe9f\n'.encode('utf8')
3576 raw = self.BytesIO(data)
3577 txt = self.TextIOWrapper(raw, encoding='latin1', newline='\n')
3578 self.assertEqual(txt.readline(), 'abc\xe9\n')
3579 with self.assertRaises(self.UnsupportedOperation):
3580 txt.reconfigure(encoding='utf-8')
3581 with self.assertRaises(self.UnsupportedOperation):
3582 txt.reconfigure(newline=None)
3583
3584 def test_reconfigure_write_fromascii(self):
3585 # ascii has a specific encodefunc in the C implementation,
3586 # but utf-8-sig has not. Make sure that we get rid of the
3587 # cached encodefunc when we switch encoders.
3588 raw = self.BytesIO()
3589 txt = self.TextIOWrapper(raw, encoding='ascii', newline='\n')
3590 txt.write('foo\n')
3591 txt.reconfigure(encoding='utf-8-sig')
3592 txt.write('\xe9\n')
3593 txt.flush()
3594 self.assertEqual(raw.getvalue(), b'foo\n\xc3\xa9\n')
3595
3596 def test_reconfigure_write(self):
3597 # latin -> utf8
3598 raw = self.BytesIO()
3599 txt = self.TextIOWrapper(raw, encoding='latin1', newline='\n')
3600 txt.write('abc\xe9\n')
3601 txt.reconfigure(encoding='utf-8')
3602 self.assertEqual(raw.getvalue(), b'abc\xe9\n')
3603 txt.write('d\xe9f\n')
3604 txt.flush()
3605 self.assertEqual(raw.getvalue(), b'abc\xe9\nd\xc3\xa9f\n')
3606
3607 # ascii -> utf-8-sig: ensure that no BOM is written in the middle of
3608 # the file
3609 raw = self.BytesIO()
3610 txt = self.TextIOWrapper(raw, encoding='ascii', newline='\n')
3611 txt.write('abc\n')
3612 txt.reconfigure(encoding='utf-8-sig')
3613 txt.write('d\xe9f\n')
3614 txt.flush()
3615 self.assertEqual(raw.getvalue(), b'abc\nd\xc3\xa9f\n')
3616
3617 def test_reconfigure_write_non_seekable(self):
3618 raw = self.BytesIO()
3619 raw.seekable = lambda: False
3620 raw.seek = None
3621 txt = self.TextIOWrapper(raw, encoding='ascii', newline='\n')
3622 txt.write('abc\n')
3623 txt.reconfigure(encoding='utf-8-sig')
3624 txt.write('d\xe9f\n')
3625 txt.flush()
3626
3627 # If the raw stream is not seekable, there'll be a BOM
3628 self.assertEqual(raw.getvalue(), b'abc\n\xef\xbb\xbfd\xc3\xa9f\n')
3629
3630 def test_reconfigure_defaults(self):
3631 txt = self.TextIOWrapper(self.BytesIO(), 'ascii', 'replace', '\n')
3632 txt.reconfigure(encoding=None)
3633 self.assertEqual(txt.encoding, 'ascii')
3634 self.assertEqual(txt.errors, 'replace')
3635 txt.write('LF\n')
3636
3637 txt.reconfigure(newline='\r\n')
3638 self.assertEqual(txt.encoding, 'ascii')
3639 self.assertEqual(txt.errors, 'replace')
3640
3641 txt.reconfigure(errors='ignore')
3642 self.assertEqual(txt.encoding, 'ascii')
3643 self.assertEqual(txt.errors, 'ignore')
3644 txt.write('CRLF\n')
3645
3646 txt.reconfigure(encoding='utf-8', newline=None)
3647 self.assertEqual(txt.errors, 'strict')
3648 txt.seek(0)
3649 self.assertEqual(txt.read(), 'LF\nCRLF\n')
3650
3651 self.assertEqual(txt.detach().getvalue(), b'LF\nCRLF\r\n')
3652
3653 def test_reconfigure_newline(self):
3654 raw = self.BytesIO(b'CR\rEOF')
3655 txt = self.TextIOWrapper(raw, 'ascii', newline='\n')
3656 txt.reconfigure(newline=None)
3657 self.assertEqual(txt.readline(), 'CR\n')
3658 raw = self.BytesIO(b'CR\rEOF')
3659 txt = self.TextIOWrapper(raw, 'ascii', newline='\n')
3660 txt.reconfigure(newline='')
3661 self.assertEqual(txt.readline(), 'CR\r')
3662 raw = self.BytesIO(b'CR\rLF\nEOF')
3663 txt = self.TextIOWrapper(raw, 'ascii', newline='\r')
3664 txt.reconfigure(newline='\n')
3665 self.assertEqual(txt.readline(), 'CR\rLF\n')
3666 raw = self.BytesIO(b'LF\nCR\rEOF')
3667 txt = self.TextIOWrapper(raw, 'ascii', newline='\n')
3668 txt.reconfigure(newline='\r')
3669 self.assertEqual(txt.readline(), 'LF\nCR\r')
3670 raw = self.BytesIO(b'CR\rCRLF\r\nEOF')
3671 txt = self.TextIOWrapper(raw, 'ascii', newline='\r')
3672 txt.reconfigure(newline='\r\n')
3673 self.assertEqual(txt.readline(), 'CR\rCRLF\r\n')
3674
3675 txt = self.TextIOWrapper(self.BytesIO(), 'ascii', newline='\r')
3676 txt.reconfigure(newline=None)
3677 txt.write('linesep\n')
3678 txt.reconfigure(newline='')
3679 txt.write('LF\n')
3680 txt.reconfigure(newline='\n')
3681 txt.write('LF\n')
3682 txt.reconfigure(newline='\r')
3683 txt.write('CR\n')
3684 txt.reconfigure(newline='\r\n')
3685 txt.write('CRLF\n')
3686 expected = 'linesep' + os.linesep + 'LF\nLF\nCR\rCRLF\r\n'
3687 self.assertEqual(txt.detach().getvalue().decode('ascii'), expected)
3688
Zackery Spytz23db9352018-06-29 04:14:58 -06003689 def test_issue25862(self):
3690 # Assertion failures occurred in tell() after read() and write().
3691 t = self.TextIOWrapper(self.BytesIO(b'test'), encoding='ascii')
3692 t.read(1)
3693 t.read()
3694 t.tell()
3695 t = self.TextIOWrapper(self.BytesIO(b'test'), encoding='ascii')
3696 t.read(1)
3697 t.write('x')
3698 t.tell()
3699
Serhiy Storchaka94dc6732013-02-03 17:03:31 +02003700
class MemviewBytesIO(io.BytesIO):
    '''A BytesIO object whose read method returns memoryviews
       rather than bytes'''

    # The size argument defaults to -1 (read to the end), like the BytesIO
    # methods being overridden, so a call without arguments no longer
    # fails with TypeError.

    def read1(self, len_=-1):
        return _to_memoryview(super().read1(len_))

    def read(self, len_=-1):
        return _to_memoryview(super().read(len_))

def _to_memoryview(buf):
    '''Convert bytes-object *buf* to a non-trivial memoryview'''

    # The view is backed by an array of native ints, so only the leading
    # part of *buf* whose length is a multiple of the item size is kept;
    # the remaining tail bytes are dropped.
    arr = array.array('i')
    idx = len(buf) - len(buf) % arr.itemsize
    arr.frombytes(buf[:idx])
    return memoryview(arr)
Benjamin Petersonc9c0f202009-06-30 23:06:06 +00003718
Benjamin Peterson6fd113c2014-11-12 10:23:44 -05003719
class CTextIOWrapperTest(TextIOWrapperTest):
    # Runs the inherited TextIOWrapper tests with ``io`` bound to the
    # ``io`` module, plus the extra tests defined below.
    io = io
    # Message looked for on stderr by the inherited shutdown test when the
    # child process reports an error.
    shutdown_error = "LookupError: unknown encoding: ascii"

    def test_initialization(self):
        raw = self.BytesIO(b"\xc3\xa9\n\n")
        buffered = self.BufferedReader(raw, 1000)
        wrapper = self.TextIOWrapper(buffered)
        # A re-initialisation that fails must leave the wrapper unusable.
        self.assertRaises(ValueError, wrapper.__init__, buffered,
                          newline='xyzzy')
        self.assertRaises(ValueError, wrapper.read)

        # repr() of an object that never went through __init__ must raise
        # rather than crash.
        wrapper = self.TextIOWrapper.__new__(self.TextIOWrapper)
        self.assertRaises(Exception, repr, wrapper)

    def test_garbage_collection(self):
        # C TextIOWrapper objects are collected, and collecting them flushes
        # all data to disk.
        # The Python version has __del__, so it ends in gc.garbage instead.
        with warnings_helper.check_warnings(('', ResourceWarning)):
            rawio = io.FileIO(os_helper.TESTFN, "wb")
            buffered = self.BufferedWriter(rawio)
            wrapper = self.TextIOWrapper(buffered, encoding="ascii")
            wrapper.write("456def")
            # Self-reference: only the cycle collector can free the wrapper.
            wrapper.x = wrapper
            ref = weakref.ref(wrapper)
            del wrapper
            support.gc_collect()
        self.assertIsNone(ref(), ref)
        with self.open(os_helper.TESTFN, "rb") as f:
            self.assertEqual(f.read(), b"456def")

    def test_rwpair_cleared_before_textio(self):
        # Issue 13070: TextIOWrapper's finalization would crash when called
        # after the reference to the underlying BufferedRWPair's writer got
        # cleared by the GC.
        for _ in range(1000):
            pair1 = self.BufferedRWPair(self.MockRawIO(), self.MockRawIO())
            text1 = self.TextIOWrapper(pair1, encoding="ascii")
            pair2 = self.BufferedRWPair(self.MockRawIO(), self.MockRawIO())
            text2 = self.TextIOWrapper(pair2, encoding="ascii")
            # circular references
            text1.buddy = text2
            text2.buddy = text1
        support.gc_collect()

    def test_del__CHUNK_SIZE_SystemError(self):
        # Deleting _CHUNK_SIZE must be refused with AttributeError.
        wrapper = self.TextIOWrapper(self.BytesIO(), encoding='ascii')
        with self.assertRaises(AttributeError):
            del wrapper._CHUNK_SIZE
3769
Charles-François Natali42c28cd2011-10-05 19:53:43 +02003770
class PyTextIOWrapperTest(TextIOWrapperTest):
    # Runs the inherited TextIOWrapper tests with ``io`` bound to the
    # module imported as ``pyio``.
    io = pyio
    # Message looked for on stderr by the inherited shutdown test when the
    # child process reports an error.
    shutdown_error = "LookupError: unknown encoding: ascii"
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003774
3775
3776class IncrementalNewlineDecoderTest(unittest.TestCase):
3777
3778 def check_newline_decoding_utf8(self, decoder):
Antoine Pitrou180a3362008-12-14 16:36:46 +00003779 # UTF-8 specific tests for a newline decoder
3780 def _check_decode(b, s, **kwargs):
3781 # We exercise getstate() / setstate() as well as decode()
3782 state = decoder.getstate()
Ezio Melottib3aedd42010-11-20 19:04:17 +00003783 self.assertEqual(decoder.decode(b, **kwargs), s)
Antoine Pitrou180a3362008-12-14 16:36:46 +00003784 decoder.setstate(state)
Ezio Melottib3aedd42010-11-20 19:04:17 +00003785 self.assertEqual(decoder.decode(b, **kwargs), s)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00003786
Antoine Pitrou180a3362008-12-14 16:36:46 +00003787 _check_decode(b'\xe8\xa2\x88', "\u8888")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00003788
Antoine Pitrou180a3362008-12-14 16:36:46 +00003789 _check_decode(b'\xe8', "")
3790 _check_decode(b'\xa2', "")
3791 _check_decode(b'\x88', "\u8888")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00003792
Antoine Pitrou180a3362008-12-14 16:36:46 +00003793 _check_decode(b'\xe8', "")
3794 _check_decode(b'\xa2', "")
3795 _check_decode(b'\x88', "\u8888")
3796
3797 _check_decode(b'\xe8', "")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00003798 self.assertRaises(UnicodeDecodeError, decoder.decode, b'', final=True)
3799
Antoine Pitrou180a3362008-12-14 16:36:46 +00003800 decoder.reset()
3801 _check_decode(b'\n', "\n")
3802 _check_decode(b'\r', "")
3803 _check_decode(b'', "\n", final=True)
3804 _check_decode(b'\r', "\n", final=True)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00003805
Antoine Pitrou180a3362008-12-14 16:36:46 +00003806 _check_decode(b'\r', "")
3807 _check_decode(b'a', "\na")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00003808
Antoine Pitrou180a3362008-12-14 16:36:46 +00003809 _check_decode(b'\r\r\n', "\n\n")
3810 _check_decode(b'\r', "")
3811 _check_decode(b'\r', "\n")
3812 _check_decode(b'\na', "\na")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00003813
Antoine Pitrou180a3362008-12-14 16:36:46 +00003814 _check_decode(b'\xe8\xa2\x88\r\n', "\u8888\n")
3815 _check_decode(b'\xe8\xa2\x88', "\u8888")
3816 _check_decode(b'\n', "\n")
3817 _check_decode(b'\xe8\xa2\x88\r', "\u8888")
3818 _check_decode(b'\n', "\n")
Guido van Rossum9b76da62007-04-11 01:09:03 +00003819
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003820 def check_newline_decoding(self, decoder, encoding):
Antoine Pitrou180a3362008-12-14 16:36:46 +00003821 result = []
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003822 if encoding is not None:
3823 encoder = codecs.getincrementalencoder(encoding)()
3824 def _decode_bytewise(s):
3825 # Decode one byte at a time
3826 for b in encoder.encode(s):
3827 result.append(decoder.decode(bytes([b])))
3828 else:
3829 encoder = None
3830 def _decode_bytewise(s):
3831 # Decode one char at a time
3832 for c in s:
3833 result.append(decoder.decode(c))
Ezio Melottib3aedd42010-11-20 19:04:17 +00003834 self.assertEqual(decoder.newlines, None)
Antoine Pitrou180a3362008-12-14 16:36:46 +00003835 _decode_bytewise("abc\n\r")
Ezio Melottib3aedd42010-11-20 19:04:17 +00003836 self.assertEqual(decoder.newlines, '\n')
Antoine Pitrou180a3362008-12-14 16:36:46 +00003837 _decode_bytewise("\nabc")
Ezio Melottib3aedd42010-11-20 19:04:17 +00003838 self.assertEqual(decoder.newlines, ('\n', '\r\n'))
Antoine Pitrou180a3362008-12-14 16:36:46 +00003839 _decode_bytewise("abc\r")
Ezio Melottib3aedd42010-11-20 19:04:17 +00003840 self.assertEqual(decoder.newlines, ('\n', '\r\n'))
Antoine Pitrou180a3362008-12-14 16:36:46 +00003841 _decode_bytewise("abc")
Ezio Melottib3aedd42010-11-20 19:04:17 +00003842 self.assertEqual(decoder.newlines, ('\r', '\n', '\r\n'))
Antoine Pitrou180a3362008-12-14 16:36:46 +00003843 _decode_bytewise("abc\r")
Ezio Melottib3aedd42010-11-20 19:04:17 +00003844 self.assertEqual("".join(result), "abc\n\nabcabc\nabcabc")
Antoine Pitrou180a3362008-12-14 16:36:46 +00003845 decoder.reset()
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003846 input = "abc"
3847 if encoder is not None:
3848 encoder.reset()
3849 input = encoder.encode(input)
Ezio Melottib3aedd42010-11-20 19:04:17 +00003850 self.assertEqual(decoder.decode(input), "abc")
3851 self.assertEqual(decoder.newlines, None)
Antoine Pitrou180a3362008-12-14 16:36:46 +00003852
3853 def test_newline_decoder(self):
3854 encodings = (
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003855 # None meaning the IncrementalNewlineDecoder takes unicode input
3856 # rather than bytes input
3857 None, 'utf-8', 'latin-1',
Antoine Pitrou180a3362008-12-14 16:36:46 +00003858 'utf-16', 'utf-16-le', 'utf-16-be',
3859 'utf-32', 'utf-32-le', 'utf-32-be',
3860 )
3861 for enc in encodings:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003862 decoder = enc and codecs.getincrementaldecoder(enc)()
3863 decoder = self.IncrementalNewlineDecoder(decoder, translate=True)
3864 self.check_newline_decoding(decoder, enc)
Alexandre Vassalotti472f07d2008-01-06 00:34:32 +00003865 decoder = codecs.getincrementaldecoder("utf-8")()
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003866 decoder = self.IncrementalNewlineDecoder(decoder, translate=True)
3867 self.check_newline_decoding_utf8(decoder)
Oren Milman1d1d3e92017-08-20 18:35:36 +03003868 self.assertRaises(TypeError, decoder.setstate, 42)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003869
Antoine Pitrou66913e22009-03-06 23:40:56 +00003870 def test_newline_bytes(self):
3871 # Issue 5433: Excessive optimization in IncrementalNewlineDecoder
3872 def _check(dec):
Ezio Melottib3aedd42010-11-20 19:04:17 +00003873 self.assertEqual(dec.newlines, None)
3874 self.assertEqual(dec.decode("\u0D00"), "\u0D00")
3875 self.assertEqual(dec.newlines, None)
3876 self.assertEqual(dec.decode("\u0A00"), "\u0A00")
3877 self.assertEqual(dec.newlines, None)
Antoine Pitrou66913e22009-03-06 23:40:56 +00003878 dec = self.IncrementalNewlineDecoder(None, translate=False)
3879 _check(dec)
3880 dec = self.IncrementalNewlineDecoder(None, translate=True)
3881 _check(dec)
3882
Xiang Zhangb08746b2018-10-31 19:49:16 +08003883 def test_translate(self):
3884 # issue 35062
3885 for translate in (-2, -1, 1, 2):
3886 decoder = codecs.getincrementaldecoder("utf-8")()
3887 decoder = self.IncrementalNewlineDecoder(decoder, translate)
3888 self.check_newline_decoding_utf8(decoder)
3889 decoder = codecs.getincrementaldecoder("utf-8")()
3890 decoder = self.IncrementalNewlineDecoder(decoder, translate=0)
3891 self.assertEqual(decoder.decode(b"\r\r\n"), "\r\r\n")
3892
class CIncrementalNewlineDecoderTest(IncrementalNewlineDecoderTest):
    """Concrete subclass that inherits every test unchanged.

    NOTE(review): the ``IncrementalNewlineDecoder`` attribute used by the
    inherited tests is not set here; presumably it is attached elsewhere
    in this module -- confirm.
    """
3895
class PyIncrementalNewlineDecoderTest(IncrementalNewlineDecoderTest):
    """Concrete subclass that inherits every test unchanged.

    NOTE(review): the ``IncrementalNewlineDecoder`` attribute used by the
    inherited tests is not set here; presumably it is attached elsewhere
    in this module -- confirm.
    """
Antoine Pitrou180a3362008-12-14 16:36:46 +00003898
Alexandre Vassalotti472f07d2008-01-06 00:34:32 +00003899
Guido van Rossum01a27522007-03-07 01:00:12 +00003900# XXX Tests for open()
Guido van Rossum68bbcd22007-02-27 17:19:33 +00003901
Guido van Rossum5abbf752007-08-27 17:39:33 +00003902class MiscIOTest(unittest.TestCase):
3903
Barry Warsaw40e82462008-11-20 20:14:50 +00003904 def tearDown(self):
Hai Shi883bc632020-07-06 17:12:49 +08003905 os_helper.unlink(os_helper.TESTFN)
Barry Warsaw40e82462008-11-20 20:14:50 +00003906
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003907 def test___all__(self):
3908 for name in self.io.__all__:
3909 obj = getattr(self.io, name, None)
Serhiy Storchaka0dcd80a2015-08-02 15:17:49 +03003910 self.assertIsNotNone(obj, name)
Steve Dowerb82e17e2019-05-23 08:45:22 -07003911 if name in ("open", "open_code"):
Guido van Rossum5abbf752007-08-27 17:39:33 +00003912 continue
Benjamin Peterson6a52a9c2009-04-29 22:00:44 +00003913 elif "error" in name.lower() or name == "UnsupportedOperation":
Benjamin Petersonbfb95942009-04-02 01:13:40 +00003914 self.assertTrue(issubclass(obj, Exception), name)
3915 elif not name.startswith("SEEK_"):
3916 self.assertTrue(issubclass(obj, self.IOBase))
Benjamin Peterson65676e42008-11-05 21:42:45 +00003917
def test_attributes(self):
    # Check the ``mode`` and ``name`` attributes at every layer of the
    # objects returned by open().  Each file is managed by a ``with``
    # block so that a failing assertion cannot leak an open handle.
    with self.open(os_helper.TESTFN, "wb", buffering=0) as f:
        self.assertEqual(f.mode, "wb")

    # Opening with mode "U" is expected to emit a DeprecationWarning.
    with warnings_helper.check_warnings(('', DeprecationWarning)):
        f = self.open(os_helper.TESTFN, "U")
    with f:
        self.assertEqual(f.name, os_helper.TESTFN)
        self.assertEqual(f.buffer.name, os_helper.TESTFN)
        self.assertEqual(f.buffer.raw.name, os_helper.TESTFN)
        self.assertEqual(f.mode, "U")
        self.assertEqual(f.buffer.mode, "rb")
        self.assertEqual(f.buffer.raw.mode, "rb")

    with self.open(os_helper.TESTFN, "w+") as f:
        self.assertEqual(f.mode, "w+")
        self.assertEqual(f.buffer.mode, "rb+") # Does it really matter?
        self.assertEqual(f.buffer.raw.mode, "rb+")

        # A second object opened on the same descriptor reports the
        # descriptor number as its name; closefd=False keeps it from
        # closing the descriptor that ``f`` owns.
        with self.open(f.fileno(), "wb", closefd=False) as g:
            self.assertEqual(g.mode, "wb")
            self.assertEqual(g.raw.mode, "wb")
            self.assertEqual(g.name, f.fileno())
            self.assertEqual(g.raw.name, f.fileno())
Benjamin Peterson74fa9f72019-11-12 14:51:34 -08003946 def test_open_pipe_with_append(self):
3947 # bpo-27805: Ignore ESPIPE from lseek() in open().
3948 r, w = os.pipe()
3949 self.addCleanup(os.close, r)
3950 f = self.open(w, 'a')
3951 self.addCleanup(f.close)
3952 # Check that the file is marked non-seekable. On Windows, however, lseek
3953 # somehow succeeds on pipes.
3954 if sys.platform != 'win32':
3955 self.assertFalse(f.seekable())
3956
Antoine Pitrou8043cf82009-01-09 19:54:29 +00003957 def test_io_after_close(self):
3958 for kwargs in [
3959 {"mode": "w"},
3960 {"mode": "wb"},
3961 {"mode": "w", "buffering": 1},
3962 {"mode": "w", "buffering": 2},
3963 {"mode": "wb", "buffering": 0},
3964 {"mode": "r"},
3965 {"mode": "rb"},
3966 {"mode": "r", "buffering": 1},
3967 {"mode": "r", "buffering": 2},
3968 {"mode": "rb", "buffering": 0},
3969 {"mode": "w+"},
3970 {"mode": "w+b"},
3971 {"mode": "w+", "buffering": 1},
3972 {"mode": "w+", "buffering": 2},
3973 {"mode": "w+b", "buffering": 0},
3974 ]:
Hai Shi883bc632020-07-06 17:12:49 +08003975 f = self.open(os_helper.TESTFN, **kwargs)
Antoine Pitrou8043cf82009-01-09 19:54:29 +00003976 f.close()
3977 self.assertRaises(ValueError, f.flush)
3978 self.assertRaises(ValueError, f.fileno)
3979 self.assertRaises(ValueError, f.isatty)
3980 self.assertRaises(ValueError, f.__iter__)
3981 if hasattr(f, "peek"):
3982 self.assertRaises(ValueError, f.peek, 1)
3983 self.assertRaises(ValueError, f.read)
3984 if hasattr(f, "read1"):
3985 self.assertRaises(ValueError, f.read1, 1024)
Martin Panterccb2c0e2016-10-20 23:48:14 +00003986 self.assertRaises(ValueError, f.read1)
Victor Stinnerb79f28c2011-05-25 22:09:03 +02003987 if hasattr(f, "readall"):
3988 self.assertRaises(ValueError, f.readall)
Antoine Pitrou8043cf82009-01-09 19:54:29 +00003989 if hasattr(f, "readinto"):
3990 self.assertRaises(ValueError, f.readinto, bytearray(1024))
Benjamin Petersona96fea02014-06-22 14:17:44 -07003991 if hasattr(f, "readinto1"):
3992 self.assertRaises(ValueError, f.readinto1, bytearray(1024))
Antoine Pitrou8043cf82009-01-09 19:54:29 +00003993 self.assertRaises(ValueError, f.readline)
3994 self.assertRaises(ValueError, f.readlines)
Xiang Zhang026435c2017-04-15 12:47:28 +08003995 self.assertRaises(ValueError, f.readlines, 1)
Antoine Pitrou8043cf82009-01-09 19:54:29 +00003996 self.assertRaises(ValueError, f.seek, 0)
3997 self.assertRaises(ValueError, f.tell)
3998 self.assertRaises(ValueError, f.truncate)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003999 self.assertRaises(ValueError, f.write,
4000 b"" if "b" in kwargs['mode'] else "")
Antoine Pitrou8043cf82009-01-09 19:54:29 +00004001 self.assertRaises(ValueError, f.writelines, [])
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00004002 self.assertRaises(ValueError, next, f)
Antoine Pitrou8043cf82009-01-09 19:54:29 +00004003
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00004004 def test_blockingioerror(self):
4005 # Various BlockingIOError issues
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00004006 class C(str):
4007 pass
4008 c = C("")
4009 b = self.BlockingIOError(1, c)
4010 c.b = b
4011 b.c = c
4012 wr = weakref.ref(c)
4013 del c, b
Benjamin Peterson24fb1d02009-04-24 23:26:21 +00004014 support.gc_collect()
Serhiy Storchaka0dcd80a2015-08-02 15:17:49 +03004015 self.assertIsNone(wr(), wr)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00004016
4017 def test_abcs(self):
4018 # Test the visible base classes are ABCs.
Ezio Melottie9615932010-01-24 19:26:24 +00004019 self.assertIsInstance(self.IOBase, abc.ABCMeta)
4020 self.assertIsInstance(self.RawIOBase, abc.ABCMeta)
4021 self.assertIsInstance(self.BufferedIOBase, abc.ABCMeta)
4022 self.assertIsInstance(self.TextIOBase, abc.ABCMeta)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00004023
def _check_abc_inheritance(self, abcmodule):
    # Open the test file as a raw, a buffered and a text stream in turn.
    # Each stream must be an instance of abcmodule.IOBase and of exactly
    # one of the three layer ABCs below.
    layers = ("RawIOBase", "BufferedIOBase", "TextIOBase")
    cases = (
        # (mode, extra open() keywords, the one layer ABC expected to match)
        ("wb", {"buffering": 0}, "RawIOBase"),
        ("wb", {}, "BufferedIOBase"),
        ("w", {}, "TextIOBase"),
    )
    for mode, extra, expected in cases:
        with self.open(os_helper.TESTFN, mode, **extra) as f:
            self.assertIsInstance(f, abcmodule.IOBase)
            for layer in layers:
                if layer == expected:
                    check = self.assertIsInstance
                else:
                    check = self.assertNotIsInstance
                check(f, getattr(abcmodule, layer))
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00004040
4041 def test_abc_inheritance(self):
4042 # Test implementations inherit from their respective ABCs
4043 self._check_abc_inheritance(self)
4044
4045 def test_abc_inheritance_official(self):
4046 # Test implementations inherit from the official ABCs of the
4047 # baseline "io" module.
4048 self._check_abc_inheritance(io)
4049
def _check_warn_on_dealloc(self, *args, **kwargs):
    # Open a file with the builtin open(), drop the only reference to it
    # without closing, and check that the ResourceWarning raised during
    # collection mentions the repr() of the abandoned file object.
    f = open(*args, **kwargs)
    expected = repr(f)
    with self.assertWarns(ResourceWarning) as caught:
        f = None
        support.gc_collect()
    message = str(caught.warning.args[0])
    self.assertIn(expected, message)
4057
def test_warn_on_dealloc(self):
    # Same scenario with three open() configurations, in this order:
    # "wb" with buffering=0, plain "wb", and "w".
    for mode, extra in (("wb", {"buffering": 0}), ("wb", {}), ("w", {})):
        self._check_warn_on_dealloc(os_helper.TESTFN, mode, **extra)
Antoine Pitroue033e062010-10-29 10:38:18 +00004062
def _check_warn_on_dealloc_fd(self, *args, **kwargs):
    # File-descriptor flavour of _check_warn_on_dealloc(): the file is
    # opened on the read end of a pipe instead of a path.
    pipe_fds = []

    def cleanup_fds():
        # Close every descriptor created here.  EBADF only means the
        # descriptor was already closed, so it is ignored.
        for fd in pipe_fds:
            try:
                os.close(fd)
            except OSError as e:
                if e.errno == errno.EBADF:
                    continue
                raise

    self.addCleanup(cleanup_fds)

    rfd, wfd = os.pipe()
    pipe_fds.extend((rfd, wfd))
    self._check_warn_on_dealloc(rfd, *args, **kwargs)

    # When using closefd=False, there's no warning
    rfd, wfd = os.pipe()
    pipe_fds.extend((rfd, wfd))
    with warnings_helper.check_no_resource_warning(self):
        open(rfd, *args, closefd=False, **kwargs)
Antoine Pitroue033e062010-10-29 10:38:18 +00004081
4082 def test_warn_on_dealloc_fd(self):
4083 self._check_warn_on_dealloc_fd("rb", buffering=0)
4084 self._check_warn_on_dealloc_fd("rb")
4085 self._check_warn_on_dealloc_fd("r")
4086
4087
def test_pickling(self):
    # Pickling file objects is forbidden
    #
    # For each base mode the file is opened in that mode, in its binary
    # variant, and in its binary variant with buffering=0.  The write modes
    # come first so the file exists by the time the read modes are tried.
    open_kwargs = []
    for base_mode in ("w", "r", "w+"):
        binary_mode = base_mode + "b"
        open_kwargs.append({"mode": base_mode})
        open_kwargs.append({"mode": binary_mode})
        open_kwargs.append({"mode": binary_mode, "buffering": 0})

    for kwargs in open_kwargs:
        for protocol in range(pickle.HIGHEST_PROTOCOL + 1):
            with self.open(os_helper.TESTFN, **kwargs) as f:
                with self.assertRaises(TypeError):
                    pickle.dumps(f, protocol)
4104
Antoine Pitrou58fcf9f2011-11-21 20:16:44 +01004105 def test_nonblock_pipe_write_bigbuf(self):
4106 self._test_nonblock_pipe_write(16*1024)
4107
Antoine Pitrou58fcf9f2011-11-21 20:16:44 +01004108 def test_nonblock_pipe_write_smallbuf(self):
4109 self._test_nonblock_pipe_write(1024)
4110
@unittest.skipUnless(hasattr(os, 'set_blocking'),
                     'os.set_blocking() required for this test')
def _test_nonblock_pipe_write(self, bufsize):
    # Push data through a non-blocking pipe wrapped in buffered objects of
    # size `bufsize`.  Whenever a write raises BlockingIOError the pipe is
    # drained; at the end everything read must equal everything that was
    # reported as written.
    outgoing = []
    incoming = []
    rfd, wfd = os.pipe()
    for fd in (rfd, wfd):
        os.set_blocking(fd, False)

    # To exercise all code paths in the C implementation we need
    # to play with buffer sizes.  For instance, if we choose a
    # buffer size less than or equal to _PIPE_BUF (4096 on Linux)
    # then we will never get a partial write of the buffer.
    rf = self.open(rfd, mode='rb', closefd=True, buffering=bufsize)
    wf = self.open(wfd, mode='wb', closefd=True, buffering=bufsize)

    with rf, wf:
        for chunk_len in (9999, 73, 7574):
            counter = 0
            try:
                # Keep writing chunks of a single repeated lowercase
                # letter until the writer reports it would block.
                while True:
                    chunk = bytes([counter % 26 + 97]) * chunk_len
                    outgoing.append(chunk)
                    wf.write(chunk)
                    counter += 1
            except self.BlockingIOError as exc:
                self.assertEqual(exc.args[0], errno.EAGAIN)
                accepted = exc.characters_written
                self.assertEqual(exc.args[2], accepted)
                # Only the accepted prefix of the last chunk was sent.
                outgoing[-1] = outgoing[-1][:accepted]
                incoming.append(rf.read())
                marker = b'BLOCKED'
                wf.write(marker)
                outgoing.append(marker)

        # Flush what is still buffered, draining the pipe each time the
        # flush itself would block.
        flushed = False
        while not flushed:
            try:
                wf.flush()
            except self.BlockingIOError as exc:
                self.assertEqual(exc.args[0], errno.EAGAIN)
                self.assertEqual(exc.args[2], exc.characters_written)
                self.assertEqual(exc.characters_written, 0)
                incoming.append(rf.read())
            else:
                flushed = True

        # Collect the remainder; read() returning None ends the iteration.
        incoming.extend(iter(rf.read, None))

    self.assertEqual(b''.join(outgoing), b''.join(incoming))
    self.assertTrue(wf.closed)
    self.assertTrue(rf.closed)
4162
def test_create_fail(self):
    # 'x' mode fails if file is existing
    existing = self.open(os_helper.TESTFN, 'w')
    existing.close()
    with self.assertRaises(FileExistsError):
        self.open(os_helper.TESTFN, 'x')
Charles-François Natalidc3044c2012-01-09 22:40:02 +01004168
def test_create_writes(self):
    # 'x' mode opens for writing
    payload = b"spam"
    with self.open(os_helper.TESTFN, 'xb') as writer:
        writer.write(payload)
    # Read the file back to confirm the write went through.
    with self.open(os_helper.TESTFN, 'rb') as reader:
        content = reader.read()
    self.assertEqual(payload, content)
4175
def test_open_allargs(self):
    # there used to be a buffer overflow in the parser for rawmode
    with self.assertRaises(ValueError):
        self.open(os_helper.TESTFN, 'rwax+')
Christian Heimes7b648752012-09-10 14:48:43 +02004179
def test_check_encoding_errors(self):
    # bpo-37388: open() and TextIOWrapper must check encoding and errors
    # arguments in dev mode
    #
    # The checks run in a child interpreter started with "-X dev".  Each
    # failed check exits with its own code (21-24); exit code 10 means
    # that all four calls raised LookupError.
    mod = self.io.__name__
    filename = __file__
    invalid = 'Boom, Shaka Laka, Boom!'
    code = textwrap.dedent(f'''
        import sys
        from {mod} import open, TextIOWrapper

        try:
            open({filename!r}, encoding={invalid!r})
        except LookupError:
            pass
        else:
            sys.exit(21)

        try:
            open({filename!r}, errors={invalid!r})
        except LookupError:
            pass
        else:
            sys.exit(22)

        fp = open({filename!r}, "rb")
        with fp:
            try:
                TextIOWrapper(fp, encoding={invalid!r})
            except LookupError:
                pass
            else:
                sys.exit(23)

            try:
                TextIOWrapper(fp, errors={invalid!r})
            except LookupError:
                pass
            else:
                sys.exit(24)

        sys.exit(10)
    ''')
    child_args = ('-X', 'dev', '-c', code)
    proc = assert_python_failure(*child_args)
    self.assertEqual(proc.rc, 10, proc)
4224
Christian Heimes7b648752012-09-10 14:48:43 +02004225
class CMiscIOTest(MiscIOTest):
    # Run the MiscIOTest suite against the "io" module, plus a few extra
    # tests defined only for this implementation.
    io = io

    def test_readinto_buffer_overflow(self):
        # Issue #18025
        # read() hands back far more data than the 2-byte target buffer
        # can hold; readinto() must reject that with ValueError.
        class BadReader(self.io.BufferedIOBase):
            def read(self, n=-1):
                return b'x' * 10**6
        target = bytearray(2)
        with self.assertRaises(ValueError):
            BadReader().readinto(target)

    def check_daemon_threads_shutdown_deadlock(self, stream_name):
        # Issue #23309: deadlocks at shutdown should be avoided when a
        # daemon thread and the main thread both write to a file.
        #
        # `stream_name` is the name of the sys attribute to write to
        # ('stdout' or 'stderr' in the tests below).
        code = """if 1:
            import sys
            import time
            import threading
            from test.support import SuppressCrashReport

            file = sys.{stream_name}

            def run():
                while True:
                    file.write('.')
                    file.flush()

            crash = SuppressCrashReport()
            crash.__enter__()
            # don't call __exit__(): the crash occurs at Python shutdown

            thread = threading.Thread(target=run)
            thread.daemon = True
            thread.start()

            time.sleep(0.5)
            file.write('!')
            file.flush()
            """.format(stream_name=stream_name)
        res, _ = run_python_until_end("-c", code)
        err = res.err.decode()
        if res.rc == 0:
            # Clean exit: stderr may only hold the '.' and '!' markers
            # written by the child itself.
            self.assertFalse(err.strip('.!'))
            return
        # Failure: should be a fatal error
        pattern = (r"Fatal Python error: _enter_buffered_busy: "
                   r"could not acquire lock "
                   r"for <(_io\.)?BufferedWriter name='<{stream_name}>'> "
                   r"at interpreter shutdown, possibly due to "
                   r"daemon threads".format(stream_name=stream_name))
        self.assertRegex(err, pattern)

    def test_daemon_threads_shutdown_stdout_deadlock(self):
        self.check_daemon_threads_shutdown_deadlock(stream_name='stdout')

    def test_daemon_threads_shutdown_stderr_deadlock(self):
        self.check_daemon_threads_shutdown_deadlock(stream_name='stderr')
4284
4285
class PyMiscIOTest(MiscIOTest):
    # Run the MiscIOTest suite against the module bound to `pyio`.
    io = pyio
Barry Warsaw40e82462008-11-20 20:14:50 +00004288
Antoine Pitroub46b9d52010-08-21 19:09:32 +00004289
@unittest.skipIf(os.name == 'nt', 'POSIX signals required for this test.')
class SignalsTest(unittest.TestCase):
    """Check how io streams behave when SIGALRM arrives during read/write.

    The implementation under test is reached through ``self.io``, which
    this class does not define itself.
    """

    def setUp(self):
        # Install alarm_interrupt() as the SIGALRM handler and remember the
        # previous handler so that tearDown() can restore it.
        self.oldalrm = signal.signal(signal.SIGALRM, self.alarm_interrupt)

    def tearDown(self):
        signal.signal(signal.SIGALRM, self.oldalrm)

    def alarm_interrupt(self, sig, frame):
        # Raise ZeroDivisionError on purpose from inside the handler.
        1/0

    def check_interrupted_write(self, item, bytes, **fdopen_kwargs):
        """Check that a partial write, when it gets interrupted, properly
        invokes the signal handler, and bubbles up the exception raised
        in the latter."""
        read_results = []
        def _read():
            s = os.read(r, 1)
            read_results.append(s)

        t = threading.Thread(target=_read)
        t.daemon = True
        r, w = os.pipe()
        fdopen_kwargs["closefd"] = False
        large_data = item * (support.PIPE_MAX_SIZE // len(item) + 1)
        # Bound before the try block so the finally clause can tell whether
        # open() succeeded, instead of hiding an open() failure behind an
        # UnboundLocalError.
        wio = None
        try:
            wio = self.io.open(w, **fdopen_kwargs)
            if hasattr(signal, 'pthread_sigmask'):
                # create the thread with SIGALRM signal blocked
                signal.pthread_sigmask(signal.SIG_BLOCK, [signal.SIGALRM])
                t.start()
                signal.pthread_sigmask(signal.SIG_UNBLOCK, [signal.SIGALRM])
            else:
                t.start()

            # Fill the pipe enough that the write will be blocking.
            # It will be interrupted by the timer armed just below. Since
            # the other thread has read one byte, the low-level write will
            # return with a successful (partial) result rather than an EINTR.
            # The buffered IO layer must check for pending signal
            # handlers, which in this case will invoke alarm_interrupt().
            signal.alarm(1)
            try:
                self.assertRaises(ZeroDivisionError, wio.write, large_data)
            finally:
                signal.alarm(0)
                t.join()
            # We got one byte, get another one and check that it isn't a
            # repeat of the first one.
            read_results.append(os.read(r, 1))
            self.assertEqual(read_results, [bytes[0:1], bytes[1:2]])
        finally:
            os.close(w)
            os.close(r)
            # This is deliberate. If we didn't close the file descriptor
            # before closing wio, wio would try to flush its internal
            # buffer, and block again.
            if wio is not None:
                try:
                    wio.close()
                except OSError as e:
                    if e.errno != errno.EBADF:
                        raise

    def test_interrupted_write_unbuffered(self):
        self.check_interrupted_write(b"xy", b"xy", mode="wb", buffering=0)

    def test_interrupted_write_buffered(self):
        self.check_interrupted_write(b"xy", b"xy", mode="wb")

    def test_interrupted_write_text(self):
        self.check_interrupted_write("xy", b"xy", mode="w", encoding="ascii")

    @support.no_tracing
    def check_reentrant_write(self, data, **fdopen_kwargs):
        # Write `data` in a loop while a SIGALRM handler writes to the same
        # stream, and check the outcome is one of the two accepted errors.
        def on_alarm(*args):
            # Will be called reentrantly from the same thread
            wio.write(data)
            1/0
        signal.signal(signal.SIGALRM, on_alarm)
        r, w = os.pipe()
        wio = self.io.open(w, **fdopen_kwargs)
        try:
            signal.alarm(1)
            # Either the reentrant call to wio.write() fails with RuntimeError,
            # or the signal handler raises ZeroDivisionError.
            with self.assertRaises((ZeroDivisionError, RuntimeError)) as cm:
                while 1:
                    for i in range(100):
                        wio.write(data)
                        wio.flush()
                    # Make sure the buffer doesn't fill up and block further writes
                    os.read(r, len(data) * 100)
            exc = cm.exception
            if isinstance(exc, RuntimeError):
                self.assertTrue(str(exc).startswith("reentrant call"), str(exc))
        finally:
            signal.alarm(0)
            wio.close()
            os.close(r)

    def test_reentrant_write_buffered(self):
        self.check_reentrant_write(b"xy", mode="wb")

    def test_reentrant_write_text(self):
        self.check_reentrant_write("xy", mode="w", encoding="ascii")

    def check_interrupted_read_retry(self, decode, **fdopen_kwargs):
        """Check that a buffered read, when it gets interrupted (either
        returning a partial result or EINTR), properly invokes the signal
        handler and retries if the latter returned successfully."""
        r, w = os.pipe()
        fdopen_kwargs["closefd"] = False
        def alarm_handler(sig, frame):
            os.write(w, b"bar")
        signal.signal(signal.SIGALRM, alarm_handler)
        # Bound before the try block so that a failing open() is reported
        # as such and the pipe descriptors are still closed.
        rio = None
        try:
            rio = self.io.open(r, **fdopen_kwargs)
            os.write(w, b"foo")
            signal.alarm(1)
            # Expected behaviour:
            # - first raw read() returns partial b"foo"
            # - second raw read() returns EINTR
            # - third raw read() returns b"bar"
            self.assertEqual(decode(rio.read(6)), "foobar")
        finally:
            signal.alarm(0)
            if rio is not None:
                rio.close()
            os.close(w)
            os.close(r)

    def test_interrupted_read_retry_buffered(self):
        self.check_interrupted_read_retry(lambda x: x.decode('latin1'),
                                          mode="rb")

    def test_interrupted_read_retry_text(self):
        self.check_interrupted_read_retry(lambda x: x,
                                          mode="r")

    def check_interrupted_write_retry(self, item, **fdopen_kwargs):
        """Check that a buffered write, when it gets interrupted (either
        returning a partial result or EINTR), properly invokes the signal
        handler and retries if the latter returned successfully."""
        select = import_helper.import_module("select")

        # A quantity that exceeds the buffer size of an anonymous pipe's
        # write end.
        N = support.PIPE_MAX_SIZE
        r, w = os.pipe()
        fdopen_kwargs["closefd"] = False

        # We need a separate thread to read from the pipe and allow the
        # write() to finish.  This thread is started after the SIGALRM is
        # received (forcing a first EINTR in write()).
        read_results = []
        write_finished = False
        error = None
        def _read():
            try:
                while not write_finished:
                    while r in select.select([r], [], [], 1.0)[0]:
                        s = os.read(r, 1024)
                        read_results.append(s)
            except BaseException as exc:
                # Hand the failure over to the main thread, which asserts
                # that `error` is still None.
                nonlocal error
                error = exc
        t = threading.Thread(target=_read)
        t.daemon = True
        def alarm1(sig, frame):
            # First alarm: switch to the second handler and re-arm.
            signal.signal(signal.SIGALRM, alarm2)
            signal.alarm(1)
        def alarm2(sig, frame):
            # Second alarm: start the reader thread.
            t.start()

        large_data = item * N
        signal.signal(signal.SIGALRM, alarm1)
        # Bound before the try block so the finally clause can tell whether
        # open() succeeded, instead of hiding an open() failure behind an
        # UnboundLocalError.
        wio = None
        try:
            wio = self.io.open(w, **fdopen_kwargs)
            signal.alarm(1)
            # Expected behaviour:
            # - first raw write() is partial (because of the limited pipe buffer
            #   and the first alarm)
            # - second raw write() returns EINTR (because of the second alarm)
            # - subsequent write()s are successful (either partial or complete)
            written = wio.write(large_data)
            self.assertEqual(N, written)

            wio.flush()
            write_finished = True
            t.join()

            self.assertIsNone(error)
            self.assertEqual(N, sum(len(x) for x in read_results))
        finally:
            signal.alarm(0)
            write_finished = True
            os.close(w)
            os.close(r)
            # This is deliberate. If we didn't close the file descriptor
            # before closing wio, wio would try to flush its internal
            # buffer, and could block (in case of failure).
            if wio is not None:
                try:
                    wio.close()
                except OSError as e:
                    if e.errno != errno.EBADF:
                        raise

    def test_interrupted_write_retry_buffered(self):
        self.check_interrupted_write_retry(b"x", mode="wb")

    def test_interrupted_write_retry_text(self):
        self.check_interrupted_write_retry("x", mode="w", encoding="latin1")
4502
Antoine Pitrouf3b68b32010-12-03 18:41:39 +00004503
class CSignalsTest(SignalsTest):
    # Run the SignalsTest suite against the "io" module.
    io = io
4506
class PySignalsTest(SignalsTest):
    # Run the SignalsTest suite against the module bound to `pyio`.
    io = pyio

    # Handling reentrancy issues would slow down _pyio even more, so the
    # tests are disabled.
    test_reentrant_write_buffered = test_reentrant_write_text = None
4514
Antoine Pitroub46b9d52010-08-21 19:09:32 +00004515
def load_tests(*args):
    """Build the test suite for this module.

    Before the suite is assembled, every test class whose name starts with
    "C" or "Py" receives, as class attributes, the public names of the
    matching io implementation (``io`` or ``pyio``) and the matching
    "C"/"Py" variants of the mock classes.

    The positional arguments are accepted and ignored.

    Returns a unittest.TestSuite with the tests of all listed classes.
    """
    tests = (CIOTest, PyIOTest, APIMismatchTest,
             CBufferedReaderTest, PyBufferedReaderTest,
             CBufferedWriterTest, PyBufferedWriterTest,
             CBufferedRWPairTest, PyBufferedRWPairTest,
             CBufferedRandomTest, PyBufferedRandomTest,
             StatefulIncrementalDecoderTest,
             CIncrementalNewlineDecoderTest, PyIncrementalNewlineDecoderTest,
             CTextIOWrapperTest, PyTextIOWrapperTest,
             CMiscIOTest, PyMiscIOTest,
             CSignalsTest, PySignalsTest,
             )

    # Put the namespaces of the IO module we are testing and some useful mock
    # classes in the __dict__ of each test.
    mocks = (MockRawIO, MisbehavedRawIO, MockFileIO, CloseFailureIO,
             MockNonBlockWriterIO, MockUnseekableIO, MockRawIOWithoutRead,
             SlowFlushRawIO)
    all_members = io.__all__ + ["IncrementalNewlineDecoder"]
    c_io_ns = {name: getattr(io, name) for name in all_members}
    py_io_ns = {name: getattr(pyio, name) for name in all_members}
    globs = globals()
    # Each mock is stored under its unprefixed name, bound to its "C" or
    # "Py" variant looked up in this module's globals.
    c_io_ns.update((x.__name__, globs["C" + x.__name__]) for x in mocks)
    py_io_ns.update((x.__name__, globs["Py" + x.__name__]) for x in mocks)
    # Avoid turning open into a bound method.
    py_io_ns["open"] = pyio.OpenWrapper
    for test in tests:
        if test.__name__.startswith("C"):
            for name, obj in c_io_ns.items():
                setattr(test, name, obj)
        elif test.__name__.startswith("Py"):
            for name, obj in py_io_ns.items():
                setattr(test, name, obj)

    # unittest.makeSuite() is deprecated since Python 3.11 and removed in
    # 3.13.  A fresh TestLoader with default settings is what makeSuite()
    # used internally, so the collected tests are the same.
    loader = unittest.TestLoader()
    suite = unittest.TestSuite(
        [loader.loadTestsFromTestCase(test) for test in tests])
    return suite
Guido van Rossum28524c72007-02-27 05:47:44 +00004552
4553if __name__ == "__main__":
Ezio Melottidaa42c72013-03-23 16:30:16 +02004554 unittest.main()