blob: 3768b625516f41fb13fdfd29f7045fe8979a4fa8 [file] [log] [blame]
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001"""Unit tests for the io module."""
2
# Tests of io are scattered over the test suite:
# * test_bufio - tests file buffering
# * test_memoryio - tests BytesIO and StringIO
# * test_fileio - tests FileIO
# * test_file - tests the file interface
# * test_io - tests everything else in the io module
# * test_univnewlines - tests universal newline support
# * test_largefile - tests operations on a file greater than 2**32 bytes
#     (only enabled with -ulargefile)

################################################################################
# ATTENTION TEST WRITERS!!!
################################################################################
# When writing tests for io, it's important to test both the C and Python
# implementations. This is usually done by writing a base test that refers to
# the type it is testing as an attribute. Then it provides custom subclasses to
# test both implementations. This file has lots of examples.
################################################################################
Guido van Rossum68bbcd22007-02-27 17:19:33 +000021
Victor Stinnerf86a5e82012-06-05 13:43:22 +020022import abc
23import array
24import errno
25import locale
Guido van Rossum8358db22007-08-18 21:39:55 +000026import os
Victor Stinnerf86a5e82012-06-05 13:43:22 +020027import pickle
28import random
29import signal
Guido van Rossum34d69e52007-04-10 20:08:41 +000030import sys
Gregory P. Smithe5796c42018-12-30 20:17:57 -080031import sysconfig
Victor Stinner22eb6892019-06-26 00:51:05 +020032import textwrap
Antoine Pitroua6a4dc82017-09-07 18:56:24 +020033import threading
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +000034import time
Guido van Rossum28524c72007-02-27 05:47:44 +000035import unittest
Antoine Pitroue033e062010-10-29 10:38:18 +000036import warnings
Victor Stinnerf86a5e82012-06-05 13:43:22 +020037import weakref
Antoine Pitrou131a4892012-10-16 22:57:11 +020038from collections import deque, UserList
Victor Stinnerf86a5e82012-06-05 13:43:22 +020039from itertools import cycle, count
Benjamin Petersonee8712c2008-05-20 21:35:26 +000040from test import support
Victor Stinner22eb6892019-06-26 00:51:05 +020041from test.support.script_helper import (
42 assert_python_ok, assert_python_failure, run_python_until_end)
Hai Shi883bc632020-07-06 17:12:49 +080043from test.support import import_helper
44from test.support import os_helper
Hai Shie80697d2020-05-28 06:10:27 +080045from test.support import threading_helper
Hai Shi883bc632020-07-06 17:12:49 +080046from test.support import warnings_helper
47from test.support.os_helper import FakePath
Guido van Rossum76c5d4d2007-04-06 19:10:29 +000048
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +000049import codecs
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000050import io # C implementation of io
51import _pyio as pyio # Python implementation of io
Guido van Rossuma9e20242007-03-08 00:43:48 +000052
try:
    import ctypes
except ImportError:
    # Without ctypes, fall back to an array of signed bytes.
    def byteslike(*pos, **kw):
        """Create a bytes-like object from the same arguments as bytes()."""
        return array.array("b", bytes(*pos, **kw))
else:
    class EmptyStruct(ctypes.Structure):
        """Field-less ctypes structure used purely as a resizable buffer."""

    def byteslike(*pos, **kw):
        """Create a bytes-like object having no string or sequence methods"""
        payload = bytes(*pos, **kw)
        holder = EmptyStruct()
        # Grow the structure's storage so that it can hold the payload.
        ctypes.resize(holder, len(payload))
        raw_view = memoryview(holder).cast("B")
        raw_view[:] = payload
        return holder
68
_cflags = sysconfig.get_config_var('CFLAGS') or ''
_config_args = sysconfig.get_config_var('CONFIG_ARGS') or ''
# True when either the compiler flags or the configure arguments recorded by
# sysconfig mention the memory sanitizer.
MEMORY_SANITIZER = any(
    marker in build_setting
    for marker, build_setting in (
        ('-fsanitize=memory', _cflags),
        ('--with-memory-sanitizer', _config_args),
    )
)
75
# Does io.IOBase finalizer log the exception if the close() method fails?
# The exception is ignored silently by default in release build.
# sys.gettotalrefcount is used here as the marker of a non-release build.
IOBASE_EMITS_UNRAISABLE = (
    sys.flags.dev_mode or hasattr(sys, "gettotalrefcount")
)
Victor Stinner44235042019-04-12 17:06:47 +020079
80
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000081def _default_chunk_size():
82 """Get the default TextIOWrapper chunk size"""
Marc-André Lemburg8f36af72011-02-25 15:42:01 +000083 with open(__file__, "r", encoding="latin-1") as f:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000084 return f._CHUNK_SIZE
85
86
class MockRawIOWithoutRead:
    """A RawIO implementation without read(), so as to exercise the default
    RawIO.read() which calls readinto().

    *read_stack* is an iterable of chunks handed out, in order, by successive
    readinto() calls; a None entry makes one readinto() call return None.
    Everything passed to write() is recorded in ``_write_stack``.
    """

    def __init__(self, read_stack=()):
        self._read_stack = list(read_stack)
        self._write_stack = []
        self._reads = 0              # number of read calls made so far
        self._extraneous_reads = 0   # calls made after the stack ran dry

    def write(self, b):
        """Record a bytes copy of *b* and report it as fully written."""
        self._write_stack.append(bytes(b))
        return len(b)

    def writable(self):
        return True

    def fileno(self):
        return 42

    def readable(self):
        return True

    def seekable(self):
        return True

    def seek(self, pos, whence=0):
        # *whence* defaults to 0 like the real seek() API, so that a bare
        # seek(pos) call works on the mock as well.
        return 0   # wrong but we gotta return something

    def tell(self):
        return 0   # same comment as above

    def readinto(self, buf):
        """Copy the next queued chunk into *buf*.

        Returns the number of bytes stored, None when the queued entry is
        None, and 0 once the queue is empty.  A chunk larger than *buf* is
        split: the remainder stays at the head of the queue.
        """
        self._reads += 1
        capacity = len(buf)
        if not self._read_stack:
            self._extraneous_reads += 1
            return 0
        chunk = self._read_stack[0]
        if chunk is None:
            del self._read_stack[0]
            return None
        size = len(chunk)
        if size <= capacity:
            del self._read_stack[0]
            buf[:size] = chunk
            return size
        # Only part of the chunk fits; keep the tail for the next call.
        buf[:] = chunk[:capacity]
        self._read_stack[0] = chunk[capacity:]
        return capacity

    def truncate(self, pos=None):
        return pos
142
class CMockRawIOWithoutRead(MockRawIOWithoutRead, io.RawIOBase):
    """MockRawIOWithoutRead layered over the C implementation's RawIOBase."""


class PyMockRawIOWithoutRead(MockRawIOWithoutRead, pyio.RawIOBase):
    """MockRawIOWithoutRead layered over the Python implementation's RawIOBase."""
148
149
class MockRawIO(MockRawIOWithoutRead):
    """MockRawIOWithoutRead extended with an explicit read() method."""

    def read(self, n=None):
        """Pop and return the next queued chunk; *n* is ignored.

        Returns b"" (and counts an extraneous read) once the queue is empty.
        """
        self._reads += 1
        try:
            return self._read_stack.pop(0)
        except IndexError:
            # Only an exhausted queue is expected here; anything else
            # (KeyboardInterrupt, a broken mock, ...) must not be swallowed.
            self._extraneous_reads += 1
            return b""
159
class CMockRawIO(MockRawIO, io.RawIOBase):
    """MockRawIO layered over the C implementation's RawIOBase."""


class PyMockRawIO(MockRawIO, pyio.RawIOBase):
    """MockRawIO layered over the Python implementation's RawIOBase."""
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000165
Guido van Rossuma9e20242007-03-08 00:43:48 +0000166
class MisbehavedRawIO(MockRawIO):
    """MockRawIO variant whose methods return deliberately bogus values."""

    def write(self, b):
        # Report twice as many bytes as were handed in.
        reported = super().write(b)
        return reported * 2

    def read(self, n=None):
        # Hand back the chunk doubled.
        chunk = super().read(n)
        return chunk * 2

    def seek(self, pos, whence):
        return -123

    def tell(self):
        return -456

    def readinto(self, buf):
        # Fill the buffer normally, then claim five times its length.
        super().readinto(buf)
        return 5 * len(buf)
183
class CMisbehavedRawIO(MisbehavedRawIO, io.RawIOBase):
    """MisbehavedRawIO layered over the C implementation's RawIOBase."""


class PyMisbehavedRawIO(MisbehavedRawIO, pyio.RawIOBase):
    """MisbehavedRawIO layered over the Python implementation's RawIOBase."""
189
190
class SlowFlushRawIO(MockRawIO):
    """MockRawIO whose flush() signals an event and then sleeps."""

    def __init__(self):
        super().__init__()
        # Set as soon as flush() has been entered.
        self.in_flush = threading.Event()

    def flush(self):
        entered = self.in_flush
        entered.set()
        time.sleep(0.25)
199
class CSlowFlushRawIO(SlowFlushRawIO, io.RawIOBase):
    """SlowFlushRawIO layered over the C implementation's RawIOBase."""


class PySlowFlushRawIO(SlowFlushRawIO, pyio.RawIOBase):
    """SlowFlushRawIO layered over the Python implementation's RawIOBase."""
205
206
class CloseFailureIO(MockRawIO):
    """MockRawIO whose first close() call raises OSError."""

    closed = 0

    def close(self):
        # Later calls are silent no-ops.
        if self.closed:
            return
        self.closed = 1
        raise OSError
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000214
class CCloseFailureIO(CloseFailureIO, io.RawIOBase):
    """CloseFailureIO layered over the C implementation's RawIOBase."""


class PyCloseFailureIO(CloseFailureIO, pyio.RawIOBase):
    """CloseFailureIO layered over the Python implementation's RawIOBase."""
220
221
class MockFileIO:
    """Mixin that records the result size of every read()/readinto() call.

    Meant to be combined with a class that provides the real read() and
    readinto(); sizes are appended to ``read_history``.
    """

    def __init__(self, data):
        self.read_history = []
        super().__init__(data)

    def read(self, n=None):
        chunk = super().read(n)
        if chunk is None:
            self.read_history.append(None)
        else:
            self.read_history.append(len(chunk))
        return chunk

    def readinto(self, b):
        count = super().readinto(b)
        self.read_history.append(count)
        return count
Guido van Rossum78892e42007-04-06 17:31:18 +0000237
class CMockFileIO(MockFileIO, io.BytesIO):
    """MockFileIO layered over the C implementation's BytesIO."""


class PyMockFileIO(MockFileIO, pyio.BytesIO):
    """MockFileIO layered over the Python implementation's BytesIO."""
243
244
class MockUnseekableIO:
    """Mixin that reports itself as unseekable.

    seek(), tell() and truncate() raise ``self.UnsupportedOperation``, which
    the concrete subclass must supply as a class attribute.
    """

    def seekable(self):
        return False

    def seek(self, *args):
        error_type = self.UnsupportedOperation
        raise error_type("not seekable")

    def tell(self, *args):
        error_type = self.UnsupportedOperation
        raise error_type("not seekable")

    def truncate(self, *args):
        error_type = self.UnsupportedOperation
        raise error_type("not seekable")
257
class CMockUnseekableIO(MockUnseekableIO, io.BytesIO):
    # Exception type raised by the mixin's seek()/tell()/truncate().
    UnsupportedOperation = io.UnsupportedOperation


class PyMockUnseekableIO(MockUnseekableIO, pyio.BytesIO):
    # Exception type raised by the mixin's seek()/tell()/truncate().
    UnsupportedOperation = pyio.UnsupportedOperation
263
264
class MockNonBlockWriterIO:
    """Raw writer mock that can pretend a write would block.

    After block_on(char), a write() stops just before the first occurrence
    of *char*; when *char* is the first byte, the write returns None once
    and the blocker is cleared.
    """

    def __init__(self):
        self._write_stack = []
        self._blocker_char = None

    def pop_written(self):
        """Return everything written so far and forget it."""
        written = b"".join(self._write_stack)
        self._write_stack.clear()
        return written

    def block_on(self, char):
        """Block when a given char is encountered."""
        self._blocker_char = char

    def readable(self):
        return True

    def seekable(self):
        return True

    def seek(self, pos, whence=0):
        # naive implementation, enough for tests
        return 0

    def writable(self):
        return True

    def write(self, b):
        data = bytes(b)
        blocker = self._blocker_char
        if blocker:
            try:
                cut = data.index(blocker)
            except ValueError:
                cut = -1    # blocker absent: write everything below
            if cut > 0:
                # write data up to the first blocker
                self._write_stack.append(data[:cut])
                return cut
            if cut == 0:
                # cancel blocker and indicate would block
                self._blocker_char = None
                return None
        self._write_stack.append(data)
        return len(data)
312
class CMockNonBlockWriterIO(MockNonBlockWriterIO, io.RawIOBase):
    # BlockingIOError as exposed by the C implementation of io.
    BlockingIOError = io.BlockingIOError


class PyMockNonBlockWriterIO(MockNonBlockWriterIO, pyio.RawIOBase):
    # BlockingIOError as exposed by the Python implementation of io.
    BlockingIOError = pyio.BlockingIOError
