blob: 48a3cca3fbde4f1ddc8b39bdac3c38866b183d22 [file] [log] [blame]
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001"""Unit tests for the io module."""
2
3# Tests of io are scattered over the test suite:
4# * test_bufio - tests file buffering
5# * test_memoryio - tests BytesIO and StringIO
6# * test_fileio - tests FileIO
7# * test_file - tests the file interface
8# * test_io - tests everything else in the io module
9# * test_univnewlines - tests universal newline support
10# * test_largefile - tests operations on a file greater than 2**32 bytes
11# (only enabled with -ulargefile)
12
13################################################################################
14# ATTENTION TEST WRITERS!!!
15################################################################################
16# When writing tests for io, it's important to test both the C and Python
17# implementations. This is usually done by writing a base test that refers to
Serhiy Storchakad65c9492015-11-02 14:10:23 +020018# the type it is testing as an attribute. Then it provides custom subclasses to
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000019# test both implementations. This file has lots of examples.
20################################################################################
Guido van Rossum68bbcd22007-02-27 17:19:33 +000021
Victor Stinnerf86a5e82012-06-05 13:43:22 +020022import abc
23import array
24import errno
25import locale
Guido van Rossum8358db22007-08-18 21:39:55 +000026import os
Victor Stinnerf86a5e82012-06-05 13:43:22 +020027import pickle
28import random
29import signal
Guido van Rossum34d69e52007-04-10 20:08:41 +000030import sys
Gregory P. Smithe5796c42018-12-30 20:17:57 -080031import sysconfig
Victor Stinner22eb6892019-06-26 00:51:05 +020032import textwrap
Antoine Pitroua6a4dc82017-09-07 18:56:24 +020033import threading
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +000034import time
Guido van Rossum28524c72007-02-27 05:47:44 +000035import unittest
Antoine Pitroue033e062010-10-29 10:38:18 +000036import warnings
Victor Stinnerf86a5e82012-06-05 13:43:22 +020037import weakref
Antoine Pitrou131a4892012-10-16 22:57:11 +020038from collections import deque, UserList
Victor Stinnerf86a5e82012-06-05 13:43:22 +020039from itertools import cycle, count
Benjamin Petersonee8712c2008-05-20 21:35:26 +000040from test import support
Victor Stinner22eb6892019-06-26 00:51:05 +020041from test.support.script_helper import (
42 assert_python_ok, assert_python_failure, run_python_until_end)
Hai Shi883bc632020-07-06 17:12:49 +080043from test.support import import_helper
44from test.support import os_helper
Hai Shie80697d2020-05-28 06:10:27 +080045from test.support import threading_helper
Hai Shi883bc632020-07-06 17:12:49 +080046from test.support import warnings_helper
47from test.support.os_helper import FakePath
Guido van Rossum76c5d4d2007-04-06 19:10:29 +000048
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +000049import codecs
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000050import io # C implementation of io
51import _pyio as pyio # Python implementation of io
Guido van Rossuma9e20242007-03-08 00:43:48 +000052
Martin Panter6bb91f32016-05-28 00:41:57 +000053try:
54 import ctypes
55except ImportError:
56 def byteslike(*pos, **kw):
57 return array.array("b", bytes(*pos, **kw))
58else:
59 def byteslike(*pos, **kw):
60 """Create a bytes-like object having no string or sequence methods"""
61 data = bytes(*pos, **kw)
62 obj = EmptyStruct()
63 ctypes.resize(obj, len(data))
64 memoryview(obj).cast("B")[:] = data
65 return obj
66 class EmptyStruct(ctypes.Structure):
67 pass
68
Gregory P. Smithe5796c42018-12-30 20:17:57 -080069_cflags = sysconfig.get_config_var('CFLAGS') or ''
70_config_args = sysconfig.get_config_var('CONFIG_ARGS') or ''
71MEMORY_SANITIZER = (
72 '-fsanitize=memory' in _cflags or
73 '--with-memory-sanitizer' in _config_args
74)
75
Victor Stinnerbc2aa812019-05-23 03:45:09 +020076# Does io.IOBase finalizer log the exception if the close() method fails?
77# The exception is ignored silently by default in release build.
78IOBASE_EMITS_UNRAISABLE = (hasattr(sys, "gettotalrefcount") or sys.flags.dev_mode)
Victor Stinner44235042019-04-12 17:06:47 +020079
80
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000081def _default_chunk_size():
82 """Get the default TextIOWrapper chunk size"""
Marc-André Lemburg8f36af72011-02-25 15:42:01 +000083 with open(__file__, "r", encoding="latin-1") as f:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000084 return f._CHUNK_SIZE
85
86
Antoine Pitrou328ec742010-09-14 18:37:24 +000087class MockRawIOWithoutRead:
88 """A RawIO implementation without read(), so as to exercise the default
89 RawIO.read() which calls readinto()."""
Guido van Rossuma9e20242007-03-08 00:43:48 +000090
Guido van Rossum76c5d4d2007-04-06 19:10:29 +000091 def __init__(self, read_stack=()):
92 self._read_stack = list(read_stack)
93 self._write_stack = []
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000094 self._reads = 0
Antoine Pitrou32cfede2010-08-11 13:31:33 +000095 self._extraneous_reads = 0
Guido van Rossum68bbcd22007-02-27 17:19:33 +000096
Guido van Rossum01a27522007-03-07 01:00:12 +000097 def write(self, b):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000098 self._write_stack.append(bytes(b))
Guido van Rossum01a27522007-03-07 01:00:12 +000099 return len(b)
100
101 def writable(self):
102 return True
103
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000104 def fileno(self):
105 return 42
106
107 def readable(self):
108 return True
109
Guido van Rossum01a27522007-03-07 01:00:12 +0000110 def seekable(self):
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000111 return True
112
Guido van Rossum01a27522007-03-07 01:00:12 +0000113 def seek(self, pos, whence):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000114 return 0 # wrong but we gotta return something
Guido van Rossum01a27522007-03-07 01:00:12 +0000115
116 def tell(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000117 return 0 # same comment as above
118
119 def readinto(self, buf):
120 self._reads += 1
121 max_len = len(buf)
122 try:
123 data = self._read_stack[0]
124 except IndexError:
Antoine Pitrou32cfede2010-08-11 13:31:33 +0000125 self._extraneous_reads += 1
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000126 return 0
127 if data is None:
128 del self._read_stack[0]
129 return None
130 n = len(data)
131 if len(data) <= max_len:
132 del self._read_stack[0]
133 buf[:n] = data
134 return n
135 else:
136 buf[:] = data[:max_len]
137 self._read_stack[0] = data[max_len:]
138 return max_len
139
140 def truncate(self, pos=None):
141 return pos
142
Antoine Pitrou328ec742010-09-14 18:37:24 +0000143class CMockRawIOWithoutRead(MockRawIOWithoutRead, io.RawIOBase):
144 pass
145
146class PyMockRawIOWithoutRead(MockRawIOWithoutRead, pyio.RawIOBase):
147 pass
148
149
150class MockRawIO(MockRawIOWithoutRead):
151
152 def read(self, n=None):
153 self._reads += 1
154 try:
155 return self._read_stack.pop(0)
156 except:
157 self._extraneous_reads += 1
158 return b""
159
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000160class CMockRawIO(MockRawIO, io.RawIOBase):
161 pass
162
163class PyMockRawIO(MockRawIO, pyio.RawIOBase):
164 pass
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000165
Guido van Rossuma9e20242007-03-08 00:43:48 +0000166
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000167class MisbehavedRawIO(MockRawIO):
168 def write(self, b):
169 return super().write(b) * 2
170
171 def read(self, n=None):
172 return super().read(n) * 2
173
174 def seek(self, pos, whence):
175 return -123
176
177 def tell(self):
178 return -456
179
180 def readinto(self, buf):
181 super().readinto(buf)
182 return len(buf) * 5
183
184class CMisbehavedRawIO(MisbehavedRawIO, io.RawIOBase):
185 pass
186
187class PyMisbehavedRawIO(MisbehavedRawIO, pyio.RawIOBase):
188 pass
189
190
benfogle9703f092017-11-10 16:03:40 -0500191class SlowFlushRawIO(MockRawIO):
192 def __init__(self):
193 super().__init__()
194 self.in_flush = threading.Event()
195
196 def flush(self):
197 self.in_flush.set()
198 time.sleep(0.25)
199
200class CSlowFlushRawIO(SlowFlushRawIO, io.RawIOBase):
201 pass
202
203class PySlowFlushRawIO(SlowFlushRawIO, pyio.RawIOBase):
204 pass
205
206
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000207class CloseFailureIO(MockRawIO):
208 closed = 0
209
210 def close(self):
211 if not self.closed:
212 self.closed = 1
Andrew Svetlovf7a17b42012-12-25 16:47:37 +0200213 raise OSError
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000214
215class CCloseFailureIO(CloseFailureIO, io.RawIOBase):
216 pass
217
218class PyCloseFailureIO(CloseFailureIO, pyio.RawIOBase):
219 pass
220
221
222class MockFileIO:
Guido van Rossum78892e42007-04-06 17:31:18 +0000223
224 def __init__(self, data):
225 self.read_history = []
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000226 super().__init__(data)
Guido van Rossum78892e42007-04-06 17:31:18 +0000227
228 def read(self, n=None):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000229 res = super().read(n)
Guido van Rossum78892e42007-04-06 17:31:18 +0000230 self.read_history.append(None if res is None else len(res))
231 return res
232
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000233 def readinto(self, b):
234 res = super().readinto(b)
235 self.read_history.append(res)
236 return res
Guido van Rossum78892e42007-04-06 17:31:18 +0000237
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000238class CMockFileIO(MockFileIO, io.BytesIO):
239 pass
Guido van Rossuma9e20242007-03-08 00:43:48 +0000240
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000241class PyMockFileIO(MockFileIO, pyio.BytesIO):
242 pass
243
244
Antoine Pitrou0d739d72010-09-05 23:01:12 +0000245class MockUnseekableIO:
246 def seekable(self):
247 return False
248
249 def seek(self, *args):
250 raise self.UnsupportedOperation("not seekable")
251
252 def tell(self, *args):
253 raise self.UnsupportedOperation("not seekable")
254
Martin Panter754aab22016-03-31 07:21:56 +0000255 def truncate(self, *args):
256 raise self.UnsupportedOperation("not seekable")
257
Antoine Pitrou0d739d72010-09-05 23:01:12 +0000258class CMockUnseekableIO(MockUnseekableIO, io.BytesIO):
259 UnsupportedOperation = io.UnsupportedOperation
260
261class PyMockUnseekableIO(MockUnseekableIO, pyio.BytesIO):
262 UnsupportedOperation = pyio.UnsupportedOperation
263
264
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000265class MockNonBlockWriterIO:
266
267 def __init__(self):
Guido van Rossum01a27522007-03-07 01:00:12 +0000268 self._write_stack = []
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000269 self._blocker_char = None
Guido van Rossuma9e20242007-03-08 00:43:48 +0000270
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000271 def pop_written(self):
272 s = b"".join(self._write_stack)
273 self._write_stack[:] = []
274 return s
275
276 def block_on(self, char):
277 """Block when a given char is encountered."""
278 self._blocker_char = char
279
280 def readable(self):
281 return True
282
283 def seekable(self):
284 return True
Guido van Rossuma9e20242007-03-08 00:43:48 +0000285
Victor Stinnerb589cef2019-06-11 03:10:59 +0200286 def seek(self, pos, whence=0):
287 # naive implementation, enough for tests
288 return 0
289
Guido van Rossum01a27522007-03-07 01:00:12 +0000290 def writable(self):
291 return True
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000292
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000293 def write(self, b):
294 b = bytes(b)
295 n = -1
296 if self._blocker_char:
297 try:
298 n = b.index(self._blocker_char)
299 except ValueError:
300 pass
301 else:
Antoine Pitrou58fcf9f2011-11-21 20:16:44 +0100302 if n > 0:
303 # write data up to the first blocker
304 self._write_stack.append(b[:n])
305 return n
306 else:
307 # cancel blocker and indicate would block
308 self._blocker_char = None
309 return None
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000310 self._write_stack.append(b)
311 return len(b)
312
313class CMockNonBlockWriterIO(MockNonBlockWriterIO, io.RawIOBase):
314 BlockingIOError = io.BlockingIOError
315
316class PyMockNonBlockWriterIO(MockNonBlockWriterIO, pyio.RawIOBase):
317 BlockingIOError = pyio.BlockingIOError
318
Guido van Rossuma9e20242007-03-08 00:43:48 +0000319
Guido van Rossum28524c72007-02-27 05:47:44 +0000320class IOTest(unittest.TestCase):
321
Neal Norwitze7789b12008-03-24 06:18:09 +0000322 def setUp(self):
Hai Shi883bc632020-07-06 17:12:49 +0800323 os_helper.unlink(os_helper.TESTFN)
Neal Norwitze7789b12008-03-24 06:18:09 +0000324
Guido van Rossum4d0f5a42007-03-07 22:59:39 +0000325 def tearDown(self):
Hai Shi883bc632020-07-06 17:12:49 +0800326 os_helper.unlink(os_helper.TESTFN)
Guido van Rossum4d0f5a42007-03-07 22:59:39 +0000327
Guido van Rossum28524c72007-02-27 05:47:44 +0000328 def write_ops(self, f):
Guido van Rossum87429772007-04-10 21:06:59 +0000329 self.assertEqual(f.write(b"blah."), 5)
Antoine Pitrou905a2ff2010-01-31 22:47:27 +0000330 f.truncate(0)
331 self.assertEqual(f.tell(), 5)
332 f.seek(0)
333
334 self.assertEqual(f.write(b"blah."), 5)
Guido van Rossum87429772007-04-10 21:06:59 +0000335 self.assertEqual(f.seek(0), 0)
336 self.assertEqual(f.write(b"Hello."), 6)
Guido van Rossum28524c72007-02-27 05:47:44 +0000337 self.assertEqual(f.tell(), 6)
Guido van Rossum87429772007-04-10 21:06:59 +0000338 self.assertEqual(f.seek(-1, 1), 5)
Guido van Rossum28524c72007-02-27 05:47:44 +0000339 self.assertEqual(f.tell(), 5)
Martin Panter6bb91f32016-05-28 00:41:57 +0000340 buffer = bytearray(b" world\n\n\n")
341 self.assertEqual(f.write(buffer), 9)
342 buffer[:] = b"*" * 9 # Overwrite our copy of the data
Guido van Rossum87429772007-04-10 21:06:59 +0000343 self.assertEqual(f.seek(0), 0)
Guido van Rossum2b08b382007-05-08 20:18:39 +0000344 self.assertEqual(f.write(b"h"), 1)
Guido van Rossum87429772007-04-10 21:06:59 +0000345 self.assertEqual(f.seek(-1, 2), 13)
346 self.assertEqual(f.tell(), 13)
Antoine Pitrou905a2ff2010-01-31 22:47:27 +0000347
Guido van Rossum87429772007-04-10 21:06:59 +0000348 self.assertEqual(f.truncate(12), 12)
Antoine Pitrou905a2ff2010-01-31 22:47:27 +0000349 self.assertEqual(f.tell(), 13)
Christian Heimes8e42a0a2007-11-08 18:04:45 +0000350 self.assertRaises(TypeError, f.seek, 0.0)
Guido van Rossum28524c72007-02-27 05:47:44 +0000351
Guido van Rossum9b76da62007-04-11 01:09:03 +0000352 def read_ops(self, f, buffered=False):
353 data = f.read(5)
354 self.assertEqual(data, b"hello")
Martin Panter6bb91f32016-05-28 00:41:57 +0000355 data = byteslike(data)
Guido van Rossum9b76da62007-04-11 01:09:03 +0000356 self.assertEqual(f.readinto(data), 5)
Martin Panter6bb91f32016-05-28 00:41:57 +0000357 self.assertEqual(bytes(data), b" worl")
358 data = bytearray(5)
Guido van Rossum9b76da62007-04-11 01:09:03 +0000359 self.assertEqual(f.readinto(data), 2)
360 self.assertEqual(len(data), 5)
361 self.assertEqual(data[:2], b"d\n")
362 self.assertEqual(f.seek(0), 0)
363 self.assertEqual(f.read(20), b"hello world\n")
364 self.assertEqual(f.read(1), b"")
Martin Panter6bb91f32016-05-28 00:41:57 +0000365 self.assertEqual(f.readinto(byteslike(b"x")), 0)
Guido van Rossum9b76da62007-04-11 01:09:03 +0000366 self.assertEqual(f.seek(-6, 2), 6)
367 self.assertEqual(f.read(5), b"world")
368 self.assertEqual(f.read(0), b"")
Martin Panter6bb91f32016-05-28 00:41:57 +0000369 self.assertEqual(f.readinto(byteslike()), 0)
Guido van Rossum9b76da62007-04-11 01:09:03 +0000370 self.assertEqual(f.seek(-6, 1), 5)
371 self.assertEqual(f.read(5), b" worl")
372 self.assertEqual(f.tell(), 10)
Christian Heimes8e42a0a2007-11-08 18:04:45 +0000373 self.assertRaises(TypeError, f.seek, 0.0)
Guido van Rossum9b76da62007-04-11 01:09:03 +0000374 if buffered:
375 f.seek(0)
376 self.assertEqual(f.read(), b"hello world\n")
377 f.seek(6)
378 self.assertEqual(f.read(), b"world\n")
379 self.assertEqual(f.read(), b"")
Martin Panter6bb91f32016-05-28 00:41:57 +0000380 f.seek(0)
381 data = byteslike(5)
382 self.assertEqual(f.readinto1(data), 5)
383 self.assertEqual(bytes(data), b"hello")
Guido van Rossum9b76da62007-04-11 01:09:03 +0000384
Guido van Rossum34d69e52007-04-10 20:08:41 +0000385 LARGE = 2**31
386
Guido van Rossum53807da2007-04-10 19:01:47 +0000387 def large_file_ops(self, f):
388 assert f.readable()
389 assert f.writable()
Xavier de Gaye877f0362016-11-17 09:20:28 +0100390 try:
391 self.assertEqual(f.seek(self.LARGE), self.LARGE)
392 except (OverflowError, ValueError):
393 self.skipTest("no largefile support")
Guido van Rossum34d69e52007-04-10 20:08:41 +0000394 self.assertEqual(f.tell(), self.LARGE)
Guido van Rossum53807da2007-04-10 19:01:47 +0000395 self.assertEqual(f.write(b"xxx"), 3)
Guido van Rossum34d69e52007-04-10 20:08:41 +0000396 self.assertEqual(f.tell(), self.LARGE + 3)
397 self.assertEqual(f.seek(-1, 1), self.LARGE + 2)
Guido van Rossum87429772007-04-10 21:06:59 +0000398 self.assertEqual(f.truncate(), self.LARGE + 2)
Guido van Rossum34d69e52007-04-10 20:08:41 +0000399 self.assertEqual(f.tell(), self.LARGE + 2)
400 self.assertEqual(f.seek(0, 2), self.LARGE + 2)
Guido van Rossum87429772007-04-10 21:06:59 +0000401 self.assertEqual(f.truncate(self.LARGE + 1), self.LARGE + 1)
Antoine Pitrou905a2ff2010-01-31 22:47:27 +0000402 self.assertEqual(f.tell(), self.LARGE + 2)
Guido van Rossum34d69e52007-04-10 20:08:41 +0000403 self.assertEqual(f.seek(0, 2), self.LARGE + 1)
404 self.assertEqual(f.seek(-1, 2), self.LARGE)
Guido van Rossum53807da2007-04-10 19:01:47 +0000405 self.assertEqual(f.read(2), b"x")
406
Benjamin Peterson81971ea2009-05-14 22:01:31 +0000407 def test_invalid_operations(self):
408 # Try writing on a file opened in read mode and vice-versa.
Antoine Pitrou0d739d72010-09-05 23:01:12 +0000409 exc = self.UnsupportedOperation
Inada Naoki58cffba2021-04-01 11:25:04 +0900410 with self.open(os_helper.TESTFN, "w", encoding="utf-8") as fp:
411 self.assertRaises(exc, fp.read)
412 self.assertRaises(exc, fp.readline)
413 with self.open(os_helper.TESTFN, "wb") as fp:
414 self.assertRaises(exc, fp.read)
415 self.assertRaises(exc, fp.readline)
Hai Shi883bc632020-07-06 17:12:49 +0800416 with self.open(os_helper.TESTFN, "wb", buffering=0) as fp:
Antoine Pitrou0d739d72010-09-05 23:01:12 +0000417 self.assertRaises(exc, fp.read)
418 self.assertRaises(exc, fp.readline)
Hai Shi883bc632020-07-06 17:12:49 +0800419 with self.open(os_helper.TESTFN, "rb", buffering=0) as fp:
Antoine Pitrou0d739d72010-09-05 23:01:12 +0000420 self.assertRaises(exc, fp.write, b"blah")
421 self.assertRaises(exc, fp.writelines, [b"blah\n"])
Hai Shi883bc632020-07-06 17:12:49 +0800422 with self.open(os_helper.TESTFN, "rb") as fp:
Antoine Pitrou0d739d72010-09-05 23:01:12 +0000423 self.assertRaises(exc, fp.write, b"blah")
424 self.assertRaises(exc, fp.writelines, [b"blah\n"])
Inada Naoki58cffba2021-04-01 11:25:04 +0900425 with self.open(os_helper.TESTFN, "r", encoding="utf-8") as fp:
Antoine Pitrou0d739d72010-09-05 23:01:12 +0000426 self.assertRaises(exc, fp.write, "blah")
427 self.assertRaises(exc, fp.writelines, ["blah\n"])
428 # Non-zero seeking from current or end pos
429 self.assertRaises(exc, fp.seek, 1, self.SEEK_CUR)
430 self.assertRaises(exc, fp.seek, -1, self.SEEK_END)
Benjamin Peterson81971ea2009-05-14 22:01:31 +0000431
Martin Panter754aab22016-03-31 07:21:56 +0000432 def test_optional_abilities(self):
433 # Test for OSError when optional APIs are not supported
434 # The purpose of this test is to try fileno(), reading, writing and
435 # seeking operations with various objects that indicate they do not
436 # support these operations.
437
438 def pipe_reader():
439 [r, w] = os.pipe()
440 os.close(w) # So that read() is harmless
441 return self.FileIO(r, "r")
442
443 def pipe_writer():
444 [r, w] = os.pipe()
445 self.addCleanup(os.close, r)
446 # Guarantee that we can write into the pipe without blocking
447 thread = threading.Thread(target=os.read, args=(r, 100))
448 thread.start()
449 self.addCleanup(thread.join)
450 return self.FileIO(w, "w")
451
452 def buffered_reader():
453 return self.BufferedReader(self.MockUnseekableIO())
454
455 def buffered_writer():
456 return self.BufferedWriter(self.MockUnseekableIO())
457
458 def buffered_random():
459 return self.BufferedRandom(self.BytesIO())
460
461 def buffered_rw_pair():
462 return self.BufferedRWPair(self.MockUnseekableIO(),
463 self.MockUnseekableIO())
464
465 def text_reader():
466 class UnseekableReader(self.MockUnseekableIO):
467 writable = self.BufferedIOBase.writable
468 write = self.BufferedIOBase.write
469 return self.TextIOWrapper(UnseekableReader(), "ascii")
470
471 def text_writer():
472 class UnseekableWriter(self.MockUnseekableIO):
473 readable = self.BufferedIOBase.readable
474 read = self.BufferedIOBase.read
475 return self.TextIOWrapper(UnseekableWriter(), "ascii")
476
477 tests = (
478 (pipe_reader, "fr"), (pipe_writer, "fw"),
479 (buffered_reader, "r"), (buffered_writer, "w"),
480 (buffered_random, "rws"), (buffered_rw_pair, "rw"),
481 (text_reader, "r"), (text_writer, "w"),
482 (self.BytesIO, "rws"), (self.StringIO, "rws"),
483 )
484 for [test, abilities] in tests:
Martin Panter754aab22016-03-31 07:21:56 +0000485 with self.subTest(test), test() as obj:
486 readable = "r" in abilities
487 self.assertEqual(obj.readable(), readable)
488 writable = "w" in abilities
489 self.assertEqual(obj.writable(), writable)
Martin Panter754aab22016-03-31 07:21:56 +0000490
491 if isinstance(obj, self.TextIOBase):
492 data = "3"
493 elif isinstance(obj, (self.BufferedIOBase, self.RawIOBase)):
494 data = b"3"
495 else:
496 self.fail("Unknown base class")
497
498 if "f" in abilities:
499 obj.fileno()
500 else:
501 self.assertRaises(OSError, obj.fileno)
502
503 if readable:
504 obj.read(1)
505 obj.read()
506 else:
507 self.assertRaises(OSError, obj.read, 1)
508 self.assertRaises(OSError, obj.read)
509
510 if writable:
511 obj.write(data)
512 else:
513 self.assertRaises(OSError, obj.write, data)
514
Martin Panter3ee147f2016-03-31 21:05:31 +0000515 if sys.platform.startswith("win") and test in (
Martin Panter0950e6a2016-03-31 10:31:30 +0000516 pipe_reader, pipe_writer):
517 # Pipes seem to appear as seekable on Windows
518 continue
519 seekable = "s" in abilities
520 self.assertEqual(obj.seekable(), seekable)
521
Martin Panter754aab22016-03-31 07:21:56 +0000522 if seekable:
523 obj.tell()
524 obj.seek(0)
525 else:
526 self.assertRaises(OSError, obj.tell)
527 self.assertRaises(OSError, obj.seek, 0)
528
529 if writable and seekable:
530 obj.truncate()
531 obj.truncate(0)
532 else:
533 self.assertRaises(OSError, obj.truncate)
534 self.assertRaises(OSError, obj.truncate, 0)
535
Antoine Pitrou13348842012-01-29 18:36:34 +0100536 def test_open_handles_NUL_chars(self):
537 fn_with_NUL = 'foo\0bar'
Inada Naoki58cffba2021-04-01 11:25:04 +0900538 self.assertRaises(ValueError, self.open, fn_with_NUL, 'w', encoding="utf-8")
Victor Stinner47b45572016-03-25 09:07:07 +0100539
540 bytes_fn = bytes(fn_with_NUL, 'ascii')
541 with warnings.catch_warnings():
542 warnings.simplefilter("ignore", DeprecationWarning)
Inada Naoki58cffba2021-04-01 11:25:04 +0900543 self.assertRaises(ValueError, self.open, bytes_fn, 'w', encoding="utf-8")
Antoine Pitrou13348842012-01-29 18:36:34 +0100544
Guido van Rossum28524c72007-02-27 05:47:44 +0000545 def test_raw_file_io(self):
Hai Shi883bc632020-07-06 17:12:49 +0800546 with self.open(os_helper.TESTFN, "wb", buffering=0) as f:
Benjamin Peterson45cec322009-04-24 23:14:50 +0000547 self.assertEqual(f.readable(), False)
548 self.assertEqual(f.writable(), True)
549 self.assertEqual(f.seekable(), True)
550 self.write_ops(f)
Hai Shi883bc632020-07-06 17:12:49 +0800551 with self.open(os_helper.TESTFN, "rb", buffering=0) as f:
Benjamin Peterson45cec322009-04-24 23:14:50 +0000552 self.assertEqual(f.readable(), True)
553 self.assertEqual(f.writable(), False)
554 self.assertEqual(f.seekable(), True)
555 self.read_ops(f)
Guido van Rossum28524c72007-02-27 05:47:44 +0000556
Guido van Rossum87429772007-04-10 21:06:59 +0000557 def test_buffered_file_io(self):
Hai Shi883bc632020-07-06 17:12:49 +0800558 with self.open(os_helper.TESTFN, "wb") as f:
Benjamin Peterson45cec322009-04-24 23:14:50 +0000559 self.assertEqual(f.readable(), False)
560 self.assertEqual(f.writable(), True)
561 self.assertEqual(f.seekable(), True)
562 self.write_ops(f)
Hai Shi883bc632020-07-06 17:12:49 +0800563 with self.open(os_helper.TESTFN, "rb") as f:
Benjamin Peterson45cec322009-04-24 23:14:50 +0000564 self.assertEqual(f.readable(), True)
565 self.assertEqual(f.writable(), False)
566 self.assertEqual(f.seekable(), True)
567 self.read_ops(f, True)
Guido van Rossum87429772007-04-10 21:06:59 +0000568
Guido van Rossum48fc58a2007-06-07 23:45:37 +0000569 def test_readline(self):
Hai Shi883bc632020-07-06 17:12:49 +0800570 with self.open(os_helper.TESTFN, "wb") as f:
Benjamin Peterson45cec322009-04-24 23:14:50 +0000571 f.write(b"abc\ndef\nxyzzy\nfoo\x00bar\nanother line")
Hai Shi883bc632020-07-06 17:12:49 +0800572 with self.open(os_helper.TESTFN, "rb") as f:
Benjamin Peterson45cec322009-04-24 23:14:50 +0000573 self.assertEqual(f.readline(), b"abc\n")
574 self.assertEqual(f.readline(10), b"def\n")
575 self.assertEqual(f.readline(2), b"xy")
576 self.assertEqual(f.readline(4), b"zzy\n")
577 self.assertEqual(f.readline(), b"foo\x00bar\n")
Benjamin Petersonbf5ff762009-12-13 19:25:34 +0000578 self.assertEqual(f.readline(None), b"another line")
Benjamin Peterson45cec322009-04-24 23:14:50 +0000579 self.assertRaises(TypeError, f.readline, 5.3)
Inada Naoki58cffba2021-04-01 11:25:04 +0900580 with self.open(os_helper.TESTFN, "r", encoding="utf-8") as f:
Benjamin Peterson45cec322009-04-24 23:14:50 +0000581 self.assertRaises(TypeError, f.readline, 5.3)
Guido van Rossum48fc58a2007-06-07 23:45:37 +0000582
Serhiy Storchakabf623ae2017-04-19 20:03:52 +0300583 def test_readline_nonsizeable(self):
584 # Issue #30061
585 # Crash when readline() returns an object without __len__
586 class R(self.IOBase):
587 def readline(self):
588 return None
589 self.assertRaises((TypeError, StopIteration), next, R())
590
591 def test_next_nonsizeable(self):
592 # Issue #30061
593 # Crash when __next__() returns an object without __len__
594 class R(self.IOBase):
595 def __next__(self):
596 return None
597 self.assertRaises(TypeError, R().readlines, 1)
598
Guido van Rossum28524c72007-02-27 05:47:44 +0000599 def test_raw_bytes_io(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000600 f = self.BytesIO()
Guido van Rossum28524c72007-02-27 05:47:44 +0000601 self.write_ops(f)
602 data = f.getvalue()
603 self.assertEqual(data, b"hello world\n")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000604 f = self.BytesIO(data)
Guido van Rossum9b76da62007-04-11 01:09:03 +0000605 self.read_ops(f, True)
Guido van Rossum28524c72007-02-27 05:47:44 +0000606
Guido van Rossum53807da2007-04-10 19:01:47 +0000607 def test_large_file_ops(self):
Mike53f7a7c2017-12-14 14:04:53 +0300608 # On Windows and Mac OSX this test consumes large resources; It takes
Victor Stinner8c663fd2017-11-08 14:44:44 -0800609 # a long time to build the >2 GiB file and takes >2 GiB of disk space
Guido van Rossum34d69e52007-04-10 20:08:41 +0000610 # therefore the resource must be enabled to run this test.
Victor Stinner937ee9e2018-06-26 02:11:06 +0200611 if sys.platform[:3] == 'win' or sys.platform == 'darwin':
Zachary Ware9fe6d862013-12-08 00:20:35 -0600612 support.requires(
613 'largefile',
614 'test requires %s bytes and a long time to run' % self.LARGE)
Hai Shi883bc632020-07-06 17:12:49 +0800615 with self.open(os_helper.TESTFN, "w+b", 0) as f:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000616 self.large_file_ops(f)
Hai Shi883bc632020-07-06 17:12:49 +0800617 with self.open(os_helper.TESTFN, "w+b") as f:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000618 self.large_file_ops(f)
Guido van Rossum87429772007-04-10 21:06:59 +0000619
620 def test_with_open(self):
Alexey Izbysheva2670562018-10-20 03:22:31 +0300621 for bufsize in (0, 100):
Guido van Rossum87429772007-04-10 21:06:59 +0000622 f = None
Hai Shi883bc632020-07-06 17:12:49 +0800623 with self.open(os_helper.TESTFN, "wb", bufsize) as f:
Guido van Rossum1f2ca562007-08-27 20:44:15 +0000624 f.write(b"xxx")
Guido van Rossum87429772007-04-10 21:06:59 +0000625 self.assertEqual(f.closed, True)
626 f = None
627 try:
Hai Shi883bc632020-07-06 17:12:49 +0800628 with self.open(os_helper.TESTFN, "wb", bufsize) as f:
Guido van Rossum87429772007-04-10 21:06:59 +0000629 1/0
630 except ZeroDivisionError:
631 self.assertEqual(f.closed, True)
632 else:
633 self.fail("1/0 didn't raise an exception")
634
Antoine Pitrou08838b62009-01-21 00:55:13 +0000635 # issue 5008
636 def test_append_mode_tell(self):
Hai Shi883bc632020-07-06 17:12:49 +0800637 with self.open(os_helper.TESTFN, "wb") as f:
Antoine Pitrou08838b62009-01-21 00:55:13 +0000638 f.write(b"xxx")
Hai Shi883bc632020-07-06 17:12:49 +0800639 with self.open(os_helper.TESTFN, "ab", buffering=0) as f:
Antoine Pitrou08838b62009-01-21 00:55:13 +0000640 self.assertEqual(f.tell(), 3)
Hai Shi883bc632020-07-06 17:12:49 +0800641 with self.open(os_helper.TESTFN, "ab") as f:
Antoine Pitrou08838b62009-01-21 00:55:13 +0000642 self.assertEqual(f.tell(), 3)
Inada Naoki58cffba2021-04-01 11:25:04 +0900643 with self.open(os_helper.TESTFN, "a", encoding="utf-8") as f:
Serhiy Storchaka0dcd80a2015-08-02 15:17:49 +0300644 self.assertGreater(f.tell(), 0)
Antoine Pitrou08838b62009-01-21 00:55:13 +0000645
Guido van Rossum87429772007-04-10 21:06:59 +0000646 def test_destructor(self):
647 record = []
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000648 class MyFileIO(self.FileIO):
Guido van Rossum87429772007-04-10 21:06:59 +0000649 def __del__(self):
650 record.append(1)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000651 try:
652 f = super().__del__
653 except AttributeError:
654 pass
655 else:
656 f()
Guido van Rossum87429772007-04-10 21:06:59 +0000657 def close(self):
658 record.append(2)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000659 super().close()
Guido van Rossum87429772007-04-10 21:06:59 +0000660 def flush(self):
661 record.append(3)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000662 super().flush()
Hai Shi883bc632020-07-06 17:12:49 +0800663 with warnings_helper.check_warnings(('', ResourceWarning)):
664 f = MyFileIO(os_helper.TESTFN, "wb")
Brett Cannon5a9e91b2010-10-29 23:53:03 +0000665 f.write(b"xxx")
666 del f
667 support.gc_collect()
668 self.assertEqual(record, [1, 2, 3])
Hai Shi883bc632020-07-06 17:12:49 +0800669 with self.open(os_helper.TESTFN, "rb") as f:
Brett Cannon5a9e91b2010-10-29 23:53:03 +0000670 self.assertEqual(f.read(), b"xxx")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000671
672 def _check_base_destructor(self, base):
673 record = []
674 class MyIO(base):
675 def __init__(self):
676 # This exercises the availability of attributes on object
677 # destruction.
678 # (in the C version, close() is called by the tp_dealloc
679 # function, not by __del__)
680 self.on_del = 1
681 self.on_close = 2
682 self.on_flush = 3
683 def __del__(self):
684 record.append(self.on_del)
685 try:
686 f = super().__del__
687 except AttributeError:
688 pass
689 else:
690 f()
691 def close(self):
692 record.append(self.on_close)
693 super().close()
694 def flush(self):
695 record.append(self.on_flush)
696 super().flush()
697 f = MyIO()
Guido van Rossum87429772007-04-10 21:06:59 +0000698 del f
Benjamin Peterson24fb1d02009-04-24 23:26:21 +0000699 support.gc_collect()
Guido van Rossum87429772007-04-10 21:06:59 +0000700 self.assertEqual(record, [1, 2, 3])
701
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000702 def test_IOBase_destructor(self):
703 self._check_base_destructor(self.IOBase)
704
705 def test_RawIOBase_destructor(self):
706 self._check_base_destructor(self.RawIOBase)
707
708 def test_BufferedIOBase_destructor(self):
709 self._check_base_destructor(self.BufferedIOBase)
710
711 def test_TextIOBase_destructor(self):
712 self._check_base_destructor(self.TextIOBase)
713
Guido van Rossum87429772007-04-10 21:06:59 +0000714 def test_close_flushes(self):
Hai Shi883bc632020-07-06 17:12:49 +0800715 with self.open(os_helper.TESTFN, "wb") as f:
Benjamin Peterson45cec322009-04-24 23:14:50 +0000716 f.write(b"xxx")
Hai Shi883bc632020-07-06 17:12:49 +0800717 with self.open(os_helper.TESTFN, "rb") as f:
Benjamin Peterson45cec322009-04-24 23:14:50 +0000718 self.assertEqual(f.read(), b"xxx")
Guido van Rossuma9e20242007-03-08 00:43:48 +0000719
Guido van Rossumd4103952007-04-12 05:44:49 +0000720 def test_array_writes(self):
721 a = array.array('i', range(10))
Antoine Pitrou1ce3eb52010-09-01 20:29:34 +0000722 n = len(a.tobytes())
Martin Panter6bb91f32016-05-28 00:41:57 +0000723 def check(f):
724 with f:
725 self.assertEqual(f.write(a), n)
726 f.writelines((a,))
727 check(self.BytesIO())
Hai Shi883bc632020-07-06 17:12:49 +0800728 check(self.FileIO(os_helper.TESTFN, "w"))
Martin Panter6bb91f32016-05-28 00:41:57 +0000729 check(self.BufferedWriter(self.MockRawIO()))
730 check(self.BufferedRandom(self.MockRawIO()))
731 check(self.BufferedRWPair(self.MockRawIO(), self.MockRawIO()))
Guido van Rossumd4103952007-04-12 05:44:49 +0000732
Guido van Rossum2dced8b2007-10-30 17:27:30 +0000733 def test_closefd(self):
Hai Shi883bc632020-07-06 17:12:49 +0800734 self.assertRaises(ValueError, self.open, os_helper.TESTFN, 'w',
Inada Naoki58cffba2021-04-01 11:25:04 +0900735 encoding="utf-8", closefd=False)
Guido van Rossuma9e20242007-03-08 00:43:48 +0000736
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000737 def test_read_closed(self):
Inada Naoki58cffba2021-04-01 11:25:04 +0900738 with self.open(os_helper.TESTFN, "w", encoding="utf-8") as f:
Christian Heimesecc42a22008-11-05 19:30:32 +0000739 f.write("egg\n")
Inada Naoki58cffba2021-04-01 11:25:04 +0900740 with self.open(os_helper.TESTFN, "r", encoding="utf-8") as f:
741 file = self.open(f.fileno(), "r", encoding="utf-8", closefd=False)
Christian Heimesecc42a22008-11-05 19:30:32 +0000742 self.assertEqual(file.read(), "egg\n")
743 file.seek(0)
744 file.close()
745 self.assertRaises(ValueError, file.read)
Hai Shi883bc632020-07-06 17:12:49 +0800746 with self.open(os_helper.TESTFN, "rb") as f:
Philipp Gesangcb1c0742020-02-04 22:25:16 +0100747 file = self.open(f.fileno(), "rb", closefd=False)
748 self.assertEqual(file.read()[:3], b"egg")
749 file.close()
750 self.assertRaises(ValueError, file.readinto, bytearray(1))
Christian Heimesecc42a22008-11-05 19:30:32 +0000751
752 def test_no_closefd_with_filename(self):
753 # can't use closefd in combination with a file name
Inada Naoki58cffba2021-04-01 11:25:04 +0900754 self.assertRaises(ValueError, self.open, os_helper.TESTFN, "r",
755 encoding="utf-8", closefd=False)
Christian Heimesecc42a22008-11-05 19:30:32 +0000756
757 def test_closefd_attr(self):
Hai Shi883bc632020-07-06 17:12:49 +0800758 with self.open(os_helper.TESTFN, "wb") as f:
Christian Heimesecc42a22008-11-05 19:30:32 +0000759 f.write(b"egg\n")
Inada Naoki58cffba2021-04-01 11:25:04 +0900760 with self.open(os_helper.TESTFN, "r", encoding="utf-8") as f:
Christian Heimesecc42a22008-11-05 19:30:32 +0000761 self.assertEqual(f.buffer.raw.closefd, True)
Inada Naoki58cffba2021-04-01 11:25:04 +0900762 file = self.open(f.fileno(), "r", encoding="utf-8", closefd=False)
Christian Heimesecc42a22008-11-05 19:30:32 +0000763 self.assertEqual(file.buffer.raw.closefd, False)
764
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000765 def test_garbage_collection(self):
766 # FileIO objects are collected, and collecting them flushes
767 # all data to disk.
Hai Shi883bc632020-07-06 17:12:49 +0800768 with warnings_helper.check_warnings(('', ResourceWarning)):
769 f = self.FileIO(os_helper.TESTFN, "wb")
Benjamin Petersonebe5d8a2010-10-31 01:30:11 +0000770 f.write(b"abcxxx")
771 f.f = f
772 wr = weakref.ref(f)
773 del f
774 support.gc_collect()
Serhiy Storchaka0dcd80a2015-08-02 15:17:49 +0300775 self.assertIsNone(wr(), wr)
Hai Shi883bc632020-07-06 17:12:49 +0800776 with self.open(os_helper.TESTFN, "rb") as f:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000777 self.assertEqual(f.read(), b"abcxxx")
Christian Heimesecc42a22008-11-05 19:30:32 +0000778
Antoine Pitrou7d037a72009-03-29 18:55:12 +0000779 def test_unbounded_file(self):
780 # Issue #1174606: reading from an unbounded stream such as /dev/zero.
781 zero = "/dev/zero"
782 if not os.path.exists(zero):
Antoine Pitrouc50cb8e2009-04-19 00:10:36 +0000783 self.skipTest("{0} does not exist".format(zero))
Antoine Pitrou7d037a72009-03-29 18:55:12 +0000784 if sys.maxsize > 0x7FFFFFFF:
Antoine Pitrouc50cb8e2009-04-19 00:10:36 +0000785 self.skipTest("test can only run in a 32-bit address space")
Antoine Pitrou7d037a72009-03-29 18:55:12 +0000786 if support.real_max_memuse < support._2G:
Victor Stinner8c663fd2017-11-08 14:44:44 -0800787 self.skipTest("test requires at least 2 GiB of memory")
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000788 with self.open(zero, "rb", buffering=0) as f:
Antoine Pitrou7d037a72009-03-29 18:55:12 +0000789 self.assertRaises(OverflowError, f.read)
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000790 with self.open(zero, "rb") as f:
Antoine Pitrou7d037a72009-03-29 18:55:12 +0000791 self.assertRaises(OverflowError, f.read)
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000792 with self.open(zero, "r") as f:
Antoine Pitrou7d037a72009-03-29 18:55:12 +0000793 self.assertRaises(OverflowError, f.read)
794
Serhiy Storchakaa3712a92015-02-21 00:35:09 +0200795 def check_flush_error_on_close(self, *args, **kwargs):
796 # Test that the file is closed despite failed flush
797 # and that flush() is called before file closed.
798 f = self.open(*args, **kwargs)
799 closed = []
Antoine Pitrou6be88762010-05-03 16:48:20 +0000800 def bad_flush():
Serhiy Storchakaa3712a92015-02-21 00:35:09 +0200801 closed[:] = [f.closed]
Andrew Svetlovf7a17b42012-12-25 16:47:37 +0200802 raise OSError()
Antoine Pitrou6be88762010-05-03 16:48:20 +0000803 f.flush = bad_flush
Andrew Svetlovf7a17b42012-12-25 16:47:37 +0200804 self.assertRaises(OSError, f.close) # exception not swallowed
Benjamin Peterson68623612012-12-20 11:53:11 -0600805 self.assertTrue(f.closed)
Serhiy Storchakaa3712a92015-02-21 00:35:09 +0200806 self.assertTrue(closed) # flush() called
807 self.assertFalse(closed[0]) # flush() called before file closed
Serhiy Storchakac26a1a42015-02-23 00:28:38 +0200808 f.flush = lambda: None # break reference loop
Serhiy Storchakaa3712a92015-02-21 00:35:09 +0200809
810 def test_flush_error_on_close(self):
811 # raw file
812 # Issue #5700: io.FileIO calls flush() after file closed
Hai Shi883bc632020-07-06 17:12:49 +0800813 self.check_flush_error_on_close(os_helper.TESTFN, 'wb', buffering=0)
814 fd = os.open(os_helper.TESTFN, os.O_WRONLY|os.O_CREAT)
Serhiy Storchakaa3712a92015-02-21 00:35:09 +0200815 self.check_flush_error_on_close(fd, 'wb', buffering=0)
Hai Shi883bc632020-07-06 17:12:49 +0800816 fd = os.open(os_helper.TESTFN, os.O_WRONLY|os.O_CREAT)
Serhiy Storchakaa3712a92015-02-21 00:35:09 +0200817 self.check_flush_error_on_close(fd, 'wb', buffering=0, closefd=False)
818 os.close(fd)
819 # buffered io
Hai Shi883bc632020-07-06 17:12:49 +0800820 self.check_flush_error_on_close(os_helper.TESTFN, 'wb')
821 fd = os.open(os_helper.TESTFN, os.O_WRONLY|os.O_CREAT)
Serhiy Storchakaa3712a92015-02-21 00:35:09 +0200822 self.check_flush_error_on_close(fd, 'wb')
Hai Shi883bc632020-07-06 17:12:49 +0800823 fd = os.open(os_helper.TESTFN, os.O_WRONLY|os.O_CREAT)
Serhiy Storchakaa3712a92015-02-21 00:35:09 +0200824 self.check_flush_error_on_close(fd, 'wb', closefd=False)
825 os.close(fd)
826 # text io
Inada Naoki58cffba2021-04-01 11:25:04 +0900827 self.check_flush_error_on_close(os_helper.TESTFN, 'w', encoding="utf-8")
Hai Shi883bc632020-07-06 17:12:49 +0800828 fd = os.open(os_helper.TESTFN, os.O_WRONLY|os.O_CREAT)
Inada Naoki58cffba2021-04-01 11:25:04 +0900829 self.check_flush_error_on_close(fd, 'w', encoding="utf-8")
Hai Shi883bc632020-07-06 17:12:49 +0800830 fd = os.open(os_helper.TESTFN, os.O_WRONLY|os.O_CREAT)
Inada Naoki58cffba2021-04-01 11:25:04 +0900831 self.check_flush_error_on_close(fd, 'w', encoding="utf-8", closefd=False)
Serhiy Storchakaa3712a92015-02-21 00:35:09 +0200832 os.close(fd)
Antoine Pitrou6be88762010-05-03 16:48:20 +0000833
834 def test_multi_close(self):
Hai Shi883bc632020-07-06 17:12:49 +0800835 f = self.open(os_helper.TESTFN, "wb", buffering=0)
Antoine Pitrou6be88762010-05-03 16:48:20 +0000836 f.close()
837 f.close()
838 f.close()
839 self.assertRaises(ValueError, f.flush)
840
Antoine Pitrou328ec742010-09-14 18:37:24 +0000841 def test_RawIOBase_read(self):
Sanyam Khurana1b74f9b2017-12-11 19:12:09 +0530842 # Exercise the default limited RawIOBase.read(n) implementation (which
843 # calls readinto() internally).
Antoine Pitrou328ec742010-09-14 18:37:24 +0000844 rawio = self.MockRawIOWithoutRead((b"abc", b"d", None, b"efg", None))
845 self.assertEqual(rawio.read(2), b"ab")
846 self.assertEqual(rawio.read(2), b"c")
847 self.assertEqual(rawio.read(2), b"d")
848 self.assertEqual(rawio.read(2), None)
849 self.assertEqual(rawio.read(2), b"ef")
850 self.assertEqual(rawio.read(2), b"g")
851 self.assertEqual(rawio.read(2), None)
852 self.assertEqual(rawio.read(2), b"")
853
Benjamin Petersonf6f3a352011-09-03 09:26:20 -0400854 def test_types_have_dict(self):
855 test = (
856 self.IOBase(),
857 self.RawIOBase(),
858 self.TextIOBase(),
859 self.StringIO(),
860 self.BytesIO()
861 )
862 for obj in test:
863 self.assertTrue(hasattr(obj, "__dict__"))
864
Ross Lagerwall59142db2011-10-31 20:34:46 +0200865 def test_opener(self):
Inada Naoki58cffba2021-04-01 11:25:04 +0900866 with self.open(os_helper.TESTFN, "w", encoding="utf-8") as f:
Ross Lagerwall59142db2011-10-31 20:34:46 +0200867 f.write("egg\n")
Hai Shi883bc632020-07-06 17:12:49 +0800868 fd = os.open(os_helper.TESTFN, os.O_RDONLY)
Ross Lagerwall59142db2011-10-31 20:34:46 +0200869 def opener(path, flags):
870 return fd
Inada Naoki58cffba2021-04-01 11:25:04 +0900871 with self.open("non-existent", "r", encoding="utf-8", opener=opener) as f:
Ross Lagerwall59142db2011-10-31 20:34:46 +0200872 self.assertEqual(f.read(), "egg\n")
873
Barry Warsaw480e2852016-06-08 17:47:26 -0400874 def test_bad_opener_negative_1(self):
875 # Issue #27066.
876 def badopener(fname, flags):
877 return -1
878 with self.assertRaises(ValueError) as cm:
879 open('non-existent', 'r', opener=badopener)
880 self.assertEqual(str(cm.exception), 'opener returned -1')
881
882 def test_bad_opener_other_negative(self):
883 # Issue #27066.
884 def badopener(fname, flags):
885 return -2
886 with self.assertRaises(ValueError) as cm:
887 open('non-existent', 'r', opener=badopener)
888 self.assertEqual(str(cm.exception), 'opener returned -2')
889
Hynek Schlawack2cc71562012-05-25 10:05:53 +0200890 def test_fileio_closefd(self):
891 # Issue #4841
892 with self.open(__file__, 'rb') as f1, \
893 self.open(__file__, 'rb') as f2:
894 fileio = self.FileIO(f1.fileno(), closefd=False)
895 # .__init__() must not close f1
896 fileio.__init__(f2.fileno(), closefd=False)
897 f1.readline()
898 # .close() must not close f2
899 fileio.close()
900 f2.readline()
901
Serhiy Storchakaf10063e2014-06-09 13:32:34 +0300902 def test_nonbuffered_textio(self):
Hai Shi883bc632020-07-06 17:12:49 +0800903 with warnings_helper.check_no_resource_warning(self):
Serhiy Storchakaf10063e2014-06-09 13:32:34 +0300904 with self.assertRaises(ValueError):
Inada Naoki58cffba2021-04-01 11:25:04 +0900905 self.open(os_helper.TESTFN, 'w', encoding="utf-8", buffering=0)
Serhiy Storchakaf10063e2014-06-09 13:32:34 +0300906
907 def test_invalid_newline(self):
Hai Shi883bc632020-07-06 17:12:49 +0800908 with warnings_helper.check_no_resource_warning(self):
Serhiy Storchakaf10063e2014-06-09 13:32:34 +0300909 with self.assertRaises(ValueError):
Inada Naoki58cffba2021-04-01 11:25:04 +0900910 self.open(os_helper.TESTFN, 'w', encoding="utf-8", newline='invalid')
Serhiy Storchakaf10063e2014-06-09 13:32:34 +0300911
Martin Panter6bb91f32016-05-28 00:41:57 +0000912 def test_buffered_readinto_mixin(self):
913 # Test the implementation provided by BufferedIOBase
914 class Stream(self.BufferedIOBase):
915 def read(self, size):
916 return b"12345"
917 read1 = read
918 stream = Stream()
919 for method in ("readinto", "readinto1"):
920 with self.subTest(method):
921 buffer = byteslike(5)
922 self.assertEqual(getattr(stream, method)(buffer), 5)
923 self.assertEqual(bytes(buffer), b"12345")
924
Ethan Furmand62548a2016-06-04 14:38:43 -0700925 def test_fspath_support(self):
Ethan Furmand62548a2016-06-04 14:38:43 -0700926 def check_path_succeeds(path):
Inada Naoki58cffba2021-04-01 11:25:04 +0900927 with self.open(path, "w", encoding="utf-8") as f:
Ethan Furmand62548a2016-06-04 14:38:43 -0700928 f.write("egg\n")
929
Inada Naoki58cffba2021-04-01 11:25:04 +0900930 with self.open(path, "r", encoding="utf-8") as f:
Ethan Furmand62548a2016-06-04 14:38:43 -0700931 self.assertEqual(f.read(), "egg\n")
932
Hai Shi883bc632020-07-06 17:12:49 +0800933 check_path_succeeds(FakePath(os_helper.TESTFN))
Serhiy Storchaka67987ac2020-07-27 20:58:35 +0300934 check_path_succeeds(FakePath(os.fsencode(os_helper.TESTFN)))
Ethan Furmand62548a2016-06-04 14:38:43 -0700935
Inada Naoki58cffba2021-04-01 11:25:04 +0900936 with self.open(os_helper.TESTFN, "w", encoding="utf-8") as f:
Serhiy Storchakab21d1552018-03-02 11:53:51 +0200937 bad_path = FakePath(f.fileno())
938 with self.assertRaises(TypeError):
Inada Naoki58cffba2021-04-01 11:25:04 +0900939 self.open(bad_path, 'w', encoding="utf-8")
Serhiy Storchakab21d1552018-03-02 11:53:51 +0200940
941 bad_path = FakePath(None)
Brett Cannonc78ca1e2016-06-24 12:03:43 -0700942 with self.assertRaises(TypeError):
Inada Naoki58cffba2021-04-01 11:25:04 +0900943 self.open(bad_path, 'w', encoding="utf-8")
Ethan Furmand62548a2016-06-04 14:38:43 -0700944
Serhiy Storchakab21d1552018-03-02 11:53:51 +0200945 bad_path = FakePath(FloatingPointError)
946 with self.assertRaises(FloatingPointError):
Inada Naoki58cffba2021-04-01 11:25:04 +0900947 self.open(bad_path, 'w', encoding="utf-8")
Serhiy Storchakab21d1552018-03-02 11:53:51 +0200948
Ethan Furmand62548a2016-06-04 14:38:43 -0700949 # ensure that refcounting is correct with some error conditions
950 with self.assertRaisesRegex(ValueError, 'read/write/append mode'):
Inada Naoki58cffba2021-04-01 11:25:04 +0900951 self.open(FakePath(os_helper.TESTFN), 'rwxa', encoding="utf-8")
Ethan Furmand62548a2016-06-04 14:38:43 -0700952
Sanyam Khurana1b74f9b2017-12-11 19:12:09 +0530953 def test_RawIOBase_readall(self):
954 # Exercise the default unlimited RawIOBase.read() and readall()
955 # implementations.
956 rawio = self.MockRawIOWithoutRead((b"abc", b"d", b"efg"))
957 self.assertEqual(rawio.read(), b"abcdefg")
958 rawio = self.MockRawIOWithoutRead((b"abc", b"d", b"efg"))
959 self.assertEqual(rawio.readall(), b"abcdefg")
960
961 def test_BufferedIOBase_readinto(self):
962 # Exercise the default BufferedIOBase.readinto() and readinto1()
963 # implementations (which call read() or read1() internally).
964 class Reader(self.BufferedIOBase):
965 def __init__(self, avail):
966 self.avail = avail
967 def read(self, size):
968 result = self.avail[:size]
969 self.avail = self.avail[size:]
970 return result
971 def read1(self, size):
972 """Returns no more than 5 bytes at once"""
973 return self.read(min(size, 5))
974 tests = (
975 # (test method, total data available, read buffer size, expected
976 # read size)
977 ("readinto", 10, 5, 5),
978 ("readinto", 10, 6, 6), # More than read1() can return
979 ("readinto", 5, 6, 5), # Buffer larger than total available
980 ("readinto", 6, 7, 6),
981 ("readinto", 10, 0, 0), # Empty buffer
982 ("readinto1", 10, 5, 5), # Result limited to single read1() call
983 ("readinto1", 10, 6, 5), # Buffer larger than read1() can return
984 ("readinto1", 5, 6, 5), # Buffer larger than total available
985 ("readinto1", 6, 7, 5),
986 ("readinto1", 10, 0, 0), # Empty buffer
987 )
988 UNUSED_BYTE = 0x81
989 for test in tests:
990 with self.subTest(test):
991 method, avail, request, result = test
992 reader = Reader(bytes(range(avail)))
993 buffer = bytearray((UNUSED_BYTE,) * request)
994 method = getattr(reader, method)
995 self.assertEqual(method(buffer), result)
996 self.assertEqual(len(buffer), request)
997 self.assertSequenceEqual(buffer[:result], range(result))
998 unused = (UNUSED_BYTE,) * (request - result)
999 self.assertSequenceEqual(buffer[result:], unused)
1000 self.assertEqual(len(reader.avail), avail - result)
1001
Zackery Spytz28f07362018-07-17 00:31:44 -06001002 def test_close_assert(self):
1003 class R(self.IOBase):
1004 def __setattr__(self, name, value):
1005 pass
1006 def flush(self):
1007 raise OSError()
1008 f = R()
1009 # This would cause an assertion failure.
1010 self.assertRaises(OSError, f.close)
1011
Victor Stinner472f7942019-04-12 21:58:24 +02001012 # Silence destructor error
1013 R.flush = lambda self: None
1014
Hynek Schlawack2cc71562012-05-25 10:05:53 +02001015
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001016class CIOTest(IOTest):
Antoine Pitrou84f1b172011-07-12 21:57:15 +02001017
1018 def test_IOBase_finalize(self):
1019 # Issue #12149: segmentation fault on _PyIOBase_finalize when both a
1020 # class which inherits IOBase and an object of this class are caught
1021 # in a reference cycle and close() is already in the method cache.
1022 class MyIO(self.IOBase):
1023 def close(self):
1024 pass
1025
1026 # create an instance to populate the method cache
1027 MyIO()
1028 obj = MyIO()
1029 obj.obj = obj
1030 wr = weakref.ref(obj)
1031 del MyIO
1032 del obj
1033 support.gc_collect()
Serhiy Storchaka0dcd80a2015-08-02 15:17:49 +03001034 self.assertIsNone(wr(), wr)
Guido van Rossuma9e20242007-03-08 00:43:48 +00001035
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001036class PyIOTest(IOTest):
1037 pass
Guido van Rossum68bbcd22007-02-27 17:19:33 +00001038
Guido van Rossuma9e20242007-03-08 00:43:48 +00001039
Gregory P. Smith1bef9072015-04-14 13:24:34 -07001040@support.cpython_only
1041class APIMismatchTest(unittest.TestCase):
Gregory P. Smith054b0652015-04-14 12:58:05 -07001042
Gregory P. Smith054b0652015-04-14 12:58:05 -07001043 def test_RawIOBase_io_in_pyio_match(self):
1044 """Test that pyio RawIOBase class has all c RawIOBase methods"""
Antoine Pitrou45d61562015-05-20 21:50:59 +02001045 mismatch = support.detect_api_mismatch(pyio.RawIOBase, io.RawIOBase,
1046 ignore=('__weakref__',))
Gregory P. Smith054b0652015-04-14 12:58:05 -07001047 self.assertEqual(mismatch, set(), msg='Python RawIOBase does not have all C RawIOBase methods')
1048
1049 def test_RawIOBase_pyio_in_io_match(self):
1050 """Test that c RawIOBase class has all pyio RawIOBase methods"""
1051 mismatch = support.detect_api_mismatch(io.RawIOBase, pyio.RawIOBase)
1052 self.assertEqual(mismatch, set(), msg='C RawIOBase does not have all Python RawIOBase methods')
1053
1054
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001055class CommonBufferedTests:
1056 # Tests common to BufferedReader, BufferedWriter and BufferedRandom
1057
Benjamin Petersond2e0c792009-05-01 20:40:59 +00001058 def test_detach(self):
1059 raw = self.MockRawIO()
1060 buf = self.tp(raw)
1061 self.assertIs(buf.detach(), raw)
1062 self.assertRaises(ValueError, buf.detach)
1063
Benjamin Peterson10e76b62014-12-21 20:51:50 -06001064 repr(buf) # Should still work
1065
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001066 def test_fileno(self):
1067 rawio = self.MockRawIO()
1068 bufio = self.tp(rawio)
1069
Ezio Melottib3aedd42010-11-20 19:04:17 +00001070 self.assertEqual(42, bufio.fileno())
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001071
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001072 def test_invalid_args(self):
1073 rawio = self.MockRawIO()
1074 bufio = self.tp(rawio)
1075 # Invalid whence
1076 self.assertRaises(ValueError, bufio.seek, 0, -1)
Jesus Cea94363612012-06-22 18:32:07 +02001077 self.assertRaises(ValueError, bufio.seek, 0, 9)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001078
1079 def test_override_destructor(self):
1080 tp = self.tp
1081 record = []
1082 class MyBufferedIO(tp):
1083 def __del__(self):
1084 record.append(1)
1085 try:
1086 f = super().__del__
1087 except AttributeError:
1088 pass
1089 else:
1090 f()
1091 def close(self):
1092 record.append(2)
1093 super().close()
1094 def flush(self):
1095 record.append(3)
1096 super().flush()
1097 rawio = self.MockRawIO()
1098 bufio = MyBufferedIO(rawio)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001099 del bufio
Benjamin Peterson24fb1d02009-04-24 23:26:21 +00001100 support.gc_collect()
Martin Panter754aab22016-03-31 07:21:56 +00001101 self.assertEqual(record, [1, 2, 3])
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001102
1103 def test_context_manager(self):
1104 # Test usability as a context manager
1105 rawio = self.MockRawIO()
1106 bufio = self.tp(rawio)
1107 def _with():
1108 with bufio:
1109 pass
1110 _with()
1111 # bufio should now be closed, and using it a second time should raise
1112 # a ValueError.
1113 self.assertRaises(ValueError, _with)
1114
1115 def test_error_through_destructor(self):
1116 # Test that the exception state is not modified by a destructor,
1117 # even if close() fails.
1118 rawio = self.CloseFailureIO()
Victor Stinner6dbbe742019-05-25 00:09:38 +02001119 with support.catch_unraisable_exception() as cm:
1120 with self.assertRaises(AttributeError):
1121 self.tp(rawio).xyzzy
Victor Stinnerbc2aa812019-05-23 03:45:09 +02001122
1123 if not IOBASE_EMITS_UNRAISABLE:
1124 self.assertIsNone(cm.unraisable)
1125 elif cm.unraisable is not None:
1126 self.assertEqual(cm.unraisable.exc_type, OSError)
Guido van Rossum78892e42007-04-06 17:31:18 +00001127
Antoine Pitrou716c4442009-05-23 19:04:03 +00001128 def test_repr(self):
1129 raw = self.MockRawIO()
1130 b = self.tp(raw)
Max Bernsteinccb7ca72019-05-21 10:09:21 -07001131 clsname = r"(%s\.)?%s" % (self.tp.__module__, self.tp.__qualname__)
1132 self.assertRegex(repr(b), "<%s>" % clsname)
Antoine Pitrou716c4442009-05-23 19:04:03 +00001133 raw.name = "dummy"
Max Bernsteinccb7ca72019-05-21 10:09:21 -07001134 self.assertRegex(repr(b), "<%s name='dummy'>" % clsname)
Antoine Pitrou716c4442009-05-23 19:04:03 +00001135 raw.name = b"dummy"
Max Bernsteinccb7ca72019-05-21 10:09:21 -07001136 self.assertRegex(repr(b), "<%s name=b'dummy'>" % clsname)
Antoine Pitrou716c4442009-05-23 19:04:03 +00001137
Serhiy Storchakaa5af6e12017-03-19 19:25:29 +02001138 def test_recursive_repr(self):
1139 # Issue #25455
1140 raw = self.MockRawIO()
1141 b = self.tp(raw)
1142 with support.swap_attr(raw, 'name', b):
1143 try:
1144 repr(b) # Should not crash
1145 except RuntimeError:
1146 pass
1147
Antoine Pitrou6be88762010-05-03 16:48:20 +00001148 def test_flush_error_on_close(self):
Serhiy Storchakaa3712a92015-02-21 00:35:09 +02001149 # Test that buffered file is closed despite failed flush
1150 # and that flush() is called before file closed.
Antoine Pitrou6be88762010-05-03 16:48:20 +00001151 raw = self.MockRawIO()
Serhiy Storchakaa3712a92015-02-21 00:35:09 +02001152 closed = []
Antoine Pitrou6be88762010-05-03 16:48:20 +00001153 def bad_flush():
Serhiy Storchakaa3712a92015-02-21 00:35:09 +02001154 closed[:] = [b.closed, raw.closed]
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02001155 raise OSError()
Antoine Pitrou6be88762010-05-03 16:48:20 +00001156 raw.flush = bad_flush
1157 b = self.tp(raw)
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02001158 self.assertRaises(OSError, b.close) # exception not swallowed
Benjamin Peterson68623612012-12-20 11:53:11 -06001159 self.assertTrue(b.closed)
Serhiy Storchakaa3712a92015-02-21 00:35:09 +02001160 self.assertTrue(raw.closed)
1161 self.assertTrue(closed) # flush() called
1162 self.assertFalse(closed[0]) # flush() called before file closed
1163 self.assertFalse(closed[1])
Serhiy Storchakac26a1a42015-02-23 00:28:38 +02001164 raw.flush = lambda: None # break reference loop
Benjamin Peterson68623612012-12-20 11:53:11 -06001165
1166 def test_close_error_on_close(self):
1167 raw = self.MockRawIO()
1168 def bad_flush():
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02001169 raise OSError('flush')
Benjamin Peterson68623612012-12-20 11:53:11 -06001170 def bad_close():
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02001171 raise OSError('close')
Benjamin Peterson68623612012-12-20 11:53:11 -06001172 raw.close = bad_close
1173 b = self.tp(raw)
1174 b.flush = bad_flush
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02001175 with self.assertRaises(OSError) as err: # exception not swallowed
Benjamin Peterson68623612012-12-20 11:53:11 -06001176 b.close()
1177 self.assertEqual(err.exception.args, ('close',))
Serhiy Storchaka8a8f7f92014-06-09 09:13:04 +03001178 self.assertIsInstance(err.exception.__context__, OSError)
Benjamin Peterson68623612012-12-20 11:53:11 -06001179 self.assertEqual(err.exception.__context__.args, ('flush',))
1180 self.assertFalse(b.closed)
Antoine Pitrou6be88762010-05-03 16:48:20 +00001181
Victor Stinner472f7942019-04-12 21:58:24 +02001182 # Silence destructor error
1183 raw.close = lambda: None
1184 b.flush = lambda: None
1185
Serhiy Storchaka8a8f7f92014-06-09 09:13:04 +03001186 def test_nonnormalized_close_error_on_close(self):
1187 # Issue #21677
1188 raw = self.MockRawIO()
1189 def bad_flush():
1190 raise non_existing_flush
1191 def bad_close():
1192 raise non_existing_close
1193 raw.close = bad_close
1194 b = self.tp(raw)
1195 b.flush = bad_flush
1196 with self.assertRaises(NameError) as err: # exception not swallowed
1197 b.close()
1198 self.assertIn('non_existing_close', str(err.exception))
1199 self.assertIsInstance(err.exception.__context__, NameError)
1200 self.assertIn('non_existing_flush', str(err.exception.__context__))
1201 self.assertFalse(b.closed)
1202
Victor Stinner472f7942019-04-12 21:58:24 +02001203 # Silence destructor error
1204 b.flush = lambda: None
1205 raw.close = lambda: None
1206
Antoine Pitrou6be88762010-05-03 16:48:20 +00001207 def test_multi_close(self):
1208 raw = self.MockRawIO()
1209 b = self.tp(raw)
1210 b.close()
1211 b.close()
1212 b.close()
1213 self.assertRaises(ValueError, b.flush)
1214
Antoine Pitrou0d739d72010-09-05 23:01:12 +00001215 def test_unseekable(self):
1216 bufio = self.tp(self.MockUnseekableIO(b"A" * 10))
1217 self.assertRaises(self.UnsupportedOperation, bufio.tell)
1218 self.assertRaises(self.UnsupportedOperation, bufio.seek, 0)
1219
Antoine Pitrou7f8f4182010-12-21 21:20:59 +00001220 def test_readonly_attributes(self):
1221 raw = self.MockRawIO()
1222 buf = self.tp(raw)
1223 x = self.MockRawIO()
1224 with self.assertRaises(AttributeError):
1225 buf.raw = x
1226
Guido van Rossum78892e42007-04-06 17:31:18 +00001227
Antoine Pitrou10f0c502012-07-29 19:02:46 +02001228class SizeofTest:
1229
1230 @support.cpython_only
1231 def test_sizeof(self):
1232 bufsize1 = 4096
1233 bufsize2 = 8192
1234 rawio = self.MockRawIO()
1235 bufio = self.tp(rawio, buffer_size=bufsize1)
1236 size = sys.getsizeof(bufio) - bufsize1
1237 rawio = self.MockRawIO()
1238 bufio = self.tp(rawio, buffer_size=bufsize2)
1239 self.assertEqual(sys.getsizeof(bufio), size + bufsize2)
1240
Jesus Ceadc469452012-10-04 12:37:56 +02001241 @support.cpython_only
1242 def test_buffer_freeing(self) :
1243 bufsize = 4096
1244 rawio = self.MockRawIO()
1245 bufio = self.tp(rawio, buffer_size=bufsize)
1246 size = sys.getsizeof(bufio) - bufsize
1247 bufio.close()
1248 self.assertEqual(sys.getsizeof(bufio), size)
Antoine Pitrou10f0c502012-07-29 19:02:46 +02001249
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001250class BufferedReaderTest(unittest.TestCase, CommonBufferedTests):
1251 read_mode = "rb"
Guido van Rossum78892e42007-04-06 17:31:18 +00001252
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001253 def test_constructor(self):
1254 rawio = self.MockRawIO([b"abc"])
1255 bufio = self.tp(rawio)
1256 bufio.__init__(rawio)
1257 bufio.__init__(rawio, buffer_size=1024)
1258 bufio.__init__(rawio, buffer_size=16)
Ezio Melottib3aedd42010-11-20 19:04:17 +00001259 self.assertEqual(b"abc", bufio.read())
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001260 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0)
1261 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16)
1262 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1)
1263 rawio = self.MockRawIO([b"abc"])
1264 bufio.__init__(rawio)
Ezio Melottib3aedd42010-11-20 19:04:17 +00001265 self.assertEqual(b"abc", bufio.read())
Guido van Rossum78892e42007-04-06 17:31:18 +00001266
Serhiy Storchaka61e24932014-02-12 10:52:35 +02001267 def test_uninitialized(self):
1268 bufio = self.tp.__new__(self.tp)
1269 del bufio
1270 bufio = self.tp.__new__(self.tp)
1271 self.assertRaisesRegex((ValueError, AttributeError),
1272 'uninitialized|has no attribute',
1273 bufio.read, 0)
1274 bufio.__init__(self.MockRawIO())
1275 self.assertEqual(bufio.read(0), b'')
1276
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001277 def test_read(self):
Benjamin Petersonbf5ff762009-12-13 19:25:34 +00001278 for arg in (None, 7):
1279 rawio = self.MockRawIO((b"abc", b"d", b"efg"))
1280 bufio = self.tp(rawio)
Ezio Melottib3aedd42010-11-20 19:04:17 +00001281 self.assertEqual(b"abcdefg", bufio.read(arg))
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001282 # Invalid args
1283 self.assertRaises(ValueError, bufio.read, -2)
Guido van Rossum68bbcd22007-02-27 17:19:33 +00001284
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001285 def test_read1(self):
1286 rawio = self.MockRawIO((b"abc", b"d", b"efg"))
1287 bufio = self.tp(rawio)
Ezio Melottib3aedd42010-11-20 19:04:17 +00001288 self.assertEqual(b"a", bufio.read(1))
1289 self.assertEqual(b"b", bufio.read1(1))
1290 self.assertEqual(rawio._reads, 1)
Martin Panterccb2c0e2016-10-20 23:48:14 +00001291 self.assertEqual(b"", bufio.read1(0))
Ezio Melottib3aedd42010-11-20 19:04:17 +00001292 self.assertEqual(b"c", bufio.read1(100))
1293 self.assertEqual(rawio._reads, 1)
1294 self.assertEqual(b"d", bufio.read1(100))
1295 self.assertEqual(rawio._reads, 2)
1296 self.assertEqual(b"efg", bufio.read1(100))
1297 self.assertEqual(rawio._reads, 3)
1298 self.assertEqual(b"", bufio.read1(100))
1299 self.assertEqual(rawio._reads, 4)
Martin Panterccb2c0e2016-10-20 23:48:14 +00001300
1301 def test_read1_arbitrary(self):
1302 rawio = self.MockRawIO((b"abc", b"d", b"efg"))
1303 bufio = self.tp(rawio)
1304 self.assertEqual(b"a", bufio.read(1))
1305 self.assertEqual(b"bc", bufio.read1())
1306 self.assertEqual(b"d", bufio.read1())
1307 self.assertEqual(b"efg", bufio.read1(-1))
1308 self.assertEqual(rawio._reads, 3)
1309 self.assertEqual(b"", bufio.read1())
1310 self.assertEqual(rawio._reads, 4)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001311
1312 def test_readinto(self):
1313 rawio = self.MockRawIO((b"abc", b"d", b"efg"))
1314 bufio = self.tp(rawio)
1315 b = bytearray(2)
Ezio Melottib3aedd42010-11-20 19:04:17 +00001316 self.assertEqual(bufio.readinto(b), 2)
1317 self.assertEqual(b, b"ab")
1318 self.assertEqual(bufio.readinto(b), 2)
1319 self.assertEqual(b, b"cd")
1320 self.assertEqual(bufio.readinto(b), 2)
1321 self.assertEqual(b, b"ef")
1322 self.assertEqual(bufio.readinto(b), 1)
1323 self.assertEqual(b, b"gf")
1324 self.assertEqual(bufio.readinto(b), 0)
1325 self.assertEqual(b, b"gf")
Antoine Pitrou3486a982011-05-12 01:57:53 +02001326 rawio = self.MockRawIO((b"abc", None))
1327 bufio = self.tp(rawio)
1328 self.assertEqual(bufio.readinto(b), 2)
1329 self.assertEqual(b, b"ab")
1330 self.assertEqual(bufio.readinto(b), 1)
1331 self.assertEqual(b, b"cb")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001332
Benjamin Petersona96fea02014-06-22 14:17:44 -07001333 def test_readinto1(self):
1334 buffer_size = 10
1335 rawio = self.MockRawIO((b"abc", b"de", b"fgh", b"jkl"))
1336 bufio = self.tp(rawio, buffer_size=buffer_size)
1337 b = bytearray(2)
1338 self.assertEqual(bufio.peek(3), b'abc')
1339 self.assertEqual(rawio._reads, 1)
1340 self.assertEqual(bufio.readinto1(b), 2)
1341 self.assertEqual(b, b"ab")
1342 self.assertEqual(rawio._reads, 1)
1343 self.assertEqual(bufio.readinto1(b), 1)
1344 self.assertEqual(b[:1], b"c")
1345 self.assertEqual(rawio._reads, 1)
1346 self.assertEqual(bufio.readinto1(b), 2)
1347 self.assertEqual(b, b"de")
1348 self.assertEqual(rawio._reads, 2)
1349 b = bytearray(2*buffer_size)
1350 self.assertEqual(bufio.peek(3), b'fgh')
1351 self.assertEqual(rawio._reads, 3)
1352 self.assertEqual(bufio.readinto1(b), 6)
1353 self.assertEqual(b[:6], b"fghjkl")
1354 self.assertEqual(rawio._reads, 4)
1355
1356 def test_readinto_array(self):
1357 buffer_size = 60
1358 data = b"a" * 26
1359 rawio = self.MockRawIO((data,))
1360 bufio = self.tp(rawio, buffer_size=buffer_size)
1361
1362 # Create an array with element size > 1 byte
1363 b = array.array('i', b'x' * 32)
1364 assert len(b) != 16
1365
1366 # Read into it. We should get as many *bytes* as we can fit into b
1367 # (which is more than the number of elements)
1368 n = bufio.readinto(b)
1369 self.assertGreater(n, len(b))
1370
1371 # Check that old contents of b are preserved
1372 bm = memoryview(b).cast('B')
1373 self.assertLess(n, len(bm))
1374 self.assertEqual(bm[:n], data[:n])
1375 self.assertEqual(bm[n:], b'x' * (len(bm[n:])))
1376
1377 def test_readinto1_array(self):
1378 buffer_size = 60
1379 data = b"a" * 26
1380 rawio = self.MockRawIO((data,))
1381 bufio = self.tp(rawio, buffer_size=buffer_size)
1382
1383 # Create an array with element size > 1 byte
1384 b = array.array('i', b'x' * 32)
1385 assert len(b) != 16
1386
1387 # Read into it. We should get as many *bytes* as we can fit into b
1388 # (which is more than the number of elements)
1389 n = bufio.readinto1(b)
1390 self.assertGreater(n, len(b))
1391
1392 # Check that old contents of b are preserved
1393 bm = memoryview(b).cast('B')
1394 self.assertLess(n, len(bm))
1395 self.assertEqual(bm[:n], data[:n])
1396 self.assertEqual(bm[n:], b'x' * (len(bm[n:])))
1397
Benjamin Petersonbf5ff762009-12-13 19:25:34 +00001398 def test_readlines(self):
1399 def bufio():
1400 rawio = self.MockRawIO((b"abc\n", b"d\n", b"ef"))
1401 return self.tp(rawio)
Ezio Melottib3aedd42010-11-20 19:04:17 +00001402 self.assertEqual(bufio().readlines(), [b"abc\n", b"d\n", b"ef"])
1403 self.assertEqual(bufio().readlines(5), [b"abc\n", b"d\n"])
1404 self.assertEqual(bufio().readlines(None), [b"abc\n", b"d\n", b"ef"])
Benjamin Petersonbf5ff762009-12-13 19:25:34 +00001405
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001406 def test_buffering(self):
Guido van Rossum78892e42007-04-06 17:31:18 +00001407 data = b"abcdefghi"
1408 dlen = len(data)
1409
1410 tests = [
1411 [ 100, [ 3, 1, 4, 8 ], [ dlen, 0 ] ],
1412 [ 100, [ 3, 3, 3], [ dlen ] ],
1413 [ 4, [ 1, 2, 4, 2 ], [ 4, 4, 1 ] ],
1414 ]
1415
1416 for bufsize, buf_read_sizes, raw_read_sizes in tests:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001417 rawio = self.MockFileIO(data)
1418 bufio = self.tp(rawio, buffer_size=bufsize)
Guido van Rossum78892e42007-04-06 17:31:18 +00001419 pos = 0
1420 for nbytes in buf_read_sizes:
Ezio Melottib3aedd42010-11-20 19:04:17 +00001421 self.assertEqual(bufio.read(nbytes), data[pos:pos+nbytes])
Guido van Rossum78892e42007-04-06 17:31:18 +00001422 pos += nbytes
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001423 # this is mildly implementation-dependent
Ezio Melottib3aedd42010-11-20 19:04:17 +00001424 self.assertEqual(rawio.read_history, raw_read_sizes)
Guido van Rossum78892e42007-04-06 17:31:18 +00001425
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001426 def test_read_non_blocking(self):
Guido van Rossum01a27522007-03-07 01:00:12 +00001427 # Inject some None's in there to simulate EWOULDBLOCK
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001428 rawio = self.MockRawIO((b"abc", b"d", None, b"efg", None, None, None))
1429 bufio = self.tp(rawio)
Ezio Melottib3aedd42010-11-20 19:04:17 +00001430 self.assertEqual(b"abcd", bufio.read(6))
1431 self.assertEqual(b"e", bufio.read(1))
1432 self.assertEqual(b"fg", bufio.read())
1433 self.assertEqual(b"", bufio.peek(1))
Victor Stinnera80987f2011-05-25 22:47:16 +02001434 self.assertIsNone(bufio.read())
Ezio Melottib3aedd42010-11-20 19:04:17 +00001435 self.assertEqual(b"", bufio.read())
Guido van Rossum01a27522007-03-07 01:00:12 +00001436
Victor Stinnera80987f2011-05-25 22:47:16 +02001437 rawio = self.MockRawIO((b"a", None, None))
1438 self.assertEqual(b"a", rawio.readall())
1439 self.assertIsNone(rawio.readall())
1440
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001441 def test_read_past_eof(self):
1442 rawio = self.MockRawIO((b"abc", b"d", b"efg"))
1443 bufio = self.tp(rawio)
Guido van Rossum68bbcd22007-02-27 17:19:33 +00001444
Ezio Melottib3aedd42010-11-20 19:04:17 +00001445 self.assertEqual(b"abcdefg", bufio.read(9000))
Guido van Rossum68bbcd22007-02-27 17:19:33 +00001446
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001447 def test_read_all(self):
1448 rawio = self.MockRawIO((b"abc", b"d", b"efg"))
1449 bufio = self.tp(rawio)
Guido van Rossum68bbcd22007-02-27 17:19:33 +00001450
Ezio Melottib3aedd42010-11-20 19:04:17 +00001451 self.assertEqual(b"abcdefg", bufio.read())
Guido van Rossum68bbcd22007-02-27 17:19:33 +00001452
Antoine Pitrou5bc4fa72010-10-14 15:34:31 +00001453 @support.requires_resource('cpu')
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001454 def test_threads(self):
Antoine Pitrou87695762008-08-14 22:44:29 +00001455 try:
1456 # Write out many bytes with exactly the same number of 0's,
1457 # 1's... 255's. This will help us check that concurrent reading
1458 # doesn't duplicate or forget contents.
1459 N = 1000
1460 l = list(range(256)) * N
1461 random.shuffle(l)
1462 s = bytes(bytearray(l))
Hai Shi883bc632020-07-06 17:12:49 +08001463 with self.open(os_helper.TESTFN, "wb") as f:
Antoine Pitrou87695762008-08-14 22:44:29 +00001464 f.write(s)
Hai Shi883bc632020-07-06 17:12:49 +08001465 with self.open(os_helper.TESTFN, self.read_mode, buffering=0) as raw:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001466 bufio = self.tp(raw, 8)
Antoine Pitrou87695762008-08-14 22:44:29 +00001467 errors = []
1468 results = []
1469 def f():
1470 try:
1471 # Intra-buffer read then buffer-flushing read
1472 for n in cycle([1, 19]):
1473 s = bufio.read(n)
1474 if not s:
1475 break
1476 # list.append() is atomic
1477 results.append(s)
1478 except Exception as e:
1479 errors.append(e)
1480 raise
1481 threads = [threading.Thread(target=f) for x in range(20)]
Hai Shie80697d2020-05-28 06:10:27 +08001482 with threading_helper.start_threads(threads):
Serhiy Storchaka263dcd22015-04-01 13:01:14 +03001483 time.sleep(0.02) # yield
Antoine Pitrou87695762008-08-14 22:44:29 +00001484 self.assertFalse(errors,
1485 "the following exceptions were caught: %r" % errors)
1486 s = b''.join(results)
1487 for i in range(256):
1488 c = bytes(bytearray([i]))
1489 self.assertEqual(s.count(c), N)
1490 finally:
Hai Shi883bc632020-07-06 17:12:49 +08001491 os_helper.unlink(os_helper.TESTFN)
Antoine Pitrou87695762008-08-14 22:44:29 +00001492
Antoine Pitrou1e44fec2011-10-04 12:26:20 +02001493 def test_unseekable(self):
1494 bufio = self.tp(self.MockUnseekableIO(b"A" * 10))
1495 self.assertRaises(self.UnsupportedOperation, bufio.tell)
1496 self.assertRaises(self.UnsupportedOperation, bufio.seek, 0)
1497 bufio.read(1)
1498 self.assertRaises(self.UnsupportedOperation, bufio.seek, 0)
1499 self.assertRaises(self.UnsupportedOperation, bufio.tell)
1500
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001501 def test_misbehaved_io(self):
1502 rawio = self.MisbehavedRawIO((b"abc", b"d", b"efg"))
1503 bufio = self.tp(rawio)
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02001504 self.assertRaises(OSError, bufio.seek, 0)
1505 self.assertRaises(OSError, bufio.tell)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001506
Victor Stinnerb589cef2019-06-11 03:10:59 +02001507 # Silence destructor error
1508 bufio.close = lambda: None
1509
Antoine Pitrou32cfede2010-08-11 13:31:33 +00001510 def test_no_extraneous_read(self):
1511 # Issue #9550; when the raw IO object has satisfied the read request,
1512 # we should not issue any additional reads, otherwise it may block
1513 # (e.g. socket).
1514 bufsize = 16
1515 for n in (2, bufsize - 1, bufsize, bufsize + 1, bufsize * 2):
1516 rawio = self.MockRawIO([b"x" * n])
1517 bufio = self.tp(rawio, bufsize)
1518 self.assertEqual(bufio.read(n), b"x" * n)
1519 # Simple case: one raw read is enough to satisfy the request.
1520 self.assertEqual(rawio._extraneous_reads, 0,
1521 "failed for {}: {} != 0".format(n, rawio._extraneous_reads))
1522 # A more complex case where two raw reads are needed to satisfy
1523 # the request.
1524 rawio = self.MockRawIO([b"x" * (n - 1), b"x"])
1525 bufio = self.tp(rawio, bufsize)
1526 self.assertEqual(bufio.read(n), b"x" * n)
1527 self.assertEqual(rawio._extraneous_reads, 0,
1528 "failed for {}: {} != 0".format(n, rawio._extraneous_reads))
1529
Berker Peksagd10d6ae2015-05-12 17:01:05 +03001530 def test_read_on_closed(self):
1531 # Issue #23796
1532 b = io.BufferedReader(io.BytesIO(b"12"))
1533 b.read(1)
1534 b.close()
1535 self.assertRaises(ValueError, b.peek)
1536 self.assertRaises(ValueError, b.read1, 1)
1537
Berker Peksagfd5116c2020-02-21 20:57:26 +03001538 def test_truncate_on_read_only(self):
1539 rawio = self.MockFileIO(b"abc")
1540 bufio = self.tp(rawio)
1541 self.assertFalse(bufio.writable())
1542 self.assertRaises(self.UnsupportedOperation, bufio.truncate)
1543 self.assertRaises(self.UnsupportedOperation, bufio.truncate, 0)
1544
Antoine Pitrou32cfede2010-08-11 13:31:33 +00001545
Antoine Pitrou10f0c502012-07-29 19:02:46 +02001546class CBufferedReaderTest(BufferedReaderTest, SizeofTest):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001547 tp = io.BufferedReader
1548
Gregory P. Smithe5796c42018-12-30 20:17:57 -08001549 @unittest.skipIf(MEMORY_SANITIZER, "MSan defaults to crashing "
1550 "instead of returning NULL for malloc failure.")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001551 def test_constructor(self):
1552 BufferedReaderTest.test_constructor(self)
1553 # The allocation can succeed on 32-bit builds, e.g. with more
Victor Stinner8c663fd2017-11-08 14:44:44 -08001554 # than 2 GiB RAM and a 64-bit kernel.
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001555 if sys.maxsize > 0x7FFFFFFF:
1556 rawio = self.MockRawIO()
1557 bufio = self.tp(rawio)
1558 self.assertRaises((OverflowError, MemoryError, ValueError),
1559 bufio.__init__, rawio, sys.maxsize)
1560
1561 def test_initialization(self):
1562 rawio = self.MockRawIO([b"abc"])
1563 bufio = self.tp(rawio)
1564 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0)
1565 self.assertRaises(ValueError, bufio.read)
1566 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16)
1567 self.assertRaises(ValueError, bufio.read)
1568 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1)
1569 self.assertRaises(ValueError, bufio.read)
1570
1571 def test_misbehaved_io_read(self):
1572 rawio = self.MisbehavedRawIO((b"abc", b"d", b"efg"))
1573 bufio = self.tp(rawio)
1574 # _pyio.BufferedReader seems to implement reading different, so that
1575 # checking this is not so easy.
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02001576 self.assertRaises(OSError, bufio.read, 10)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001577
1578 def test_garbage_collection(self):
1579 # C BufferedReader objects are collected.
1580 # The Python version has __del__, so it ends into gc.garbage instead
Hai Shi883bc632020-07-06 17:12:49 +08001581 self.addCleanup(os_helper.unlink, os_helper.TESTFN)
1582 with warnings_helper.check_warnings(('', ResourceWarning)):
1583 rawio = self.FileIO(os_helper.TESTFN, "w+b")
Antoine Pitrou796564c2013-07-30 19:59:21 +02001584 f = self.tp(rawio)
1585 f.f = f
1586 wr = weakref.ref(f)
1587 del f
1588 support.gc_collect()
Serhiy Storchaka0dcd80a2015-08-02 15:17:49 +03001589 self.assertIsNone(wr(), wr)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001590
R David Murray67bfe802013-02-23 21:51:05 -05001591 def test_args_error(self):
1592 # Issue #17275
1593 with self.assertRaisesRegex(TypeError, "BufferedReader"):
1594 self.tp(io.BytesIO(), 1024, 1024, 1024)
1595
David Szotten86663562020-06-16 00:53:57 +01001596 def test_bad_readinto_value(self):
1597 rawio = io.BufferedReader(io.BytesIO(b"12"))
1598 rawio.readinto = lambda buf: -1
1599 bufio = self.tp(rawio)
1600 with self.assertRaises(OSError) as cm:
1601 bufio.readline()
1602 self.assertIsNone(cm.exception.__cause__)
1603
1604 def test_bad_readinto_type(self):
1605 rawio = io.BufferedReader(io.BytesIO(b"12"))
1606 rawio.readinto = lambda buf: b''
1607 bufio = self.tp(rawio)
1608 with self.assertRaises(OSError) as cm:
1609 bufio.readline()
1610 self.assertIsInstance(cm.exception.__cause__, TypeError)
1611
R David Murray67bfe802013-02-23 21:51:05 -05001612
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001613class PyBufferedReaderTest(BufferedReaderTest):
1614 tp = pyio.BufferedReader
Antoine Pitrou87695762008-08-14 22:44:29 +00001615
Guido van Rossuma9e20242007-03-08 00:43:48 +00001616
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001617class BufferedWriterTest(unittest.TestCase, CommonBufferedTests):
1618 write_mode = "wb"
Guido van Rossuma9e20242007-03-08 00:43:48 +00001619
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001620 def test_constructor(self):
1621 rawio = self.MockRawIO()
1622 bufio = self.tp(rawio)
1623 bufio.__init__(rawio)
1624 bufio.__init__(rawio, buffer_size=1024)
1625 bufio.__init__(rawio, buffer_size=16)
Ezio Melottib3aedd42010-11-20 19:04:17 +00001626 self.assertEqual(3, bufio.write(b"abc"))
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001627 bufio.flush()
1628 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0)
1629 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16)
1630 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1)
1631 bufio.__init__(rawio)
Ezio Melottib3aedd42010-11-20 19:04:17 +00001632 self.assertEqual(3, bufio.write(b"ghi"))
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001633 bufio.flush()
Ezio Melottib3aedd42010-11-20 19:04:17 +00001634 self.assertEqual(b"".join(rawio._write_stack), b"abcghi")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001635
Serhiy Storchaka61e24932014-02-12 10:52:35 +02001636 def test_uninitialized(self):
1637 bufio = self.tp.__new__(self.tp)
1638 del bufio
1639 bufio = self.tp.__new__(self.tp)
1640 self.assertRaisesRegex((ValueError, AttributeError),
1641 'uninitialized|has no attribute',
1642 bufio.write, b'')
1643 bufio.__init__(self.MockRawIO())
1644 self.assertEqual(bufio.write(b''), 0)
1645
Benjamin Petersond2e0c792009-05-01 20:40:59 +00001646 def test_detach_flush(self):
1647 raw = self.MockRawIO()
1648 buf = self.tp(raw)
1649 buf.write(b"howdy!")
1650 self.assertFalse(raw._write_stack)
1651 buf.detach()
1652 self.assertEqual(raw._write_stack, [b"howdy!"])
1653
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001654 def test_write(self):
Guido van Rossum68bbcd22007-02-27 17:19:33 +00001655 # Write to the buffered IO but don't overflow the buffer.
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001656 writer = self.MockRawIO()
1657 bufio = self.tp(writer, 8)
Guido van Rossum76c5d4d2007-04-06 19:10:29 +00001658 bufio.write(b"abc")
Guido van Rossum76c5d4d2007-04-06 19:10:29 +00001659 self.assertFalse(writer._write_stack)
Martin Panter6bb91f32016-05-28 00:41:57 +00001660 buffer = bytearray(b"def")
1661 bufio.write(buffer)
1662 buffer[:] = b"***" # Overwrite our copy of the data
1663 bufio.flush()
1664 self.assertEqual(b"".join(writer._write_stack), b"abcdef")
Guido van Rossum68bbcd22007-02-27 17:19:33 +00001665
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001666 def test_write_overflow(self):
1667 writer = self.MockRawIO()
1668 bufio = self.tp(writer, 8)
1669 contents = b"abcdefghijklmnop"
1670 for n in range(0, len(contents), 3):
1671 bufio.write(contents[n:n+3])
1672 flushed = b"".join(writer._write_stack)
1673 # At least (total - 8) bytes were implicitly flushed, perhaps more
1674 # depending on the implementation.
Benjamin Petersonc9c0f202009-06-30 23:06:06 +00001675 self.assertTrue(flushed.startswith(contents[:-8]), flushed)
Guido van Rossum68bbcd22007-02-27 17:19:33 +00001676
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001677 def check_writes(self, intermediate_func):
1678 # Lots of writes, test the flushed output is as expected.
1679 contents = bytes(range(256)) * 1000
1680 n = 0
1681 writer = self.MockRawIO()
1682 bufio = self.tp(writer, 13)
1683 # Generator of write sizes: repeat each N 15 times then proceed to N+1
1684 def gen_sizes():
1685 for size in count(1):
1686 for i in range(15):
1687 yield size
1688 sizes = gen_sizes()
1689 while n < len(contents):
1690 size = min(next(sizes), len(contents) - n)
Ezio Melottib3aedd42010-11-20 19:04:17 +00001691 self.assertEqual(bufio.write(contents[n:n+size]), size)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001692 intermediate_func(bufio)
1693 n += size
1694 bufio.flush()
Ezio Melottib3aedd42010-11-20 19:04:17 +00001695 self.assertEqual(contents, b"".join(writer._write_stack))
Guido van Rossum68bbcd22007-02-27 17:19:33 +00001696
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001697 def test_writes(self):
1698 self.check_writes(lambda bufio: None)
Guido van Rossum68bbcd22007-02-27 17:19:33 +00001699
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001700 def test_writes_and_flushes(self):
1701 self.check_writes(lambda bufio: bufio.flush())
Guido van Rossum01a27522007-03-07 01:00:12 +00001702
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001703 def test_writes_and_seeks(self):
1704 def _seekabs(bufio):
1705 pos = bufio.tell()
1706 bufio.seek(pos + 1, 0)
1707 bufio.seek(pos - 1, 0)
1708 bufio.seek(pos, 0)
1709 self.check_writes(_seekabs)
1710 def _seekrel(bufio):
1711 pos = bufio.seek(0, 1)
1712 bufio.seek(+1, 1)
1713 bufio.seek(-1, 1)
1714 bufio.seek(pos, 0)
1715 self.check_writes(_seekrel)
Guido van Rossum01a27522007-03-07 01:00:12 +00001716
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001717 def test_writes_and_truncates(self):
1718 self.check_writes(lambda bufio: bufio.truncate(bufio.tell()))
Guido van Rossum01a27522007-03-07 01:00:12 +00001719
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001720 def test_write_non_blocking(self):
1721 raw = self.MockNonBlockWriterIO()
Benjamin Peterson59406a92009-03-26 17:10:29 +00001722 bufio = self.tp(raw, 8)
Guido van Rossum01a27522007-03-07 01:00:12 +00001723
Ezio Melottib3aedd42010-11-20 19:04:17 +00001724 self.assertEqual(bufio.write(b"abcd"), 4)
1725 self.assertEqual(bufio.write(b"efghi"), 5)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001726 # 1 byte will be written, the rest will be buffered
1727 raw.block_on(b"k")
Ezio Melottib3aedd42010-11-20 19:04:17 +00001728 self.assertEqual(bufio.write(b"jklmn"), 5)
Guido van Rossum01a27522007-03-07 01:00:12 +00001729
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001730 # 8 bytes will be written, 8 will be buffered and the rest will be lost
1731 raw.block_on(b"0")
1732 try:
1733 bufio.write(b"opqrwxyz0123456789")
1734 except self.BlockingIOError as e:
1735 written = e.characters_written
1736 else:
1737 self.fail("BlockingIOError should have been raised")
Ezio Melottib3aedd42010-11-20 19:04:17 +00001738 self.assertEqual(written, 16)
1739 self.assertEqual(raw.pop_written(),
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001740 b"abcdefghijklmnopqrwxyz")
Guido van Rossum01a27522007-03-07 01:00:12 +00001741
Ezio Melottib3aedd42010-11-20 19:04:17 +00001742 self.assertEqual(bufio.write(b"ABCDEFGHI"), 9)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001743 s = raw.pop_written()
1744 # Previously buffered bytes were flushed
1745 self.assertTrue(s.startswith(b"01234567A"), s)
Guido van Rossum01a27522007-03-07 01:00:12 +00001746
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001747 def test_write_and_rewind(self):
1748 raw = io.BytesIO()
1749 bufio = self.tp(raw, 4)
1750 self.assertEqual(bufio.write(b"abcdef"), 6)
1751 self.assertEqual(bufio.tell(), 6)
1752 bufio.seek(0, 0)
1753 self.assertEqual(bufio.write(b"XY"), 2)
1754 bufio.seek(6, 0)
1755 self.assertEqual(raw.getvalue(), b"XYcdef")
1756 self.assertEqual(bufio.write(b"123456"), 6)
1757 bufio.flush()
1758 self.assertEqual(raw.getvalue(), b"XYcdef123456")
Guido van Rossum68bbcd22007-02-27 17:19:33 +00001759
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001760 def test_flush(self):
1761 writer = self.MockRawIO()
1762 bufio = self.tp(writer, 8)
Guido van Rossum76c5d4d2007-04-06 19:10:29 +00001763 bufio.write(b"abc")
1764 bufio.flush()
Ezio Melottib3aedd42010-11-20 19:04:17 +00001765 self.assertEqual(b"abc", writer._write_stack[0])
Guido van Rossum68bbcd22007-02-27 17:19:33 +00001766
Antoine Pitrou131a4892012-10-16 22:57:11 +02001767 def test_writelines(self):
1768 l = [b'ab', b'cd', b'ef']
1769 writer = self.MockRawIO()
1770 bufio = self.tp(writer, 8)
1771 bufio.writelines(l)
1772 bufio.flush()
1773 self.assertEqual(b''.join(writer._write_stack), b'abcdef')
1774
1775 def test_writelines_userlist(self):
1776 l = UserList([b'ab', b'cd', b'ef'])
1777 writer = self.MockRawIO()
1778 bufio = self.tp(writer, 8)
1779 bufio.writelines(l)
1780 bufio.flush()
1781 self.assertEqual(b''.join(writer._write_stack), b'abcdef')
1782
1783 def test_writelines_error(self):
1784 writer = self.MockRawIO()
1785 bufio = self.tp(writer, 8)
1786 self.assertRaises(TypeError, bufio.writelines, [1, 2, 3])
1787 self.assertRaises(TypeError, bufio.writelines, None)
1788 self.assertRaises(TypeError, bufio.writelines, 'abc')
1789
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001790 def test_destructor(self):
1791 writer = self.MockRawIO()
1792 bufio = self.tp(writer, 8)
1793 bufio.write(b"abc")
1794 del bufio
Benjamin Peterson24fb1d02009-04-24 23:26:21 +00001795 support.gc_collect()
Ezio Melottib3aedd42010-11-20 19:04:17 +00001796 self.assertEqual(b"abc", writer._write_stack[0])
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001797
1798 def test_truncate(self):
1799 # Truncate implicitly flushes the buffer.
Hai Shi883bc632020-07-06 17:12:49 +08001800 self.addCleanup(os_helper.unlink, os_helper.TESTFN)
1801 with self.open(os_helper.TESTFN, self.write_mode, buffering=0) as raw:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001802 bufio = self.tp(raw, 8)
1803 bufio.write(b"abcdef")
1804 self.assertEqual(bufio.truncate(3), 3)
Antoine Pitrou905a2ff2010-01-31 22:47:27 +00001805 self.assertEqual(bufio.tell(), 6)
Hai Shi883bc632020-07-06 17:12:49 +08001806 with self.open(os_helper.TESTFN, "rb", buffering=0) as f:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001807 self.assertEqual(f.read(), b"abc")
1808
Nitish Chandra059f58c2018-01-28 21:30:09 +05301809 def test_truncate_after_write(self):
1810 # Ensure that truncate preserves the file position after
1811 # writes longer than the buffer size.
1812 # Issue: https://bugs.python.org/issue32228
Hai Shi883bc632020-07-06 17:12:49 +08001813 self.addCleanup(os_helper.unlink, os_helper.TESTFN)
1814 with self.open(os_helper.TESTFN, "wb") as f:
Nitish Chandra059f58c2018-01-28 21:30:09 +05301815 # Fill with some buffer
1816 f.write(b'\x00' * 10000)
1817 buffer_sizes = [8192, 4096, 200]
1818 for buffer_size in buffer_sizes:
Hai Shi883bc632020-07-06 17:12:49 +08001819 with self.open(os_helper.TESTFN, "r+b", buffering=buffer_size) as f:
Nitish Chandra059f58c2018-01-28 21:30:09 +05301820 f.write(b'\x00' * (buffer_size + 1))
1821 # After write write_pos and write_end are set to 0
1822 f.read(1)
1823 # read operation makes sure that pos != raw_pos
1824 f.truncate()
1825 self.assertEqual(f.tell(), buffer_size + 2)
1826
Antoine Pitrou5bc4fa72010-10-14 15:34:31 +00001827 @support.requires_resource('cpu')
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001828 def test_threads(self):
Antoine Pitrou87695762008-08-14 22:44:29 +00001829 try:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001830 # Write out many bytes from many threads and test they were
1831 # all flushed.
1832 N = 1000
1833 contents = bytes(range(256)) * N
1834 sizes = cycle([1, 19])
1835 n = 0
1836 queue = deque()
1837 while n < len(contents):
1838 size = next(sizes)
1839 queue.append(contents[n:n+size])
1840 n += size
1841 del contents
Antoine Pitrou87695762008-08-14 22:44:29 +00001842 # We use a real file object because it allows us to
1843 # exercise situations where the GIL is released before
1844 # writing the buffer to the raw streams. This is in addition
1845 # to concurrency issues due to switching threads in the middle
1846 # of Python code.
Hai Shi883bc632020-07-06 17:12:49 +08001847 with self.open(os_helper.TESTFN, self.write_mode, buffering=0) as raw:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001848 bufio = self.tp(raw, 8)
Antoine Pitrou87695762008-08-14 22:44:29 +00001849 errors = []
1850 def f():
1851 try:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001852 while True:
1853 try:
1854 s = queue.popleft()
1855 except IndexError:
1856 return
Antoine Pitrou87695762008-08-14 22:44:29 +00001857 bufio.write(s)
1858 except Exception as e:
1859 errors.append(e)
1860 raise
1861 threads = [threading.Thread(target=f) for x in range(20)]
Hai Shie80697d2020-05-28 06:10:27 +08001862 with threading_helper.start_threads(threads):
Serhiy Storchaka263dcd22015-04-01 13:01:14 +03001863 time.sleep(0.02) # yield
Antoine Pitrou87695762008-08-14 22:44:29 +00001864 self.assertFalse(errors,
1865 "the following exceptions were caught: %r" % errors)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001866 bufio.close()
Hai Shi883bc632020-07-06 17:12:49 +08001867 with self.open(os_helper.TESTFN, "rb") as f:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001868 s = f.read()
1869 for i in range(256):
Ezio Melottib3aedd42010-11-20 19:04:17 +00001870 self.assertEqual(s.count(bytes([i])), N)
Antoine Pitrou87695762008-08-14 22:44:29 +00001871 finally:
Hai Shi883bc632020-07-06 17:12:49 +08001872 os_helper.unlink(os_helper.TESTFN)
Antoine Pitrou87695762008-08-14 22:44:29 +00001873
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001874 def test_misbehaved_io(self):
1875 rawio = self.MisbehavedRawIO()
1876 bufio = self.tp(rawio, 5)
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02001877 self.assertRaises(OSError, bufio.seek, 0)
1878 self.assertRaises(OSError, bufio.tell)
1879 self.assertRaises(OSError, bufio.write, b"abcdef")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001880
Victor Stinnerb589cef2019-06-11 03:10:59 +02001881 # Silence destructor error
1882 bufio.close = lambda: None
1883
Florent Xicluna109d5732012-07-07 17:03:22 +02001884 def test_max_buffer_size_removal(self):
1885 with self.assertRaises(TypeError):
Benjamin Peterson59406a92009-03-26 17:10:29 +00001886 self.tp(self.MockRawIO(), 8, 12)
Benjamin Peterson59406a92009-03-26 17:10:29 +00001887
Benjamin Peterson68623612012-12-20 11:53:11 -06001888 def test_write_error_on_close(self):
1889 raw = self.MockRawIO()
1890 def bad_write(b):
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02001891 raise OSError()
Benjamin Peterson68623612012-12-20 11:53:11 -06001892 raw.write = bad_write
1893 b = self.tp(raw)
1894 b.write(b'spam')
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02001895 self.assertRaises(OSError, b.close) # exception not swallowed
Benjamin Peterson68623612012-12-20 11:53:11 -06001896 self.assertTrue(b.closed)
1897
benfogle9703f092017-11-10 16:03:40 -05001898 def test_slow_close_from_thread(self):
1899 # Issue #31976
1900 rawio = self.SlowFlushRawIO()
1901 bufio = self.tp(rawio, 8)
1902 t = threading.Thread(target=bufio.close)
1903 t.start()
1904 rawio.in_flush.wait()
1905 self.assertRaises(ValueError, bufio.write, b'spam')
1906 self.assertTrue(bufio.closed)
1907 t.join()
1908
1909
Benjamin Peterson59406a92009-03-26 17:10:29 +00001910
Antoine Pitrou10f0c502012-07-29 19:02:46 +02001911class CBufferedWriterTest(BufferedWriterTest, SizeofTest):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001912 tp = io.BufferedWriter
1913
Gregory P. Smithe5796c42018-12-30 20:17:57 -08001914 @unittest.skipIf(MEMORY_SANITIZER, "MSan defaults to crashing "
1915 "instead of returning NULL for malloc failure.")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001916 def test_constructor(self):
1917 BufferedWriterTest.test_constructor(self)
1918 # The allocation can succeed on 32-bit builds, e.g. with more
Victor Stinner8c663fd2017-11-08 14:44:44 -08001919 # than 2 GiB RAM and a 64-bit kernel.
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001920 if sys.maxsize > 0x7FFFFFFF:
1921 rawio = self.MockRawIO()
1922 bufio = self.tp(rawio)
1923 self.assertRaises((OverflowError, MemoryError, ValueError),
1924 bufio.__init__, rawio, sys.maxsize)
1925
1926 def test_initialization(self):
1927 rawio = self.MockRawIO()
1928 bufio = self.tp(rawio)
1929 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0)
1930 self.assertRaises(ValueError, bufio.write, b"def")
1931 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16)
1932 self.assertRaises(ValueError, bufio.write, b"def")
1933 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1)
1934 self.assertRaises(ValueError, bufio.write, b"def")
1935
1936 def test_garbage_collection(self):
1937 # C BufferedWriter objects are collected, and collecting them flushes
1938 # all data to disk.
1939 # The Python version has __del__, so it ends into gc.garbage instead
Hai Shi883bc632020-07-06 17:12:49 +08001940 self.addCleanup(os_helper.unlink, os_helper.TESTFN)
1941 with warnings_helper.check_warnings(('', ResourceWarning)):
1942 rawio = self.FileIO(os_helper.TESTFN, "w+b")
Antoine Pitrou796564c2013-07-30 19:59:21 +02001943 f = self.tp(rawio)
1944 f.write(b"123xxx")
1945 f.x = f
1946 wr = weakref.ref(f)
1947 del f
1948 support.gc_collect()
Serhiy Storchaka0dcd80a2015-08-02 15:17:49 +03001949 self.assertIsNone(wr(), wr)
Hai Shi883bc632020-07-06 17:12:49 +08001950 with self.open(os_helper.TESTFN, "rb") as f:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001951 self.assertEqual(f.read(), b"123xxx")
1952
R David Murray67bfe802013-02-23 21:51:05 -05001953 def test_args_error(self):
1954 # Issue #17275
1955 with self.assertRaisesRegex(TypeError, "BufferedWriter"):
1956 self.tp(io.BytesIO(), 1024, 1024, 1024)
1957
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001958
1959class PyBufferedWriterTest(BufferedWriterTest):
1960 tp = pyio.BufferedWriter
Guido van Rossuma9e20242007-03-08 00:43:48 +00001961
Guido van Rossum01a27522007-03-07 01:00:12 +00001962class BufferedRWPairTest(unittest.TestCase):
Guido van Rossuma9e20242007-03-08 00:43:48 +00001963
Antoine Pitroucf4c7492009-04-19 00:09:36 +00001964 def test_constructor(self):
1965 pair = self.tp(self.MockRawIO(), self.MockRawIO())
Benjamin Peterson92035012008-12-27 16:00:54 +00001966 self.assertFalse(pair.closed)
Guido van Rossum01a27522007-03-07 01:00:12 +00001967
Serhiy Storchaka61e24932014-02-12 10:52:35 +02001968 def test_uninitialized(self):
1969 pair = self.tp.__new__(self.tp)
1970 del pair
1971 pair = self.tp.__new__(self.tp)
1972 self.assertRaisesRegex((ValueError, AttributeError),
1973 'uninitialized|has no attribute',
1974 pair.read, 0)
1975 self.assertRaisesRegex((ValueError, AttributeError),
1976 'uninitialized|has no attribute',
1977 pair.write, b'')
1978 pair.__init__(self.MockRawIO(), self.MockRawIO())
1979 self.assertEqual(pair.read(0), b'')
1980 self.assertEqual(pair.write(b''), 0)
1981
Benjamin Petersond2e0c792009-05-01 20:40:59 +00001982 def test_detach(self):
1983 pair = self.tp(self.MockRawIO(), self.MockRawIO())
1984 self.assertRaises(self.UnsupportedOperation, pair.detach)
1985
Florent Xicluna109d5732012-07-07 17:03:22 +02001986 def test_constructor_max_buffer_size_removal(self):
1987 with self.assertRaises(TypeError):
Benjamin Peterson59406a92009-03-26 17:10:29 +00001988 self.tp(self.MockRawIO(), self.MockRawIO(), 8, 12)
Benjamin Peterson59406a92009-03-26 17:10:29 +00001989
Antoine Pitroucf4c7492009-04-19 00:09:36 +00001990 def test_constructor_with_not_readable(self):
1991 class NotReadable(MockRawIO):
1992 def readable(self):
1993 return False
1994
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02001995 self.assertRaises(OSError, self.tp, NotReadable(), self.MockRawIO())
Antoine Pitroucf4c7492009-04-19 00:09:36 +00001996
1997 def test_constructor_with_not_writeable(self):
1998 class NotWriteable(MockRawIO):
1999 def writable(self):
2000 return False
2001
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02002002 self.assertRaises(OSError, self.tp, self.MockRawIO(), NotWriteable())
Antoine Pitroucf4c7492009-04-19 00:09:36 +00002003
2004 def test_read(self):
2005 pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())
2006
2007 self.assertEqual(pair.read(3), b"abc")
2008 self.assertEqual(pair.read(1), b"d")
2009 self.assertEqual(pair.read(), b"ef")
Benjamin Petersonbf5ff762009-12-13 19:25:34 +00002010 pair = self.tp(self.BytesIO(b"abc"), self.MockRawIO())
2011 self.assertEqual(pair.read(None), b"abc")
2012
2013 def test_readlines(self):
2014 pair = lambda: self.tp(self.BytesIO(b"abc\ndef\nh"), self.MockRawIO())
2015 self.assertEqual(pair().readlines(), [b"abc\n", b"def\n", b"h"])
2016 self.assertEqual(pair().readlines(), [b"abc\n", b"def\n", b"h"])
2017 self.assertEqual(pair().readlines(5), [b"abc\n", b"def\n"])
Antoine Pitroucf4c7492009-04-19 00:09:36 +00002018
2019 def test_read1(self):
2020 # .read1() is delegated to the underlying reader object, so this test
2021 # can be shallow.
2022 pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())
2023
2024 self.assertEqual(pair.read1(3), b"abc")
Martin Panterccb2c0e2016-10-20 23:48:14 +00002025 self.assertEqual(pair.read1(), b"def")
Antoine Pitroucf4c7492009-04-19 00:09:36 +00002026
2027 def test_readinto(self):
Martin Panter6bb91f32016-05-28 00:41:57 +00002028 for method in ("readinto", "readinto1"):
2029 with self.subTest(method):
2030 pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())
Antoine Pitroucf4c7492009-04-19 00:09:36 +00002031
Serhiy Storchaka5f1a5182016-09-11 14:41:02 +03002032 data = byteslike(b'\0' * 5)
Martin Panter6bb91f32016-05-28 00:41:57 +00002033 self.assertEqual(getattr(pair, method)(data), 5)
2034 self.assertEqual(bytes(data), b"abcde")
Antoine Pitroucf4c7492009-04-19 00:09:36 +00002035
2036 def test_write(self):
2037 w = self.MockRawIO()
2038 pair = self.tp(self.MockRawIO(), w)
2039
2040 pair.write(b"abc")
2041 pair.flush()
Martin Panter6bb91f32016-05-28 00:41:57 +00002042 buffer = bytearray(b"def")
2043 pair.write(buffer)
2044 buffer[:] = b"***" # Overwrite our copy of the data
Antoine Pitroucf4c7492009-04-19 00:09:36 +00002045 pair.flush()
2046 self.assertEqual(w._write_stack, [b"abc", b"def"])
2047
2048 def test_peek(self):
2049 pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())
2050
2051 self.assertTrue(pair.peek(3).startswith(b"abc"))
2052 self.assertEqual(pair.read(3), b"abc")
2053
2054 def test_readable(self):
2055 pair = self.tp(self.MockRawIO(), self.MockRawIO())
2056 self.assertTrue(pair.readable())
2057
2058 def test_writeable(self):
2059 pair = self.tp(self.MockRawIO(), self.MockRawIO())
2060 self.assertTrue(pair.writable())
2061
2062 def test_seekable(self):
2063 # BufferedRWPairs are never seekable, even if their readers and writers
2064 # are.
2065 pair = self.tp(self.MockRawIO(), self.MockRawIO())
2066 self.assertFalse(pair.seekable())
2067
2068 # .flush() is delegated to the underlying writer object and has been
2069 # tested in the test_write method.
2070
2071 def test_close_and_closed(self):
2072 pair = self.tp(self.MockRawIO(), self.MockRawIO())
2073 self.assertFalse(pair.closed)
2074 pair.close()
2075 self.assertTrue(pair.closed)
2076
Serhiy Storchaka7665be62015-03-24 23:21:57 +02002077 def test_reader_close_error_on_close(self):
2078 def reader_close():
2079 reader_non_existing
2080 reader = self.MockRawIO()
2081 reader.close = reader_close
2082 writer = self.MockRawIO()
2083 pair = self.tp(reader, writer)
2084 with self.assertRaises(NameError) as err:
2085 pair.close()
2086 self.assertIn('reader_non_existing', str(err.exception))
2087 self.assertTrue(pair.closed)
2088 self.assertFalse(reader.closed)
2089 self.assertTrue(writer.closed)
2090
Victor Stinner472f7942019-04-12 21:58:24 +02002091 # Silence destructor error
2092 reader.close = lambda: None
2093
Serhiy Storchaka7665be62015-03-24 23:21:57 +02002094 def test_writer_close_error_on_close(self):
2095 def writer_close():
2096 writer_non_existing
2097 reader = self.MockRawIO()
2098 writer = self.MockRawIO()
2099 writer.close = writer_close
2100 pair = self.tp(reader, writer)
2101 with self.assertRaises(NameError) as err:
2102 pair.close()
2103 self.assertIn('writer_non_existing', str(err.exception))
2104 self.assertFalse(pair.closed)
2105 self.assertTrue(reader.closed)
2106 self.assertFalse(writer.closed)
2107
Victor Stinner472f7942019-04-12 21:58:24 +02002108 # Silence destructor error
2109 writer.close = lambda: None
Victor Stinner913fa1c2019-06-12 23:57:11 +02002110 writer = None
2111
Victor Stinner212646c2019-06-14 18:03:22 +02002112 # Ignore BufferedWriter (of the BufferedRWPair) unraisable exception
Victor Stinner913fa1c2019-06-12 23:57:11 +02002113 with support.catch_unraisable_exception():
Victor Stinner212646c2019-06-14 18:03:22 +02002114 # Ignore BufferedRWPair unraisable exception
2115 with support.catch_unraisable_exception():
2116 pair = None
2117 support.gc_collect()
Victor Stinner913fa1c2019-06-12 23:57:11 +02002118 support.gc_collect()
Victor Stinner472f7942019-04-12 21:58:24 +02002119
Serhiy Storchaka7665be62015-03-24 23:21:57 +02002120 def test_reader_writer_close_error_on_close(self):
2121 def reader_close():
2122 reader_non_existing
2123 def writer_close():
2124 writer_non_existing
2125 reader = self.MockRawIO()
2126 reader.close = reader_close
2127 writer = self.MockRawIO()
2128 writer.close = writer_close
2129 pair = self.tp(reader, writer)
2130 with self.assertRaises(NameError) as err:
2131 pair.close()
2132 self.assertIn('reader_non_existing', str(err.exception))
2133 self.assertIsInstance(err.exception.__context__, NameError)
2134 self.assertIn('writer_non_existing', str(err.exception.__context__))
2135 self.assertFalse(pair.closed)
2136 self.assertFalse(reader.closed)
2137 self.assertFalse(writer.closed)
2138
Victor Stinner472f7942019-04-12 21:58:24 +02002139 # Silence destructor error
2140 reader.close = lambda: None
2141 writer.close = lambda: None
2142
Antoine Pitroucf4c7492009-04-19 00:09:36 +00002143 def test_isatty(self):
2144 class SelectableIsAtty(MockRawIO):
2145 def __init__(self, isatty):
2146 MockRawIO.__init__(self)
2147 self._isatty = isatty
2148
2149 def isatty(self):
2150 return self._isatty
2151
2152 pair = self.tp(SelectableIsAtty(False), SelectableIsAtty(False))
2153 self.assertFalse(pair.isatty())
2154
2155 pair = self.tp(SelectableIsAtty(True), SelectableIsAtty(False))
2156 self.assertTrue(pair.isatty())
2157
2158 pair = self.tp(SelectableIsAtty(False), SelectableIsAtty(True))
2159 self.assertTrue(pair.isatty())
2160
2161 pair = self.tp(SelectableIsAtty(True), SelectableIsAtty(True))
2162 self.assertTrue(pair.isatty())
Guido van Rossum01a27522007-03-07 01:00:12 +00002163
Benjamin Petersonbbd0a322014-09-29 22:46:57 -04002164 def test_weakref_clearing(self):
2165 brw = self.tp(self.MockRawIO(), self.MockRawIO())
2166 ref = weakref.ref(brw)
2167 brw = None
2168 ref = None # Shouldn't segfault.
2169
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002170class CBufferedRWPairTest(BufferedRWPairTest):
2171 tp = io.BufferedRWPair
Guido van Rossuma9e20242007-03-08 00:43:48 +00002172
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002173class PyBufferedRWPairTest(BufferedRWPairTest):
2174 tp = pyio.BufferedRWPair
Guido van Rossuma9e20242007-03-08 00:43:48 +00002175
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002176
2177class BufferedRandomTest(BufferedReaderTest, BufferedWriterTest):
2178 read_mode = "rb+"
2179 write_mode = "wb+"
2180
2181 def test_constructor(self):
2182 BufferedReaderTest.test_constructor(self)
2183 BufferedWriterTest.test_constructor(self)
2184
Serhiy Storchaka61e24932014-02-12 10:52:35 +02002185 def test_uninitialized(self):
2186 BufferedReaderTest.test_uninitialized(self)
2187 BufferedWriterTest.test_uninitialized(self)
2188
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002189 def test_read_and_write(self):
2190 raw = self.MockRawIO((b"asdf", b"ghjk"))
Benjamin Peterson59406a92009-03-26 17:10:29 +00002191 rw = self.tp(raw, 8)
Guido van Rossum01a27522007-03-07 01:00:12 +00002192
2193 self.assertEqual(b"as", rw.read(2))
2194 rw.write(b"ddd")
2195 rw.write(b"eee")
Guido van Rossum76c5d4d2007-04-06 19:10:29 +00002196 self.assertFalse(raw._write_stack) # Buffer writes
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002197 self.assertEqual(b"ghjk", rw.read())
Ezio Melottib3aedd42010-11-20 19:04:17 +00002198 self.assertEqual(b"dddeee", raw._write_stack[0])
Guido van Rossum01a27522007-03-07 01:00:12 +00002199
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002200 def test_seek_and_tell(self):
2201 raw = self.BytesIO(b"asdfghjkl")
2202 rw = self.tp(raw)
Guido van Rossum01a27522007-03-07 01:00:12 +00002203
Ezio Melottib3aedd42010-11-20 19:04:17 +00002204 self.assertEqual(b"as", rw.read(2))
2205 self.assertEqual(2, rw.tell())
Guido van Rossum01a27522007-03-07 01:00:12 +00002206 rw.seek(0, 0)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002207 self.assertEqual(b"asdf", rw.read(4))
Guido van Rossum01a27522007-03-07 01:00:12 +00002208
Antoine Pitroue05565e2011-08-20 14:39:23 +02002209 rw.write(b"123f")
Guido van Rossum01a27522007-03-07 01:00:12 +00002210 rw.seek(0, 0)
Antoine Pitroue05565e2011-08-20 14:39:23 +02002211 self.assertEqual(b"asdf123fl", rw.read())
Ezio Melottib3aedd42010-11-20 19:04:17 +00002212 self.assertEqual(9, rw.tell())
Guido van Rossum01a27522007-03-07 01:00:12 +00002213 rw.seek(-4, 2)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002214 self.assertEqual(5, rw.tell())
Guido van Rossum01a27522007-03-07 01:00:12 +00002215 rw.seek(2, 1)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002216 self.assertEqual(7, rw.tell())
2217 self.assertEqual(b"fl", rw.read(11))
Antoine Pitroue05565e2011-08-20 14:39:23 +02002218 rw.flush()
2219 self.assertEqual(b"asdf123fl", raw.getvalue())
2220
Christian Heimes8e42a0a2007-11-08 18:04:45 +00002221 self.assertRaises(TypeError, rw.seek, 0.0)
Guido van Rossum01a27522007-03-07 01:00:12 +00002222
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002223 def check_flush_and_read(self, read_func):
2224 raw = self.BytesIO(b"abcdefghi")
2225 bufio = self.tp(raw)
2226
Ezio Melottib3aedd42010-11-20 19:04:17 +00002227 self.assertEqual(b"ab", read_func(bufio, 2))
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002228 bufio.write(b"12")
Ezio Melottib3aedd42010-11-20 19:04:17 +00002229 self.assertEqual(b"ef", read_func(bufio, 2))
2230 self.assertEqual(6, bufio.tell())
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002231 bufio.flush()
Ezio Melottib3aedd42010-11-20 19:04:17 +00002232 self.assertEqual(6, bufio.tell())
2233 self.assertEqual(b"ghi", read_func(bufio))
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002234 raw.seek(0, 0)
2235 raw.write(b"XYZ")
2236 # flush() resets the read buffer
2237 bufio.flush()
2238 bufio.seek(0, 0)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002239 self.assertEqual(b"XYZ", read_func(bufio, 3))
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002240
2241 def test_flush_and_read(self):
2242 self.check_flush_and_read(lambda bufio, *args: bufio.read(*args))
2243
2244 def test_flush_and_readinto(self):
2245 def _readinto(bufio, n=-1):
2246 b = bytearray(n if n >= 0 else 9999)
2247 n = bufio.readinto(b)
2248 return bytes(b[:n])
2249 self.check_flush_and_read(_readinto)
2250
2251 def test_flush_and_peek(self):
2252 def _peek(bufio, n=-1):
2253 # This relies on the fact that the buffer can contain the whole
2254 # raw stream, otherwise peek() can return less.
2255 b = bufio.peek(n)
2256 if n != -1:
2257 b = b[:n]
2258 bufio.seek(len(b), 1)
2259 return b
2260 self.check_flush_and_read(_peek)
2261
2262 def test_flush_and_write(self):
2263 raw = self.BytesIO(b"abcdefghi")
2264 bufio = self.tp(raw)
2265
2266 bufio.write(b"123")
2267 bufio.flush()
2268 bufio.write(b"45")
2269 bufio.flush()
2270 bufio.seek(0, 0)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002271 self.assertEqual(b"12345fghi", raw.getvalue())
2272 self.assertEqual(b"12345fghi", bufio.read())
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002273
2274 def test_threads(self):
2275 BufferedReaderTest.test_threads(self)
2276 BufferedWriterTest.test_threads(self)
2277
2278 def test_writes_and_peek(self):
2279 def _peek(bufio):
2280 bufio.peek(1)
2281 self.check_writes(_peek)
2282 def _peek(bufio):
2283 pos = bufio.tell()
2284 bufio.seek(-1, 1)
2285 bufio.peek(1)
2286 bufio.seek(pos, 0)
2287 self.check_writes(_peek)
2288
2289 def test_writes_and_reads(self):
2290 def _read(bufio):
2291 bufio.seek(-1, 1)
2292 bufio.read(1)
2293 self.check_writes(_read)
2294
2295 def test_writes_and_read1s(self):
2296 def _read1(bufio):
2297 bufio.seek(-1, 1)
2298 bufio.read1(1)
2299 self.check_writes(_read1)
2300
2301 def test_writes_and_readintos(self):
2302 def _read(bufio):
2303 bufio.seek(-1, 1)
2304 bufio.readinto(bytearray(1))
2305 self.check_writes(_read)
2306
Antoine Pitroua0ceb732009-08-06 20:29:56 +00002307 def test_write_after_readahead(self):
2308 # Issue #6629: writing after the buffer was filled by readahead should
2309 # first rewind the raw stream.
2310 for overwrite_size in [1, 5]:
2311 raw = self.BytesIO(b"A" * 10)
2312 bufio = self.tp(raw, 4)
2313 # Trigger readahead
2314 self.assertEqual(bufio.read(1), b"A")
2315 self.assertEqual(bufio.tell(), 1)
2316 # Overwriting should rewind the raw stream if it needs so
2317 bufio.write(b"B" * overwrite_size)
2318 self.assertEqual(bufio.tell(), overwrite_size + 1)
2319 # If the write size was smaller than the buffer size, flush() and
2320 # check that rewind happens.
2321 bufio.flush()
2322 self.assertEqual(bufio.tell(), overwrite_size + 1)
2323 s = raw.getvalue()
2324 self.assertEqual(s,
2325 b"A" + b"B" * overwrite_size + b"A" * (9 - overwrite_size))
2326
Antoine Pitrou7c404892011-05-13 00:13:33 +02002327 def test_write_rewind_write(self):
2328 # Various combinations of reading / writing / seeking backwards / writing again
2329 def mutate(bufio, pos1, pos2):
2330 assert pos2 >= pos1
2331 # Fill the buffer
2332 bufio.seek(pos1)
2333 bufio.read(pos2 - pos1)
2334 bufio.write(b'\x02')
2335 # This writes earlier than the previous write, but still inside
2336 # the buffer.
2337 bufio.seek(pos1)
2338 bufio.write(b'\x01')
2339
2340 b = b"\x80\x81\x82\x83\x84"
2341 for i in range(0, len(b)):
2342 for j in range(i, len(b)):
2343 raw = self.BytesIO(b)
2344 bufio = self.tp(raw, 100)
2345 mutate(bufio, i, j)
2346 bufio.flush()
2347 expected = bytearray(b)
2348 expected[j] = 2
2349 expected[i] = 1
2350 self.assertEqual(raw.getvalue(), expected,
2351 "failed result for i=%d, j=%d" % (i, j))
2352
Antoine Pitrou905a2ff2010-01-31 22:47:27 +00002353 def test_truncate_after_read_or_write(self):
2354 raw = self.BytesIO(b"A" * 10)
2355 bufio = self.tp(raw, 100)
2356 self.assertEqual(bufio.read(2), b"AA") # the read buffer gets filled
2357 self.assertEqual(bufio.truncate(), 2)
2358 self.assertEqual(bufio.write(b"BB"), 2) # the write buffer increases
2359 self.assertEqual(bufio.truncate(), 4)
2360
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002361 def test_misbehaved_io(self):
2362 BufferedReaderTest.test_misbehaved_io(self)
2363 BufferedWriterTest.test_misbehaved_io(self)
2364
Antoine Pitroue05565e2011-08-20 14:39:23 +02002365 def test_interleaved_read_write(self):
2366 # Test for issue #12213
2367 with self.BytesIO(b'abcdefgh') as raw:
2368 with self.tp(raw, 100) as f:
2369 f.write(b"1")
2370 self.assertEqual(f.read(1), b'b')
2371 f.write(b'2')
2372 self.assertEqual(f.read1(1), b'd')
2373 f.write(b'3')
2374 buf = bytearray(1)
2375 f.readinto(buf)
2376 self.assertEqual(buf, b'f')
2377 f.write(b'4')
2378 self.assertEqual(f.peek(1), b'h')
2379 f.flush()
2380 self.assertEqual(raw.getvalue(), b'1b2d3f4h')
2381
2382 with self.BytesIO(b'abc') as raw:
2383 with self.tp(raw, 100) as f:
2384 self.assertEqual(f.read(1), b'a')
2385 f.write(b"2")
2386 self.assertEqual(f.read(1), b'c')
2387 f.flush()
2388 self.assertEqual(raw.getvalue(), b'a2c')
2389
2390 def test_interleaved_readline_write(self):
2391 with self.BytesIO(b'ab\ncdef\ng\n') as raw:
2392 with self.tp(raw) as f:
2393 f.write(b'1')
2394 self.assertEqual(f.readline(), b'b\n')
2395 f.write(b'2')
2396 self.assertEqual(f.readline(), b'def\n')
2397 f.write(b'3')
2398 self.assertEqual(f.readline(), b'\n')
2399 f.flush()
2400 self.assertEqual(raw.getvalue(), b'1b\n2def\n3\n')
2401
Antoine Pitrou0d739d72010-09-05 23:01:12 +00002402 # You can't construct a BufferedRandom over a non-seekable stream.
2403 test_unseekable = None
2404
Berker Peksagfd5116c2020-02-21 20:57:26 +03002405 # writable() returns True, so there's no point to test it over
2406 # a writable stream.
2407 test_truncate_on_read_only = None
2408
R David Murray67bfe802013-02-23 21:51:05 -05002409
Antoine Pitrou10f0c502012-07-29 19:02:46 +02002410class CBufferedRandomTest(BufferedRandomTest, SizeofTest):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002411 tp = io.BufferedRandom
2412
Gregory P. Smithe5796c42018-12-30 20:17:57 -08002413 @unittest.skipIf(MEMORY_SANITIZER, "MSan defaults to crashing "
2414 "instead of returning NULL for malloc failure.")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002415 def test_constructor(self):
2416 BufferedRandomTest.test_constructor(self)
2417 # The allocation can succeed on 32-bit builds, e.g. with more
Victor Stinner8c663fd2017-11-08 14:44:44 -08002418 # than 2 GiB RAM and a 64-bit kernel.
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002419 if sys.maxsize > 0x7FFFFFFF:
2420 rawio = self.MockRawIO()
2421 bufio = self.tp(rawio)
2422 self.assertRaises((OverflowError, MemoryError, ValueError),
2423 bufio.__init__, rawio, sys.maxsize)
2424
2425 def test_garbage_collection(self):
2426 CBufferedReaderTest.test_garbage_collection(self)
2427 CBufferedWriterTest.test_garbage_collection(self)
2428
R David Murray67bfe802013-02-23 21:51:05 -05002429 def test_args_error(self):
2430 # Issue #17275
2431 with self.assertRaisesRegex(TypeError, "BufferedRandom"):
2432 self.tp(io.BytesIO(), 1024, 1024, 1024)
2433
2434
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002435class PyBufferedRandomTest(BufferedRandomTest):
2436 tp = pyio.BufferedRandom
2437
2438
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002439# To fully exercise seek/tell, the StatefulIncrementalDecoder has these
2440# properties:
2441# - A single output character can correspond to many bytes of input.
2442# - The number of input bytes to complete the character can be
2443# undetermined until the last input byte is received.
2444# - The number of input bytes can vary depending on previous input.
2445# - A single input byte can correspond to many characters of output.
2446# - The number of output characters can be undetermined until the
2447# last input byte is received.
2448# - The number of output characters can vary depending on previous input.
2449
2450class StatefulIncrementalDecoder(codecs.IncrementalDecoder):
2451 """
2452 For testing seek/tell behavior with a stateful, buffering decoder.
2453
2454 Input is a sequence of words. Words may be fixed-length (length set
2455 by input) or variable-length (period-terminated). In variable-length
2456 mode, extra periods are ignored. Possible words are:
2457 - 'i' followed by a number sets the input length, I (maximum 99).
2458 When I is set to 0, words are space-terminated.
2459 - 'o' followed by a number sets the output length, O (maximum 99).
2460 - Any other word is converted into a word followed by a period on
2461 the output. The output word consists of the input word truncated
2462 or padded out with hyphens to make its length equal to O. If O
2463 is 0, the word is output verbatim without truncating or padding.
2464 I and O are initially set to 1. When I changes, any buffered input is
2465 re-scanned according to the new I. EOF also terminates the last word.
2466 """
2467
2468 def __init__(self, errors='strict'):
Christian Heimesab568872008-03-23 02:11:13 +00002469 codecs.IncrementalDecoder.__init__(self, errors)
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002470 self.reset()
2471
2472 def __repr__(self):
2473 return '<SID %x>' % id(self)
2474
2475 def reset(self):
2476 self.i = 1
2477 self.o = 1
2478 self.buffer = bytearray()
2479
2480 def getstate(self):
2481 i, o = self.i ^ 1, self.o ^ 1 # so that flags = 0 after reset()
2482 return bytes(self.buffer), i*100 + o
2483
2484 def setstate(self, state):
2485 buffer, io = state
2486 self.buffer = bytearray(buffer)
2487 i, o = divmod(io, 100)
2488 self.i, self.o = i ^ 1, o ^ 1
2489
2490 def decode(self, input, final=False):
2491 output = ''
2492 for b in input:
2493 if self.i == 0: # variable-length, terminated with period
2494 if b == ord('.'):
2495 if self.buffer:
2496 output += self.process_word()
2497 else:
2498 self.buffer.append(b)
2499 else: # fixed-length, terminate after self.i bytes
2500 self.buffer.append(b)
2501 if len(self.buffer) == self.i:
2502 output += self.process_word()
2503 if final and self.buffer: # EOF terminates the last word
2504 output += self.process_word()
2505 return output
2506
2507 def process_word(self):
2508 output = ''
2509 if self.buffer[0] == ord('i'):
2510 self.i = min(99, int(self.buffer[1:] or 0)) # set input length
2511 elif self.buffer[0] == ord('o'):
2512 self.o = min(99, int(self.buffer[1:] or 0)) # set output length
2513 else:
2514 output = self.buffer.decode('ascii')
2515 if len(output) < self.o:
2516 output += '-'*self.o # pad out with hyphens
2517 if self.o:
2518 output = output[:self.o] # truncate to output length
2519 output += '.'
2520 self.buffer = bytearray()
2521 return output
2522
Benjamin Petersonad9d48d2008-04-02 21:49:44 +00002523 codecEnabled = False
2524
Hai Shi14cdc212020-10-26 02:38:33 +08002525
2526# bpo-41919: This method is separated from StatefulIncrementalDecoder to avoid a resource leak
2527# when registering codecs and cleanup functions.
2528def lookupTestDecoder(name):
2529 if StatefulIncrementalDecoder.codecEnabled and name == 'test_decoder':
2530 latin1 = codecs.lookup('latin-1')
2531 return codecs.CodecInfo(
2532 name='test_decoder', encode=latin1.encode, decode=None,
2533 incrementalencoder=None,
2534 streamreader=None, streamwriter=None,
2535 incrementaldecoder=StatefulIncrementalDecoder)
Benjamin Petersonad9d48d2008-04-02 21:49:44 +00002536
Benjamin Petersonad9d48d2008-04-02 21:49:44 +00002537
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002538class StatefulIncrementalDecoderTest(unittest.TestCase):
2539 """
2540 Make sure the StatefulIncrementalDecoder actually works.
2541 """
2542
2543 test_cases = [
Ka-Ping Yeed24a5b62008-03-20 10:51:27 +00002544 # I=1, O=1 (fixed-length input == fixed-length output)
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002545 (b'abcd', False, 'a.b.c.d.'),
Ka-Ping Yeed24a5b62008-03-20 10:51:27 +00002546 # I=0, O=0 (variable-length input, variable-length output)
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002547 (b'oiabcd', True, 'abcd.'),
Ka-Ping Yeed24a5b62008-03-20 10:51:27 +00002548 # I=0, O=0 (should ignore extra periods)
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002549 (b'oi...abcd...', True, 'abcd.'),
Ka-Ping Yeed24a5b62008-03-20 10:51:27 +00002550 # I=0, O=6 (variable-length input, fixed-length output)
2551 (b'i.o6.x.xyz.toolongtofit.', False, 'x-----.xyz---.toolon.'),
2552 # I=2, O=6 (fixed-length input < fixed-length output)
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002553 (b'i.i2.o6xyz', True, 'xy----.z-----.'),
Ka-Ping Yeed24a5b62008-03-20 10:51:27 +00002554 # I=6, O=3 (fixed-length input > fixed-length output)
2555 (b'i.o3.i6.abcdefghijklmnop', True, 'abc.ghi.mno.'),
2556 # I=0, then 3; O=29, then 15 (with longer output)
2557 (b'i.o29.a.b.cde.o15.abcdefghijabcdefghij.i3.a.b.c.d.ei00k.l.m', True,
2558 'a----------------------------.' +
2559 'b----------------------------.' +
2560 'cde--------------------------.' +
2561 'abcdefghijabcde.' +
2562 'a.b------------.' +
2563 '.c.------------.' +
2564 'd.e------------.' +
2565 'k--------------.' +
2566 'l--------------.' +
2567 'm--------------.')
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002568 ]
2569
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002570 def test_decoder(self):
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002571 # Try a few one-shot test cases.
2572 for input, eof, output in self.test_cases:
2573 d = StatefulIncrementalDecoder()
Ezio Melottib3aedd42010-11-20 19:04:17 +00002574 self.assertEqual(d.decode(input, eof), output)
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002575
2576 # Also test an unfinished decode, followed by forcing EOF.
2577 d = StatefulIncrementalDecoder()
Ezio Melottib3aedd42010-11-20 19:04:17 +00002578 self.assertEqual(d.decode(b'oiabcd'), '')
2579 self.assertEqual(d.decode(b'', 1), 'abcd.')
Guido van Rossum78892e42007-04-06 17:31:18 +00002580
2581class TextIOWrapperTest(unittest.TestCase):
Guido van Rossum9b76da62007-04-11 01:09:03 +00002582
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002583 def setUp(self):
2584 self.testdata = b"AAA\r\nBBB\rCCC\r\nDDD\nEEE\r\n"
2585 self.normalized = b"AAA\nBBB\nCCC\nDDD\nEEE\n".decode("ascii")
Hai Shi883bc632020-07-06 17:12:49 +08002586 os_helper.unlink(os_helper.TESTFN)
Hai Shi14cdc212020-10-26 02:38:33 +08002587 codecs.register(lookupTestDecoder)
2588 self.addCleanup(codecs.unregister, lookupTestDecoder)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002589
Guido van Rossumd0712812007-04-11 16:32:43 +00002590 def tearDown(self):
Hai Shi883bc632020-07-06 17:12:49 +08002591 os_helper.unlink(os_helper.TESTFN)
Guido van Rossum9b76da62007-04-11 01:09:03 +00002592
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002593 def test_constructor(self):
2594 r = self.BytesIO(b"\xc3\xa9\n\n")
2595 b = self.BufferedReader(r, 1000)
Inada Naoki58cffba2021-04-01 11:25:04 +09002596 t = self.TextIOWrapper(b, encoding="utf-8")
Marc-André Lemburg8f36af72011-02-25 15:42:01 +00002597 t.__init__(b, encoding="latin-1", newline="\r\n")
2598 self.assertEqual(t.encoding, "latin-1")
Ezio Melottib3aedd42010-11-20 19:04:17 +00002599 self.assertEqual(t.line_buffering, False)
Marc-André Lemburg8f36af72011-02-25 15:42:01 +00002600 t.__init__(b, encoding="utf-8", line_buffering=True)
2601 self.assertEqual(t.encoding, "utf-8")
Ezio Melottib3aedd42010-11-20 19:04:17 +00002602 self.assertEqual(t.line_buffering, True)
2603 self.assertEqual("\xe9\n", t.readline())
Inada Naokifb786922021-04-06 11:18:41 +09002604 self.assertRaises(TypeError, t.__init__, b, encoding="utf-8", newline=42)
2605 self.assertRaises(ValueError, t.__init__, b, encoding="utf-8", newline='xyzzy')
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002606
Serhiy Storchakaf24131f2015-04-16 11:19:43 +03002607 def test_uninitialized(self):
2608 t = self.TextIOWrapper.__new__(self.TextIOWrapper)
2609 del t
2610 t = self.TextIOWrapper.__new__(self.TextIOWrapper)
2611 self.assertRaises(Exception, repr, t)
2612 self.assertRaisesRegex((ValueError, AttributeError),
2613 'uninitialized|has no attribute',
2614 t.read, 0)
Inada Naoki58cffba2021-04-01 11:25:04 +09002615 t.__init__(self.MockRawIO(), encoding="utf-8")
Serhiy Storchakaf24131f2015-04-16 11:19:43 +03002616 self.assertEqual(t.read(0), '')
2617
Nick Coghlana9b15242014-02-04 22:11:18 +10002618 def test_non_text_encoding_codecs_are_rejected(self):
2619 # Ensure the constructor complains if passed a codec that isn't
2620 # marked as a text encoding
2621 # http://bugs.python.org/issue20404
2622 r = self.BytesIO()
2623 b = self.BufferedWriter(r)
2624 with self.assertRaisesRegex(LookupError, "is not a text encoding"):
2625 self.TextIOWrapper(b, encoding="hex")
2626
Benjamin Petersond2e0c792009-05-01 20:40:59 +00002627 def test_detach(self):
2628 r = self.BytesIO()
2629 b = self.BufferedWriter(r)
Inada Naoki58cffba2021-04-01 11:25:04 +09002630 t = self.TextIOWrapper(b, encoding="ascii")
Benjamin Petersond2e0c792009-05-01 20:40:59 +00002631 self.assertIs(t.detach(), b)
2632
2633 t = self.TextIOWrapper(b, encoding="ascii")
2634 t.write("howdy")
2635 self.assertFalse(r.getvalue())
2636 t.detach()
2637 self.assertEqual(r.getvalue(), b"howdy")
2638 self.assertRaises(ValueError, t.detach)
2639
Benjamin Peterson10e76b62014-12-21 20:51:50 -06002640 # Operations independent of the detached stream should still work
2641 repr(t)
2642 self.assertEqual(t.encoding, "ascii")
2643 self.assertEqual(t.errors, "strict")
2644 self.assertFalse(t.line_buffering)
Antoine Pitrou3c2817b2017-06-03 12:32:28 +02002645 self.assertFalse(t.write_through)
Benjamin Peterson10e76b62014-12-21 20:51:50 -06002646
Benjamin Petersonc4c0eae2009-03-09 00:07:03 +00002647 def test_repr(self):
2648 raw = self.BytesIO("hello".encode("utf-8"))
2649 b = self.BufferedReader(raw)
2650 t = self.TextIOWrapper(b, encoding="utf-8")
Antoine Pitrou716c4442009-05-23 19:04:03 +00002651 modname = self.TextIOWrapper.__module__
Max Bernsteinccb7ca72019-05-21 10:09:21 -07002652 self.assertRegex(repr(t),
2653 r"<(%s\.)?TextIOWrapper encoding='utf-8'>" % modname)
Antoine Pitrou716c4442009-05-23 19:04:03 +00002654 raw.name = "dummy"
Max Bernsteinccb7ca72019-05-21 10:09:21 -07002655 self.assertRegex(repr(t),
2656 r"<(%s\.)?TextIOWrapper name='dummy' encoding='utf-8'>" % modname)
Antoine Pitroua4815ca2011-01-09 20:38:15 +00002657 t.mode = "r"
Max Bernsteinccb7ca72019-05-21 10:09:21 -07002658 self.assertRegex(repr(t),
2659 r"<(%s\.)?TextIOWrapper name='dummy' mode='r' encoding='utf-8'>" % modname)
Antoine Pitrou716c4442009-05-23 19:04:03 +00002660 raw.name = b"dummy"
Max Bernsteinccb7ca72019-05-21 10:09:21 -07002661 self.assertRegex(repr(t),
2662 r"<(%s\.)?TextIOWrapper name=b'dummy' mode='r' encoding='utf-8'>" % modname)
Benjamin Petersonc4c0eae2009-03-09 00:07:03 +00002663
Benjamin Peterson10e76b62014-12-21 20:51:50 -06002664 t.buffer.detach()
2665 repr(t) # Should not raise an exception
2666
Serhiy Storchakaa5af6e12017-03-19 19:25:29 +02002667 def test_recursive_repr(self):
2668 # Issue #25455
2669 raw = self.BytesIO()
Inada Naoki58cffba2021-04-01 11:25:04 +09002670 t = self.TextIOWrapper(raw, encoding="utf-8")
Serhiy Storchakaa5af6e12017-03-19 19:25:29 +02002671 with support.swap_attr(raw, 'name', t):
2672 try:
2673 repr(t) # Should not crash
2674 except RuntimeError:
2675 pass
2676
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002677 def test_line_buffering(self):
2678 r = self.BytesIO()
2679 b = self.BufferedWriter(r, 1000)
Inada Naoki58cffba2021-04-01 11:25:04 +09002680 t = self.TextIOWrapper(b, encoding="utf-8", newline="\n", line_buffering=True)
Guido van Rossumf64db9f2007-12-06 01:04:26 +00002681 t.write("X")
Ezio Melottib3aedd42010-11-20 19:04:17 +00002682 self.assertEqual(r.getvalue(), b"") # No flush happened
Guido van Rossumf64db9f2007-12-06 01:04:26 +00002683 t.write("Y\nZ")
Ezio Melottib3aedd42010-11-20 19:04:17 +00002684 self.assertEqual(r.getvalue(), b"XY\nZ") # All got flushed
Guido van Rossumf64db9f2007-12-06 01:04:26 +00002685 t.write("A\rB")
Ezio Melottib3aedd42010-11-20 19:04:17 +00002686 self.assertEqual(r.getvalue(), b"XY\nZA\rB")
Guido van Rossumf64db9f2007-12-06 01:04:26 +00002687
Antoine Pitrou3c2817b2017-06-03 12:32:28 +02002688 def test_reconfigure_line_buffering(self):
2689 r = self.BytesIO()
2690 b = self.BufferedWriter(r, 1000)
Inada Naoki58cffba2021-04-01 11:25:04 +09002691 t = self.TextIOWrapper(b, encoding="utf-8", newline="\n", line_buffering=False)
Antoine Pitrou3c2817b2017-06-03 12:32:28 +02002692 t.write("AB\nC")
2693 self.assertEqual(r.getvalue(), b"")
2694
2695 t.reconfigure(line_buffering=True) # implicit flush
2696 self.assertEqual(r.getvalue(), b"AB\nC")
2697 t.write("DEF\nG")
2698 self.assertEqual(r.getvalue(), b"AB\nCDEF\nG")
2699 t.write("H")
2700 self.assertEqual(r.getvalue(), b"AB\nCDEF\nG")
2701 t.reconfigure(line_buffering=False) # implicit flush
2702 self.assertEqual(r.getvalue(), b"AB\nCDEF\nGH")
2703 t.write("IJ")
2704 self.assertEqual(r.getvalue(), b"AB\nCDEF\nGH")
2705
2706 # Keeping default value
2707 t.reconfigure()
2708 t.reconfigure(line_buffering=None)
2709 self.assertEqual(t.line_buffering, False)
2710 t.reconfigure(line_buffering=True)
2711 t.reconfigure()
2712 t.reconfigure(line_buffering=None)
2713 self.assertEqual(t.line_buffering, True)
2714
Victor Stinner91106cd2017-12-13 12:29:09 +01002715 @unittest.skipIf(sys.flags.utf8_mode, "utf-8 mode is enabled")
Victor Stinnerf86a5e82012-06-05 13:43:22 +02002716 def test_default_encoding(self):
2717 old_environ = dict(os.environ)
2718 try:
2719 # try to get a user preferred encoding different than the current
2720 # locale encoding to check that TextIOWrapper() uses the current
2721 # locale encoding and not the user preferred encoding
2722 for key in ('LC_ALL', 'LANG', 'LC_CTYPE'):
2723 if key in os.environ:
2724 del os.environ[key]
2725
2726 current_locale_encoding = locale.getpreferredencoding(False)
2727 b = self.BytesIO()
Inada Naoki58cffba2021-04-01 11:25:04 +09002728 with warnings.catch_warnings():
2729 warnings.simplefilter("ignore", EncodingWarning)
2730 t = self.TextIOWrapper(b)
Victor Stinnerf86a5e82012-06-05 13:43:22 +02002731 self.assertEqual(t.encoding, current_locale_encoding)
2732 finally:
2733 os.environ.clear()
2734 os.environ.update(old_environ)
2735
Serhiy Storchaka5cfc79d2014-02-07 10:06:39 +02002736 @support.cpython_only
Victor Stinner91106cd2017-12-13 12:29:09 +01002737 @unittest.skipIf(sys.flags.utf8_mode, "utf-8 mode is enabled")
Serhiy Storchaka78980432013-01-15 01:12:17 +02002738 def test_device_encoding(self):
Serhiy Storchaka5cfc79d2014-02-07 10:06:39 +02002739 # Issue 15989
2740 import _testcapi
Serhiy Storchaka78980432013-01-15 01:12:17 +02002741 b = self.BytesIO()
2742 b.fileno = lambda: _testcapi.INT_MAX + 1
Inada Naoki58cffba2021-04-01 11:25:04 +09002743 self.assertRaises(OverflowError, self.TextIOWrapper, b, encoding="locale")
Serhiy Storchaka78980432013-01-15 01:12:17 +02002744 b.fileno = lambda: _testcapi.UINT_MAX + 1
Inada Naoki58cffba2021-04-01 11:25:04 +09002745 self.assertRaises(OverflowError, self.TextIOWrapper, b, encoding="locale")
Serhiy Storchaka78980432013-01-15 01:12:17 +02002746
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002747 def test_encoding(self):
2748 # Check the encoding attribute is always set, and valid
2749 b = self.BytesIO()
Marc-André Lemburg8f36af72011-02-25 15:42:01 +00002750 t = self.TextIOWrapper(b, encoding="utf-8")
2751 self.assertEqual(t.encoding, "utf-8")
Inada Naoki58cffba2021-04-01 11:25:04 +09002752 with warnings.catch_warnings():
2753 warnings.simplefilter("ignore", EncodingWarning)
2754 t = self.TextIOWrapper(b)
Serhiy Storchaka0dcd80a2015-08-02 15:17:49 +03002755 self.assertIsNotNone(t.encoding)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002756 codecs.lookup(t.encoding)
2757
2758 def test_encoding_errors_reading(self):
Guido van Rossume7fc50f2007-12-03 22:54:21 +00002759 # (1) default
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002760 b = self.BytesIO(b"abc\n\xff\n")
2761 t = self.TextIOWrapper(b, encoding="ascii")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00002762 self.assertRaises(UnicodeError, t.read)
2763 # (2) explicit strict
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002764 b = self.BytesIO(b"abc\n\xff\n")
2765 t = self.TextIOWrapper(b, encoding="ascii", errors="strict")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00002766 self.assertRaises(UnicodeError, t.read)
2767 # (3) ignore
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002768 b = self.BytesIO(b"abc\n\xff\n")
2769 t = self.TextIOWrapper(b, encoding="ascii", errors="ignore")
Ezio Melottib3aedd42010-11-20 19:04:17 +00002770 self.assertEqual(t.read(), "abc\n\n")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00002771 # (4) replace
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002772 b = self.BytesIO(b"abc\n\xff\n")
2773 t = self.TextIOWrapper(b, encoding="ascii", errors="replace")
Ezio Melottib3aedd42010-11-20 19:04:17 +00002774 self.assertEqual(t.read(), "abc\n\ufffd\n")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00002775
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002776 def test_encoding_errors_writing(self):
Guido van Rossume7fc50f2007-12-03 22:54:21 +00002777 # (1) default
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002778 b = self.BytesIO()
2779 t = self.TextIOWrapper(b, encoding="ascii")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00002780 self.assertRaises(UnicodeError, t.write, "\xff")
2781 # (2) explicit strict
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002782 b = self.BytesIO()
2783 t = self.TextIOWrapper(b, encoding="ascii", errors="strict")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00002784 self.assertRaises(UnicodeError, t.write, "\xff")
2785 # (3) ignore
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002786 b = self.BytesIO()
2787 t = self.TextIOWrapper(b, encoding="ascii", errors="ignore",
Guido van Rossumf64db9f2007-12-06 01:04:26 +00002788 newline="\n")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00002789 t.write("abc\xffdef\n")
2790 t.flush()
Ezio Melottib3aedd42010-11-20 19:04:17 +00002791 self.assertEqual(b.getvalue(), b"abcdef\n")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00002792 # (4) replace
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002793 b = self.BytesIO()
2794 t = self.TextIOWrapper(b, encoding="ascii", errors="replace",
Guido van Rossumf64db9f2007-12-06 01:04:26 +00002795 newline="\n")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00002796 t.write("abc\xffdef\n")
2797 t.flush()
Ezio Melottib3aedd42010-11-20 19:04:17 +00002798 self.assertEqual(b.getvalue(), b"abc?def\n")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00002799
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002800 def test_newlines(self):
Guido van Rossum78892e42007-04-06 17:31:18 +00002801 input_lines = [ "unix\n", "windows\r\n", "os9\r", "last\n", "nonl" ]
2802
2803 tests = [
2804 [ None, [ 'unix\n', 'windows\n', 'os9\n', 'last\n', 'nonl' ] ],
Guido van Rossum8358db22007-08-18 21:39:55 +00002805 [ '', input_lines ],
2806 [ '\n', [ "unix\n", "windows\r\n", "os9\rlast\n", "nonl" ] ],
2807 [ '\r\n', [ "unix\nwindows\r\n", "os9\rlast\nnonl" ] ],
2808 [ '\r', [ "unix\nwindows\r", "\nos9\r", "last\nnonl" ] ],
Guido van Rossum78892e42007-04-06 17:31:18 +00002809 ]
Antoine Pitrou180a3362008-12-14 16:36:46 +00002810 encodings = (
2811 'utf-8', 'latin-1',
2812 'utf-16', 'utf-16-le', 'utf-16-be',
2813 'utf-32', 'utf-32-le', 'utf-32-be',
2814 )
Guido van Rossum78892e42007-04-06 17:31:18 +00002815
Guido van Rossum8358db22007-08-18 21:39:55 +00002816 # Try a range of buffer sizes to test the case where \r is the last
Guido van Rossum78892e42007-04-06 17:31:18 +00002817 # character in TextIOWrapper._pending_line.
2818 for encoding in encodings:
Guido van Rossum8358db22007-08-18 21:39:55 +00002819 # XXX: str.encode() should return bytes
2820 data = bytes(''.join(input_lines).encode(encoding))
Guido van Rossum78892e42007-04-06 17:31:18 +00002821 for do_reads in (False, True):
Guido van Rossum8358db22007-08-18 21:39:55 +00002822 for bufsize in range(1, 10):
2823 for newline, exp_lines in tests:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002824 bufio = self.BufferedReader(self.BytesIO(data), bufsize)
2825 textio = self.TextIOWrapper(bufio, newline=newline,
Guido van Rossum78892e42007-04-06 17:31:18 +00002826 encoding=encoding)
2827 if do_reads:
2828 got_lines = []
2829 while True:
Guido van Rossum76c5d4d2007-04-06 19:10:29 +00002830 c2 = textio.read(2)
Guido van Rossum78892e42007-04-06 17:31:18 +00002831 if c2 == '':
2832 break
Ezio Melottib3aedd42010-11-20 19:04:17 +00002833 self.assertEqual(len(c2), 2)
Guido van Rossum76c5d4d2007-04-06 19:10:29 +00002834 got_lines.append(c2 + textio.readline())
Guido van Rossum78892e42007-04-06 17:31:18 +00002835 else:
Guido van Rossum76c5d4d2007-04-06 19:10:29 +00002836 got_lines = list(textio)
Guido van Rossum78892e42007-04-06 17:31:18 +00002837
2838 for got_line, exp_line in zip(got_lines, exp_lines):
Ezio Melottib3aedd42010-11-20 19:04:17 +00002839 self.assertEqual(got_line, exp_line)
2840 self.assertEqual(len(got_lines), len(exp_lines))
Guido van Rossum78892e42007-04-06 17:31:18 +00002841
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002842 def test_newlines_input(self):
2843 testdata = b"AAA\nBB\x00B\nCCC\rDDD\rEEE\r\nFFF\r\nGGG"
Guido van Rossum8358db22007-08-18 21:39:55 +00002844 normalized = testdata.replace(b"\r\n", b"\n").replace(b"\r", b"\n")
2845 for newline, expected in [
Ezio Melottid8b509b2011-09-28 17:37:55 +03002846 (None, normalized.decode("ascii").splitlines(keepends=True)),
2847 ("", testdata.decode("ascii").splitlines(keepends=True)),
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002848 ("\n", ["AAA\n", "BB\x00B\n", "CCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]),
2849 ("\r\n", ["AAA\nBB\x00B\nCCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]),
2850 ("\r", ["AAA\nBB\x00B\nCCC\r", "DDD\r", "EEE\r", "\nFFF\r", "\nGGG"]),
Guido van Rossum8358db22007-08-18 21:39:55 +00002851 ]:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002852 buf = self.BytesIO(testdata)
2853 txt = self.TextIOWrapper(buf, encoding="ascii", newline=newline)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002854 self.assertEqual(txt.readlines(), expected)
Guido van Rossum8358db22007-08-18 21:39:55 +00002855 txt.seek(0)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002856 self.assertEqual(txt.read(), "".join(expected))
Guido van Rossum8358db22007-08-18 21:39:55 +00002857
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002858 def test_newlines_output(self):
2859 testdict = {
2860 "": b"AAA\nBBB\nCCC\nX\rY\r\nZ",
2861 "\n": b"AAA\nBBB\nCCC\nX\rY\r\nZ",
2862 "\r": b"AAA\rBBB\rCCC\rX\rY\r\rZ",
2863 "\r\n": b"AAA\r\nBBB\r\nCCC\r\nX\rY\r\r\nZ",
2864 }
2865 tests = [(None, testdict[os.linesep])] + sorted(testdict.items())
2866 for newline, expected in tests:
2867 buf = self.BytesIO()
2868 txt = self.TextIOWrapper(buf, encoding="ascii", newline=newline)
2869 txt.write("AAA\nB")
2870 txt.write("BB\nCCC\n")
2871 txt.write("X\rY\r\nZ")
2872 txt.flush()
Ezio Melottib3aedd42010-11-20 19:04:17 +00002873 self.assertEqual(buf.closed, False)
2874 self.assertEqual(buf.getvalue(), expected)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002875
2876 def test_destructor(self):
2877 l = []
2878 base = self.BytesIO
2879 class MyBytesIO(base):
2880 def close(self):
2881 l.append(self.getvalue())
2882 base.close(self)
2883 b = MyBytesIO()
2884 t = self.TextIOWrapper(b, encoding="ascii")
2885 t.write("abc")
2886 del t
Benjamin Peterson24fb1d02009-04-24 23:26:21 +00002887 support.gc_collect()
Ezio Melottib3aedd42010-11-20 19:04:17 +00002888 self.assertEqual([b"abc"], l)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002889
2890 def test_override_destructor(self):
2891 record = []
2892 class MyTextIO(self.TextIOWrapper):
2893 def __del__(self):
2894 record.append(1)
2895 try:
2896 f = super().__del__
2897 except AttributeError:
2898 pass
2899 else:
2900 f()
2901 def close(self):
2902 record.append(2)
2903 super().close()
2904 def flush(self):
2905 record.append(3)
2906 super().flush()
2907 b = self.BytesIO()
2908 t = MyTextIO(b, encoding="ascii")
2909 del t
Benjamin Peterson24fb1d02009-04-24 23:26:21 +00002910 support.gc_collect()
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002911 self.assertEqual(record, [1, 2, 3])
2912
2913 def test_error_through_destructor(self):
2914 # Test that the exception state is not modified by a destructor,
2915 # even if close() fails.
2916 rawio = self.CloseFailureIO()
Victor Stinner6dbbe742019-05-25 00:09:38 +02002917 with support.catch_unraisable_exception() as cm:
2918 with self.assertRaises(AttributeError):
Inada Naoki58cffba2021-04-01 11:25:04 +09002919 self.TextIOWrapper(rawio, encoding="utf-8").xyzzy
Victor Stinnerbc2aa812019-05-23 03:45:09 +02002920
2921 if not IOBASE_EMITS_UNRAISABLE:
2922 self.assertIsNone(cm.unraisable)
2923 elif cm.unraisable is not None:
2924 self.assertEqual(cm.unraisable.exc_type, OSError)
Guido van Rossum8358db22007-08-18 21:39:55 +00002925
Guido van Rossum9b76da62007-04-11 01:09:03 +00002926 # Systematic tests of the text I/O API
2927
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002928 def test_basic_io(self):
Guido van Rossum9b76da62007-04-11 01:09:03 +00002929 for chunksize in (1, 2, 3, 4, 5, 15, 16, 17, 31, 32, 33, 63, 64, 65):
Marc-André Lemburg8f36af72011-02-25 15:42:01 +00002930 for enc in "ascii", "latin-1", "utf-8" :# , "utf-16-be", "utf-16-le":
Hai Shi883bc632020-07-06 17:12:49 +08002931 f = self.open(os_helper.TESTFN, "w+", encoding=enc)
Guido van Rossum9b76da62007-04-11 01:09:03 +00002932 f._CHUNK_SIZE = chunksize
Ezio Melottib3aedd42010-11-20 19:04:17 +00002933 self.assertEqual(f.write("abc"), 3)
Guido van Rossum9b76da62007-04-11 01:09:03 +00002934 f.close()
Hai Shi883bc632020-07-06 17:12:49 +08002935 f = self.open(os_helper.TESTFN, "r+", encoding=enc)
Guido van Rossum9b76da62007-04-11 01:09:03 +00002936 f._CHUNK_SIZE = chunksize
Ezio Melottib3aedd42010-11-20 19:04:17 +00002937 self.assertEqual(f.tell(), 0)
2938 self.assertEqual(f.read(), "abc")
Guido van Rossum9b76da62007-04-11 01:09:03 +00002939 cookie = f.tell()
Ezio Melottib3aedd42010-11-20 19:04:17 +00002940 self.assertEqual(f.seek(0), 0)
2941 self.assertEqual(f.read(None), "abc")
Benjamin Petersonbf5ff762009-12-13 19:25:34 +00002942 f.seek(0)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002943 self.assertEqual(f.read(2), "ab")
2944 self.assertEqual(f.read(1), "c")
2945 self.assertEqual(f.read(1), "")
2946 self.assertEqual(f.read(), "")
2947 self.assertEqual(f.tell(), cookie)
2948 self.assertEqual(f.seek(0), 0)
2949 self.assertEqual(f.seek(0, 2), cookie)
2950 self.assertEqual(f.write("def"), 3)
2951 self.assertEqual(f.seek(cookie), cookie)
2952 self.assertEqual(f.read(), "def")
Guido van Rossum9b76da62007-04-11 01:09:03 +00002953 if enc.startswith("utf"):
2954 self.multi_line_test(f, enc)
2955 f.close()
2956
2957 def multi_line_test(self, f, enc):
2958 f.seek(0)
2959 f.truncate()
Guido van Rossumef87d6e2007-05-02 19:09:54 +00002960 sample = "s\xff\u0fff\uffff"
Guido van Rossum9b76da62007-04-11 01:09:03 +00002961 wlines = []
Guido van Rossumcba608c2007-04-11 14:19:59 +00002962 for size in (0, 1, 2, 3, 4, 5, 30, 31, 32, 33, 62, 63, 64, 65, 1000):
Guido van Rossum9b76da62007-04-11 01:09:03 +00002963 chars = []
Guido van Rossum805365e2007-05-07 22:24:25 +00002964 for i in range(size):
Guido van Rossum9b76da62007-04-11 01:09:03 +00002965 chars.append(sample[i % len(sample)])
Guido van Rossumef87d6e2007-05-02 19:09:54 +00002966 line = "".join(chars) + "\n"
Guido van Rossum9b76da62007-04-11 01:09:03 +00002967 wlines.append((f.tell(), line))
2968 f.write(line)
Guido van Rossum9b76da62007-04-11 01:09:03 +00002969 f.seek(0)
2970 rlines = []
2971 while True:
2972 pos = f.tell()
2973 line = f.readline()
2974 if not line:
Guido van Rossum9b76da62007-04-11 01:09:03 +00002975 break
2976 rlines.append((pos, line))
Ezio Melottib3aedd42010-11-20 19:04:17 +00002977 self.assertEqual(rlines, wlines)
Guido van Rossum9b76da62007-04-11 01:09:03 +00002978
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002979 def test_telling(self):
Hai Shi883bc632020-07-06 17:12:49 +08002980 f = self.open(os_helper.TESTFN, "w+", encoding="utf-8")
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +00002981 p0 = f.tell()
Guido van Rossumef87d6e2007-05-02 19:09:54 +00002982 f.write("\xff\n")
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +00002983 p1 = f.tell()
Guido van Rossumef87d6e2007-05-02 19:09:54 +00002984 f.write("\xff\n")
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +00002985 p2 = f.tell()
2986 f.seek(0)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002987 self.assertEqual(f.tell(), p0)
2988 self.assertEqual(f.readline(), "\xff\n")
2989 self.assertEqual(f.tell(), p1)
2990 self.assertEqual(f.readline(), "\xff\n")
2991 self.assertEqual(f.tell(), p2)
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +00002992 f.seek(0)
2993 for line in f:
Ezio Melottib3aedd42010-11-20 19:04:17 +00002994 self.assertEqual(line, "\xff\n")
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02002995 self.assertRaises(OSError, f.tell)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002996 self.assertEqual(f.tell(), p2)
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +00002997 f.close()
2998
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002999 def test_seeking(self):
3000 chunk_size = _default_chunk_size()
Guido van Rossumd76e7792007-04-17 02:38:04 +00003001 prefix_size = chunk_size - 2
Guido van Rossumef87d6e2007-05-02 19:09:54 +00003002 u_prefix = "a" * prefix_size
Guido van Rossumd76e7792007-04-17 02:38:04 +00003003 prefix = bytes(u_prefix.encode("utf-8"))
Ezio Melottib3aedd42010-11-20 19:04:17 +00003004 self.assertEqual(len(u_prefix), len(prefix))
Guido van Rossumef87d6e2007-05-02 19:09:54 +00003005 u_suffix = "\u8888\n"
Guido van Rossumd76e7792007-04-17 02:38:04 +00003006 suffix = bytes(u_suffix.encode("utf-8"))
3007 line = prefix + suffix
Hai Shi883bc632020-07-06 17:12:49 +08003008 with self.open(os_helper.TESTFN, "wb") as f:
Benjamin Petersonebe5d8a2010-10-31 01:30:11 +00003009 f.write(line*2)
Hai Shi883bc632020-07-06 17:12:49 +08003010 with self.open(os_helper.TESTFN, "r", encoding="utf-8") as f:
Benjamin Petersonebe5d8a2010-10-31 01:30:11 +00003011 s = f.read(prefix_size)
Ezio Melottib3aedd42010-11-20 19:04:17 +00003012 self.assertEqual(s, str(prefix, "ascii"))
3013 self.assertEqual(f.tell(), prefix_size)
3014 self.assertEqual(f.readline(), u_suffix)
Guido van Rossumd76e7792007-04-17 02:38:04 +00003015
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003016 def test_seeking_too(self):
Guido van Rossumd76e7792007-04-17 02:38:04 +00003017 # Regression test for a specific bug
3018 data = b'\xe0\xbf\xbf\n'
Hai Shi883bc632020-07-06 17:12:49 +08003019 with self.open(os_helper.TESTFN, "wb") as f:
Benjamin Petersonebe5d8a2010-10-31 01:30:11 +00003020 f.write(data)
Hai Shi883bc632020-07-06 17:12:49 +08003021 with self.open(os_helper.TESTFN, "r", encoding="utf-8") as f:
Benjamin Petersonebe5d8a2010-10-31 01:30:11 +00003022 f._CHUNK_SIZE # Just test that it exists
3023 f._CHUNK_SIZE = 2
3024 f.readline()
3025 f.tell()
Guido van Rossumd76e7792007-04-17 02:38:04 +00003026
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003027 def test_seek_and_tell(self):
3028 #Test seek/tell using the StatefulIncrementalDecoder.
3029 # Make test faster by doing smaller seeks
3030 CHUNK_SIZE = 128
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00003031
Benjamin Peterson5fd871d2009-03-05 00:49:53 +00003032 def test_seek_and_tell_with_data(data, min_pos=0):
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00003033 """Tell/seek to various points within a data stream and ensure
3034 that the decoded data returned by read() is consistent."""
Hai Shi883bc632020-07-06 17:12:49 +08003035 f = self.open(os_helper.TESTFN, 'wb')
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00003036 f.write(data)
3037 f.close()
Hai Shi883bc632020-07-06 17:12:49 +08003038 f = self.open(os_helper.TESTFN, encoding='test_decoder')
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003039 f._CHUNK_SIZE = CHUNK_SIZE
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00003040 decoded = f.read()
3041 f.close()
3042
Neal Norwitze2b07052008-03-18 19:52:05 +00003043 for i in range(min_pos, len(decoded) + 1): # seek positions
3044 for j in [1, 5, len(decoded) - i]: # read lengths
Hai Shi883bc632020-07-06 17:12:49 +08003045 f = self.open(os_helper.TESTFN, encoding='test_decoder')
Ezio Melottib3aedd42010-11-20 19:04:17 +00003046 self.assertEqual(f.read(i), decoded[:i])
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00003047 cookie = f.tell()
Ezio Melottib3aedd42010-11-20 19:04:17 +00003048 self.assertEqual(f.read(j), decoded[i:i + j])
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00003049 f.seek(cookie)
Ezio Melottib3aedd42010-11-20 19:04:17 +00003050 self.assertEqual(f.read(), decoded[i:])
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00003051 f.close()
3052
Benjamin Petersonad9d48d2008-04-02 21:49:44 +00003053 # Enable the test decoder.
3054 StatefulIncrementalDecoder.codecEnabled = 1
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00003055
3056 # Run the tests.
3057 try:
3058 # Try each test case.
3059 for input, _, _ in StatefulIncrementalDecoderTest.test_cases:
Benjamin Peterson5fd871d2009-03-05 00:49:53 +00003060 test_seek_and_tell_with_data(input)
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00003061
3062 # Position each test case so that it crosses a chunk boundary.
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00003063 for input, _, _ in StatefulIncrementalDecoderTest.test_cases:
3064 offset = CHUNK_SIZE - len(input)//2
3065 prefix = b'.'*offset
3066 # Don't bother seeking into the prefix (takes too long).
3067 min_pos = offset*2
Benjamin Peterson5fd871d2009-03-05 00:49:53 +00003068 test_seek_and_tell_with_data(prefix + input, min_pos)
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00003069
3070 # Ensure our test decoder won't interfere with subsequent tests.
3071 finally:
Benjamin Petersonad9d48d2008-04-02 21:49:44 +00003072 StatefulIncrementalDecoder.codecEnabled = 0
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00003073
Christopher Thorneac22f6a2018-11-01 10:48:49 +00003074 def test_multibyte_seek_and_tell(self):
Hai Shi883bc632020-07-06 17:12:49 +08003075 f = self.open(os_helper.TESTFN, "w", encoding="euc_jp")
Christopher Thorneac22f6a2018-11-01 10:48:49 +00003076 f.write("AB\n\u3046\u3048\n")
3077 f.close()
3078
Hai Shi883bc632020-07-06 17:12:49 +08003079 f = self.open(os_helper.TESTFN, "r", encoding="euc_jp")
Christopher Thorneac22f6a2018-11-01 10:48:49 +00003080 self.assertEqual(f.readline(), "AB\n")
3081 p0 = f.tell()
3082 self.assertEqual(f.readline(), "\u3046\u3048\n")
3083 p1 = f.tell()
3084 f.seek(p0)
3085 self.assertEqual(f.readline(), "\u3046\u3048\n")
3086 self.assertEqual(f.tell(), p1)
3087 f.close()
3088
3089 def test_seek_with_encoder_state(self):
Hai Shi883bc632020-07-06 17:12:49 +08003090 f = self.open(os_helper.TESTFN, "w", encoding="euc_jis_2004")
Christopher Thorneac22f6a2018-11-01 10:48:49 +00003091 f.write("\u00e6\u0300")
3092 p0 = f.tell()
3093 f.write("\u00e6")
3094 f.seek(p0)
3095 f.write("\u0300")
3096 f.close()
3097
Hai Shi883bc632020-07-06 17:12:49 +08003098 f = self.open(os_helper.TESTFN, "r", encoding="euc_jis_2004")
Christopher Thorneac22f6a2018-11-01 10:48:49 +00003099 self.assertEqual(f.readline(), "\u00e6\u0300\u0300")
3100 f.close()
3101
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003102 def test_encoded_writes(self):
Alexandre Vassalottia38f73b2008-01-07 18:30:48 +00003103 data = "1234567890"
3104 tests = ("utf-16",
3105 "utf-16-le",
3106 "utf-16-be",
3107 "utf-32",
3108 "utf-32-le",
3109 "utf-32-be")
3110 for encoding in tests:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003111 buf = self.BytesIO()
3112 f = self.TextIOWrapper(buf, encoding=encoding)
Alexandre Vassalottia38f73b2008-01-07 18:30:48 +00003113 # Check if the BOM is written only once (see issue1753).
3114 f.write(data)
3115 f.write(data)
3116 f.seek(0)
Ezio Melottib3aedd42010-11-20 19:04:17 +00003117 self.assertEqual(f.read(), data * 2)
Benjamin Peterson9363a652009-03-05 00:42:09 +00003118 f.seek(0)
Ezio Melottib3aedd42010-11-20 19:04:17 +00003119 self.assertEqual(f.read(), data * 2)
3120 self.assertEqual(buf.getvalue(), (data * 2).encode(encoding))
Alexandre Vassalottia38f73b2008-01-07 18:30:48 +00003121
Benjamin Petersona1b49012009-03-31 23:11:32 +00003122 def test_unreadable(self):
3123 class UnReadable(self.BytesIO):
3124 def readable(self):
3125 return False
Inada Naoki58cffba2021-04-01 11:25:04 +09003126 txt = self.TextIOWrapper(UnReadable(), encoding="utf-8")
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02003127 self.assertRaises(OSError, txt.read)
Benjamin Petersona1b49012009-03-31 23:11:32 +00003128
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003129 def test_read_one_by_one(self):
Inada Naoki58cffba2021-04-01 11:25:04 +09003130 txt = self.TextIOWrapper(self.BytesIO(b"AA\r\nBB"), encoding="utf-8")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00003131 reads = ""
3132 while True:
3133 c = txt.read(1)
3134 if not c:
3135 break
3136 reads += c
Ezio Melottib3aedd42010-11-20 19:04:17 +00003137 self.assertEqual(reads, "AA\nBB")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00003138
Benjamin Petersonbf5ff762009-12-13 19:25:34 +00003139 def test_readlines(self):
Inada Naoki58cffba2021-04-01 11:25:04 +09003140 txt = self.TextIOWrapper(self.BytesIO(b"AA\nBB\nCC"), encoding="utf-8")
Benjamin Petersonbf5ff762009-12-13 19:25:34 +00003141 self.assertEqual(txt.readlines(), ["AA\n", "BB\n", "CC"])
3142 txt.seek(0)
3143 self.assertEqual(txt.readlines(None), ["AA\n", "BB\n", "CC"])
3144 txt.seek(0)
3145 self.assertEqual(txt.readlines(5), ["AA\n", "BB\n"])
3146
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00003147 # read in amounts equal to TextIOWrapper._CHUNK_SIZE which is 128.
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003148 def test_read_by_chunk(self):
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00003149 # make sure "\r\n" straddles 128 char boundary.
Inada Naoki58cffba2021-04-01 11:25:04 +09003150 txt = self.TextIOWrapper(self.BytesIO(b"A" * 127 + b"\r\nB"), encoding="utf-8")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00003151 reads = ""
3152 while True:
3153 c = txt.read(128)
3154 if not c:
3155 break
3156 reads += c
Ezio Melottib3aedd42010-11-20 19:04:17 +00003157 self.assertEqual(reads, "A"*127+"\nB")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00003158
Antoine Pitrou3ed2cb52012-10-16 23:02:27 +02003159 def test_writelines(self):
3160 l = ['ab', 'cd', 'ef']
3161 buf = self.BytesIO()
Inada Naoki58cffba2021-04-01 11:25:04 +09003162 txt = self.TextIOWrapper(buf, encoding="utf-8")
Antoine Pitrou3ed2cb52012-10-16 23:02:27 +02003163 txt.writelines(l)
3164 txt.flush()
3165 self.assertEqual(buf.getvalue(), b'abcdef')
3166
3167 def test_writelines_userlist(self):
3168 l = UserList(['ab', 'cd', 'ef'])
3169 buf = self.BytesIO()
Inada Naoki58cffba2021-04-01 11:25:04 +09003170 txt = self.TextIOWrapper(buf, encoding="utf-8")
Antoine Pitrou3ed2cb52012-10-16 23:02:27 +02003171 txt.writelines(l)
3172 txt.flush()
3173 self.assertEqual(buf.getvalue(), b'abcdef')
3174
3175 def test_writelines_error(self):
Inada Naoki58cffba2021-04-01 11:25:04 +09003176 txt = self.TextIOWrapper(self.BytesIO(), encoding="utf-8")
Antoine Pitrou3ed2cb52012-10-16 23:02:27 +02003177 self.assertRaises(TypeError, txt.writelines, [1, 2, 3])
3178 self.assertRaises(TypeError, txt.writelines, None)
3179 self.assertRaises(TypeError, txt.writelines, b'abc')
3180
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00003181 def test_issue1395_1(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003182 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00003183
3184 # read one char at a time
3185 reads = ""
3186 while True:
3187 c = txt.read(1)
3188 if not c:
3189 break
3190 reads += c
Ezio Melottib3aedd42010-11-20 19:04:17 +00003191 self.assertEqual(reads, self.normalized)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00003192
3193 def test_issue1395_2(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003194 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00003195 txt._CHUNK_SIZE = 4
3196
3197 reads = ""
3198 while True:
3199 c = txt.read(4)
3200 if not c:
3201 break
3202 reads += c
Ezio Melottib3aedd42010-11-20 19:04:17 +00003203 self.assertEqual(reads, self.normalized)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00003204
3205 def test_issue1395_3(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003206 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00003207 txt._CHUNK_SIZE = 4
3208
3209 reads = txt.read(4)
3210 reads += txt.read(4)
3211 reads += txt.readline()
3212 reads += txt.readline()
3213 reads += txt.readline()
Ezio Melottib3aedd42010-11-20 19:04:17 +00003214 self.assertEqual(reads, self.normalized)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00003215
3216 def test_issue1395_4(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003217 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00003218 txt._CHUNK_SIZE = 4
3219
3220 reads = txt.read(4)
3221 reads += txt.read()
Ezio Melottib3aedd42010-11-20 19:04:17 +00003222 self.assertEqual(reads, self.normalized)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00003223
3224 def test_issue1395_5(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003225 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00003226 txt._CHUNK_SIZE = 4
3227
3228 reads = txt.read(4)
3229 pos = txt.tell()
3230 txt.seek(0)
3231 txt.seek(pos)
Ezio Melottib3aedd42010-11-20 19:04:17 +00003232 self.assertEqual(txt.read(4), "BBB\n")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00003233
Ka-Ping Yeeddaa7062008-03-17 20:35:15 +00003234 def test_issue2282(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003235 buffer = self.BytesIO(self.testdata)
3236 txt = self.TextIOWrapper(buffer, encoding="ascii")
Ka-Ping Yeeddaa7062008-03-17 20:35:15 +00003237
3238 self.assertEqual(buffer.seekable(), txt.seekable())
3239
Antoine Pitroue4501852009-05-14 18:55:55 +00003240 def test_append_bom(self):
3241 # The BOM is not written again when appending to a non-empty file
Hai Shi883bc632020-07-06 17:12:49 +08003242 filename = os_helper.TESTFN
Antoine Pitroue4501852009-05-14 18:55:55 +00003243 for charset in ('utf-8-sig', 'utf-16', 'utf-32'):
3244 with self.open(filename, 'w', encoding=charset) as f:
3245 f.write('aaa')
3246 pos = f.tell()
3247 with self.open(filename, 'rb') as f:
Ezio Melottib3aedd42010-11-20 19:04:17 +00003248 self.assertEqual(f.read(), 'aaa'.encode(charset))
Antoine Pitroue4501852009-05-14 18:55:55 +00003249
3250 with self.open(filename, 'a', encoding=charset) as f:
3251 f.write('xxx')
3252 with self.open(filename, 'rb') as f:
Ezio Melottib3aedd42010-11-20 19:04:17 +00003253 self.assertEqual(f.read(), 'aaaxxx'.encode(charset))
Antoine Pitroue4501852009-05-14 18:55:55 +00003254
3255 def test_seek_bom(self):
3256 # Same test, but when seeking manually
Hai Shi883bc632020-07-06 17:12:49 +08003257 filename = os_helper.TESTFN
Antoine Pitroue4501852009-05-14 18:55:55 +00003258 for charset in ('utf-8-sig', 'utf-16', 'utf-32'):
3259 with self.open(filename, 'w', encoding=charset) as f:
3260 f.write('aaa')
3261 pos = f.tell()
3262 with self.open(filename, 'r+', encoding=charset) as f:
3263 f.seek(pos)
3264 f.write('zzz')
3265 f.seek(0)
3266 f.write('bbb')
3267 with self.open(filename, 'rb') as f:
Ezio Melottib3aedd42010-11-20 19:04:17 +00003268 self.assertEqual(f.read(), 'bbbzzz'.encode(charset))
Antoine Pitroue4501852009-05-14 18:55:55 +00003269
Antoine Pitrou85e3ee72015-04-13 20:01:21 +02003270 def test_seek_append_bom(self):
3271 # Same test, but first seek to the start and then to the end
Hai Shi883bc632020-07-06 17:12:49 +08003272 filename = os_helper.TESTFN
Antoine Pitrou85e3ee72015-04-13 20:01:21 +02003273 for charset in ('utf-8-sig', 'utf-16', 'utf-32'):
3274 with self.open(filename, 'w', encoding=charset) as f:
3275 f.write('aaa')
3276 with self.open(filename, 'a', encoding=charset) as f:
3277 f.seek(0)
3278 f.seek(0, self.SEEK_END)
3279 f.write('xxx')
3280 with self.open(filename, 'rb') as f:
3281 self.assertEqual(f.read(), 'aaaxxx'.encode(charset))
3282
Benjamin Peterson0926ad12009-06-06 18:02:12 +00003283 def test_errors_property(self):
Inada Naoki58cffba2021-04-01 11:25:04 +09003284 with self.open(os_helper.TESTFN, "w", encoding="utf-8") as f:
Benjamin Peterson0926ad12009-06-06 18:02:12 +00003285 self.assertEqual(f.errors, "strict")
Inada Naoki58cffba2021-04-01 11:25:04 +09003286 with self.open(os_helper.TESTFN, "w", encoding="utf-8", errors="replace") as f:
Benjamin Peterson0926ad12009-06-06 18:02:12 +00003287 self.assertEqual(f.errors, "replace")
3288
Brett Cannon31f59292011-02-21 19:29:56 +00003289 @support.no_tracing
Amaury Forgeot d'Arcccd686a2009-08-29 23:00:38 +00003290 def test_threads_write(self):
3291 # Issue6750: concurrent writes could duplicate data
3292 event = threading.Event()
Inada Naoki58cffba2021-04-01 11:25:04 +09003293 with self.open(os_helper.TESTFN, "w", encoding="utf-8", buffering=1) as f:
Amaury Forgeot d'Arcccd686a2009-08-29 23:00:38 +00003294 def run(n):
3295 text = "Thread%03d\n" % n
3296 event.wait()
3297 f.write(text)
Serhiy Storchaka263dcd22015-04-01 13:01:14 +03003298 threads = [threading.Thread(target=run, args=(x,))
Amaury Forgeot d'Arcccd686a2009-08-29 23:00:38 +00003299 for x in range(20)]
Hai Shie80697d2020-05-28 06:10:27 +08003300 with threading_helper.start_threads(threads, event.set):
Serhiy Storchaka263dcd22015-04-01 13:01:14 +03003301 time.sleep(0.02)
Inada Naoki58cffba2021-04-01 11:25:04 +09003302 with self.open(os_helper.TESTFN, encoding="utf-8") as f:
Amaury Forgeot d'Arcccd686a2009-08-29 23:00:38 +00003303 content = f.read()
3304 for n in range(20):
Ezio Melottib3aedd42010-11-20 19:04:17 +00003305 self.assertEqual(content.count("Thread%03d\n" % n), 1)
Amaury Forgeot d'Arcccd686a2009-08-29 23:00:38 +00003306
Antoine Pitrou6be88762010-05-03 16:48:20 +00003307 def test_flush_error_on_close(self):
Serhiy Storchakaa3712a92015-02-21 00:35:09 +02003308 # Test that text file is closed despite failed flush
3309 # and that flush() is called before file closed.
Antoine Pitrou6be88762010-05-03 16:48:20 +00003310 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Serhiy Storchakaa3712a92015-02-21 00:35:09 +02003311 closed = []
Antoine Pitrou6be88762010-05-03 16:48:20 +00003312 def bad_flush():
Serhiy Storchakaa3712a92015-02-21 00:35:09 +02003313 closed[:] = [txt.closed, txt.buffer.closed]
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02003314 raise OSError()
Antoine Pitrou6be88762010-05-03 16:48:20 +00003315 txt.flush = bad_flush
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02003316 self.assertRaises(OSError, txt.close) # exception not swallowed
Benjamin Peterson68623612012-12-20 11:53:11 -06003317 self.assertTrue(txt.closed)
Serhiy Storchakaa3712a92015-02-21 00:35:09 +02003318 self.assertTrue(txt.buffer.closed)
3319 self.assertTrue(closed) # flush() called
3320 self.assertFalse(closed[0]) # flush() called before file closed
3321 self.assertFalse(closed[1])
Serhiy Storchakac26a1a42015-02-23 00:28:38 +02003322 txt.flush = lambda: None # break reference loop
Antoine Pitrou6be88762010-05-03 16:48:20 +00003323
Serhiy Storchaka8a8f7f92014-06-09 09:13:04 +03003324 def test_close_error_on_close(self):
3325 buffer = self.BytesIO(self.testdata)
3326 def bad_flush():
3327 raise OSError('flush')
3328 def bad_close():
3329 raise OSError('close')
3330 buffer.close = bad_close
3331 txt = self.TextIOWrapper(buffer, encoding="ascii")
3332 txt.flush = bad_flush
3333 with self.assertRaises(OSError) as err: # exception not swallowed
3334 txt.close()
3335 self.assertEqual(err.exception.args, ('close',))
3336 self.assertIsInstance(err.exception.__context__, OSError)
3337 self.assertEqual(err.exception.__context__.args, ('flush',))
3338 self.assertFalse(txt.closed)
3339
Victor Stinner472f7942019-04-12 21:58:24 +02003340 # Silence destructor error
3341 buffer.close = lambda: None
3342 txt.flush = lambda: None
3343
Serhiy Storchaka8a8f7f92014-06-09 09:13:04 +03003344 def test_nonnormalized_close_error_on_close(self):
3345 # Issue #21677
3346 buffer = self.BytesIO(self.testdata)
3347 def bad_flush():
3348 raise non_existing_flush
3349 def bad_close():
3350 raise non_existing_close
3351 buffer.close = bad_close
3352 txt = self.TextIOWrapper(buffer, encoding="ascii")
3353 txt.flush = bad_flush
3354 with self.assertRaises(NameError) as err: # exception not swallowed
3355 txt.close()
3356 self.assertIn('non_existing_close', str(err.exception))
3357 self.assertIsInstance(err.exception.__context__, NameError)
3358 self.assertIn('non_existing_flush', str(err.exception.__context__))
3359 self.assertFalse(txt.closed)
3360
Victor Stinner472f7942019-04-12 21:58:24 +02003361 # Silence destructor error
3362 buffer.close = lambda: None
3363 txt.flush = lambda: None
3364
Antoine Pitrou6be88762010-05-03 16:48:20 +00003365 def test_multi_close(self):
3366 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
3367 txt.close()
3368 txt.close()
3369 txt.close()
3370 self.assertRaises(ValueError, txt.flush)
3371
Antoine Pitrou0d739d72010-09-05 23:01:12 +00003372 def test_unseekable(self):
Inada Naoki58cffba2021-04-01 11:25:04 +09003373 txt = self.TextIOWrapper(self.MockUnseekableIO(self.testdata), encoding="utf-8")
Antoine Pitrou0d739d72010-09-05 23:01:12 +00003374 self.assertRaises(self.UnsupportedOperation, txt.tell)
3375 self.assertRaises(self.UnsupportedOperation, txt.seek, 0)
3376
Antoine Pitrou7f8f4182010-12-21 21:20:59 +00003377 def test_readonly_attributes(self):
3378 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
3379 buf = self.BytesIO(self.testdata)
3380 with self.assertRaises(AttributeError):
3381 txt.buffer = buf
3382
Antoine Pitroue96ec682011-07-23 21:46:35 +02003383 def test_rawio(self):
3384 # Issue #12591: TextIOWrapper must work with raw I/O objects, so
3385 # that subprocess.Popen() can have the required unbuffered
3386 # semantics with universal_newlines=True.
3387 raw = self.MockRawIO([b'abc', b'def', b'ghi\njkl\nopq\n'])
3388 txt = self.TextIOWrapper(raw, encoding='ascii', newline='\n')
3389 # Reads
3390 self.assertEqual(txt.read(4), 'abcd')
3391 self.assertEqual(txt.readline(), 'efghi\n')
3392 self.assertEqual(list(txt), ['jkl\n', 'opq\n'])
3393
3394 def test_rawio_write_through(self):
3395 # Issue #12591: with write_through=True, writes don't need a flush
3396 raw = self.MockRawIO([b'abc', b'def', b'ghi\njkl\nopq\n'])
3397 txt = self.TextIOWrapper(raw, encoding='ascii', newline='\n',
3398 write_through=True)
3399 txt.write('1')
3400 txt.write('23\n4')
3401 txt.write('5')
3402 self.assertEqual(b''.join(raw._write_stack), b'123\n45')
3403
Antoine Pitrouc644e7c2014-05-09 00:24:50 +02003404 def test_bufio_write_through(self):
3405 # Issue #21396: write_through=True doesn't force a flush()
3406 # on the underlying binary buffered object.
3407 flush_called, write_called = [], []
3408 class BufferedWriter(self.BufferedWriter):
3409 def flush(self, *args, **kwargs):
3410 flush_called.append(True)
3411 return super().flush(*args, **kwargs)
3412 def write(self, *args, **kwargs):
3413 write_called.append(True)
3414 return super().write(*args, **kwargs)
3415
3416 rawio = self.BytesIO()
3417 data = b"a"
3418 bufio = BufferedWriter(rawio, len(data)*2)
3419 textio = self.TextIOWrapper(bufio, encoding='ascii',
3420 write_through=True)
3421 # write to the buffered io but don't overflow the buffer
3422 text = data.decode('ascii')
3423 textio.write(text)
3424
3425 # buffer.flush is not called with write_through=True
3426 self.assertFalse(flush_called)
3427 # buffer.write *is* called with write_through=True
3428 self.assertTrue(write_called)
3429 self.assertEqual(rawio.getvalue(), b"") # no flush
3430
3431 write_called = [] # reset
3432 textio.write(text * 10) # total content is larger than bufio buffer
3433 self.assertTrue(write_called)
3434 self.assertEqual(rawio.getvalue(), data * 11) # all flushed
3435
Antoine Pitrou3c2817b2017-06-03 12:32:28 +02003436 def test_reconfigure_write_through(self):
3437 raw = self.MockRawIO([])
3438 t = self.TextIOWrapper(raw, encoding='ascii', newline='\n')
3439 t.write('1')
3440 t.reconfigure(write_through=True) # implied flush
3441 self.assertEqual(t.write_through, True)
3442 self.assertEqual(b''.join(raw._write_stack), b'1')
3443 t.write('23')
3444 self.assertEqual(b''.join(raw._write_stack), b'123')
3445 t.reconfigure(write_through=False)
3446 self.assertEqual(t.write_through, False)
3447 t.write('45')
3448 t.flush()
3449 self.assertEqual(b''.join(raw._write_stack), b'12345')
3450 # Keeping default value
3451 t.reconfigure()
3452 t.reconfigure(write_through=None)
3453 self.assertEqual(t.write_through, False)
3454 t.reconfigure(write_through=True)
3455 t.reconfigure()
3456 t.reconfigure(write_through=None)
3457 self.assertEqual(t.write_through, True)
3458
Serhiy Storchaka94dc6732013-02-03 17:03:31 +02003459 def test_read_nonbytes(self):
3460 # Issue #17106
3461 # Crash when underlying read() returns non-bytes
Inada Naoki58cffba2021-04-01 11:25:04 +09003462 t = self.TextIOWrapper(self.StringIO('a'), encoding="utf-8")
Serhiy Storchaka94dc6732013-02-03 17:03:31 +02003463 self.assertRaises(TypeError, t.read, 1)
Inada Naoki58cffba2021-04-01 11:25:04 +09003464 t = self.TextIOWrapper(self.StringIO('a'), encoding="utf-8")
Serhiy Storchaka94dc6732013-02-03 17:03:31 +02003465 self.assertRaises(TypeError, t.readline)
Inada Naoki58cffba2021-04-01 11:25:04 +09003466 t = self.TextIOWrapper(self.StringIO('a'), encoding="utf-8")
Serhiy Storchaka94dc6732013-02-03 17:03:31 +02003467 self.assertRaises(TypeError, t.read)
3468
Oren Milmana5b4ea12017-08-25 21:14:54 +03003469 def test_illegal_encoder(self):
3470 # Issue 31271: Calling write() while the return value of encoder's
3471 # encode() is invalid shouldn't cause an assertion failure.
3472 rot13 = codecs.lookup("rot13")
3473 with support.swap_attr(rot13, '_is_text_encoding', True):
3474 t = io.TextIOWrapper(io.BytesIO(b'foo'), encoding="rot13")
3475 self.assertRaises(TypeError, t.write, 'bar')
3476
Serhiy Storchaka94dc6732013-02-03 17:03:31 +02003477 def test_illegal_decoder(self):
3478 # Issue #17106
Nick Coghlana9b15242014-02-04 22:11:18 +10003479 # Bypass the early encoding check added in issue 20404
3480 def _make_illegal_wrapper():
3481 quopri = codecs.lookup("quopri")
3482 quopri._is_text_encoding = True
3483 try:
3484 t = self.TextIOWrapper(self.BytesIO(b'aaaaaa'),
3485 newline='\n', encoding="quopri")
3486 finally:
3487 quopri._is_text_encoding = False
3488 return t
Serhiy Storchaka94dc6732013-02-03 17:03:31 +02003489 # Crash when decoder returns non-string
Nick Coghlana9b15242014-02-04 22:11:18 +10003490 t = _make_illegal_wrapper()
Serhiy Storchaka94dc6732013-02-03 17:03:31 +02003491 self.assertRaises(TypeError, t.read, 1)
Nick Coghlana9b15242014-02-04 22:11:18 +10003492 t = _make_illegal_wrapper()
Serhiy Storchaka94dc6732013-02-03 17:03:31 +02003493 self.assertRaises(TypeError, t.readline)
Nick Coghlana9b15242014-02-04 22:11:18 +10003494 t = _make_illegal_wrapper()
Serhiy Storchaka94dc6732013-02-03 17:03:31 +02003495 self.assertRaises(TypeError, t.read)
3496
Oren Milmanba7d7362017-08-29 11:58:27 +03003497 # Issue 31243: calling read() while the return value of decoder's
3498 # getstate() is invalid should neither crash the interpreter nor
3499 # raise a SystemError.
3500 def _make_very_illegal_wrapper(getstate_ret_val):
3501 class BadDecoder:
3502 def getstate(self):
3503 return getstate_ret_val
3504 def _get_bad_decoder(dummy):
3505 return BadDecoder()
3506 quopri = codecs.lookup("quopri")
3507 with support.swap_attr(quopri, 'incrementaldecoder',
3508 _get_bad_decoder):
3509 return _make_illegal_wrapper()
3510 t = _make_very_illegal_wrapper(42)
3511 self.assertRaises(TypeError, t.read, 42)
3512 t = _make_very_illegal_wrapper(())
3513 self.assertRaises(TypeError, t.read, 42)
3514 t = _make_very_illegal_wrapper((1, 2))
3515 self.assertRaises(TypeError, t.read, 42)
3516
Antoine Pitrou712cb732013-12-21 15:51:54 +01003517 def _check_create_at_shutdown(self, **kwargs):
3518 # Issue #20037: creating a TextIOWrapper at shutdown
3519 # shouldn't crash the interpreter.
3520 iomod = self.io.__name__
3521 code = """if 1:
3522 import codecs
3523 import {iomod} as io
3524
3525 # Avoid looking up codecs at shutdown
3526 codecs.lookup('utf-8')
3527
3528 class C:
3529 def __init__(self):
3530 self.buf = io.BytesIO()
3531 def __del__(self):
3532 io.TextIOWrapper(self.buf, **{kwargs})
3533 print("ok")
3534 c = C()
3535 """.format(iomod=iomod, kwargs=kwargs)
3536 return assert_python_ok("-c", code)
3537
3538 def test_create_at_shutdown_without_encoding(self):
3539 rc, out, err = self._check_create_at_shutdown()
3540 if err:
3541 # Can error out with a RuntimeError if the module state
3542 # isn't found.
Nick Coghlana9b15242014-02-04 22:11:18 +10003543 self.assertIn(self.shutdown_error, err.decode())
Antoine Pitrou712cb732013-12-21 15:51:54 +01003544 else:
3545 self.assertEqual("ok", out.decode().strip())
3546
3547 def test_create_at_shutdown_with_encoding(self):
3548 rc, out, err = self._check_create_at_shutdown(encoding='utf-8',
3549 errors='strict')
3550 self.assertFalse(err)
3551 self.assertEqual("ok", out.decode().strip())
3552
Antoine Pitroub8503892014-04-29 10:14:02 +02003553 def test_read_byteslike(self):
3554 r = MemviewBytesIO(b'Just some random string\n')
3555 t = self.TextIOWrapper(r, 'utf-8')
3556
3557 # TextIOwrapper will not read the full string, because
3558 # we truncate it to a multiple of the native int size
3559 # so that we can construct a more complex memoryview.
3560 bytes_val = _to_memoryview(r.getvalue()).tobytes()
3561
3562 self.assertEqual(t.read(200), bytes_val.decode('utf-8'))
3563
Benjamin Peterson6c14f232014-11-12 10:19:46 -05003564 def test_issue22849(self):
3565 class F(object):
3566 def readable(self): return True
3567 def writable(self): return True
3568 def seekable(self): return True
3569
3570 for i in range(10):
3571 try:
3572 self.TextIOWrapper(F(), encoding='utf-8')
3573 except Exception:
3574 pass
3575
3576 F.tell = lambda x: 0
3577 t = self.TextIOWrapper(F(), encoding='utf-8')
3578
INADA Naoki507434f2017-12-21 09:59:53 +09003579 def test_reconfigure_encoding_read(self):
3580 # latin1 -> utf8
3581 # (latin1 can decode utf-8 encoded string)
3582 data = 'abc\xe9\n'.encode('latin1') + 'd\xe9f\n'.encode('utf8')
3583 raw = self.BytesIO(data)
3584 txt = self.TextIOWrapper(raw, encoding='latin1', newline='\n')
3585 self.assertEqual(txt.readline(), 'abc\xe9\n')
3586 with self.assertRaises(self.UnsupportedOperation):
3587 txt.reconfigure(encoding='utf-8')
3588 with self.assertRaises(self.UnsupportedOperation):
3589 txt.reconfigure(newline=None)
3590
3591 def test_reconfigure_write_fromascii(self):
3592 # ascii has a specific encodefunc in the C implementation,
3593 # but utf-8-sig has not. Make sure that we get rid of the
3594 # cached encodefunc when we switch encoders.
3595 raw = self.BytesIO()
3596 txt = self.TextIOWrapper(raw, encoding='ascii', newline='\n')
3597 txt.write('foo\n')
3598 txt.reconfigure(encoding='utf-8-sig')
3599 txt.write('\xe9\n')
3600 txt.flush()
3601 self.assertEqual(raw.getvalue(), b'foo\n\xc3\xa9\n')
3602
3603 def test_reconfigure_write(self):
3604 # latin -> utf8
3605 raw = self.BytesIO()
3606 txt = self.TextIOWrapper(raw, encoding='latin1', newline='\n')
3607 txt.write('abc\xe9\n')
3608 txt.reconfigure(encoding='utf-8')
3609 self.assertEqual(raw.getvalue(), b'abc\xe9\n')
3610 txt.write('d\xe9f\n')
3611 txt.flush()
3612 self.assertEqual(raw.getvalue(), b'abc\xe9\nd\xc3\xa9f\n')
3613
3614 # ascii -> utf-8-sig: ensure that no BOM is written in the middle of
3615 # the file
3616 raw = self.BytesIO()
3617 txt = self.TextIOWrapper(raw, encoding='ascii', newline='\n')
3618 txt.write('abc\n')
3619 txt.reconfigure(encoding='utf-8-sig')
3620 txt.write('d\xe9f\n')
3621 txt.flush()
3622 self.assertEqual(raw.getvalue(), b'abc\nd\xc3\xa9f\n')
3623
3624 def test_reconfigure_write_non_seekable(self):
3625 raw = self.BytesIO()
3626 raw.seekable = lambda: False
3627 raw.seek = None
3628 txt = self.TextIOWrapper(raw, encoding='ascii', newline='\n')
3629 txt.write('abc\n')
3630 txt.reconfigure(encoding='utf-8-sig')
3631 txt.write('d\xe9f\n')
3632 txt.flush()
3633
3634 # If the raw stream is not seekable, there'll be a BOM
3635 self.assertEqual(raw.getvalue(), b'abc\n\xef\xbb\xbfd\xc3\xa9f\n')
3636
3637 def test_reconfigure_defaults(self):
3638 txt = self.TextIOWrapper(self.BytesIO(), 'ascii', 'replace', '\n')
3639 txt.reconfigure(encoding=None)
3640 self.assertEqual(txt.encoding, 'ascii')
3641 self.assertEqual(txt.errors, 'replace')
3642 txt.write('LF\n')
3643
3644 txt.reconfigure(newline='\r\n')
3645 self.assertEqual(txt.encoding, 'ascii')
3646 self.assertEqual(txt.errors, 'replace')
3647
3648 txt.reconfigure(errors='ignore')
3649 self.assertEqual(txt.encoding, 'ascii')
3650 self.assertEqual(txt.errors, 'ignore')
3651 txt.write('CRLF\n')
3652
3653 txt.reconfigure(encoding='utf-8', newline=None)
3654 self.assertEqual(txt.errors, 'strict')
3655 txt.seek(0)
3656 self.assertEqual(txt.read(), 'LF\nCRLF\n')
3657
3658 self.assertEqual(txt.detach().getvalue(), b'LF\nCRLF\r\n')
3659
3660 def test_reconfigure_newline(self):
3661 raw = self.BytesIO(b'CR\rEOF')
3662 txt = self.TextIOWrapper(raw, 'ascii', newline='\n')
3663 txt.reconfigure(newline=None)
3664 self.assertEqual(txt.readline(), 'CR\n')
3665 raw = self.BytesIO(b'CR\rEOF')
3666 txt = self.TextIOWrapper(raw, 'ascii', newline='\n')
3667 txt.reconfigure(newline='')
3668 self.assertEqual(txt.readline(), 'CR\r')
3669 raw = self.BytesIO(b'CR\rLF\nEOF')
3670 txt = self.TextIOWrapper(raw, 'ascii', newline='\r')
3671 txt.reconfigure(newline='\n')
3672 self.assertEqual(txt.readline(), 'CR\rLF\n')
3673 raw = self.BytesIO(b'LF\nCR\rEOF')
3674 txt = self.TextIOWrapper(raw, 'ascii', newline='\n')
3675 txt.reconfigure(newline='\r')
3676 self.assertEqual(txt.readline(), 'LF\nCR\r')
3677 raw = self.BytesIO(b'CR\rCRLF\r\nEOF')
3678 txt = self.TextIOWrapper(raw, 'ascii', newline='\r')
3679 txt.reconfigure(newline='\r\n')
3680 self.assertEqual(txt.readline(), 'CR\rCRLF\r\n')
3681
3682 txt = self.TextIOWrapper(self.BytesIO(), 'ascii', newline='\r')
3683 txt.reconfigure(newline=None)
3684 txt.write('linesep\n')
3685 txt.reconfigure(newline='')
3686 txt.write('LF\n')
3687 txt.reconfigure(newline='\n')
3688 txt.write('LF\n')
3689 txt.reconfigure(newline='\r')
3690 txt.write('CR\n')
3691 txt.reconfigure(newline='\r\n')
3692 txt.write('CRLF\n')
3693 expected = 'linesep' + os.linesep + 'LF\nLF\nCR\rCRLF\r\n'
3694 self.assertEqual(txt.detach().getvalue().decode('ascii'), expected)
3695
Zackery Spytz23db9352018-06-29 04:14:58 -06003696 def test_issue25862(self):
3697 # Assertion failures occurred in tell() after read() and write().
3698 t = self.TextIOWrapper(self.BytesIO(b'test'), encoding='ascii')
3699 t.read(1)
3700 t.read()
3701 t.tell()
3702 t = self.TextIOWrapper(self.BytesIO(b'test'), encoding='ascii')
3703 t.read(1)
3704 t.write('x')
3705 t.tell()
3706
Serhiy Storchaka94dc6732013-02-03 17:03:31 +02003707
Antoine Pitroub8503892014-04-29 10:14:02 +02003708class MemviewBytesIO(io.BytesIO):
3709 '''A BytesIO object whose read method returns memoryviews
3710 rather than bytes'''
3711
3712 def read1(self, len_):
3713 return _to_memoryview(super().read1(len_))
3714
3715 def read(self, len_):
3716 return _to_memoryview(super().read(len_))
3717
3718def _to_memoryview(buf):
3719 '''Convert bytes-object *buf* to a non-trivial memoryview'''
3720
3721 arr = array.array('i')
3722 idx = len(buf) - len(buf) % arr.itemsize
3723 arr.frombytes(buf[:idx])
3724 return memoryview(arr)
Benjamin Petersonc9c0f202009-06-30 23:06:06 +00003725
Benjamin Peterson6fd113c2014-11-12 10:23:44 -05003726
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003727class CTextIOWrapperTest(TextIOWrapperTest):
Antoine Pitrou712cb732013-12-21 15:51:54 +01003728 io = io
Eddie Elizondo4590f722020-02-04 02:29:25 -08003729 shutdown_error = "LookupError: unknown encoding: ascii"
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003730
3731 def test_initialization(self):
3732 r = self.BytesIO(b"\xc3\xa9\n\n")
3733 b = self.BufferedReader(r, 1000)
Inada Naoki58cffba2021-04-01 11:25:04 +09003734 t = self.TextIOWrapper(b, encoding="utf-8")
Inada Naokifb786922021-04-06 11:18:41 +09003735 self.assertRaises(ValueError, t.__init__, b, encoding="utf-8", newline='xyzzy')
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003736 self.assertRaises(ValueError, t.read)
3737
Benjamin Peterson10e76b62014-12-21 20:51:50 -06003738 t = self.TextIOWrapper.__new__(self.TextIOWrapper)
3739 self.assertRaises(Exception, repr, t)
3740
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003741 def test_garbage_collection(self):
3742 # C TextIOWrapper objects are collected, and collecting them flushes
3743 # all data to disk.
3744 # The Python version has __del__, so it ends in gc.garbage instead.
Hai Shi883bc632020-07-06 17:12:49 +08003745 with warnings_helper.check_warnings(('', ResourceWarning)):
3746 rawio = io.FileIO(os_helper.TESTFN, "wb")
Antoine Pitrou796564c2013-07-30 19:59:21 +02003747 b = self.BufferedWriter(rawio)
3748 t = self.TextIOWrapper(b, encoding="ascii")
3749 t.write("456def")
3750 t.x = t
3751 wr = weakref.ref(t)
3752 del t
3753 support.gc_collect()
Serhiy Storchaka0dcd80a2015-08-02 15:17:49 +03003754 self.assertIsNone(wr(), wr)
Hai Shi883bc632020-07-06 17:12:49 +08003755 with self.open(os_helper.TESTFN, "rb") as f:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003756 self.assertEqual(f.read(), b"456def")
3757
Charles-François Natali42c28cd2011-10-05 19:53:43 +02003758 def test_rwpair_cleared_before_textio(self):
3759 # Issue 13070: TextIOWrapper's finalization would crash when called
3760 # after the reference to the underlying BufferedRWPair's writer got
3761 # cleared by the GC.
3762 for i in range(1000):
3763 b1 = self.BufferedRWPair(self.MockRawIO(), self.MockRawIO())
3764 t1 = self.TextIOWrapper(b1, encoding="ascii")
3765 b2 = self.BufferedRWPair(self.MockRawIO(), self.MockRawIO())
3766 t2 = self.TextIOWrapper(b2, encoding="ascii")
3767 # circular references
3768 t1.buddy = t2
3769 t2.buddy = t1
3770 support.gc_collect()
3771
Zackery Spytz842acaa2018-12-17 07:52:45 -07003772 def test_del__CHUNK_SIZE_SystemError(self):
3773 t = self.TextIOWrapper(self.BytesIO(), encoding='ascii')
3774 with self.assertRaises(AttributeError):
3775 del t._CHUNK_SIZE
3776
Inada Naoki01806d52021-02-22 08:29:30 +09003777 def test_internal_buffer_size(self):
3778 # bpo-43260: TextIOWrapper's internal buffer should not store
3779 # data larger than chunk size.
3780 chunk_size = 8192 # default chunk size, updated later
3781
3782 class MockIO(self.MockRawIO):
3783 def write(self, data):
3784 if len(data) > chunk_size:
3785 raise RuntimeError
3786 return super().write(data)
3787
3788 buf = MockIO()
3789 t = self.TextIOWrapper(buf, encoding="ascii")
3790 chunk_size = t._CHUNK_SIZE
3791 t.write("abc")
3792 t.write("def")
3793 # default chunk size is 8192 bytes so t don't write data to buf.
3794 self.assertEqual([], buf._write_stack)
3795
3796 with self.assertRaises(RuntimeError):
3797 t.write("x"*(chunk_size+1))
3798
3799 self.assertEqual([b"abcdef"], buf._write_stack)
3800 t.write("ghi")
3801 t.write("x"*chunk_size)
3802 self.assertEqual([b"abcdef", b"ghi", b"x"*chunk_size], buf._write_stack)
3803
Charles-François Natali42c28cd2011-10-05 19:53:43 +02003804
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003805class PyTextIOWrapperTest(TextIOWrapperTest):
Antoine Pitrou712cb732013-12-21 15:51:54 +01003806 io = pyio
Steve Dower6d5cf8a2016-09-09 09:21:01 -07003807 shutdown_error = "LookupError: unknown encoding: ascii"
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003808
3809
3810class IncrementalNewlineDecoderTest(unittest.TestCase):
3811
3812 def check_newline_decoding_utf8(self, decoder):
Antoine Pitrou180a3362008-12-14 16:36:46 +00003813 # UTF-8 specific tests for a newline decoder
3814 def _check_decode(b, s, **kwargs):
3815 # We exercise getstate() / setstate() as well as decode()
3816 state = decoder.getstate()
Ezio Melottib3aedd42010-11-20 19:04:17 +00003817 self.assertEqual(decoder.decode(b, **kwargs), s)
Antoine Pitrou180a3362008-12-14 16:36:46 +00003818 decoder.setstate(state)
Ezio Melottib3aedd42010-11-20 19:04:17 +00003819 self.assertEqual(decoder.decode(b, **kwargs), s)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00003820
Antoine Pitrou180a3362008-12-14 16:36:46 +00003821 _check_decode(b'\xe8\xa2\x88', "\u8888")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00003822
Antoine Pitrou180a3362008-12-14 16:36:46 +00003823 _check_decode(b'\xe8', "")
3824 _check_decode(b'\xa2', "")
3825 _check_decode(b'\x88', "\u8888")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00003826
Antoine Pitrou180a3362008-12-14 16:36:46 +00003827 _check_decode(b'\xe8', "")
3828 _check_decode(b'\xa2', "")
3829 _check_decode(b'\x88', "\u8888")
3830
3831 _check_decode(b'\xe8', "")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00003832 self.assertRaises(UnicodeDecodeError, decoder.decode, b'', final=True)
3833
Antoine Pitrou180a3362008-12-14 16:36:46 +00003834 decoder.reset()
3835 _check_decode(b'\n', "\n")
3836 _check_decode(b'\r', "")
3837 _check_decode(b'', "\n", final=True)
3838 _check_decode(b'\r', "\n", final=True)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00003839
Antoine Pitrou180a3362008-12-14 16:36:46 +00003840 _check_decode(b'\r', "")
3841 _check_decode(b'a', "\na")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00003842
Antoine Pitrou180a3362008-12-14 16:36:46 +00003843 _check_decode(b'\r\r\n', "\n\n")
3844 _check_decode(b'\r', "")
3845 _check_decode(b'\r', "\n")
3846 _check_decode(b'\na', "\na")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00003847
Antoine Pitrou180a3362008-12-14 16:36:46 +00003848 _check_decode(b'\xe8\xa2\x88\r\n', "\u8888\n")
3849 _check_decode(b'\xe8\xa2\x88', "\u8888")
3850 _check_decode(b'\n', "\n")
3851 _check_decode(b'\xe8\xa2\x88\r', "\u8888")
3852 _check_decode(b'\n', "\n")
Guido van Rossum9b76da62007-04-11 01:09:03 +00003853
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003854 def check_newline_decoding(self, decoder, encoding):
Antoine Pitrou180a3362008-12-14 16:36:46 +00003855 result = []
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003856 if encoding is not None:
3857 encoder = codecs.getincrementalencoder(encoding)()
3858 def _decode_bytewise(s):
3859 # Decode one byte at a time
3860 for b in encoder.encode(s):
3861 result.append(decoder.decode(bytes([b])))
3862 else:
3863 encoder = None
3864 def _decode_bytewise(s):
3865 # Decode one char at a time
3866 for c in s:
3867 result.append(decoder.decode(c))
Ezio Melottib3aedd42010-11-20 19:04:17 +00003868 self.assertEqual(decoder.newlines, None)
Antoine Pitrou180a3362008-12-14 16:36:46 +00003869 _decode_bytewise("abc\n\r")
Ezio Melottib3aedd42010-11-20 19:04:17 +00003870 self.assertEqual(decoder.newlines, '\n')
Antoine Pitrou180a3362008-12-14 16:36:46 +00003871 _decode_bytewise("\nabc")
Ezio Melottib3aedd42010-11-20 19:04:17 +00003872 self.assertEqual(decoder.newlines, ('\n', '\r\n'))
Antoine Pitrou180a3362008-12-14 16:36:46 +00003873 _decode_bytewise("abc\r")
Ezio Melottib3aedd42010-11-20 19:04:17 +00003874 self.assertEqual(decoder.newlines, ('\n', '\r\n'))
Antoine Pitrou180a3362008-12-14 16:36:46 +00003875 _decode_bytewise("abc")
Ezio Melottib3aedd42010-11-20 19:04:17 +00003876 self.assertEqual(decoder.newlines, ('\r', '\n', '\r\n'))
Antoine Pitrou180a3362008-12-14 16:36:46 +00003877 _decode_bytewise("abc\r")
Ezio Melottib3aedd42010-11-20 19:04:17 +00003878 self.assertEqual("".join(result), "abc\n\nabcabc\nabcabc")
Antoine Pitrou180a3362008-12-14 16:36:46 +00003879 decoder.reset()
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003880 input = "abc"
3881 if encoder is not None:
3882 encoder.reset()
3883 input = encoder.encode(input)
Ezio Melottib3aedd42010-11-20 19:04:17 +00003884 self.assertEqual(decoder.decode(input), "abc")
3885 self.assertEqual(decoder.newlines, None)
Antoine Pitrou180a3362008-12-14 16:36:46 +00003886
3887 def test_newline_decoder(self):
3888 encodings = (
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003889 # None meaning the IncrementalNewlineDecoder takes unicode input
3890 # rather than bytes input
3891 None, 'utf-8', 'latin-1',
Antoine Pitrou180a3362008-12-14 16:36:46 +00003892 'utf-16', 'utf-16-le', 'utf-16-be',
3893 'utf-32', 'utf-32-le', 'utf-32-be',
3894 )
3895 for enc in encodings:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003896 decoder = enc and codecs.getincrementaldecoder(enc)()
3897 decoder = self.IncrementalNewlineDecoder(decoder, translate=True)
3898 self.check_newline_decoding(decoder, enc)
Alexandre Vassalotti472f07d2008-01-06 00:34:32 +00003899 decoder = codecs.getincrementaldecoder("utf-8")()
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003900 decoder = self.IncrementalNewlineDecoder(decoder, translate=True)
3901 self.check_newline_decoding_utf8(decoder)
Oren Milman1d1d3e92017-08-20 18:35:36 +03003902 self.assertRaises(TypeError, decoder.setstate, 42)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003903
Antoine Pitrou66913e22009-03-06 23:40:56 +00003904 def test_newline_bytes(self):
3905 # Issue 5433: Excessive optimization in IncrementalNewlineDecoder
3906 def _check(dec):
Ezio Melottib3aedd42010-11-20 19:04:17 +00003907 self.assertEqual(dec.newlines, None)
3908 self.assertEqual(dec.decode("\u0D00"), "\u0D00")
3909 self.assertEqual(dec.newlines, None)
3910 self.assertEqual(dec.decode("\u0A00"), "\u0A00")
3911 self.assertEqual(dec.newlines, None)
Antoine Pitrou66913e22009-03-06 23:40:56 +00003912 dec = self.IncrementalNewlineDecoder(None, translate=False)
3913 _check(dec)
3914 dec = self.IncrementalNewlineDecoder(None, translate=True)
3915 _check(dec)
3916
Xiang Zhangb08746b2018-10-31 19:49:16 +08003917 def test_translate(self):
3918 # issue 35062
3919 for translate in (-2, -1, 1, 2):
3920 decoder = codecs.getincrementaldecoder("utf-8")()
3921 decoder = self.IncrementalNewlineDecoder(decoder, translate)
3922 self.check_newline_decoding_utf8(decoder)
3923 decoder = codecs.getincrementaldecoder("utf-8")()
3924 decoder = self.IncrementalNewlineDecoder(decoder, translate=0)
3925 self.assertEqual(decoder.decode(b"\r\r\n"), "\r\r\n")
3926
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003927class CIncrementalNewlineDecoderTest(IncrementalNewlineDecoderTest):
3928 pass
3929
3930class PyIncrementalNewlineDecoderTest(IncrementalNewlineDecoderTest):
3931 pass
Antoine Pitrou180a3362008-12-14 16:36:46 +00003932
Alexandre Vassalotti472f07d2008-01-06 00:34:32 +00003933
Guido van Rossum01a27522007-03-07 01:00:12 +00003934# XXX Tests for open()
Guido van Rossum68bbcd22007-02-27 17:19:33 +00003935
Guido van Rossum5abbf752007-08-27 17:39:33 +00003936class MiscIOTest(unittest.TestCase):
3937
Barry Warsaw40e82462008-11-20 20:14:50 +00003938 def tearDown(self):
Hai Shi883bc632020-07-06 17:12:49 +08003939 os_helper.unlink(os_helper.TESTFN)
Barry Warsaw40e82462008-11-20 20:14:50 +00003940
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003941 def test___all__(self):
3942 for name in self.io.__all__:
3943 obj = getattr(self.io, name, None)
Serhiy Storchaka0dcd80a2015-08-02 15:17:49 +03003944 self.assertIsNotNone(obj, name)
Steve Dowerb82e17e2019-05-23 08:45:22 -07003945 if name in ("open", "open_code"):
Guido van Rossum5abbf752007-08-27 17:39:33 +00003946 continue
Benjamin Peterson6a52a9c2009-04-29 22:00:44 +00003947 elif "error" in name.lower() or name == "UnsupportedOperation":
Benjamin Petersonbfb95942009-04-02 01:13:40 +00003948 self.assertTrue(issubclass(obj, Exception), name)
3949 elif not name.startswith("SEEK_"):
3950 self.assertTrue(issubclass(obj, self.IOBase))
Benjamin Peterson65676e42008-11-05 21:42:45 +00003951
Barry Warsaw40e82462008-11-20 20:14:50 +00003952 def test_attributes(self):
Hai Shi883bc632020-07-06 17:12:49 +08003953 f = self.open(os_helper.TESTFN, "wb", buffering=0)
Ezio Melottib3aedd42010-11-20 19:04:17 +00003954 self.assertEqual(f.mode, "wb")
Barry Warsaw40e82462008-11-20 20:14:50 +00003955 f.close()
3956
Hai Shi883bc632020-07-06 17:12:49 +08003957 with warnings_helper.check_warnings(('', DeprecationWarning)):
Inada Naoki58cffba2021-04-01 11:25:04 +09003958 f = self.open(os_helper.TESTFN, "U", encoding="utf-8")
Hai Shi883bc632020-07-06 17:12:49 +08003959 self.assertEqual(f.name, os_helper.TESTFN)
3960 self.assertEqual(f.buffer.name, os_helper.TESTFN)
3961 self.assertEqual(f.buffer.raw.name, os_helper.TESTFN)
Victor Stinner942f7a22020-03-04 18:50:22 +01003962 self.assertEqual(f.mode, "U")
3963 self.assertEqual(f.buffer.mode, "rb")
3964 self.assertEqual(f.buffer.raw.mode, "rb")
3965 f.close()
3966
Inada Naoki58cffba2021-04-01 11:25:04 +09003967 f = self.open(os_helper.TESTFN, "w+", encoding="utf-8")
Ezio Melottib3aedd42010-11-20 19:04:17 +00003968 self.assertEqual(f.mode, "w+")
3969 self.assertEqual(f.buffer.mode, "rb+") # Does it really matter?
3970 self.assertEqual(f.buffer.raw.mode, "rb+")
Barry Warsaw40e82462008-11-20 20:14:50 +00003971
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003972 g = self.open(f.fileno(), "wb", closefd=False)
Ezio Melottib3aedd42010-11-20 19:04:17 +00003973 self.assertEqual(g.mode, "wb")
3974 self.assertEqual(g.raw.mode, "wb")
3975 self.assertEqual(g.name, f.fileno())
3976 self.assertEqual(g.raw.name, f.fileno())
Barry Warsaw40e82462008-11-20 20:14:50 +00003977 f.close()
3978 g.close()
3979
Benjamin Peterson74fa9f72019-11-12 14:51:34 -08003980 def test_open_pipe_with_append(self):
3981 # bpo-27805: Ignore ESPIPE from lseek() in open().
3982 r, w = os.pipe()
3983 self.addCleanup(os.close, r)
Inada Naoki58cffba2021-04-01 11:25:04 +09003984 f = self.open(w, 'a', encoding="utf-8")
Benjamin Peterson74fa9f72019-11-12 14:51:34 -08003985 self.addCleanup(f.close)
3986 # Check that the file is marked non-seekable. On Windows, however, lseek
3987 # somehow succeeds on pipes.
3988 if sys.platform != 'win32':
3989 self.assertFalse(f.seekable())
3990
Antoine Pitrou8043cf82009-01-09 19:54:29 +00003991 def test_io_after_close(self):
3992 for kwargs in [
3993 {"mode": "w"},
3994 {"mode": "wb"},
3995 {"mode": "w", "buffering": 1},
3996 {"mode": "w", "buffering": 2},
3997 {"mode": "wb", "buffering": 0},
3998 {"mode": "r"},
3999 {"mode": "rb"},
4000 {"mode": "r", "buffering": 1},
4001 {"mode": "r", "buffering": 2},
4002 {"mode": "rb", "buffering": 0},
4003 {"mode": "w+"},
4004 {"mode": "w+b"},
4005 {"mode": "w+", "buffering": 1},
4006 {"mode": "w+", "buffering": 2},
4007 {"mode": "w+b", "buffering": 0},
4008 ]:
Inada Naoki58cffba2021-04-01 11:25:04 +09004009 if "b" not in kwargs["mode"]:
4010 kwargs["encoding"] = "utf-8"
Hai Shi883bc632020-07-06 17:12:49 +08004011 f = self.open(os_helper.TESTFN, **kwargs)
Antoine Pitrou8043cf82009-01-09 19:54:29 +00004012 f.close()
4013 self.assertRaises(ValueError, f.flush)
4014 self.assertRaises(ValueError, f.fileno)
4015 self.assertRaises(ValueError, f.isatty)
4016 self.assertRaises(ValueError, f.__iter__)
4017 if hasattr(f, "peek"):
4018 self.assertRaises(ValueError, f.peek, 1)
4019 self.assertRaises(ValueError, f.read)
4020 if hasattr(f, "read1"):
4021 self.assertRaises(ValueError, f.read1, 1024)
Martin Panterccb2c0e2016-10-20 23:48:14 +00004022 self.assertRaises(ValueError, f.read1)
Victor Stinnerb79f28c2011-05-25 22:09:03 +02004023 if hasattr(f, "readall"):
4024 self.assertRaises(ValueError, f.readall)
Antoine Pitrou8043cf82009-01-09 19:54:29 +00004025 if hasattr(f, "readinto"):
4026 self.assertRaises(ValueError, f.readinto, bytearray(1024))
Benjamin Petersona96fea02014-06-22 14:17:44 -07004027 if hasattr(f, "readinto1"):
4028 self.assertRaises(ValueError, f.readinto1, bytearray(1024))
Antoine Pitrou8043cf82009-01-09 19:54:29 +00004029 self.assertRaises(ValueError, f.readline)
4030 self.assertRaises(ValueError, f.readlines)
Xiang Zhang026435c2017-04-15 12:47:28 +08004031 self.assertRaises(ValueError, f.readlines, 1)
Antoine Pitrou8043cf82009-01-09 19:54:29 +00004032 self.assertRaises(ValueError, f.seek, 0)
4033 self.assertRaises(ValueError, f.tell)
4034 self.assertRaises(ValueError, f.truncate)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00004035 self.assertRaises(ValueError, f.write,
4036 b"" if "b" in kwargs['mode'] else "")
Antoine Pitrou8043cf82009-01-09 19:54:29 +00004037 self.assertRaises(ValueError, f.writelines, [])
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00004038 self.assertRaises(ValueError, next, f)
Antoine Pitrou8043cf82009-01-09 19:54:29 +00004039
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00004040 def test_blockingioerror(self):
4041 # Various BlockingIOError issues
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00004042 class C(str):
4043 pass
4044 c = C("")
4045 b = self.BlockingIOError(1, c)
4046 c.b = b
4047 b.c = c
4048 wr = weakref.ref(c)
4049 del c, b
Benjamin Peterson24fb1d02009-04-24 23:26:21 +00004050 support.gc_collect()
Serhiy Storchaka0dcd80a2015-08-02 15:17:49 +03004051 self.assertIsNone(wr(), wr)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00004052
4053 def test_abcs(self):
4054 # Test the visible base classes are ABCs.
Ezio Melottie9615932010-01-24 19:26:24 +00004055 self.assertIsInstance(self.IOBase, abc.ABCMeta)
4056 self.assertIsInstance(self.RawIOBase, abc.ABCMeta)
4057 self.assertIsInstance(self.BufferedIOBase, abc.ABCMeta)
4058 self.assertIsInstance(self.TextIOBase, abc.ABCMeta)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00004059
4060 def _check_abc_inheritance(self, abcmodule):
Hai Shi883bc632020-07-06 17:12:49 +08004061 with self.open(os_helper.TESTFN, "wb", buffering=0) as f:
Ezio Melottie9615932010-01-24 19:26:24 +00004062 self.assertIsInstance(f, abcmodule.IOBase)
4063 self.assertIsInstance(f, abcmodule.RawIOBase)
4064 self.assertNotIsInstance(f, abcmodule.BufferedIOBase)
4065 self.assertNotIsInstance(f, abcmodule.TextIOBase)
Hai Shi883bc632020-07-06 17:12:49 +08004066 with self.open(os_helper.TESTFN, "wb") as f:
Ezio Melottie9615932010-01-24 19:26:24 +00004067 self.assertIsInstance(f, abcmodule.IOBase)
4068 self.assertNotIsInstance(f, abcmodule.RawIOBase)
4069 self.assertIsInstance(f, abcmodule.BufferedIOBase)
4070 self.assertNotIsInstance(f, abcmodule.TextIOBase)
Inada Naoki58cffba2021-04-01 11:25:04 +09004071 with self.open(os_helper.TESTFN, "w", encoding="utf-8") as f:
Ezio Melottie9615932010-01-24 19:26:24 +00004072 self.assertIsInstance(f, abcmodule.IOBase)
4073 self.assertNotIsInstance(f, abcmodule.RawIOBase)
4074 self.assertNotIsInstance(f, abcmodule.BufferedIOBase)
4075 self.assertIsInstance(f, abcmodule.TextIOBase)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00004076
4077 def test_abc_inheritance(self):
4078 # Test implementations inherit from their respective ABCs
4079 self._check_abc_inheritance(self)
4080
4081 def test_abc_inheritance_official(self):
4082 # Test implementations inherit from the official ABCs of the
4083 # baseline "io" module.
4084 self._check_abc_inheritance(io)
4085
Antoine Pitroue033e062010-10-29 10:38:18 +00004086 def _check_warn_on_dealloc(self, *args, **kwargs):
4087 f = open(*args, **kwargs)
4088 r = repr(f)
4089 with self.assertWarns(ResourceWarning) as cm:
4090 f = None
4091 support.gc_collect()
4092 self.assertIn(r, str(cm.warning.args[0]))
4093
4094 def test_warn_on_dealloc(self):
Hai Shi883bc632020-07-06 17:12:49 +08004095 self._check_warn_on_dealloc(os_helper.TESTFN, "wb", buffering=0)
4096 self._check_warn_on_dealloc(os_helper.TESTFN, "wb")
Inada Naoki58cffba2021-04-01 11:25:04 +09004097 self._check_warn_on_dealloc(os_helper.TESTFN, "w", encoding="utf-8")
Antoine Pitroue033e062010-10-29 10:38:18 +00004098
4099 def _check_warn_on_dealloc_fd(self, *args, **kwargs):
4100 fds = []
Benjamin Peterson556c7352010-10-31 01:35:43 +00004101 def cleanup_fds():
Antoine Pitroue033e062010-10-29 10:38:18 +00004102 for fd in fds:
4103 try:
4104 os.close(fd)
Andrew Svetlov3438fa42012-12-17 23:35:18 +02004105 except OSError as e:
Antoine Pitroue033e062010-10-29 10:38:18 +00004106 if e.errno != errno.EBADF:
4107 raise
Benjamin Peterson556c7352010-10-31 01:35:43 +00004108 self.addCleanup(cleanup_fds)
4109 r, w = os.pipe()
4110 fds += r, w
4111 self._check_warn_on_dealloc(r, *args, **kwargs)
4112 # When using closefd=False, there's no warning
4113 r, w = os.pipe()
4114 fds += r, w
Hai Shi883bc632020-07-06 17:12:49 +08004115 with warnings_helper.check_no_resource_warning(self):
Benjamin Peterson556c7352010-10-31 01:35:43 +00004116 open(r, *args, closefd=False, **kwargs)
Antoine Pitroue033e062010-10-29 10:38:18 +00004117
4118 def test_warn_on_dealloc_fd(self):
4119 self._check_warn_on_dealloc_fd("rb", buffering=0)
4120 self._check_warn_on_dealloc_fd("rb")
Inada Naoki58cffba2021-04-01 11:25:04 +09004121 self._check_warn_on_dealloc_fd("r", encoding="utf-8")
Antoine Pitroue033e062010-10-29 10:38:18 +00004122
4123
Antoine Pitrou243757e2010-11-05 21:15:39 +00004124 def test_pickling(self):
4125 # Pickling file objects is forbidden
4126 for kwargs in [
4127 {"mode": "w"},
4128 {"mode": "wb"},
4129 {"mode": "wb", "buffering": 0},
4130 {"mode": "r"},
4131 {"mode": "rb"},
4132 {"mode": "rb", "buffering": 0},
4133 {"mode": "w+"},
4134 {"mode": "w+b"},
4135 {"mode": "w+b", "buffering": 0},
4136 ]:
Inada Naoki58cffba2021-04-01 11:25:04 +09004137 if "b" not in kwargs["mode"]:
4138 kwargs["encoding"] = "utf-8"
Antoine Pitrou243757e2010-11-05 21:15:39 +00004139 for protocol in range(pickle.HIGHEST_PROTOCOL + 1):
Hai Shi883bc632020-07-06 17:12:49 +08004140 with self.open(os_helper.TESTFN, **kwargs) as f:
Antoine Pitrou243757e2010-11-05 21:15:39 +00004141 self.assertRaises(TypeError, pickle.dumps, f, protocol)
4142
Antoine Pitrou58fcf9f2011-11-21 20:16:44 +01004143 def test_nonblock_pipe_write_bigbuf(self):
4144 self._test_nonblock_pipe_write(16*1024)
4145
Antoine Pitrou58fcf9f2011-11-21 20:16:44 +01004146 def test_nonblock_pipe_write_smallbuf(self):
4147 self._test_nonblock_pipe_write(1024)
4148
Victor Stinner1db9e7b2014-07-29 22:32:47 +02004149 @unittest.skipUnless(hasattr(os, 'set_blocking'),
4150 'os.set_blocking() required for this test')
Antoine Pitrou58fcf9f2011-11-21 20:16:44 +01004151 def _test_nonblock_pipe_write(self, bufsize):
4152 sent = []
4153 received = []
4154 r, w = os.pipe()
Victor Stinner1db9e7b2014-07-29 22:32:47 +02004155 os.set_blocking(r, False)
4156 os.set_blocking(w, False)
Antoine Pitrou58fcf9f2011-11-21 20:16:44 +01004157
4158 # To exercise all code paths in the C implementation we need
4159 # to play with buffer sizes. For instance, if we choose a
4160 # buffer size less than or equal to _PIPE_BUF (4096 on Linux)
4161 # then we will never get a partial write of the buffer.
4162 rf = self.open(r, mode='rb', closefd=True, buffering=bufsize)
4163 wf = self.open(w, mode='wb', closefd=True, buffering=bufsize)
4164
4165 with rf, wf:
4166 for N in 9999, 73, 7574:
4167 try:
4168 i = 0
4169 while True:
4170 msg = bytes([i % 26 + 97]) * N
4171 sent.append(msg)
4172 wf.write(msg)
4173 i += 1
4174
4175 except self.BlockingIOError as e:
4176 self.assertEqual(e.args[0], errno.EAGAIN)
Antoine Pitrou7fe601c2011-11-21 20:22:01 +01004177 self.assertEqual(e.args[2], e.characters_written)
Antoine Pitrou58fcf9f2011-11-21 20:16:44 +01004178 sent[-1] = sent[-1][:e.characters_written]
4179 received.append(rf.read())
4180 msg = b'BLOCKED'
4181 wf.write(msg)
4182 sent.append(msg)
4183
4184 while True:
4185 try:
4186 wf.flush()
4187 break
4188 except self.BlockingIOError as e:
4189 self.assertEqual(e.args[0], errno.EAGAIN)
Antoine Pitrou7fe601c2011-11-21 20:22:01 +01004190 self.assertEqual(e.args[2], e.characters_written)
Antoine Pitrou58fcf9f2011-11-21 20:16:44 +01004191 self.assertEqual(e.characters_written, 0)
4192 received.append(rf.read())
4193
4194 received += iter(rf.read, None)
4195
4196 sent, received = b''.join(sent), b''.join(received)
Serhiy Storchaka0dcd80a2015-08-02 15:17:49 +03004197 self.assertEqual(sent, received)
Antoine Pitrou58fcf9f2011-11-21 20:16:44 +01004198 self.assertTrue(wf.closed)
4199 self.assertTrue(rf.closed)
4200
Charles-François Natalidc3044c2012-01-09 22:40:02 +01004201 def test_create_fail(self):
4202 # 'x' mode fails if file is existing
Inada Naoki58cffba2021-04-01 11:25:04 +09004203 with self.open(os_helper.TESTFN, 'w', encoding="utf-8"):
Charles-François Natalidc3044c2012-01-09 22:40:02 +01004204 pass
Inada Naoki58cffba2021-04-01 11:25:04 +09004205 self.assertRaises(FileExistsError, self.open, os_helper.TESTFN, 'x', encoding="utf-8")
Charles-François Natalidc3044c2012-01-09 22:40:02 +01004206
4207 def test_create_writes(self):
4208 # 'x' mode opens for writing
Hai Shi883bc632020-07-06 17:12:49 +08004209 with self.open(os_helper.TESTFN, 'xb') as f:
Charles-François Natalidc3044c2012-01-09 22:40:02 +01004210 f.write(b"spam")
Hai Shi883bc632020-07-06 17:12:49 +08004211 with self.open(os_helper.TESTFN, 'rb') as f:
Charles-François Natalidc3044c2012-01-09 22:40:02 +01004212 self.assertEqual(b"spam", f.read())
4213
Christian Heimes7b648752012-09-10 14:48:43 +02004214 def test_open_allargs(self):
4215 # there used to be a buffer overflow in the parser for rawmode
Inada Naoki58cffba2021-04-01 11:25:04 +09004216 self.assertRaises(ValueError, self.open, os_helper.TESTFN, 'rwax+', encoding="utf-8")
Christian Heimes7b648752012-09-10 14:48:43 +02004217
Victor Stinner22eb6892019-06-26 00:51:05 +02004218 def test_check_encoding_errors(self):
4219 # bpo-37388: open() and TextIOWrapper must check encoding and errors
4220 # arguments in dev mode
4221 mod = self.io.__name__
4222 filename = __file__
4223 invalid = 'Boom, Shaka Laka, Boom!'
4224 code = textwrap.dedent(f'''
4225 import sys
4226 from {mod} import open, TextIOWrapper
4227
4228 try:
4229 open({filename!r}, encoding={invalid!r})
4230 except LookupError:
4231 pass
4232 else:
4233 sys.exit(21)
4234
4235 try:
4236 open({filename!r}, errors={invalid!r})
4237 except LookupError:
4238 pass
4239 else:
4240 sys.exit(22)
4241
4242 fp = open({filename!r}, "rb")
4243 with fp:
4244 try:
4245 TextIOWrapper(fp, encoding={invalid!r})
4246 except LookupError:
4247 pass
4248 else:
4249 sys.exit(23)
4250
4251 try:
4252 TextIOWrapper(fp, errors={invalid!r})
4253 except LookupError:
4254 pass
4255 else:
4256 sys.exit(24)
4257
4258 sys.exit(10)
4259 ''')
4260 proc = assert_python_failure('-X', 'dev', '-c', code)
4261 self.assertEqual(proc.rc, 10, proc)
4262
Inada Naoki48274832021-03-29 12:28:14 +09004263 def test_check_encoding_warning(self):
4264 # PEP 597: Raise warning when encoding is not specified
4265 # and sys.flags.warn_default_encoding is set.
4266 mod = self.io.__name__
4267 filename = __file__
4268 code = textwrap.dedent(f'''\
4269 import sys
4270 from {mod} import open, TextIOWrapper
4271 import pathlib
4272
4273 with open({filename!r}) as f: # line 5
4274 pass
4275
4276 pathlib.Path({filename!r}).read_text() # line 8
4277 ''')
4278 proc = assert_python_ok('-X', 'warn_default_encoding', '-c', code)
4279 warnings = proc.err.splitlines()
4280 self.assertEqual(len(warnings), 2)
4281 self.assertTrue(
4282 warnings[0].startswith(b"<string>:5: EncodingWarning: "))
4283 self.assertTrue(
4284 warnings[1].startswith(b"<string>:8: EncodingWarning: "))
4285
Christian Heimes7b648752012-09-10 14:48:43 +02004286
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00004287class CMiscIOTest(MiscIOTest):
4288 io = io
4289
Serhiy Storchaka37a79a12013-05-28 16:24:45 +03004290 def test_readinto_buffer_overflow(self):
4291 # Issue #18025
4292 class BadReader(self.io.BufferedIOBase):
4293 def read(self, n=-1):
4294 return b'x' * 10**6
4295 bufio = BadReader()
4296 b = bytearray(2)
4297 self.assertRaises(ValueError, bufio.readinto, b)
4298
Antoine Pitrou25f85d42015-04-13 19:41:47 +02004299 def check_daemon_threads_shutdown_deadlock(self, stream_name):
4300 # Issue #23309: deadlocks at shutdown should be avoided when a
4301 # daemon thread and the main thread both write to a file.
4302 code = """if 1:
4303 import sys
4304 import time
4305 import threading
Victor Stinner2a1aed02017-04-21 17:59:23 +02004306 from test.support import SuppressCrashReport
Antoine Pitrou25f85d42015-04-13 19:41:47 +02004307
4308 file = sys.{stream_name}
4309
4310 def run():
4311 while True:
4312 file.write('.')
4313 file.flush()
4314
Victor Stinner2a1aed02017-04-21 17:59:23 +02004315 crash = SuppressCrashReport()
4316 crash.__enter__()
4317 # don't call __exit__(): the crash occurs at Python shutdown
4318
Antoine Pitrou25f85d42015-04-13 19:41:47 +02004319 thread = threading.Thread(target=run)
4320 thread.daemon = True
4321 thread.start()
4322
4323 time.sleep(0.5)
4324 file.write('!')
4325 file.flush()
4326 """.format_map(locals())
4327 res, _ = run_python_until_end("-c", code)
4328 err = res.err.decode()
4329 if res.rc != 0:
4330 # Failure: should be a fatal error
Victor Stinner9e5d30c2020-03-07 00:54:20 +01004331 pattern = (r"Fatal Python error: _enter_buffered_busy: "
4332 r"could not acquire lock "
Max Bernsteinccb7ca72019-05-21 10:09:21 -07004333 r"for <(_io\.)?BufferedWriter name='<{stream_name}>'> "
4334 r"at interpreter shutdown, possibly due to "
4335 r"daemon threads".format_map(locals()))
4336 self.assertRegex(err, pattern)
Antoine Pitrou25f85d42015-04-13 19:41:47 +02004337 else:
4338 self.assertFalse(err.strip('.!'))
4339
4340 def test_daemon_threads_shutdown_stdout_deadlock(self):
4341 self.check_daemon_threads_shutdown_deadlock('stdout')
4342
4343 def test_daemon_threads_shutdown_stderr_deadlock(self):
4344 self.check_daemon_threads_shutdown_deadlock('stderr')
4345
4346
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00004347class PyMiscIOTest(MiscIOTest):
4348 io = pyio
Barry Warsaw40e82462008-11-20 20:14:50 +00004349
Antoine Pitroub46b9d52010-08-21 19:09:32 +00004350
4351@unittest.skipIf(os.name == 'nt', 'POSIX signals required for this test.')
4352class SignalsTest(unittest.TestCase):
4353
4354 def setUp(self):
4355 self.oldalrm = signal.signal(signal.SIGALRM, self.alarm_interrupt)
4356
4357 def tearDown(self):
4358 signal.signal(signal.SIGALRM, self.oldalrm)
4359
4360 def alarm_interrupt(self, sig, frame):
4361 1/0
4362
Antoine Pitroub46b9d52010-08-21 19:09:32 +00004363 def check_interrupted_write(self, item, bytes, **fdopen_kwargs):
4364 """Check that a partial write, when it gets interrupted, properly
Antoine Pitrou707ce822011-02-25 21:24:11 +00004365 invokes the signal handler, and bubbles up the exception raised
4366 in the latter."""
Antoine Pitroub46b9d52010-08-21 19:09:32 +00004367 read_results = []
4368 def _read():
4369 s = os.read(r, 1)
4370 read_results.append(s)
Victor Stinner05c9d312018-12-18 23:52:39 +01004371
Antoine Pitroub46b9d52010-08-21 19:09:32 +00004372 t = threading.Thread(target=_read)
4373 t.daemon = True
4374 r, w = os.pipe()
Benjamin Petersond8fc2e12010-10-31 01:19:53 +00004375 fdopen_kwargs["closefd"] = False
Victor Stinner9c182a12015-03-16 18:03:06 +01004376 large_data = item * (support.PIPE_MAX_SIZE // len(item) + 1)
Antoine Pitroub46b9d52010-08-21 19:09:32 +00004377 try:
4378 wio = self.io.open(w, **fdopen_kwargs)
Victor Stinner05c9d312018-12-18 23:52:39 +01004379 if hasattr(signal, 'pthread_sigmask'):
4380 # create the thread with SIGALRM signal blocked
4381 signal.pthread_sigmask(signal.SIG_BLOCK, [signal.SIGALRM])
4382 t.start()
4383 signal.pthread_sigmask(signal.SIG_UNBLOCK, [signal.SIGALRM])
4384 else:
4385 t.start()
4386
Antoine Pitroub46b9d52010-08-21 19:09:32 +00004387 # Fill the pipe enough that the write will be blocking.
4388 # It will be interrupted by the timer armed above. Since the
4389 # other thread has read one byte, the low-level write will
4390 # return with a successful (partial) result rather than an EINTR.
4391 # The buffered IO layer must check for pending signal
4392 # handlers, which in this case will invoke alarm_interrupt().
Victor Stinner775b2dd2013-07-15 19:53:13 +02004393 signal.alarm(1)
4394 try:
Victor Stinner9c182a12015-03-16 18:03:06 +01004395 self.assertRaises(ZeroDivisionError, wio.write, large_data)
Victor Stinner775b2dd2013-07-15 19:53:13 +02004396 finally:
4397 signal.alarm(0)
Serhiy Storchaka263dcd22015-04-01 13:01:14 +03004398 t.join()
Antoine Pitroub46b9d52010-08-21 19:09:32 +00004399 # We got one byte, get another one and check that it isn't a
4400 # repeat of the first one.
4401 read_results.append(os.read(r, 1))
4402 self.assertEqual(read_results, [bytes[0:1], bytes[1:2]])
4403 finally:
4404 os.close(w)
4405 os.close(r)
4406 # This is deliberate. If we didn't close the file descriptor
4407 # before closing wio, wio would try to flush its internal
4408 # buffer, and block again.
4409 try:
4410 wio.close()
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02004411 except OSError as e:
Antoine Pitroub46b9d52010-08-21 19:09:32 +00004412 if e.errno != errno.EBADF:
4413 raise
4414
4415 def test_interrupted_write_unbuffered(self):
4416 self.check_interrupted_write(b"xy", b"xy", mode="wb", buffering=0)
4417
4418 def test_interrupted_write_buffered(self):
4419 self.check_interrupted_write(b"xy", b"xy", mode="wb")
4420
4421 def test_interrupted_write_text(self):
4422 self.check_interrupted_write("xy", b"xy", mode="w", encoding="ascii")
4423
Brett Cannon31f59292011-02-21 19:29:56 +00004424 @support.no_tracing
Antoine Pitrouf3b68b32010-12-03 18:41:39 +00004425 def check_reentrant_write(self, data, **fdopen_kwargs):
4426 def on_alarm(*args):
4427 # Will be called reentrantly from the same thread
4428 wio.write(data)
4429 1/0
4430 signal.signal(signal.SIGALRM, on_alarm)
4431 r, w = os.pipe()
4432 wio = self.io.open(w, **fdopen_kwargs)
4433 try:
Gregory P. Smithb5ba2032012-06-25 01:13:32 -07004434 signal.alarm(1)
Antoine Pitrouf3b68b32010-12-03 18:41:39 +00004435 # Either the reentrant call to wio.write() fails with RuntimeError,
4436 # or the signal handler raises ZeroDivisionError.
4437 with self.assertRaises((ZeroDivisionError, RuntimeError)) as cm:
4438 while 1:
4439 for i in range(100):
4440 wio.write(data)
4441 wio.flush()
4442 # Make sure the buffer doesn't fill up and block further writes
4443 os.read(r, len(data) * 100)
4444 exc = cm.exception
4445 if isinstance(exc, RuntimeError):
4446 self.assertTrue(str(exc).startswith("reentrant call"), str(exc))
4447 finally:
Victor Stinner9abee722017-09-19 09:36:54 -07004448 signal.alarm(0)
Antoine Pitrouf3b68b32010-12-03 18:41:39 +00004449 wio.close()
4450 os.close(r)
4451
4452 def test_reentrant_write_buffered(self):
4453 self.check_reentrant_write(b"xy", mode="wb")
4454
4455 def test_reentrant_write_text(self):
4456 self.check_reentrant_write("xy", mode="w", encoding="ascii")
4457
Antoine Pitrou707ce822011-02-25 21:24:11 +00004458 def check_interrupted_read_retry(self, decode, **fdopen_kwargs):
4459 """Check that a buffered read, when it gets interrupted (either
4460 returning a partial result or EINTR), properly invokes the signal
4461 handler and retries if the latter returned successfully."""
4462 r, w = os.pipe()
4463 fdopen_kwargs["closefd"] = False
4464 def alarm_handler(sig, frame):
4465 os.write(w, b"bar")
4466 signal.signal(signal.SIGALRM, alarm_handler)
4467 try:
4468 rio = self.io.open(r, **fdopen_kwargs)
4469 os.write(w, b"foo")
Gregory P. Smithb5ba2032012-06-25 01:13:32 -07004470 signal.alarm(1)
Antoine Pitrou707ce822011-02-25 21:24:11 +00004471 # Expected behaviour:
4472 # - first raw read() returns partial b"foo"
4473 # - second raw read() returns EINTR
4474 # - third raw read() returns b"bar"
4475 self.assertEqual(decode(rio.read(6)), "foobar")
4476 finally:
Victor Stinner9abee722017-09-19 09:36:54 -07004477 signal.alarm(0)
Antoine Pitrou707ce822011-02-25 21:24:11 +00004478 rio.close()
4479 os.close(w)
4480 os.close(r)
4481
Antoine Pitrou20db5112011-08-19 20:32:34 +02004482 def test_interrupted_read_retry_buffered(self):
Antoine Pitrou707ce822011-02-25 21:24:11 +00004483 self.check_interrupted_read_retry(lambda x: x.decode('latin1'),
4484 mode="rb")
4485
Antoine Pitrou20db5112011-08-19 20:32:34 +02004486 def test_interrupted_read_retry_text(self):
Antoine Pitrou707ce822011-02-25 21:24:11 +00004487 self.check_interrupted_read_retry(lambda x: x,
Inada Naoki58cffba2021-04-01 11:25:04 +09004488 mode="r", encoding="latin1")
Antoine Pitrou707ce822011-02-25 21:24:11 +00004489
Antoine Pitrou707ce822011-02-25 21:24:11 +00004490 def check_interrupted_write_retry(self, item, **fdopen_kwargs):
4491 """Check that a buffered write, when it gets interrupted (either
4492 returning a partial result or EINTR), properly invokes the signal
4493 handler and retries if the latter returned successfully."""
Hai Shi883bc632020-07-06 17:12:49 +08004494 select = import_helper.import_module("select")
Victor Stinner6d3c09f2015-03-16 17:57:49 +01004495
Antoine Pitrou707ce822011-02-25 21:24:11 +00004496 # A quantity that exceeds the buffer size of an anonymous pipe's
4497 # write end.
Antoine Pitroue1a16742013-04-24 23:31:38 +02004498 N = support.PIPE_MAX_SIZE
Antoine Pitrou707ce822011-02-25 21:24:11 +00004499 r, w = os.pipe()
4500 fdopen_kwargs["closefd"] = False
Victor Stinner6d3c09f2015-03-16 17:57:49 +01004501
Antoine Pitrou707ce822011-02-25 21:24:11 +00004502 # We need a separate thread to read from the pipe and allow the
4503 # write() to finish. This thread is started after the SIGALRM is
4504 # received (forcing a first EINTR in write()).
4505 read_results = []
4506 write_finished = False
Serhiy Storchaka9db55002015-03-28 20:38:37 +02004507 error = None
Antoine Pitrou707ce822011-02-25 21:24:11 +00004508 def _read():
Serhiy Storchaka9db55002015-03-28 20:38:37 +02004509 try:
4510 while not write_finished:
4511 while r in select.select([r], [], [], 1.0)[0]:
4512 s = os.read(r, 1024)
4513 read_results.append(s)
4514 except BaseException as exc:
4515 nonlocal error
4516 error = exc
Antoine Pitrou707ce822011-02-25 21:24:11 +00004517 t = threading.Thread(target=_read)
4518 t.daemon = True
4519 def alarm1(sig, frame):
4520 signal.signal(signal.SIGALRM, alarm2)
Gregory P. Smithb5ba2032012-06-25 01:13:32 -07004521 signal.alarm(1)
Antoine Pitrou707ce822011-02-25 21:24:11 +00004522 def alarm2(sig, frame):
4523 t.start()
Victor Stinner6d3c09f2015-03-16 17:57:49 +01004524
4525 large_data = item * N
Antoine Pitrou707ce822011-02-25 21:24:11 +00004526 signal.signal(signal.SIGALRM, alarm1)
4527 try:
4528 wio = self.io.open(w, **fdopen_kwargs)
Gregory P. Smithb5ba2032012-06-25 01:13:32 -07004529 signal.alarm(1)
Antoine Pitrou707ce822011-02-25 21:24:11 +00004530 # Expected behaviour:
4531 # - first raw write() is partial (because of the limited pipe buffer
4532 # and the first alarm)
4533 # - second raw write() returns EINTR (because of the second alarm)
4534 # - subsequent write()s are successful (either partial or complete)
Victor Stinner6d3c09f2015-03-16 17:57:49 +01004535 written = wio.write(large_data)
4536 self.assertEqual(N, written)
4537
Antoine Pitrou707ce822011-02-25 21:24:11 +00004538 wio.flush()
4539 write_finished = True
4540 t.join()
Victor Stinner6d3c09f2015-03-16 17:57:49 +01004541
Serhiy Storchaka9db55002015-03-28 20:38:37 +02004542 self.assertIsNone(error)
Antoine Pitrou707ce822011-02-25 21:24:11 +00004543 self.assertEqual(N, sum(len(x) for x in read_results))
4544 finally:
Victor Stinner9abee722017-09-19 09:36:54 -07004545 signal.alarm(0)
Antoine Pitrou707ce822011-02-25 21:24:11 +00004546 write_finished = True
4547 os.close(w)
4548 os.close(r)
4549 # This is deliberate. If we didn't close the file descriptor
4550 # before closing wio, wio would try to flush its internal
4551 # buffer, and could block (in case of failure).
4552 try:
4553 wio.close()
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02004554 except OSError as e:
Antoine Pitrou707ce822011-02-25 21:24:11 +00004555 if e.errno != errno.EBADF:
4556 raise
4557
Antoine Pitrou20db5112011-08-19 20:32:34 +02004558 def test_interrupted_write_retry_buffered(self):
Antoine Pitrou707ce822011-02-25 21:24:11 +00004559 self.check_interrupted_write_retry(b"x", mode="wb")
4560
Antoine Pitrou20db5112011-08-19 20:32:34 +02004561 def test_interrupted_write_retry_text(self):
Antoine Pitrou707ce822011-02-25 21:24:11 +00004562 self.check_interrupted_write_retry("x", mode="w", encoding="latin1")
4563
Antoine Pitrouf3b68b32010-12-03 18:41:39 +00004564
Antoine Pitroub46b9d52010-08-21 19:09:32 +00004565class CSignalsTest(SignalsTest):
4566 io = io
4567
4568class PySignalsTest(SignalsTest):
4569 io = pyio
4570
Antoine Pitrouf3b68b32010-12-03 18:41:39 +00004571 # Handling reentrancy issues would slow down _pyio even more, so the
4572 # tests are disabled.
4573 test_reentrant_write_buffered = None
4574 test_reentrant_write_text = None
4575
Antoine Pitroub46b9d52010-08-21 19:09:32 +00004576
Ezio Melottidaa42c72013-03-23 16:30:16 +02004577def load_tests(*args):
Gregory P. Smith1bef9072015-04-14 13:24:34 -07004578 tests = (CIOTest, PyIOTest, APIMismatchTest,
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00004579 CBufferedReaderTest, PyBufferedReaderTest,
4580 CBufferedWriterTest, PyBufferedWriterTest,
4581 CBufferedRWPairTest, PyBufferedRWPairTest,
4582 CBufferedRandomTest, PyBufferedRandomTest,
4583 StatefulIncrementalDecoderTest,
4584 CIncrementalNewlineDecoderTest, PyIncrementalNewlineDecoderTest,
4585 CTextIOWrapperTest, PyTextIOWrapperTest,
Antoine Pitroub46b9d52010-08-21 19:09:32 +00004586 CMiscIOTest, PyMiscIOTest,
4587 CSignalsTest, PySignalsTest,
4588 )
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00004589
4590 # Put the namespaces of the IO module we are testing and some useful mock
4591 # classes in the __dict__ of each test.
4592 mocks = (MockRawIO, MisbehavedRawIO, MockFileIO, CloseFailureIO,
benfogle9703f092017-11-10 16:03:40 -05004593 MockNonBlockWriterIO, MockUnseekableIO, MockRawIOWithoutRead,
4594 SlowFlushRawIO)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00004595 all_members = io.__all__ + ["IncrementalNewlineDecoder"]
4596 c_io_ns = {name : getattr(io, name) for name in all_members}
4597 py_io_ns = {name : getattr(pyio, name) for name in all_members}
4598 globs = globals()
4599 c_io_ns.update((x.__name__, globs["C" + x.__name__]) for x in mocks)
4600 py_io_ns.update((x.__name__, globs["Py" + x.__name__]) for x in mocks)
4601 # Avoid turning open into a bound method.
4602 py_io_ns["open"] = pyio.OpenWrapper
4603 for test in tests:
4604 if test.__name__.startswith("C"):
4605 for name, obj in c_io_ns.items():
4606 setattr(test, name, obj)
4607 elif test.__name__.startswith("Py"):
4608 for name, obj in py_io_ns.items():
4609 setattr(test, name, obj)
4610
Ezio Melottidaa42c72013-03-23 16:30:16 +02004611 suite = unittest.TestSuite([unittest.makeSuite(test) for test in tests])
4612 return suite
Guido van Rossum28524c72007-02-27 05:47:44 +00004613
4614if __name__ == "__main__":
Ezio Melottidaa42c72013-03-23 16:30:16 +02004615 unittest.main()