318
Guido van Rossuma9e20242007-03-08 00:43:48 +0000319
Guido van Rossum28524c72007-02-27 05:47:44 +0000320class IOTest(unittest.TestCase):
321
Neal Norwitze7789b12008-03-24 06:18:09 +0000322 def setUp(self):
Hai Shi883bc632020-07-06 17:12:49 +0800323 os_helper.unlink(os_helper.TESTFN)
Neal Norwitze7789b12008-03-24 06:18:09 +0000324
Guido van Rossum4d0f5a42007-03-07 22:59:39 +0000325 def tearDown(self):
Hai Shi883bc632020-07-06 17:12:49 +0800326 os_helper.unlink(os_helper.TESTFN)
Guido van Rossum4d0f5a42007-03-07 22:59:39 +0000327
Guido van Rossum28524c72007-02-27 05:47:44 +0000328 def write_ops(self, f):
Guido van Rossum87429772007-04-10 21:06:59 +0000329 self.assertEqual(f.write(b"blah."), 5)
Antoine Pitrou905a2ff2010-01-31 22:47:27 +0000330 f.truncate(0)
331 self.assertEqual(f.tell(), 5)
332 f.seek(0)
333
334 self.assertEqual(f.write(b"blah."), 5)
Guido van Rossum87429772007-04-10 21:06:59 +0000335 self.assertEqual(f.seek(0), 0)
336 self.assertEqual(f.write(b"Hello."), 6)
Guido van Rossum28524c72007-02-27 05:47:44 +0000337 self.assertEqual(f.tell(), 6)
Guido van Rossum87429772007-04-10 21:06:59 +0000338 self.assertEqual(f.seek(-1, 1), 5)
Guido van Rossum28524c72007-02-27 05:47:44 +0000339 self.assertEqual(f.tell(), 5)
Martin Panter6bb91f32016-05-28 00:41:57 +0000340 buffer = bytearray(b" world\n\n\n")
341 self.assertEqual(f.write(buffer), 9)
342 buffer[:] = b"*" * 9 # Overwrite our copy of the data
Guido van Rossum87429772007-04-10 21:06:59 +0000343 self.assertEqual(f.seek(0), 0)
Guido van Rossum2b08b382007-05-08 20:18:39 +0000344 self.assertEqual(f.write(b"h"), 1)
Guido van Rossum87429772007-04-10 21:06:59 +0000345 self.assertEqual(f.seek(-1, 2), 13)
346 self.assertEqual(f.tell(), 13)
Antoine Pitrou905a2ff2010-01-31 22:47:27 +0000347
Guido van Rossum87429772007-04-10 21:06:59 +0000348 self.assertEqual(f.truncate(12), 12)
Antoine Pitrou905a2ff2010-01-31 22:47:27 +0000349 self.assertEqual(f.tell(), 13)
Christian Heimes8e42a0a2007-11-08 18:04:45 +0000350 self.assertRaises(TypeError, f.seek, 0.0)
Guido van Rossum28524c72007-02-27 05:47:44 +0000351
Guido van Rossum9b76da62007-04-11 01:09:03 +0000352 def read_ops(self, f, buffered=False):
353 data = f.read(5)
354 self.assertEqual(data, b"hello")
Martin Panter6bb91f32016-05-28 00:41:57 +0000355 data = byteslike(data)
Guido van Rossum9b76da62007-04-11 01:09:03 +0000356 self.assertEqual(f.readinto(data), 5)
Martin Panter6bb91f32016-05-28 00:41:57 +0000357 self.assertEqual(bytes(data), b" worl")
358 data = bytearray(5)
Guido van Rossum9b76da62007-04-11 01:09:03 +0000359 self.assertEqual(f.readinto(data), 2)
360 self.assertEqual(len(data), 5)
361 self.assertEqual(data[:2], b"d\n")
362 self.assertEqual(f.seek(0), 0)
363 self.assertEqual(f.read(20), b"hello world\n")
364 self.assertEqual(f.read(1), b"")
Martin Panter6bb91f32016-05-28 00:41:57 +0000365 self.assertEqual(f.readinto(byteslike(b"x")), 0)
Guido van Rossum9b76da62007-04-11 01:09:03 +0000366 self.assertEqual(f.seek(-6, 2), 6)
367 self.assertEqual(f.read(5), b"world")
368 self.assertEqual(f.read(0), b"")
Martin Panter6bb91f32016-05-28 00:41:57 +0000369 self.assertEqual(f.readinto(byteslike()), 0)
Guido van Rossum9b76da62007-04-11 01:09:03 +0000370 self.assertEqual(f.seek(-6, 1), 5)
371 self.assertEqual(f.read(5), b" worl")
372 self.assertEqual(f.tell(), 10)
Christian Heimes8e42a0a2007-11-08 18:04:45 +0000373 self.assertRaises(TypeError, f.seek, 0.0)
Guido van Rossum9b76da62007-04-11 01:09:03 +0000374 if buffered:
375 f.seek(0)
376 self.assertEqual(f.read(), b"hello world\n")
377 f.seek(6)
378 self.assertEqual(f.read(), b"world\n")
379 self.assertEqual(f.read(), b"")
Martin Panter6bb91f32016-05-28 00:41:57 +0000380 f.seek(0)
381 data = byteslike(5)
382 self.assertEqual(f.readinto1(data), 5)
383 self.assertEqual(bytes(data), b"hello")
Guido van Rossum9b76da62007-04-11 01:09:03 +0000384
Guido van Rossum34d69e52007-04-10 20:08:41 +0000385 LARGE = 2**31
386
Guido van Rossum53807da2007-04-10 19:01:47 +0000387 def large_file_ops(self, f):
388 assert f.readable()
389 assert f.writable()
Xavier de Gaye877f0362016-11-17 09:20:28 +0100390 try:
391 self.assertEqual(f.seek(self.LARGE), self.LARGE)
392 except (OverflowError, ValueError):
393 self.skipTest("no largefile support")
Guido van Rossum34d69e52007-04-10 20:08:41 +0000394 self.assertEqual(f.tell(), self.LARGE)
Guido van Rossum53807da2007-04-10 19:01:47 +0000395 self.assertEqual(f.write(b"xxx"), 3)
Guido van Rossum34d69e52007-04-10 20:08:41 +0000396 self.assertEqual(f.tell(), self.LARGE + 3)
397 self.assertEqual(f.seek(-1, 1), self.LARGE + 2)
Guido van Rossum87429772007-04-10 21:06:59 +0000398 self.assertEqual(f.truncate(), self.LARGE + 2)
Guido van Rossum34d69e52007-04-10 20:08:41 +0000399 self.assertEqual(f.tell(), self.LARGE + 2)
400 self.assertEqual(f.seek(0, 2), self.LARGE + 2)
Guido van Rossum87429772007-04-10 21:06:59 +0000401 self.assertEqual(f.truncate(self.LARGE + 1), self.LARGE + 1)
Antoine Pitrou905a2ff2010-01-31 22:47:27 +0000402 self.assertEqual(f.tell(), self.LARGE + 2)
Guido van Rossum34d69e52007-04-10 20:08:41 +0000403 self.assertEqual(f.seek(0, 2), self.LARGE + 1)
404 self.assertEqual(f.seek(-1, 2), self.LARGE)
Guido van Rossum53807da2007-04-10 19:01:47 +0000405 self.assertEqual(f.read(2), b"x")
406
Benjamin Peterson81971ea2009-05-14 22:01:31 +0000407 def test_invalid_operations(self):
408 # Try writing on a file opened in read mode and vice-versa.
Antoine Pitrou0d739d72010-09-05 23:01:12 +0000409 exc = self.UnsupportedOperation
Benjamin Peterson81971ea2009-05-14 22:01:31 +0000410 for mode in ("w", "wb"):
Hai Shi883bc632020-07-06 17:12:49 +0800411 with self.open(os_helper.TESTFN, mode) as fp:
Antoine Pitrou0d739d72010-09-05 23:01:12 +0000412 self.assertRaises(exc, fp.read)
413 self.assertRaises(exc, fp.readline)
Hai Shi883bc632020-07-06 17:12:49 +0800414 with self.open(os_helper.TESTFN, "wb", buffering=0) as fp:
Antoine Pitrou0d739d72010-09-05 23:01:12 +0000415 self.assertRaises(exc, fp.read)
416 self.assertRaises(exc, fp.readline)
Hai Shi883bc632020-07-06 17:12:49 +0800417 with self.open(os_helper.TESTFN, "rb", buffering=0) as fp:
Antoine Pitrou0d739d72010-09-05 23:01:12 +0000418 self.assertRaises(exc, fp.write, b"blah")
419 self.assertRaises(exc, fp.writelines, [b"blah\n"])
Hai Shi883bc632020-07-06 17:12:49 +0800420 with self.open(os_helper.TESTFN, "rb") as fp:
Antoine Pitrou0d739d72010-09-05 23:01:12 +0000421 self.assertRaises(exc, fp.write, b"blah")
422 self.assertRaises(exc, fp.writelines, [b"blah\n"])
Hai Shi883bc632020-07-06 17:12:49 +0800423 with self.open(os_helper.TESTFN, "r") as fp:
Antoine Pitrou0d739d72010-09-05 23:01:12 +0000424 self.assertRaises(exc, fp.write, "blah")
425 self.assertRaises(exc, fp.writelines, ["blah\n"])
426 # Non-zero seeking from current or end pos
427 self.assertRaises(exc, fp.seek, 1, self.SEEK_CUR)
428 self.assertRaises(exc, fp.seek, -1, self.SEEK_END)
Benjamin Peterson81971ea2009-05-14 22:01:31 +0000429
Martin Panter754aab22016-03-31 07:21:56 +0000430 def test_optional_abilities(self):
431 # Test for OSError when optional APIs are not supported
432 # The purpose of this test is to try fileno(), reading, writing and
433 # seeking operations with various objects that indicate they do not
434 # support these operations.
435
436 def pipe_reader():
437 [r, w] = os.pipe()
438 os.close(w) # So that read() is harmless
439 return self.FileIO(r, "r")
440
441 def pipe_writer():
442 [r, w] = os.pipe()
443 self.addCleanup(os.close, r)
444 # Guarantee that we can write into the pipe without blocking
445 thread = threading.Thread(target=os.read, args=(r, 100))
446 thread.start()
447 self.addCleanup(thread.join)
448 return self.FileIO(w, "w")
449
450 def buffered_reader():
451 return self.BufferedReader(self.MockUnseekableIO())
452
453 def buffered_writer():
454 return self.BufferedWriter(self.MockUnseekableIO())
455
456 def buffered_random():
457 return self.BufferedRandom(self.BytesIO())
458
459 def buffered_rw_pair():
460 return self.BufferedRWPair(self.MockUnseekableIO(),
461 self.MockUnseekableIO())
462
463 def text_reader():
464 class UnseekableReader(self.MockUnseekableIO):
465 writable = self.BufferedIOBase.writable
466 write = self.BufferedIOBase.write
467 return self.TextIOWrapper(UnseekableReader(), "ascii")
468
469 def text_writer():
470 class UnseekableWriter(self.MockUnseekableIO):
471 readable = self.BufferedIOBase.readable
472 read = self.BufferedIOBase.read
473 return self.TextIOWrapper(UnseekableWriter(), "ascii")
474
475 tests = (
476 (pipe_reader, "fr"), (pipe_writer, "fw"),
477 (buffered_reader, "r"), (buffered_writer, "w"),
478 (buffered_random, "rws"), (buffered_rw_pair, "rw"),
479 (text_reader, "r"), (text_writer, "w"),
480 (self.BytesIO, "rws"), (self.StringIO, "rws"),
481 )
482 for [test, abilities] in tests:
Martin Panter754aab22016-03-31 07:21:56 +0000483 with self.subTest(test), test() as obj:
484 readable = "r" in abilities
485 self.assertEqual(obj.readable(), readable)
486 writable = "w" in abilities
487 self.assertEqual(obj.writable(), writable)
Martin Panter754aab22016-03-31 07:21:56 +0000488
489 if isinstance(obj, self.TextIOBase):
490 data = "3"
491 elif isinstance(obj, (self.BufferedIOBase, self.RawIOBase)):
492 data = b"3"
493 else:
494 self.fail("Unknown base class")
495
496 if "f" in abilities:
497 obj.fileno()
498 else:
499 self.assertRaises(OSError, obj.fileno)
500
501 if readable:
502 obj.read(1)
503 obj.read()
504 else:
505 self.assertRaises(OSError, obj.read, 1)
506 self.assertRaises(OSError, obj.read)
507
508 if writable:
509 obj.write(data)
510 else:
511 self.assertRaises(OSError, obj.write, data)
512
Martin Panter3ee147f2016-03-31 21:05:31 +0000513 if sys.platform.startswith("win") and test in (
Martin Panter0950e6a2016-03-31 10:31:30 +0000514 pipe_reader, pipe_writer):
515 # Pipes seem to appear as seekable on Windows
516 continue
517 seekable = "s" in abilities
518 self.assertEqual(obj.seekable(), seekable)
519
Martin Panter754aab22016-03-31 07:21:56 +0000520 if seekable:
521 obj.tell()
522 obj.seek(0)
523 else:
524 self.assertRaises(OSError, obj.tell)
525 self.assertRaises(OSError, obj.seek, 0)
526
527 if writable and seekable:
528 obj.truncate()
529 obj.truncate(0)
530 else:
531 self.assertRaises(OSError, obj.truncate)
532 self.assertRaises(OSError, obj.truncate, 0)
533
Antoine Pitrou13348842012-01-29 18:36:34 +0100534 def test_open_handles_NUL_chars(self):
535 fn_with_NUL = 'foo\0bar'
Serhiy Storchakad8a14472014-09-06 20:07:17 +0300536 self.assertRaises(ValueError, self.open, fn_with_NUL, 'w')
Victor Stinner47b45572016-03-25 09:07:07 +0100537
538 bytes_fn = bytes(fn_with_NUL, 'ascii')
539 with warnings.catch_warnings():
540 warnings.simplefilter("ignore", DeprecationWarning)
541 self.assertRaises(ValueError, self.open, bytes_fn, 'w')
Antoine Pitrou13348842012-01-29 18:36:34 +0100542
Guido van Rossum28524c72007-02-27 05:47:44 +0000543 def test_raw_file_io(self):
Hai Shi883bc632020-07-06 17:12:49 +0800544 with self.open(os_helper.TESTFN, "wb", buffering=0) as f:
Benjamin Peterson45cec322009-04-24 23:14:50 +0000545 self.assertEqual(f.readable(), False)
546 self.assertEqual(f.writable(), True)
547 self.assertEqual(f.seekable(), True)
548 self.write_ops(f)
Hai Shi883bc632020-07-06 17:12:49 +0800549 with self.open(os_helper.TESTFN, "rb", buffering=0) as f:
Benjamin Peterson45cec322009-04-24 23:14:50 +0000550 self.assertEqual(f.readable(), True)
551 self.assertEqual(f.writable(), False)
552 self.assertEqual(f.seekable(), True)
553 self.read_ops(f)
Guido van Rossum28524c72007-02-27 05:47:44 +0000554
Guido van Rossum87429772007-04-10 21:06:59 +0000555 def test_buffered_file_io(self):
Hai Shi883bc632020-07-06 17:12:49 +0800556 with self.open(os_helper.TESTFN, "wb") as f:
Benjamin Peterson45cec322009-04-24 23:14:50 +0000557 self.assertEqual(f.readable(), False)
558 self.assertEqual(f.writable(), True)
559 self.assertEqual(f.seekable(), True)
560 self.write_ops(f)
Hai Shi883bc632020-07-06 17:12:49 +0800561 with self.open(os_helper.TESTFN, "rb") as f:
Benjamin Peterson45cec322009-04-24 23:14:50 +0000562 self.assertEqual(f.readable(), True)
563 self.assertEqual(f.writable(), False)
564 self.assertEqual(f.seekable(), True)
565 self.read_ops(f, True)
Guido van Rossum87429772007-04-10 21:06:59 +0000566
Guido van Rossum48fc58a2007-06-07 23:45:37 +0000567 def test_readline(self):
Hai Shi883bc632020-07-06 17:12:49 +0800568 with self.open(os_helper.TESTFN, "wb") as f:
Benjamin Peterson45cec322009-04-24 23:14:50 +0000569 f.write(b"abc\ndef\nxyzzy\nfoo\x00bar\nanother line")
Hai Shi883bc632020-07-06 17:12:49 +0800570 with self.open(os_helper.TESTFN, "rb") as f:
Benjamin Peterson45cec322009-04-24 23:14:50 +0000571 self.assertEqual(f.readline(), b"abc\n")
572 self.assertEqual(f.readline(10), b"def\n")
573 self.assertEqual(f.readline(2), b"xy")
574 self.assertEqual(f.readline(4), b"zzy\n")
575 self.assertEqual(f.readline(), b"foo\x00bar\n")
Benjamin Petersonbf5ff762009-12-13 19:25:34 +0000576 self.assertEqual(f.readline(None), b"another line")
Benjamin Peterson45cec322009-04-24 23:14:50 +0000577 self.assertRaises(TypeError, f.readline, 5.3)
Hai Shi883bc632020-07-06 17:12:49 +0800578 with self.open(os_helper.TESTFN, "r") as f:
Benjamin Peterson45cec322009-04-24 23:14:50 +0000579 self.assertRaises(TypeError, f.readline, 5.3)
Guido van Rossum48fc58a2007-06-07 23:45:37 +0000580
Serhiy Storchakabf623ae2017-04-19 20:03:52 +0300581 def test_readline_nonsizeable(self):
582 # Issue #30061
583 # Crash when readline() returns an object without __len__
584 class R(self.IOBase):
585 def readline(self):
586 return None
587 self.assertRaises((TypeError, StopIteration), next, R())
588
589 def test_next_nonsizeable(self):
590 # Issue #30061
591 # Crash when __next__() returns an object without __len__
592 class R(self.IOBase):
593 def __next__(self):
594 return None
595 self.assertRaises(TypeError, R().readlines, 1)
596
Guido van Rossum28524c72007-02-27 05:47:44 +0000597 def test_raw_bytes_io(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000598 f = self.BytesIO()
Guido van Rossum28524c72007-02-27 05:47:44 +0000599 self.write_ops(f)
600 data = f.getvalue()
601 self.assertEqual(data, b"hello world\n")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000602 f = self.BytesIO(data)
Guido van Rossum9b76da62007-04-11 01:09:03 +0000603 self.read_ops(f, True)
Guido van Rossum28524c72007-02-27 05:47:44 +0000604
Guido van Rossum53807da2007-04-10 19:01:47 +0000605 def test_large_file_ops(self):
Mike53f7a7c2017-12-14 14:04:53 +0300606 # On Windows and Mac OSX this test consumes large resources; It takes
Victor Stinner8c663fd2017-11-08 14:44:44 -0800607 # a long time to build the >2 GiB file and takes >2 GiB of disk space
Guido van Rossum34d69e52007-04-10 20:08:41 +0000608 # therefore the resource must be enabled to run this test.
Victor Stinner937ee9e2018-06-26 02:11:06 +0200609 if sys.platform[:3] == 'win' or sys.platform == 'darwin':
Zachary Ware9fe6d862013-12-08 00:20:35 -0600610 support.requires(
611 'largefile',
612 'test requires %s bytes and a long time to run' % self.LARGE)
Hai Shi883bc632020-07-06 17:12:49 +0800613 with self.open(os_helper.TESTFN, "w+b", 0) as f:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000614 self.large_file_ops(f)
Hai Shi883bc632020-07-06 17:12:49 +0800615 with self.open(os_helper.TESTFN, "w+b") as f:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000616 self.large_file_ops(f)
Guido van Rossum87429772007-04-10 21:06:59 +0000617
618 def test_with_open(self):
Alexey Izbysheva2670562018-10-20 03:22:31 +0300619 for bufsize in (0, 100):
Guido van Rossum87429772007-04-10 21:06:59 +0000620 f = None
Hai Shi883bc632020-07-06 17:12:49 +0800621 with self.open(os_helper.TESTFN, "wb", bufsize) as f:
Guido van Rossum1f2ca562007-08-27 20:44:15 +0000622 f.write(b"xxx")
Guido van Rossum87429772007-04-10 21:06:59 +0000623 self.assertEqual(f.closed, True)
624 f = None
625 try:
Hai Shi883bc632020-07-06 17:12:49 +0800626 with self.open(os_helper.TESTFN, "wb", bufsize) as f:
Guido van Rossum87429772007-04-10 21:06:59 +0000627 1/0
628 except ZeroDivisionError:
629 self.assertEqual(f.closed, True)
630 else:
631 self.fail("1/0 didn't raise an exception")
632
    # issue 5008
634 def test_append_mode_tell(self):
Hai Shi883bc632020-07-06 17:12:49 +0800635 with self.open(os_helper.TESTFN, "wb") as f:
Antoine Pitrou08838b62009-01-21 00:55:13 +0000636 f.write(b"xxx")
Hai Shi883bc632020-07-06 17:12:49 +0800637 with self.open(os_helper.TESTFN, "ab", buffering=0) as f:
Antoine Pitrou08838b62009-01-21 00:55:13 +0000638 self.assertEqual(f.tell(), 3)
Hai Shi883bc632020-07-06 17:12:49 +0800639 with self.open(os_helper.TESTFN, "ab") as f:
Antoine Pitrou08838b62009-01-21 00:55:13 +0000640 self.assertEqual(f.tell(), 3)
Hai Shi883bc632020-07-06 17:12:49 +0800641 with self.open(os_helper.TESTFN, "a") as f:
Serhiy Storchaka0dcd80a2015-08-02 15:17:49 +0300642 self.assertGreater(f.tell(), 0)
Antoine Pitrou08838b62009-01-21 00:55:13 +0000643
Guido van Rossum87429772007-04-10 21:06:59 +0000644 def test_destructor(self):
645 record = []
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000646 class MyFileIO(self.FileIO):
Guido van Rossum87429772007-04-10 21:06:59 +0000647 def __del__(self):
648 record.append(1)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000649 try:
650 f = super().__del__
651 except AttributeError:
652 pass
653 else:
654 f()
Guido van Rossum87429772007-04-10 21:06:59 +0000655 def close(self):
656 record.append(2)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000657 super().close()
Guido van Rossum87429772007-04-10 21:06:59 +0000658 def flush(self):
659 record.append(3)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000660 super().flush()
Hai Shi883bc632020-07-06 17:12:49 +0800661 with warnings_helper.check_warnings(('', ResourceWarning)):
662 f = MyFileIO(os_helper.TESTFN, "wb")
Brett Cannon5a9e91b2010-10-29 23:53:03 +0000663 f.write(b"xxx")
664 del f
665 support.gc_collect()
666 self.assertEqual(record, [1, 2, 3])
Hai Shi883bc632020-07-06 17:12:49 +0800667 with self.open(os_helper.TESTFN, "rb") as f:
Brett Cannon5a9e91b2010-10-29 23:53:03 +0000668 self.assertEqual(f.read(), b"xxx")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000669
670 def _check_base_destructor(self, base):
671 record = []
672 class MyIO(base):
673 def __init__(self):
674 # This exercises the availability of attributes on object
675 # destruction.
676 # (in the C version, close() is called by the tp_dealloc
677 # function, not by __del__)
678 self.on_del = 1
679 self.on_close = 2
680 self.on_flush = 3
681 def __del__(self):
682 record.append(self.on_del)
683 try:
684 f = super().__del__
685 except AttributeError:
686 pass
687 else:
688 f()
689 def close(self):
690 record.append(self.on_close)
691 super().close()
692 def flush(self):
693 record.append(self.on_flush)
694 super().flush()
695 f = MyIO()
Guido van Rossum87429772007-04-10 21:06:59 +0000696 del f
Benjamin Peterson24fb1d02009-04-24 23:26:21 +0000697 support.gc_collect()
Guido van Rossum87429772007-04-10 21:06:59 +0000698 self.assertEqual(record, [1, 2, 3])
699
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000700 def test_IOBase_destructor(self):
701 self._check_base_destructor(self.IOBase)
702
703 def test_RawIOBase_destructor(self):
704 self._check_base_destructor(self.RawIOBase)
705
706 def test_BufferedIOBase_destructor(self):
707 self._check_base_destructor(self.BufferedIOBase)
708
709 def test_TextIOBase_destructor(self):
710 self._check_base_destructor(self.TextIOBase)
711
Guido van Rossum87429772007-04-10 21:06:59 +0000712 def test_close_flushes(self):
Hai Shi883bc632020-07-06 17:12:49 +0800713 with self.open(os_helper.TESTFN, "wb") as f:
Benjamin Peterson45cec322009-04-24 23:14:50 +0000714 f.write(b"xxx")
Hai Shi883bc632020-07-06 17:12:49 +0800715 with self.open(os_helper.TESTFN, "rb") as f:
Benjamin Peterson45cec322009-04-24 23:14:50 +0000716 self.assertEqual(f.read(), b"xxx")
Guido van Rossuma9e20242007-03-08 00:43:48 +0000717
def test_array_writes(self):
    # write() must accept an array.array and report the number of *bytes*
    # written; writelines() must accept a sequence of arrays.
    ints = array.array('i', range(10))
    nbytes = len(ints.tobytes())

    # Each stream is created just before it is checked, in this order.
    factories = (
        lambda: self.BytesIO(),
        lambda: self.FileIO(os_helper.TESTFN, "w"),
        lambda: self.BufferedWriter(self.MockRawIO()),
        lambda: self.BufferedRandom(self.MockRawIO()),
        lambda: self.BufferedRWPair(self.MockRawIO(), self.MockRawIO()),
    )
    for make_stream in factories:
        stream = make_stream()
        with stream:
            self.assertEqual(stream.write(ints), nbytes)
            stream.writelines((ints,))
Guido van Rossumd4103952007-04-12 05:44:49 +0000730
def test_closefd(self):
    # Opening by file name with closefd=False must be rejected.
    with self.assertRaises(ValueError):
        self.open(os_helper.TESTFN, 'w', closefd=False)
Guido van Rossuma9e20242007-03-08 00:43:48 +0000734
def test_read_closed(self):
    # A second file object wrapping the same descriptor (closefd=False)
    # must refuse reads once it has been closed.
    with self.open(os_helper.TESTFN, "w") as out:
        out.write("egg\n")

    # Text layer: read() after close() raises ValueError.
    with self.open(os_helper.TESTFN, "r") as owner:
        borrowed = self.open(owner.fileno(), "r", closefd=False)
        self.assertEqual(borrowed.read(), "egg\n")
        borrowed.seek(0)
        borrowed.close()
        with self.assertRaises(ValueError):
            borrowed.read()

    # Binary layer: readinto() after close() raises ValueError.
    with self.open(os_helper.TESTFN, "rb") as owner:
        borrowed = self.open(owner.fileno(), "rb", closefd=False)
        head = borrowed.read()[:3]
        self.assertEqual(head, b"egg")
        borrowed.close()
        with self.assertRaises(ValueError):
            borrowed.readinto(bytearray(1))
Christian Heimesecc42a22008-11-05 19:30:32 +0000749
def test_no_closefd_with_filename(self):
    # can't use closefd in combination with a file name
    with self.assertRaises(ValueError):
        self.open(os_helper.TESTFN, "r", closefd=False)
Christian Heimesecc42a22008-11-05 19:30:32 +0000753
def test_closefd_attr(self):
    # The raw layer's `closefd` attribute must reflect how the file was
    # opened: True when opened by name, False when wrapping an existing
    # descriptor with closefd=False.
    with self.open(os_helper.TESTFN, "wb") as f:
        f.write(b"egg\n")
    with self.open(os_helper.TESTFN, "r") as f:
        self.assertEqual(f.buffer.raw.closefd, True)
        # Close the borrowing wrapper deterministically (as test_read_closed
        # does) instead of leaving it open until garbage collection.
        with self.open(f.fileno(), "r", closefd=False) as file:
            self.assertEqual(file.buffer.raw.closefd, False)
761
def test_garbage_collection(self):
    # FileIO objects are collected, and collecting them flushes
    # all data to disk.
    with warnings_helper.check_warnings(('', ResourceWarning)):
        raw = self.FileIO(os_helper.TESTFN, "wb")
        raw.write(b"abcxxx")
        # Put the object in a reference cycle with itself.
        raw.f = raw
        ref = weakref.ref(raw)
        del raw
        support.gc_collect()
    # The weak reference must be dead once the collector has run.
    self.assertIsNone(ref(), ref)
    with self.open(os_helper.TESTFN, "rb") as reader:
        self.assertEqual(reader.read(), b"abcxxx")
Christian Heimesecc42a22008-11-05 19:30:32 +0000775
def test_unbounded_file(self):
    # Issue #1174606: reading from an unbounded stream such as /dev/zero.
    zero = "/dev/zero"

    # Preconditions; any unmet one skips the test.
    if not os.path.exists(zero):
        self.skipTest("{0} does not exist".format(zero))
    if sys.maxsize > 0x7FFFFFFF:
        self.skipTest("test can only run in a 32-bit address space")
    if support.real_max_memuse < support._2G:
        self.skipTest("test requires at least 2 GiB of memory")

    # An unbounded read() must raise OverflowError for each way of opening
    # the file: unbuffered binary, buffered binary, and text.
    open_variants = (
        ("rb", {"buffering": 0}),
        ("rb", {}),
        ("r", {}),
    )
    for mode, options in open_variants:
        with self.open(zero, mode, **options) as stream:
            self.assertRaises(OverflowError, stream.read)
791
def check_flush_error_on_close(self, *args, **kwargs):
    # Test that the file is closed despite failed flush
    # and that flush() is called before file closed.
    #
    # *args / **kwargs are forwarded unchanged to self.open().
    stream = self.open(*args, **kwargs)
    closed_at_flush = []

    def failing_flush():
        # Remember whether the stream was already closed when flush() ran.
        closed_at_flush[:] = [stream.closed]
        raise OSError()

    stream.flush = failing_flush
    self.assertRaises(OSError, stream.close)  # exception not swallowed
    self.assertTrue(stream.closed)
    self.assertTrue(closed_at_flush)       # flush() called
    self.assertFalse(closed_at_flush[0])   # flush() called before file closed
    stream.flush = lambda: None  # break reference loop
Serhiy Storchakaa3712a92015-02-21 00:35:09 +0200806
def test_flush_error_on_close(self):
    # Issue #5700: io.FileIO calls flush() after file closed
    #
    # Runs check_flush_error_on_close() for each layer, opened three ways:
    # by name, from a descriptor with the default closefd, and from a
    # descriptor with closefd=False (which this test closes itself).
    fd_flags = os.O_WRONLY | os.O_CREAT
    layers = (
        ('wb', {'buffering': 0}),   # raw file
        ('wb', {}),                 # buffered io
        ('w', {}),                  # text io
    )
    for mode, options in layers:
        self.check_flush_error_on_close(os_helper.TESTFN, mode, **options)

        fd = os.open(os_helper.TESTFN, fd_flags)
        self.check_flush_error_on_close(fd, mode, **options)

        fd = os.open(os_helper.TESTFN, fd_flags)
        self.check_flush_error_on_close(fd, mode, closefd=False, **options)
        os.close(fd)
Antoine Pitrou6be88762010-05-03 16:48:20 +0000830
def test_multi_close(self):
    # close() may be called repeatedly without raising; flush() on the
    # closed file raises ValueError.
    raw = self.open(os_helper.TESTFN, "wb", buffering=0)
    for _ in range(3):
        raw.close()
    with self.assertRaises(ValueError):
        raw.flush()
837
def test_RawIOBase_read(self):
    # Exercise the default limited RawIOBase.read(n) implementation (which
    # calls readinto() internally).
    rawio = self.MockRawIOWithoutRead((b"abc", b"d", None, b"efg", None))
    # Expected result of each successive read(2) call, in order.
    expected_reads = (b"ab", b"c", b"d", None, b"ef", b"g", None, b"")
    for expected in expected_reads:
        self.assertEqual(rawio.read(2), expected)
850
def test_types_have_dict(self):
    # Instances of each of these io types must expose a __dict__.
    factories = (
        self.IOBase,
        self.RawIOBase,
        self.TextIOBase,
        self.StringIO,
        self.BytesIO,
    )
    # Build every instance first, then check them one by one.
    instances = tuple(make() for make in factories)
    for obj in instances:
        self.assertTrue(hasattr(obj, "__dict__"))
861
def test_opener(self):
    # A custom opener supplies the descriptor, so the (bogus) file name
    # passed to open() is never used to locate the file.
    with self.open(os_helper.TESTFN, "w") as out:
        out.write("egg\n")
    fd = os.open(os_helper.TESTFN, os.O_RDONLY)
    with self.open("non-existent", "r",
                   opener=lambda path, flags: fd) as reader:
        self.assertEqual(reader.read(), "egg\n")
870
def test_bad_opener_negative_1(self):
    # Issue #27066.
    # An opener returning -1 must produce a ValueError naming that value.
    # NOTE(review): this calls the builtin open() rather than self.open(),
    # so both implementation variants exercise the same code -- confirm
    # that is intended.
    with self.assertRaises(ValueError) as caught:
        open('non-existent', 'r', opener=lambda fname, flags: -1)
    self.assertEqual(str(caught.exception), 'opener returned -1')
878
def test_bad_opener_other_negative(self):
    # Issue #27066.
    # An opener returning -2 must produce a ValueError naming that value.
    # NOTE(review): this calls the builtin open() rather than self.open(),
    # so both implementation variants exercise the same code -- confirm
    # that is intended.
    with self.assertRaises(ValueError) as caught:
        open('non-existent', 'r', opener=lambda fname, flags: -2)
    self.assertEqual(str(caught.exception), 'opener returned -2')
886
def test_fileio_closefd(self):
    # Issue #4841
    # A FileIO created with closefd=False must never close the descriptor
    # it wraps, neither when re-initialised nor when closed.
    with self.open(__file__, 'rb') as first:
        with self.open(__file__, 'rb') as second:
            borrowed = self.FileIO(first.fileno(), closefd=False)
            # .__init__() must not close f1
            borrowed.__init__(second.fileno(), closefd=False)
            first.readline()
            # .close() must not close f2
            borrowed.close()
            second.readline()
898
def test_nonbuffered_textio(self):
    # Text mode with buffering=0 is rejected with ValueError, and the
    # failed open must not emit a ResourceWarning.
    with warnings_helper.check_no_resource_warning(self), \
            self.assertRaises(ValueError):
        self.open(os_helper.TESTFN, 'w', buffering=0)
Serhiy Storchakaf10063e2014-06-09 13:32:34 +0300903
def test_invalid_newline(self):
    # An unsupported newline argument is rejected with ValueError, and the
    # failed open must not emit a ResourceWarning.
    with warnings_helper.check_no_resource_warning(self), \
            self.assertRaises(ValueError):
        self.open(os_helper.TESTFN, 'w', newline='invalid')
Serhiy Storchakaf10063e2014-06-09 13:32:34 +0300908
def test_buffered_readinto_mixin(self):
    # Test the implementation provided by BufferedIOBase
    class Stream(self.BufferedIOBase):
        def read(self, size):
            # Always returns the same five bytes, whatever size is asked.
            return b"12345"
        read1 = read

    stream = Stream()
    for method_name in ("readinto", "readinto1"):
        with self.subTest(method_name):
            target = byteslike(5)
            fill = getattr(stream, method_name)
            self.assertEqual(fill(target), 5)
            self.assertEqual(bytes(target), b"12345")
921
def test_fspath_support(self):
    # open() must accept path-like objects wrapping str or bytes paths and
    # reject, or propagate errors from, objects wrapping anything else.
    def check_path_succeeds(path):
        # Round-trip a short text through the given path-like object.
        with self.open(path, "w") as out:
            out.write("egg\n")

        with self.open(path, "r") as src:
            self.assertEqual(src.read(), "egg\n")

    for good in (os_helper.TESTFN, os.fsencode(os_helper.TESTFN)):
        check_path_succeeds(FakePath(good))

    # A path-like wrapping a file descriptor is not accepted.
    with self.open(os_helper.TESTFN, "w") as f:
        bad_path = FakePath(f.fileno())
        with self.assertRaises(TypeError):
            self.open(bad_path, 'w')

    # Wrapping None gives TypeError; wrapping FloatingPointError is
    # expected to surface as FloatingPointError.
    for wrapped, expected_exc in ((None, TypeError),
                                  (FloatingPointError, FloatingPointError)):
        bad_path = FakePath(wrapped)
        with self.assertRaises(expected_exc):
            self.open(bad_path, 'w')

    # ensure that refcounting is correct with some error conditions
    with self.assertRaisesRegex(ValueError, 'read/write/append mode'):
        self.open(FakePath(os_helper.TESTFN), 'rwxa')
Ethan Furmand62548a2016-06-04 14:38:43 -0700949
def test_RawIOBase_readall(self):
    # Exercise the default unlimited RawIOBase.read() and readall()
    # implementations.
    chunks = (b"abc", b"d", b"efg")
    for method_name in ("read", "readall"):
        # Fresh stream per method: each call consumes everything.
        rawio = self.MockRawIOWithoutRead(chunks)
        read_everything = getattr(rawio, method_name)
        self.assertEqual(read_everything(), b"abcdefg")
957
def test_BufferedIOBase_readinto(self):
    # Exercise the default BufferedIOBase.readinto() and readinto1()
    # implementations (which call read() or read1() internally).
    class Reader(self.BufferedIOBase):
        def __init__(self, avail):
            self.avail = avail

        def read(self, size):
            # Hand out the next `size` bytes and keep the remainder.
            chunk, self.avail = self.avail[:size], self.avail[size:]
            return chunk

        def read1(self, size):
            """Returns no more than 5 bytes at once"""
            return self.read(min(size, 5))

    cases = (
        # (test method, total data available, read buffer size, expected
        # read size)
        ("readinto", 10, 5, 5),
        ("readinto", 10, 6, 6),  # More than read1() can return
        ("readinto", 5, 6, 5),  # Buffer larger than total available
        ("readinto", 6, 7, 6),
        ("readinto", 10, 0, 0),  # Empty buffer
        ("readinto1", 10, 5, 5),  # Result limited to single read1() call
        ("readinto1", 10, 6, 5),  # Buffer larger than read1() can return
        ("readinto1", 5, 6, 5),  # Buffer larger than total available
        ("readinto1", 6, 7, 5),
        ("readinto1", 10, 0, 0),  # Empty buffer
    )
    # Sentinel used to pre-fill the target so untouched bytes are visible.
    FILLER = 0x81
    for case in cases:
        with self.subTest(case):
            name, total, bufsize, expected = case
            reader = Reader(bytes(range(total)))
            target = bytearray([FILLER] * bufsize)
            count = getattr(reader, name)(target)
            self.assertEqual(count, expected)
            # The target buffer is never resized.
            self.assertEqual(len(target), bufsize)
            # Filled prefix holds the data; the rest keeps the sentinel.
            self.assertSequenceEqual(target[:expected], range(expected))
            untouched = (FILLER,) * (bufsize - expected)
            self.assertSequenceEqual(target[expected:], untouched)
            self.assertEqual(len(reader.avail), total - expected)
998
def test_close_assert(self):
    # close() on an object that ignores attribute assignment and whose
    # flush() fails must raise OSError.
    class R(self.IOBase):
        def __setattr__(self, name, value):
            pass

        def flush(self):
            raise OSError()

    stream = R()
    # This would cause an assertion failure.
    with self.assertRaises(OSError):
        stream.close()

    # Silence destructor error
    R.flush = lambda self: None
1011
Hynek Schlawack2cc71562012-05-25 10:05:53 +02001012
class CIOTest(IOTest):

    def test_IOBase_finalize(self):
        # Issue #12149: segmentation fault on _PyIOBase_finalize when both a
        # class which inherits IOBase and an object of this class are caught
        # in a reference cycle and close() is already in the method cache.
        class MyIO(self.IOBase):
            def close(self):
                pass

        # create an instance to populate the method cache
        MyIO()
        cyclic = MyIO()
        cyclic.obj = cyclic
        ref = weakref.ref(cyclic)
        # Drop the class and the instance, then let the collector run.
        del MyIO, cyclic
        support.gc_collect()
        self.assertIsNone(ref(), ref)
Guido van Rossuma9e20242007-03-08 00:43:48 +00001032
# Inherits every IOTest case unchanged and adds none of its own.
# NOTE(review): presumably bound to the pure-Python implementation by setup
# code elsewhere in this file -- confirm.
class PyIOTest(IOTest):
    pass
Guido van Rossum68bbcd22007-02-27 17:19:33 +00001035
Guido van Rossuma9e20242007-03-08 00:43:48 +00001036
@support.cpython_only
class APIMismatchTest(unittest.TestCase):
    # Cross-checks the RawIOBase API of the `io` and `pyio` modules in both
    # directions using support.detect_api_mismatch().

    def test_RawIOBase_io_in_pyio_match(self):
        """Test that pyio RawIOBase class has all c RawIOBase methods"""
        missing = support.detect_api_mismatch(
            pyio.RawIOBase, io.RawIOBase, ignore=('__weakref__',))
        self.assertEqual(
            missing, set(),
            msg='Python RawIOBase does not have all C RawIOBase methods')

    def test_RawIOBase_pyio_in_io_match(self):
        """Test that c RawIOBase class has all pyio RawIOBase methods"""
        missing = support.detect_api_mismatch(io.RawIOBase, pyio.RawIOBase)
        self.assertEqual(
            missing, set(),
            msg='C RawIOBase does not have all Python RawIOBase methods')
1050
1051
class CommonBufferedTests:
    # Tests common to BufferedReader, BufferedWriter and BufferedRandom
    #
    # Subclasses supply `tp` (the buffered class under test) and the mock
    # raw-stream classes used below.

    def test_detach(self):
        # detach() returns the raw stream once; a second call fails.
        raw_stream = self.MockRawIO()
        buffered = self.tp(raw_stream)
        self.assertIs(buffered.detach(), raw_stream)
        self.assertRaises(ValueError, buffered.detach)

        repr(buffered)  # Should still work

    def test_fileno(self):
        # fileno() is expected to report 42 for the mock raw stream.
        buffered = self.tp(self.MockRawIO())

        self.assertEqual(42, buffered.fileno())

    def test_invalid_args(self):
        buffered = self.tp(self.MockRawIO())
        # Invalid whence
        for bad_whence in (-1, 9):
            self.assertRaises(ValueError, buffered.seek, 0, bad_whence)

    def test_override_destructor(self):
        # A subclass overriding __del__(), close() and flush() must see
        # them called in that order when the object is destroyed.
        base = self.tp
        calls = []

        class MyBufferedIO(base):
            def __del__(self):
                calls.append(1)
                # Chain to the base finalizer only when one exists.
                parent_del = getattr(super(), "__del__", None)
                if parent_del is not None:
                    parent_del()

            def close(self):
                calls.append(2)
                super().close()

            def flush(self):
                calls.append(3)
                super().flush()

        buffered = MyBufferedIO(self.MockRawIO())
        del buffered
        support.gc_collect()
        self.assertEqual(calls, [1, 2, 3])

    def test_context_manager(self):
        # Test usability as a context manager
        buffered = self.tp(self.MockRawIO())

        def enter_and_exit():
            with buffered:
                pass

        enter_and_exit()
        # bufio should now be closed, and using it a second time should raise
        # a ValueError.
        self.assertRaises(ValueError, enter_and_exit)

    def test_error_through_destructor(self):
        # Test that the exception state is not modified by a destructor,
        # even if close() fails.
        failing_raw = self.CloseFailureIO()
        with support.catch_unraisable_exception() as catcher:
            with self.assertRaises(AttributeError):
                self.tp(failing_raw).xyzzy

            # Inspect the captured unraisable exception, if any, while the
            # catcher is still active.
            if not IOBASE_EMITS_UNRAISABLE:
                self.assertIsNone(catcher.unraisable)
            elif catcher.unraisable is not None:
                self.assertEqual(catcher.unraisable.exc_type, OSError)

    def test_repr(self):
        # repr() shows the (optionally module-qualified) class name and,
        # when the raw stream has one, its name.
        raw_stream = self.MockRawIO()
        buffered = self.tp(raw_stream)
        clsname = r"(%s\.)?%s" % (self.tp.__module__, self.tp.__qualname__)
        self.assertRegex(repr(buffered), "<%s>" % clsname)
        raw_stream.name = "dummy"
        self.assertRegex(repr(buffered), "<%s name='dummy'>" % clsname)
        raw_stream.name = b"dummy"
        self.assertRegex(repr(buffered), "<%s name=b'dummy'>" % clsname)

    def test_recursive_repr(self):
        # Issue #25455
        raw_stream = self.MockRawIO()
        buffered = self.tp(raw_stream)
        # Make the raw stream's name refer back to the buffered object.
        with support.swap_attr(raw_stream, 'name', buffered):
            try:
                repr(buffered)  # Should not crash
            except RuntimeError:
                pass

    def test_flush_error_on_close(self):
        # Test that buffered file is closed despite failed flush
        # and that flush() is called before file closed.
        raw_stream = self.MockRawIO()
        closed_at_flush = []

        def failing_flush():
            # Snapshot the closed state of both layers at flush time.
            closed_at_flush[:] = [buffered.closed, raw_stream.closed]
            raise OSError()

        raw_stream.flush = failing_flush
        buffered = self.tp(raw_stream)
        self.assertRaises(OSError, buffered.close)  # exception not swallowed
        self.assertTrue(buffered.closed)
        self.assertTrue(raw_stream.closed)
        self.assertTrue(closed_at_flush)       # flush() called
        self.assertFalse(closed_at_flush[0])   # flush() called before file closed
        self.assertFalse(closed_at_flush[1])
        raw_stream.flush = lambda: None  # break reference loop

    def test_close_error_on_close(self):
        # When both flush() and the raw close() fail, the close error is
        # raised with the flush error attached as its __context__.
        raw_stream = self.MockRawIO()

        def failing_flush():
            raise OSError('flush')

        def failing_close():
            raise OSError('close')

        raw_stream.close = failing_close
        buffered = self.tp(raw_stream)
        buffered.flush = failing_flush
        with self.assertRaises(OSError) as caught:  # exception not swallowed
            buffered.close()
        error = caught.exception
        self.assertEqual(error.args, ('close',))
        self.assertIsInstance(error.__context__, OSError)
        self.assertEqual(error.__context__.args, ('flush',))
        self.assertFalse(buffered.closed)

        # Silence destructor error
        raw_stream.close = lambda: None
        buffered.flush = lambda: None

    def test_nonnormalized_close_error_on_close(self):
        # Issue #21677
        raw_stream = self.MockRawIO()

        # Both helpers raise NameError by referencing an undefined name.
        def failing_flush():
            raise non_existing_flush

        def failing_close():
            raise non_existing_close

        raw_stream.close = failing_close
        buffered = self.tp(raw_stream)
        buffered.flush = failing_flush
        with self.assertRaises(NameError) as caught:  # exception not swallowed
            buffered.close()
        error = caught.exception
        self.assertIn('non_existing_close', str(error))
        self.assertIsInstance(error.__context__, NameError)
        self.assertIn('non_existing_flush', str(error.__context__))
        self.assertFalse(buffered.closed)

        # Silence destructor error
        buffered.flush = lambda: None
        raw_stream.close = lambda: None

    def test_multi_close(self):
        # Repeated close() calls must not raise; flush() afterwards does.
        buffered = self.tp(self.MockRawIO())
        for _ in range(3):
            buffered.close()
        self.assertRaises(ValueError, buffered.flush)

    def test_unseekable(self):
        # tell() and seek() over an unseekable raw stream are unsupported.
        buffered = self.tp(self.MockUnseekableIO(b"A" * 10))
        unsupported = self.UnsupportedOperation
        self.assertRaises(unsupported, buffered.tell)
        self.assertRaises(unsupported, buffered.seek, 0)

    def test_readonly_attributes(self):
        # The `raw` attribute cannot be reassigned.
        buffered = self.tp(self.MockRawIO())
        replacement = self.MockRawIO()
        with self.assertRaises(AttributeError):
            buffered.raw = replacement
1223
Guido van Rossum78892e42007-04-06 17:31:18 +00001224
class SizeofTest:
    # Mixin checking how sys.getsizeof() accounts for the internal buffer
    # of the buffered class supplied as `tp`.

    @support.cpython_only
    def test_sizeof(self):
        # The reported size must grow by exactly the difference in
        # buffer_size between two otherwise identical objects.
        small, large = 4096, 8192
        bufio = self.tp(self.MockRawIO(), buffer_size=small)
        overhead = sys.getsizeof(bufio) - small
        bufio = self.tp(self.MockRawIO(), buffer_size=large)
        self.assertEqual(sys.getsizeof(bufio), overhead + large)

    @support.cpython_only
    def test_buffer_freeing(self):
        # After close(), the buffer no longer counts towards the size.
        bufsize = 4096
        bufio = self.tp(self.MockRawIO(), buffer_size=bufsize)
        overhead = sys.getsizeof(bufio) - bufsize
        bufio.close()
        self.assertEqual(sys.getsizeof(bufio), overhead)
Antoine Pitrou10f0c502012-07-29 19:02:46 +02001246
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001247class BufferedReaderTest(unittest.TestCase, CommonBufferedTests):
1248 read_mode = "rb"
Guido van Rossum78892e42007-04-06 17:31:18 +00001249
def test_constructor(self):
    # __init__() may be called again on an existing object with valid
    # arguments; non-positive buffer sizes are rejected with ValueError.
    rawio = self.MockRawIO([b"abc"])
    bufio = self.tp(rawio)
    for options in ({}, {"buffer_size": 1024}, {"buffer_size": 16}):
        bufio.__init__(rawio, **options)
    self.assertEqual(b"abc", bufio.read())

    for bad_size in (0, -16, -1):
        with self.assertRaises(ValueError):
            bufio.__init__(rawio, buffer_size=bad_size)

    # Re-initialising over a fresh raw stream makes the object usable again.
    rawio = self.MockRawIO([b"abc"])
    bufio.__init__(rawio)
    self.assertEqual(b"abc", bufio.read())
Guido van Rossum78892e42007-04-06 17:31:18 +00001263
def test_uninitialized(self):
    # An object created with __new__() alone can be destroyed safely,
    # fails cleanly on read(), and works once __init__() has been called.
    cls = self.tp
    throwaway = cls.__new__(cls)
    del throwaway

    bufio = cls.__new__(cls)
    with self.assertRaisesRegex((ValueError, AttributeError),
                                'uninitialized|has no attribute'):
        bufio.read(0)

    bufio.__init__(self.MockRawIO())
    self.assertEqual(bufio.read(0), b'')
1273
def test_read(self):
    # read(None) and read(7) must both return all seven bytes, gathered
    # from several raw reads.
    chunks = (b"abc", b"d", b"efg")
    for size in (None, 7):
        bufio = self.tp(self.MockRawIO(chunks))
        self.assertEqual(b"abcdefg", bufio.read(size))
    # Invalid args
    with self.assertRaises(ValueError):
        bufio.read(-2)
Guido van Rossum68bbcd22007-02-27 17:19:33 +00001281
def test_read1(self):
    # Check what successive read1() calls return and how many raw reads
    # have happened after each call (rawio._reads).
    rawio = self.MockRawIO((b"abc", b"d", b"efg"))
    bufio = self.tp(rawio)
    self.assertEqual(b"a", bufio.read(1))

    # (read1 size, expected data, expected rawio._reads or None to skip)
    steps = (
        (1, b"b", 1),
        (0, b"", None),
        (100, b"c", 1),
        (100, b"d", 2),
        (100, b"efg", 3),
        (100, b"", 4),
    )
    for size, expected, raw_reads in steps:
        self.assertEqual(expected, bufio.read1(size))
        if raw_reads is not None:
            self.assertEqual(rawio._reads, raw_reads)
Martin Panterccb2c0e2016-10-20 23:48:14 +00001297
def test_read1_arbitrary(self):
    # read1() called without a size, or with -1, returns whatever one
    # step makes available.
    rawio = self.MockRawIO((b"abc", b"d", b"efg"))
    bufio = self.tp(rawio)
    self.assertEqual(b"a", bufio.read(1))

    # (positional args for read1, expected data, expected raw reads or None)
    steps = (
        ((), b"bc", None),
        ((), b"d", None),
        ((-1,), b"efg", 3),
        ((), b"", 4),
    )
    for args, expected, raw_reads in steps:
        self.assertEqual(expected, bufio.read1(*args))
        if raw_reads is not None:
            self.assertEqual(rawio._reads, raw_reads)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001308
def test_readinto(self):
    # readinto() fills a two-byte buffer repeatedly; a short read leaves
    # the unfilled tail of the buffer unchanged.
    target = bytearray(2)

    def check(bufio, expectations):
        # Each entry is (bytes read, buffer contents afterwards).
        for count, contents in expectations:
            self.assertEqual(bufio.readinto(target), count)
            self.assertEqual(target, contents)

    bufio = self.tp(self.MockRawIO((b"abc", b"d", b"efg")))
    check(bufio, (
        (2, b"ab"),
        (2, b"cd"),
        (2, b"ef"),
        (1, b"gf"),
        (0, b"gf"),
    ))

    # Second stream: the raw source yields None after its first chunk.
    bufio = self.tp(self.MockRawIO((b"abc", None)))
    check(bufio, (
        (2, b"ab"),
        (1, b"cb"),
    ))
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001329
def test_readinto1(self):
    # Track both the bytes delivered by readinto1() and the number of raw
    # reads performed so far (rawio._reads) after every step.
    buffer_size = 10
    rawio = self.MockRawIO((b"abc", b"de", b"fgh", b"jkl"))
    bufio = self.tp(rawio, buffer_size=buffer_size)

    def expect_raw_reads(count):
        self.assertEqual(rawio._reads, count)

    small = bytearray(2)
    self.assertEqual(bufio.peek(3), b'abc')
    expect_raw_reads(1)

    self.assertEqual(bufio.readinto1(small), 2)
    self.assertEqual(small, b"ab")
    expect_raw_reads(1)

    self.assertEqual(bufio.readinto1(small), 1)
    self.assertEqual(small[:1], b"c")
    expect_raw_reads(1)

    self.assertEqual(bufio.readinto1(small), 2)
    self.assertEqual(small, b"de")
    expect_raw_reads(2)

    # Switch to a target twice as large as the internal buffer.
    large = bytearray(2*buffer_size)
    self.assertEqual(bufio.peek(3), b'fgh')
    expect_raw_reads(3)

    self.assertEqual(bufio.readinto1(large), 6)
    self.assertEqual(large[:6], b"fghjkl")
    expect_raw_reads(4)
1352
def test_readinto_array(self):
    # readinto() on an array with multi-byte elements must count and fill
    # by bytes, not by elements.
    buffer_size = 60
    data = b"a" * 26
    rawio = self.MockRawIO((data,))
    bufio = self.tp(rawio, buffer_size=buffer_size)

    # Create an array with element size > 1 byte
    b = array.array('i', b'x' * 32)
    # Use a unittest assertion rather than a bare `assert`, which is
    # stripped when Python runs with -O.
    # NOTE(review): the value 16 is kept from the original check; its
    # rationale is not evident from this code -- confirm.
    self.assertNotEqual(len(b), 16)

    # Read into it. We should get as many *bytes* as we can fit into b
    # (which is more than the number of elements)
    n = bufio.readinto(b)
    self.assertGreater(n, len(b))

    # Check that old contents of b are preserved
    bm = memoryview(b).cast('B')
    self.assertLess(n, len(bm))
    self.assertEqual(bm[:n], data[:n])
    self.assertEqual(bm[n:], b'x' * (len(bm[n:])))
1373
def test_readinto1_array(self):
    # readinto1() on an array with multi-byte elements must count and fill
    # by bytes, not by elements.
    buffer_size = 60
    data = b"a" * 26
    rawio = self.MockRawIO((data,))
    bufio = self.tp(rawio, buffer_size=buffer_size)

    # Create an array with element size > 1 byte
    b = array.array('i', b'x' * 32)
    # Use a unittest assertion rather than a bare `assert`, which is
    # stripped when Python runs with -O.
    # NOTE(review): the value 16 is kept from the original check; its
    # rationale is not evident from this code -- confirm.
    self.assertNotEqual(len(b), 16)

    # Read into it. We should get as many *bytes* as we can fit into b
    # (which is more than the number of elements)
    n = bufio.readinto1(b)
    self.assertGreater(n, len(b))

    # Check that old contents of b are preserved
    bm = memoryview(b).cast('B')
    self.assertLess(n, len(bm))
    self.assertEqual(bm[:n], data[:n])
    self.assertEqual(bm[n:], b'x' * (len(bm[n:])))
1394
def test_readlines(self):
    # Every assertion gets a fresh reader over the same three raw chunks.
    def make_reader():
        return self.tp(self.MockRawIO((b"abc\n", b"d\n", b"ef")))

    every_line = [b"abc\n", b"d\n", b"ef"]
    self.assertEqual(make_reader().readlines(), every_line)
    # With a hint of 5, only the first two lines are expected back.
    self.assertEqual(make_reader().readlines(5), every_line[:2])
    self.assertEqual(make_reader().readlines(None), every_line)
Benjamin Petersonbf5ff762009-12-13 19:25:34 +00001402
def test_buffering(self):
    payload = b"abcdefghi"
    total = len(payload)

    # Each scenario is: (buffer size, sizes passed to read() on the
    # buffered object, read sizes expected in the raw object's history).
    scenarios = (
        (100, [3, 1, 4, 8], [total, 0]),
        (100, [3, 3, 3], [total]),
        (4, [1, 2, 4, 2], [4, 4, 1]),
    )

    for bufsize, requested_sizes, expected_raw_sizes in scenarios:
        raw = self.MockFileIO(payload)
        reader = self.tp(raw, buffer_size=bufsize)
        offset = 0
        for wanted in requested_sizes:
            expected = payload[offset:offset + wanted]
            self.assertEqual(reader.read(wanted), expected)
            offset += wanted
        # this is mildly implementation-dependent
        self.assertEqual(raw.read_history, expected_raw_sizes)
Guido van Rossum78892e42007-04-06 17:31:18 +00001422
def test_read_non_blocking(self):
    # Inject some None's in there to simulate EWOULDBLOCK
    chunks = (b"abc", b"d", None, b"efg", None, None, None)
    reader = self.tp(self.MockRawIO(chunks))
    self.assertEqual(b"abcd", reader.read(6))
    self.assertEqual(b"e", reader.read(1))
    self.assertEqual(b"fg", reader.read())
    self.assertEqual(b"", reader.peek(1))
    # One more read() yields None, and the one after that yields b"".
    self.assertIsNone(reader.read())
    self.assertEqual(b"", reader.read())

    # The raw object's own readall() is checked the same way.
    raw = self.MockRawIO((b"a", None, None))
    self.assertEqual(b"a", raw.readall())
    self.assertIsNone(raw.readall())
1437
def test_read_past_eof(self):
    # Asking for far more than is available returns everything there is.
    reader = self.tp(self.MockRawIO((b"abc", b"d", b"efg")))
    everything = reader.read(9000)
    self.assertEqual(b"abcdefg", everything)
Guido van Rossum68bbcd22007-02-27 17:19:33 +00001443
def test_read_all(self):
    # read() without a size returns the concatenation of all raw chunks.
    reader = self.tp(self.MockRawIO((b"abc", b"d", b"efg")))
    everything = reader.read()
    self.assertEqual(b"abcdefg", everything)
Guido van Rossum68bbcd22007-02-27 17:19:33 +00001449
@support.requires_resource('cpu')
def test_threads(self):
    try:
        # The file holds every byte value 0..255 exactly `copies` times, in
        # shuffled order, so a duplicated or forgotten read shows up as a
        # wrong per-value count at the end.
        copies = 1000
        values = list(range(256)) * copies
        random.shuffle(values)
        payload = bytes(values)
        with self.open(os_helper.TESTFN, "wb") as out:
            out.write(payload)
        with self.open(os_helper.TESTFN, self.read_mode, buffering=0) as raw:
            reader = self.tp(raw, 8)
            failures = []
            pieces = []

            def worker():
                try:
                    # Intra-buffer read then buffer-flushing read
                    for size in cycle([1, 19]):
                        piece = reader.read(size)
                        if not piece:
                            break
                        # list.append() is atomic
                        pieces.append(piece)
                except Exception as exc:
                    failures.append(exc)
                    raise

            workers = [threading.Thread(target=worker) for _ in range(20)]
            with threading_helper.start_threads(workers):
                time.sleep(0.02)  # yield
            self.assertFalse(failures,
                "the following exceptions were caught: %r" % failures)
            combined = b''.join(pieces)
            for value in range(256):
                self.assertEqual(combined.count(bytes([value])), copies)
    finally:
        os_helper.unlink(os_helper.TESTFN)
Antoine Pitrou87695762008-08-14 22:44:29 +00001489
def test_unseekable(self):
    # tell() and seek() must raise UnsupportedOperation on an unseekable
    # raw stream, both before and after some data has been read.
    reader = self.tp(self.MockUnseekableIO(b"A" * 10))
    with self.assertRaises(self.UnsupportedOperation):
        reader.tell()
    with self.assertRaises(self.UnsupportedOperation):
        reader.seek(0)
    reader.read(1)
    with self.assertRaises(self.UnsupportedOperation):
        reader.seek(0)
    with self.assertRaises(self.UnsupportedOperation):
        reader.tell()
1497
def test_misbehaved_io(self):
    # seek() and tell() on top of a misbehaving raw stream must raise
    # OSError.
    raw = self.MisbehavedRawIO((b"abc", b"d", b"efg"))
    reader = self.tp(raw)
    with self.assertRaises(OSError):
        reader.seek(0)
    with self.assertRaises(OSError):
        reader.tell()

    # Silence destructor error
    def quiet_close():
        return None
    reader.close = quiet_close
1506
def test_no_extraneous_read(self):
    # Issue #9550; when the raw IO object has satisfied the read request,
    # we should not issue any additional reads, otherwise it may block
    # (e.g. socket).
    bufsize = 16
    request_sizes = (2, bufsize - 1, bufsize, bufsize + 1, bufsize * 2)
    for n in request_sizes:
        wanted = b"x" * n
        layouts = (
            # Simple case: one raw read is enough to satisfy the request.
            [b"x" * n],
            # A more complex case where two raw reads are needed to
            # satisfy the request.
            [b"x" * (n - 1), b"x"],
        )
        for chunks in layouts:
            raw = self.MockRawIO(chunks)
            reader = self.tp(raw, bufsize)
            self.assertEqual(reader.read(n), wanted)
            self.assertEqual(raw._extraneous_reads, 0,
                "failed for {}: {} != 0".format(n, raw._extraneous_reads))
1526
Berker Peksagd10d6ae2015-05-12 17:01:05 +03001527 def test_read_on_closed(self):
1528 # Issue #23796
1529 b = io.BufferedReader(io.BytesIO(b"12"))
1530 b.read(1)
1531 b.close()
1532 self.assertRaises(ValueError, b.peek)
1533 self.assertRaises(ValueError, b.read1, 1)
1534
def test_truncate_on_read_only(self):
    # A reader that reports itself as not writable must refuse truncate(),
    # with or without an explicit size.
    reader = self.tp(self.MockFileIO(b"abc"))
    self.assertFalse(reader.writable())
    with self.assertRaises(self.UnsupportedOperation):
        reader.truncate()
    with self.assertRaises(self.UnsupportedOperation):
        reader.truncate(0)
1541
Antoine Pitrou32cfede2010-08-11 13:31:33 +00001542
class CBufferedReaderTest(BufferedReaderTest, SizeofTest):
    # Runs the shared BufferedReader tests against io.BufferedReader and
    # adds checks that are only made for that implementation.
    tp = io.BufferedReader

    @unittest.skipIf(MEMORY_SANITIZER, "MSan defaults to crashing "
                     "instead of returning NULL for malloc failure.")
    def test_constructor(self):
        BufferedReaderTest.test_constructor(self)
        # The allocation can succeed on 32-bit builds, e.g. with more
        # than 2 GiB RAM and a 64-bit kernel.
        if sys.maxsize <= 0x7FFFFFFF:
            return
        raw = self.MockRawIO()
        reader = self.tp(raw)
        with self.assertRaises((OverflowError, MemoryError, ValueError)):
            reader.__init__(raw, sys.maxsize)

    def test_initialization(self):
        # Every rejected buffer size is followed by a read() that must
        # fail with ValueError as well.
        raw = self.MockRawIO([b"abc"])
        reader = self.tp(raw)
        for bad_size in (0, -16, -1):
            with self.assertRaises(ValueError):
                reader.__init__(raw, buffer_size=bad_size)
            with self.assertRaises(ValueError):
                reader.read()

    def test_misbehaved_io_read(self):
        raw = self.MisbehavedRawIO((b"abc", b"d", b"efg"))
        reader = self.tp(raw)
        # _pyio.BufferedReader seems to implement reading different, so that
        # checking this is not so easy.
        with self.assertRaises(OSError):
            reader.read(10)

    def test_garbage_collection(self):
        # C BufferedReader objects are collected.
        # The Python version has __del__, so it ends into gc.garbage instead
        self.addCleanup(os_helper.unlink, os_helper.TESTFN)
        with warnings_helper.check_warnings(('', ResourceWarning)):
            raw = self.FileIO(os_helper.TESTFN, "w+b")
            reader = self.tp(raw)
            # Make the object reference itself so that only the cyclic
            # collector can free it.
            reader.f = reader
            ref = weakref.ref(reader)
            del reader
            support.gc_collect()
        self.assertIsNone(ref(), ref)

    def test_args_error(self):
        # Issue #17275
        # Too many positional arguments: the TypeError message must name
        # the class.
        with self.assertRaisesRegex(TypeError, "BufferedReader"):
            self.tp(io.BytesIO(), 1024, 1024, 1024)

    def test_bad_readinto_value(self):
        # The underlying object's readinto() returns -1: readline() must
        # raise an OSError that has no __cause__.
        def negative_readinto(buf):
            return -1

        inner = io.BufferedReader(io.BytesIO(b"12"))
        inner.readinto = negative_readinto
        reader = self.tp(inner)
        with self.assertRaises(OSError) as caught:
            reader.readline()
        self.assertIsNone(caught.exception.__cause__)

    def test_bad_readinto_type(self):
        # The underlying object's readinto() returns a bytes object:
        # readline() must raise an OSError whose __cause__ is a TypeError.
        def bytes_readinto(buf):
            return b''

        inner = io.BufferedReader(io.BytesIO(b"12"))
        inner.readinto = bytes_readinto
        reader = self.tp(inner)
        with self.assertRaises(OSError) as caught:
            reader.readline()
        self.assertIsInstance(caught.exception.__cause__, TypeError)
1608
R David Murray67bfe802013-02-23 21:51:05 -05001609
class PyBufferedReaderTest(BufferedReaderTest):
    # Runs the shared BufferedReader tests against pyio.BufferedReader.
    tp = pyio.BufferedReader
Antoine Pitrou87695762008-08-14 22:44:29 +00001612
Guido van Rossuma9e20242007-03-08 00:43:48 +00001613
class BufferedWriterTest(unittest.TestCase, CommonBufferedTests):
    # Tests shared by every BufferedWriter implementation.  Subclasses set
    # ``tp`` to the concrete class under test.
    write_mode = "wb"

    def test_constructor(self):
        # __init__() may be called again on a live object; invalid buffer
        # sizes are rejected with ValueError.
        rawio = self.MockRawIO()
        bufio = self.tp(rawio)
        bufio.__init__(rawio)
        bufio.__init__(rawio, buffer_size=1024)
        bufio.__init__(rawio, buffer_size=16)
        self.assertEqual(3, bufio.write(b"abc"))
        bufio.flush()
        self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0)
        self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16)
        self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1)
        bufio.__init__(rawio)
        self.assertEqual(3, bufio.write(b"ghi"))
        bufio.flush()
        self.assertEqual(b"".join(rawio._write_stack), b"abcghi")

    def test_uninitialized(self):
        # An object created with __new__() only can be deleted safely, and
        # rejects write() until __init__() has been called.
        bufio = self.tp.__new__(self.tp)
        del bufio
        bufio = self.tp.__new__(self.tp)
        self.assertRaisesRegex((ValueError, AttributeError),
                               'uninitialized|has no attribute',
                               bufio.write, b'')
        bufio.__init__(self.MockRawIO())
        self.assertEqual(bufio.write(b''), 0)

    def test_detach_flush(self):
        # detach() flushes pending data to the raw stream.
        raw = self.MockRawIO()
        buf = self.tp(raw)
        buf.write(b"howdy!")
        self.assertFalse(raw._write_stack)
        buf.detach()
        self.assertEqual(raw._write_stack, [b"howdy!"])

    def test_write(self):
        # Write to the buffered IO but don't overflow the buffer.
        writer = self.MockRawIO()
        bufio = self.tp(writer, 8)
        bufio.write(b"abc")
        self.assertFalse(writer._write_stack)
        buffer = bytearray(b"def")
        bufio.write(buffer)
        buffer[:] = b"***"  # Overwrite our copy of the data
        bufio.flush()
        self.assertEqual(b"".join(writer._write_stack), b"abcdef")

    def test_write_overflow(self):
        writer = self.MockRawIO()
        bufio = self.tp(writer, 8)
        contents = b"abcdefghijklmnop"
        for n in range(0, len(contents), 3):
            bufio.write(contents[n:n+3])
        flushed = b"".join(writer._write_stack)
        # At least (total - 8) bytes were implicitly flushed, perhaps more
        # depending on the implementation.
        self.assertTrue(flushed.startswith(contents[:-8]), flushed)

    def check_writes(self, intermediate_func):
        # Lots of writes, test the flushed output is as expected.
        # ``intermediate_func`` is called with the buffered object after
        # every write.
        contents = bytes(range(256)) * 1000
        n = 0
        writer = self.MockRawIO()
        bufio = self.tp(writer, 13)
        # Generator of write sizes: repeat each N 15 times then proceed to N+1
        def gen_sizes():
            for size in count(1):
                for i in range(15):
                    yield size
        sizes = gen_sizes()
        while n < len(contents):
            size = min(next(sizes), len(contents) - n)
            self.assertEqual(bufio.write(contents[n:n+size]), size)
            intermediate_func(bufio)
            n += size
        bufio.flush()
        self.assertEqual(contents, b"".join(writer._write_stack))

    def test_writes(self):
        self.check_writes(lambda bufio: None)

    def test_writes_and_flushes(self):
        self.check_writes(lambda bufio: bufio.flush())

    def test_writes_and_seeks(self):
        # Seek away and back (absolute, then relative) between writes.
        def _seekabs(bufio):
            pos = bufio.tell()
            bufio.seek(pos + 1, 0)
            bufio.seek(pos - 1, 0)
            bufio.seek(pos, 0)
        self.check_writes(_seekabs)
        def _seekrel(bufio):
            pos = bufio.seek(0, 1)
            bufio.seek(+1, 1)
            bufio.seek(-1, 1)
            bufio.seek(pos, 0)
        self.check_writes(_seekrel)

    def test_writes_and_truncates(self):
        self.check_writes(lambda bufio: bufio.truncate(bufio.tell()))

    def test_write_non_blocking(self):
        raw = self.MockNonBlockWriterIO()
        bufio = self.tp(raw, 8)

        self.assertEqual(bufio.write(b"abcd"), 4)
        self.assertEqual(bufio.write(b"efghi"), 5)
        # 1 byte will be written, the rest will be buffered
        raw.block_on(b"k")
        self.assertEqual(bufio.write(b"jklmn"), 5)

        # 8 bytes will be written, 8 will be buffered and the rest will be lost
        raw.block_on(b"0")
        try:
            bufio.write(b"opqrwxyz0123456789")
        except self.BlockingIOError as e:
            written = e.characters_written
        else:
            self.fail("BlockingIOError should have been raised")
        self.assertEqual(written, 16)
        self.assertEqual(raw.pop_written(),
            b"abcdefghijklmnopqrwxyz")

        self.assertEqual(bufio.write(b"ABCDEFGHI"), 9)
        s = raw.pop_written()
        # Previously buffered bytes were flushed
        self.assertTrue(s.startswith(b"01234567A"), s)

    def test_write_and_rewind(self):
        # Seeking back and overwriting must end up in the raw stream at the
        # right offsets.
        raw = io.BytesIO()
        bufio = self.tp(raw, 4)
        self.assertEqual(bufio.write(b"abcdef"), 6)
        self.assertEqual(bufio.tell(), 6)
        bufio.seek(0, 0)
        self.assertEqual(bufio.write(b"XY"), 2)
        bufio.seek(6, 0)
        self.assertEqual(raw.getvalue(), b"XYcdef")
        self.assertEqual(bufio.write(b"123456"), 6)
        bufio.flush()
        self.assertEqual(raw.getvalue(), b"XYcdef123456")

    def test_flush(self):
        writer = self.MockRawIO()
        bufio = self.tp(writer, 8)
        bufio.write(b"abc")
        bufio.flush()
        self.assertEqual(b"abc", writer._write_stack[0])

    def test_writelines(self):
        l = [b'ab', b'cd', b'ef']
        writer = self.MockRawIO()
        bufio = self.tp(writer, 8)
        bufio.writelines(l)
        bufio.flush()
        self.assertEqual(b''.join(writer._write_stack), b'abcdef')

    def test_writelines_userlist(self):
        # Same as test_writelines, but with a UserList instead of a list.
        l = UserList([b'ab', b'cd', b'ef'])
        writer = self.MockRawIO()
        bufio = self.tp(writer, 8)
        bufio.writelines(l)
        bufio.flush()
        self.assertEqual(b''.join(writer._write_stack), b'abcdef')

    def test_writelines_error(self):
        # Non-bytes items, None and str are all rejected with TypeError.
        writer = self.MockRawIO()
        bufio = self.tp(writer, 8)
        self.assertRaises(TypeError, bufio.writelines, [1, 2, 3])
        self.assertRaises(TypeError, bufio.writelines, None)
        self.assertRaises(TypeError, bufio.writelines, 'abc')

    def test_destructor(self):
        # Dropping the last reference flushes buffered data.
        writer = self.MockRawIO()
        bufio = self.tp(writer, 8)
        bufio.write(b"abc")
        del bufio
        support.gc_collect()
        self.assertEqual(b"abc", writer._write_stack[0])

    def test_truncate(self):
        # Truncate implicitly flushes the buffer.
        self.addCleanup(os_helper.unlink, os_helper.TESTFN)
        with self.open(os_helper.TESTFN, self.write_mode, buffering=0) as raw:
            bufio = self.tp(raw, 8)
            bufio.write(b"abcdef")
            self.assertEqual(bufio.truncate(3), 3)
            self.assertEqual(bufio.tell(), 6)
        with self.open(os_helper.TESTFN, "rb", buffering=0) as f:
            self.assertEqual(f.read(), b"abc")

    def test_truncate_after_write(self):
        # Ensure that truncate preserves the file position after
        # writes longer than the buffer size.
        # Issue: https://bugs.python.org/issue32228
        self.addCleanup(os_helper.unlink, os_helper.TESTFN)
        with self.open(os_helper.TESTFN, "wb") as f:
            # Fill with some buffer
            f.write(b'\x00' * 10000)
        buffer_sizes = [8192, 4096, 200]
        for buffer_size in buffer_sizes:
            with self.open(os_helper.TESTFN, "r+b", buffering=buffer_size) as f:
                f.write(b'\x00' * (buffer_size + 1))
                # After write write_pos and write_end are set to 0
                f.read(1)
                # read operation makes sure that pos != raw_pos
                f.truncate()
                self.assertEqual(f.tell(), buffer_size + 2)

    @support.requires_resource('cpu')
    def test_threads(self):
        try:
            # Write out many bytes from many threads and test they were
            # all flushed.
            N = 1000
            contents = bytes(range(256)) * N
            sizes = cycle([1, 19])
            n = 0
            queue = deque()
            while n < len(contents):
                size = next(sizes)
                queue.append(contents[n:n+size])
                n += size
            del contents
            # We use a real file object because it allows us to
            # exercise situations where the GIL is released before
            # writing the buffer to the raw streams. This is in addition
            # to concurrency issues due to switching threads in the middle
            # of Python code.
            with self.open(os_helper.TESTFN, self.write_mode, buffering=0) as raw:
                bufio = self.tp(raw, 8)
                errors = []
                def f():
                    try:
                        while True:
                            try:
                                s = queue.popleft()
                            except IndexError:
                                return
                            bufio.write(s)
                    except Exception as e:
                        errors.append(e)
                        raise
                threads = [threading.Thread(target=f) for x in range(20)]
                with threading_helper.start_threads(threads):
                    time.sleep(0.02)  # yield
                self.assertFalse(errors,
                    "the following exceptions were caught: %r" % errors)
                bufio.close()
            with self.open(os_helper.TESTFN, "rb") as f:
                s = f.read()
            for i in range(256):
                self.assertEqual(s.count(bytes([i])), N)
        finally:
            os_helper.unlink(os_helper.TESTFN)

    def test_misbehaved_io(self):
        rawio = self.MisbehavedRawIO()
        bufio = self.tp(rawio, 5)
        self.assertRaises(OSError, bufio.seek, 0)
        self.assertRaises(OSError, bufio.tell)
        self.assertRaises(OSError, bufio.write, b"abcdef")

        # Silence destructor error
        bufio.close = lambda: None

    def test_max_buffer_size_removal(self):
        # A third positional argument is rejected with TypeError.
        with self.assertRaises(TypeError):
            self.tp(self.MockRawIO(), 8, 12)

    def test_write_error_on_close(self):
        raw = self.MockRawIO()
        def bad_write(b):
            raise OSError()
        raw.write = bad_write
        b = self.tp(raw)
        b.write(b'spam')
        self.assertRaises(OSError, b.close)  # exception not swallowed
        self.assertTrue(b.closed)

    def test_slow_close_from_thread(self):
        # Issue #31976
        rawio = self.SlowFlushRawIO()
        bufio = self.tp(rawio, 8)
        t = threading.Thread(target=bufio.close)
        t.start()
        try:
            rawio.in_flush.wait()
            self.assertRaises(ValueError, bufio.write, b'spam')
            self.assertTrue(bufio.closed)
        finally:
            # Always wait for the closing thread, even when an assertion
            # above fails, so that no thread outlives the test.
            t.join()
1905
1906
Benjamin Peterson59406a92009-03-26 17:10:29 +00001907
class CBufferedWriterTest(BufferedWriterTest, SizeofTest):
    # Runs the shared BufferedWriter tests against io.BufferedWriter and
    # adds checks that are only made for that implementation.
    tp = io.BufferedWriter

    @unittest.skipIf(MEMORY_SANITIZER, "MSan defaults to crashing "
                     "instead of returning NULL for malloc failure.")
    def test_constructor(self):
        BufferedWriterTest.test_constructor(self)
        # The allocation can succeed on 32-bit builds, e.g. with more
        # than 2 GiB RAM and a 64-bit kernel.
        if sys.maxsize <= 0x7FFFFFFF:
            return
        raw = self.MockRawIO()
        writer = self.tp(raw)
        with self.assertRaises((OverflowError, MemoryError, ValueError)):
            writer.__init__(raw, sys.maxsize)

    def test_initialization(self):
        # Every rejected buffer size is followed by a write() that must
        # fail with ValueError as well.
        raw = self.MockRawIO()
        writer = self.tp(raw)
        for bad_size in (0, -16, -1):
            with self.assertRaises(ValueError):
                writer.__init__(raw, buffer_size=bad_size)
            with self.assertRaises(ValueError):
                writer.write(b"def")

    def test_garbage_collection(self):
        # C BufferedWriter objects are collected, and collecting them flushes
        # all data to disk.
        # The Python version has __del__, so it ends into gc.garbage instead
        self.addCleanup(os_helper.unlink, os_helper.TESTFN)
        with warnings_helper.check_warnings(('', ResourceWarning)):
            raw = self.FileIO(os_helper.TESTFN, "w+b")
            writer = self.tp(raw)
            writer.write(b"123xxx")
            # Make the object reference itself so that only the cyclic
            # collector can free it.
            writer.x = writer
            ref = weakref.ref(writer)
            del writer
            support.gc_collect()
        self.assertIsNone(ref(), ref)
        with self.open(os_helper.TESTFN, "rb") as stored:
            self.assertEqual(stored.read(), b"123xxx")

    def test_args_error(self):
        # Issue #17275
        # Too many positional arguments: the TypeError message must name
        # the class.
        with self.assertRaisesRegex(TypeError, "BufferedWriter"):
            self.tp(io.BytesIO(), 1024, 1024, 1024)
1954
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001955
class PyBufferedWriterTest(BufferedWriterTest):
    # Runs the shared BufferedWriter tests against pyio.BufferedWriter.
    tp = pyio.BufferedWriter
Guido van Rossuma9e20242007-03-08 00:43:48 +00001958
Guido van Rossum01a27522007-03-07 01:00:12 +00001959class BufferedRWPairTest(unittest.TestCase):
Guido van Rossuma9e20242007-03-08 00:43:48 +00001960
Antoine Pitroucf4c7492009-04-19 00:09:36 +00001961 def test_constructor(self):
1962 pair = self.tp(self.MockRawIO(), self.MockRawIO())
Benjamin Peterson92035012008-12-27 16:00:54 +00001963 self.assertFalse(pair.closed)
Guido van Rossum01a27522007-03-07 01:00:12 +00001964
Serhiy Storchaka61e24932014-02-12 10:52:35 +02001965 def test_uninitialized(self):
1966 pair = self.tp.__new__(self.tp)
1967 del pair
1968 pair = self.tp.__new__(self.tp)
1969 self.assertRaisesRegex((ValueError, AttributeError),
1970 'uninitialized|has no attribute',
1971 pair.read, 0)
1972 self.assertRaisesRegex((ValueError, AttributeError),
1973 'uninitialized|has no attribute',
1974 pair.write, b'')
1975 pair.__init__(self.MockRawIO(), self.MockRawIO())
1976 self.assertEqual(pair.read(0), b'')
1977 self.assertEqual(pair.write(b''), 0)
1978
Benjamin Petersond2e0c792009-05-01 20:40:59 +00001979 def test_detach(self):
1980 pair = self.tp(self.MockRawIO(), self.MockRawIO())
1981 self.assertRaises(self.UnsupportedOperation, pair.detach)
1982
Florent Xicluna109d5732012-07-07 17:03:22 +02001983 def test_constructor_max_buffer_size_removal(self):
1984 with self.assertRaises(TypeError):
Benjamin Peterson59406a92009-03-26 17:10:29 +00001985 self.tp(self.MockRawIO(), self.MockRawIO(), 8, 12)
Benjamin Peterson59406a92009-03-26 17:10:29 +00001986
def test_constructor_with_not_readable(self):
    # A reader whose readable() returns False must be rejected with OSError.
    class NotReadable(MockRawIO):
        def readable(self):
            return False

    reader = NotReadable()
    writer = self.MockRawIO()
    with self.assertRaises(OSError):
        self.tp(reader, writer)
Antoine Pitroucf4c7492009-04-19 00:09:36 +00001993
def test_constructor_with_not_writeable(self):
    # A writer whose writable() returns False must be rejected with OSError.
    class NotWriteable(MockRawIO):
        def writable(self):
            return False

    reader = self.MockRawIO()
    writer = NotWriteable()
    with self.assertRaises(OSError):
        self.tp(reader, writer)
Antoine Pitroucf4c7492009-04-19 00:09:36 +00002000
2001 def test_read(self):
2002 pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())
2003
2004 self.assertEqual(pair.read(3), b"abc")
2005 self.assertEqual(pair.read(1), b"d")
2006 self.assertEqual(pair.read(), b"ef")
Benjamin Petersonbf5ff762009-12-13 19:25:34 +00002007 pair = self.tp(self.BytesIO(b"abc"), self.MockRawIO())
2008 self.assertEqual(pair.read(None), b"abc")
2009
2010 def test_readlines(self):
2011 pair = lambda: self.tp(self.BytesIO(b"abc\ndef\nh"), self.MockRawIO())
2012 self.assertEqual(pair().readlines(), [b"abc\n", b"def\n", b"h"])
2013 self.assertEqual(pair().readlines(), [b"abc\n", b"def\n", b"h"])
2014 self.assertEqual(pair().readlines(5), [b"abc\n", b"def\n"])
Antoine Pitroucf4c7492009-04-19 00:09:36 +00002015
2016 def test_read1(self):
2017 # .read1() is delegated to the underlying reader object, so this test
2018 # can be shallow.
2019 pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())
2020
2021 self.assertEqual(pair.read1(3), b"abc")
Martin Panterccb2c0e2016-10-20 23:48:14 +00002022 self.assertEqual(pair.read1(), b"def")
Antoine Pitroucf4c7492009-04-19 00:09:36 +00002023
def test_readinto(self):
    """readinto() and readinto1() both fill a 5-byte buffer from the reader."""
    for name in ("readinto", "readinto1"):
        with self.subTest(name):
            pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())
            fill = getattr(pair, name)

            target = byteslike(b'\0' * 5)
            self.assertEqual(fill(target), 5)
            self.assertEqual(bytes(target), b"abcde")
Antoine Pitroucf4c7492009-04-19 00:09:36 +00002032
def test_write(self):
    """write() + flush() push data to the writer half, copying mutable input."""
    sink = self.MockRawIO()
    pair = self.tp(self.MockRawIO(), sink)

    pair.write(b"abc")
    pair.flush()

    scratch = bytearray(b"def")
    pair.write(scratch)
    # Clobber our own buffer before flushing: the pair must have kept
    # its own copy of the bytes.
    scratch[:] = b"***"
    pair.flush()

    self.assertEqual(sink._write_stack, [b"abc", b"def"])
2044
def test_peek(self):
    """peek() exposes upcoming bytes without consuming them."""
    pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())

    peeked = pair.peek(3)
    self.assertTrue(peeked.startswith(b"abc"))
    # The peeked bytes are still there for a subsequent read().
    self.assertEqual(pair.read(3), b"abc")
2050
def test_readable(self):
    """A pair built from two mock raw streams reports itself readable."""
    reader, writer = self.MockRawIO(), self.MockRawIO()
    self.assertTrue(self.tp(reader, writer).readable())
2054
def test_writeable(self):
    """A pair built from two mock raw streams reports itself writable."""
    reader, writer = self.MockRawIO(), self.MockRawIO()
    self.assertTrue(self.tp(reader, writer).writable())
2058
def test_seekable(self):
    """seekable() is False for the pair."""
    # BufferedRWPairs are never seekable, even when both the reader and
    # the writer they wrap are.
    reader, writer = self.MockRawIO(), self.MockRawIO()
    self.assertFalse(self.tp(reader, writer).seekable())
2064
2065 # .flush() is delegated to the underlying writer object and has been
2066 # tested in the test_write method.
2067
def test_close_and_closed(self):
    """The closed attribute flips from False to True across close()."""
    reader, writer = self.MockRawIO(), self.MockRawIO()
    pair = self.tp(reader, writer)

    self.assertFalse(pair.closed)
    pair.close()
    self.assertTrue(pair.closed)
2073
def test_reader_close_error_on_close(self):
    """An error from reader.close() propagates, but the writer is still closed."""
    def failing_reader_close():
        # Referencing an undefined name raises NameError mentioning it.
        reader_non_existing

    reader = self.MockRawIO()
    writer = self.MockRawIO()
    reader.close = failing_reader_close
    pair = self.tp(reader, writer)

    with self.assertRaises(NameError) as caught:
        pair.close()

    self.assertIn('reader_non_existing', str(caught.exception))
    self.assertTrue(pair.closed)
    self.assertFalse(reader.closed)
    self.assertTrue(writer.closed)

    # Silence destructor error
    reader.close = lambda: None
2090
def test_writer_close_error_on_close(self):
    """An error from writer.close() propagates, but the reader is still closed."""
    def failing_writer_close():
        # Referencing an undefined name raises NameError mentioning it.
        writer_non_existing

    reader = self.MockRawIO()
    writer = self.MockRawIO()
    writer.close = failing_writer_close
    pair = self.tp(reader, writer)

    with self.assertRaises(NameError) as caught:
        pair.close()

    self.assertIn('writer_non_existing', str(caught.exception))
    self.assertFalse(pair.closed)
    self.assertTrue(reader.closed)
    self.assertFalse(writer.closed)

    # Silence destructor error, then drop our reference to the writer.
    writer.close = lambda: None
    writer = None

    # Outer guard: ignore the unraisable exception from the BufferedWriter
    # that belongs to the BufferedRWPair.
    with support.catch_unraisable_exception():
        # Inner guard: ignore the unraisable exception from the
        # BufferedRWPair itself.
        with support.catch_unraisable_exception():
            pair = None
            support.gc_collect()
        support.gc_collect()
Victor Stinner472f7942019-04-12 21:58:24 +02002116
def test_reader_writer_close_error_on_close(self):
    """When both close() calls fail, the reader error is raised with the
    writer error attached as its __context__."""
    def failing_reader_close():
        reader_non_existing

    def failing_writer_close():
        writer_non_existing

    reader = self.MockRawIO()
    writer = self.MockRawIO()
    reader.close = failing_reader_close
    writer.close = failing_writer_close
    pair = self.tp(reader, writer)

    with self.assertRaises(NameError) as caught:
        pair.close()

    self.assertIn('reader_non_existing', str(caught.exception))
    chained = caught.exception.__context__
    self.assertIsInstance(chained, NameError)
    self.assertIn('writer_non_existing', str(chained))
    for stream in (pair, reader, writer):
        self.assertFalse(stream.closed)

    # Silence destructor error
    reader.close = lambda: None
    writer.close = lambda: None
2139
def test_isatty(self):
    """The pair's isatty() is true when either underlying stream is a tty."""
    class SelectableIsAtty(MockRawIO):
        # Mock raw stream whose isatty() answer is chosen at construction.
        def __init__(self, isatty):
            MockRawIO.__init__(self)
            self._isatty = isatty

        def isatty(self):
            return self._isatty

    # (reader is a tty, writer is a tty, expected truthiness of pair.isatty())
    scenarios = (
        (False, False, False),
        (True, False, True),
        (False, True, True),
        (True, True, True),
    )
    for reader_tty, writer_tty, expected in scenarios:
        pair = self.tp(SelectableIsAtty(reader_tty),
                       SelectableIsAtty(writer_tty))
        check = self.assertTrue if expected else self.assertFalse
        check(pair.isatty())
Guido van Rossum01a27522007-03-07 01:00:12 +00002160
def test_weakref_clearing(self):
    """Dropping a pair and then a weakref to it must not crash."""
    pair = self.tp(self.MockRawIO(), self.MockRawIO())
    watcher = weakref.ref(pair)
    # Release the pair first, then the weak reference that pointed at it.
    pair = None
    watcher = None  # Shouldn't segfault.
2166
class CBufferedRWPairTest(BufferedRWPairTest):
    """Run the BufferedRWPair tests against ``io.BufferedRWPair``."""

    tp = io.BufferedRWPair
Guido van Rossuma9e20242007-03-08 00:43:48 +00002169
class PyBufferedRWPairTest(BufferedRWPairTest):
    """Run the BufferedRWPair tests against ``pyio.BufferedRWPair``."""

    tp = pyio.BufferedRWPair
Guido van Rossuma9e20242007-03-08 00:43:48 +00002172
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002173
class BufferedRandomTest(BufferedReaderTest, BufferedWriterTest):
    """Tests for a buffered read/write object (``self.tp``).

    Combines the reader and writer suites and adds checks for reads,
    writes, seeks and flushes interleaved on one buffered object.
    """

    read_mode = "rb+"
    write_mode = "wb+"

    def test_constructor(self):
        # Run both inherited variants explicitly, reader first.
        for base in (BufferedReaderTest, BufferedWriterTest):
            base.test_constructor(self)

    def test_uninitialized(self):
        for base in (BufferedReaderTest, BufferedWriterTest):
            base.test_uninitialized(self)

    def test_read_and_write(self):
        raw = self.MockRawIO((b"asdf", b"ghjk"))
        stream = self.tp(raw, 8)

        self.assertEqual(b"as", stream.read(2))
        for chunk in (b"ddd", b"eee"):
            stream.write(chunk)
        self.assertFalse(raw._write_stack)  # Buffer writes
        self.assertEqual(b"ghjk", stream.read())
        self.assertEqual(b"dddeee", raw._write_stack[0])

    def test_seek_and_tell(self):
        raw = self.BytesIO(b"asdfghjkl")
        stream = self.tp(raw)

        self.assertEqual(b"as", stream.read(2))
        self.assertEqual(2, stream.tell())
        stream.seek(0, 0)
        self.assertEqual(b"asdf", stream.read(4))

        # Overwrite four bytes in the middle, then re-read from the start.
        stream.write(b"123f")
        stream.seek(0, 0)
        self.assertEqual(b"asdf123fl", stream.read())
        self.assertEqual(9, stream.tell())

        # Seek relative to the end, then relative to the current position.
        stream.seek(-4, 2)
        self.assertEqual(5, stream.tell())
        stream.seek(2, 1)
        self.assertEqual(7, stream.tell())
        self.assertEqual(b"fl", stream.read(11))

        stream.flush()
        self.assertEqual(b"asdf123fl", raw.getvalue())

        with self.assertRaises(TypeError):
            stream.seek(0.0)

    def check_flush_and_read(self, read_func):
        """Interleave reads (via *read_func*), writes and flushes.

        *read_func* is called as ``read_func(bufio[, n])`` and must return
        the bytes it consumed.
        """
        raw = self.BytesIO(b"abcdefghi")
        bufio = self.tp(raw)

        self.assertEqual(b"ab", read_func(bufio, 2))
        bufio.write(b"12")
        self.assertEqual(b"ef", read_func(bufio, 2))
        self.assertEqual(6, bufio.tell())
        bufio.flush()
        self.assertEqual(6, bufio.tell())
        self.assertEqual(b"ghi", read_func(bufio))

        # Change the raw stream behind the buffer's back.
        raw.seek(0, 0)
        raw.write(b"XYZ")
        # flush() resets the read buffer
        bufio.flush()
        bufio.seek(0, 0)
        self.assertEqual(b"XYZ", read_func(bufio, 3))

    def test_flush_and_read(self):
        def _read(bufio, *args):
            return bufio.read(*args)
        self.check_flush_and_read(_read)

    def test_flush_and_readinto(self):
        def _readinto(bufio, n=-1):
            # A negative n means "read everything": use a large buffer.
            size = n if n >= 0 else 9999
            target = bytearray(size)
            count = bufio.readinto(target)
            return bytes(target[:count])
        self.check_flush_and_read(_readinto)

    def test_flush_and_peek(self):
        def _peek(bufio, n=-1):
            # This relies on the fact that the buffer can contain the whole
            # raw stream, otherwise peek() can return less.
            data = bufio.peek(n)
            if n != -1:
                data = data[:n]
            # peek() does not advance, so move past what we report.
            bufio.seek(len(data), 1)
            return data
        self.check_flush_and_read(_peek)

    def test_flush_and_write(self):
        raw = self.BytesIO(b"abcdefghi")
        bufio = self.tp(raw)

        for chunk in (b"123", b"45"):
            bufio.write(chunk)
            bufio.flush()
        bufio.seek(0, 0)

        expected = b"12345fghi"
        self.assertEqual(expected, raw.getvalue())
        self.assertEqual(expected, bufio.read())

    def test_threads(self):
        for base in (BufferedReaderTest, BufferedWriterTest):
            base.test_threads(self)

    def test_writes_and_peek(self):
        def _peek_in_place(bufio):
            bufio.peek(1)
        self.check_writes(_peek_in_place)

        def _peek_one_back(bufio):
            # Step back one byte, peek, then restore the position.
            saved = bufio.tell()
            bufio.seek(-1, 1)
            bufio.peek(1)
            bufio.seek(saved, 0)
        self.check_writes(_peek_one_back)

    def test_writes_and_reads(self):
        def _reread_last(bufio):
            bufio.seek(-1, 1)
            bufio.read(1)
        self.check_writes(_reread_last)

    def test_writes_and_read1s(self):
        def _reread_last(bufio):
            bufio.seek(-1, 1)
            bufio.read1(1)
        self.check_writes(_reread_last)

    def test_writes_and_readintos(self):
        def _reread_last(bufio):
            bufio.seek(-1, 1)
            bufio.readinto(bytearray(1))
        self.check_writes(_reread_last)

    def test_write_after_readahead(self):
        # Issue #6629: writing after the buffer was filled by readahead should
        # first rewind the raw stream.
        for overwrite_size in (1, 5):
            raw = self.BytesIO(b"A" * 10)
            bufio = self.tp(raw, 4)
            end_pos = overwrite_size + 1

            # Trigger readahead
            self.assertEqual(bufio.read(1), b"A")
            self.assertEqual(bufio.tell(), 1)

            # Overwriting should rewind the raw stream if it needs so
            bufio.write(b"B" * overwrite_size)
            self.assertEqual(bufio.tell(), end_pos)

            # If the write size was smaller than the buffer size, flush() and
            # check that rewind happens.
            bufio.flush()
            self.assertEqual(bufio.tell(), end_pos)

            expected = (b"A" + b"B" * overwrite_size
                        + b"A" * (9 - overwrite_size))
            self.assertEqual(raw.getvalue(), expected)

    def test_write_rewind_write(self):
        # Various combinations of reading / writing / seeking backwards /
        # writing again
        def mutate(bufio, pos1, pos2):
            assert pos2 >= pos1
            # Fill the buffer
            bufio.seek(pos1)
            bufio.read(pos2 - pos1)
            bufio.write(b'\x02')
            # This writes earlier than the previous write, but still inside
            # the buffer.
            bufio.seek(pos1)
            bufio.write(b'\x01')

        data = b"\x80\x81\x82\x83\x84"
        size = len(data)
        for start in range(size):
            for stop in range(start, size):
                raw = self.BytesIO(data)
                bufio = self.tp(raw, 100)
                mutate(bufio, start, stop)
                bufio.flush()

                # When start == stop the later write (value 1) wins.
                expected = bytearray(data)
                expected[stop] = 2
                expected[start] = 1
                self.assertEqual(raw.getvalue(), expected,
                                 "failed result for i=%d, j=%d" % (start, stop))

    def test_truncate_after_read_or_write(self):
        raw = self.BytesIO(b"A" * 10)
        bufio = self.tp(raw, 100)

        self.assertEqual(bufio.read(2), b"AA")  # the read buffer gets filled
        self.assertEqual(bufio.truncate(), 2)
        self.assertEqual(bufio.write(b"BB"), 2)  # the write buffer increases
        self.assertEqual(bufio.truncate(), 4)

    def test_misbehaved_io(self):
        for base in (BufferedReaderTest, BufferedWriterTest):
            base.test_misbehaved_io(self)

    def test_interleaved_read_write(self):
        # Test for issue #12213
        with self.BytesIO(b'abcdefgh') as raw, self.tp(raw, 100) as f:
            f.write(b"1")
            self.assertEqual(f.read(1), b'b')
            f.write(b'2')
            self.assertEqual(f.read1(1), b'd')
            f.write(b'3')
            one_byte = bytearray(1)
            f.readinto(one_byte)
            self.assertEqual(one_byte, b'f')
            f.write(b'4')
            self.assertEqual(f.peek(1), b'h')
            f.flush()
            self.assertEqual(raw.getvalue(), b'1b2d3f4h')

        with self.BytesIO(b'abc') as raw, self.tp(raw, 100) as f:
            self.assertEqual(f.read(1), b'a')
            f.write(b"2")
            self.assertEqual(f.read(1), b'c')
            f.flush()
            self.assertEqual(raw.getvalue(), b'a2c')

    def test_interleaved_readline_write(self):
        with self.BytesIO(b'ab\ncdef\ng\n') as raw, self.tp(raw) as f:
            f.write(b'1')
            self.assertEqual(f.readline(), b'b\n')
            f.write(b'2')
            self.assertEqual(f.readline(), b'def\n')
            f.write(b'3')
            self.assertEqual(f.readline(), b'\n')
            f.flush()
            self.assertEqual(raw.getvalue(), b'1b\n2def\n3\n')

    # You can't construct a BufferedRandom over a non-seekable stream.
    test_unseekable = None

    # writable() returns True, so there's no point to test it over
    # a writable stream.
    test_truncate_on_read_only = None
2405
R David Murray67bfe802013-02-23 21:51:05 -05002406
class CBufferedRandomTest(BufferedRandomTest, SizeofTest):
    """Run the BufferedRandom tests against ``io.BufferedRandom``."""

    tp = io.BufferedRandom

    @unittest.skipIf(MEMORY_SANITIZER, "MSan defaults to crashing "
                     "instead of returning NULL for malloc failure.")
    def test_constructor(self):
        BufferedRandomTest.test_constructor(self)
        # The allocation can succeed on 32-bit builds, e.g. with more
        # than 2 GiB RAM and a 64-bit kernel.
        if sys.maxsize <= 0x7FFFFFFF:
            return
        rawio = self.MockRawIO()
        bufio = self.tp(rawio)
        # Re-initialising with an absurd buffer size must fail cleanly.
        with self.assertRaises((OverflowError, MemoryError, ValueError)):
            bufio.__init__(rawio, sys.maxsize)

    def test_garbage_collection(self):
        for base in (CBufferedReaderTest, CBufferedWriterTest):
            base.test_garbage_collection(self)

    def test_args_error(self):
        # Issue #17275
        raw = io.BytesIO()
        with self.assertRaisesRegex(TypeError, "BufferedRandom"):
            self.tp(raw, 1024, 1024, 1024)
2430
2431
class PyBufferedRandomTest(BufferedRandomTest):
    """Run the BufferedRandom tests against ``pyio.BufferedRandom``."""

    tp = pyio.BufferedRandom
2434
2435
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002436# To fully exercise seek/tell, the StatefulIncrementalDecoder has these
2437# properties:
2438# - A single output character can correspond to many bytes of input.
2439# - The number of input bytes to complete the character can be
2440# undetermined until the last input byte is received.
2441# - The number of input bytes can vary depending on previous input.
2442# - A single input byte can correspond to many characters of output.
2443# - The number of output characters can be undetermined until the
2444# last input byte is received.
2445# - The number of output characters can vary depending on previous input.
2446
class StatefulIncrementalDecoder(codecs.IncrementalDecoder):
    """
    For testing seek/tell behavior with a stateful, buffering decoder.

    Input is a sequence of words.  Words may be fixed-length (length set
    by input) or variable-length (period-terminated).  In variable-length
    mode, extra periods are ignored.  Possible words are:
      - 'i' followed by a number sets the input length, I (maximum 99).
        When I is set to 0, words are period-terminated.
      - 'o' followed by a number sets the output length, O (maximum 99).
      - Any other word is converted into a word followed by a period on
        the output.  The output word consists of the input word truncated
        or padded out with hyphens to make its length equal to O.  If O
        is 0, the word is output verbatim without truncating or padding.
    I and O are initially set to 1.  The input buffer is emptied after every
    completed word, so a new I applies to the bytes that follow it.  EOF
    (``final=True``) also terminates the last word.
    """

    def __init__(self, errors='strict'):
        codecs.IncrementalDecoder.__init__(self, errors)
        self.reset()

    def __repr__(self):
        return '<SID %x>' % id(self)

    def reset(self):
        """Return to the initial state: I=1, O=1, empty input buffer."""
        self.i = 1
        self.o = 1
        self.buffer = bytearray()

    def getstate(self):
        """Return ``(buffered_bytes, flags)`` with flags == 0 right after reset().

        I and O are each XOR-ed with 1 so their initial value maps to 0,
        then packed as ``i*100 + o``.
        """
        i, o = self.i ^ 1, self.o ^ 1 # so that flags = 0 after reset()
        return bytes(self.buffer), i*100 + o

    def setstate(self, state):
        """Restore a state previously produced by getstate()."""
        # 'flags' rather than 'io' so the io module is not shadowed here.
        buffer, flags = state
        self.buffer = bytearray(buffer)
        i, o = divmod(flags, 100)
        self.i, self.o = i ^ 1, o ^ 1

    def decode(self, input, final=False):
        """Feed bytes *input* and return the text of every completed word.

        With *final* true, bytes still buffered at the end are emitted as a
        last word.
        """
        # Collect pieces and join once instead of repeated str concatenation.
        pieces = []
        for b in input:
            if self.i == 0: # variable-length, terminated with period
                if b == ord('.'):
                    # A period with nothing buffered is simply ignored.
                    if self.buffer:
                        pieces.append(self.process_word())
                else:
                    self.buffer.append(b)
            else: # fixed-length, terminate after self.i bytes
                self.buffer.append(b)
                if len(self.buffer) == self.i:
                    pieces.append(self.process_word())
        if final and self.buffer: # EOF terminates the last word
            pieces.append(self.process_word())
        return ''.join(pieces)

    def process_word(self):
        """Consume the buffered word and return the text it produces.

        'i...' and 'o...' words update I / O and produce no output; any other
        word is ASCII-decoded, padded/truncated to O (when O is non-zero) and
        followed by a period.  The buffer is emptied in every case.
        """
        output = ''
        if self.buffer[0] == ord('i'):
            self.i = min(99, int(self.buffer[1:] or 0)) # set input length
        elif self.buffer[0] == ord('o'):
            self.o = min(99, int(self.buffer[1:] or 0)) # set output length
        else:
            output = self.buffer.decode('ascii')
            if self.o:
                # Pad short words with hyphens and cut long ones to exactly O.
                output = output.ljust(self.o, '-')[:self.o]
            output += '.'
        self.buffer = bytearray()
        return output

    # Read by the codec lookup function; off unless a test switches it on.
    codecEnabled = False
2521
Hai Shi14cdc212020-10-26 02:38:33 +08002522
# bpo-41919: This function is kept separate from StatefulIncrementalDecoder
# to avoid a resource leak when registering codecs and cleanup functions.
def lookupTestDecoder(name):
    """Codec search function for the 'test_decoder' codec.

    Returns a CodecInfo only while StatefulIncrementalDecoder.codecEnabled
    is true and *name* is 'test_decoder'; otherwise returns None.
    """
    if not (StatefulIncrementalDecoder.codecEnabled and name == 'test_decoder'):
        return None
    # Encoding is borrowed from latin-1; only incremental decoding is custom.
    latin1 = codecs.lookup('latin-1')
    return codecs.CodecInfo(
        name='test_decoder',
        encode=latin1.encode,
        decode=None,
        incrementalencoder=None,
        incrementaldecoder=StatefulIncrementalDecoder,
        streamreader=None,
        streamwriter=None,
    )
Benjamin Petersonad9d48d2008-04-02 21:49:44 +00002533
Benjamin Petersonad9d48d2008-04-02 21:49:44 +00002534
class StatefulIncrementalDecoderTest(unittest.TestCase):
    """
    Make sure the StatefulIncrementalDecoder actually works.
    """

    # Each entry is (input bytes, final flag, expected decoded text).
    test_cases = [
        # I=1, O=1 (fixed-length input == fixed-length output)
        (b'abcd', False, 'a.b.c.d.'),
        # I=0, O=0 (variable-length input, variable-length output)
        (b'oiabcd', True, 'abcd.'),
        # I=0, O=0 (should ignore extra periods)
        (b'oi...abcd...', True, 'abcd.'),
        # I=0, O=6 (variable-length input, fixed-length output)
        (b'i.o6.x.xyz.toolongtofit.', False, 'x-----.xyz---.toolon.'),
        # I=2, O=6 (fixed-length input < fixed-length output)
        (b'i.i2.o6xyz', True, 'xy----.z-----.'),
        # I=6, O=3 (fixed-length input > fixed-length output)
        (b'i.o3.i6.abcdefghijklmnop', True, 'abc.ghi.mno.'),
        # I=0, then 3; O=29, then 15 (with longer output)
        (b'i.o29.a.b.cde.o15.abcdefghijabcdefghij.i3.a.b.c.d.ei00k.l.m', True,
         'a----------------------------.' +
         'b----------------------------.' +
         'cde--------------------------.' +
         'abcdefghijabcde.' +
         'a.b------------.' +
         '.c.------------.' +
         'd.e------------.' +
         'k--------------.' +
         'l--------------.' +
         'm--------------.')
    ]

    def test_decoder(self):
        # Try a few one-shot test cases, each on a fresh decoder.
        for data, eof, expected in self.test_cases:
            decoder = StatefulIncrementalDecoder()
            self.assertEqual(decoder.decode(data, eof), expected)

        # Also test an unfinished decode, followed by forcing EOF.
        decoder = StatefulIncrementalDecoder()
        self.assertEqual(decoder.decode(b'oiabcd'), '')
        self.assertEqual(decoder.decode(b'', 1), 'abcd.')
Guido van Rossum78892e42007-04-06 17:31:18 +00002577
2578class TextIOWrapperTest(unittest.TestCase):
Guido van Rossum9b76da62007-04-11 01:09:03 +00002579
def setUp(self):
    """Prepare newline fixtures, clear the scratch file, register the test codec."""
    # Same five lines with mixed line endings, and with "\n" only.
    self.testdata = b"AAA\r\nBBB\rCCC\r\nDDD\nEEE\r\n"
    normalized_bytes = b"AAA\nBBB\nCCC\nDDD\nEEE\n"
    self.normalized = normalized_bytes.decode("ascii")

    os_helper.unlink(os_helper.TESTFN)

    # Make the 'test_decoder' lookup available for this test only.
    codecs.register(lookupTestDecoder)
    self.addCleanup(codecs.unregister, lookupTestDecoder)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002586
def tearDown(self):
    """Remove the scratch file a test may have left behind."""
    scratch_path = os_helper.TESTFN
    os_helper.unlink(scratch_path)
Guido van Rossum9b76da62007-04-11 01:09:03 +00002589
def test_constructor(self):
    """Re-running __init__ reconfigures the wrapper; bad newline values fail."""
    raw = self.BytesIO(b"\xc3\xa9\n\n")
    buffered = self.BufferedReader(raw, 1000)
    text = self.TextIOWrapper(buffered)

    text.__init__(buffered, encoding="latin-1", newline="\r\n")
    self.assertEqual(text.encoding, "latin-1")
    self.assertEqual(text.line_buffering, False)

    text.__init__(buffered, encoding="utf-8", line_buffering=True)
    self.assertEqual(text.encoding, "utf-8")
    self.assertEqual(text.line_buffering, True)
    self.assertEqual("\xe9\n", text.readline())

    # newline must be a str (TypeError) holding a legal value (ValueError).
    with self.assertRaises(TypeError):
        text.__init__(buffered, newline=42)
    with self.assertRaises(ValueError):
        text.__init__(buffered, newline='xyzzy')
2603
def test_uninitialized(self):
    """An object made with __new__ only fails cleanly until __init__ runs."""
    wrapper_type = self.TextIOWrapper

    # Creating and immediately discarding an uninitialized object is fine.
    text = wrapper_type.__new__(wrapper_type)
    del text

    text = wrapper_type.__new__(wrapper_type)
    with self.assertRaises(Exception):
        repr(text)
    with self.assertRaisesRegex((ValueError, AttributeError),
                                'uninitialized|has no attribute'):
        text.read(0)

    # After a late __init__ the object is usable.
    text.__init__(self.MockRawIO())
    self.assertEqual(text.read(0), '')
2614
def test_non_text_encoding_codecs_are_rejected(self):
    """The constructor refuses a codec that is not marked as a text encoding."""
    # http://bugs.python.org/issue20404
    buffered = self.BufferedWriter(self.BytesIO())
    self.assertRaisesRegex(LookupError, "is not a text encoding",
                           self.TextIOWrapper, buffered, encoding="hex")
2623
def test_detach(self):
    """detach() returns the buffer, flushes pending text, and works only once."""
    raw = self.BytesIO()
    buffered = self.BufferedWriter(raw)

    text = self.TextIOWrapper(buffered)
    self.assertIs(text.detach(), buffered)

    text = self.TextIOWrapper(buffered, encoding="ascii")
    text.write("howdy")
    # Nothing has reached the raw stream yet...
    self.assertFalse(raw.getvalue())
    text.detach()
    # ...but detaching pushes the pending data through.
    self.assertEqual(raw.getvalue(), b"howdy")
    with self.assertRaises(ValueError):
        text.detach()

    # Operations independent of the detached stream should still work
    repr(text)
    self.assertEqual(text.encoding, "ascii")
    self.assertEqual(text.errors, "strict")
    for flag in (text.line_buffering, text.write_through):
        self.assertFalse(flag)
Benjamin Peterson10e76b62014-12-21 20:51:50 -06002643
def test_repr(self):
    """repr() shows name and mode once they are set, followed by the encoding."""
    raw = self.BytesIO("hello".encode("utf-8"))
    buffered = self.BufferedReader(raw)
    text = self.TextIOWrapper(buffered, encoding="utf-8")
    modname = self.TextIOWrapper.__module__

    def expect(template):
        # The module prefix is optional in the repr, hence the "(...)?" group.
        self.assertRegex(repr(text), template % modname)

    expect(r"<(%s\.)?TextIOWrapper encoding='utf-8'>")
    raw.name = "dummy"
    expect(r"<(%s\.)?TextIOWrapper name='dummy' encoding='utf-8'>")
    text.mode = "r"
    expect(r"<(%s\.)?TextIOWrapper name='dummy' mode='r' encoding='utf-8'>")
    raw.name = b"dummy"
    expect(r"<(%s\.)?TextIOWrapper name=b'dummy' mode='r' encoding='utf-8'>")

    text.buffer.detach()
    repr(text)  # Should not raise an exception
2663
def test_recursive_repr(self):
    """repr() of a wrapper whose stream's name is the wrapper itself must not crash."""
    # Issue #25455
    raw = self.BytesIO()
    text = self.TextIOWrapper(raw)
    # Temporarily make raw.name point back at the wrapper.
    with support.swap_attr(raw, 'name', text):
        try:
            repr(text)  # Should not crash
        except RuntimeError:
            # A RuntimeError is tolerated; only a hard crash is a failure.
            pass
2673
def test_line_buffering(self):
    """With line_buffering=True a write containing a newline or CR flushes."""
    raw = self.BytesIO()
    buffered = self.BufferedWriter(raw, 1000)
    text = self.TextIOWrapper(buffered, newline="\n", line_buffering=True)

    # (text written, raw contents expected right after the write)
    steps = (
        ("X", b""),                   # No flush happened
        ("Y\nZ", b"XY\nZ"),           # All got flushed
        ("A\rB", b"XY\nZA\rB"),
    )
    for chunk, expected in steps:
        text.write(chunk)
        self.assertEqual(raw.getvalue(), expected)
Guido van Rossumf64db9f2007-12-06 01:04:26 +00002684
def test_reconfigure_line_buffering(self):
    """reconfigure(line_buffering=...) flushes and switches buffering mode."""
    raw = self.BytesIO()
    buffered = self.BufferedWriter(raw, 1000)
    text = self.TextIOWrapper(buffered, newline="\n", line_buffering=False)

    def written():
        return raw.getvalue()

    text.write("AB\nC")
    self.assertEqual(written(), b"")

    text.reconfigure(line_buffering=True)   # implicit flush
    self.assertEqual(written(), b"AB\nC")
    text.write("DEF\nG")
    self.assertEqual(written(), b"AB\nCDEF\nG")
    text.write("H")
    self.assertEqual(written(), b"AB\nCDEF\nG")

    text.reconfigure(line_buffering=False)  # implicit flush
    self.assertEqual(written(), b"AB\nCDEF\nGH")
    text.write("IJ")
    self.assertEqual(written(), b"AB\nCDEF\nGH")

    # Keeping default value: neither a bare reconfigure() nor an explicit
    # None changes the current setting.
    text.reconfigure()
    text.reconfigure(line_buffering=None)
    self.assertEqual(text.line_buffering, False)
    text.reconfigure(line_buffering=True)
    text.reconfigure()
    text.reconfigure(line_buffering=None)
    self.assertEqual(text.line_buffering, True)
2711
@unittest.skipIf(sys.flags.utf8_mode, "utf-8 mode is enabled")
def test_default_encoding(self):
    """Without an explicit encoding the wrapper uses the current locale encoding."""
    saved_environ = dict(os.environ)
    try:
        # try to get a user preferred encoding different than the current
        # locale encoding to check that TextIOWrapper() uses the current
        # locale encoding and not the user preferred encoding
        for variable in ('LC_ALL', 'LANG', 'LC_CTYPE'):
            os.environ.pop(variable, None)

        expected = locale.getpreferredencoding(False)
        text = self.TextIOWrapper(self.BytesIO())
        self.assertEqual(text.encoding, expected)
    finally:
        # Put the environment back exactly as it was.
        os.environ.clear()
        os.environ.update(saved_environ)
2730
@support.cpython_only
@unittest.skipIf(sys.flags.utf8_mode, "utf-8 mode is enabled")
def test_device_encoding(self):
    """A fileno() just above INT_MAX or UINT_MAX makes the constructor raise OverflowError."""
    # Issue 15989
    import _testcapi
    raw = self.BytesIO()
    for limit in (_testcapi.INT_MAX, _testcapi.UINT_MAX):
        # Bind the limit as a default so each lambda keeps its own value.
        raw.fileno = lambda limit=limit: limit + 1
        with self.assertRaises(OverflowError):
            self.TextIOWrapper(raw)
2741
def test_encoding(self):
    """The encoding attribute is always set and names a codec that exists."""
    raw = self.BytesIO()

    explicit = self.TextIOWrapper(raw, encoding="utf-8")
    self.assertEqual(explicit.encoding, "utf-8")

    implicit = self.TextIOWrapper(raw)
    self.assertIsNotNone(implicit.encoding)
    # lookup() raises if the reported name is not a known codec.
    codecs.lookup(implicit.encoding)
2750
2751 def test_encoding_errors_reading(self):
Guido van Rossume7fc50f2007-12-03 22:54:21 +00002752 # (1) default
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002753 b = self.BytesIO(b"abc\n\xff\n")
2754 t = self.TextIOWrapper(b, encoding="ascii")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00002755 self.assertRaises(UnicodeError, t.read)
2756 # (2) explicit strict
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002757 b = self.BytesIO(b"abc\n\xff\n")
2758 t = self.TextIOWrapper(b, encoding="ascii", errors="strict")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00002759 self.assertRaises(UnicodeError, t.read)
2760 # (3) ignore
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002761 b = self.BytesIO(b"abc\n\xff\n")
2762 t = self.TextIOWrapper(b, encoding="ascii", errors="ignore")
Ezio Melottib3aedd42010-11-20 19:04:17 +00002763 self.assertEqual(t.read(), "abc\n\n")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00002764 # (4) replace
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002765 b = self.BytesIO(b"abc\n\xff\n")
2766 t = self.TextIOWrapper(b, encoding="ascii", errors="replace")
Ezio Melottib3aedd42010-11-20 19:04:17 +00002767 self.assertEqual(t.read(), "abc\n\ufffd\n")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00002768
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002769 def test_encoding_errors_writing(self):
Guido van Rossume7fc50f2007-12-03 22:54:21 +00002770 # (1) default
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002771 b = self.BytesIO()
2772 t = self.TextIOWrapper(b, encoding="ascii")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00002773 self.assertRaises(UnicodeError, t.write, "\xff")
2774 # (2) explicit strict
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002775 b = self.BytesIO()
2776 t = self.TextIOWrapper(b, encoding="ascii", errors="strict")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00002777 self.assertRaises(UnicodeError, t.write, "\xff")
2778 # (3) ignore
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002779 b = self.BytesIO()
2780 t = self.TextIOWrapper(b, encoding="ascii", errors="ignore",
Guido van Rossumf64db9f2007-12-06 01:04:26 +00002781 newline="\n")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00002782 t.write("abc\xffdef\n")
2783 t.flush()
Ezio Melottib3aedd42010-11-20 19:04:17 +00002784 self.assertEqual(b.getvalue(), b"abcdef\n")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00002785 # (4) replace
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002786 b = self.BytesIO()
2787 t = self.TextIOWrapper(b, encoding="ascii", errors="replace",
Guido van Rossumf64db9f2007-12-06 01:04:26 +00002788 newline="\n")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00002789 t.write("abc\xffdef\n")
2790 t.flush()
Ezio Melottib3aedd42010-11-20 19:04:17 +00002791 self.assertEqual(b.getvalue(), b"abc?def\n")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00002792
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002793 def test_newlines(self):
Guido van Rossum78892e42007-04-06 17:31:18 +00002794 input_lines = [ "unix\n", "windows\r\n", "os9\r", "last\n", "nonl" ]
2795
2796 tests = [
2797 [ None, [ 'unix\n', 'windows\n', 'os9\n', 'last\n', 'nonl' ] ],
Guido van Rossum8358db22007-08-18 21:39:55 +00002798 [ '', input_lines ],
2799 [ '\n', [ "unix\n", "windows\r\n", "os9\rlast\n", "nonl" ] ],
2800 [ '\r\n', [ "unix\nwindows\r\n", "os9\rlast\nnonl" ] ],
2801 [ '\r', [ "unix\nwindows\r", "\nos9\r", "last\nnonl" ] ],
Guido van Rossum78892e42007-04-06 17:31:18 +00002802 ]
Antoine Pitrou180a3362008-12-14 16:36:46 +00002803 encodings = (
2804 'utf-8', 'latin-1',
2805 'utf-16', 'utf-16-le', 'utf-16-be',
2806 'utf-32', 'utf-32-le', 'utf-32-be',
2807 )
Guido van Rossum78892e42007-04-06 17:31:18 +00002808
Guido van Rossum8358db22007-08-18 21:39:55 +00002809 # Try a range of buffer sizes to test the case where \r is the last
Guido van Rossum78892e42007-04-06 17:31:18 +00002810 # character in TextIOWrapper._pending_line.
2811 for encoding in encodings:
Guido van Rossum8358db22007-08-18 21:39:55 +00002812 # XXX: str.encode() should return bytes
2813 data = bytes(''.join(input_lines).encode(encoding))
Guido van Rossum78892e42007-04-06 17:31:18 +00002814 for do_reads in (False, True):
Guido van Rossum8358db22007-08-18 21:39:55 +00002815 for bufsize in range(1, 10):
2816 for newline, exp_lines in tests:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002817 bufio = self.BufferedReader(self.BytesIO(data), bufsize)
2818 textio = self.TextIOWrapper(bufio, newline=newline,
Guido van Rossum78892e42007-04-06 17:31:18 +00002819 encoding=encoding)
2820 if do_reads:
2821 got_lines = []
2822 while True:
Guido van Rossum76c5d4d2007-04-06 19:10:29 +00002823 c2 = textio.read(2)
Guido van Rossum78892e42007-04-06 17:31:18 +00002824 if c2 == '':
2825 break
Ezio Melottib3aedd42010-11-20 19:04:17 +00002826 self.assertEqual(len(c2), 2)
Guido van Rossum76c5d4d2007-04-06 19:10:29 +00002827 got_lines.append(c2 + textio.readline())
Guido van Rossum78892e42007-04-06 17:31:18 +00002828 else:
Guido van Rossum76c5d4d2007-04-06 19:10:29 +00002829 got_lines = list(textio)
Guido van Rossum78892e42007-04-06 17:31:18 +00002830
2831 for got_line, exp_line in zip(got_lines, exp_lines):
Ezio Melottib3aedd42010-11-20 19:04:17 +00002832 self.assertEqual(got_line, exp_line)
2833 self.assertEqual(len(got_lines), len(exp_lines))
Guido van Rossum78892e42007-04-06 17:31:18 +00002834
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002835 def test_newlines_input(self):
2836 testdata = b"AAA\nBB\x00B\nCCC\rDDD\rEEE\r\nFFF\r\nGGG"
Guido van Rossum8358db22007-08-18 21:39:55 +00002837 normalized = testdata.replace(b"\r\n", b"\n").replace(b"\r", b"\n")
2838 for newline, expected in [
Ezio Melottid8b509b2011-09-28 17:37:55 +03002839 (None, normalized.decode("ascii").splitlines(keepends=True)),
2840 ("", testdata.decode("ascii").splitlines(keepends=True)),
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002841 ("\n", ["AAA\n", "BB\x00B\n", "CCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]),
2842 ("\r\n", ["AAA\nBB\x00B\nCCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]),
2843 ("\r", ["AAA\nBB\x00B\nCCC\r", "DDD\r", "EEE\r", "\nFFF\r", "\nGGG"]),
Guido van Rossum8358db22007-08-18 21:39:55 +00002844 ]:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002845 buf = self.BytesIO(testdata)
2846 txt = self.TextIOWrapper(buf, encoding="ascii", newline=newline)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002847 self.assertEqual(txt.readlines(), expected)
Guido van Rossum8358db22007-08-18 21:39:55 +00002848 txt.seek(0)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002849 self.assertEqual(txt.read(), "".join(expected))
Guido van Rossum8358db22007-08-18 21:39:55 +00002850
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002851 def test_newlines_output(self):
2852 testdict = {
2853 "": b"AAA\nBBB\nCCC\nX\rY\r\nZ",
2854 "\n": b"AAA\nBBB\nCCC\nX\rY\r\nZ",
2855 "\r": b"AAA\rBBB\rCCC\rX\rY\r\rZ",
2856 "\r\n": b"AAA\r\nBBB\r\nCCC\r\nX\rY\r\r\nZ",
2857 }
2858 tests = [(None, testdict[os.linesep])] + sorted(testdict.items())
2859 for newline, expected in tests:
2860 buf = self.BytesIO()
2861 txt = self.TextIOWrapper(buf, encoding="ascii", newline=newline)
2862 txt.write("AAA\nB")
2863 txt.write("BB\nCCC\n")
2864 txt.write("X\rY\r\nZ")
2865 txt.flush()
Ezio Melottib3aedd42010-11-20 19:04:17 +00002866 self.assertEqual(buf.closed, False)
2867 self.assertEqual(buf.getvalue(), expected)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002868
def test_destructor(self):
    # Dropping the wrapper must flush the pending text and close the
    # underlying buffer; the buffer records its contents when closed.
    seen = []
    parent = self.BytesIO
    class MyBytesIO(parent):
        def close(self):
            seen.append(self.getvalue())
            parent.close(self)
    raw = MyBytesIO()
    wrapper = self.TextIOWrapper(raw, encoding="ascii")
    wrapper.write("abc")
    del wrapper
    support.gc_collect()
    self.assertEqual([b"abc"], seen)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002882
def test_override_destructor(self):
    # A subclass overriding __del__, close() and flush() must see them
    # called in that order when the object is collected.
    events = []
    class MyTextIO(self.TextIOWrapper):
        def __del__(self):
            events.append(1)
            # The base class may or may not define __del__.
            inherited = getattr(super(), "__del__", None)
            if inherited is not None:
                inherited()
        def close(self):
            events.append(2)
            super().close()
        def flush(self):
            events.append(3)
            super().flush()
    raw = self.BytesIO()
    wrapper = MyTextIO(raw, encoding="ascii")
    del wrapper
    support.gc_collect()
    self.assertEqual(events, [1, 2, 3])
2905
def test_error_through_destructor(self):
    # Test that the exception state is not modified by a destructor,
    # even if close() fails.
    rawio = self.CloseFailureIO()
    with support.catch_unraisable_exception() as cm:
        # The wrapper is a temporary: the failed attribute lookup must
        # surface as AttributeError.
        with self.assertRaises(AttributeError):
            self.TextIOWrapper(rawio).xyzzy

        # NOTE(review): these checks are assumed to sit inside the
        # catch_unraisable_exception() block so that cm.unraisable is
        # still available -- confirm against the original indentation.
        if not IOBASE_EMITS_UNRAISABLE:
            self.assertIsNone(cm.unraisable)
        elif cm.unraisable is not None:
            self.assertEqual(cm.unraisable.exc_type, OSError)
Guido van Rossum8358db22007-08-18 21:39:55 +00002918
Guido van Rossum9b76da62007-04-11 01:09:03 +00002919 # Systematic tests of the text I/O API
2920
def test_basic_io(self):
    # Exercise write/read/seek/tell on a real file for a range of
    # _CHUNK_SIZE values and encodings.  The files are opened in "with"
    # blocks so they are closed even when an assertion fails.
    for chunksize in (1, 2, 3, 4, 5, 15, 16, 17, 31, 32, 33, 63, 64, 65):
        # "utf-16-be" and "utf-16-le" are deliberately left out, as in
        # the original encoding list.
        for enc in "ascii", "latin-1", "utf-8":
            with self.open(os_helper.TESTFN, "w+", encoding=enc) as f:
                f._CHUNK_SIZE = chunksize
                self.assertEqual(f.write("abc"), 3)
            with self.open(os_helper.TESTFN, "r+", encoding=enc) as f:
                f._CHUNK_SIZE = chunksize
                self.assertEqual(f.tell(), 0)
                self.assertEqual(f.read(), "abc")
                # Position of the end of the initial content.
                cookie = f.tell()
                self.assertEqual(f.seek(0), 0)
                self.assertEqual(f.read(None), "abc")
                f.seek(0)
                self.assertEqual(f.read(2), "ab")
                self.assertEqual(f.read(1), "c")
                self.assertEqual(f.read(1), "")
                self.assertEqual(f.read(), "")
                self.assertEqual(f.tell(), cookie)
                self.assertEqual(f.seek(0), 0)
                self.assertEqual(f.seek(0, 2), cookie)
                self.assertEqual(f.write("def"), 3)
                self.assertEqual(f.seek(cookie), cookie)
                self.assertEqual(f.read(), "def")
                if enc.startswith("utf"):
                    self.multi_line_test(f, enc)
2949
def multi_line_test(self, f, enc):
    # Helper: empty the file, write lines of many lengths while noting
    # tell() before each one, then read everything back with readline()
    # and check that both the positions and the lines match.
    # (enc is accepted but not used here.)
    f.seek(0)
    f.truncate()
    sample = "s\xff\u0fff\uffff"
    written = []
    for size in (0, 1, 2, 3, 4, 5, 30, 31, 32, 33, 62, 63, 64, 65, 1000):
        # Cycle through the sample characters to build a line of `size`.
        body = "".join(sample[i % len(sample)] for i in range(size))
        line = body + "\n"
        written.append((f.tell(), line))
        f.write(line)
    f.seek(0)
    read_back = []
    while True:
        pos = f.tell()
        line = f.readline()
        if not line:
            break
        read_back.append((pos, line))
    self.assertEqual(read_back, written)
Guido van Rossum9b76da62007-04-11 01:09:03 +00002971
def test_telling(self):
    # tell() must report the same positions while reading as it did
    # while writing.  The file is opened in a "with" block so that it is
    # closed even when an assertion fails.
    with self.open(os_helper.TESTFN, "w+", encoding="utf-8") as f:
        p0 = f.tell()
        f.write("\xff\n")
        p1 = f.tell()
        f.write("\xff\n")
        p2 = f.tell()
        f.seek(0)
        self.assertEqual(f.tell(), p0)
        self.assertEqual(f.readline(), "\xff\n")
        self.assertEqual(f.tell(), p1)
        self.assertEqual(f.readline(), "\xff\n")
        self.assertEqual(f.tell(), p2)
        f.seek(0)
        for line in f:
            self.assertEqual(line, "\xff\n")
            # tell() is expected to fail while the file is being iterated.
            self.assertRaises(OSError, f.tell)
        # ... and to work again once the iteration is over.
        self.assertEqual(f.tell(), p2)
2991
def test_seeking(self):
    # Build two lines whose multi-byte character starts two bytes before
    # the default chunk size, then check read()/tell()/readline() there.
    prefix_size = _default_chunk_size() - 2
    u_prefix = "a" * prefix_size
    prefix = u_prefix.encode("utf-8")
    self.assertEqual(len(u_prefix), len(prefix))
    u_suffix = "\u8888\n"
    line = prefix + u_suffix.encode("utf-8")
    path = os_helper.TESTFN
    with self.open(path, "wb") as out:
        out.write(line*2)
    with self.open(path, "r", encoding="utf-8") as reader:
        head = reader.read(prefix_size)
        self.assertEqual(head, prefix.decode("ascii"))
        self.assertEqual(reader.tell(), prefix_size)
        self.assertEqual(reader.readline(), u_suffix)
Guido van Rossumd76e7792007-04-17 02:38:04 +00003008
def test_seeking_too(self):
    # Regression test for a specific bug
    payload = b'\xe0\xbf\xbf\n'
    path = os_helper.TESTFN
    with self.open(path, "wb") as out:
        out.write(payload)
    with self.open(path, "r", encoding="utf-8") as reader:
        reader._CHUNK_SIZE  # Just test that it exists
        reader._CHUNK_SIZE = 2
        # Only checks that neither call raises.
        reader.readline()
        reader.tell()
Guido van Rossumd76e7792007-04-17 02:38:04 +00003019
def test_seek_and_tell(self):
    # Test seek/tell using the StatefulIncrementalDecoder.
    # Every file is opened in a "with" block so that a failing assertion
    # cannot leave the test file open.
    # Make test faster by doing smaller seeks
    CHUNK_SIZE = 128

    def test_seek_and_tell_with_data(data, min_pos=0):
        """Tell/seek to various points within a data stream and ensure
        that the decoded data returned by read() is consistent."""
        with self.open(os_helper.TESTFN, 'wb') as f:
            f.write(data)
        with self.open(os_helper.TESTFN, encoding='test_decoder') as f:
            f._CHUNK_SIZE = CHUNK_SIZE
            decoded = f.read()

        for i in range(min_pos, len(decoded) + 1):  # seek positions
            for j in [1, 5, len(decoded) - i]:  # read lengths
                with self.open(os_helper.TESTFN,
                               encoding='test_decoder') as f:
                    self.assertEqual(f.read(i), decoded[:i])
                    cookie = f.tell()
                    self.assertEqual(f.read(j), decoded[i:i + j])
                    # Seeking back to the cookie must replay the rest.
                    f.seek(cookie)
                    self.assertEqual(f.read(), decoded[i:])

    # Enable the test decoder.
    StatefulIncrementalDecoder.codecEnabled = 1

    # Run the tests.
    try:
        # Try each test case.
        for data, _, _ in StatefulIncrementalDecoderTest.test_cases:
            test_seek_and_tell_with_data(data)

        # Position each test case so that it crosses a chunk boundary.
        for data, _, _ in StatefulIncrementalDecoderTest.test_cases:
            offset = CHUNK_SIZE - len(data)//2
            prefix = b'.'*offset
            # Don't bother seeking into the prefix (takes too long).
            min_pos = offset*2
            test_seek_and_tell_with_data(prefix + data, min_pos)

    # Ensure our test decoder won't interfere with subsequent tests.
    finally:
        StatefulIncrementalDecoder.codecEnabled = 0
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00003066
def test_multibyte_seek_and_tell(self):
    # seek() to a tell() cookie taken between two lines of a multibyte
    # (euc_jp) file must allow the second line to be read again.
    # "with" blocks make sure the file is closed if an assertion fails.
    with self.open(os_helper.TESTFN, "w", encoding="euc_jp") as f:
        f.write("AB\n\u3046\u3048\n")

    with self.open(os_helper.TESTFN, "r", encoding="euc_jp") as f:
        self.assertEqual(f.readline(), "AB\n")
        p0 = f.tell()
        self.assertEqual(f.readline(), "\u3046\u3048\n")
        p1 = f.tell()
        f.seek(p0)
        self.assertEqual(f.readline(), "\u3046\u3048\n")
        self.assertEqual(f.tell(), p1)
3081
def test_seek_with_encoder_state(self):
    # Write, seek back to an earlier tell() cookie and write again with
    # the euc_jis_2004 codec, then check what the file reads back as.
    # "with" blocks make sure the file is closed if an assertion fails.
    with self.open(os_helper.TESTFN, "w", encoding="euc_jis_2004") as f:
        f.write("\u00e6\u0300")
        p0 = f.tell()
        f.write("\u00e6")
        f.seek(p0)
        f.write("\u0300")

    with self.open(os_helper.TESTFN, "r", encoding="euc_jis_2004") as f:
        self.assertEqual(f.readline(), "\u00e6\u0300\u0300")
3094
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003095 def test_encoded_writes(self):
Alexandre Vassalottia38f73b2008-01-07 18:30:48 +00003096 data = "1234567890"
3097 tests = ("utf-16",
3098 "utf-16-le",
3099 "utf-16-be",
3100 "utf-32",
3101 "utf-32-le",
3102 "utf-32-be")
3103 for encoding in tests:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003104 buf = self.BytesIO()
3105 f = self.TextIOWrapper(buf, encoding=encoding)
Alexandre Vassalottia38f73b2008-01-07 18:30:48 +00003106 # Check if the BOM is written only once (see issue1753).
3107 f.write(data)
3108 f.write(data)
3109 f.seek(0)
Ezio Melottib3aedd42010-11-20 19:04:17 +00003110 self.assertEqual(f.read(), data * 2)
Benjamin Peterson9363a652009-03-05 00:42:09 +00003111 f.seek(0)
Ezio Melottib3aedd42010-11-20 19:04:17 +00003112 self.assertEqual(f.read(), data * 2)
3113 self.assertEqual(buf.getvalue(), (data * 2).encode(encoding))
Alexandre Vassalottia38f73b2008-01-07 18:30:48 +00003114
Benjamin Petersona1b49012009-03-31 23:11:32 +00003115 def test_unreadable(self):
3116 class UnReadable(self.BytesIO):
3117 def readable(self):
3118 return False
3119 txt = self.TextIOWrapper(UnReadable())
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02003120 self.assertRaises(OSError, txt.read)
Benjamin Petersona1b49012009-03-31 23:11:32 +00003121
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003122 def test_read_one_by_one(self):
3123 txt = self.TextIOWrapper(self.BytesIO(b"AA\r\nBB"))
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00003124 reads = ""
3125 while True:
3126 c = txt.read(1)
3127 if not c:
3128 break
3129 reads += c
Ezio Melottib3aedd42010-11-20 19:04:17 +00003130 self.assertEqual(reads, "AA\nBB")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00003131
Benjamin Petersonbf5ff762009-12-13 19:25:34 +00003132 def test_readlines(self):
3133 txt = self.TextIOWrapper(self.BytesIO(b"AA\nBB\nCC"))
3134 self.assertEqual(txt.readlines(), ["AA\n", "BB\n", "CC"])
3135 txt.seek(0)
3136 self.assertEqual(txt.readlines(None), ["AA\n", "BB\n", "CC"])
3137 txt.seek(0)
3138 self.assertEqual(txt.readlines(5), ["AA\n", "BB\n"])
3139
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00003140 # read in amounts equal to TextIOWrapper._CHUNK_SIZE which is 128.
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003141 def test_read_by_chunk(self):
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00003142 # make sure "\r\n" straddles 128 char boundary.
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003143 txt = self.TextIOWrapper(self.BytesIO(b"A" * 127 + b"\r\nB"))
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00003144 reads = ""
3145 while True:
3146 c = txt.read(128)
3147 if not c:
3148 break
3149 reads += c
Ezio Melottib3aedd42010-11-20 19:04:17 +00003150 self.assertEqual(reads, "A"*127+"\nB")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00003151
Antoine Pitrou3ed2cb52012-10-16 23:02:27 +02003152 def test_writelines(self):
3153 l = ['ab', 'cd', 'ef']
3154 buf = self.BytesIO()
3155 txt = self.TextIOWrapper(buf)
3156 txt.writelines(l)
3157 txt.flush()
3158 self.assertEqual(buf.getvalue(), b'abcdef')
3159
3160 def test_writelines_userlist(self):
3161 l = UserList(['ab', 'cd', 'ef'])
3162 buf = self.BytesIO()
3163 txt = self.TextIOWrapper(buf)
3164 txt.writelines(l)
3165 txt.flush()
3166 self.assertEqual(buf.getvalue(), b'abcdef')
3167
3168 def test_writelines_error(self):
3169 txt = self.TextIOWrapper(self.BytesIO())
3170 self.assertRaises(TypeError, txt.writelines, [1, 2, 3])
3171 self.assertRaises(TypeError, txt.writelines, None)
3172 self.assertRaises(TypeError, txt.writelines, b'abc')
3173
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00003174 def test_issue1395_1(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003175 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00003176
3177 # read one char at a time
3178 reads = ""
3179 while True:
3180 c = txt.read(1)
3181 if not c:
3182 break
3183 reads += c
Ezio Melottib3aedd42010-11-20 19:04:17 +00003184 self.assertEqual(reads, self.normalized)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00003185
3186 def test_issue1395_2(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003187 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00003188 txt._CHUNK_SIZE = 4
3189
3190 reads = ""
3191 while True:
3192 c = txt.read(4)
3193 if not c:
3194 break
3195 reads += c
Ezio Melottib3aedd42010-11-20 19:04:17 +00003196 self.assertEqual(reads, self.normalized)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00003197
3198 def test_issue1395_3(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003199 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00003200 txt._CHUNK_SIZE = 4
3201
3202 reads = txt.read(4)
3203 reads += txt.read(4)
3204 reads += txt.readline()
3205 reads += txt.readline()
3206 reads += txt.readline()
Ezio Melottib3aedd42010-11-20 19:04:17 +00003207 self.assertEqual(reads, self.normalized)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00003208
3209 def test_issue1395_4(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003210 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00003211 txt._CHUNK_SIZE = 4
3212
3213 reads = txt.read(4)
3214 reads += txt.read()
Ezio Melottib3aedd42010-11-20 19:04:17 +00003215 self.assertEqual(reads, self.normalized)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00003216
3217 def test_issue1395_5(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003218 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00003219 txt._CHUNK_SIZE = 4
3220
3221 reads = txt.read(4)
3222 pos = txt.tell()
3223 txt.seek(0)
3224 txt.seek(pos)
Ezio Melottib3aedd42010-11-20 19:04:17 +00003225 self.assertEqual(txt.read(4), "BBB\n")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00003226
Ka-Ping Yeeddaa7062008-03-17 20:35:15 +00003227 def test_issue2282(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003228 buffer = self.BytesIO(self.testdata)
3229 txt = self.TextIOWrapper(buffer, encoding="ascii")
Ka-Ping Yeeddaa7062008-03-17 20:35:15 +00003230
3231 self.assertEqual(buffer.seekable(), txt.seekable())
3232
def test_append_bom(self):
    # The BOM is not written again when appending to a non-empty file
    path = os_helper.TESTFN
    for charset in ('utf-8-sig', 'utf-16', 'utf-32'):
        first = 'aaa'.encode(charset)
        both = 'aaaxxx'.encode(charset)

        with self.open(path, 'w', encoding=charset) as out:
            out.write('aaa')
            # The position is not used in this test; the call is kept.
            out.tell()
        with self.open(path, 'rb') as raw:
            self.assertEqual(raw.read(), first)

        with self.open(path, 'a', encoding=charset) as out:
            out.write('xxx')
        with self.open(path, 'rb') as raw:
            self.assertEqual(raw.read(), both)
Antoine Pitroue4501852009-05-14 18:55:55 +00003247
def test_seek_bom(self):
    # Same test, but when seeking manually
    path = os_helper.TESTFN
    for charset in ('utf-8-sig', 'utf-16', 'utf-32'):
        with self.open(path, 'w', encoding=charset) as out:
            out.write('aaa')
            end_of_text = out.tell()
        with self.open(path, 'r+', encoding=charset) as out:
            # Append at the recorded end, then overwrite from the start.
            for where, text in ((end_of_text, 'zzz'), (0, 'bbb')):
                out.seek(where)
                out.write(text)
        with self.open(path, 'rb') as raw:
            self.assertEqual(raw.read(), 'bbbzzz'.encode(charset))
Antoine Pitroue4501852009-05-14 18:55:55 +00003262
def test_seek_append_bom(self):
    # Same test, but first seek to the start and then to the end
    path = os_helper.TESTFN
    for charset in ('utf-8-sig', 'utf-16', 'utf-32'):
        expected = 'aaaxxx'.encode(charset)
        with self.open(path, 'w', encoding=charset) as out:
            out.write('aaa')
        with self.open(path, 'a', encoding=charset) as out:
            out.seek(0)
            out.seek(0, self.SEEK_END)
            out.write('xxx')
        with self.open(path, 'rb') as raw:
            self.assertEqual(raw.read(), expected)
3275
def test_errors_property(self):
    # The errors attribute is "strict" by default and otherwise reflects
    # the errors argument passed to open().
    cases = (
        ({}, "strict"),
        ({"errors": "replace"}, "replace"),
    )
    for options, expected in cases:
        with self.open(os_helper.TESTFN, "w", **options) as f:
            self.assertEqual(f.errors, expected)
3281
@support.no_tracing
def test_threads_write(self):
    # Issue6750: concurrent writes could duplicate data
    go = threading.Event()
    messages = ["Thread%03d\n" % n for n in range(20)]
    with self.open(os_helper.TESTFN, "w", buffering=1) as f:
        def run(text):
            # Every thread blocks here until the event is set.
            go.wait()
            f.write(text)
        threads = [threading.Thread(target=run, args=(message,))
                   for message in messages]
        with threading_helper.start_threads(threads, go.set):
            time.sleep(0.02)
    with self.open(os_helper.TESTFN) as f:
        content = f.read()
        # Every thread's line must appear exactly once.
        for message in messages:
            self.assertEqual(content.count(message), 1)
Amaury Forgeot d'Arcccd686a2009-08-29 23:00:38 +00003299
Antoine Pitrou6be88762010-05-03 16:48:20 +00003300 def test_flush_error_on_close(self):
Serhiy Storchakaa3712a92015-02-21 00:35:09 +02003301 # Test that text file is closed despite failed flush
3302 # and that flush() is called before file closed.
Antoine Pitrou6be88762010-05-03 16:48:20 +00003303 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Serhiy Storchakaa3712a92015-02-21 00:35:09 +02003304 closed = []
Antoine Pitrou6be88762010-05-03 16:48:20 +00003305 def bad_flush():
Serhiy Storchakaa3712a92015-02-21 00:35:09 +02003306 closed[:] = [txt.closed, txt.buffer.closed]
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02003307 raise OSError()
Antoine Pitrou6be88762010-05-03 16:48:20 +00003308 txt.flush = bad_flush
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02003309 self.assertRaises(OSError, txt.close) # exception not swallowed
Benjamin Peterson68623612012-12-20 11:53:11 -06003310 self.assertTrue(txt.closed)
Serhiy Storchakaa3712a92015-02-21 00:35:09 +02003311 self.assertTrue(txt.buffer.closed)
3312 self.assertTrue(closed) # flush() called
3313 self.assertFalse(closed[0]) # flush() called before file closed
3314 self.assertFalse(closed[1])
Serhiy Storchakac26a1a42015-02-23 00:28:38 +02003315 txt.flush = lambda: None # break reference loop
Antoine Pitrou6be88762010-05-03 16:48:20 +00003316
Serhiy Storchaka8a8f7f92014-06-09 09:13:04 +03003317 def test_close_error_on_close(self):
3318 buffer = self.BytesIO(self.testdata)
3319 def bad_flush():
3320 raise OSError('flush')
3321 def bad_close():
3322 raise OSError('close')
3323 buffer.close = bad_close
3324 txt = self.TextIOWrapper(buffer, encoding="ascii")
3325 txt.flush = bad_flush
3326 with self.assertRaises(OSError) as err: # exception not swallowed
3327 txt.close()
3328 self.assertEqual(err.exception.args, ('close',))
3329 self.assertIsInstance(err.exception.__context__, OSError)
3330 self.assertEqual(err.exception.__context__.args, ('flush',))
3331 self.assertFalse(txt.closed)
3332
Victor Stinner472f7942019-04-12 21:58:24 +02003333 # Silence destructor error
3334 buffer.close = lambda: None
3335 txt.flush = lambda: None
3336
Serhiy Storchaka8a8f7f92014-06-09 09:13:04 +03003337 def test_nonnormalized_close_error_on_close(self):
3338 # Issue #21677
3339 buffer = self.BytesIO(self.testdata)
3340 def bad_flush():
3341 raise non_existing_flush
3342 def bad_close():
3343 raise non_existing_close
3344 buffer.close = bad_close
3345 txt = self.TextIOWrapper(buffer, encoding="ascii")
3346 txt.flush = bad_flush
3347 with self.assertRaises(NameError) as err: # exception not swallowed
3348 txt.close()
3349 self.assertIn('non_existing_close', str(err.exception))
3350 self.assertIsInstance(err.exception.__context__, NameError)
3351 self.assertIn('non_existing_flush', str(err.exception.__context__))
3352 self.assertFalse(txt.closed)
3353
Victor Stinner472f7942019-04-12 21:58:24 +02003354 # Silence destructor error
3355 buffer.close = lambda: None
3356 txt.flush = lambda: None
3357
Antoine Pitrou6be88762010-05-03 16:48:20 +00003358 def test_multi_close(self):
3359 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
3360 txt.close()
3361 txt.close()
3362 txt.close()
3363 self.assertRaises(ValueError, txt.flush)
3364
def test_unseekable(self):
    # tell() and seek() on a wrapper around an unseekable stream must raise
    # UnsupportedOperation.
    wrapper = self.TextIOWrapper(self.MockUnseekableIO(self.testdata))
    with self.assertRaises(self.UnsupportedOperation):
        wrapper.tell()
    with self.assertRaises(self.UnsupportedOperation):
        wrapper.seek(0)
3369
Antoine Pitrou7f8f4182010-12-21 21:20:59 +00003370 def test_readonly_attributes(self):
3371 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
3372 buf = self.BytesIO(self.testdata)
3373 with self.assertRaises(AttributeError):
3374 txt.buffer = buf
3375
def test_rawio(self):
    # Issue #12591: TextIOWrapper must work with raw I/O objects, so
    # that subprocess.Popen() can have the required unbuffered
    # semantics with universal_newlines=True.
    chunks = [b'abc', b'def', b'ghi\njkl\nopq\n']
    wrapper = self.TextIOWrapper(self.MockRawIO(chunks),
                                 encoding='ascii', newline='\n')
    # Reads
    first_four = wrapper.read(4)
    self.assertEqual(first_four, 'abcd')
    rest_of_line = wrapper.readline()
    self.assertEqual(rest_of_line, 'efghi\n')
    remaining_lines = list(wrapper)
    self.assertEqual(remaining_lines, ['jkl\n', 'opq\n'])
3386
def test_rawio_write_through(self):
    # Issue #12591: with write_through=True, writes don't need a flush
    raw = self.MockRawIO([b'abc', b'def', b'ghi\njkl\nopq\n'])
    wrapper = self.TextIOWrapper(raw, encoding='ascii', newline='\n',
                                 write_through=True)
    for piece in ('1', '23\n4', '5'):
        wrapper.write(piece)
    written = b''.join(raw._write_stack)
    self.assertEqual(written, b'123\n45')
3396
Antoine Pitrouc644e7c2014-05-09 00:24:50 +02003397 def test_bufio_write_through(self):
3398 # Issue #21396: write_through=True doesn't force a flush()
3399 # on the underlying binary buffered object.
3400 flush_called, write_called = [], []
3401 class BufferedWriter(self.BufferedWriter):
3402 def flush(self, *args, **kwargs):
3403 flush_called.append(True)
3404 return super().flush(*args, **kwargs)
3405 def write(self, *args, **kwargs):
3406 write_called.append(True)
3407 return super().write(*args, **kwargs)
3408
3409 rawio = self.BytesIO()
3410 data = b"a"
3411 bufio = BufferedWriter(rawio, len(data)*2)
3412 textio = self.TextIOWrapper(bufio, encoding='ascii',
3413 write_through=True)
3414 # write to the buffered io but don't overflow the buffer
3415 text = data.decode('ascii')
3416 textio.write(text)
3417
3418 # buffer.flush is not called with write_through=True
3419 self.assertFalse(flush_called)
3420 # buffer.write *is* called with write_through=True
3421 self.assertTrue(write_called)
3422 self.assertEqual(rawio.getvalue(), b"") # no flush
3423
3424 write_called = [] # reset
3425 textio.write(text * 10) # total content is larger than bufio buffer
3426 self.assertTrue(write_called)
3427 self.assertEqual(rawio.getvalue(), data * 11) # all flushed
3428
def test_reconfigure_write_through(self):
    # reconfigure(write_through=...) toggles the attribute, and passing
    # None (or nothing) keeps the current setting.
    raw = self.MockRawIO([])
    wrapper = self.TextIOWrapper(raw, encoding='ascii', newline='\n')

    def written():
        # Everything the raw stream has received so far.
        return b''.join(raw._write_stack)

    wrapper.write('1')
    wrapper.reconfigure(write_through=True)  # implied flush
    self.assertEqual(wrapper.write_through, True)
    self.assertEqual(written(), b'1')
    wrapper.write('23')
    self.assertEqual(written(), b'123')

    wrapper.reconfigure(write_through=False)
    self.assertEqual(wrapper.write_through, False)
    wrapper.write('45')
    wrapper.flush()
    self.assertEqual(written(), b'12345')

    # Keeping default value
    wrapper.reconfigure()
    wrapper.reconfigure(write_through=None)
    self.assertEqual(wrapper.write_through, False)
    wrapper.reconfigure(write_through=True)
    wrapper.reconfigure()
    wrapper.reconfigure(write_through=None)
    self.assertEqual(wrapper.write_through, True)
3451
Serhiy Storchaka94dc6732013-02-03 17:03:31 +02003452 def test_read_nonbytes(self):
3453 # Issue #17106
3454 # Crash when underlying read() returns non-bytes
3455 t = self.TextIOWrapper(self.StringIO('a'))
3456 self.assertRaises(TypeError, t.read, 1)
3457 t = self.TextIOWrapper(self.StringIO('a'))
3458 self.assertRaises(TypeError, t.readline)
3459 t = self.TextIOWrapper(self.StringIO('a'))
3460 self.assertRaises(TypeError, t.read)
3461
def test_illegal_encoder(self):
    # Issue 31271: Calling write() while the return value of encoder's
    # encode() is invalid shouldn't cause an assertion failure.
    #
    # The rot13 codec is temporarily flagged as a text encoding so that the
    # wrapper can be constructed with it.  The wrapper and its buffer are
    # taken from self, like the other tests in this class, so that each
    # subclass exercises its own implementation.
    rot13 = codecs.lookup("rot13")
    with support.swap_attr(rot13, '_is_text_encoding', True):
        t = self.TextIOWrapper(self.BytesIO(b'foo'), encoding="rot13")
    self.assertRaises(TypeError, t.write, 'bar')
3469
def test_illegal_decoder(self):
    # Issue #17106
    # Bypass the early encoding check added in issue 20404
    def _make_illegal_wrapper():
        codec_info = codecs.lookup("quopri")
        codec_info._is_text_encoding = True
        try:
            return self.TextIOWrapper(self.BytesIO(b'aaaaaa'),
                                      newline='\n', encoding="quopri")
        finally:
            codec_info._is_text_encoding = False

    # Crash when decoder returns non-string
    for method_name, call_args in (("read", (1,)), ("readline", ()), ("read", ())):
        wrapper = _make_illegal_wrapper()
        self.assertRaises(TypeError, getattr(wrapper, method_name), *call_args)

    # Issue 31243: calling read() while the return value of decoder's
    # getstate() is invalid should neither crash the interpreter nor
    # raise a SystemError.
    def _make_very_illegal_wrapper(getstate_ret_val):
        class BadDecoder:
            def getstate(self):
                return getstate_ret_val

        def _get_bad_decoder(dummy):
            return BadDecoder()

        codec_info = codecs.lookup("quopri")
        with support.swap_attr(codec_info, 'incrementaldecoder',
                               _get_bad_decoder):
            return _make_illegal_wrapper()

    for bogus_state in (42, (), (1, 2)):
        wrapper = _make_very_illegal_wrapper(bogus_state)
        self.assertRaises(TypeError, wrapper.read, 42)
3509
def _check_create_at_shutdown(self, **kwargs):
    # Issue #20037: creating a TextIOWrapper at shutdown
    # shouldn't crash the interpreter.
    # Runs a child interpreter whose script creates a TextIOWrapper from a
    # __del__ method, passing **kwargs to the constructor, and returns
    # whatever assert_python_ok() returns.
    script = """if 1:
            import codecs
            import {iomod} as io

            # Avoid looking up codecs at shutdown
            codecs.lookup('utf-8')

            class C:
                def __init__(self):
                    self.buf = io.BytesIO()
                def __del__(self):
                    io.TextIOWrapper(self.buf, **{kwargs})
                    print("ok")
            c = C()
            """.format(iomod=self.io.__name__, kwargs=kwargs)
    return assert_python_ok("-c", script)
3530
def test_create_at_shutdown_without_encoding(self):
    # Without an explicit encoding the child may either print "ok" or
    # write the expected shutdown error to stderr.
    _, stdout, stderr = self._check_create_at_shutdown()
    if not stderr:
        self.assertEqual("ok", stdout.decode().strip())
        return
    # Can error out with a RuntimeError if the module state
    # isn't found.
    self.assertIn(self.shutdown_error, stderr.decode())
3539
def test_create_at_shutdown_with_encoding(self):
    # With explicit encoding and errors arguments the child must print
    # "ok" and leave stderr empty.
    options = {'encoding': 'utf-8', 'errors': 'strict'}
    _, stdout, stderr = self._check_create_at_shutdown(**options)
    self.assertFalse(stderr)
    self.assertEqual("ok", stdout.decode().strip())
3545
def test_read_byteslike(self):
    # The wrapper must decode data from a buffer whose read methods return
    # memoryviews rather than bytes.
    source = MemviewBytesIO(b'Just some random string\n')
    wrapper = self.TextIOWrapper(source, 'utf-8')

    # TextIOwrapper will not read the full string, because
    # we truncate it to a multiple of the native int size
    # so that we can construct a more complex memoryview.
    truncated = _to_memoryview(source.getvalue()).tobytes()
    expected_text = truncated.decode('utf-8')

    self.assertEqual(wrapper.read(200), expected_text)
3556
Benjamin Peterson6c14f232014-11-12 10:19:46 -05003557 def test_issue22849(self):
3558 class F(object):
3559 def readable(self): return True
3560 def writable(self): return True
3561 def seekable(self): return True
3562
3563 for i in range(10):
3564 try:
3565 self.TextIOWrapper(F(), encoding='utf-8')
3566 except Exception:
3567 pass
3568
3569 F.tell = lambda x: 0
3570 t = self.TextIOWrapper(F(), encoding='utf-8')
3571
INADA Naoki507434f2017-12-21 09:59:53 +09003572 def test_reconfigure_encoding_read(self):
3573 # latin1 -> utf8
3574 # (latin1 can decode utf-8 encoded string)
3575 data = 'abc\xe9\n'.encode('latin1') + 'd\xe9f\n'.encode('utf8')
3576 raw = self.BytesIO(data)
3577 txt = self.TextIOWrapper(raw, encoding='latin1', newline='\n')
3578 self.assertEqual(txt.readline(), 'abc\xe9\n')
3579 with self.assertRaises(self.UnsupportedOperation):
3580 txt.reconfigure(encoding='utf-8')
3581 with self.assertRaises(self.UnsupportedOperation):
3582 txt.reconfigure(newline=None)
3583
3584 def test_reconfigure_write_fromascii(self):
3585 # ascii has a specific encodefunc in the C implementation,
3586 # but utf-8-sig has not. Make sure that we get rid of the
3587 # cached encodefunc when we switch encoders.
3588 raw = self.BytesIO()
3589 txt = self.TextIOWrapper(raw, encoding='ascii', newline='\n')
3590 txt.write('foo\n')
3591 txt.reconfigure(encoding='utf-8-sig')
3592 txt.write('\xe9\n')
3593 txt.flush()
3594 self.assertEqual(raw.getvalue(), b'foo\n\xc3\xa9\n')
3595
3596 def test_reconfigure_write(self):
3597 # latin -> utf8
3598 raw = self.BytesIO()
3599 txt = self.TextIOWrapper(raw, encoding='latin1', newline='\n')
3600 txt.write('abc\xe9\n')
3601 txt.reconfigure(encoding='utf-8')
3602 self.assertEqual(raw.getvalue(), b'abc\xe9\n')
3603 txt.write('d\xe9f\n')
3604 txt.flush()
3605 self.assertEqual(raw.getvalue(), b'abc\xe9\nd\xc3\xa9f\n')
3606
3607 # ascii -> utf-8-sig: ensure that no BOM is written in the middle of
3608 # the file
3609 raw = self.BytesIO()
3610 txt = self.TextIOWrapper(raw, encoding='ascii', newline='\n')
3611 txt.write('abc\n')
3612 txt.reconfigure(encoding='utf-8-sig')
3613 txt.write('d\xe9f\n')
3614 txt.flush()
3615 self.assertEqual(raw.getvalue(), b'abc\nd\xc3\xa9f\n')
3616
3617 def test_reconfigure_write_non_seekable(self):
3618 raw = self.BytesIO()
3619 raw.seekable = lambda: False
3620 raw.seek = None
3621 txt = self.TextIOWrapper(raw, encoding='ascii', newline='\n')
3622 txt.write('abc\n')
3623 txt.reconfigure(encoding='utf-8-sig')
3624 txt.write('d\xe9f\n')
3625 txt.flush()
3626
3627 # If the raw stream is not seekable, there'll be a BOM
3628 self.assertEqual(raw.getvalue(), b'abc\n\xef\xbb\xbfd\xc3\xa9f\n')
3629
3630 def test_reconfigure_defaults(self):
3631 txt = self.TextIOWrapper(self.BytesIO(), 'ascii', 'replace', '\n')
3632 txt.reconfigure(encoding=None)
3633 self.assertEqual(txt.encoding, 'ascii')
3634 self.assertEqual(txt.errors, 'replace')
3635 txt.write('LF\n')
3636
3637 txt.reconfigure(newline='\r\n')
3638 self.assertEqual(txt.encoding, 'ascii')
3639 self.assertEqual(txt.errors, 'replace')
3640
3641 txt.reconfigure(errors='ignore')
3642 self.assertEqual(txt.encoding, 'ascii')
3643 self.assertEqual(txt.errors, 'ignore')
3644 txt.write('CRLF\n')
3645
3646 txt.reconfigure(encoding='utf-8', newline=None)
3647 self.assertEqual(txt.errors, 'strict')
3648 txt.seek(0)
3649 self.assertEqual(txt.read(), 'LF\nCRLF\n')
3650
3651 self.assertEqual(txt.detach().getvalue(), b'LF\nCRLF\r\n')
3652
3653 def test_reconfigure_newline(self):
3654 raw = self.BytesIO(b'CR\rEOF')
3655 txt = self.TextIOWrapper(raw, 'ascii', newline='\n')
3656 txt.reconfigure(newline=None)
3657 self.assertEqual(txt.readline(), 'CR\n')
3658 raw = self.BytesIO(b'CR\rEOF')
3659 txt = self.TextIOWrapper(raw, 'ascii', newline='\n')
3660 txt.reconfigure(newline='')
3661 self.assertEqual(txt.readline(), 'CR\r')
3662 raw = self.BytesIO(b'CR\rLF\nEOF')
3663 txt = self.TextIOWrapper(raw, 'ascii', newline='\r')
3664 txt.reconfigure(newline='\n')
3665 self.assertEqual(txt.readline(), 'CR\rLF\n')
3666 raw = self.BytesIO(b'LF\nCR\rEOF')
3667 txt = self.TextIOWrapper(raw, 'ascii', newline='\n')
3668 txt.reconfigure(newline='\r')
3669 self.assertEqual(txt.readline(), 'LF\nCR\r')
3670 raw = self.BytesIO(b'CR\rCRLF\r\nEOF')
3671 txt = self.TextIOWrapper(raw, 'ascii', newline='\r')
3672 txt.reconfigure(newline='\r\n')
3673 self.assertEqual(txt.readline(), 'CR\rCRLF\r\n')
3674
3675 txt = self.TextIOWrapper(self.BytesIO(), 'ascii', newline='\r')
3676 txt.reconfigure(newline=None)
3677 txt.write('linesep\n')
3678 txt.reconfigure(newline='')
3679 txt.write('LF\n')
3680 txt.reconfigure(newline='\n')
3681 txt.write('LF\n')
3682 txt.reconfigure(newline='\r')
3683 txt.write('CR\n')
3684 txt.reconfigure(newline='\r\n')
3685 txt.write('CRLF\n')
3686 expected = 'linesep' + os.linesep + 'LF\nLF\nCR\rCRLF\r\n'
3687 self.assertEqual(txt.detach().getvalue().decode('ascii'), expected)
3688
Zackery Spytz23db9352018-06-29 04:14:58 -06003689 def test_issue25862(self):
3690 # Assertion failures occurred in tell() after read() and write().
3691 t = self.TextIOWrapper(self.BytesIO(b'test'), encoding='ascii')
3692 t.read(1)
3693 t.read()
3694 t.tell()
3695 t = self.TextIOWrapper(self.BytesIO(b'test'), encoding='ascii')
3696 t.read(1)
3697 t.write('x')
3698 t.tell()
3699
Serhiy Storchaka94dc6732013-02-03 17:03:31 +02003700
class MemviewBytesIO(io.BytesIO):
    '''BytesIO variant whose read() and read1() return memoryviews
       instead of bytes objects'''

    def read1(self, len_):
        chunk = super().read1(len_)
        return _to_memoryview(chunk)

    def read(self, len_):
        chunk = super().read(len_)
        return _to_memoryview(chunk)
3710
3711def _to_memoryview(buf):
3712 '''Convert bytes-object *buf* to a non-trivial memoryview'''
3713
3714 arr = array.array('i')
3715 idx = len(buf) - len(buf) % arr.itemsize
3716 arr.frombytes(buf[:idx])
3717 return memoryview(arr)
Benjamin Petersonc9c0f202009-06-30 23:06:06 +00003718
Benjamin Peterson6fd113c2014-11-12 10:23:44 -05003719
class CTextIOWrapperTest(TextIOWrapperTest):
    # Runs the shared TextIOWrapper tests against the `io` module and adds
    # a few tests of its own.
    io = io
    shutdown_error = "LookupError: unknown encoding: ascii"

    def test_initialization(self):
        # A failed re-initialization must leave the wrapper unusable, and
        # repr() of a never-initialized wrapper must raise.
        raw = self.BytesIO(b"\xc3\xa9\n\n")
        buffered = self.BufferedReader(raw, 1000)
        wrapper = self.TextIOWrapper(buffered)
        with self.assertRaises(ValueError):
            wrapper.__init__(buffered, newline='xyzzy')
        with self.assertRaises(ValueError):
            wrapper.read()

        wrapper = self.TextIOWrapper.__new__(self.TextIOWrapper)
        with self.assertRaises(Exception):
            repr(wrapper)

    def test_garbage_collection(self):
        # C TextIOWrapper objects are collected, and collecting them flushes
        # all data to disk.
        # The Python version has __del__, so it ends in gc.garbage instead.
        with warnings_helper.check_warnings(('', ResourceWarning)):
            rawio = io.FileIO(os_helper.TESTFN, "wb")
            bufio = self.BufferedWriter(rawio)
            textio = self.TextIOWrapper(bufio, encoding="ascii")
            textio.write("456def")
            # Self-reference: the wrapper is now part of a reference cycle.
            textio.x = textio
            ref = weakref.ref(textio)
            del textio
            support.gc_collect()
        self.assertIsNone(ref(), ref)
        with self.open(os_helper.TESTFN, "rb") as f:
            self.assertEqual(f.read(), b"456def")

    def test_rwpair_cleared_before_textio(self):
        # Issue 13070: TextIOWrapper's finalization would crash when called
        # after the reference to the underlying BufferedRWPair's writer got
        # cleared by the GC.
        for _round in range(1000):
            pair_a = self.BufferedRWPair(self.MockRawIO(), self.MockRawIO())
            text_a = self.TextIOWrapper(pair_a, encoding="ascii")
            pair_b = self.BufferedRWPair(self.MockRawIO(), self.MockRawIO())
            text_b = self.TextIOWrapper(pair_b, encoding="ascii")
            # circular references
            text_a.buddy = text_b
            text_b.buddy = text_a
        support.gc_collect()

    def test_del__CHUNK_SIZE_SystemError(self):
        # Deleting _CHUNK_SIZE must raise AttributeError.
        wrapper = self.TextIOWrapper(self.BytesIO(), encoding='ascii')
        with self.assertRaises(AttributeError):
            del wrapper._CHUNK_SIZE

    def test_internal_buffer_size(self):
        # bpo-43260: TextIOWrapper's internal buffer should not store
        # data larger than chunk size.
        chunk_size = 8192  # default chunk size, updated later

        class MockIO(self.MockRawIO):
            # Raw stream that rejects any single write above chunk_size.
            def write(self, data):
                if len(data) > chunk_size:
                    raise RuntimeError
                return super().write(data)

        raw = MockIO()
        wrapper = self.TextIOWrapper(raw, encoding="ascii")
        chunk_size = wrapper._CHUNK_SIZE
        wrapper.write("abc")
        wrapper.write("def")
        # Both writes fit within the chunk size, so nothing has been
        # passed on to the raw stream yet.
        self.assertEqual([], raw._write_stack)

        with self.assertRaises(RuntimeError):
            wrapper.write("x"*(chunk_size+1))

        self.assertEqual([b"abcdef"], raw._write_stack)
        wrapper.write("ghi")
        wrapper.write("x"*chunk_size)
        self.assertEqual([b"abcdef", b"ghi", b"x"*chunk_size], raw._write_stack)
3796
Charles-François Natali42c28cd2011-10-05 19:53:43 +02003797
class PyTextIOWrapperTest(TextIOWrapperTest):
    # Runs the shared TextIOWrapper tests against the `pyio` module.
    io = pyio
    shutdown_error = "LookupError: unknown encoding: ascii"
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003801
3802
3803class IncrementalNewlineDecoderTest(unittest.TestCase):
3804
3805 def check_newline_decoding_utf8(self, decoder):
Antoine Pitrou180a3362008-12-14 16:36:46 +00003806 # UTF-8 specific tests for a newline decoder
3807 def _check_decode(b, s, **kwargs):
3808 # We exercise getstate() / setstate() as well as decode()
3809 state = decoder.getstate()
Ezio Melottib3aedd42010-11-20 19:04:17 +00003810 self.assertEqual(decoder.decode(b, **kwargs), s)
Antoine Pitrou180a3362008-12-14 16:36:46 +00003811 decoder.setstate(state)
Ezio Melottib3aedd42010-11-20 19:04:17 +00003812 self.assertEqual(decoder.decode(b, **kwargs), s)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00003813
Antoine Pitrou180a3362008-12-14 16:36:46 +00003814 _check_decode(b'\xe8\xa2\x88', "\u8888")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00003815
Antoine Pitrou180a3362008-12-14 16:36:46 +00003816 _check_decode(b'\xe8', "")
3817 _check_decode(b'\xa2', "")
3818 _check_decode(b'\x88', "\u8888")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00003819
Antoine Pitrou180a3362008-12-14 16:36:46 +00003820 _check_decode(b'\xe8', "")
3821 _check_decode(b'\xa2', "")
3822 _check_decode(b'\x88', "\u8888")
3823
3824 _check_decode(b'\xe8', "")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00003825 self.assertRaises(UnicodeDecodeError, decoder.decode, b'', final=True)
3826
Antoine Pitrou180a3362008-12-14 16:36:46 +00003827 decoder.reset()
3828 _check_decode(b'\n', "\n")
3829 _check_decode(b'\r', "")
3830 _check_decode(b'', "\n", final=True)
3831 _check_decode(b'\r', "\n", final=True)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00003832
Antoine Pitrou180a3362008-12-14 16:36:46 +00003833 _check_decode(b'\r', "")
3834 _check_decode(b'a', "\na")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00003835
Antoine Pitrou180a3362008-12-14 16:36:46 +00003836 _check_decode(b'\r\r\n', "\n\n")
3837 _check_decode(b'\r', "")
3838 _check_decode(b'\r', "\n")
3839 _check_decode(b'\na', "\na")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00003840
Antoine Pitrou180a3362008-12-14 16:36:46 +00003841 _check_decode(b'\xe8\xa2\x88\r\n', "\u8888\n")
3842 _check_decode(b'\xe8\xa2\x88', "\u8888")
3843 _check_decode(b'\n', "\n")
3844 _check_decode(b'\xe8\xa2\x88\r', "\u8888")
3845 _check_decode(b'\n', "\n")
Guido van Rossum9b76da62007-04-11 01:09:03 +00003846
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003847 def check_newline_decoding(self, decoder, encoding):
Antoine Pitrou180a3362008-12-14 16:36:46 +00003848 result = []
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003849 if encoding is not None:
3850 encoder = codecs.getincrementalencoder(encoding)()
3851 def _decode_bytewise(s):
3852 # Decode one byte at a time
3853 for b in encoder.encode(s):
3854 result.append(decoder.decode(bytes([b])))
3855 else:
3856 encoder = None
3857 def _decode_bytewise(s):
3858 # Decode one char at a time
3859 for c in s:
3860 result.append(decoder.decode(c))
Ezio Melottib3aedd42010-11-20 19:04:17 +00003861 self.assertEqual(decoder.newlines, None)
Antoine Pitrou180a3362008-12-14 16:36:46 +00003862 _decode_bytewise("abc\n\r")
Ezio Melottib3aedd42010-11-20 19:04:17 +00003863 self.assertEqual(decoder.newlines, '\n')
Antoine Pitrou180a3362008-12-14 16:36:46 +00003864 _decode_bytewise("\nabc")
Ezio Melottib3aedd42010-11-20 19:04:17 +00003865 self.assertEqual(decoder.newlines, ('\n', '\r\n'))
Antoine Pitrou180a3362008-12-14 16:36:46 +00003866 _decode_bytewise("abc\r")
Ezio Melottib3aedd42010-11-20 19:04:17 +00003867 self.assertEqual(decoder.newlines, ('\n', '\r\n'))
Antoine Pitrou180a3362008-12-14 16:36:46 +00003868 _decode_bytewise("abc")
Ezio Melottib3aedd42010-11-20 19:04:17 +00003869 self.assertEqual(decoder.newlines, ('\r', '\n', '\r\n'))
Antoine Pitrou180a3362008-12-14 16:36:46 +00003870 _decode_bytewise("abc\r")
Ezio Melottib3aedd42010-11-20 19:04:17 +00003871 self.assertEqual("".join(result), "abc\n\nabcabc\nabcabc")
Antoine Pitrou180a3362008-12-14 16:36:46 +00003872 decoder.reset()
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003873 input = "abc"
3874 if encoder is not None:
3875 encoder.reset()
3876 input = encoder.encode(input)
Ezio Melottib3aedd42010-11-20 19:04:17 +00003877 self.assertEqual(decoder.decode(input), "abc")
3878 self.assertEqual(decoder.newlines, None)
Antoine Pitrou180a3362008-12-14 16:36:46 +00003879
3880 def test_newline_decoder(self):
3881 encodings = (
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003882 # None meaning the IncrementalNewlineDecoder takes unicode input
3883 # rather than bytes input
3884 None, 'utf-8', 'latin-1',
Antoine Pitrou180a3362008-12-14 16:36:46 +00003885 'utf-16', 'utf-16-le', 'utf-16-be',
3886 'utf-32', 'utf-32-le', 'utf-32-be',
3887 )
3888 for enc in encodings:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003889 decoder = enc and codecs.getincrementaldecoder(enc)()
3890 decoder = self.IncrementalNewlineDecoder(decoder, translate=True)
3891 self.check_newline_decoding(decoder, enc)
Alexandre Vassalotti472f07d2008-01-06 00:34:32 +00003892 decoder = codecs.getincrementaldecoder("utf-8")()
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003893 decoder = self.IncrementalNewlineDecoder(decoder, translate=True)
3894 self.check_newline_decoding_utf8(decoder)
Oren Milman1d1d3e92017-08-20 18:35:36 +03003895 self.assertRaises(TypeError, decoder.setstate, 42)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003896
Antoine Pitrou66913e22009-03-06 23:40:56 +00003897 def test_newline_bytes(self):
3898 # Issue 5433: Excessive optimization in IncrementalNewlineDecoder
3899 def _check(dec):
Ezio Melottib3aedd42010-11-20 19:04:17 +00003900 self.assertEqual(dec.newlines, None)
3901 self.assertEqual(dec.decode("\u0D00"), "\u0D00")
3902 self.assertEqual(dec.newlines, None)
3903 self.assertEqual(dec.decode("\u0A00"), "\u0A00")
3904 self.assertEqual(dec.newlines, None)
Antoine Pitrou66913e22009-03-06 23:40:56 +00003905 dec = self.IncrementalNewlineDecoder(None, translate=False)
3906 _check(dec)
3907 dec = self.IncrementalNewlineDecoder(None, translate=True)
3908 _check(dec)
3909
Xiang Zhangb08746b2018-10-31 19:49:16 +08003910 def test_translate(self):
3911 # issue 35062
3912 for translate in (-2, -1, 1, 2):
3913 decoder = codecs.getincrementaldecoder("utf-8")()
3914 decoder = self.IncrementalNewlineDecoder(decoder, translate)
3915 self.check_newline_decoding_utf8(decoder)
3916 decoder = codecs.getincrementaldecoder("utf-8")()
3917 decoder = self.IncrementalNewlineDecoder(decoder, translate=0)
3918 self.assertEqual(decoder.decode(b"\r\r\n"), "\r\r\n")
3919
class CIncrementalNewlineDecoderTest(IncrementalNewlineDecoderTest):
    # Adds nothing of its own to the inherited tests.
    pass
3922
class PyIncrementalNewlineDecoderTest(IncrementalNewlineDecoderTest):
    # Adds nothing of its own to the inherited tests.
    pass
Antoine Pitrou180a3362008-12-14 16:36:46 +00003925
Alexandre Vassalotti472f07d2008-01-06 00:34:32 +00003926
Guido van Rossum01a27522007-03-07 01:00:12 +00003927# XXX Tests for open()
Guido van Rossum68bbcd22007-02-27 17:19:33 +00003928
Guido van Rossum5abbf752007-08-27 17:39:33 +00003929class MiscIOTest(unittest.TestCase):
3930
def tearDown(self):
    # Remove the scratch file after each test.
    scratch_path = os_helper.TESTFN
    os_helper.unlink(scratch_path)
Barry Warsaw40e82462008-11-20 20:14:50 +00003933
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003934 def test___all__(self):
3935 for name in self.io.__all__:
3936 obj = getattr(self.io, name, None)
Serhiy Storchaka0dcd80a2015-08-02 15:17:49 +03003937 self.assertIsNotNone(obj, name)
Steve Dowerb82e17e2019-05-23 08:45:22 -07003938 if name in ("open", "open_code"):
Guido van Rossum5abbf752007-08-27 17:39:33 +00003939 continue
Benjamin Peterson6a52a9c2009-04-29 22:00:44 +00003940 elif "error" in name.lower() or name == "UnsupportedOperation":
Benjamin Petersonbfb95942009-04-02 01:13:40 +00003941 self.assertTrue(issubclass(obj, Exception), name)
3942 elif not name.startswith("SEEK_"):
3943 self.assertTrue(issubclass(obj, self.IOBase))
Benjamin Peterson65676e42008-11-05 21:42:45 +00003944
def test_attributes(self):
    # Check the name and mode attributes exposed at each layer of the
    # objects returned by open().  Every file is managed by a `with` block
    # so that a failing assertion cannot leave a file object open.
    with self.open(os_helper.TESTFN, "wb", buffering=0) as f:
        self.assertEqual(f.mode, "wb")

    # Opening with mode "U" is expected to emit a DeprecationWarning.
    with warnings_helper.check_warnings(('', DeprecationWarning)):
        f = self.open(os_helper.TESTFN, "U")
    with f:
        self.assertEqual(f.name, os_helper.TESTFN)
        self.assertEqual(f.buffer.name, os_helper.TESTFN)
        self.assertEqual(f.buffer.raw.name, os_helper.TESTFN)
        self.assertEqual(f.mode, "U")
        self.assertEqual(f.buffer.mode, "rb")
        self.assertEqual(f.buffer.raw.mode, "rb")

    with self.open(os_helper.TESTFN, "w+") as f:
        self.assertEqual(f.mode, "w+")
        self.assertEqual(f.buffer.mode, "rb+") # Does it really matter?
        self.assertEqual(f.buffer.raw.mode, "rb+")

        # A second file object on the same descriptor; closefd=False so
        # that closing it leaves the descriptor to f.
        with self.open(f.fileno(), "wb", closefd=False) as g:
            self.assertEqual(g.mode, "wb")
            self.assertEqual(g.raw.mode, "wb")
            self.assertEqual(g.name, f.fileno())
            self.assertEqual(g.raw.name, f.fileno())
3972
Benjamin Peterson74fa9f72019-11-12 14:51:34 -08003973 def test_open_pipe_with_append(self):
3974 # bpo-27805: Ignore ESPIPE from lseek() in open().
3975 r, w = os.pipe()
3976 self.addCleanup(os.close, r)
3977 f = self.open(w, 'a')
3978 self.addCleanup(f.close)
3979 # Check that the file is marked non-seekable. On Windows, however, lseek
3980 # somehow succeeds on pipes.
3981 if sys.platform != 'win32':
3982 self.assertFalse(f.seekable())
3983
def test_io_after_close(self):
    # After close(), every I/O method must raise ValueError, whatever the
    # mode/buffering combination the file was opened with.
    # These methods are only checked on objects that have them.
    optional_methods = ("peek", "read1", "readall", "readinto", "readinto1")
    for text_mode, binary_mode in (("w", "wb"), ("r", "rb"), ("w+", "w+b")):
        open_variants = (
            {"mode": text_mode},
            {"mode": binary_mode},
            {"mode": text_mode, "buffering": 1},
            {"mode": text_mode, "buffering": 2},
            {"mode": binary_mode, "buffering": 0},
        )
        for kwargs in open_variants:
            f = self.open(os_helper.TESTFN, **kwargs)
            f.close()
            # write() takes bytes for binary modes and str for text modes.
            empty = b"" if "b" in kwargs['mode'] else ""
            calls = (
                ("flush", ()),
                ("fileno", ()),
                ("isatty", ()),
                ("__iter__", ()),
                ("peek", (1,)),
                ("read", ()),
                ("read1", (1024,)),
                ("read1", ()),
                ("readall", ()),
                ("readinto", (bytearray(1024),)),
                ("readinto1", (bytearray(1024),)),
                ("readline", ()),
                ("readlines", ()),
                ("readlines", (1,)),
                ("seek", (0,)),
                ("tell", ()),
                ("truncate", ()),
                ("write", (empty,)),
                ("writelines", ([],)),
            )
            for method_name, call_args in calls:
                if method_name in optional_methods and not hasattr(f, method_name):
                    continue
                self.assertRaises(ValueError, getattr(f, method_name), *call_args)
            self.assertRaises(ValueError, next, f)
Antoine Pitrou8043cf82009-01-09 19:54:29 +00004030
def test_blockingioerror(self):
    # Various BlockingIOError issues
    # A BlockingIOError and its str-subclass argument that refer to each
    # other must still be reclaimed by the garbage collector.
    class AttrStr(str):
        pass

    message = AttrStr("")
    error = self.BlockingIOError(1, message)
    message.b = error
    error.c = message
    ref = weakref.ref(message)
    del message, error
    support.gc_collect()
    self.assertIsNone(ref(), ref)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00004043
4044 def test_abcs(self):
4045 # Test the visible base classes are ABCs.
Ezio Melottie9615932010-01-24 19:26:24 +00004046 self.assertIsInstance(self.IOBase, abc.ABCMeta)
4047 self.assertIsInstance(self.RawIOBase, abc.ABCMeta)
4048 self.assertIsInstance(self.BufferedIOBase, abc.ABCMeta)
4049 self.assertIsInstance(self.TextIOBase, abc.ABCMeta)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00004050
def _check_abc_inheritance(self, abcmodule):
    """Check which ABCs of *abcmodule* the objects from self.open() match.

    The test file is opened three times (unbuffered binary, buffered
    binary and text).  Each file object must be an instance of
    abcmodule.IOBase and of exactly one of RawIOBase, BufferedIOBase and
    TextIOBase.
    """
    abc_names = ("IOBase", "RawIOBase", "BufferedIOBase", "TextIOBase")
    # (positional open() args, keyword open() args, ABCs expected to match)
    scenarios = (
        (("wb",), {"buffering": 0}, ("IOBase", "RawIOBase")),
        (("wb",), {}, ("IOBase", "BufferedIOBase")),
        (("w",), {}, ("IOBase", "TextIOBase")),
    )
    for open_args, open_kwargs, expected in scenarios:
        with self.open(os_helper.TESTFN, *open_args, **open_kwargs) as f:
            for abc_name in abc_names:
                abc_class = getattr(abcmodule, abc_name)
                if abc_name in expected:
                    self.assertIsInstance(f, abc_class)
                else:
                    self.assertNotIsInstance(f, abc_class)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00004067
def test_abc_inheritance(self):
    # The implementation under test must inherit from its own ABCs; the
    # test case itself serves as the namespace holding those ABCs.
    self._check_abc_inheritance(abcmodule=self)
4071
def test_abc_inheritance_official(self):
    # The implementation under test must also inherit from the official
    # ABCs published by the baseline "io" module.
    self._check_abc_inheritance(abcmodule=io)
4076
def _check_warn_on_dealloc(self, *args, **kwargs):
    """Check that dropping an unclosed file emits a ResourceWarning.

    *args* and **kwargs* are passed straight to open().  The warning
    message must contain the repr() of the file object.
    """
    # NOTE(review): this calls the builtin open(), not self.open --
    # confirm that is intentional.
    stream = open(*args, **kwargs)
    expected_repr = repr(stream)
    with self.assertWarns(ResourceWarning) as caught:
        # Drop the only reference, then force a collection so the
        # object is finalized inside the assertWarns block.
        del stream
        support.gc_collect()
    self.assertIn(expected_repr, str(caught.warning.args[0]))
4084
def test_warn_on_dealloc(self):
    # Unbuffered binary, buffered binary and text files, in that order.
    open_variants = (
        (("wb",), {"buffering": 0}),
        (("wb",), {}),
        (("w",), {}),
    )
    for open_args, open_kwargs in open_variants:
        self._check_warn_on_dealloc(os_helper.TESTFN,
                                    *open_args, **open_kwargs)
Antoine Pitroue033e062010-10-29 10:38:18 +00004089
def _check_warn_on_dealloc_fd(self, *args, **kwargs):
    """Check the dealloc ResourceWarning for files opened from a raw fd.

    *args* and **kwargs* are the open() arguments that follow the file
    descriptor.  A file that owns its descriptor must warn when dropped
    unclosed; one opened with closefd=False must not.
    """
    pipe_fds = []

    def close_pipe_fds():
        for fd in pipe_fds:
            try:
                os.close(fd)
            except OSError as exc:
                # A descriptor that is already closed is fine; anything
                # else is a real error.
                if exc.errno != errno.EBADF:
                    raise

    self.addCleanup(close_pipe_fds)

    read_fd, write_fd = os.pipe()
    pipe_fds.extend((read_fd, write_fd))
    self._check_warn_on_dealloc(read_fd, *args, **kwargs)

    # When using closefd=False, there's no warning
    read_fd, write_fd = os.pipe()
    pipe_fds.extend((read_fd, write_fd))
    with warnings_helper.check_no_resource_warning(self):
        open(read_fd, *args, closefd=False, **kwargs)
Antoine Pitroue033e062010-10-29 10:38:18 +00004108
def test_warn_on_dealloc_fd(self):
    # Unbuffered binary, buffered binary and text files, in that order.
    open_variants = (
        (("rb",), {"buffering": 0}),
        (("rb",), {}),
        (("r",), {}),
    )
    for open_args, open_kwargs in open_variants:
        self._check_warn_on_dealloc_fd(*open_args, **open_kwargs)
4113
4114
def test_pickling(self):
    # Pickling file objects is forbidden
    # For each base mode: text, buffered binary, unbuffered binary.
    # Write modes come first so the file exists when "r" is opened.
    open_configs = []
    for text_mode in ("w", "r", "w+"):
        binary_mode = text_mode + "b"
        open_configs.append({"mode": text_mode})
        open_configs.append({"mode": binary_mode})
        open_configs.append({"mode": binary_mode, "buffering": 0})

    for kwargs in open_configs:
        for protocol in range(pickle.HIGHEST_PROTOCOL + 1):
            with self.open(os_helper.TESTFN, **kwargs) as f:
                with self.assertRaises(TypeError):
                    pickle.dumps(f, protocol)
4131
def test_nonblock_pipe_write_bigbuf(self):
    # 16 KiB buffer.
    self._test_nonblock_pipe_write(bufsize=16 * 1024)
4134
def test_nonblock_pipe_write_smallbuf(self):
    # 1 KiB buffer.
    self._test_nonblock_pipe_write(bufsize=1024)
4137
@unittest.skipUnless(hasattr(os, 'set_blocking'),
                     'os.set_blocking() required for this test')
def _test_nonblock_pipe_write(self, bufsize):
    """Write through a non-blocking pipe and check nothing is lost.

    Both ends of a pipe are made non-blocking and wrapped in buffered
    files using *bufsize* as the buffer size.  The writer is filled until
    it raises BlockingIOError, the reader is drained, and at the end the
    bytes read must equal the bytes that were reported as written.
    """
    sent_chunks = []
    received_chunks = []
    read_fd, write_fd = os.pipe()
    for fd in (read_fd, write_fd):
        os.set_blocking(fd, False)

    # To exercise all code paths in the C implementation we need
    # to play with buffer sizes.  For instance, if we choose a
    # buffer size less than or equal to _PIPE_BUF (4096 on Linux)
    # then we will never get a partial write of the buffer.
    reader = self.open(read_fd, mode='rb', closefd=True, buffering=bufsize)
    writer = self.open(write_fd, mode='wb', closefd=True, buffering=bufsize)

    with reader, writer:
        for chunk_len in (9999, 73, 7574):
            try:
                counter = 0
                while True:
                    # Cycle through the letters 'a'..'z' so consecutive
                    # chunks differ.
                    chunk = bytes([97 + counter % 26]) * chunk_len
                    sent_chunks.append(chunk)
                    writer.write(chunk)
                    counter += 1

            except self.BlockingIOError as exc:
                self.assertEqual(exc.args[0], errno.EAGAIN)
                self.assertEqual(exc.args[2], exc.characters_written)
                # Only part of the last chunk was accepted.
                sent_chunks[-1] = sent_chunks[-1][:exc.characters_written]
                received_chunks.append(reader.read())
                chunk = b'BLOCKED'
                writer.write(chunk)
                sent_chunks.append(chunk)

        flushed = False
        while not flushed:
            try:
                writer.flush()
            except self.BlockingIOError as exc:
                self.assertEqual(exc.args[0], errno.EAGAIN)
                self.assertEqual(exc.args[2], exc.characters_written)
                self.assertEqual(exc.characters_written, 0)
                received_chunks.append(reader.read())
            else:
                flushed = True

        # Keep reading until read() returns None.
        received_chunks.extend(iter(reader.read, None))

    sent_data = b''.join(sent_chunks)
    received_data = b''.join(received_chunks)
    self.assertEqual(sent_data, received_data)
    self.assertTrue(writer.closed)
    self.assertTrue(reader.closed)
4189
def test_create_fail(self):
    # 'x' mode fails if file is existing
    # Make sure the file exists first.
    with self.open(os_helper.TESTFN, 'w'):
        pass
    with self.assertRaises(FileExistsError):
        self.open(os_helper.TESTFN, 'x')
Charles-François Natalidc3044c2012-01-09 22:40:02 +01004195
def test_create_writes(self):
    # 'x' mode opens for writing
    payload = b"spam"
    with self.open(os_helper.TESTFN, 'xb') as created:
        created.write(payload)
    # Read the file back to confirm the write reached it.
    with self.open(os_helper.TESTFN, 'rb') as reopened:
        self.assertEqual(payload, reopened.read())
4202
def test_open_allargs(self):
    # there used to be a buffer overflow in the parser for rawmode
    with self.assertRaises(ValueError):
        self.open(os_helper.TESTFN, 'rwax+')
Christian Heimes7b648752012-09-10 14:48:43 +02004206
def test_check_encoding_errors(self):
    # bpo-37388: open() and TextIOWrapper must check encoding and errors
    # arguments in dev mode
    #
    # The checks run in a child interpreter started with "-X dev".  The
    # script exits with a distinct code (21-24) for each call that fails
    # to raise LookupError, and with 10 once every check has passed.
    # NOTE(review): 10 is presumably non-zero because
    # assert_python_failure expects a failing exit status -- confirm.
    mod = self.io.__name__
    filename = __file__
    invalid = 'Boom, Shaka Laka, Boom!'
    code = textwrap.dedent(f'''
        import sys
        from {mod} import open, TextIOWrapper

        try:
            open({filename!r}, encoding={invalid!r})
        except LookupError:
            pass
        else:
            sys.exit(21)

        try:
            open({filename!r}, errors={invalid!r})
        except LookupError:
            pass
        else:
            sys.exit(22)

        fp = open({filename!r}, "rb")
        with fp:
            try:
                TextIOWrapper(fp, encoding={invalid!r})
            except LookupError:
                pass
            else:
                sys.exit(23)

            try:
                TextIOWrapper(fp, errors={invalid!r})
            except LookupError:
                pass
            else:
                sys.exit(24)

        sys.exit(10)
    ''')
    proc = assert_python_failure('-X', 'dev', '-c', code)
    # Any exit code other than 10 identifies the check that failed.
    self.assertEqual(proc.rc, 10, proc)
4251
Christian Heimes7b648752012-09-10 14:48:43 +02004252
class CMiscIOTest(MiscIOTest):
    # Runs the shared MiscIOTest cases, plus the extra tests below, with
    # ``io`` bound to the io module.
    io = io

    def test_readinto_buffer_overflow(self):
        # Issue #18025
        # read() hands back far more data than the 2-byte target buffer
        # can hold; readinto() must reject it with ValueError.
        class OversizedReader(self.io.BufferedIOBase):
            def read(self, n=-1):
                return b'x' * 10**6
        reader = OversizedReader()
        target = bytearray(2)
        with self.assertRaises(ValueError):
            reader.readinto(target)

    def check_daemon_threads_shutdown_deadlock(self, stream_name):
        # Issue #23309: deadlocks at shutdown should be avoided when a
        # daemon thread and the main thread both write to a file.
        # *stream_name* names the sys attribute to write to
        # ('stdout' or 'stderr').
        script = """if 1:
            import sys
            import time
            import threading
            from test.support import SuppressCrashReport

            file = sys.{stream_name}

            def run():
                while True:
                    file.write('.')
                    file.flush()

            crash = SuppressCrashReport()
            crash.__enter__()
            # don't call __exit__(): the crash occurs at Python shutdown

            thread = threading.Thread(target=run)
            thread.daemon = True
            thread.start()

            time.sleep(0.5)
            file.write('!')
            file.flush()
            """.format(stream_name=stream_name)
        result, _ = run_python_until_end("-c", script)
        stderr_text = result.err.decode()
        if result.rc == 0:
            # Clean exit: stderr may hold nothing but the '.' and '!'
            # characters written by the script itself.
            self.assertFalse(stderr_text.strip('.!'))
        else:
            # Failure: should be a fatal error
            expected = (r"Fatal Python error: _enter_buffered_busy: "
                        r"could not acquire lock "
                        r"for <(_io\.)?BufferedWriter name='<{stream_name}>'> "
                        r"at interpreter shutdown, possibly due to "
                        r"daemon threads".format(stream_name=stream_name))
            self.assertRegex(stderr_text, expected)

    def test_daemon_threads_shutdown_stdout_deadlock(self):
        self.check_daemon_threads_shutdown_deadlock(stream_name='stdout')

    def test_daemon_threads_shutdown_stderr_deadlock(self):
        self.check_daemon_threads_shutdown_deadlock(stream_name='stderr')
4311
4312
# Runs the shared MiscIOTest cases with ``io`` bound to the pyio module.
class PyMiscIOTest(MiscIOTest):
    io = pyio
Barry Warsaw40e82462008-11-20 20:14:50 +00004315
Antoine Pitroub46b9d52010-08-21 19:09:32 +00004316
@unittest.skipIf(os.name == 'nt', 'POSIX signals required for this test.')
class SignalsTest(unittest.TestCase):
    # Tests for how file objects behave when a SIGALRM handler runs in
    # the middle of a read or write.  The tests read the I/O
    # implementation from ``self.io``, which this class does not define.

    def setUp(self):
        # Install the raising handler and remember the previous one.
        self.oldalrm = signal.signal(signal.SIGALRM, self.alarm_interrupt)

    def tearDown(self):
        # Also undoes any handler a check_* helper installed.
        signal.signal(signal.SIGALRM, self.oldalrm)

    def alarm_interrupt(self, sig, frame):
        # Deliberately raises ZeroDivisionError from the signal handler.
        1/0

    def check_interrupted_write(self, item, bytes, **fdopen_kwargs):
        """Check that a partial write, when it gets interrupted, properly
        invokes the signal handler, and bubbles up the exception raised
        in the latter.

        *item* is the unit of data repeated to build the payload, *bytes*
        is its expected byte representation, and *fdopen_kwargs* are
        passed to self.io.open() (closefd is forced to False)."""
        read_results = []
        def _read():
            s = os.read(r, 1)
            read_results.append(s)

        t = threading.Thread(target=_read)
        t.daemon = True
        r, w = os.pipe()
        fdopen_kwargs["closefd"] = False
        large_data = item * (support.PIPE_MAX_SIZE // len(item) + 1)
        # Bound before the try block so the cleanup below cannot hit an
        # unbound name (and mask the real error) if open() fails.
        wio = None
        try:
            wio = self.io.open(w, **fdopen_kwargs)
            if hasattr(signal, 'pthread_sigmask'):
                # create the thread with SIGALRM signal blocked
                signal.pthread_sigmask(signal.SIG_BLOCK, [signal.SIGALRM])
                try:
                    t.start()
                finally:
                    # Never leave SIGALRM blocked, even if the thread
                    # could not be started.
                    signal.pthread_sigmask(signal.SIG_UNBLOCK,
                                           [signal.SIGALRM])
            else:
                t.start()

            # Fill the pipe enough that the write will be blocking.
            # It will be interrupted by the timer armed below.  Since the
            # other thread has read one byte, the low-level write will
            # return with a successful (partial) result rather than an EINTR.
            # The buffered IO layer must check for pending signal
            # handlers, which in this case will invoke alarm_interrupt().
            signal.alarm(1)
            try:
                self.assertRaises(ZeroDivisionError, wio.write, large_data)
            finally:
                signal.alarm(0)
                t.join()
            # We got one byte, get another one and check that it isn't a
            # repeat of the first one.
            read_results.append(os.read(r, 1))
            self.assertEqual(read_results, [bytes[0:1], bytes[1:2]])
        finally:
            os.close(w)
            os.close(r)
            # This is deliberate. If we didn't close the file descriptor
            # before closing wio, wio would try to flush its internal
            # buffer, and block again.
            if wio is not None:
                try:
                    wio.close()
                except OSError as e:
                    if e.errno != errno.EBADF:
                        raise

    def test_interrupted_write_unbuffered(self):
        self.check_interrupted_write(b"xy", b"xy", mode="wb", buffering=0)

    def test_interrupted_write_buffered(self):
        self.check_interrupted_write(b"xy", b"xy", mode="wb")

    def test_interrupted_write_text(self):
        self.check_interrupted_write("xy", b"xy", mode="w", encoding="ascii")

    @support.no_tracing
    def check_reentrant_write(self, data, **fdopen_kwargs):
        """Check a write() issued from a signal handler during a write().

        *data* is written in a loop until SIGALRM fires; the handler then
        writes *data* to the same file object.  The outcome must be either
        a RuntimeError whose message starts with "reentrant call" or the
        ZeroDivisionError raised by the handler.  *fdopen_kwargs* are
        passed to self.io.open()."""
        def on_alarm(*args):
            # Will be called reentrantly from the same thread
            wio.write(data)
            1/0
        signal.signal(signal.SIGALRM, on_alarm)
        r, w = os.pipe()
        wio = self.io.open(w, **fdopen_kwargs)
        try:
            signal.alarm(1)
            # Either the reentrant call to wio.write() fails with RuntimeError,
            # or the signal handler raises ZeroDivisionError.
            with self.assertRaises((ZeroDivisionError, RuntimeError)) as cm:
                while 1:
                    for i in range(100):
                        wio.write(data)
                        wio.flush()
                    # Make sure the buffer doesn't fill up and block further writes
                    os.read(r, len(data) * 100)
            exc = cm.exception
            if isinstance(exc, RuntimeError):
                self.assertTrue(str(exc).startswith("reentrant call"), str(exc))
        finally:
            signal.alarm(0)
            try:
                wio.close()
            finally:
                # Close the read end even if closing wio raised.
                os.close(r)

    def test_reentrant_write_buffered(self):
        self.check_reentrant_write(b"xy", mode="wb")

    def test_reentrant_write_text(self):
        self.check_reentrant_write("xy", mode="w", encoding="ascii")

    def check_interrupted_read_retry(self, decode, **fdopen_kwargs):
        """Check that a buffered read, when it gets interrupted (either
        returning a partial result or EINTR), properly invokes the signal
        handler and retries if the latter returned successfully.

        *decode* converts the value returned by read() to str, and
        *fdopen_kwargs* are passed to self.io.open() (closefd is forced
        to False)."""
        r, w = os.pipe()
        fdopen_kwargs["closefd"] = False
        def alarm_handler(sig, frame):
            os.write(w, b"bar")
        signal.signal(signal.SIGALRM, alarm_handler)
        # Bound before the try block so the cleanup below cannot hit an
        # unbound name (and mask the real error) if open() fails.
        rio = None
        try:
            rio = self.io.open(r, **fdopen_kwargs)
            os.write(w, b"foo")
            signal.alarm(1)
            # Expected behaviour:
            # - first raw read() returns partial b"foo"
            # - second raw read() returns EINTR
            # - third raw read() returns b"bar"
            self.assertEqual(decode(rio.read(6)), "foobar")
        finally:
            signal.alarm(0)
            if rio is not None:
                rio.close()
            os.close(w)
            os.close(r)

    def test_interrupted_read_retry_buffered(self):
        self.check_interrupted_read_retry(lambda x: x.decode('latin1'),
                                          mode="rb")

    def test_interrupted_read_retry_text(self):
        self.check_interrupted_read_retry(lambda x: x,
                                          mode="r")

    def check_interrupted_write_retry(self, item, **fdopen_kwargs):
        """Check that a buffered write, when it gets interrupted (either
        returning a partial result or EINTR), properly invokes the signal
        handler and retries if the latter returned successfully.

        *item* is the unit repeated support.PIPE_MAX_SIZE times to build
        the payload, and *fdopen_kwargs* are passed to self.io.open()
        (closefd is forced to False)."""
        select = import_helper.import_module("select")

        # A quantity that exceeds the buffer size of an anonymous pipe's
        # write end.
        N = support.PIPE_MAX_SIZE
        r, w = os.pipe()
        fdopen_kwargs["closefd"] = False

        # We need a separate thread to read from the pipe and allow the
        # write() to finish.  This thread is started after the SIGALRM is
        # received (forcing a first EINTR in write()).
        read_results = []
        write_finished = False
        error = None
        def _read():
            try:
                while not write_finished:
                    while r in select.select([r], [], [], 1.0)[0]:
                        s = os.read(r, 1024)
                        read_results.append(s)
            except BaseException as exc:
                # Record the failure so the main thread can report it.
                nonlocal error
                error = exc
        t = threading.Thread(target=_read)
        t.daemon = True
        def alarm1(sig, frame):
            # First alarm: install the second handler and re-arm the timer.
            signal.signal(signal.SIGALRM, alarm2)
            signal.alarm(1)
        def alarm2(sig, frame):
            # Second alarm: start draining the pipe.
            t.start()

        large_data = item * N
        signal.signal(signal.SIGALRM, alarm1)
        # Bound before the try block so the cleanup below cannot hit an
        # unbound name (and mask the real error) if open() fails.
        wio = None
        try:
            wio = self.io.open(w, **fdopen_kwargs)
            signal.alarm(1)
            # Expected behaviour:
            # - first raw write() is partial (because of the limited pipe buffer
            #   and the first alarm)
            # - second raw write() returns EINTR (because of the second alarm)
            # - subsequent write()s are successful (either partial or complete)
            written = wio.write(large_data)
            self.assertEqual(N, written)

            wio.flush()
            write_finished = True
            t.join()

            self.assertIsNone(error)
            self.assertEqual(N, sum(len(x) for x in read_results))
        finally:
            signal.alarm(0)
            write_finished = True
            os.close(w)
            os.close(r)
            # This is deliberate. If we didn't close the file descriptor
            # before closing wio, wio would try to flush its internal
            # buffer, and could block (in case of failure).
            if wio is not None:
                try:
                    wio.close()
                except OSError as e:
                    if e.errno != errno.EBADF:
                        raise

    def test_interrupted_write_retry_buffered(self):
        self.check_interrupted_write_retry(b"x", mode="wb")

    def test_interrupted_write_retry_text(self):
        self.check_interrupted_write_retry("x", mode="w", encoding="latin1")
4529
Antoine Pitrouf3b68b32010-12-03 18:41:39 +00004530
# Runs the SignalsTest cases with ``io`` bound to the io module.
class CSignalsTest(SignalsTest):
    io = io
4533
# Runs the SignalsTest cases with ``io`` bound to the pyio module.
class PySignalsTest(SignalsTest):
    io = pyio

    # Handling reentrancy issues would slow down _pyio even more, so the
    # tests are disabled.  Rebinding the inherited names to None hides the
    # test methods defined on SignalsTest.
    test_reentrant_write_buffered = None
    test_reentrant_write_text = None
4541
Antoine Pitroub46b9d52010-08-21 19:09:32 +00004542
def load_tests(*args):
    """Build and return the test suite for this module.

    Before the suite is built, every test class whose name starts with
    "C" or "Py" gets the public names of the matching io implementation
    (io or pyio) and the matching mock classes set as class attributes.
    The positional arguments are accepted and ignored.
    """
    tests = (CIOTest, PyIOTest, APIMismatchTest,
             CBufferedReaderTest, PyBufferedReaderTest,
             CBufferedWriterTest, PyBufferedWriterTest,
             CBufferedRWPairTest, PyBufferedRWPairTest,
             CBufferedRandomTest, PyBufferedRandomTest,
             StatefulIncrementalDecoderTest,
             CIncrementalNewlineDecoderTest, PyIncrementalNewlineDecoderTest,
             CTextIOWrapperTest, PyTextIOWrapperTest,
             CMiscIOTest, PyMiscIOTest,
             CSignalsTest, PySignalsTest,
             )

    # Put the namespaces of the IO module we are testing and some useful mock
    # classes in the __dict__ of each test.
    mocks = (MockRawIO, MisbehavedRawIO, MockFileIO, CloseFailureIO,
             MockNonBlockWriterIO, MockUnseekableIO, MockRawIOWithoutRead,
             SlowFlushRawIO)
    all_members = io.__all__ + ["IncrementalNewlineDecoder"]
    c_io_ns = {name : getattr(io, name) for name in all_members}
    py_io_ns = {name : getattr(pyio, name) for name in all_members}
    globs = globals()
    # Each mock has "C"- and "Py"-prefixed variants at module level; expose
    # the right variant under the unprefixed name.
    c_io_ns.update((x.__name__, globs["C" + x.__name__]) for x in mocks)
    py_io_ns.update((x.__name__, globs["Py" + x.__name__]) for x in mocks)
    # Avoid turning open into a bound method.
    py_io_ns["open"] = pyio.OpenWrapper
    for test in tests:
        if test.__name__.startswith("C"):
            for name, obj in c_io_ns.items():
                setattr(test, name, obj)
        elif test.__name__.startswith("Py"):
            for name, obj in py_io_ns.items():
                setattr(test, name, obj)

    # unittest.makeSuite() is deprecated since Python 3.11 and removed in
    # 3.13; the default loader's loadTestsFromTestCase() builds the same
    # suite.
    load_case = unittest.defaultTestLoader.loadTestsFromTestCase
    suite = unittest.TestSuite([load_case(test) for test in tests])
    return suite
Guido van Rossum28524c72007-02-27 05:47:44 +00004579
# Run the tests when this file is executed as a script.
if __name__ == "__main__":
    unittest.main()