blob: 6a9ce39f08eb581a3c1df3cfde58d9057e7af4aa [file] [log] [blame]
"""Unit tests for the io module."""

# Tests of io are scattered over the test suite:
# * test_bufio - tests file buffering
# * test_memoryio - tests BytesIO and StringIO
# * test_fileio - tests FileIO
# * test_file - tests the file interface
# * test_io - tests everything else in the io module
# * test_univnewlines - tests universal newline support
# * test_largefile - tests operations on a file greater than 2**32 bytes
#     (only enabled with -ulargefile)

################################################################################
# ATTENTION TEST WRITERS!!!
################################################################################
# When writing tests for io, it's important to test both the C and Python
# implementations. This is usually done by writing a base test that refers to
# the type it is testing as an attribute. Then it provides custom subclasses to
# test both implementations. This file has lots of examples.
################################################################################
Guido van Rossum68bbcd22007-02-27 17:19:33 +000021
Victor Stinnerf86a5e82012-06-05 13:43:22 +020022import abc
23import array
24import errno
25import locale
Guido van Rossum8358db22007-08-18 21:39:55 +000026import os
Victor Stinnerf86a5e82012-06-05 13:43:22 +020027import pickle
28import random
29import signal
Guido van Rossum34d69e52007-04-10 20:08:41 +000030import sys
Gregory P. Smithe5796c42018-12-30 20:17:57 -080031import sysconfig
Victor Stinner22eb6892019-06-26 00:51:05 +020032import textwrap
Antoine Pitroua6a4dc82017-09-07 18:56:24 +020033import threading
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +000034import time
Guido van Rossum28524c72007-02-27 05:47:44 +000035import unittest
Antoine Pitroue033e062010-10-29 10:38:18 +000036import warnings
Victor Stinnerf86a5e82012-06-05 13:43:22 +020037import weakref
Antoine Pitrou131a4892012-10-16 22:57:11 +020038from collections import deque, UserList
Victor Stinnerf86a5e82012-06-05 13:43:22 +020039from itertools import cycle, count
Benjamin Petersonee8712c2008-05-20 21:35:26 +000040from test import support
Victor Stinner22eb6892019-06-26 00:51:05 +020041from test.support.script_helper import (
42 assert_python_ok, assert_python_failure, run_python_until_end)
Hai Shi883bc632020-07-06 17:12:49 +080043from test.support import import_helper
44from test.support import os_helper
Hai Shie80697d2020-05-28 06:10:27 +080045from test.support import threading_helper
Hai Shi883bc632020-07-06 17:12:49 +080046from test.support import warnings_helper
47from test.support.os_helper import FakePath
Guido van Rossum76c5d4d2007-04-06 19:10:29 +000048
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +000049import codecs
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000050import io # C implementation of io
51import _pyio as pyio # Python implementation of io
Guido van Rossuma9e20242007-03-08 00:43:48 +000052
try:
    import ctypes
except ImportError:
    def byteslike(*pos, **kw):
        """Fallback used when ctypes is unavailable: wrap the bytes in an
        array of signed chars."""
        return array.array("b", bytes(*pos, **kw))
else:
    class EmptyStruct(ctypes.Structure):
        pass

    def byteslike(*pos, **kw):
        """Create a bytes-like object having no string or sequence methods"""
        payload = bytes(*pos, **kw)
        holder = EmptyStruct()
        # Grow the (initially empty) structure so it can hold the payload,
        # then copy the payload in through the buffer protocol.
        ctypes.resize(holder, len(payload))
        memoryview(holder).cast("B")[:] = payload
        return holder
68
_cflags = sysconfig.get_config_var('CFLAGS') or ''
_config_args = sysconfig.get_config_var('CONFIG_ARGS') or ''
# True when either the compiler flags or the configure arguments recorded by
# sysconfig mention the memory sanitizer.
MEMORY_SANITIZER = any((
    '-fsanitize=memory' in _cflags,
    '--with-memory-sanitizer' in _config_args,
))
75
# Does io.IOBase finalizer log the exception if the close() method fails?
# The exception is ignored silently by default in release build, so the flag
# is only set when sys.gettotalrefcount exists or dev mode is enabled.
_has_total_refcount = hasattr(sys, "gettotalrefcount")
IOBASE_EMITS_UNRAISABLE = (_has_total_refcount or sys.flags.dev_mode)
Victor Stinner44235042019-04-12 17:06:47 +020079
80
def _default_chunk_size():
    """Get the default TextIOWrapper chunk size"""
    # Any text file will do as a probe; this module's own source is used.
    probe = open(__file__, "r", encoding="latin-1")
    try:
        return probe._CHUNK_SIZE
    finally:
        probe.close()
85
86
class MockRawIOWithoutRead:
    """A RawIO implementation without read(), so as to exercise the default
    RawIO.read() which calls readinto().

    The data to read is given up front as a sequence of chunks; a None chunk
    makes one readinto() call return None.  Everything written is recorded
    in ``_write_stack``.
    """

    def __init__(self, read_stack=()):
        self._read_stack = list(read_stack)
        self._write_stack = []
        # Total number of read calls, and how many of them arrived after the
        # read stack had already been emptied.
        self._reads = 0
        self._extraneous_reads = 0

    def readable(self):
        return True

    def writable(self):
        return True

    def seekable(self):
        return True

    def fileno(self):
        return 42

    def seek(self, pos, whence):
        return 0   # wrong but we gotta return something

    def tell(self):
        return 0   # same comment as above

    def truncate(self, pos=None):
        return pos

    def write(self, b):
        # Keep an immutable copy so later changes to *b* are not observed.
        self._write_stack.append(bytes(b))
        return len(b)

    def readinto(self, buf):
        """Copy the next pending chunk (or as much as fits) into *buf*."""
        self._reads += 1
        capacity = len(buf)
        if not self._read_stack:
            self._extraneous_reads += 1
            return 0
        chunk = self._read_stack[0]
        if chunk is None:
            self._read_stack.pop(0)
            return None
        size = len(chunk)
        if size > capacity:
            # Only part of the chunk fits: keep the remainder for next time.
            buf[:] = chunk[:capacity]
            self._read_stack[0] = chunk[capacity:]
            return capacity
        self._read_stack.pop(0)
        buf[:size] = chunk
        return size
142
class CMockRawIOWithoutRead(MockRawIOWithoutRead, io.RawIOBase):
    """MockRawIOWithoutRead on top of the C RawIOBase."""


class PyMockRawIOWithoutRead(MockRawIOWithoutRead, pyio.RawIOBase):
    """MockRawIOWithoutRead on top of the Python RawIOBase."""
148
149
class MockRawIO(MockRawIOWithoutRead):
    """Mock raw stream that additionally provides read().

    read() hands back the queued chunks one per call, whatever size was
    requested.
    """

    def read(self, n=None):
        """Return the next queued chunk, or b"" once the queue is empty.

        *n* is accepted for interface compatibility but is not used.
        """
        self._reads += 1
        try:
            return self._read_stack.pop(0)
        except IndexError:
            # Only an exhausted queue counts as an extraneous read; anything
            # else (e.g. KeyboardInterrupt) must reach the caller.
            self._extraneous_reads += 1
            return b""
159
class CMockRawIO(MockRawIO, io.RawIOBase):
    """MockRawIO on top of the C RawIOBase."""


class PyMockRawIO(MockRawIO, pyio.RawIOBase):
    """MockRawIO on top of the Python RawIOBase."""
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000165
Guido van Rossuma9e20242007-03-08 00:43:48 +0000166
class MisbehavedRawIO(MockRawIO):
    """Raw stream whose methods report deliberately bogus results."""

    def write(self, b):
        written = super().write(b)
        return written * 2

    def read(self, n=None):
        chunk = super().read(n)
        return chunk * 2

    def seek(self, pos, whence):
        return -123

    def tell(self):
        return -456

    def readinto(self, buf):
        # Perform the real read, then report far more than was requested.
        super().readinto(buf)
        return 5 * len(buf)
183
class CMisbehavedRawIO(MisbehavedRawIO, io.RawIOBase):
    """MisbehavedRawIO on top of the C RawIOBase."""


class PyMisbehavedRawIO(MisbehavedRawIO, pyio.RawIOBase):
    """MisbehavedRawIO on top of the Python RawIOBase."""
189
190
class SlowFlushRawIO(MockRawIO):
    """Mock raw stream whose flush() sets an event and then stalls."""

    def __init__(self):
        super().__init__()
        # Set as soon as flush() has been entered.
        self.in_flush = threading.Event()

    def flush(self):
        entered = self.in_flush
        entered.set()
        time.sleep(0.25)
199
class CSlowFlushRawIO(SlowFlushRawIO, io.RawIOBase):
    """SlowFlushRawIO on top of the C RawIOBase."""


class PySlowFlushRawIO(SlowFlushRawIO, pyio.RawIOBase):
    """SlowFlushRawIO on top of the Python RawIOBase."""
205
206
class CloseFailureIO(MockRawIO):
    """Mock raw stream whose first close() call raises OSError."""

    closed = 0

    def close(self):
        if self.closed:
            # Already marked closed: later calls are silent no-ops.
            return
        self.closed = 1
        raise OSError
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000214
class CCloseFailureIO(CloseFailureIO, io.RawIOBase):
    """CloseFailureIO on top of the C RawIOBase."""


class PyCloseFailureIO(CloseFailureIO, pyio.RawIOBase):
    """CloseFailureIO on top of the Python RawIOBase."""
220
221
class MockFileIO:
    """Mixin that records the result size of every read()/readinto() call.

    It must be combined with a concrete stream class that supplies the real
    __init__(), read() and readinto().
    """

    def __init__(self, data):
        self.read_history = []
        super().__init__(data)

    def read(self, n=None):
        chunk = super().read(n)
        if chunk is None:
            self.read_history.append(None)
        else:
            self.read_history.append(len(chunk))
        return chunk

    def readinto(self, b):
        count = super().readinto(b)
        self.read_history.append(count)
        return count
Guido van Rossum78892e42007-04-06 17:31:18 +0000237
class CMockFileIO(MockFileIO, io.BytesIO):
    """MockFileIO on top of the C BytesIO."""


class PyMockFileIO(MockFileIO, pyio.BytesIO):
    """MockFileIO on top of the Python BytesIO."""
243
244
class MockUnseekableIO:
    """Mixin that makes a stream report itself as unseekable.

    Concrete subclasses must provide an ``UnsupportedOperation`` attribute
    naming the exception class to raise.
    """

    def seekable(self):
        return False

    def seek(self, *args):
        error = self.UnsupportedOperation("not seekable")
        raise error

    def tell(self, *args):
        error = self.UnsupportedOperation("not seekable")
        raise error

    def truncate(self, *args):
        error = self.UnsupportedOperation("not seekable")
        raise error
257
class CMockUnseekableIO(MockUnseekableIO, io.BytesIO):
    # Exception class raised by the mixin's seek()/tell()/truncate().
    UnsupportedOperation = io.UnsupportedOperation


class PyMockUnseekableIO(MockUnseekableIO, pyio.BytesIO):
    # Exception class raised by the mixin's seek()/tell()/truncate().
    UnsupportedOperation = pyio.UnsupportedOperation
263
264
class MockNonBlockWriterIO:
    """Mock raw writer that can simulate a write that would block.

    After block_on(char), a write() whose data contains *char* is cut short
    just before it; a write() whose data starts with *char* returns None once
    and clears the blocker.
    """

    def __init__(self):
        self._write_stack = []
        self._blocker_char = None

    def readable(self):
        return True

    def writable(self):
        return True

    def seekable(self):
        return True

    def seek(self, pos, whence=0):
        # naive implementation, enough for tests
        return 0

    def block_on(self, char):
        """Block when a given char is encountered."""
        self._blocker_char = char

    def pop_written(self):
        """Return everything written so far and forget it."""
        written = b"".join(self._write_stack)
        self._write_stack.clear()
        return written

    def write(self, b):
        payload = bytes(b)
        blocker = self._blocker_char
        if blocker:
            try:
                cut = payload.index(blocker)
            except ValueError:
                # Blocker not present: fall through to a full write.
                cut = -1
            if cut > 0:
                # write data up to the first blocker
                self._write_stack.append(payload[:cut])
                return cut
            if cut == 0:
                # cancel blocker and indicate would block
                self._blocker_char = None
                return None
        self._write_stack.append(payload)
        return len(payload)
312
class CMockNonBlockWriterIO(MockNonBlockWriterIO, io.RawIOBase):
    # BlockingIOError class matching the C implementation.
    BlockingIOError = io.BlockingIOError


class PyMockNonBlockWriterIO(MockNonBlockWriterIO, pyio.RawIOBase):
    # BlockingIOError class matching the Python implementation.
    BlockingIOError = pyio.BlockingIOError
318
Guido van Rossuma9e20242007-03-08 00:43:48 +0000319
Guido van Rossum28524c72007-02-27 05:47:44 +0000320class IOTest(unittest.TestCase):
321
Neal Norwitze7789b12008-03-24 06:18:09 +0000322 def setUp(self):
Hai Shi883bc632020-07-06 17:12:49 +0800323 os_helper.unlink(os_helper.TESTFN)
Neal Norwitze7789b12008-03-24 06:18:09 +0000324
Guido van Rossum4d0f5a42007-03-07 22:59:39 +0000325 def tearDown(self):
Hai Shi883bc632020-07-06 17:12:49 +0800326 os_helper.unlink(os_helper.TESTFN)
Guido van Rossum4d0f5a42007-03-07 22:59:39 +0000327
Guido van Rossum28524c72007-02-27 05:47:44 +0000328 def write_ops(self, f):
Guido van Rossum87429772007-04-10 21:06:59 +0000329 self.assertEqual(f.write(b"blah."), 5)
Antoine Pitrou905a2ff2010-01-31 22:47:27 +0000330 f.truncate(0)
331 self.assertEqual(f.tell(), 5)
332 f.seek(0)
333
334 self.assertEqual(f.write(b"blah."), 5)
Guido van Rossum87429772007-04-10 21:06:59 +0000335 self.assertEqual(f.seek(0), 0)
336 self.assertEqual(f.write(b"Hello."), 6)
Guido van Rossum28524c72007-02-27 05:47:44 +0000337 self.assertEqual(f.tell(), 6)
Guido van Rossum87429772007-04-10 21:06:59 +0000338 self.assertEqual(f.seek(-1, 1), 5)
Guido van Rossum28524c72007-02-27 05:47:44 +0000339 self.assertEqual(f.tell(), 5)
Martin Panter6bb91f32016-05-28 00:41:57 +0000340 buffer = bytearray(b" world\n\n\n")
341 self.assertEqual(f.write(buffer), 9)
342 buffer[:] = b"*" * 9 # Overwrite our copy of the data
Guido van Rossum87429772007-04-10 21:06:59 +0000343 self.assertEqual(f.seek(0), 0)
Guido van Rossum2b08b382007-05-08 20:18:39 +0000344 self.assertEqual(f.write(b"h"), 1)
Guido van Rossum87429772007-04-10 21:06:59 +0000345 self.assertEqual(f.seek(-1, 2), 13)
346 self.assertEqual(f.tell(), 13)
Antoine Pitrou905a2ff2010-01-31 22:47:27 +0000347
Guido van Rossum87429772007-04-10 21:06:59 +0000348 self.assertEqual(f.truncate(12), 12)
Antoine Pitrou905a2ff2010-01-31 22:47:27 +0000349 self.assertEqual(f.tell(), 13)
Christian Heimes8e42a0a2007-11-08 18:04:45 +0000350 self.assertRaises(TypeError, f.seek, 0.0)
Guido van Rossum28524c72007-02-27 05:47:44 +0000351
Guido van Rossum9b76da62007-04-11 01:09:03 +0000352 def read_ops(self, f, buffered=False):
353 data = f.read(5)
354 self.assertEqual(data, b"hello")
Martin Panter6bb91f32016-05-28 00:41:57 +0000355 data = byteslike(data)
Guido van Rossum9b76da62007-04-11 01:09:03 +0000356 self.assertEqual(f.readinto(data), 5)
Martin Panter6bb91f32016-05-28 00:41:57 +0000357 self.assertEqual(bytes(data), b" worl")
358 data = bytearray(5)
Guido van Rossum9b76da62007-04-11 01:09:03 +0000359 self.assertEqual(f.readinto(data), 2)
360 self.assertEqual(len(data), 5)
361 self.assertEqual(data[:2], b"d\n")
362 self.assertEqual(f.seek(0), 0)
363 self.assertEqual(f.read(20), b"hello world\n")
364 self.assertEqual(f.read(1), b"")
Martin Panter6bb91f32016-05-28 00:41:57 +0000365 self.assertEqual(f.readinto(byteslike(b"x")), 0)
Guido van Rossum9b76da62007-04-11 01:09:03 +0000366 self.assertEqual(f.seek(-6, 2), 6)
367 self.assertEqual(f.read(5), b"world")
368 self.assertEqual(f.read(0), b"")
Martin Panter6bb91f32016-05-28 00:41:57 +0000369 self.assertEqual(f.readinto(byteslike()), 0)
Guido van Rossum9b76da62007-04-11 01:09:03 +0000370 self.assertEqual(f.seek(-6, 1), 5)
371 self.assertEqual(f.read(5), b" worl")
372 self.assertEqual(f.tell(), 10)
Christian Heimes8e42a0a2007-11-08 18:04:45 +0000373 self.assertRaises(TypeError, f.seek, 0.0)
Guido van Rossum9b76da62007-04-11 01:09:03 +0000374 if buffered:
375 f.seek(0)
376 self.assertEqual(f.read(), b"hello world\n")
377 f.seek(6)
378 self.assertEqual(f.read(), b"world\n")
379 self.assertEqual(f.read(), b"")
Martin Panter6bb91f32016-05-28 00:41:57 +0000380 f.seek(0)
381 data = byteslike(5)
382 self.assertEqual(f.readinto1(data), 5)
383 self.assertEqual(bytes(data), b"hello")
Guido van Rossum9b76da62007-04-11 01:09:03 +0000384
Guido van Rossum34d69e52007-04-10 20:08:41 +0000385 LARGE = 2**31
386
Guido van Rossum53807da2007-04-10 19:01:47 +0000387 def large_file_ops(self, f):
388 assert f.readable()
389 assert f.writable()
Xavier de Gaye877f0362016-11-17 09:20:28 +0100390 try:
391 self.assertEqual(f.seek(self.LARGE), self.LARGE)
392 except (OverflowError, ValueError):
393 self.skipTest("no largefile support")
Guido van Rossum34d69e52007-04-10 20:08:41 +0000394 self.assertEqual(f.tell(), self.LARGE)
Guido van Rossum53807da2007-04-10 19:01:47 +0000395 self.assertEqual(f.write(b"xxx"), 3)
Guido van Rossum34d69e52007-04-10 20:08:41 +0000396 self.assertEqual(f.tell(), self.LARGE + 3)
397 self.assertEqual(f.seek(-1, 1), self.LARGE + 2)
Guido van Rossum87429772007-04-10 21:06:59 +0000398 self.assertEqual(f.truncate(), self.LARGE + 2)
Guido van Rossum34d69e52007-04-10 20:08:41 +0000399 self.assertEqual(f.tell(), self.LARGE + 2)
400 self.assertEqual(f.seek(0, 2), self.LARGE + 2)
Guido van Rossum87429772007-04-10 21:06:59 +0000401 self.assertEqual(f.truncate(self.LARGE + 1), self.LARGE + 1)
Antoine Pitrou905a2ff2010-01-31 22:47:27 +0000402 self.assertEqual(f.tell(), self.LARGE + 2)
Guido van Rossum34d69e52007-04-10 20:08:41 +0000403 self.assertEqual(f.seek(0, 2), self.LARGE + 1)
404 self.assertEqual(f.seek(-1, 2), self.LARGE)
Guido van Rossum53807da2007-04-10 19:01:47 +0000405 self.assertEqual(f.read(2), b"x")
406
Benjamin Peterson81971ea2009-05-14 22:01:31 +0000407 def test_invalid_operations(self):
408 # Try writing on a file opened in read mode and vice-versa.
Antoine Pitrou0d739d72010-09-05 23:01:12 +0000409 exc = self.UnsupportedOperation
Benjamin Peterson81971ea2009-05-14 22:01:31 +0000410 for mode in ("w", "wb"):
Hai Shi883bc632020-07-06 17:12:49 +0800411 with self.open(os_helper.TESTFN, mode) as fp:
Antoine Pitrou0d739d72010-09-05 23:01:12 +0000412 self.assertRaises(exc, fp.read)
413 self.assertRaises(exc, fp.readline)
Hai Shi883bc632020-07-06 17:12:49 +0800414 with self.open(os_helper.TESTFN, "wb", buffering=0) as fp:
Antoine Pitrou0d739d72010-09-05 23:01:12 +0000415 self.assertRaises(exc, fp.read)
416 self.assertRaises(exc, fp.readline)
Hai Shi883bc632020-07-06 17:12:49 +0800417 with self.open(os_helper.TESTFN, "rb", buffering=0) as fp:
Antoine Pitrou0d739d72010-09-05 23:01:12 +0000418 self.assertRaises(exc, fp.write, b"blah")
419 self.assertRaises(exc, fp.writelines, [b"blah\n"])
Hai Shi883bc632020-07-06 17:12:49 +0800420 with self.open(os_helper.TESTFN, "rb") as fp:
Antoine Pitrou0d739d72010-09-05 23:01:12 +0000421 self.assertRaises(exc, fp.write, b"blah")
422 self.assertRaises(exc, fp.writelines, [b"blah\n"])
Hai Shi883bc632020-07-06 17:12:49 +0800423 with self.open(os_helper.TESTFN, "r") as fp:
Antoine Pitrou0d739d72010-09-05 23:01:12 +0000424 self.assertRaises(exc, fp.write, "blah")
425 self.assertRaises(exc, fp.writelines, ["blah\n"])
426 # Non-zero seeking from current or end pos
427 self.assertRaises(exc, fp.seek, 1, self.SEEK_CUR)
428 self.assertRaises(exc, fp.seek, -1, self.SEEK_END)
Benjamin Peterson81971ea2009-05-14 22:01:31 +0000429
Martin Panter754aab22016-03-31 07:21:56 +0000430 def test_optional_abilities(self):
431 # Test for OSError when optional APIs are not supported
432 # The purpose of this test is to try fileno(), reading, writing and
433 # seeking operations with various objects that indicate they do not
434 # support these operations.
435
436 def pipe_reader():
437 [r, w] = os.pipe()
438 os.close(w) # So that read() is harmless
439 return self.FileIO(r, "r")
440
441 def pipe_writer():
442 [r, w] = os.pipe()
443 self.addCleanup(os.close, r)
444 # Guarantee that we can write into the pipe without blocking
445 thread = threading.Thread(target=os.read, args=(r, 100))
446 thread.start()
447 self.addCleanup(thread.join)
448 return self.FileIO(w, "w")
449
450 def buffered_reader():
451 return self.BufferedReader(self.MockUnseekableIO())
452
453 def buffered_writer():
454 return self.BufferedWriter(self.MockUnseekableIO())
455
456 def buffered_random():
457 return self.BufferedRandom(self.BytesIO())
458
459 def buffered_rw_pair():
460 return self.BufferedRWPair(self.MockUnseekableIO(),
461 self.MockUnseekableIO())
462
463 def text_reader():
464 class UnseekableReader(self.MockUnseekableIO):
465 writable = self.BufferedIOBase.writable
466 write = self.BufferedIOBase.write
467 return self.TextIOWrapper(UnseekableReader(), "ascii")
468
469 def text_writer():
470 class UnseekableWriter(self.MockUnseekableIO):
471 readable = self.BufferedIOBase.readable
472 read = self.BufferedIOBase.read
473 return self.TextIOWrapper(UnseekableWriter(), "ascii")
474
475 tests = (
476 (pipe_reader, "fr"), (pipe_writer, "fw"),
477 (buffered_reader, "r"), (buffered_writer, "w"),
478 (buffered_random, "rws"), (buffered_rw_pair, "rw"),
479 (text_reader, "r"), (text_writer, "w"),
480 (self.BytesIO, "rws"), (self.StringIO, "rws"),
481 )
482 for [test, abilities] in tests:
Martin Panter754aab22016-03-31 07:21:56 +0000483 with self.subTest(test), test() as obj:
484 readable = "r" in abilities
485 self.assertEqual(obj.readable(), readable)
486 writable = "w" in abilities
487 self.assertEqual(obj.writable(), writable)
Martin Panter754aab22016-03-31 07:21:56 +0000488
489 if isinstance(obj, self.TextIOBase):
490 data = "3"
491 elif isinstance(obj, (self.BufferedIOBase, self.RawIOBase)):
492 data = b"3"
493 else:
494 self.fail("Unknown base class")
495
496 if "f" in abilities:
497 obj.fileno()
498 else:
499 self.assertRaises(OSError, obj.fileno)
500
501 if readable:
502 obj.read(1)
503 obj.read()
504 else:
505 self.assertRaises(OSError, obj.read, 1)
506 self.assertRaises(OSError, obj.read)
507
508 if writable:
509 obj.write(data)
510 else:
511 self.assertRaises(OSError, obj.write, data)
512
Martin Panter3ee147f2016-03-31 21:05:31 +0000513 if sys.platform.startswith("win") and test in (
Martin Panter0950e6a2016-03-31 10:31:30 +0000514 pipe_reader, pipe_writer):
515 # Pipes seem to appear as seekable on Windows
516 continue
517 seekable = "s" in abilities
518 self.assertEqual(obj.seekable(), seekable)
519
Martin Panter754aab22016-03-31 07:21:56 +0000520 if seekable:
521 obj.tell()
522 obj.seek(0)
523 else:
524 self.assertRaises(OSError, obj.tell)
525 self.assertRaises(OSError, obj.seek, 0)
526
527 if writable and seekable:
528 obj.truncate()
529 obj.truncate(0)
530 else:
531 self.assertRaises(OSError, obj.truncate)
532 self.assertRaises(OSError, obj.truncate, 0)
533
Inada Naokiff3c9732021-03-31 14:26:08 +0900534 def test_open_binmode_encoding(self):
535 """open() raises ValueError when encoding is specified in bin mode"""
536 self.assertRaises(ValueError, self.open, os_helper.TESTFN,
537 "wb", encoding="utf-8")
538
539 # encoding=None and encoding="locale" is allowed.
540 with self.open(os_helper.TESTFN, "wb", encoding=None):
541 pass
542 with self.open(os_helper.TESTFN, "wb", encoding="locale"):
543 pass
544
Antoine Pitrou13348842012-01-29 18:36:34 +0100545 def test_open_handles_NUL_chars(self):
546 fn_with_NUL = 'foo\0bar'
Serhiy Storchakad8a14472014-09-06 20:07:17 +0300547 self.assertRaises(ValueError, self.open, fn_with_NUL, 'w')
Victor Stinner47b45572016-03-25 09:07:07 +0100548
549 bytes_fn = bytes(fn_with_NUL, 'ascii')
550 with warnings.catch_warnings():
551 warnings.simplefilter("ignore", DeprecationWarning)
552 self.assertRaises(ValueError, self.open, bytes_fn, 'w')
Antoine Pitrou13348842012-01-29 18:36:34 +0100553
Guido van Rossum28524c72007-02-27 05:47:44 +0000554 def test_raw_file_io(self):
Hai Shi883bc632020-07-06 17:12:49 +0800555 with self.open(os_helper.TESTFN, "wb", buffering=0) as f:
Benjamin Peterson45cec322009-04-24 23:14:50 +0000556 self.assertEqual(f.readable(), False)
557 self.assertEqual(f.writable(), True)
558 self.assertEqual(f.seekable(), True)
559 self.write_ops(f)
Hai Shi883bc632020-07-06 17:12:49 +0800560 with self.open(os_helper.TESTFN, "rb", buffering=0) as f:
Benjamin Peterson45cec322009-04-24 23:14:50 +0000561 self.assertEqual(f.readable(), True)
562 self.assertEqual(f.writable(), False)
563 self.assertEqual(f.seekable(), True)
564 self.read_ops(f)
Guido van Rossum28524c72007-02-27 05:47:44 +0000565
Guido van Rossum87429772007-04-10 21:06:59 +0000566 def test_buffered_file_io(self):
Hai Shi883bc632020-07-06 17:12:49 +0800567 with self.open(os_helper.TESTFN, "wb") as f:
Benjamin Peterson45cec322009-04-24 23:14:50 +0000568 self.assertEqual(f.readable(), False)
569 self.assertEqual(f.writable(), True)
570 self.assertEqual(f.seekable(), True)
571 self.write_ops(f)
Hai Shi883bc632020-07-06 17:12:49 +0800572 with self.open(os_helper.TESTFN, "rb") as f:
Benjamin Peterson45cec322009-04-24 23:14:50 +0000573 self.assertEqual(f.readable(), True)
574 self.assertEqual(f.writable(), False)
575 self.assertEqual(f.seekable(), True)
576 self.read_ops(f, True)
Guido van Rossum87429772007-04-10 21:06:59 +0000577
Guido van Rossum48fc58a2007-06-07 23:45:37 +0000578 def test_readline(self):
Hai Shi883bc632020-07-06 17:12:49 +0800579 with self.open(os_helper.TESTFN, "wb") as f:
Benjamin Peterson45cec322009-04-24 23:14:50 +0000580 f.write(b"abc\ndef\nxyzzy\nfoo\x00bar\nanother line")
Hai Shi883bc632020-07-06 17:12:49 +0800581 with self.open(os_helper.TESTFN, "rb") as f:
Benjamin Peterson45cec322009-04-24 23:14:50 +0000582 self.assertEqual(f.readline(), b"abc\n")
583 self.assertEqual(f.readline(10), b"def\n")
584 self.assertEqual(f.readline(2), b"xy")
585 self.assertEqual(f.readline(4), b"zzy\n")
586 self.assertEqual(f.readline(), b"foo\x00bar\n")
Benjamin Petersonbf5ff762009-12-13 19:25:34 +0000587 self.assertEqual(f.readline(None), b"another line")
Benjamin Peterson45cec322009-04-24 23:14:50 +0000588 self.assertRaises(TypeError, f.readline, 5.3)
Hai Shi883bc632020-07-06 17:12:49 +0800589 with self.open(os_helper.TESTFN, "r") as f:
Benjamin Peterson45cec322009-04-24 23:14:50 +0000590 self.assertRaises(TypeError, f.readline, 5.3)
Guido van Rossum48fc58a2007-06-07 23:45:37 +0000591
Serhiy Storchakabf623ae2017-04-19 20:03:52 +0300592 def test_readline_nonsizeable(self):
593 # Issue #30061
594 # Crash when readline() returns an object without __len__
595 class R(self.IOBase):
596 def readline(self):
597 return None
598 self.assertRaises((TypeError, StopIteration), next, R())
599
600 def test_next_nonsizeable(self):
601 # Issue #30061
602 # Crash when __next__() returns an object without __len__
603 class R(self.IOBase):
604 def __next__(self):
605 return None
606 self.assertRaises(TypeError, R().readlines, 1)
607
Guido van Rossum28524c72007-02-27 05:47:44 +0000608 def test_raw_bytes_io(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000609 f = self.BytesIO()
Guido van Rossum28524c72007-02-27 05:47:44 +0000610 self.write_ops(f)
611 data = f.getvalue()
612 self.assertEqual(data, b"hello world\n")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000613 f = self.BytesIO(data)
Guido van Rossum9b76da62007-04-11 01:09:03 +0000614 self.read_ops(f, True)
Guido van Rossum28524c72007-02-27 05:47:44 +0000615
Guido van Rossum53807da2007-04-10 19:01:47 +0000616 def test_large_file_ops(self):
Mike53f7a7c2017-12-14 14:04:53 +0300617 # On Windows and Mac OSX this test consumes large resources; It takes
Victor Stinner8c663fd2017-11-08 14:44:44 -0800618 # a long time to build the >2 GiB file and takes >2 GiB of disk space
Guido van Rossum34d69e52007-04-10 20:08:41 +0000619 # therefore the resource must be enabled to run this test.
Victor Stinner937ee9e2018-06-26 02:11:06 +0200620 if sys.platform[:3] == 'win' or sys.platform == 'darwin':
Zachary Ware9fe6d862013-12-08 00:20:35 -0600621 support.requires(
622 'largefile',
623 'test requires %s bytes and a long time to run' % self.LARGE)
Hai Shi883bc632020-07-06 17:12:49 +0800624 with self.open(os_helper.TESTFN, "w+b", 0) as f:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000625 self.large_file_ops(f)
Hai Shi883bc632020-07-06 17:12:49 +0800626 with self.open(os_helper.TESTFN, "w+b") as f:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000627 self.large_file_ops(f)
Guido van Rossum87429772007-04-10 21:06:59 +0000628
629 def test_with_open(self):
Alexey Izbysheva2670562018-10-20 03:22:31 +0300630 for bufsize in (0, 100):
Guido van Rossum87429772007-04-10 21:06:59 +0000631 f = None
Hai Shi883bc632020-07-06 17:12:49 +0800632 with self.open(os_helper.TESTFN, "wb", bufsize) as f:
Guido van Rossum1f2ca562007-08-27 20:44:15 +0000633 f.write(b"xxx")
Guido van Rossum87429772007-04-10 21:06:59 +0000634 self.assertEqual(f.closed, True)
635 f = None
636 try:
Hai Shi883bc632020-07-06 17:12:49 +0800637 with self.open(os_helper.TESTFN, "wb", bufsize) as f:
Guido van Rossum87429772007-04-10 21:06:59 +0000638 1/0
639 except ZeroDivisionError:
640 self.assertEqual(f.closed, True)
641 else:
642 self.fail("1/0 didn't raise an exception")
643
Antoine Pitrou08838b62009-01-21 00:55:13 +0000644 # issue 5008
645 def test_append_mode_tell(self):
Hai Shi883bc632020-07-06 17:12:49 +0800646 with self.open(os_helper.TESTFN, "wb") as f:
Antoine Pitrou08838b62009-01-21 00:55:13 +0000647 f.write(b"xxx")
Hai Shi883bc632020-07-06 17:12:49 +0800648 with self.open(os_helper.TESTFN, "ab", buffering=0) as f:
Antoine Pitrou08838b62009-01-21 00:55:13 +0000649 self.assertEqual(f.tell(), 3)
Hai Shi883bc632020-07-06 17:12:49 +0800650 with self.open(os_helper.TESTFN, "ab") as f:
Antoine Pitrou08838b62009-01-21 00:55:13 +0000651 self.assertEqual(f.tell(), 3)
Hai Shi883bc632020-07-06 17:12:49 +0800652 with self.open(os_helper.TESTFN, "a") as f:
Serhiy Storchaka0dcd80a2015-08-02 15:17:49 +0300653 self.assertGreater(f.tell(), 0)
Antoine Pitrou08838b62009-01-21 00:55:13 +0000654
Guido van Rossum87429772007-04-10 21:06:59 +0000655 def test_destructor(self):
656 record = []
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000657 class MyFileIO(self.FileIO):
Guido van Rossum87429772007-04-10 21:06:59 +0000658 def __del__(self):
659 record.append(1)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000660 try:
661 f = super().__del__
662 except AttributeError:
663 pass
664 else:
665 f()
Guido van Rossum87429772007-04-10 21:06:59 +0000666 def close(self):
667 record.append(2)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000668 super().close()
Guido van Rossum87429772007-04-10 21:06:59 +0000669 def flush(self):
670 record.append(3)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000671 super().flush()
Hai Shi883bc632020-07-06 17:12:49 +0800672 with warnings_helper.check_warnings(('', ResourceWarning)):
673 f = MyFileIO(os_helper.TESTFN, "wb")
Brett Cannon5a9e91b2010-10-29 23:53:03 +0000674 f.write(b"xxx")
675 del f
676 support.gc_collect()
677 self.assertEqual(record, [1, 2, 3])
Hai Shi883bc632020-07-06 17:12:49 +0800678 with self.open(os_helper.TESTFN, "rb") as f:
Brett Cannon5a9e91b2010-10-29 23:53:03 +0000679 self.assertEqual(f.read(), b"xxx")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000680
681 def _check_base_destructor(self, base):
682 record = []
683 class MyIO(base):
684 def __init__(self):
685 # This exercises the availability of attributes on object
686 # destruction.
687 # (in the C version, close() is called by the tp_dealloc
688 # function, not by __del__)
689 self.on_del = 1
690 self.on_close = 2
691 self.on_flush = 3
692 def __del__(self):
693 record.append(self.on_del)
694 try:
695 f = super().__del__
696 except AttributeError:
697 pass
698 else:
699 f()
700 def close(self):
701 record.append(self.on_close)
702 super().close()
703 def flush(self):
704 record.append(self.on_flush)
705 super().flush()
706 f = MyIO()
Guido van Rossum87429772007-04-10 21:06:59 +0000707 del f
Benjamin Peterson24fb1d02009-04-24 23:26:21 +0000708 support.gc_collect()
Guido van Rossum87429772007-04-10 21:06:59 +0000709 self.assertEqual(record, [1, 2, 3])
710
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000711 def test_IOBase_destructor(self):
712 self._check_base_destructor(self.IOBase)
713
714 def test_RawIOBase_destructor(self):
715 self._check_base_destructor(self.RawIOBase)
716
717 def test_BufferedIOBase_destructor(self):
718 self._check_base_destructor(self.BufferedIOBase)
719
720 def test_TextIOBase_destructor(self):
721 self._check_base_destructor(self.TextIOBase)
722
Guido van Rossum87429772007-04-10 21:06:59 +0000723 def test_close_flushes(self):
Hai Shi883bc632020-07-06 17:12:49 +0800724 with self.open(os_helper.TESTFN, "wb") as f:
Benjamin Peterson45cec322009-04-24 23:14:50 +0000725 f.write(b"xxx")
Hai Shi883bc632020-07-06 17:12:49 +0800726 with self.open(os_helper.TESTFN, "rb") as f:
Benjamin Peterson45cec322009-04-24 23:14:50 +0000727 self.assertEqual(f.read(), b"xxx")
Guido van Rossuma9e20242007-03-08 00:43:48 +0000728
Guido van Rossumd4103952007-04-12 05:44:49 +0000729 def test_array_writes(self):
730 a = array.array('i', range(10))
Antoine Pitrou1ce3eb52010-09-01 20:29:34 +0000731 n = len(a.tobytes())
Martin Panter6bb91f32016-05-28 00:41:57 +0000732 def check(f):
733 with f:
734 self.assertEqual(f.write(a), n)
735 f.writelines((a,))
736 check(self.BytesIO())
Hai Shi883bc632020-07-06 17:12:49 +0800737 check(self.FileIO(os_helper.TESTFN, "w"))
Martin Panter6bb91f32016-05-28 00:41:57 +0000738 check(self.BufferedWriter(self.MockRawIO()))
739 check(self.BufferedRandom(self.MockRawIO()))
740 check(self.BufferedRWPair(self.MockRawIO(), self.MockRawIO()))
Guido van Rossumd4103952007-04-12 05:44:49 +0000741
Guido van Rossum2dced8b2007-10-30 17:27:30 +0000742 def test_closefd(self):
Hai Shi883bc632020-07-06 17:12:49 +0800743 self.assertRaises(ValueError, self.open, os_helper.TESTFN, 'w',
Guido van Rossum2dced8b2007-10-30 17:27:30 +0000744 closefd=False)
Guido van Rossuma9e20242007-03-08 00:43:48 +0000745
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000746 def test_read_closed(self):
Hai Shi883bc632020-07-06 17:12:49 +0800747 with self.open(os_helper.TESTFN, "w") as f:
Christian Heimesecc42a22008-11-05 19:30:32 +0000748 f.write("egg\n")
Hai Shi883bc632020-07-06 17:12:49 +0800749 with self.open(os_helper.TESTFN, "r") as f:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000750 file = self.open(f.fileno(), "r", closefd=False)
Christian Heimesecc42a22008-11-05 19:30:32 +0000751 self.assertEqual(file.read(), "egg\n")
752 file.seek(0)
753 file.close()
754 self.assertRaises(ValueError, file.read)
Hai Shi883bc632020-07-06 17:12:49 +0800755 with self.open(os_helper.TESTFN, "rb") as f:
Philipp Gesangcb1c0742020-02-04 22:25:16 +0100756 file = self.open(f.fileno(), "rb", closefd=False)
757 self.assertEqual(file.read()[:3], b"egg")
758 file.close()
759 self.assertRaises(ValueError, file.readinto, bytearray(1))
Christian Heimesecc42a22008-11-05 19:30:32 +0000760
761 def test_no_closefd_with_filename(self):
762 # can't use closefd in combination with a file name
Hai Shi883bc632020-07-06 17:12:49 +0800763 self.assertRaises(ValueError, self.open, os_helper.TESTFN, "r", closefd=False)
Christian Heimesecc42a22008-11-05 19:30:32 +0000764
765 def test_closefd_attr(self):
Hai Shi883bc632020-07-06 17:12:49 +0800766 with self.open(os_helper.TESTFN, "wb") as f:
Christian Heimesecc42a22008-11-05 19:30:32 +0000767 f.write(b"egg\n")
Hai Shi883bc632020-07-06 17:12:49 +0800768 with self.open(os_helper.TESTFN, "r") as f:
Christian Heimesecc42a22008-11-05 19:30:32 +0000769 self.assertEqual(f.buffer.raw.closefd, True)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000770 file = self.open(f.fileno(), "r", closefd=False)
Christian Heimesecc42a22008-11-05 19:30:32 +0000771 self.assertEqual(file.buffer.raw.closefd, False)
772
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000773 def test_garbage_collection(self):
774 # FileIO objects are collected, and collecting them flushes
775 # all data to disk.
Hai Shi883bc632020-07-06 17:12:49 +0800776 with warnings_helper.check_warnings(('', ResourceWarning)):
777 f = self.FileIO(os_helper.TESTFN, "wb")
Benjamin Petersonebe5d8a2010-10-31 01:30:11 +0000778 f.write(b"abcxxx")
779 f.f = f
780 wr = weakref.ref(f)
781 del f
782 support.gc_collect()
Serhiy Storchaka0dcd80a2015-08-02 15:17:49 +0300783 self.assertIsNone(wr(), wr)
Hai Shi883bc632020-07-06 17:12:49 +0800784 with self.open(os_helper.TESTFN, "rb") as f:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000785 self.assertEqual(f.read(), b"abcxxx")
Christian Heimesecc42a22008-11-05 19:30:32 +0000786
Antoine Pitrou7d037a72009-03-29 18:55:12 +0000787 def test_unbounded_file(self):
788 # Issue #1174606: reading from an unbounded stream such as /dev/zero.
789 zero = "/dev/zero"
790 if not os.path.exists(zero):
Antoine Pitrouc50cb8e2009-04-19 00:10:36 +0000791 self.skipTest("{0} does not exist".format(zero))
Antoine Pitrou7d037a72009-03-29 18:55:12 +0000792 if sys.maxsize > 0x7FFFFFFF:
Antoine Pitrouc50cb8e2009-04-19 00:10:36 +0000793 self.skipTest("test can only run in a 32-bit address space")
Antoine Pitrou7d037a72009-03-29 18:55:12 +0000794 if support.real_max_memuse < support._2G:
Victor Stinner8c663fd2017-11-08 14:44:44 -0800795 self.skipTest("test requires at least 2 GiB of memory")
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000796 with self.open(zero, "rb", buffering=0) as f:
Antoine Pitrou7d037a72009-03-29 18:55:12 +0000797 self.assertRaises(OverflowError, f.read)
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000798 with self.open(zero, "rb") as f:
Antoine Pitrou7d037a72009-03-29 18:55:12 +0000799 self.assertRaises(OverflowError, f.read)
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000800 with self.open(zero, "r") as f:
Antoine Pitrou7d037a72009-03-29 18:55:12 +0000801 self.assertRaises(OverflowError, f.read)
802
Serhiy Storchakaa3712a92015-02-21 00:35:09 +0200803 def check_flush_error_on_close(self, *args, **kwargs):
804 # Test that the file is closed despite failed flush
805 # and that flush() is called before file closed.
806 f = self.open(*args, **kwargs)
807 closed = []
Antoine Pitrou6be88762010-05-03 16:48:20 +0000808 def bad_flush():
Serhiy Storchakaa3712a92015-02-21 00:35:09 +0200809 closed[:] = [f.closed]
Andrew Svetlovf7a17b42012-12-25 16:47:37 +0200810 raise OSError()
Antoine Pitrou6be88762010-05-03 16:48:20 +0000811 f.flush = bad_flush
Andrew Svetlovf7a17b42012-12-25 16:47:37 +0200812 self.assertRaises(OSError, f.close) # exception not swallowed
Benjamin Peterson68623612012-12-20 11:53:11 -0600813 self.assertTrue(f.closed)
Serhiy Storchakaa3712a92015-02-21 00:35:09 +0200814 self.assertTrue(closed) # flush() called
815 self.assertFalse(closed[0]) # flush() called before file closed
Serhiy Storchakac26a1a42015-02-23 00:28:38 +0200816 f.flush = lambda: None # break reference loop
Serhiy Storchakaa3712a92015-02-21 00:35:09 +0200817
818 def test_flush_error_on_close(self):
819 # raw file
820 # Issue #5700: io.FileIO calls flush() after file closed
Hai Shi883bc632020-07-06 17:12:49 +0800821 self.check_flush_error_on_close(os_helper.TESTFN, 'wb', buffering=0)
822 fd = os.open(os_helper.TESTFN, os.O_WRONLY|os.O_CREAT)
Serhiy Storchakaa3712a92015-02-21 00:35:09 +0200823 self.check_flush_error_on_close(fd, 'wb', buffering=0)
Hai Shi883bc632020-07-06 17:12:49 +0800824 fd = os.open(os_helper.TESTFN, os.O_WRONLY|os.O_CREAT)
Serhiy Storchakaa3712a92015-02-21 00:35:09 +0200825 self.check_flush_error_on_close(fd, 'wb', buffering=0, closefd=False)
826 os.close(fd)
827 # buffered io
Hai Shi883bc632020-07-06 17:12:49 +0800828 self.check_flush_error_on_close(os_helper.TESTFN, 'wb')
829 fd = os.open(os_helper.TESTFN, os.O_WRONLY|os.O_CREAT)
Serhiy Storchakaa3712a92015-02-21 00:35:09 +0200830 self.check_flush_error_on_close(fd, 'wb')
Hai Shi883bc632020-07-06 17:12:49 +0800831 fd = os.open(os_helper.TESTFN, os.O_WRONLY|os.O_CREAT)
Serhiy Storchakaa3712a92015-02-21 00:35:09 +0200832 self.check_flush_error_on_close(fd, 'wb', closefd=False)
833 os.close(fd)
834 # text io
Hai Shi883bc632020-07-06 17:12:49 +0800835 self.check_flush_error_on_close(os_helper.TESTFN, 'w')
836 fd = os.open(os_helper.TESTFN, os.O_WRONLY|os.O_CREAT)
Serhiy Storchakaa3712a92015-02-21 00:35:09 +0200837 self.check_flush_error_on_close(fd, 'w')
Hai Shi883bc632020-07-06 17:12:49 +0800838 fd = os.open(os_helper.TESTFN, os.O_WRONLY|os.O_CREAT)
Serhiy Storchakaa3712a92015-02-21 00:35:09 +0200839 self.check_flush_error_on_close(fd, 'w', closefd=False)
840 os.close(fd)
Antoine Pitrou6be88762010-05-03 16:48:20 +0000841
842 def test_multi_close(self):
Hai Shi883bc632020-07-06 17:12:49 +0800843 f = self.open(os_helper.TESTFN, "wb", buffering=0)
Antoine Pitrou6be88762010-05-03 16:48:20 +0000844 f.close()
845 f.close()
846 f.close()
847 self.assertRaises(ValueError, f.flush)
848
Antoine Pitrou328ec742010-09-14 18:37:24 +0000849 def test_RawIOBase_read(self):
Sanyam Khurana1b74f9b2017-12-11 19:12:09 +0530850 # Exercise the default limited RawIOBase.read(n) implementation (which
851 # calls readinto() internally).
Antoine Pitrou328ec742010-09-14 18:37:24 +0000852 rawio = self.MockRawIOWithoutRead((b"abc", b"d", None, b"efg", None))
853 self.assertEqual(rawio.read(2), b"ab")
854 self.assertEqual(rawio.read(2), b"c")
855 self.assertEqual(rawio.read(2), b"d")
856 self.assertEqual(rawio.read(2), None)
857 self.assertEqual(rawio.read(2), b"ef")
858 self.assertEqual(rawio.read(2), b"g")
859 self.assertEqual(rawio.read(2), None)
860 self.assertEqual(rawio.read(2), b"")
861
Benjamin Petersonf6f3a352011-09-03 09:26:20 -0400862 def test_types_have_dict(self):
863 test = (
864 self.IOBase(),
865 self.RawIOBase(),
866 self.TextIOBase(),
867 self.StringIO(),
868 self.BytesIO()
869 )
870 for obj in test:
871 self.assertTrue(hasattr(obj, "__dict__"))
872
Ross Lagerwall59142db2011-10-31 20:34:46 +0200873 def test_opener(self):
Hai Shi883bc632020-07-06 17:12:49 +0800874 with self.open(os_helper.TESTFN, "w") as f:
Ross Lagerwall59142db2011-10-31 20:34:46 +0200875 f.write("egg\n")
Hai Shi883bc632020-07-06 17:12:49 +0800876 fd = os.open(os_helper.TESTFN, os.O_RDONLY)
Ross Lagerwall59142db2011-10-31 20:34:46 +0200877 def opener(path, flags):
878 return fd
879 with self.open("non-existent", "r", opener=opener) as f:
880 self.assertEqual(f.read(), "egg\n")
881
Barry Warsaw480e2852016-06-08 17:47:26 -0400882 def test_bad_opener_negative_1(self):
883 # Issue #27066.
884 def badopener(fname, flags):
885 return -1
886 with self.assertRaises(ValueError) as cm:
887 open('non-existent', 'r', opener=badopener)
888 self.assertEqual(str(cm.exception), 'opener returned -1')
889
890 def test_bad_opener_other_negative(self):
891 # Issue #27066.
892 def badopener(fname, flags):
893 return -2
894 with self.assertRaises(ValueError) as cm:
895 open('non-existent', 'r', opener=badopener)
896 self.assertEqual(str(cm.exception), 'opener returned -2')
897
Hynek Schlawack2cc71562012-05-25 10:05:53 +0200898 def test_fileio_closefd(self):
899 # Issue #4841
900 with self.open(__file__, 'rb') as f1, \
901 self.open(__file__, 'rb') as f2:
902 fileio = self.FileIO(f1.fileno(), closefd=False)
903 # .__init__() must not close f1
904 fileio.__init__(f2.fileno(), closefd=False)
905 f1.readline()
906 # .close() must not close f2
907 fileio.close()
908 f2.readline()
909
Serhiy Storchakaf10063e2014-06-09 13:32:34 +0300910 def test_nonbuffered_textio(self):
Hai Shi883bc632020-07-06 17:12:49 +0800911 with warnings_helper.check_no_resource_warning(self):
Serhiy Storchakaf10063e2014-06-09 13:32:34 +0300912 with self.assertRaises(ValueError):
Hai Shi883bc632020-07-06 17:12:49 +0800913 self.open(os_helper.TESTFN, 'w', buffering=0)
Serhiy Storchakaf10063e2014-06-09 13:32:34 +0300914
915 def test_invalid_newline(self):
Hai Shi883bc632020-07-06 17:12:49 +0800916 with warnings_helper.check_no_resource_warning(self):
Serhiy Storchakaf10063e2014-06-09 13:32:34 +0300917 with self.assertRaises(ValueError):
Hai Shi883bc632020-07-06 17:12:49 +0800918 self.open(os_helper.TESTFN, 'w', newline='invalid')
Serhiy Storchakaf10063e2014-06-09 13:32:34 +0300919
Martin Panter6bb91f32016-05-28 00:41:57 +0000920 def test_buffered_readinto_mixin(self):
921 # Test the implementation provided by BufferedIOBase
922 class Stream(self.BufferedIOBase):
923 def read(self, size):
924 return b"12345"
925 read1 = read
926 stream = Stream()
927 for method in ("readinto", "readinto1"):
928 with self.subTest(method):
929 buffer = byteslike(5)
930 self.assertEqual(getattr(stream, method)(buffer), 5)
931 self.assertEqual(bytes(buffer), b"12345")
932
Ethan Furmand62548a2016-06-04 14:38:43 -0700933 def test_fspath_support(self):
Ethan Furmand62548a2016-06-04 14:38:43 -0700934 def check_path_succeeds(path):
935 with self.open(path, "w") as f:
936 f.write("egg\n")
937
938 with self.open(path, "r") as f:
939 self.assertEqual(f.read(), "egg\n")
940
Hai Shi883bc632020-07-06 17:12:49 +0800941 check_path_succeeds(FakePath(os_helper.TESTFN))
Serhiy Storchaka67987ac2020-07-27 20:58:35 +0300942 check_path_succeeds(FakePath(os.fsencode(os_helper.TESTFN)))
Ethan Furmand62548a2016-06-04 14:38:43 -0700943
Hai Shi883bc632020-07-06 17:12:49 +0800944 with self.open(os_helper.TESTFN, "w") as f:
Serhiy Storchakab21d1552018-03-02 11:53:51 +0200945 bad_path = FakePath(f.fileno())
946 with self.assertRaises(TypeError):
947 self.open(bad_path, 'w')
948
949 bad_path = FakePath(None)
Brett Cannonc78ca1e2016-06-24 12:03:43 -0700950 with self.assertRaises(TypeError):
Ethan Furmand62548a2016-06-04 14:38:43 -0700951 self.open(bad_path, 'w')
952
Serhiy Storchakab21d1552018-03-02 11:53:51 +0200953 bad_path = FakePath(FloatingPointError)
954 with self.assertRaises(FloatingPointError):
955 self.open(bad_path, 'w')
956
Ethan Furmand62548a2016-06-04 14:38:43 -0700957 # ensure that refcounting is correct with some error conditions
958 with self.assertRaisesRegex(ValueError, 'read/write/append mode'):
Hai Shi883bc632020-07-06 17:12:49 +0800959 self.open(FakePath(os_helper.TESTFN), 'rwxa')
Ethan Furmand62548a2016-06-04 14:38:43 -0700960
Sanyam Khurana1b74f9b2017-12-11 19:12:09 +0530961 def test_RawIOBase_readall(self):
962 # Exercise the default unlimited RawIOBase.read() and readall()
963 # implementations.
964 rawio = self.MockRawIOWithoutRead((b"abc", b"d", b"efg"))
965 self.assertEqual(rawio.read(), b"abcdefg")
966 rawio = self.MockRawIOWithoutRead((b"abc", b"d", b"efg"))
967 self.assertEqual(rawio.readall(), b"abcdefg")
968
969 def test_BufferedIOBase_readinto(self):
970 # Exercise the default BufferedIOBase.readinto() and readinto1()
971 # implementations (which call read() or read1() internally).
972 class Reader(self.BufferedIOBase):
973 def __init__(self, avail):
974 self.avail = avail
975 def read(self, size):
976 result = self.avail[:size]
977 self.avail = self.avail[size:]
978 return result
979 def read1(self, size):
980 """Returns no more than 5 bytes at once"""
981 return self.read(min(size, 5))
982 tests = (
983 # (test method, total data available, read buffer size, expected
984 # read size)
985 ("readinto", 10, 5, 5),
986 ("readinto", 10, 6, 6), # More than read1() can return
987 ("readinto", 5, 6, 5), # Buffer larger than total available
988 ("readinto", 6, 7, 6),
989 ("readinto", 10, 0, 0), # Empty buffer
990 ("readinto1", 10, 5, 5), # Result limited to single read1() call
991 ("readinto1", 10, 6, 5), # Buffer larger than read1() can return
992 ("readinto1", 5, 6, 5), # Buffer larger than total available
993 ("readinto1", 6, 7, 5),
994 ("readinto1", 10, 0, 0), # Empty buffer
995 )
996 UNUSED_BYTE = 0x81
997 for test in tests:
998 with self.subTest(test):
999 method, avail, request, result = test
1000 reader = Reader(bytes(range(avail)))
1001 buffer = bytearray((UNUSED_BYTE,) * request)
1002 method = getattr(reader, method)
1003 self.assertEqual(method(buffer), result)
1004 self.assertEqual(len(buffer), request)
1005 self.assertSequenceEqual(buffer[:result], range(result))
1006 unused = (UNUSED_BYTE,) * (request - result)
1007 self.assertSequenceEqual(buffer[result:], unused)
1008 self.assertEqual(len(reader.avail), avail - result)
1009
Zackery Spytz28f07362018-07-17 00:31:44 -06001010 def test_close_assert(self):
1011 class R(self.IOBase):
1012 def __setattr__(self, name, value):
1013 pass
1014 def flush(self):
1015 raise OSError()
1016 f = R()
1017 # This would cause an assertion failure.
1018 self.assertRaises(OSError, f.close)
1019
Victor Stinner472f7942019-04-12 21:58:24 +02001020 # Silence destructor error
1021 R.flush = lambda self: None
1022
Hynek Schlawack2cc71562012-05-25 10:05:53 +02001023
class CIOTest(IOTest):

    def test_IOBase_finalize(self):
        # Issue #12149: segmentation fault on _PyIOBase_finalize when both a
        # class which inherits IOBase and an object of this class are caught
        # in a reference cycle and close() is already in the method cache.
        class MyIO(self.IOBase):
            def close(self):
                pass

        # create an instance to populate the method cache
        MyIO()
        cyclic = MyIO()
        cyclic.obj = cyclic
        ref = weakref.ref(cyclic)
        # Drop the class first, then the instance, then force a collection.
        del MyIO, cyclic
        support.gc_collect()
        self.assertIsNone(ref(), ref)
Guido van Rossuma9e20242007-03-08 00:43:48 +00001043
# Inherits every IOTest case unchanged and adds none of its own.
# NOTE(review): presumably the pure-Python implementation is bound to the
# class attributes elsewhere in the file -- not visible here, confirm.
class PyIOTest(IOTest):
    pass
Guido van Rossum68bbcd22007-02-27 17:19:33 +00001046
Guido van Rossuma9e20242007-03-08 00:43:48 +00001047
@support.cpython_only
class APIMismatchTest(unittest.TestCase):

    def test_RawIOBase_io_in_pyio_match(self):
        """Test that pyio RawIOBase class has all c RawIOBase methods"""
        # __weakref__ is excluded from the comparison.
        missing = support.detect_api_mismatch(
            pyio.RawIOBase,
            io.RawIOBase,
            ignore=('__weakref__',),
        )
        failure_note = 'Python RawIOBase does not have all C RawIOBase methods'
        self.assertEqual(missing, set(), msg=failure_note)

    def test_RawIOBase_pyio_in_io_match(self):
        """Test that c RawIOBase class has all pyio RawIOBase methods"""
        missing = support.detect_api_mismatch(io.RawIOBase, pyio.RawIOBase)
        failure_note = 'C RawIOBase does not have all Python RawIOBase methods'
        self.assertEqual(missing, set(), msg=failure_note)
1061
1062
class CommonBufferedTests:
    # Tests common to BufferedReader, BufferedWriter and BufferedRandom
    # The class under test is read from ``self.tp``; the mock raw streams
    # are read from attributes on ``self`` as well.

    def test_detach(self):
        # detach() hands back the raw stream once; a second call fails.
        raw = self.MockRawIO()
        buffered = self.tp(raw)
        detached = buffered.detach()
        self.assertIs(detached, raw)
        self.assertRaises(ValueError, buffered.detach)

        repr(buffered)  # Should still work

    def test_fileno(self):
        # fileno() is forwarded to the raw stream.
        buffered = self.tp(self.MockRawIO())

        self.assertEqual(42, buffered.fileno())

    def test_invalid_args(self):
        buffered = self.tp(self.MockRawIO())
        # Invalid whence
        for whence in (-1, 9):
            self.assertRaises(ValueError, buffered.seek, 0, whence)

    def test_override_destructor(self):
        # Overridden __del__, close() and flush() must each run once, in
        # that order, when the last reference goes away.
        base = self.tp
        calls = []

        class MyBufferedIO(base):
            def __del__(self):
                calls.append(1)
                # Chain to the parent's __del__ only if it provides one.
                parent_del = getattr(super(), "__del__", None)
                if parent_del is not None:
                    parent_del()

            def close(self):
                calls.append(2)
                super().close()

            def flush(self):
                calls.append(3)
                super().flush()

        raw = self.MockRawIO()
        buffered = MyBufferedIO(raw)
        del buffered
        support.gc_collect()
        self.assertEqual(calls, [1, 2, 3])

    def test_context_manager(self):
        # Test usability as a context manager
        buffered = self.tp(self.MockRawIO())

        def enter_and_leave():
            with buffered:
                pass

        enter_and_leave()
        # bufio should now be closed, and using it a second time should raise
        # a ValueError.
        self.assertRaises(ValueError, enter_and_leave)

    def test_error_through_destructor(self):
        # Test that the exception state is not modified by a destructor,
        # even if close() fails.
        raw = self.CloseFailureIO()
        with support.catch_unraisable_exception() as caught:
            with self.assertRaises(AttributeError):
                self.tp(raw).xyzzy

            if not IOBASE_EMITS_UNRAISABLE:
                self.assertIsNone(caught.unraisable)
            elif caught.unraisable is not None:
                self.assertEqual(caught.unraisable.exc_type, OSError)

    def test_repr(self):
        # repr() shows the class name, plus the raw stream's name if set.
        raw = self.MockRawIO()
        buffered = self.tp(raw)
        clsname = r"(%s\.)?%s" % (self.tp.__module__, self.tp.__qualname__)
        self.assertRegex(repr(buffered), "<%s>" % clsname)
        named_cases = (
            ("dummy", "<%s name='dummy'>"),
            (b"dummy", "<%s name=b'dummy'>"),
        )
        for name, template in named_cases:
            raw.name = name
            self.assertRegex(repr(buffered), template % clsname)

    def test_recursive_repr(self):
        # Issue #25455
        raw = self.MockRawIO()
        buffered = self.tp(raw)
        with support.swap_attr(raw, 'name', buffered):
            try:
                repr(buffered)  # Should not crash
            except RuntimeError:
                # Raising RuntimeError is acceptable; crashing is not.
                pass

    def test_flush_error_on_close(self):
        # Test that buffered file is closed despite failed flush
        # and that flush() is called before file closed.
        raw = self.MockRawIO()
        closed_at_flush = []

        def failing_flush():
            # ``buffered`` is bound below, before close() triggers this.
            closed_at_flush[:] = [buffered.closed, raw.closed]
            raise OSError()

        raw.flush = failing_flush
        buffered = self.tp(raw)
        with self.assertRaises(OSError):  # exception not swallowed
            buffered.close()
        self.assertTrue(buffered.closed)
        self.assertTrue(raw.closed)
        self.assertTrue(closed_at_flush)      # flush() called
        self.assertFalse(closed_at_flush[0])  # flush() called before file closed
        self.assertFalse(closed_at_flush[1])
        raw.flush = lambda: None  # break reference loop

    def test_close_error_on_close(self):
        # When both flush() and close() fail, the close error is raised
        # with the flush error attached as its __context__.
        raw = self.MockRawIO()

        def failing_flush():
            raise OSError('flush')

        def failing_close():
            raise OSError('close')

        raw.close = failing_close
        buffered = self.tp(raw)
        buffered.flush = failing_flush
        with self.assertRaises(OSError) as caught:  # exception not swallowed
            buffered.close()
        error = caught.exception
        self.assertEqual(error.args, ('close',))
        self.assertIsInstance(error.__context__, OSError)
        self.assertEqual(error.__context__.args, ('flush',))
        self.assertFalse(buffered.closed)

        # Silence destructor error
        raw.close = lambda: None
        buffered.flush = lambda: None

    def test_nonnormalized_close_error_on_close(self):
        # Issue #21677
        raw = self.MockRawIO()

        # Both helpers reference names that are deliberately undefined, so
        # calling them raises NameError.
        def failing_flush():
            raise non_existing_flush

        def failing_close():
            raise non_existing_close

        raw.close = failing_close
        buffered = self.tp(raw)
        buffered.flush = failing_flush
        with self.assertRaises(NameError) as caught:  # exception not swallowed
            buffered.close()
        error = caught.exception
        self.assertIn('non_existing_close', str(error))
        self.assertIsInstance(error.__context__, NameError)
        self.assertIn('non_existing_flush', str(error.__context__))
        self.assertFalse(buffered.closed)

        # Silence destructor error
        buffered.flush = lambda: None
        raw.close = lambda: None

    def test_multi_close(self):
        # Closing repeatedly is harmless; flushing afterwards is an error.
        buffered = self.tp(self.MockRawIO())
        for _ in range(3):
            buffered.close()
        self.assertRaises(ValueError, buffered.flush)

    def test_unseekable(self):
        # tell() and seek() on an unseekable raw stream are unsupported.
        buffered = self.tp(self.MockUnseekableIO(b"A" * 10))
        unsupported = self.UnsupportedOperation
        self.assertRaises(unsupported, buffered.tell)
        self.assertRaises(unsupported, buffered.seek, 0)

    def test_readonly_attributes(self):
        # The ``raw`` attribute cannot be reassigned.
        buffered = self.tp(self.MockRawIO())
        replacement = self.MockRawIO()
        with self.assertRaises(AttributeError):
            buffered.raw = replacement
1234
Guido van Rossum78892e42007-04-06 17:31:18 +00001235
class SizeofTest:

    @support.cpython_only
    def test_sizeof(self):
        # sys.getsizeof() must grow by exactly the buffer size: the overhead
        # measured with one buffer size has to predict the total for another.
        small, large = 4096, 8192
        buffered = self.tp(self.MockRawIO(), buffer_size=small)
        overhead = sys.getsizeof(buffered) - small
        buffered = self.tp(self.MockRawIO(), buffer_size=large)
        self.assertEqual(sys.getsizeof(buffered), overhead + large)

    @support.cpython_only
    def test_buffer_freeing(self):
        # After close() the reported size must drop by the buffer size.
        buffer_size = 4096
        buffered = self.tp(self.MockRawIO(), buffer_size=buffer_size)
        overhead = sys.getsizeof(buffered) - buffer_size
        buffered.close()
        self.assertEqual(sys.getsizeof(buffered), overhead)
Antoine Pitrou10f0c502012-07-29 19:02:46 +02001257
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001258class BufferedReaderTest(unittest.TestCase, CommonBufferedTests):
1259 read_mode = "rb"
Guido van Rossum78892e42007-04-06 17:31:18 +00001260
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001261 def test_constructor(self):
1262 rawio = self.MockRawIO([b"abc"])
1263 bufio = self.tp(rawio)
1264 bufio.__init__(rawio)
1265 bufio.__init__(rawio, buffer_size=1024)
1266 bufio.__init__(rawio, buffer_size=16)
Ezio Melottib3aedd42010-11-20 19:04:17 +00001267 self.assertEqual(b"abc", bufio.read())
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001268 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0)
1269 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16)
1270 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1)
1271 rawio = self.MockRawIO([b"abc"])
1272 bufio.__init__(rawio)
Ezio Melottib3aedd42010-11-20 19:04:17 +00001273 self.assertEqual(b"abc", bufio.read())
Guido van Rossum78892e42007-04-06 17:31:18 +00001274
Serhiy Storchaka61e24932014-02-12 10:52:35 +02001275 def test_uninitialized(self):
1276 bufio = self.tp.__new__(self.tp)
1277 del bufio
1278 bufio = self.tp.__new__(self.tp)
1279 self.assertRaisesRegex((ValueError, AttributeError),
1280 'uninitialized|has no attribute',
1281 bufio.read, 0)
1282 bufio.__init__(self.MockRawIO())
1283 self.assertEqual(bufio.read(0), b'')
1284
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001285 def test_read(self):
Benjamin Petersonbf5ff762009-12-13 19:25:34 +00001286 for arg in (None, 7):
1287 rawio = self.MockRawIO((b"abc", b"d", b"efg"))
1288 bufio = self.tp(rawio)
Ezio Melottib3aedd42010-11-20 19:04:17 +00001289 self.assertEqual(b"abcdefg", bufio.read(arg))
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001290 # Invalid args
1291 self.assertRaises(ValueError, bufio.read, -2)
Guido van Rossum68bbcd22007-02-27 17:19:33 +00001292
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001293 def test_read1(self):
1294 rawio = self.MockRawIO((b"abc", b"d", b"efg"))
1295 bufio = self.tp(rawio)
Ezio Melottib3aedd42010-11-20 19:04:17 +00001296 self.assertEqual(b"a", bufio.read(1))
1297 self.assertEqual(b"b", bufio.read1(1))
1298 self.assertEqual(rawio._reads, 1)
Martin Panterccb2c0e2016-10-20 23:48:14 +00001299 self.assertEqual(b"", bufio.read1(0))
Ezio Melottib3aedd42010-11-20 19:04:17 +00001300 self.assertEqual(b"c", bufio.read1(100))
1301 self.assertEqual(rawio._reads, 1)
1302 self.assertEqual(b"d", bufio.read1(100))
1303 self.assertEqual(rawio._reads, 2)
1304 self.assertEqual(b"efg", bufio.read1(100))
1305 self.assertEqual(rawio._reads, 3)
1306 self.assertEqual(b"", bufio.read1(100))
1307 self.assertEqual(rawio._reads, 4)
Martin Panterccb2c0e2016-10-20 23:48:14 +00001308
1309 def test_read1_arbitrary(self):
1310 rawio = self.MockRawIO((b"abc", b"d", b"efg"))
1311 bufio = self.tp(rawio)
1312 self.assertEqual(b"a", bufio.read(1))
1313 self.assertEqual(b"bc", bufio.read1())
1314 self.assertEqual(b"d", bufio.read1())
1315 self.assertEqual(b"efg", bufio.read1(-1))
1316 self.assertEqual(rawio._reads, 3)
1317 self.assertEqual(b"", bufio.read1())
1318 self.assertEqual(rawio._reads, 4)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001319
1320 def test_readinto(self):
1321 rawio = self.MockRawIO((b"abc", b"d", b"efg"))
1322 bufio = self.tp(rawio)
1323 b = bytearray(2)
Ezio Melottib3aedd42010-11-20 19:04:17 +00001324 self.assertEqual(bufio.readinto(b), 2)
1325 self.assertEqual(b, b"ab")
1326 self.assertEqual(bufio.readinto(b), 2)
1327 self.assertEqual(b, b"cd")
1328 self.assertEqual(bufio.readinto(b), 2)
1329 self.assertEqual(b, b"ef")
1330 self.assertEqual(bufio.readinto(b), 1)
1331 self.assertEqual(b, b"gf")
1332 self.assertEqual(bufio.readinto(b), 0)
1333 self.assertEqual(b, b"gf")
Antoine Pitrou3486a982011-05-12 01:57:53 +02001334 rawio = self.MockRawIO((b"abc", None))
1335 bufio = self.tp(rawio)
1336 self.assertEqual(bufio.readinto(b), 2)
1337 self.assertEqual(b, b"ab")
1338 self.assertEqual(bufio.readinto(b), 1)
1339 self.assertEqual(b, b"cb")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001340
Benjamin Petersona96fea02014-06-22 14:17:44 -07001341 def test_readinto1(self):
1342 buffer_size = 10
1343 rawio = self.MockRawIO((b"abc", b"de", b"fgh", b"jkl"))
1344 bufio = self.tp(rawio, buffer_size=buffer_size)
1345 b = bytearray(2)
1346 self.assertEqual(bufio.peek(3), b'abc')
1347 self.assertEqual(rawio._reads, 1)
1348 self.assertEqual(bufio.readinto1(b), 2)
1349 self.assertEqual(b, b"ab")
1350 self.assertEqual(rawio._reads, 1)
1351 self.assertEqual(bufio.readinto1(b), 1)
1352 self.assertEqual(b[:1], b"c")
1353 self.assertEqual(rawio._reads, 1)
1354 self.assertEqual(bufio.readinto1(b), 2)
1355 self.assertEqual(b, b"de")
1356 self.assertEqual(rawio._reads, 2)
1357 b = bytearray(2*buffer_size)
1358 self.assertEqual(bufio.peek(3), b'fgh')
1359 self.assertEqual(rawio._reads, 3)
1360 self.assertEqual(bufio.readinto1(b), 6)
1361 self.assertEqual(b[:6], b"fghjkl")
1362 self.assertEqual(rawio._reads, 4)
1363
def test_readinto_array(self):
    # readinto() into a multi-byte-element array must be sized in bytes,
    # not in elements, and must leave the unwritten tail untouched.
    buffer_size = 60
    data = b"a" * 26
    rawio = self.MockRawIO((data,))
    bufio = self.tp(rawio, buffer_size=buffer_size)

    # Create an array with element size > 1 byte
    b = array.array('i', b'x' * 32)
    # Precondition check carried over from the original; expressed as a
    # unittest assertion because a bare `assert` is stripped under -O.
    self.assertNotEqual(len(b), 16)

    # Read into it. We should get as many *bytes* as we can fit into b
    # (which is more than the number of elements)
    n = bufio.readinto(b)
    self.assertGreater(n, len(b))

    # Check that old contents of b are preserved
    bm = memoryview(b).cast('B')
    self.assertLess(n, len(bm))
    self.assertEqual(bm[:n], data[:n])
    self.assertEqual(bm[n:], b'x' * (len(bm[n:])))
1384
def test_readinto1_array(self):
    # readinto1() into a multi-byte-element array must be sized in bytes,
    # not in elements, and must leave the unwritten tail untouched.
    buffer_size = 60
    data = b"a" * 26
    rawio = self.MockRawIO((data,))
    bufio = self.tp(rawio, buffer_size=buffer_size)

    # Create an array with element size > 1 byte
    b = array.array('i', b'x' * 32)
    # Precondition check carried over from the original; expressed as a
    # unittest assertion because a bare `assert` is stripped under -O.
    self.assertNotEqual(len(b), 16)

    # Read into it. We should get as many *bytes* as we can fit into b
    # (which is more than the number of elements)
    n = bufio.readinto1(b)
    self.assertGreater(n, len(b))

    # Check that old contents of b are preserved
    bm = memoryview(b).cast('B')
    self.assertLess(n, len(bm))
    self.assertEqual(bm[:n], data[:n])
    self.assertEqual(bm[n:], b'x' * (len(bm[n:])))
1405
def test_readlines(self):
    # Each assertion gets a fresh reader over the same three raw chunks.
    def make_reader():
        return self.tp(self.MockRawIO((b"abc\n", b"d\n", b"ef")))

    every_line = [b"abc\n", b"d\n", b"ef"]
    self.assertEqual(make_reader().readlines(), every_line)
    self.assertEqual(make_reader().readlines(5), [b"abc\n", b"d\n"])
    self.assertEqual(make_reader().readlines(None), every_line)
Benjamin Petersonbf5ff762009-12-13 19:25:34 +00001413
def test_buffering(self):
    payload = b"abcdefghi"
    total = len(payload)

    # Each case is:
    #   [buffer size, sizes passed to read(), expected rawio.read_history]
    cases = [
        [100, [3, 1, 4, 8], [total, 0]],
        [100, [3, 3, 3], [total]],
        [4, [1, 2, 4, 2], [4, 4, 1]],
    ]

    for case in cases:
        bufsize, requested, expected_history = case
        raw = self.MockFileIO(payload)
        reader = self.tp(raw, buffer_size=bufsize)
        offset = 0
        for nbytes in requested:
            expected = payload[offset:offset + nbytes]
            self.assertEqual(reader.read(nbytes), expected)
            offset += nbytes
        # this is mildly implementation-dependent
        self.assertEqual(raw.read_history, expected_history)
Guido van Rossum78892e42007-04-06 17:31:18 +00001433
def test_read_non_blocking(self):
    # Inject some None's in there to simulate EWOULDBLOCK
    chunks = (b"abc", b"d", None, b"efg", None, None, None)
    reader = self.tp(self.MockRawIO(chunks))
    self.assertEqual(b"abcd", reader.read(6))
    self.assertEqual(b"e", reader.read(1))
    self.assertEqual(b"fg", reader.read())
    self.assertEqual(b"", reader.peek(1))
    self.assertIsNone(reader.read())
    self.assertEqual(b"", reader.read())

    # Same idea, applied directly to the raw mock's readall().
    raw = self.MockRawIO((b"a", None, None))
    self.assertEqual(b"a", raw.readall())
    self.assertIsNone(raw.readall())
1448
def test_read_past_eof(self):
    # Asking for far more than is available returns just what exists.
    reader = self.tp(self.MockRawIO((b"abc", b"d", b"efg")))

    self.assertEqual(b"abcdefg", reader.read(9000))
Guido van Rossum68bbcd22007-02-27 17:19:33 +00001454
def test_read_all(self):
    # read() with no size returns the concatenation of all raw chunks.
    reader = self.tp(self.MockRawIO((b"abc", b"d", b"efg")))

    self.assertEqual(b"abcdefg", reader.read())
Guido van Rossum68bbcd22007-02-27 17:19:33 +00001460
@support.requires_resource('cpu')
def test_threads(self):
    try:
        # Write out many bytes with exactly the same number of 0's,
        # 1's... 255's. This will help us check that concurrent reading
        # doesn't duplicate or forget contents.
        per_value = 1000
        byte_values = list(range(256)) * per_value
        random.shuffle(byte_values)
        payload = bytes(byte_values)
        with self.open(os_helper.TESTFN, "wb") as out:
            out.write(payload)
        with self.open(os_helper.TESTFN, self.read_mode, buffering=0) as raw:
            reader = self.tp(raw, 8)
            failures = []
            pieces = []

            def worker():
                try:
                    # Intra-buffer read then buffer-flushing read
                    for size in cycle([1, 19]):
                        piece = reader.read(size)
                        if not piece:
                            break
                        # list.append() is atomic
                        pieces.append(piece)
                except Exception as exc:
                    # Record for the main thread, then let it propagate.
                    failures.append(exc)
                    raise

            workers = [threading.Thread(target=worker) for _ in range(20)]
            with threading_helper.start_threads(workers):
                time.sleep(0.02)  # yield
            self.assertFalse(failures,
                "the following exceptions were caught: %r" % failures)
            combined = b''.join(pieces)
            # Every byte value must have been read exactly per_value times.
            for value in range(256):
                self.assertEqual(combined.count(bytes([value])), per_value)
    finally:
        os_helper.unlink(os_helper.TESTFN)
Antoine Pitrou87695762008-08-14 22:44:29 +00001500
def test_unseekable(self):
    # tell() and seek() must be refused both before and after a read.
    reader = self.tp(self.MockUnseekableIO(b"A" * 10))
    unsupported = self.UnsupportedOperation
    self.assertRaises(unsupported, reader.tell)
    self.assertRaises(unsupported, reader.seek, 0)
    reader.read(1)
    self.assertRaises(unsupported, reader.seek, 0)
    self.assertRaises(unsupported, reader.tell)
1508
def test_misbehaved_io(self):
    # seek() and tell() over a misbehaving raw stream must raise OSError.
    raw = self.MisbehavedRawIO((b"abc", b"d", b"efg"))
    reader = self.tp(raw)
    self.assertRaises(OSError, reader.seek, 0)
    self.assertRaises(OSError, reader.tell)

    # Silence destructor error
    reader.close = lambda: None
1517
def test_no_extraneous_read(self):
    # Issue #9550; when the raw IO object has satisfied the read request,
    # we should not issue any additional reads, otherwise it may block
    # (e.g. socket).
    bufsize = 16
    for n in (2, bufsize - 1, bufsize, bufsize + 1, bufsize * 2):
        wanted = b"x" * n
        # Simple case first: one raw read is enough to satisfy the request.
        # Then a more complex case where two raw reads are needed.
        for raw_chunks in ([b"x" * n], [b"x" * (n - 1), b"x"]):
            rawio = self.MockRawIO(raw_chunks)
            bufio = self.tp(rawio, bufsize)
            self.assertEqual(bufio.read(n), wanted)
            self.assertEqual(rawio._extraneous_reads, 0,
                "failed for {}: {} != 0".format(n, rawio._extraneous_reads))
1537
def test_read_on_closed(self):
    # Issue #23796
    # NOTE(review): this builds io.BufferedReader directly rather than
    # self.tp, so every subclass exercises the same type here -- confirm
    # that is intentional.
    reader = io.BufferedReader(io.BytesIO(b"12"))
    reader.read(1)
    reader.close()
    # Both calls must be rejected once the reader is closed.
    self.assertRaises(ValueError, reader.peek)
    self.assertRaises(ValueError, reader.read1, 1)
1545
def test_truncate_on_read_only(self):
    # A reader reports itself non-writable and refuses truncate(), with
    # or without an explicit size.
    reader = self.tp(self.MockFileIO(b"abc"))
    self.assertFalse(reader.writable())
    unsupported = self.UnsupportedOperation
    self.assertRaises(unsupported, reader.truncate)
    self.assertRaises(unsupported, reader.truncate, 0)
1552
Antoine Pitrou32cfede2010-08-11 13:31:33 +00001553
class CBufferedReaderTest(BufferedReaderTest, SizeofTest):
    # Runs the shared BufferedReaderTest suite against io.BufferedReader and
    # adds checks that only apply to that implementation.
    tp = io.BufferedReader

    @unittest.skipIf(MEMORY_SANITIZER, "MSan defaults to crashing "
                     "instead of returning NULL for malloc failure.")
    def test_constructor(self):
        BufferedReaderTest.test_constructor(self)
        # The allocation can succeed on 32-bit builds, e.g. with more
        # than 2 GiB RAM and a 64-bit kernel.
        if sys.maxsize <= 0x7FFFFFFF:
            return
        raw = self.MockRawIO()
        reader = self.tp(raw)
        self.assertRaises((OverflowError, MemoryError, ValueError),
            reader.__init__, raw, sys.maxsize)

    def test_initialization(self):
        raw = self.MockRawIO([b"abc"])
        reader = self.tp(raw)
        # After every rejected re-initialisation, reading must fail too.
        for bad_size in (0, -16, -1):
            self.assertRaises(ValueError, reader.__init__, raw,
                              buffer_size=bad_size)
            self.assertRaises(ValueError, reader.read)

    def test_misbehaved_io_read(self):
        raw = self.MisbehavedRawIO((b"abc", b"d", b"efg"))
        reader = self.tp(raw)
        # _pyio.BufferedReader seems to implement reading different, so that
        # checking this is not so easy.
        self.assertRaises(OSError, reader.read, 10)

    def test_garbage_collection(self):
        # C BufferedReader objects are collected.
        # The Python version has __del__, so it ends into gc.garbage instead
        self.addCleanup(os_helper.unlink, os_helper.TESTFN)
        with warnings_helper.check_warnings(('', ResourceWarning)):
            rawio = self.FileIO(os_helper.TESTFN, "w+b")
            reader = self.tp(rawio)
            # Make the object part of a reference cycle.
            reader.f = reader
            ref = weakref.ref(reader)
            del reader
            support.gc_collect()
        self.assertIsNone(ref(), ref)

    def test_args_error(self):
        # Issue #17275
        with self.assertRaisesRegex(TypeError, "BufferedReader"):
            self.tp(io.BytesIO(), 1024, 1024, 1024)

    def test_bad_readinto_value(self):
        # A readinto() that returns -1 must surface as an OSError with no
        # chained cause.
        def negative_readinto(buf):
            return -1

        inner = io.BufferedReader(io.BytesIO(b"12"))
        inner.readinto = negative_readinto
        reader = self.tp(inner)
        with self.assertRaises(OSError) as cm:
            reader.readline()
        self.assertIsNone(cm.exception.__cause__)

    def test_bad_readinto_type(self):
        # A readinto() that returns a non-integer must surface as an OSError
        # whose cause is a TypeError.
        def bytes_readinto(buf):
            return b''

        inner = io.BufferedReader(io.BytesIO(b"12"))
        inner.readinto = bytes_readinto
        reader = self.tp(inner)
        with self.assertRaises(OSError) as cm:
            reader.readline()
        self.assertIsInstance(cm.exception.__cause__, TypeError)
1619
R David Murray67bfe802013-02-23 21:51:05 -05001620
class PyBufferedReaderTest(BufferedReaderTest):
    # Runs the shared BufferedReaderTest suite against pyio.BufferedReader.
    tp = pyio.BufferedReader
Antoine Pitrou87695762008-08-14 22:44:29 +00001623
Guido van Rossuma9e20242007-03-08 00:43:48 +00001624
class BufferedWriterTest(unittest.TestCase, CommonBufferedTests):
    # Implementation-independent tests for buffered writers. Subclasses set
    # `tp` to the concrete class under test.
    write_mode = "wb"

    def test_constructor(self):
        rawio = self.MockRawIO()
        bufio = self.tp(rawio)
        bufio.__init__(rawio)
        bufio.__init__(rawio, buffer_size=1024)
        bufio.__init__(rawio, buffer_size=16)
        self.assertEqual(3, bufio.write(b"abc"))
        bufio.flush()
        # Non-positive buffer sizes are rejected.
        self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0)
        self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16)
        self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1)
        # A valid re-initialisation makes the object usable again.
        bufio.__init__(rawio)
        self.assertEqual(3, bufio.write(b"ghi"))
        bufio.flush()
        self.assertEqual(b"".join(rawio._write_stack), b"abcghi")

    def test_uninitialized(self):
        # Creating and dropping an uninitialised object must be harmless.
        bufio = self.tp.__new__(self.tp)
        del bufio
        bufio = self.tp.__new__(self.tp)
        self.assertRaisesRegex((ValueError, AttributeError),
                               'uninitialized|has no attribute',
                               bufio.write, b'')
        bufio.__init__(self.MockRawIO())
        self.assertEqual(bufio.write(b''), 0)

    def test_detach_flush(self):
        # detach() must push buffered data to the raw stream.
        raw = self.MockRawIO()
        buf = self.tp(raw)
        buf.write(b"howdy!")
        self.assertFalse(raw._write_stack)
        buf.detach()
        self.assertEqual(raw._write_stack, [b"howdy!"])

    def test_write(self):
        # Write to the buffered IO but don't overflow the buffer.
        writer = self.MockRawIO()
        bufio = self.tp(writer, 8)
        bufio.write(b"abc")
        self.assertFalse(writer._write_stack)
        buffer = bytearray(b"def")
        bufio.write(buffer)
        buffer[:] = b"***"  # Overwrite our copy of the data
        bufio.flush()
        self.assertEqual(b"".join(writer._write_stack), b"abcdef")

    def test_write_overflow(self):
        writer = self.MockRawIO()
        bufio = self.tp(writer, 8)
        contents = b"abcdefghijklmnop"
        for n in range(0, len(contents), 3):
            bufio.write(contents[n:n+3])
        flushed = b"".join(writer._write_stack)
        # At least (total - 8) bytes were implicitly flushed, perhaps more
        # depending on the implementation.
        self.assertTrue(flushed.startswith(contents[:-8]), flushed)

    def check_writes(self, intermediate_func):
        # Lots of writes, test the flushed output is as expected.
        # intermediate_func(bufio) is called after every write.
        contents = bytes(range(256)) * 1000
        n = 0
        writer = self.MockRawIO()
        bufio = self.tp(writer, 13)
        # Generator of write sizes: repeat each N 15 times then proceed to N+1
        def gen_sizes():
            for size in count(1):
                for i in range(15):
                    yield size
        sizes = gen_sizes()
        while n < len(contents):
            size = min(next(sizes), len(contents) - n)
            self.assertEqual(bufio.write(contents[n:n+size]), size)
            intermediate_func(bufio)
            n += size
        bufio.flush()
        self.assertEqual(contents, b"".join(writer._write_stack))

    def test_writes(self):
        self.check_writes(lambda bufio: None)

    def test_writes_and_flushes(self):
        self.check_writes(lambda bufio: bufio.flush())

    def test_writes_and_seeks(self):
        # Seek away from the current position and back, using absolute
        # offsets first and relative offsets second.
        def _seekabs(bufio):
            pos = bufio.tell()
            bufio.seek(pos + 1, 0)
            bufio.seek(pos - 1, 0)
            bufio.seek(pos, 0)
        self.check_writes(_seekabs)
        def _seekrel(bufio):
            pos = bufio.seek(0, 1)
            bufio.seek(+1, 1)
            bufio.seek(-1, 1)
            bufio.seek(pos, 0)
        self.check_writes(_seekrel)

    def test_writes_and_truncates(self):
        self.check_writes(lambda bufio: bufio.truncate(bufio.tell()))

    def test_write_non_blocking(self):
        raw = self.MockNonBlockWriterIO()
        bufio = self.tp(raw, 8)

        self.assertEqual(bufio.write(b"abcd"), 4)
        self.assertEqual(bufio.write(b"efghi"), 5)
        # 1 byte will be written, the rest will be buffered
        raw.block_on(b"k")
        self.assertEqual(bufio.write(b"jklmn"), 5)

        # 8 bytes will be written, 8 will be buffered and the rest will be lost
        raw.block_on(b"0")
        try:
            bufio.write(b"opqrwxyz0123456789")
        except self.BlockingIOError as e:
            written = e.characters_written
        else:
            self.fail("BlockingIOError should have been raised")
        self.assertEqual(written, 16)
        self.assertEqual(raw.pop_written(),
            b"abcdefghijklmnopqrwxyz")

        self.assertEqual(bufio.write(b"ABCDEFGHI"), 9)
        s = raw.pop_written()
        # Previously buffered bytes were flushed
        self.assertTrue(s.startswith(b"01234567A"), s)

    def test_write_and_rewind(self):
        raw = io.BytesIO()
        bufio = self.tp(raw, 4)
        self.assertEqual(bufio.write(b"abcdef"), 6)
        self.assertEqual(bufio.tell(), 6)
        bufio.seek(0, 0)
        self.assertEqual(bufio.write(b"XY"), 2)
        bufio.seek(6, 0)
        self.assertEqual(raw.getvalue(), b"XYcdef")
        self.assertEqual(bufio.write(b"123456"), 6)
        bufio.flush()
        self.assertEqual(raw.getvalue(), b"XYcdef123456")

    def test_flush(self):
        writer = self.MockRawIO()
        bufio = self.tp(writer, 8)
        bufio.write(b"abc")
        bufio.flush()
        self.assertEqual(b"abc", writer._write_stack[0])

    def test_writelines(self):
        l = [b'ab', b'cd', b'ef']
        writer = self.MockRawIO()
        bufio = self.tp(writer, 8)
        bufio.writelines(l)
        bufio.flush()
        self.assertEqual(b''.join(writer._write_stack), b'abcdef')

    def test_writelines_userlist(self):
        l = UserList([b'ab', b'cd', b'ef'])
        writer = self.MockRawIO()
        bufio = self.tp(writer, 8)
        bufio.writelines(l)
        bufio.flush()
        self.assertEqual(b''.join(writer._write_stack), b'abcdef')

    def test_writelines_error(self):
        writer = self.MockRawIO()
        bufio = self.tp(writer, 8)
        self.assertRaises(TypeError, bufio.writelines, [1, 2, 3])
        self.assertRaises(TypeError, bufio.writelines, None)
        self.assertRaises(TypeError, bufio.writelines, 'abc')

    def test_destructor(self):
        # Dropping the last reference must flush pending data.
        writer = self.MockRawIO()
        bufio = self.tp(writer, 8)
        bufio.write(b"abc")
        del bufio
        support.gc_collect()
        self.assertEqual(b"abc", writer._write_stack[0])

    def test_truncate(self):
        # Truncate implicitly flushes the buffer.
        self.addCleanup(os_helper.unlink, os_helper.TESTFN)
        with self.open(os_helper.TESTFN, self.write_mode, buffering=0) as raw:
            bufio = self.tp(raw, 8)
            bufio.write(b"abcdef")
            self.assertEqual(bufio.truncate(3), 3)
            self.assertEqual(bufio.tell(), 6)
        with self.open(os_helper.TESTFN, "rb", buffering=0) as f:
            self.assertEqual(f.read(), b"abc")

    def test_truncate_after_write(self):
        # Ensure that truncate preserves the file position after
        # writes longer than the buffer size.
        # Issue: https://bugs.python.org/issue32228
        self.addCleanup(os_helper.unlink, os_helper.TESTFN)
        with self.open(os_helper.TESTFN, "wb") as f:
            # Fill with some buffer
            f.write(b'\x00' * 10000)
        buffer_sizes = [8192, 4096, 200]
        for buffer_size in buffer_sizes:
            with self.open(os_helper.TESTFN, "r+b", buffering=buffer_size) as f:
                f.write(b'\x00' * (buffer_size + 1))
                # After write write_pos and write_end are set to 0
                f.read(1)
                # read operation makes sure that pos != raw_pos
                f.truncate()
                self.assertEqual(f.tell(), buffer_size + 2)

    @support.requires_resource('cpu')
    def test_threads(self):
        try:
            # Write out many bytes from many threads and test they were
            # all flushed.
            N = 1000
            contents = bytes(range(256)) * N
            sizes = cycle([1, 19])
            n = 0
            queue = deque()
            while n < len(contents):
                size = next(sizes)
                queue.append(contents[n:n+size])
                n += size
            del contents
            # We use a real file object because it allows us to
            # exercise situations where the GIL is released before
            # writing the buffer to the raw streams. This is in addition
            # to concurrency issues due to switching threads in the middle
            # of Python code.
            with self.open(os_helper.TESTFN, self.write_mode, buffering=0) as raw:
                bufio = self.tp(raw, 8)
                errors = []
                def f():
                    try:
                        while True:
                            try:
                                s = queue.popleft()
                            except IndexError:
                                return
                            bufio.write(s)
                    except Exception as e:
                        errors.append(e)
                        raise
                threads = [threading.Thread(target=f) for x in range(20)]
                with threading_helper.start_threads(threads):
                    time.sleep(0.02)  # yield
                self.assertFalse(errors,
                    "the following exceptions were caught: %r" % errors)
                bufio.close()
            with self.open(os_helper.TESTFN, "rb") as f:
                s = f.read()
            for i in range(256):
                self.assertEqual(s.count(bytes([i])), N)
        finally:
            os_helper.unlink(os_helper.TESTFN)

    def test_misbehaved_io(self):
        rawio = self.MisbehavedRawIO()
        bufio = self.tp(rawio, 5)
        self.assertRaises(OSError, bufio.seek, 0)
        self.assertRaises(OSError, bufio.tell)
        self.assertRaises(OSError, bufio.write, b"abcdef")

        # Silence destructor error
        bufio.close = lambda: None

    def test_max_buffer_size_removal(self):
        # A third positional argument is no longer accepted.
        with self.assertRaises(TypeError):
            self.tp(self.MockRawIO(), 8, 12)

    def test_write_error_on_close(self):
        raw = self.MockRawIO()
        def bad_write(b):
            raise OSError()
        raw.write = bad_write
        b = self.tp(raw)
        b.write(b'spam')
        self.assertRaises(OSError, b.close)  # exception not swallowed
        self.assertTrue(b.closed)

    def test_slow_close_from_thread(self):
        # Issue #31976
        rawio = self.SlowFlushRawIO()
        bufio = self.tp(rawio, 8)
        t = threading.Thread(target=bufio.close)
        t.start()
        # Join in a finally block so the closing thread is not left running
        # past the end of the test when one of the assertions fails.
        try:
            rawio.in_flush.wait()
            self.assertRaises(ValueError, bufio.write, b'spam')
            self.assertTrue(bufio.closed)
        finally:
            t.join()
1916
1917
Benjamin Peterson59406a92009-03-26 17:10:29 +00001918
class CBufferedWriterTest(BufferedWriterTest, SizeofTest):
    # Runs the shared BufferedWriterTest suite against io.BufferedWriter and
    # adds checks that only apply to that implementation.
    tp = io.BufferedWriter

    @unittest.skipIf(MEMORY_SANITIZER, "MSan defaults to crashing "
                     "instead of returning NULL for malloc failure.")
    def test_constructor(self):
        BufferedWriterTest.test_constructor(self)
        # The allocation can succeed on 32-bit builds, e.g. with more
        # than 2 GiB RAM and a 64-bit kernel.
        if sys.maxsize <= 0x7FFFFFFF:
            return
        raw = self.MockRawIO()
        writer = self.tp(raw)
        self.assertRaises((OverflowError, MemoryError, ValueError),
            writer.__init__, raw, sys.maxsize)

    def test_initialization(self):
        raw = self.MockRawIO()
        writer = self.tp(raw)
        # After every rejected re-initialisation, writing must fail too.
        for bad_size in (0, -16, -1):
            self.assertRaises(ValueError, writer.__init__, raw,
                              buffer_size=bad_size)
            self.assertRaises(ValueError, writer.write, b"def")

    def test_garbage_collection(self):
        # C BufferedWriter objects are collected, and collecting them flushes
        # all data to disk.
        # The Python version has __del__, so it ends into gc.garbage instead
        self.addCleanup(os_helper.unlink, os_helper.TESTFN)
        with warnings_helper.check_warnings(('', ResourceWarning)):
            rawio = self.FileIO(os_helper.TESTFN, "w+b")
            writer = self.tp(rawio)
            writer.write(b"123xxx")
            # Make the object part of a reference cycle.
            writer.x = writer
            ref = weakref.ref(writer)
            del writer
            support.gc_collect()
        self.assertIsNone(ref(), ref)
        with self.open(os_helper.TESTFN, "rb") as stored:
            self.assertEqual(stored.read(), b"123xxx")

    def test_args_error(self):
        # Issue #17275
        with self.assertRaisesRegex(TypeError, "BufferedWriter"):
            self.tp(io.BytesIO(), 1024, 1024, 1024)
1965
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001966
class PyBufferedWriterTest(BufferedWriterTest):
    # Runs the shared BufferedWriterTest suite against pyio.BufferedWriter.
    tp = pyio.BufferedWriter
Guido van Rossuma9e20242007-03-08 00:43:48 +00001969
Guido van Rossum01a27522007-03-07 01:00:12 +00001970class BufferedRWPairTest(unittest.TestCase):
Guido van Rossuma9e20242007-03-08 00:43:48 +00001971
def test_constructor(self):
    # A freshly built pair must not report itself as closed.
    reader_raw = self.MockRawIO()
    writer_raw = self.MockRawIO()
    rw = self.tp(reader_raw, writer_raw)
    self.assertFalse(rw.closed)
Guido van Rossum01a27522007-03-07 01:00:12 +00001975
def test_uninitialized(self):
    # Creating and dropping an uninitialised object must be harmless.
    rw = self.tp.__new__(self.tp)
    del rw

    rw = self.tp.__new__(self.tp)
    expected_errors = (ValueError, AttributeError)
    message = 'uninitialized|has no attribute'
    self.assertRaisesRegex(expected_errors, message, rw.read, 0)
    self.assertRaisesRegex(expected_errors, message, rw.write, b'')

    # Once initialised, the same object works normally.
    rw.__init__(self.MockRawIO(), self.MockRawIO())
    self.assertEqual(rw.read(0), b'')
    self.assertEqual(rw.write(b''), 0)
1989
def test_detach(self):
    # detach() is not supported on a reader/writer pair.
    rw = self.tp(self.MockRawIO(), self.MockRawIO())
    self.assertRaises(self.UnsupportedOperation, rw.detach)
1993
def test_constructor_max_buffer_size_removal(self):
    # A fourth positional argument is no longer accepted.
    reader_raw = self.MockRawIO()
    writer_raw = self.MockRawIO()
    with self.assertRaises(TypeError):
        self.tp(reader_raw, writer_raw, 8, 12)
Benjamin Peterson59406a92009-03-26 17:10:29 +00001997
def test_constructor_with_not_readable(self):
    # The reader side must be rejected when it reports readable() == False.
    class UnreadableRaw(MockRawIO):
        def readable(self):
            return False

    self.assertRaises(OSError, self.tp, UnreadableRaw(), self.MockRawIO())
Antoine Pitroucf4c7492009-04-19 00:09:36 +00002004
def test_constructor_with_not_writeable(self):
    # The writer side must be rejected when it reports writable() == False.
    class UnwritableRaw(MockRawIO):
        def writable(self):
            return False

    self.assertRaises(OSError, self.tp, self.MockRawIO(), UnwritableRaw())
Antoine Pitroucf4c7492009-04-19 00:09:36 +00002011
def test_read(self):
    # Sized reads followed by a read-to-end on one pair.
    rw = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())

    self.assertEqual(rw.read(3), b"abc")
    self.assertEqual(rw.read(1), b"d")
    self.assertEqual(rw.read(), b"ef")

    # An explicit None size reads everything from a fresh pair.
    rw = self.tp(self.BytesIO(b"abc"), self.MockRawIO())
    self.assertEqual(rw.read(None), b"abc")
2020
def test_readlines(self):
    # Each assertion gets a fresh pair over the same data. The cases are:
    # no hint, an explicit None hint, and a size hint of 5.
    # The None case replaces an assertion that exactly duplicated the
    # no-argument one.
    def pair():
        return self.tp(self.BytesIO(b"abc\ndef\nh"), self.MockRawIO())

    self.assertEqual(pair().readlines(), [b"abc\n", b"def\n", b"h"])
    self.assertEqual(pair().readlines(None), [b"abc\n", b"def\n", b"h"])
    self.assertEqual(pair().readlines(5), [b"abc\n", b"def\n"])
Antoine Pitroucf4c7492009-04-19 00:09:36 +00002026
def test_read1(self):
    # .read1() is delegated to the underlying reader object, so this test
    # can be shallow.
    rw_pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())

    first_chunk = rw_pair.read1(3)
    self.assertEqual(first_chunk, b"abc")
    remainder = rw_pair.read1()
    self.assertEqual(remainder, b"def")
Antoine Pitroucf4c7492009-04-19 00:09:36 +00002034
def test_readinto(self):
    # readinto() and readinto1() both fill a caller-supplied 5-byte buffer
    # from the reader half of the pair.
    for method_name in ("readinto", "readinto1"):
        with self.subTest(method_name):
            rw_pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())

            target = byteslike(b'\0' * 5)
            fill = getattr(rw_pair, method_name)
            self.assertEqual(fill(target), 5)
            self.assertEqual(bytes(target), b"abcde")
Antoine Pitroucf4c7492009-04-19 00:09:36 +00002043
def test_write(self):
    # Writes land in the writer half once flushed; mutating the caller's
    # bytearray after write() must not change what was written.
    sink = self.MockRawIO()
    rw_pair = self.tp(self.MockRawIO(), sink)

    rw_pair.write(b"abc")
    rw_pair.flush()
    scratch = bytearray(b"def")
    rw_pair.write(scratch)
    scratch[:] = b"***"  # Overwrite our copy of the data
    rw_pair.flush()
    self.assertEqual(sink._write_stack, [b"abc", b"def"])
2055
def test_peek(self):
    # peek() must not consume: the following read() sees the same bytes.
    rw_pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())

    peeked = rw_pair.peek(3)
    self.assertTrue(peeked.startswith(b"abc"))
    self.assertEqual(rw_pair.read(3), b"abc")
2061
def test_readable(self):
    # A pair built from two mock raw streams reports itself readable.
    rw_pair = self.tp(self.MockRawIO(), self.MockRawIO())
    is_readable = rw_pair.readable()
    self.assertTrue(is_readable)
2065
def test_writeable(self):
    # A pair built from two mock raw streams reports itself writable.
    rw_pair = self.tp(self.MockRawIO(), self.MockRawIO())
    is_writable = rw_pair.writable()
    self.assertTrue(is_writable)
2069
def test_seekable(self):
    # BufferedRWPairs are never seekable, even if their readers and writers
    # are.
    rw_pair = self.tp(self.MockRawIO(), self.MockRawIO())
    is_seekable = rw_pair.seekable()
    self.assertFalse(is_seekable)
2075
# .flush() is delegated to the underlying writer object and has been
# tested in the test_write method.
2078
def test_close_and_closed(self):
    # `closed` flips from false to true across close().
    rw_pair = self.tp(self.MockRawIO(), self.MockRawIO())
    closed_before = rw_pair.closed
    self.assertFalse(closed_before)
    rw_pair.close()
    closed_after = rw_pair.closed
    self.assertTrue(closed_after)
2084
def test_reader_close_error_on_close(self):
    # The reader's close() fails with NameError.  The pair must propagate
    # that error and still close the writer.
    def reader_close():
        reader_non_existing

    failing_reader = self.MockRawIO()
    failing_reader.close = reader_close
    good_writer = self.MockRawIO()
    rw_pair = self.tp(failing_reader, good_writer)
    with self.assertRaises(NameError) as caught:
        rw_pair.close()
    self.assertIn('reader_non_existing', str(caught.exception))
    self.assertTrue(rw_pair.closed)
    self.assertFalse(failing_reader.closed)
    self.assertTrue(good_writer.closed)

    # Silence destructor error
    failing_reader.close = lambda: None
2101
def test_writer_close_error_on_close(self):
    # The writer's close() fails with NameError.  The pair must propagate
    # that error, close the reader, and stay open itself.
    def writer_close():
        writer_non_existing

    good_reader = self.MockRawIO()
    failing_writer = self.MockRawIO()
    failing_writer.close = writer_close
    rw_pair = self.tp(good_reader, failing_writer)
    with self.assertRaises(NameError) as caught:
        rw_pair.close()
    self.assertIn('writer_non_existing', str(caught.exception))
    self.assertFalse(rw_pair.closed)
    self.assertTrue(good_reader.closed)
    self.assertFalse(failing_writer.closed)

    # Silence destructor error
    failing_writer.close = lambda: None
    failing_writer = None

    # Ignore BufferedWriter (of the BufferedRWPair) unraisable exception
    with support.catch_unraisable_exception():
        # Ignore BufferedRWPair unraisable exception
        with support.catch_unraisable_exception():
            rw_pair = None
            support.gc_collect()
        support.gc_collect()
Victor Stinner472f7942019-04-12 21:58:24 +02002127
def test_reader_writer_close_error_on_close(self):
    # Both halves fail on close().  The reader's error is the one raised,
    # with the writer's error attached as its __context__.
    def reader_close():
        reader_non_existing

    def writer_close():
        writer_non_existing

    failing_reader = self.MockRawIO()
    failing_reader.close = reader_close
    failing_writer = self.MockRawIO()
    failing_writer.close = writer_close
    rw_pair = self.tp(failing_reader, failing_writer)
    with self.assertRaises(NameError) as caught:
        rw_pair.close()
    raised = caught.exception
    self.assertIn('reader_non_existing', str(raised))
    self.assertIsInstance(raised.__context__, NameError)
    self.assertIn('writer_non_existing', str(raised.__context__))
    self.assertFalse(rw_pair.closed)
    self.assertFalse(failing_reader.closed)
    self.assertFalse(failing_writer.closed)

    # Silence destructor error
    failing_reader.close = lambda: None
    failing_writer.close = lambda: None
2150
def test_isatty(self):
    # The pair is a tty as soon as either half says it is one.
    class SelectableIsAtty(MockRawIO):
        def __init__(self, isatty):
            MockRawIO.__init__(self)
            self._isatty = isatty

        def isatty(self):
            return self._isatty

    # (reader is a tty, writer is a tty, expected pair.isatty())
    cases = (
        (False, False, False),
        (True, False, True),
        (False, True, True),
        (True, True, True),
    )
    for reader_tty, writer_tty, expected in cases:
        rw_pair = self.tp(SelectableIsAtty(reader_tty),
                          SelectableIsAtty(writer_tty))
        check = self.assertTrue if expected else self.assertFalse
        check(rw_pair.isatty())
Guido van Rossum01a27522007-03-07 01:00:12 +00002171
def test_weakref_clearing(self):
    # Drop the pair first, then the weak reference that pointed at it.
    rw_pair = self.tp(self.MockRawIO(), self.MockRawIO())
    weak = weakref.ref(rw_pair)
    rw_pair = None
    weak = None  # Shouldn't segfault.
2177
# Runs the BufferedRWPair tests against io.BufferedRWPair.
class CBufferedRWPairTest(BufferedRWPairTest):
    tp = io.BufferedRWPair
Guido van Rossuma9e20242007-03-08 00:43:48 +00002180
# Runs the BufferedRWPair tests against pyio.BufferedRWPair.
class PyBufferedRWPairTest(BufferedRWPairTest):
    tp = pyio.BufferedRWPair
Guido van Rossuma9e20242007-03-08 00:43:48 +00002183
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002184
# Tests for a buffered stream that is both readable and writable.  Concrete
# subclasses set ``tp`` to the class under test.
class BufferedRandomTest(BufferedReaderTest, BufferedWriterTest):
    read_mode = "rb+"
    write_mode = "wb+"

    def test_constructor(self):
        # Run the reader and the writer version of this test explicitly.
        for parent in (BufferedReaderTest, BufferedWriterTest):
            parent.test_constructor(self)

    def test_uninitialized(self):
        for parent in (BufferedReaderTest, BufferedWriterTest):
            parent.test_uninitialized(self)

    def test_read_and_write(self):
        raw = self.MockRawIO((b"asdf", b"ghjk"))
        stream = self.tp(raw, 8)

        self.assertEqual(b"as", stream.read(2))
        for chunk in (b"ddd", b"eee"):
            stream.write(chunk)
        self.assertFalse(raw._write_stack)  # Buffer writes
        self.assertEqual(b"ghjk", stream.read())
        self.assertEqual(b"dddeee", raw._write_stack[0])

    def test_seek_and_tell(self):
        raw = self.BytesIO(b"asdfghjkl")
        stream = self.tp(raw)

        self.assertEqual(b"as", stream.read(2))
        self.assertEqual(2, stream.tell())
        stream.seek(0, 0)
        self.assertEqual(b"asdf", stream.read(4))

        # Overwrite four bytes in the middle, then re-read from the start.
        stream.write(b"123f")
        stream.seek(0, 0)
        self.assertEqual(b"asdf123fl", stream.read())
        self.assertEqual(9, stream.tell())
        # Seek relative to the end (whence=2), then to the current
        # position (whence=1).
        stream.seek(-4, 2)
        self.assertEqual(5, stream.tell())
        stream.seek(2, 1)
        self.assertEqual(7, stream.tell())
        self.assertEqual(b"fl", stream.read(11))
        stream.flush()
        self.assertEqual(b"asdf123fl", raw.getvalue())

        # A float offset is rejected.
        self.assertRaises(TypeError, stream.seek, 0.0)

    def check_flush_and_read(self, read_func):
        # Shared scenario: read_func(bufio[, n]) performs the read, so the
        # same sequence can be run through read(), readinto() and peek().
        raw = self.BytesIO(b"abcdefghi")
        buffered = self.tp(raw)

        self.assertEqual(b"ab", read_func(buffered, 2))
        buffered.write(b"12")
        self.assertEqual(b"ef", read_func(buffered, 2))
        self.assertEqual(6, buffered.tell())
        buffered.flush()
        self.assertEqual(6, buffered.tell())
        self.assertEqual(b"ghi", read_func(buffered))
        raw.seek(0, 0)
        raw.write(b"XYZ")
        # flush() resets the read buffer
        buffered.flush()
        buffered.seek(0, 0)
        self.assertEqual(b"XYZ", read_func(buffered, 3))

    def test_flush_and_read(self):
        def _read(bufio, *args):
            return bufio.read(*args)
        self.check_flush_and_read(_read)

    def test_flush_and_readinto(self):
        def _readinto(bufio, n=-1):
            # A negative n means "read everything": use a large buffer.
            capacity = 9999 if n < 0 else n
            scratch = bytearray(capacity)
            filled = bufio.readinto(scratch)
            return bytes(scratch[:filled])
        self.check_flush_and_read(_readinto)

    def test_flush_and_peek(self):
        def _peek(bufio, n=-1):
            # This relies on the fact that the buffer can contain the whole
            # raw stream, otherwise peek() can return less.
            peeked = bufio.peek(n)
            if n != -1:
                peeked = peeked[:n]
            # peek() does not advance, so move past what was "read".
            bufio.seek(len(peeked), 1)
            return peeked
        self.check_flush_and_read(_peek)

    def test_flush_and_write(self):
        raw = self.BytesIO(b"abcdefghi")
        buffered = self.tp(raw)

        for chunk in (b"123", b"45"):
            buffered.write(chunk)
            buffered.flush()
        buffered.seek(0, 0)
        self.assertEqual(b"12345fghi", raw.getvalue())
        self.assertEqual(b"12345fghi", buffered.read())

    def test_threads(self):
        for parent in (BufferedReaderTest, BufferedWriterTest):
            parent.test_threads(self)

    def test_writes_and_peek(self):
        # First variant: peek at the current position.
        def _peek(bufio):
            bufio.peek(1)
        self.check_writes(_peek)

        # Second variant: step back one byte, peek, then restore the
        # position.
        def _peek(bufio):
            saved_pos = bufio.tell()
            bufio.seek(-1, 1)
            bufio.peek(1)
            bufio.seek(saved_pos, 0)
        self.check_writes(_peek)

    def test_writes_and_reads(self):
        def _read(bufio):
            bufio.seek(-1, 1)
            bufio.read(1)
        self.check_writes(_read)

    def test_writes_and_read1s(self):
        def _read1(bufio):
            bufio.seek(-1, 1)
            bufio.read1(1)
        self.check_writes(_read1)

    def test_writes_and_readintos(self):
        def _read(bufio):
            bufio.seek(-1, 1)
            bufio.readinto(bytearray(1))
        self.check_writes(_read)

    def test_write_after_readahead(self):
        # Issue #6629: writing after the buffer was filled by readahead should
        # first rewind the raw stream.
        for overwrite_size in [1, 5]:
            raw = self.BytesIO(b"A" * 10)
            buffered = self.tp(raw, 4)
            # Trigger readahead
            self.assertEqual(buffered.read(1), b"A")
            self.assertEqual(buffered.tell(), 1)
            # Overwriting should rewind the raw stream if it needs so
            buffered.write(b"B" * overwrite_size)
            end_pos = overwrite_size + 1
            self.assertEqual(buffered.tell(), end_pos)
            # If the write size was smaller than the buffer size, flush() and
            # check that rewind happens.
            buffered.flush()
            self.assertEqual(buffered.tell(), end_pos)
            contents = raw.getvalue()
            self.assertEqual(contents,
                b"A" + b"B" * overwrite_size + b"A" * (9 - overwrite_size))

    def test_write_rewind_write(self):
        # Various combinations of reading / writing / seeking backwards / writing again
        def mutate(bufio, pos1, pos2):
            assert pos2 >= pos1
            # Fill the buffer
            bufio.seek(pos1)
            bufio.read(pos2 - pos1)
            bufio.write(b'\x02')
            # This writes earlier than the previous write, but still inside
            # the buffer.
            bufio.seek(pos1)
            bufio.write(b'\x01')

        original = b"\x80\x81\x82\x83\x84"
        size = len(original)
        for i in range(size):
            for j in range(i, size):
                raw = self.BytesIO(original)
                buffered = self.tp(raw, 100)
                mutate(buffered, i, j)
                buffered.flush()
                # When i == j the second assignment wins, like the second
                # write does.
                expected = bytearray(original)
                expected[j] = 2
                expected[i] = 1
                self.assertEqual(raw.getvalue(), expected,
                                 "failed result for i=%d, j=%d" % (i, j))

    def test_truncate_after_read_or_write(self):
        raw = self.BytesIO(b"A" * 10)
        buffered = self.tp(raw, 100)
        self.assertEqual(buffered.read(2), b"AA")  # the read buffer gets filled
        self.assertEqual(buffered.truncate(), 2)
        self.assertEqual(buffered.write(b"BB"), 2)  # the write buffer increases
        self.assertEqual(buffered.truncate(), 4)

    def test_misbehaved_io(self):
        for parent in (BufferedReaderTest, BufferedWriterTest):
            parent.test_misbehaved_io(self)

    def test_interleaved_read_write(self):
        # Test for issue #12213
        with self.BytesIO(b'abcdefgh') as raw:
            with self.tp(raw, 100) as stream:
                stream.write(b"1")
                self.assertEqual(stream.read(1), b'b')
                stream.write(b'2')
                self.assertEqual(stream.read1(1), b'd')
                stream.write(b'3')
                one_byte = bytearray(1)
                stream.readinto(one_byte)
                self.assertEqual(one_byte, b'f')
                stream.write(b'4')
                self.assertEqual(stream.peek(1), b'h')
                stream.flush()
                self.assertEqual(raw.getvalue(), b'1b2d3f4h')

        with self.BytesIO(b'abc') as raw:
            with self.tp(raw, 100) as stream:
                self.assertEqual(stream.read(1), b'a')
                stream.write(b"2")
                self.assertEqual(stream.read(1), b'c')
                stream.flush()
                self.assertEqual(raw.getvalue(), b'a2c')

    def test_interleaved_readline_write(self):
        with self.BytesIO(b'ab\ncdef\ng\n') as raw:
            with self.tp(raw) as stream:
                stream.write(b'1')
                self.assertEqual(stream.readline(), b'b\n')
                stream.write(b'2')
                self.assertEqual(stream.readline(), b'def\n')
                stream.write(b'3')
                self.assertEqual(stream.readline(), b'\n')
                stream.flush()
                self.assertEqual(raw.getvalue(), b'1b\n2def\n3\n')

    # You can't construct a BufferedRandom over a non-seekable stream.
    test_unseekable = None

    # writable() returns True, so there's no point to test it over
    # a writable stream.
    test_truncate_on_read_only = None
2416
R David Murray67bfe802013-02-23 21:51:05 -05002417
# Runs the BufferedRandom tests against io.BufferedRandom and adds checks
# that are specific to that implementation.
class CBufferedRandomTest(BufferedRandomTest, SizeofTest):
    tp = io.BufferedRandom

    @unittest.skipIf(MEMORY_SANITIZER, "MSan defaults to crashing "
                     "instead of returning NULL for malloc failure.")
    def test_constructor(self):
        BufferedRandomTest.test_constructor(self)
        # The allocation can succeed on 32-bit builds, e.g. with more
        # than 2 GiB RAM and a 64-bit kernel.
        if sys.maxsize <= 0x7FFFFFFF:
            return
        rawio = self.MockRawIO()
        bufio = self.tp(rawio)
        # Re-initialising with a sys.maxsize buffer must fail cleanly.
        allocation_errors = (OverflowError, MemoryError, ValueError)
        self.assertRaises(allocation_errors,
                          bufio.__init__, rawio, sys.maxsize)

    def test_garbage_collection(self):
        for parent in (CBufferedReaderTest, CBufferedWriterTest):
            parent.test_garbage_collection(self)

    def test_args_error(self):
        # Issue #17275
        # The TypeError message must name BufferedRandom.
        with self.assertRaisesRegex(TypeError, "BufferedRandom"):
            self.tp(io.BytesIO(), 1024, 1024, 1024)
2441
2442
# Runs the BufferedRandom tests against pyio.BufferedRandom.
class PyBufferedRandomTest(BufferedRandomTest):
    tp = pyio.BufferedRandom
2445
2446
# To fully exercise seek/tell, the StatefulIncrementalDecoder has these
# properties:
#   - A single output character can correspond to many bytes of input.
#   - The number of input bytes to complete the character can be
#     undetermined until the last input byte is received.
#   - The number of input bytes can vary depending on previous input.
#   - A single input byte can correspond to many characters of output.
#   - The number of output characters can be undetermined until the
#     last input byte is received.
#   - The number of output characters can vary depending on previous input.
2457
class StatefulIncrementalDecoder(codecs.IncrementalDecoder):
    """
    For testing seek/tell behavior with a stateful, buffering decoder.

    Input is a sequence of words. Words may be fixed-length (length set
    by input) or variable-length (period-terminated). In variable-length
    mode, extra periods are ignored. Possible words are:
      - 'i' followed by a number sets the input length, I (maximum 99).
        When I is set to 0, words are period-terminated.
      - 'o' followed by a number sets the output length, O (maximum 99).
      - Any other word is converted into a word followed by a period on
        the output. The output word consists of the input word truncated
        or padded out with hyphens to make its length equal to O. If O
        is 0, the word is output verbatim without truncating or padding.
    I and O are initially set to 1. When I changes, any buffered input is
    re-scanned according to the new I. EOF also terminates the last word.
    """

    def __init__(self, errors='strict'):
        super().__init__(errors)
        self.reset()

    def __repr__(self):
        return '<SID %x>' % id(self)

    def reset(self):
        """Return to the initial state: I = 1, O = 1, empty buffer."""
        self.i = 1
        self.o = 1
        self.buffer = bytearray()

    def getstate(self):
        """Return (buffered bytes, flags) with I and O packed as i*100 + o."""
        i, o = self.i ^ 1, self.o ^ 1  # so that flags = 0 after reset()
        return bytes(self.buffer), i*100 + o

    def setstate(self, state):
        """Restore a (buffered bytes, flags) pair produced by getstate()."""
        buffer, io = state
        self.buffer = bytearray(buffer)
        i, o = divmod(io, 100)
        self.i, self.o = i ^ 1, o ^ 1

    def decode(self, input, final=False):
        """Consume the bytes of *input* and return the decoded text.

        Bytes of an unfinished word stay in self.buffer; with *final*
        true a non-empty buffer is flushed as one last word.
        """
        period = ord('.')
        pieces = []
        for b in input:
            # self.i is re-read on every byte because process_word() may
            # change it in the middle of the input.
            if self.i == 0:  # variable-length, terminated with period
                if b == period:
                    if self.buffer:
                        pieces.append(self.process_word())
                else:
                    self.buffer.append(b)
            else:  # fixed-length, terminate after self.i bytes
                self.buffer.append(b)
                if len(self.buffer) == self.i:
                    pieces.append(self.process_word())
        if final and self.buffer:  # EOF terminates the last word
            pieces.append(self.process_word())
        return ''.join(pieces)

    def process_word(self):
        """Handle the word in self.buffer, clear it, and return its output.

        'i' and 'o' words only update the settings and return ''.
        """
        output = ''
        if self.buffer[0] == ord('i'):
            self.i = min(99, int(self.buffer[1:] or 0))  # set input length
        elif self.buffer[0] == ord('o'):
            self.o = min(99, int(self.buffer[1:] or 0))  # set output length
        else:
            output = self.buffer.decode('ascii')
            if len(output) < self.o:
                output += '-'*self.o  # pad out with hyphens
            if self.o:
                output = output[:self.o]  # truncate to output length
            output += '.'
        self.buffer = bytearray()
        return output

    # lookupTestDecoder() only resolves 'test_decoder' while this is true.
    codecEnabled = False
2532
Hai Shi14cdc212020-10-26 02:38:33 +08002533
# bpo-41919: This function is kept separate from StatefulIncrementalDecoder
# to avoid a resource leak when registering codecs and cleanup functions.
def lookupTestDecoder(name):
    # Codec search function: resolves only 'test_decoder', and only while
    # StatefulIncrementalDecoder.codecEnabled is true.  Otherwise returns
    # None.
    if not (StatefulIncrementalDecoder.codecEnabled
            and name == 'test_decoder'):
        return None
    # Encoding is borrowed from latin-1; only incremental decoding is ours.
    latin1_info = codecs.lookup('latin-1')
    return codecs.CodecInfo(
        name='test_decoder',
        encode=latin1_info.encode,
        decode=None,
        incrementalencoder=None,
        streamreader=None,
        streamwriter=None,
        incrementaldecoder=StatefulIncrementalDecoder,
    )
Benjamin Petersonad9d48d2008-04-02 21:49:44 +00002544
Benjamin Petersonad9d48d2008-04-02 21:49:44 +00002545
class StatefulIncrementalDecoderTest(unittest.TestCase):
    """
    Make sure the StatefulIncrementalDecoder actually works.
    """

    # Each entry is (input bytes, final flag, expected decoded text).
    test_cases = [
        # I=1, O=1 (fixed-length input == fixed-length output)
        (b'abcd', False, 'a.b.c.d.'),
        # I=0, O=0 (variable-length input, variable-length output)
        (b'oiabcd', True, 'abcd.'),
        # I=0, O=0 (should ignore extra periods)
        (b'oi...abcd...', True, 'abcd.'),
        # I=0, O=6 (variable-length input, fixed-length output)
        (b'i.o6.x.xyz.toolongtofit.', False, 'x-----.xyz---.toolon.'),
        # I=2, O=6 (fixed-length input < fixed-length output)
        (b'i.i2.o6xyz', True, 'xy----.z-----.'),
        # I=6, O=3 (fixed-length input > fixed-length output)
        (b'i.o3.i6.abcdefghijklmnop', True, 'abc.ghi.mno.'),
        # I=0, then 3; O=29, then 15 (with longer output)
        (b'i.o29.a.b.cde.o15.abcdefghijabcdefghij.i3.a.b.c.d.ei00k.l.m', True,
         ('a----------------------------.'
          'b----------------------------.'
          'cde--------------------------.'
          'abcdefghijabcde.'
          'a.b------------.'
          '.c.------------.'
          'd.e------------.'
          'k--------------.'
          'l--------------.'
          'm--------------.')),
    ]

    def test_decoder(self):
        # Try a few one-shot test cases.
        for raw_bytes, at_eof, expected in self.test_cases:
            decoder = StatefulIncrementalDecoder()
            decoded = decoder.decode(raw_bytes, at_eof)
            self.assertEqual(decoded, expected)

        # Also test an unfinished decode, followed by forcing EOF.
        decoder = StatefulIncrementalDecoder()
        self.assertEqual(decoder.decode(b'oiabcd'), '')
        self.assertEqual(decoder.decode(b'', 1), 'abcd.')
Guido van Rossum78892e42007-04-06 17:31:18 +00002588
2589class TextIOWrapperTest(unittest.TestCase):
Guido van Rossum9b76da62007-04-11 01:09:03 +00002590
def setUp(self):
    # Start without a leftover scratch file.
    os_helper.unlink(os_helper.TESTFN)

    # Sample data with mixed line endings, and the same text with every
    # line ending written as "\n".
    self.testdata = b"AAA\r\nBBB\rCCC\r\nDDD\nEEE\r\n"
    self.normalized = b"AAA\nBBB\nCCC\nDDD\nEEE\n".decode("ascii")

    # Make the test codec searchable for this test only.
    codecs.register(lookupTestDecoder)
    self.addCleanup(codecs.unregister, lookupTestDecoder)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002597
def tearDown(self):
    # Remove the scratch file a test may have created.
    scratch_path = os_helper.TESTFN
    os_helper.unlink(scratch_path)
Guido van Rossum9b76da62007-04-11 01:09:03 +00002600
def test_constructor(self):
    # Re-running __init__ on an existing wrapper must replace its
    # encoding and line_buffering settings.
    raw = self.BytesIO(b"\xc3\xa9\n\n")
    buffered = self.BufferedReader(raw, 1000)
    wrapper = self.TextIOWrapper(buffered)

    wrapper.__init__(buffered, encoding="latin-1", newline="\r\n")
    self.assertEqual(wrapper.encoding, "latin-1")
    self.assertEqual(wrapper.line_buffering, False)

    wrapper.__init__(buffered, encoding="utf-8", line_buffering=True)
    self.assertEqual(wrapper.encoding, "utf-8")
    self.assertEqual(wrapper.line_buffering, True)
    self.assertEqual("\xe9\n", wrapper.readline())

    # Invalid newline arguments: wrong type, then wrong value.
    reinit = wrapper.__init__
    self.assertRaises(TypeError, reinit, buffered, newline=42)
    self.assertRaises(ValueError, reinit, buffered, newline='xyzzy')
2614
def test_uninitialized(self):
    wrapper_type = self.TextIOWrapper
    # An object created with __new__ only can be destroyed right away.
    wrapper = wrapper_type.__new__(wrapper_type)
    del wrapper

    # Using it before __init__ fails with an exception.
    wrapper = wrapper_type.__new__(wrapper_type)
    self.assertRaises(Exception, repr, wrapper)
    self.assertRaisesRegex((ValueError, AttributeError),
                           'uninitialized|has no attribute',
                           wrapper.read, 0)

    # After a late __init__ the object is usable.
    wrapper.__init__(self.MockRawIO())
    self.assertEqual(wrapper.read(0), '')
2625
def test_non_text_encoding_codecs_are_rejected(self):
    # Ensure the constructor complains if passed a codec that isn't
    # marked as a text encoding
    # http://bugs.python.org/issue20404
    raw = self.BytesIO()
    buffered = self.BufferedWriter(raw)
    with self.assertRaisesRegex(LookupError, "is not a text encoding"):
        self.TextIOWrapper(buffered, encoding="hex")
2634
def test_detach(self):
    raw = self.BytesIO()
    buffered = self.BufferedWriter(raw)
    wrapper = self.TextIOWrapper(buffered)
    # detach() returns the underlying buffer.
    self.assertIs(wrapper.detach(), buffered)

    # Text written before detach() reaches the raw stream only once
    # detach() runs; a second detach() fails.
    wrapper = self.TextIOWrapper(buffered, encoding="ascii")
    wrapper.write("howdy")
    self.assertFalse(raw.getvalue())
    wrapper.detach()
    self.assertEqual(raw.getvalue(), b"howdy")
    self.assertRaises(ValueError, wrapper.detach)

    # Operations independent of the detached stream should still work
    repr(wrapper)
    self.assertEqual(wrapper.encoding, "ascii")
    self.assertEqual(wrapper.errors, "strict")
    self.assertFalse(wrapper.line_buffering)
    self.assertFalse(wrapper.write_through)
Benjamin Peterson10e76b62014-12-21 20:51:50 -06002654
def test_repr(self):
    raw = self.BytesIO("hello".encode("utf-8"))
    buffered = self.BufferedReader(raw)
    wrapper = self.TextIOWrapper(buffered, encoding="utf-8")
    modname = self.TextIOWrapper.__module__

    def check_repr(template):
        # The module prefix is optional in the expected repr.
        self.assertRegex(repr(wrapper), template % modname)

    check_repr(r"<(%s\.)?TextIOWrapper encoding='utf-8'>")
    raw.name = "dummy"
    check_repr(r"<(%s\.)?TextIOWrapper name='dummy' encoding='utf-8'>")
    wrapper.mode = "r"
    check_repr(
        r"<(%s\.)?TextIOWrapper name='dummy' mode='r' encoding='utf-8'>")
    raw.name = b"dummy"
    check_repr(
        r"<(%s\.)?TextIOWrapper name=b'dummy' mode='r' encoding='utf-8'>")

    wrapper.buffer.detach()
    repr(wrapper)  # Should not raise an exception
2674
def test_recursive_repr(self):
    # Issue #25455
    # The raw stream's name is temporarily the wrapper itself, so repr()
    # recurses; RuntimeError is an accepted outcome.
    raw = self.BytesIO()
    wrapper = self.TextIOWrapper(raw)
    with support.swap_attr(raw, 'name', wrapper):
        try:
            repr(wrapper)  # Should not crash
        except RuntimeError:
            pass
2684
def test_line_buffering(self):
    raw = self.BytesIO()
    buffered = self.BufferedWriter(raw, 1000)
    wrapper = self.TextIOWrapper(buffered, newline="\n",
                                 line_buffering=True)

    wrapper.write("X")
    self.assertEqual(raw.getvalue(), b"")  # No flush happened
    wrapper.write("Y\nZ")
    self.assertEqual(raw.getvalue(), b"XY\nZ")  # All got flushed
    # A write containing "\r" is flushed as well.
    wrapper.write("A\rB")
    self.assertEqual(raw.getvalue(), b"XY\nZA\rB")
Guido van Rossumf64db9f2007-12-06 01:04:26 +00002695
def test_reconfigure_line_buffering(self):
    raw = self.BytesIO()
    buffered = self.BufferedWriter(raw, 1000)
    wrapper = self.TextIOWrapper(buffered, newline="\n",
                                 line_buffering=False)
    wrapper.write("AB\nC")
    self.assertEqual(raw.getvalue(), b"")

    wrapper.reconfigure(line_buffering=True)   # implicit flush
    self.assertEqual(raw.getvalue(), b"AB\nC")
    wrapper.write("DEF\nG")
    self.assertEqual(raw.getvalue(), b"AB\nCDEF\nG")
    wrapper.write("H")
    self.assertEqual(raw.getvalue(), b"AB\nCDEF\nG")
    wrapper.reconfigure(line_buffering=False)  # implicit flush
    self.assertEqual(raw.getvalue(), b"AB\nCDEF\nGH")
    wrapper.write("IJ")
    self.assertEqual(raw.getvalue(), b"AB\nCDEF\nGH")

    # Keeping default value
    wrapper.reconfigure()
    wrapper.reconfigure(line_buffering=None)
    self.assertEqual(wrapper.line_buffering, False)
    wrapper.reconfigure(line_buffering=True)
    wrapper.reconfigure()
    wrapper.reconfigure(line_buffering=None)
    self.assertEqual(wrapper.line_buffering, True)
2722
@unittest.skipIf(sys.flags.utf8_mode, "utf-8 mode is enabled")
def test_default_encoding(self):
    saved_environ = dict(os.environ)
    try:
        # try to get a user preferred encoding different than the current
        # locale encoding to check that TextIOWrapper() uses the current
        # locale encoding and not the user preferred encoding
        for name in ('LC_ALL', 'LANG', 'LC_CTYPE'):
            os.environ.pop(name, None)

        expected_encoding = locale.getpreferredencoding(False)
        raw = self.BytesIO()
        wrapper = self.TextIOWrapper(raw)
        self.assertEqual(wrapper.encoding, expected_encoding)
    finally:
        # Put the environment back exactly as it was.
        os.environ.clear()
        os.environ.update(saved_environ)
2741
@support.cpython_only
@unittest.skipIf(sys.flags.utf8_mode, "utf-8 mode is enabled")
def test_device_encoding(self):
    # Issue 15989
    import _testcapi
    raw = self.BytesIO()
    # A fileno() just past INT_MAX, then just past UINT_MAX, must make the
    # constructor raise OverflowError.
    for limit_name in ("INT_MAX", "UINT_MAX"):
        raw.fileno = lambda name=limit_name: getattr(_testcapi, name) + 1
        self.assertRaises(OverflowError, self.TextIOWrapper, raw)
2752
def test_encoding(self):
    # Check the encoding attribute is always set, and valid
    raw = self.BytesIO()
    explicit = self.TextIOWrapper(raw, encoding="utf-8")
    self.assertEqual(explicit.encoding, "utf-8")

    implicit = self.TextIOWrapper(raw)
    self.assertIsNotNone(implicit.encoding)
    # lookup() raises if the reported encoding is not a known codec.
    codecs.lookup(implicit.encoding)
2761
def test_encoding_errors_reading(self):
    # Decode a payload containing a non-ASCII byte under each error handler.
    payload = b"abc\n\xff\n"

    # (1) default and (2) explicit strict: reading raises UnicodeError
    for options in ({}, {"errors": "strict"}):
        raw = self.BytesIO(payload)
        reader = self.TextIOWrapper(raw, encoding="ascii", **options)
        with self.assertRaises(UnicodeError):
            reader.read()

    # (3) ignore and (4) replace: reading succeeds with the given text
    for handler, expected in (("ignore", "abc\n\n"),
                              ("replace", "abc\n\ufffd\n")):
        raw = self.BytesIO(payload)
        reader = self.TextIOWrapper(raw, encoding="ascii", errors=handler)
        self.assertEqual(reader.read(), expected)
Guido van Rossume7fc50f2007-12-03 22:54:21 +00002779
def test_encoding_errors_writing(self):
    # Encode text containing a non-ASCII character under each error handler.

    # (1) default and (2) explicit strict: writing raises UnicodeError
    for options in ({}, {"errors": "strict"}):
        raw = self.BytesIO()
        writer = self.TextIOWrapper(raw, encoding="ascii", **options)
        with self.assertRaises(UnicodeError):
            writer.write("\xff")

    # (3) ignore and (4) replace: writing succeeds with the given bytes
    for handler, expected in (("ignore", b"abcdef\n"),
                              ("replace", b"abc?def\n")):
        raw = self.BytesIO()
        writer = self.TextIOWrapper(raw, encoding="ascii", errors=handler,
                                    newline="\n")
        writer.write("abc\xffdef\n")
        writer.flush()
        self.assertEqual(raw.getvalue(), expected)
Guido van Rossume7fc50f2007-12-03 22:54:21 +00002803
def test_newlines(self):
    # Read the same mixed-newline text under every newline mode, several
    # encodings and small buffer sizes, both by iterating the wrapper and
    # by alternating read(2) with readline(), and compare the pieces
    # obtained with the expected split.
    input_lines = ["unix\n", "windows\r\n", "os9\r", "last\n", "nonl"]

    # (newline argument, expected lines)
    tests = [
        [None, ['unix\n', 'windows\n', 'os9\n', 'last\n', 'nonl']],
        ['', input_lines],
        ['\n', ["unix\n", "windows\r\n", "os9\rlast\n", "nonl"]],
        ['\r\n', ["unix\nwindows\r\n", "os9\rlast\nnonl"]],
        ['\r', ["unix\nwindows\r", "\nos9\r", "last\nnonl"]],
    ]
    encodings = (
        'utf-8', 'latin-1',
        'utf-16', 'utf-16-le', 'utf-16-be',
        'utf-32', 'utf-32-le', 'utf-32-be',
    )

    # Try a range of buffer sizes to test the case where \r is the last
    # character in TextIOWrapper._pending_line.
    for encoding in encodings:
        data = ''.join(input_lines).encode(encoding)
        for do_reads in (False, True):
            for bufsize in range(1, 10):
                for newline, exp_lines in tests:
                    # One sub-test per combination, so a failure names the
                    # exact parameters and the rest still run.
                    with self.subTest(encoding=encoding, do_reads=do_reads,
                                      bufsize=bufsize, newline=newline):
                        bufio = self.BufferedReader(self.BytesIO(data),
                                                    bufsize)
                        textio = self.TextIOWrapper(bufio, newline=newline,
                                                    encoding=encoding)
                        if do_reads:
                            got_lines = []
                            while True:
                                c2 = textio.read(2)
                                if c2 == '':
                                    break
                                self.assertEqual(len(c2), 2)
                                got_lines.append(c2 + textio.readline())
                        else:
                            got_lines = list(textio)

                        # Comparing the lists checks contents and length
                        # in a single assertion.
                        self.assertEqual(got_lines, exp_lines)
Guido van Rossum78892e42007-04-06 17:31:18 +00002845
def test_newlines_input(self):
    # For each newline mode, readlines() must give the expected split and
    # read() after seek(0) must give the same text joined together.
    testdata = b"AAA\nBB\x00B\nCCC\rDDD\rEEE\r\nFFF\r\nGGG"
    normalized = testdata.replace(b"\r\n", b"\n").replace(b"\r", b"\n")
    cases = [
        (None, normalized.decode("ascii").splitlines(keepends=True)),
        ("", testdata.decode("ascii").splitlines(keepends=True)),
        ("\n", ["AAA\n", "BB\x00B\n", "CCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]),
        ("\r\n", ["AAA\nBB\x00B\nCCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]),
        ("\r", ["AAA\nBB\x00B\nCCC\r", "DDD\r", "EEE\r", "\nFFF\r", "\nGGG"]),
    ]
    for newline, expected_lines in cases:
        raw = self.BytesIO(testdata)
        reader = self.TextIOWrapper(raw, encoding="ascii", newline=newline)
        self.assertEqual(reader.readlines(), expected_lines)
        reader.seek(0)
        whole_text = "".join(expected_lines)
        self.assertEqual(reader.read(), whole_text)
Guido van Rossum8358db22007-08-18 21:39:55 +00002861
def test_newlines_output(self):
    # Bytes expected in the underlying buffer for each newline argument.
    expected_by_newline = {
        "": b"AAA\nBBB\nCCC\nX\rY\r\nZ",
        "\n": b"AAA\nBBB\nCCC\nX\rY\r\nZ",
        "\r": b"AAA\rBBB\rCCC\rX\rY\r\rZ",
        "\r\n": b"AAA\r\nBBB\r\nCCC\r\nX\rY\r\r\nZ",
    }
    # newline=None is expected to produce the os.linesep variant.
    cases = [(None, expected_by_newline[os.linesep])]
    cases += sorted(expected_by_newline.items())
    chunks = ("AAA\nB", "BB\nCCC\n", "X\rY\r\nZ")
    for newline, expected in cases:
        raw = self.BytesIO()
        writer = self.TextIOWrapper(raw, encoding="ascii", newline=newline)
        for chunk in chunks:
            writer.write(chunk)
        writer.flush()
        self.assertEqual(raw.closed, False)
        self.assertEqual(raw.getvalue(), expected)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002879
def test_destructor(self):
    # Dropping the last reference to the wrapper must close the underlying
    # buffer once, after the pending text has reached it.
    values_at_close = []
    parent = self.BytesIO

    class RecordingBytesIO(parent):
        def close(self):
            # Capture the buffer contents at the moment close() is called.
            values_at_close.append(self.getvalue())
            parent.close(self)

    raw = RecordingBytesIO()
    wrapper = self.TextIOWrapper(raw, encoding="ascii")
    wrapper.write("abc")
    del wrapper
    support.gc_collect()
    self.assertEqual([b"abc"], values_at_close)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002893
def test_override_destructor(self):
    # Overridden __del__(), close() and flush() must run in that order
    # when the wrapper is destroyed.
    calls = []

    class TracingTextIO(self.TextIOWrapper):
        def __del__(self):
            calls.append(1)
            # The base class may or may not define __del__.
            parent_del = getattr(super(), "__del__", None)
            if parent_del is not None:
                parent_del()

        def close(self):
            calls.append(2)
            super().close()

        def flush(self):
            calls.append(3)
            super().flush()

    raw = self.BytesIO()
    wrapper = TracingTextIO(raw, encoding="ascii")
    del wrapper
    support.gc_collect()
    self.assertEqual(calls, [1, 2, 3])
2916
def test_error_through_destructor(self):
    # Test that the exception state is not modified by a destructor,
    # even if close() fails.
    failing_raw = self.CloseFailureIO()
    with support.catch_unraisable_exception() as cm:
        # The temporary wrapper is destroyed while AttributeError is
        # propagating.
        with self.assertRaises(AttributeError):
            self.TextIOWrapper(failing_raw).xyzzy

        if IOBASE_EMITS_UNRAISABLE:
            # An unraisable report is optional, but if present it must be
            # the OSError.
            if cm.unraisable is not None:
                self.assertEqual(cm.unraisable.exc_type, OSError)
        else:
            self.assertIsNone(cm.unraisable)
Guido van Rossum8358db22007-08-18 21:39:55 +00002929
Guido van Rossum9b76da62007-04-11 01:09:03 +00002930 # Systematic tests of the text I/O API
2931
def test_basic_io(self):
    # Write/read/seek/tell round trips on a real file, repeated for a
    # range of chunk sizes and for several encodings.
    for chunksize in (1, 2, 3, 4, 5, 15, 16, 17, 31, 32, 33, 63, 64, 65):
        for enc in "ascii", "latin-1", "utf-8":  # , "utf-16-be", "utf-16-le":
            # Context managers close the file even when an assertion
            # fails, so a failing iteration does not leak an open handle.
            with self.open(os_helper.TESTFN, "w+", encoding=enc) as f:
                f._CHUNK_SIZE = chunksize
                self.assertEqual(f.write("abc"), 3)
            with self.open(os_helper.TESTFN, "r+", encoding=enc) as f:
                f._CHUNK_SIZE = chunksize
                self.assertEqual(f.tell(), 0)
                self.assertEqual(f.read(), "abc")
                # Position cookie for the end of "abc".
                cookie = f.tell()
                self.assertEqual(f.seek(0), 0)
                self.assertEqual(f.read(None), "abc")
                f.seek(0)
                self.assertEqual(f.read(2), "ab")
                self.assertEqual(f.read(1), "c")
                self.assertEqual(f.read(1), "")
                self.assertEqual(f.read(), "")
                self.assertEqual(f.tell(), cookie)
                self.assertEqual(f.seek(0), 0)
                self.assertEqual(f.seek(0, 2), cookie)
                self.assertEqual(f.write("def"), 3)
                self.assertEqual(f.seek(cookie), cookie)
                self.assertEqual(f.read(), "def")
                if enc.startswith("utf"):
                    self.multi_line_test(f, enc)
2960
def multi_line_test(self, f, enc):
    # Helper: rewrite *f* from scratch with lines of various lengths,
    # recording tell() before each write, then read the lines back and
    # check that every (position, line) pair matches. *enc* is not used.
    f.seek(0)
    f.truncate()
    sample = "s\xff\u0fff\uffff"
    written = []
    for size in (0, 1, 2, 3, 4, 5, 30, 31, 32, 33, 62, 63, 64, 65, 1000):
        # Cycle through the sample characters to build a line of *size*.
        body = "".join(sample[i % len(sample)] for i in range(size))
        line = body + "\n"
        # The position must be taken before the line is written.
        written.append((f.tell(), line))
        f.write(line)
    f.seek(0)
    read_back = []
    while True:
        start = f.tell()
        line = f.readline()
        if not line:
            break
        read_back.append((start, line))
    self.assertEqual(read_back, written)
Guido van Rossum9b76da62007-04-11 01:09:03 +00002982
def test_telling(self):
    # tell() values recorded while writing must be reproduced while
    # reading the same lines back; tell() must fail during iteration.
    # The context manager closes the file even if an assertion fails.
    with self.open(os_helper.TESTFN, "w+", encoding="utf-8") as f:
        p0 = f.tell()
        f.write("\xff\n")
        p1 = f.tell()
        f.write("\xff\n")
        p2 = f.tell()
        f.seek(0)
        self.assertEqual(f.tell(), p0)
        self.assertEqual(f.readline(), "\xff\n")
        self.assertEqual(f.tell(), p1)
        self.assertEqual(f.readline(), "\xff\n")
        self.assertEqual(f.tell(), p2)
        f.seek(0)
        for line in f:
            self.assertEqual(line, "\xff\n")
            # tell() is expected to raise while the file is being iterated.
            self.assertRaises(OSError, f.tell)
        # Once iteration is exhausted tell() works again.
        self.assertEqual(f.tell(), p2)
3002
def test_seeking(self):
    # Place a multi-byte character two bytes before the default chunk
    # size, then check read(), tell() and readline() around it.
    prefix_size = _default_chunk_size() - 2
    u_prefix = "a" * prefix_size
    prefix = u_prefix.encode("utf-8")
    self.assertEqual(len(u_prefix), len(prefix))
    u_suffix = "\u8888\n"
    line = prefix + u_suffix.encode("utf-8")
    with self.open(os_helper.TESTFN, "wb") as out:
        out.write(line * 2)
    with self.open(os_helper.TESTFN, "r", encoding="utf-8") as reader:
        head = reader.read(prefix_size)
        self.assertEqual(head, prefix.decode("ascii"))
        self.assertEqual(reader.tell(), prefix_size)
        self.assertEqual(reader.readline(), u_suffix)
Guido van Rossumd76e7792007-04-17 02:38:04 +00003019
def test_seeking_too(self):
    # Regression test for a specific bug
    payload = b'\xe0\xbf\xbf\n'
    with self.open(os_helper.TESTFN, "wb") as out:
        out.write(payload)
    with self.open(os_helper.TESTFN, "r", encoding="utf-8") as reader:
        reader._CHUNK_SIZE  # Just test that it exists
        # Chunk size smaller than the three-byte character in the file.
        reader._CHUNK_SIZE = 2
        reader.readline()
        # Only checks that tell() completes without raising.
        reader.tell()
Guido van Rossumd76e7792007-04-17 02:38:04 +00003030
def test_seek_and_tell(self):
    # Test seek/tell using the StatefulIncrementalDecoder.
    # Make test faster by doing smaller seeks
    CHUNK_SIZE = 128

    def test_seek_and_tell_with_data(data, min_pos=0):
        """Tell/seek to various points within a data stream and ensure
        that the decoded data returned by read() is consistent."""
        # Every file is opened in a with block so that a failing
        # assertion cannot leak the handle.
        with self.open(os_helper.TESTFN, 'wb') as f:
            f.write(data)
        with self.open(os_helper.TESTFN, encoding='test_decoder') as f:
            f._CHUNK_SIZE = CHUNK_SIZE
            decoded = f.read()

        for i in range(min_pos, len(decoded) + 1):  # seek positions
            for j in [1, 5, len(decoded) - i]:  # read lengths
                # As in the original, _CHUNK_SIZE is not overridden here.
                with self.open(os_helper.TESTFN,
                               encoding='test_decoder') as f:
                    self.assertEqual(f.read(i), decoded[:i])
                    cookie = f.tell()
                    self.assertEqual(f.read(j), decoded[i:i + j])
                    f.seek(cookie)
                    self.assertEqual(f.read(), decoded[i:])

    # Enable the test decoder.
    StatefulIncrementalDecoder.codecEnabled = 1

    # Run the tests.
    try:
        # Try each test case.
        for case_input, _, _ in StatefulIncrementalDecoderTest.test_cases:
            test_seek_and_tell_with_data(case_input)

        # Position each test case so that it crosses a chunk boundary.
        for case_input, _, _ in StatefulIncrementalDecoderTest.test_cases:
            offset = CHUNK_SIZE - len(case_input)//2
            prefix = b'.'*offset
            # Don't bother seeking into the prefix (takes too long).
            min_pos = offset*2
            test_seek_and_tell_with_data(prefix + case_input, min_pos)

    # Ensure our test decoder won't interfere with subsequent tests.
    finally:
        StatefulIncrementalDecoder.codecEnabled = 0
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00003077
def test_multibyte_seek_and_tell(self):
    # A tell() cookie taken between two lines of euc_jp text must bring
    # seek() back to the same place. Files are opened in with blocks so
    # they are closed even when an assertion fails.
    with self.open(os_helper.TESTFN, "w", encoding="euc_jp") as f:
        f.write("AB\n\u3046\u3048\n")

    with self.open(os_helper.TESTFN, "r", encoding="euc_jp") as f:
        self.assertEqual(f.readline(), "AB\n")
        p0 = f.tell()
        self.assertEqual(f.readline(), "\u3046\u3048\n")
        p1 = f.tell()
        f.seek(p0)
        # Re-reading from the cookie yields the same line and end position.
        self.assertEqual(f.readline(), "\u3046\u3048\n")
        self.assertEqual(f.tell(), p1)
3092
def test_seek_with_encoder_state(self):
    # Write, record a tell() cookie, write more, seek back to the cookie
    # and write again; the file must then decode to the expected text.
    # Files are opened in with blocks so they are closed even when an
    # assertion fails.
    with self.open(os_helper.TESTFN, "w", encoding="euc_jis_2004") as f:
        f.write("\u00e6\u0300")
        p0 = f.tell()
        f.write("\u00e6")
        f.seek(p0)
        f.write("\u0300")

    with self.open(os_helper.TESTFN, "r", encoding="euc_jis_2004") as f:
        self.assertEqual(f.readline(), "\u00e6\u0300\u0300")
3105
def test_encoded_writes(self):
    # Two consecutive writes must read back, and be stored, exactly as
    # one encode() of the concatenated text.
    text = "1234567890"
    doubled = text * 2
    codecs_under_test = (
        "utf-16", "utf-16-le", "utf-16-be",
        "utf-32", "utf-32-le", "utf-32-be",
    )
    for codec in codecs_under_test:
        raw = self.BytesIO()
        wrapper = self.TextIOWrapper(raw, encoding=codec)
        # Check if the BOM is written only once (see issue1753).
        for _ in range(2):
            wrapper.write(text)
        # Read the content back twice, rewinding before each read.
        for _ in range(2):
            wrapper.seek(0)
            self.assertEqual(wrapper.read(), doubled)
        self.assertEqual(raw.getvalue(), doubled.encode(codec))
Alexandre Vassalottia38f73b2008-01-07 18:30:48 +00003125
def test_unreadable(self):
    # read() on a wrapper over a buffer reporting readable() == False
    # must raise OSError.
    class UnReadable(self.BytesIO):
        def readable(self):
            return False

    wrapper = self.TextIOWrapper(UnReadable())
    with self.assertRaises(OSError):
        wrapper.read()
Benjamin Petersona1b49012009-03-31 23:11:32 +00003132
def test_read_one_by_one(self):
    # Reading one character at a time must still translate "\r\n".
    txt = self.TextIOWrapper(self.BytesIO(b"AA\r\nBB"))
    # iter() with a sentinel stops at the first empty read().
    pieces = iter(lambda: txt.read(1), "")
    self.assertEqual("".join(pieces), "AA\nBB")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00003142
def test_readlines(self):
    # readlines() with no hint, with None, and with a size hint of 5.
    reader = self.TextIOWrapper(self.BytesIO(b"AA\nBB\nCC"))
    every_line = ["AA\n", "BB\n", "CC"]
    self.assertEqual(reader.readlines(), every_line)
    reader.seek(0)
    self.assertEqual(reader.readlines(None), every_line)
    reader.seek(0)
    self.assertEqual(reader.readlines(5), every_line[:2])
3150
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00003151 # read in amounts equal to TextIOWrapper._CHUNK_SIZE which is 128.
def test_read_by_chunk(self):
    # make sure "\r\n" straddles 128 char boundary.
    payload = b"A" * 127 + b"\r\nB"
    txt = self.TextIOWrapper(self.BytesIO(payload))
    # Read 128 characters at a time until an empty string comes back.
    pieces = iter(lambda: txt.read(128), "")
    self.assertEqual("".join(pieces), "A"*127+"\nB")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00003162
def test_writelines(self):
    # writelines() on a list of strings writes them back to back.
    raw = self.BytesIO()
    writer = self.TextIOWrapper(raw)
    writer.writelines(['ab', 'cd', 'ef'])
    writer.flush()
    self.assertEqual(raw.getvalue(), b'abcdef')
3170
def test_writelines_userlist(self):
    # writelines() must accept a UserList as well as a plain list.
    raw = self.BytesIO()
    writer = self.TextIOWrapper(raw)
    writer.writelines(UserList(['ab', 'cd', 'ef']))
    writer.flush()
    self.assertEqual(raw.getvalue(), b'abcdef')
3178
def test_writelines_error(self):
    # Non-string items, None and a bytes object are all rejected with
    # TypeError.
    writer = self.TextIOWrapper(self.BytesIO())
    for bad_argument in ([1, 2, 3], None, b'abc'):
        with self.assertRaises(TypeError):
            writer.writelines(bad_argument)
3184
def test_issue1395_1(self):
    raw = self.BytesIO(self.testdata)
    txt = self.TextIOWrapper(raw, encoding="ascii")

    # read one char at a time
    pieces = iter(lambda: txt.read(1), "")
    self.assertEqual("".join(pieces), self.normalized)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00003196
def test_issue1395_2(self):
    raw = self.BytesIO(self.testdata)
    txt = self.TextIOWrapper(raw, encoding="ascii")
    txt._CHUNK_SIZE = 4

    # Read four characters at a time, matching the chunk size, until an
    # empty string comes back.
    pieces = iter(lambda: txt.read(4), "")
    self.assertEqual("".join(pieces), self.normalized)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00003208
def test_issue1395_3(self):
    raw = self.BytesIO(self.testdata)
    txt = self.TextIOWrapper(raw, encoding="ascii")
    txt._CHUNK_SIZE = 4

    # Two fixed-size reads followed by three readline() calls; the list
    # display evaluates the calls left to right.
    pieces = [
        txt.read(4),
        txt.read(4),
        txt.readline(),
        txt.readline(),
        txt.readline(),
    ]
    self.assertEqual("".join(pieces), self.normalized)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00003219
def test_issue1395_4(self):
    raw = self.BytesIO(self.testdata)
    txt = self.TextIOWrapper(raw, encoding="ascii")
    txt._CHUNK_SIZE = 4

    # A fixed-size read followed by a read of everything that is left.
    head = txt.read(4)
    tail = txt.read()
    self.assertEqual(head + tail, self.normalized)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00003227
def test_issue1395_5(self):
    raw = self.BytesIO(self.testdata)
    txt = self.TextIOWrapper(raw, encoding="ascii")
    txt._CHUNK_SIZE = 4

    # Consume four characters, remember the position, rewind, and come
    # back; the next read must continue from the remembered position.
    txt.read(4)
    cookie = txt.tell()
    txt.seek(0)
    txt.seek(cookie)
    self.assertEqual(txt.read(4), "BBB\n")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00003237
def test_issue2282(self):
    # The wrapper must report the same seekable() value as its buffer.
    raw = self.BytesIO(self.testdata)
    wrapper = self.TextIOWrapper(raw, encoding="ascii")

    raw_seekable = raw.seekable()
    self.assertEqual(raw_seekable, wrapper.seekable())
3243
def test_append_bom(self):
    # The BOM is not written again when appending to a non-empty file
    filename = os_helper.TESTFN
    for charset in ('utf-8-sig', 'utf-16', 'utf-32'):
        with self.open(filename, 'w', encoding=charset) as f:
            f.write('aaa')
        with self.open(filename, 'rb') as f:
            self.assertEqual(f.read(), 'aaa'.encode(charset))

        with self.open(filename, 'a', encoding=charset) as f:
            f.write('xxx')
        # A single encode() of the whole text contains the BOM once, so a
        # second BOM written on append would make this comparison fail.
        with self.open(filename, 'rb') as f:
            self.assertEqual(f.read(), 'aaaxxx'.encode(charset))
Antoine Pitroue4501852009-05-14 18:55:55 +00003258
def test_seek_bom(self):
    # The BOM is not written again when writing after a manual seek().
    path = os_helper.TESTFN
    for charset in ('utf-8-sig', 'utf-16', 'utf-32'):
        with self.open(path, 'w', encoding=charset) as out:
            out.write('aaa')
            end_of_text = out.tell()
        with self.open(path, 'r+', encoding=charset) as update:
            # Append at the old end first, then overwrite from the start.
            update.seek(end_of_text)
            update.write('zzz')
            update.seek(0)
            update.write('bbb')
        expected = 'bbbzzz'.encode(charset)
        with self.open(path, 'rb') as binary:
            self.assertEqual(binary.read(), expected)
Antoine Pitroue4501852009-05-14 18:55:55 +00003273
def test_seek_append_bom(self):
    # The BOM is not written again when, in append mode, the file is
    # first seeked to the start and then to the end before writing.
    path = os_helper.TESTFN
    for charset in ('utf-8-sig', 'utf-16', 'utf-32'):
        with self.open(path, 'w', encoding=charset) as out:
            out.write('aaa')
        with self.open(path, 'a', encoding=charset) as appender:
            appender.seek(0)
            appender.seek(0, self.SEEK_END)
            appender.write('xxx')
        expected = 'aaaxxx'.encode(charset)
        with self.open(path, 'rb') as binary:
            self.assertEqual(binary.read(), expected)
3286
def test_errors_property(self):
    # The errors attribute is "strict" when no handler is given, and
    # otherwise reflects the handler passed to open().
    cases = (({}, "strict"), ({"errors": "replace"}, "replace"))
    for options, expected in cases:
        with self.open(os_helper.TESTFN, "w", **options) as f:
            self.assertEqual(f.errors, expected)
3292
@support.no_tracing
def test_threads_write(self):
    # Issue6750: concurrent writes could duplicate data
    start_signal = threading.Event()
    with self.open(os_helper.TESTFN, "w", buffering=1) as out:
        def writer(index):
            # Build the line first, then wait for the common start signal.
            line = "Thread%03d\n" % index
            start_signal.wait()
            out.write(line)

        workers = []
        for index in range(20):
            workers.append(threading.Thread(target=writer, args=(index,)))
        # start_threads() receives start_signal.set as its second argument.
        with threading_helper.start_threads(workers, start_signal.set):
            time.sleep(0.02)
    with self.open(os_helper.TESTFN) as result:
        content = result.read()
        # Every thread's line must occur exactly once.
        for index in range(20):
            self.assertEqual(content.count("Thread%03d\n" % index), 1)
Amaury Forgeot d'Arcccd686a2009-08-29 23:00:38 +00003310
def test_flush_error_on_close(self):
    # Test that text file is closed despite failed flush
    # and that flush() is called before file closed.
    txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
    states_at_flush = []

    def failing_flush():
        # Record whether each layer was already closed when flush() ran.
        states_at_flush[:] = [txt.closed, txt.buffer.closed]
        raise OSError()

    txt.flush = failing_flush
    with self.assertRaises(OSError):  # exception not swallowed
        txt.close()
    self.assertTrue(txt.closed)
    self.assertTrue(txt.buffer.closed)
    self.assertTrue(states_at_flush)  # flush() called
    wrapper_was_closed, buffer_was_closed = states_at_flush
    self.assertFalse(wrapper_was_closed)  # flush() called before file closed
    self.assertFalse(buffer_was_closed)
    txt.flush = lambda: None  # break reference loop
Antoine Pitrou6be88762010-05-03 16:48:20 +00003327
def test_close_error_on_close(self):
    # When both flush() and the buffer's close() fail, the close error
    # propagates with the flush error as its __context__.
    buffer = self.BytesIO(self.testdata)

    def failing_flush():
        raise OSError('flush')

    def failing_close():
        raise OSError('close')

    buffer.close = failing_close
    txt = self.TextIOWrapper(buffer, encoding="ascii")
    txt.flush = failing_flush
    with self.assertRaises(OSError) as err:  # exception not swallowed
        txt.close()
    raised = err.exception
    self.assertEqual(raised.args, ('close',))
    self.assertIsInstance(raised.__context__, OSError)
    self.assertEqual(raised.__context__.args, ('flush',))
    self.assertFalse(txt.closed)

    # Silence destructor error
    buffer.close = lambda: None
    txt.flush = lambda: None
3347
Serhiy Storchaka8a8f7f92014-06-09 09:13:04 +03003348 def test_nonnormalized_close_error_on_close(self):
3349 # Issue #21677
3350 buffer = self.BytesIO(self.testdata)
3351 def bad_flush():
3352 raise non_existing_flush
3353 def bad_close():
3354 raise non_existing_close
3355 buffer.close = bad_close
3356 txt = self.TextIOWrapper(buffer, encoding="ascii")
3357 txt.flush = bad_flush
3358 with self.assertRaises(NameError) as err: # exception not swallowed
3359 txt.close()
3360 self.assertIn('non_existing_close', str(err.exception))
3361 self.assertIsInstance(err.exception.__context__, NameError)
3362 self.assertIn('non_existing_flush', str(err.exception.__context__))
3363 self.assertFalse(txt.closed)
3364
Victor Stinner472f7942019-04-12 21:58:24 +02003365 # Silence destructor error
3366 buffer.close = lambda: None
3367 txt.flush = lambda: None
3368
Antoine Pitrou6be88762010-05-03 16:48:20 +00003369 def test_multi_close(self):
3370 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
3371 txt.close()
3372 txt.close()
3373 txt.close()
3374 self.assertRaises(ValueError, txt.flush)
3375
def test_unseekable(self):
    # tell() and seek() must raise UnsupportedOperation when the wrapper
    # sits on top of MockUnseekableIO.
    wrapper = self.TextIOWrapper(self.MockUnseekableIO(self.testdata))
    with self.assertRaises(self.UnsupportedOperation):
        wrapper.tell()
    with self.assertRaises(self.UnsupportedOperation):
        wrapper.seek(0)
3380
Antoine Pitrou7f8f4182010-12-21 21:20:59 +00003381 def test_readonly_attributes(self):
3382 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
3383 buf = self.BytesIO(self.testdata)
3384 with self.assertRaises(AttributeError):
3385 txt.buffer = buf
3386
def test_rawio(self):
    # Issue #12591: TextIOWrapper must work with raw I/O objects, so
    # that subprocess.Popen() can have the required unbuffered
    # semantics with universal_newlines=True.
    chunks = [b'abc', b'def', b'ghi\njkl\nopq\n']
    wrapper = self.TextIOWrapper(self.MockRawIO(chunks),
                                 encoding='ascii', newline='\n')
    # Reads
    self.assertEqual(wrapper.read(4), 'abcd')
    self.assertEqual(wrapper.readline(), 'efghi\n')
    remaining_lines = [line for line in wrapper]
    self.assertEqual(remaining_lines, ['jkl\n', 'opq\n'])
3397
def test_rawio_write_through(self):
    # Issue #12591: with write_through=True, writes don't need a flush
    raw = self.MockRawIO([b'abc', b'def', b'ghi\njkl\nopq\n'])
    wrapper = self.TextIOWrapper(raw, encoding='ascii', newline='\n',
                                 write_through=True)
    for piece in ('1', '23\n4', '5'):
        wrapper.write(piece)
    written = b''.join(raw._write_stack)
    self.assertEqual(written, b'123\n45')
3407
Antoine Pitrouc644e7c2014-05-09 00:24:50 +02003408 def test_bufio_write_through(self):
3409 # Issue #21396: write_through=True doesn't force a flush()
3410 # on the underlying binary buffered object.
3411 flush_called, write_called = [], []
3412 class BufferedWriter(self.BufferedWriter):
3413 def flush(self, *args, **kwargs):
3414 flush_called.append(True)
3415 return super().flush(*args, **kwargs)
3416 def write(self, *args, **kwargs):
3417 write_called.append(True)
3418 return super().write(*args, **kwargs)
3419
3420 rawio = self.BytesIO()
3421 data = b"a"
3422 bufio = BufferedWriter(rawio, len(data)*2)
3423 textio = self.TextIOWrapper(bufio, encoding='ascii',
3424 write_through=True)
3425 # write to the buffered io but don't overflow the buffer
3426 text = data.decode('ascii')
3427 textio.write(text)
3428
3429 # buffer.flush is not called with write_through=True
3430 self.assertFalse(flush_called)
3431 # buffer.write *is* called with write_through=True
3432 self.assertTrue(write_called)
3433 self.assertEqual(rawio.getvalue(), b"") # no flush
3434
3435 write_called = [] # reset
3436 textio.write(text * 10) # total content is larger than bufio buffer
3437 self.assertTrue(write_called)
3438 self.assertEqual(rawio.getvalue(), data * 11) # all flushed
3439
def test_reconfigure_write_through(self):
    # reconfigure(write_through=...) toggles the flag; pending text has
    # reached the raw stream once write-through is switched on.
    raw = self.MockRawIO([])
    wrapper = self.TextIOWrapper(raw, encoding='ascii', newline='\n')

    def written():
        return b''.join(raw._write_stack)

    wrapper.write('1')
    wrapper.reconfigure(write_through=True)  # implied flush
    self.assertEqual(wrapper.write_through, True)
    self.assertEqual(written(), b'1')
    wrapper.write('23')
    self.assertEqual(written(), b'123')
    wrapper.reconfigure(write_through=False)
    self.assertEqual(wrapper.write_through, False)
    wrapper.write('45')
    wrapper.flush()
    self.assertEqual(written(), b'12345')
    # Keeping default value
    for current in (False, True):
        if current:
            wrapper.reconfigure(write_through=True)
        wrapper.reconfigure()
        wrapper.reconfigure(write_through=None)
        self.assertEqual(wrapper.write_through, current)
3462
Serhiy Storchaka94dc6732013-02-03 17:03:31 +02003463 def test_read_nonbytes(self):
3464 # Issue #17106
3465 # Crash when underlying read() returns non-bytes
3466 t = self.TextIOWrapper(self.StringIO('a'))
3467 self.assertRaises(TypeError, t.read, 1)
3468 t = self.TextIOWrapper(self.StringIO('a'))
3469 self.assertRaises(TypeError, t.readline)
3470 t = self.TextIOWrapper(self.StringIO('a'))
3471 self.assertRaises(TypeError, t.read)
3472
def test_illegal_encoder(self):
    # Issue 31271: Calling write() while the return value of encoder's
    # encode() is invalid shouldn't cause an assertion failure.
    rot13 = codecs.lookup("rot13")
    # Temporarily flag rot13 as a text encoding so the wrapper can be
    # constructed with it.
    with support.swap_attr(rot13, '_is_text_encoding', True):
        # Use the implementation under test (self.*) rather than the
        # module-level io, so every subclass of this test case exercises
        # its own TextIOWrapper/BytesIO.
        t = self.TextIOWrapper(self.BytesIO(b'foo'), encoding="rot13")
    self.assertRaises(TypeError, t.write, 'bar')
3480
def test_illegal_decoder(self):
    # Issue #17106
    # Bypass the early encoding check added in issue 20404
    def _make_illegal_wrapper():
        quopri = codecs.lookup("quopri")
        # swap_attr() puts back whatever value the flag had before,
        # instead of unconditionally resetting it to False.
        with support.swap_attr(quopri, '_is_text_encoding', True):
            return self.TextIOWrapper(self.BytesIO(b'aaaaaa'),
                                      newline='\n', encoding="quopri")
    # Crash when decoder returns non-string
    t = _make_illegal_wrapper()
    self.assertRaises(TypeError, t.read, 1)
    t = _make_illegal_wrapper()
    self.assertRaises(TypeError, t.readline)
    t = _make_illegal_wrapper()
    self.assertRaises(TypeError, t.read)

    # Issue 31243: calling read() while the return value of decoder's
    # getstate() is invalid should neither crash the interpreter nor
    # raise a SystemError.
    def _make_very_illegal_wrapper(getstate_ret_val):
        class BadDecoder:
            def getstate(self):
                return getstate_ret_val
        def _get_bad_decoder(dummy):
            return BadDecoder()
        quopri = codecs.lookup("quopri")
        with support.swap_attr(quopri, 'incrementaldecoder',
                               _get_bad_decoder):
            return _make_illegal_wrapper()
    # Each of these getstate() return values must lead to a TypeError.
    for bad_state in (42, (), (1, 2)):
        t = _make_very_illegal_wrapper(bad_state)
        self.assertRaises(TypeError, t.read, 42)
3520
def _check_create_at_shutdown(self, **kwargs):
    # Issue #20037: creating a TextIOWrapper at shutdown
    # shouldn't crash the interpreter.
    #
    # Runs a child interpreter whose only live object builds a
    # TextIOWrapper (with the given keyword arguments) from __del__, and
    # returns whatever assert_python_ok() returns.
    io_module_name = self.io.__name__
    template = """if 1:
        import codecs
        import {iomod} as io

        # Avoid looking up codecs at shutdown
        codecs.lookup('utf-8')

        class C:
            def __init__(self):
                self.buf = io.BytesIO()
            def __del__(self):
                io.TextIOWrapper(self.buf, **{kwargs})
                print("ok")
        c = C()
        """
    script = template.format(iomod=io_module_name, kwargs=kwargs)
    return assert_python_ok("-c", script)
3541
3542 def test_create_at_shutdown_without_encoding(self):
3543 rc, out, err = self._check_create_at_shutdown()
3544 if err:
3545 # Can error out with a RuntimeError if the module state
3546 # isn't found.
Nick Coghlana9b15242014-02-04 22:11:18 +10003547 self.assertIn(self.shutdown_error, err.decode())
Antoine Pitrou712cb732013-12-21 15:51:54 +01003548 else:
3549 self.assertEqual("ok", out.decode().strip())
3550
3551 def test_create_at_shutdown_with_encoding(self):
3552 rc, out, err = self._check_create_at_shutdown(encoding='utf-8',
3553 errors='strict')
3554 self.assertFalse(err)
3555 self.assertEqual("ok", out.decode().strip())
3556
def test_read_byteslike(self):
    # Reading works when the underlying stream hands back memoryviews.
    source = MemviewBytesIO(b'Just some random string\n')
    wrapper = self.TextIOWrapper(source, 'utf-8')

    # TextIOwrapper will not read the full string, because
    # we truncate it to a multiple of the native int size
    # so that we can construct a more complex memoryview.
    truncated = _to_memoryview(source.getvalue()).tobytes()
    expected_text = truncated.decode('utf-8')

    self.assertEqual(wrapper.read(200), expected_text)
3567
Benjamin Peterson6c14f232014-11-12 10:19:46 -05003568 def test_issue22849(self):
3569 class F(object):
3570 def readable(self): return True
3571 def writable(self): return True
3572 def seekable(self): return True
3573
3574 for i in range(10):
3575 try:
3576 self.TextIOWrapper(F(), encoding='utf-8')
3577 except Exception:
3578 pass
3579
3580 F.tell = lambda x: 0
3581 t = self.TextIOWrapper(F(), encoding='utf-8')
3582
INADA Naoki507434f2017-12-21 09:59:53 +09003583 def test_reconfigure_encoding_read(self):
3584 # latin1 -> utf8
3585 # (latin1 can decode utf-8 encoded string)
3586 data = 'abc\xe9\n'.encode('latin1') + 'd\xe9f\n'.encode('utf8')
3587 raw = self.BytesIO(data)
3588 txt = self.TextIOWrapper(raw, encoding='latin1', newline='\n')
3589 self.assertEqual(txt.readline(), 'abc\xe9\n')
3590 with self.assertRaises(self.UnsupportedOperation):
3591 txt.reconfigure(encoding='utf-8')
3592 with self.assertRaises(self.UnsupportedOperation):
3593 txt.reconfigure(newline=None)
3594
3595 def test_reconfigure_write_fromascii(self):
3596 # ascii has a specific encodefunc in the C implementation,
3597 # but utf-8-sig has not. Make sure that we get rid of the
3598 # cached encodefunc when we switch encoders.
3599 raw = self.BytesIO()
3600 txt = self.TextIOWrapper(raw, encoding='ascii', newline='\n')
3601 txt.write('foo\n')
3602 txt.reconfigure(encoding='utf-8-sig')
3603 txt.write('\xe9\n')
3604 txt.flush()
3605 self.assertEqual(raw.getvalue(), b'foo\n\xc3\xa9\n')
3606
3607 def test_reconfigure_write(self):
3608 # latin -> utf8
3609 raw = self.BytesIO()
3610 txt = self.TextIOWrapper(raw, encoding='latin1', newline='\n')
3611 txt.write('abc\xe9\n')
3612 txt.reconfigure(encoding='utf-8')
3613 self.assertEqual(raw.getvalue(), b'abc\xe9\n')
3614 txt.write('d\xe9f\n')
3615 txt.flush()
3616 self.assertEqual(raw.getvalue(), b'abc\xe9\nd\xc3\xa9f\n')
3617
3618 # ascii -> utf-8-sig: ensure that no BOM is written in the middle of
3619 # the file
3620 raw = self.BytesIO()
3621 txt = self.TextIOWrapper(raw, encoding='ascii', newline='\n')
3622 txt.write('abc\n')
3623 txt.reconfigure(encoding='utf-8-sig')
3624 txt.write('d\xe9f\n')
3625 txt.flush()
3626 self.assertEqual(raw.getvalue(), b'abc\nd\xc3\xa9f\n')
3627
3628 def test_reconfigure_write_non_seekable(self):
3629 raw = self.BytesIO()
3630 raw.seekable = lambda: False
3631 raw.seek = None
3632 txt = self.TextIOWrapper(raw, encoding='ascii', newline='\n')
3633 txt.write('abc\n')
3634 txt.reconfigure(encoding='utf-8-sig')
3635 txt.write('d\xe9f\n')
3636 txt.flush()
3637
3638 # If the raw stream is not seekable, there'll be a BOM
3639 self.assertEqual(raw.getvalue(), b'abc\n\xef\xbb\xbfd\xc3\xa9f\n')
3640
3641 def test_reconfigure_defaults(self):
3642 txt = self.TextIOWrapper(self.BytesIO(), 'ascii', 'replace', '\n')
3643 txt.reconfigure(encoding=None)
3644 self.assertEqual(txt.encoding, 'ascii')
3645 self.assertEqual(txt.errors, 'replace')
3646 txt.write('LF\n')
3647
3648 txt.reconfigure(newline='\r\n')
3649 self.assertEqual(txt.encoding, 'ascii')
3650 self.assertEqual(txt.errors, 'replace')
3651
3652 txt.reconfigure(errors='ignore')
3653 self.assertEqual(txt.encoding, 'ascii')
3654 self.assertEqual(txt.errors, 'ignore')
3655 txt.write('CRLF\n')
3656
3657 txt.reconfigure(encoding='utf-8', newline=None)
3658 self.assertEqual(txt.errors, 'strict')
3659 txt.seek(0)
3660 self.assertEqual(txt.read(), 'LF\nCRLF\n')
3661
3662 self.assertEqual(txt.detach().getvalue(), b'LF\nCRLF\r\n')
3663
3664 def test_reconfigure_newline(self):
3665 raw = self.BytesIO(b'CR\rEOF')
3666 txt = self.TextIOWrapper(raw, 'ascii', newline='\n')
3667 txt.reconfigure(newline=None)
3668 self.assertEqual(txt.readline(), 'CR\n')
3669 raw = self.BytesIO(b'CR\rEOF')
3670 txt = self.TextIOWrapper(raw, 'ascii', newline='\n')
3671 txt.reconfigure(newline='')
3672 self.assertEqual(txt.readline(), 'CR\r')
3673 raw = self.BytesIO(b'CR\rLF\nEOF')
3674 txt = self.TextIOWrapper(raw, 'ascii', newline='\r')
3675 txt.reconfigure(newline='\n')
3676 self.assertEqual(txt.readline(), 'CR\rLF\n')
3677 raw = self.BytesIO(b'LF\nCR\rEOF')
3678 txt = self.TextIOWrapper(raw, 'ascii', newline='\n')
3679 txt.reconfigure(newline='\r')
3680 self.assertEqual(txt.readline(), 'LF\nCR\r')
3681 raw = self.BytesIO(b'CR\rCRLF\r\nEOF')
3682 txt = self.TextIOWrapper(raw, 'ascii', newline='\r')
3683 txt.reconfigure(newline='\r\n')
3684 self.assertEqual(txt.readline(), 'CR\rCRLF\r\n')
3685
3686 txt = self.TextIOWrapper(self.BytesIO(), 'ascii', newline='\r')
3687 txt.reconfigure(newline=None)
3688 txt.write('linesep\n')
3689 txt.reconfigure(newline='')
3690 txt.write('LF\n')
3691 txt.reconfigure(newline='\n')
3692 txt.write('LF\n')
3693 txt.reconfigure(newline='\r')
3694 txt.write('CR\n')
3695 txt.reconfigure(newline='\r\n')
3696 txt.write('CRLF\n')
3697 expected = 'linesep' + os.linesep + 'LF\nLF\nCR\rCRLF\r\n'
3698 self.assertEqual(txt.detach().getvalue().decode('ascii'), expected)
3699
Zackery Spytz23db9352018-06-29 04:14:58 -06003700 def test_issue25862(self):
3701 # Assertion failures occurred in tell() after read() and write().
3702 t = self.TextIOWrapper(self.BytesIO(b'test'), encoding='ascii')
3703 t.read(1)
3704 t.read()
3705 t.tell()
3706 t = self.TextIOWrapper(self.BytesIO(b'test'), encoding='ascii')
3707 t.read(1)
3708 t.write('x')
3709 t.tell()
3710
Serhiy Storchaka94dc6732013-02-03 17:03:31 +02003711
class MemviewBytesIO(io.BytesIO):
    '''BytesIO variant whose read() and read1() hand back memoryviews
    (built by _to_memoryview) instead of bytes objects.'''

    def read1(self, len_):
        chunk = super().read1(len_)
        return _to_memoryview(chunk)

    def read(self, len_):
        chunk = super().read(len_)
        return _to_memoryview(chunk)
3721
3722def _to_memoryview(buf):
3723 '''Convert bytes-object *buf* to a non-trivial memoryview'''
3724
3725 arr = array.array('i')
3726 idx = len(buf) - len(buf) % arr.itemsize
3727 arr.frombytes(buf[:idx])
3728 return memoryview(arr)
Benjamin Petersonc9c0f202009-06-30 23:06:06 +00003729
Benjamin Peterson6fd113c2014-11-12 10:23:44 -05003730
class CTextIOWrapperTest(TextIOWrapperTest):
    # Binds the shared TextIOWrapper tests to the ``io`` module and adds
    # checks that exist only in this class.
    io = io
    shutdown_error = "LookupError: unknown encoding: ascii"

    def test_initialization(self):
        # A failed re-initialization leaves the wrapper unusable.
        raw = self.BytesIO(b"\xc3\xa9\n\n")
        reader = self.BufferedReader(raw, 1000)
        wrapper = self.TextIOWrapper(reader)
        with self.assertRaises(ValueError):
            wrapper.__init__(reader, newline='xyzzy')
        with self.assertRaises(ValueError):
            wrapper.read()

        # repr() of an instance that never ran __init__ must raise.
        bare = self.TextIOWrapper.__new__(self.TextIOWrapper)
        self.assertRaises(Exception, repr, bare)

    def test_garbage_collection(self):
        # C TextIOWrapper objects are collected, and collecting them flushes
        # all data to disk.
        # The Python version has __del__, so it ends in gc.garbage instead.
        with warnings_helper.check_warnings(('', ResourceWarning)):
            raw_file = io.FileIO(os_helper.TESTFN, "wb")
            buffered = self.BufferedWriter(raw_file)
            wrapper = self.TextIOWrapper(buffered, encoding="ascii")
            wrapper.write("456def")
            wrapper.x = wrapper  # self-reference: only the GC can free it
            ref = weakref.ref(wrapper)
            del wrapper
            support.gc_collect()
        self.assertIsNone(ref(), ref)
        with self.open(os_helper.TESTFN, "rb") as stored:
            self.assertEqual(stored.read(), b"456def")

    def test_rwpair_cleared_before_textio(self):
        # Issue 13070: TextIOWrapper's finalization would crash when called
        # after the reference to the underlying BufferedRWPair's writer got
        # cleared by the GC.
        for _ in range(1000):
            pair1 = self.BufferedRWPair(self.MockRawIO(), self.MockRawIO())
            text1 = self.TextIOWrapper(pair1, encoding="ascii")
            pair2 = self.BufferedRWPair(self.MockRawIO(), self.MockRawIO())
            text2 = self.TextIOWrapper(pair2, encoding="ascii")
            # circular references
            text1.buddy = text2
            text2.buddy = text1
        support.gc_collect()

    def test_del__CHUNK_SIZE_SystemError(self):
        # Deleting _CHUNK_SIZE must fail with AttributeError.
        wrapper = self.TextIOWrapper(self.BytesIO(), encoding='ascii')
        self.assertRaises(AttributeError, delattr, wrapper, '_CHUNK_SIZE')

    def test_internal_buffer_size(self):
        # bpo-43260: TextIOWrapper's internal buffer should not store
        # data larger than chunk size.
        limit = 8192  # default chunk size, updated later

        class MockIO(self.MockRawIO):
            # Rejects any single write larger than the current limit.
            def write(self, data):
                if len(data) > limit:
                    raise RuntimeError
                return super().write(data)

        sink = MockIO()
        wrapper = self.TextIOWrapper(sink, encoding="ascii")
        limit = wrapper._CHUNK_SIZE
        wrapper.write("abc")
        wrapper.write("def")
        # Both writes fit within the chunk size, so nothing has been
        # passed on to the raw stream yet.
        self.assertEqual([], sink._write_stack)

        with self.assertRaises(RuntimeError):
            wrapper.write("x" * (limit + 1))

        self.assertEqual([b"abcdef"], sink._write_stack)
        wrapper.write("ghi")
        wrapper.write("x" * limit)
        self.assertEqual([b"abcdef", b"ghi", b"x" * limit],
                         sink._write_stack)
3807
Charles-François Natali42c28cd2011-10-05 19:53:43 +02003808
class PyTextIOWrapperTest(TextIOWrapperTest):
    # Binds the shared TextIOWrapper tests to the ``pyio`` module.
    io = pyio
    # Message looked for on stderr by the create-at-shutdown test.
    shutdown_error = "LookupError: unknown encoding: ascii"
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003812
3813
class IncrementalNewlineDecoderTest(unittest.TestCase):
    # Shared tests; they reach the decoder class under test through
    # self.IncrementalNewlineDecoder, which this class does not define.

    def check_newline_decoding_utf8(self, decoder):
        # UTF-8 specific tests for a newline decoder
        def roundtrip(data, expected, **kwargs):
            # We exercise getstate() / setstate() as well as decode()
            saved = decoder.getstate()
            self.assertEqual(decoder.decode(data, **kwargs), expected)
            decoder.setstate(saved)
            self.assertEqual(decoder.decode(data, **kwargs), expected)

        # A three-byte character, whole and then split across calls.
        for data, expected in (
            (b'\xe8\xa2\x88', "\u8888"),
            (b'\xe8', ""), (b'\xa2', ""), (b'\x88', "\u8888"),
            (b'\xe8', ""), (b'\xa2', ""), (b'\x88', "\u8888"),
            (b'\xe8', ""),
        ):
            roundtrip(data, expected)
        # The dangling lead byte is an error once input is final.
        self.assertRaises(UnicodeDecodeError, decoder.decode, b'', final=True)

        decoder.reset()
        roundtrip(b'\n', "\n")
        roundtrip(b'\r', "")
        roundtrip(b'', "\n", final=True)
        roundtrip(b'\r', "\n", final=True)

        for data, expected in (
            (b'\r', ""), (b'a', "\na"),
            (b'\r\r\n', "\n\n"), (b'\r', ""), (b'\r', "\n"), (b'\na', "\na"),
            (b'\xe8\xa2\x88\r\n', "\u8888\n"),
            (b'\xe8\xa2\x88', "\u8888"),
            (b'\n', "\n"),
            (b'\xe8\xa2\x88\r', "\u8888"),
            (b'\n', "\n"),
        ):
            roundtrip(data, expected)

    def check_newline_decoding(self, decoder, encoding):
        # Feed text to *decoder* one unit at a time and check both the
        # decoded output and the newline kinds reported along the way.
        # With encoding=None the decoder receives str input directly.
        decoded = []
        if encoding is None:
            encoder = None
        else:
            encoder = codecs.getincrementalencoder(encoding)()

        def feed(text):
            if encoder is None:
                # Decode one char at a time
                units = text
            else:
                # Decode one byte at a time
                units = (bytes([byte]) for byte in encoder.encode(text))
            for unit in units:
                decoded.append(decoder.decode(unit))

        self.assertEqual(decoder.newlines, None)
        feed("abc\n\r")
        self.assertEqual(decoder.newlines, '\n')
        feed("\nabc")
        self.assertEqual(decoder.newlines, ('\n', '\r\n'))
        feed("abc\r")
        self.assertEqual(decoder.newlines, ('\n', '\r\n'))
        feed("abc")
        self.assertEqual(decoder.newlines, ('\r', '\n', '\r\n'))
        feed("abc\r")
        self.assertEqual("".join(decoded), "abc\n\nabcabc\nabcabc")
        decoder.reset()
        payload = "abc"
        if encoder is not None:
            encoder.reset()
            payload = encoder.encode(payload)
        self.assertEqual(decoder.decode(payload), "abc")
        self.assertEqual(decoder.newlines, None)

    def test_newline_decoder(self):
        encodings = (
            # None meaning the IncrementalNewlineDecoder takes unicode input
            # rather than bytes input
            None, 'utf-8', 'latin-1',
            'utf-16', 'utf-16-le', 'utf-16-be',
            'utf-32', 'utf-32-le', 'utf-32-be',
        )
        for enc in encodings:
            if enc is None:
                inner = None
            else:
                inner = codecs.getincrementaldecoder(enc)()
            decoder = self.IncrementalNewlineDecoder(inner, translate=True)
            self.check_newline_decoding(decoder, enc)
        inner = codecs.getincrementaldecoder("utf-8")()
        decoder = self.IncrementalNewlineDecoder(inner, translate=True)
        self.check_newline_decoding_utf8(decoder)
        self.assertRaises(TypeError, decoder.setstate, 42)

    def test_newline_bytes(self):
        # Issue 5433: Excessive optimization in IncrementalNewlineDecoder
        def _check(dec):
            self.assertEqual(dec.newlines, None)
            for char in ("\u0D00", "\u0A00"):
                self.assertEqual(dec.decode(char), char)
                self.assertEqual(dec.newlines, None)

        for translate in (False, True):
            _check(self.IncrementalNewlineDecoder(None, translate=translate))

    def test_translate(self):
        # issue 35062
        for translate in (-2, -1, 1, 2):
            inner = codecs.getincrementaldecoder("utf-8")()
            decoder = self.IncrementalNewlineDecoder(inner, translate)
            self.check_newline_decoding_utf8(decoder)
        inner = codecs.getincrementaldecoder("utf-8")()
        decoder = self.IncrementalNewlineDecoder(inner, translate=0)
        self.assertEqual(decoder.decode(b"\r\r\n"), "\r\r\n")
3930
class CIncrementalNewlineDecoderTest(IncrementalNewlineDecoderTest):
    # Adds no tests of its own; it only gives the inherited tests a
    # separate concrete class.
    # NOTE(review): presumably bound to the C implementation elsewhere in
    # the file -- confirm.
    pass
3933
class PyIncrementalNewlineDecoderTest(IncrementalNewlineDecoderTest):
    # Adds no tests of its own; it only gives the inherited tests a
    # separate concrete class.
    # NOTE(review): presumably bound to the Python implementation
    # elsewhere in the file -- confirm.
    pass
Antoine Pitrou180a3362008-12-14 16:36:46 +00003936
Alexandre Vassalotti472f07d2008-01-06 00:34:32 +00003937
Guido van Rossum01a27522007-03-07 01:00:12 +00003938# XXX Tests for open()
Guido van Rossum68bbcd22007-02-27 17:19:33 +00003939
Guido van Rossum5abbf752007-08-27 17:39:33 +00003940class MiscIOTest(unittest.TestCase):
3941
def tearDown(self):
    # Remove the scratch file the tests in this class open.
    scratch_path = os_helper.TESTFN
    os_helper.unlink(scratch_path)
Barry Warsaw40e82462008-11-20 20:14:50 +00003944
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003945 def test___all__(self):
3946 for name in self.io.__all__:
3947 obj = getattr(self.io, name, None)
Serhiy Storchaka0dcd80a2015-08-02 15:17:49 +03003948 self.assertIsNotNone(obj, name)
Steve Dowerb82e17e2019-05-23 08:45:22 -07003949 if name in ("open", "open_code"):
Guido van Rossum5abbf752007-08-27 17:39:33 +00003950 continue
Benjamin Peterson6a52a9c2009-04-29 22:00:44 +00003951 elif "error" in name.lower() or name == "UnsupportedOperation":
Benjamin Petersonbfb95942009-04-02 01:13:40 +00003952 self.assertTrue(issubclass(obj, Exception), name)
3953 elif not name.startswith("SEEK_"):
3954 self.assertTrue(issubclass(obj, self.IOBase))
Benjamin Peterson65676e42008-11-05 21:42:45 +00003955
def test_attributes(self):
    # Check the mode and name attributes reported by each layer of the
    # objects that open() returns.
    path = os_helper.TESTFN

    unbuffered = self.open(path, "wb", buffering=0)
    self.assertEqual(unbuffered.mode, "wb")
    unbuffered.close()

    # Opening with mode "U" is expected to emit a DeprecationWarning.
    with warnings_helper.check_warnings(('', DeprecationWarning)):
        universal = self.open(path, "U")
    self.assertEqual(universal.name, path)
    self.assertEqual(universal.buffer.name, path)
    self.assertEqual(universal.buffer.raw.name, path)
    self.assertEqual(universal.mode, "U")
    self.assertEqual(universal.buffer.mode, "rb")
    self.assertEqual(universal.buffer.raw.mode, "rb")
    universal.close()

    text = self.open(path, "w+")
    self.assertEqual(text.mode, "w+")
    self.assertEqual(text.buffer.mode, "rb+")  # Does it really matter?
    self.assertEqual(text.buffer.raw.mode, "rb+")

    # A second object opened on the same descriptor without owning it.
    by_fd = self.open(text.fileno(), "wb", closefd=False)
    self.assertEqual(by_fd.mode, "wb")
    self.assertEqual(by_fd.raw.mode, "wb")
    self.assertEqual(by_fd.name, text.fileno())
    self.assertEqual(by_fd.raw.name, text.fileno())
    text.close()
    by_fd.close()
3983
Benjamin Peterson74fa9f72019-11-12 14:51:34 -08003984 def test_open_pipe_with_append(self):
3985 # bpo-27805: Ignore ESPIPE from lseek() in open().
3986 r, w = os.pipe()
3987 self.addCleanup(os.close, r)
3988 f = self.open(w, 'a')
3989 self.addCleanup(f.close)
3990 # Check that the file is marked non-seekable. On Windows, however, lseek
3991 # somehow succeeds on pipes.
3992 if sys.platform != 'win32':
3993 self.assertFalse(f.seekable())
3994
def test_io_after_close(self):
    # Every I/O method of a closed file must raise ValueError, for each
    # combination of mode and buffering below.
    def expect_closed(operation, *args):
        self.assertRaises(ValueError, operation, *args)

    scenarios = []
    for text_mode, binary_mode in (("w", "wb"), ("r", "rb"), ("w+", "w+b")):
        scenarios += [
            {"mode": text_mode},
            {"mode": binary_mode},
            {"mode": text_mode, "buffering": 1},
            {"mode": text_mode, "buffering": 2},
            {"mode": binary_mode, "buffering": 0},
        ]

    for kwargs in scenarios:
        f = self.open(os_helper.TESTFN, **kwargs)
        f.close()
        expect_closed(f.flush)
        expect_closed(f.fileno)
        expect_closed(f.isatty)
        expect_closed(f.__iter__)
        if hasattr(f, "peek"):
            expect_closed(f.peek, 1)
        expect_closed(f.read)
        if hasattr(f, "read1"):
            expect_closed(f.read1, 1024)
            expect_closed(f.read1)
        if hasattr(f, "readall"):
            expect_closed(f.readall)
        if hasattr(f, "readinto"):
            expect_closed(f.readinto, bytearray(1024))
        if hasattr(f, "readinto1"):
            expect_closed(f.readinto1, bytearray(1024))
        expect_closed(f.readline)
        expect_closed(f.readlines)
        expect_closed(f.readlines, 1)
        expect_closed(f.seek, 0)
        expect_closed(f.tell)
        expect_closed(f.truncate)
        # write() gets bytes for binary modes and str for text modes.
        empty = b"" if "b" in kwargs['mode'] else ""
        expect_closed(f.write, empty)
        expect_closed(f.writelines, [])
        expect_closed(next, f)
Antoine Pitrou8043cf82009-01-09 19:54:29 +00004041
def test_blockingioerror(self):
    # Various BlockingIOError issues
    class C(str):
        pass

    message = C("")
    error = self.BlockingIOError(1, message)
    # Tie the two objects into a reference cycle.
    message.b = error
    error.c = message
    ref = weakref.ref(message)
    del message, error
    support.gc_collect()
    # The cycle must have been collected.
    self.assertIsNone(ref(), ref)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00004054
4055 def test_abcs(self):
4056 # Test the visible base classes are ABCs.
Ezio Melottie9615932010-01-24 19:26:24 +00004057 self.assertIsInstance(self.IOBase, abc.ABCMeta)
4058 self.assertIsInstance(self.RawIOBase, abc.ABCMeta)
4059 self.assertIsInstance(self.BufferedIOBase, abc.ABCMeta)
4060 self.assertIsInstance(self.TextIOBase, abc.ABCMeta)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00004061
def _check_abc_inheritance(self, abcmodule):
    # Open TESTFN unbuffered-binary, buffered-binary and text, and check
    # each file object against the ABCs found on *abcmodule*: it must be an
    # IOBase and an instance of exactly one of the three layer ABCs.
    layer_names = ("RawIOBase", "BufferedIOBase", "TextIOBase")
    scenarios = (
        # (positional open() args, keyword open() args, expected layer ABC)
        (("wb",), {"buffering": 0}, "RawIOBase"),
        (("wb",), {}, "BufferedIOBase"),
        (("w",), {}, "TextIOBase"),
    )
    for open_args, open_kwargs, expected in scenarios:
        with self.open(os_helper.TESTFN, *open_args, **open_kwargs) as f:
            self.assertIsInstance(f, abcmodule.IOBase)
            for layer in layer_names:
                if layer == expected:
                    check = self.assertIsInstance
                else:
                    check = self.assertNotIsInstance
                check(f, getattr(abcmodule, layer))
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00004078
4079 def test_abc_inheritance(self):
4080 # Test implementations inherit from their respective ABCs
4081 self._check_abc_inheritance(self)
4082
4083 def test_abc_inheritance_official(self):
4084 # Test implementations inherit from the official ABCs of the
4085 # baseline "io" module.
4086 self._check_abc_inheritance(io)
4087
def _check_warn_on_dealloc(self, *args, **kwargs):
    # Open a file, drop the only reference to it without closing, and check
    # that a ResourceWarning whose message contains the file's repr() is
    # emitted while the object is collected.
    # NOTE(review): this uses the unqualified open(), not self.open, so the
    # test class's ``io`` attribute does not pick the implementation here --
    # confirm that this is intended.
    f = open(*args, **kwargs)
    expected_repr = repr(f)
    with self.assertWarns(ResourceWarning) as caught:
        del f
        support.gc_collect()
    message = str(caught.warning.args[0])
    self.assertIn(expected_repr, message)
4095
def test_warn_on_dealloc(self):
    # Same check for an unbuffered binary, a buffered binary and a text file.
    variants = (
        ("wb", {"buffering": 0}),
        ("wb", {}),
        ("w", {}),
    )
    for mode, extra in variants:
        self._check_warn_on_dealloc(os_helper.TESTFN, mode, **extra)
Antoine Pitroue033e062010-10-29 10:38:18 +00004100
def _check_warn_on_dealloc_fd(self, *args, **kwargs):
    # Variant of _check_warn_on_dealloc() that opens a pipe descriptor
    # instead of a path, then checks that closefd=False suppresses the
    # ResourceWarning.
    pipe_fds = []

    def close_pipe_fds():
        # Best-effort close of every descriptor created below; EBADF is
        # ignored (presumably the fd was already closed by a file object).
        for fd in pipe_fds:
            try:
                os.close(fd)
            except OSError as exc:
                if exc.errno == errno.EBADF:
                    continue
                raise
    self.addCleanup(close_pipe_fds)

    rfd, wfd = os.pipe()
    pipe_fds.extend((rfd, wfd))
    self._check_warn_on_dealloc(rfd, *args, **kwargs)

    # When using closefd=False, there's no warning
    rfd, wfd = os.pipe()
    pipe_fds.extend((rfd, wfd))
    with warnings_helper.check_no_resource_warning(self):
        open(rfd, *args, closefd=False, **kwargs)
Antoine Pitroue033e062010-10-29 10:38:18 +00004119
4120 def test_warn_on_dealloc_fd(self):
4121 self._check_warn_on_dealloc_fd("rb", buffering=0)
4122 self._check_warn_on_dealloc_fd("rb")
4123 self._check_warn_on_dealloc_fd("r")
4124
4125
def test_pickling(self):
    # Pickling file objects is forbidden
    # For each base mode try the text, buffered binary and unbuffered
    # binary flavours, in that order.
    open_kwargs = []
    for base_mode in ("w", "r", "w+"):
        binary_mode = base_mode + "b"
        open_kwargs += [
            {"mode": base_mode},
            {"mode": binary_mode},
            {"mode": binary_mode, "buffering": 0},
        ]
    protocols = range(pickle.HIGHEST_PROTOCOL + 1)
    for kwargs in open_kwargs:
        for protocol in protocols:
            with self.open(os_helper.TESTFN, **kwargs) as f:
                with self.assertRaises(TypeError):
                    pickle.dumps(f, protocol)
4142
Antoine Pitrou58fcf9f2011-11-21 20:16:44 +01004143 def test_nonblock_pipe_write_bigbuf(self):
4144 self._test_nonblock_pipe_write(16*1024)
4145
Antoine Pitrou58fcf9f2011-11-21 20:16:44 +01004146 def test_nonblock_pipe_write_smallbuf(self):
4147 self._test_nonblock_pipe_write(1024)
4148
@unittest.skipUnless(hasattr(os, 'set_blocking'),
                     'os.set_blocking() required for this test')
def _test_nonblock_pipe_write(self, bufsize):
    """Write through a non-blocking pipe and check nothing is lost.

    Both pipe ends are made non-blocking and wrapped in buffered binary
    files using *bufsize* as the buffer size.  Data is written until a
    BlockingIOError is raised, the pipe is then read and writing resumes;
    at the end everything recorded as sent must equal everything received.
    """
    sent = []
    received = []
    r, w = os.pipe()
    os.set_blocking(r, False)
    os.set_blocking(w, False)

    # To exercise all code paths in the C implementation we need
    # to play with buffer sizes.  For instance, if we choose a
    # buffer size less than or equal to _PIPE_BUF (4096 on Linux)
    # then we will never get a partial write of the buffer.
    rf = self.open(r, mode='rb', closefd=True, buffering=bufsize)
    wf = self.open(w, mode='wb', closefd=True, buffering=bufsize)

    with rf, wf:
        for N in 9999, 73, 7574:
            try:
                i = 0
                # Keep writing N-byte messages (one repeated lowercase
                # letter each) until the write raises BlockingIOError.
                while True:
                    msg = bytes([i % 26 + 97]) * N
                    sent.append(msg)
                    wf.write(msg)
                    i += 1

            except self.BlockingIOError as e:
                self.assertEqual(e.args[0], errno.EAGAIN)
                self.assertEqual(e.args[2], e.characters_written)
                # Only the part reported as written counts as sent.
                sent[-1] = sent[-1][:e.characters_written]
                received.append(rf.read())
                msg = b'BLOCKED'
                wf.write(msg)
                sent.append(msg)

        # Retry flush() until it succeeds, reading from the pipe after
        # every BlockingIOError.
        while True:
            try:
                wf.flush()
                break
            except self.BlockingIOError as e:
                self.assertEqual(e.args[0], errno.EAGAIN)
                self.assertEqual(e.args[2], e.characters_written)
                self.assertEqual(e.characters_written, 0)
                received.append(rf.read())

        # Collect the remaining data: keep calling rf.read() until it
        # returns None.
        received += iter(rf.read, None)

    sent, received = b''.join(sent), b''.join(received)
    self.assertEqual(sent, received)
    self.assertTrue(wf.closed)
    self.assertTrue(rf.closed)
4200
def test_create_fail(self):
    # 'x' mode fails if file is existing
    # Make sure the file exists first.
    with self.open(os_helper.TESTFN, 'w'):
        pass
    with self.assertRaises(FileExistsError):
        self.open(os_helper.TESTFN, 'x')
Charles-François Natalidc3044c2012-01-09 22:40:02 +01004206
def test_create_writes(self):
    # 'x' mode opens for writing
    with self.open(os_helper.TESTFN, 'xb') as writer:
        writer.write(b"spam")
    # Read the file back and compare with what was written.
    with self.open(os_helper.TESTFN, 'rb') as reader:
        contents = reader.read()
    self.assertEqual(b"spam", contents)
4213
def test_open_allargs(self):
    # there used to be a buffer overflow in the parser for rawmode
    bogus_mode = 'rwax+'
    with self.assertRaises(ValueError):
        self.open(os_helper.TESTFN, bogus_mode)
Christian Heimes7b648752012-09-10 14:48:43 +02004217
def test_check_encoding_errors(self):
    # bpo-37388: open() and TextIOWrapper must check encoding and errors
    # arguments in dev mode
    mod = self.io.__name__
    filename = __file__
    invalid = 'Boom, Shaka Laka, Boom!'
    # Program executed in a child interpreter.  Each step passes the bogus
    # name as ``encoding`` or ``errors`` and expects LookupError; a step
    # that does not raise exits with its own status (21-24) so the failing
    # step can be identified.  Status 10 means every step raised.
    code = textwrap.dedent(f'''
        import sys
        from {mod} import open, TextIOWrapper

        try:
            open({filename!r}, encoding={invalid!r})
        except LookupError:
            pass
        else:
            sys.exit(21)

        try:
            open({filename!r}, errors={invalid!r})
        except LookupError:
            pass
        else:
            sys.exit(22)

        fp = open({filename!r}, "rb")
        with fp:
            try:
                TextIOWrapper(fp, encoding={invalid!r})
            except LookupError:
                pass
            else:
                sys.exit(23)

            try:
                TextIOWrapper(fp, errors={invalid!r})
            except LookupError:
                pass
            else:
                sys.exit(24)

        sys.exit(10)
    ''')
    # The child is run with "-X dev" and exits with a non-zero status even
    # on success (10), hence assert_python_failure().
    proc = assert_python_failure('-X', 'dev', '-c', code)
    self.assertEqual(proc.rc, 10, proc)
4262
def test_check_encoding_warning(self):
    # PEP 597: Raise warning when encoding is not specified
    # and sys.flags.warn_default_encoding is set.
    mod = self.io.__name__
    filename = __file__
    # The backslash after the opening quotes makes "import sys" line 1 of
    # the child program, so the "# line 5" / "# line 8" markers below match
    # the line numbers asserted on at the end of this test.
    code = textwrap.dedent(f'''\
        import sys
        from {mod} import open, TextIOWrapper
        import pathlib

        with open({filename!r}) as f: # line 5
            pass

        pathlib.Path({filename!r}).read_text() # line 8
    ''')
    proc = assert_python_ok('-X', 'warn_default_encoding', '-c', code)
    # NOTE: this local name shadows the ``warnings`` module imported at the
    # top of the file for the rest of this method.
    warnings = proc.err.splitlines()
    # Exactly two stderr lines are expected, one per call made without an
    # explicit encoding.
    self.assertEqual(len(warnings), 2)
    self.assertTrue(
        warnings[0].startswith(b"<string>:5: EncodingWarning: "))
    self.assertTrue(
        warnings[1].startswith(b"<string>:8: EncodingWarning: "))
4285
Christian Heimes7b648752012-09-10 14:48:43 +02004286
class CMiscIOTest(MiscIOTest):
    # MiscIOTest bound to the ``io`` module, plus checks that only apply to
    # that implementation.
    io = io

    def test_readinto_buffer_overflow(self):
        # Issue #18025
        # read() hands back far more data than the 2-byte target buffer can
        # hold; readinto() must reject it with ValueError.
        class OversizedReader(self.io.BufferedIOBase):
            def read(self, n=-1):
                return b'x' * 10**6
        reader = OversizedReader()
        target = bytearray(2)
        with self.assertRaises(ValueError):
            reader.readinto(target)

    def check_daemon_threads_shutdown_deadlock(self, stream_name):
        # Issue #23309: deadlocks at shutdown should be avoided when a
        # daemon thread and the main thread both write to a file.
        # *stream_name* is the name of the sys attribute to write to
        # ('stdout' or 'stderr' in the tests below).
        code = """if 1:
            import sys
            import time
            import threading
            from test.support import SuppressCrashReport

            file = sys.{stream_name}

            def run():
                while True:
                    file.write('.')
                    file.flush()

            crash = SuppressCrashReport()
            crash.__enter__()
            # don't call __exit__(): the crash occurs at Python shutdown

            thread = threading.Thread(target=run)
            thread.daemon = True
            thread.start()

            time.sleep(0.5)
            file.write('!')
            file.flush()
            """.format(stream_name=stream_name)
        result, _cmd_line = run_python_until_end("-c", code)
        stderr_text = result.err.decode()
        if result.rc == 0:
            # Clean exit: stderr may only hold the '.' and '!' characters
            # written by the child itself.
            self.assertFalse(stderr_text.strip('.!'))
        else:
            # Failure: should be a fatal error
            pattern = (r"Fatal Python error: _enter_buffered_busy: "
                       r"could not acquire lock "
                       r"for <(_io\.)?BufferedWriter name='<{stream_name}>'> "
                       r"at interpreter shutdown, possibly due to "
                       r"daemon threads".format(stream_name=stream_name))
            self.assertRegex(stderr_text, pattern)

    def test_daemon_threads_shutdown_stdout_deadlock(self):
        self.check_daemon_threads_shutdown_deadlock(stream_name='stdout')

    def test_daemon_threads_shutdown_stderr_deadlock(self):
        self.check_daemon_threads_shutdown_deadlock(stream_name='stderr')
4345
4346
class PyMiscIOTest(MiscIOTest):
    # MiscIOTest with ``io`` bound to ``pyio`` instead of the ``io`` module.
    io = pyio
Barry Warsaw40e82462008-11-20 20:14:50 +00004349
Antoine Pitroub46b9d52010-08-21 19:09:32 +00004350
@unittest.skipIf(os.name == 'nt', 'POSIX signals required for this test.')
class SignalsTest(unittest.TestCase):
    """Behaviour of file objects when SIGALRM interrupts reads and writes.

    This base class reads the implementation under test from ``self.io``,
    which concrete subclasses are expected to provide.
    """

    def setUp(self):
        # Install alarm_interrupt() as the SIGALRM handler and remember the
        # previous handler so that tearDown() can restore it.
        self.oldalrm = signal.signal(signal.SIGALRM, self.alarm_interrupt)

    def tearDown(self):
        signal.signal(signal.SIGALRM, self.oldalrm)

    def alarm_interrupt(self, sig, frame):
        # Deliberately raise ZeroDivisionError from the signal handler.
        1/0

    def check_interrupted_write(self, item, bytes, **fdopen_kwargs):
        """Check that a partial write, when it gets interrupted, properly
        invokes the signal handler, and bubbles up the exception raised
        in the latter.

        *item* is repeated to build a payload larger than
        support.PIPE_MAX_SIZE.  *bytes* is the byte string whose first two
        bytes must match the first two bytes read from the pipe.
        *fdopen_kwargs* are passed to self.io.open(); ``closefd`` is forced
        to False.
        """
        read_results = []
        def _read():
            s = os.read(r, 1)
            read_results.append(s)

        t = threading.Thread(target=_read)
        t.daemon = True
        r, w = os.pipe()
        fdopen_kwargs["closefd"] = False
        large_data = item * (support.PIPE_MAX_SIZE // len(item) + 1)
        # Bound before the try block so that the cleanup below does not
        # raise UnboundLocalError (hiding the real error) if open() fails.
        wio = None
        try:
            wio = self.io.open(w, **fdopen_kwargs)
            if hasattr(signal, 'pthread_sigmask'):
                # create the thread with SIGALRM signal blocked
                signal.pthread_sigmask(signal.SIG_BLOCK, [signal.SIGALRM])
                try:
                    t.start()
                finally:
                    # Never leave SIGALRM blocked in this thread, even if
                    # starting the reader thread failed.
                    signal.pthread_sigmask(signal.SIG_UNBLOCK,
                                           [signal.SIGALRM])
            else:
                t.start()

            # Fill the pipe enough that the write will be blocking.
            # It will be interrupted by the timer armed above.  Since the
            # other thread has read one byte, the low-level write will
            # return with a successful (partial) result rather than an EINTR.
            # The buffered IO layer must check for pending signal
            # handlers, which in this case will invoke alarm_interrupt().
            signal.alarm(1)
            try:
                self.assertRaises(ZeroDivisionError, wio.write, large_data)
            finally:
                signal.alarm(0)
                t.join()
            # We got one byte, get another one and check that it isn't a
            # repeat of the first one.
            read_results.append(os.read(r, 1))
            self.assertEqual(read_results, [bytes[0:1], bytes[1:2]])
        finally:
            os.close(w)
            os.close(r)
            # This is deliberate. If we didn't close the file descriptor
            # before closing wio, wio would try to flush its internal
            # buffer, and block again.
            if wio is not None:
                try:
                    wio.close()
                except OSError as e:
                    if e.errno != errno.EBADF:
                        raise

    def test_interrupted_write_unbuffered(self):
        self.check_interrupted_write(b"xy", b"xy", mode="wb", buffering=0)

    def test_interrupted_write_buffered(self):
        self.check_interrupted_write(b"xy", b"xy", mode="wb")

    def test_interrupted_write_text(self):
        self.check_interrupted_write("xy", b"xy", mode="w", encoding="ascii")

    @support.no_tracing
    def check_reentrant_write(self, data, **fdopen_kwargs):
        """Check what happens when a SIGALRM handler writes to the same
        file object that the interrupted code is writing to.

        *data* is written repeatedly both by the main loop and by the
        handler; *fdopen_kwargs* are passed to self.io.open() for the write
        end of a pipe.  The loop must end with ZeroDivisionError (raised by
        the handler) or with a RuntimeError whose message starts with
        "reentrant call".
        """
        def on_alarm(*args):
            # Will be called reentrantly from the same thread
            wio.write(data)
            1/0
        signal.signal(signal.SIGALRM, on_alarm)
        r, w = os.pipe()
        wio = self.io.open(w, **fdopen_kwargs)
        try:
            signal.alarm(1)
            # Either the reentrant call to wio.write() fails with RuntimeError,
            # or the signal handler raises ZeroDivisionError.
            with self.assertRaises((ZeroDivisionError, RuntimeError)) as cm:
                while 1:
                    for i in range(100):
                        wio.write(data)
                        wio.flush()
                    # Make sure the buffer doesn't fill up and block further writes
                    os.read(r, len(data) * 100)
            exc = cm.exception
            if isinstance(exc, RuntimeError):
                self.assertTrue(str(exc).startswith("reentrant call"), str(exc))
        finally:
            signal.alarm(0)
            wio.close()
            os.close(r)

    def test_reentrant_write_buffered(self):
        self.check_reentrant_write(b"xy", mode="wb")

    def test_reentrant_write_text(self):
        self.check_reentrant_write("xy", mode="w", encoding="ascii")

    def check_interrupted_read_retry(self, decode, **fdopen_kwargs):
        """Check that a buffered read, when it gets interrupted (either
        returning a partial result or EINTR), properly invokes the signal
        handler and retries if the latter returned successfully.

        *decode* converts the value returned by read(6) to ``str`` for the
        comparison with "foobar".  *fdopen_kwargs* are passed to
        self.io.open(); ``closefd`` is forced to False.
        """
        r, w = os.pipe()
        fdopen_kwargs["closefd"] = False
        def alarm_handler(sig, frame):
            os.write(w, b"bar")
        signal.signal(signal.SIGALRM, alarm_handler)
        # Bound before the try block so that the cleanup below does not
        # raise UnboundLocalError (hiding the real error) if open() fails.
        rio = None
        try:
            rio = self.io.open(r, **fdopen_kwargs)
            os.write(w, b"foo")
            signal.alarm(1)
            # Expected behaviour:
            # - first raw read() returns partial b"foo"
            # - second raw read() returns EINTR
            # - third raw read() returns b"bar"
            self.assertEqual(decode(rio.read(6)), "foobar")
        finally:
            signal.alarm(0)
            if rio is not None:
                rio.close()
            os.close(w)
            os.close(r)

    def test_interrupted_read_retry_buffered(self):
        self.check_interrupted_read_retry(lambda x: x.decode('latin1'),
                                          mode="rb")

    def test_interrupted_read_retry_text(self):
        self.check_interrupted_read_retry(lambda x: x,
                                          mode="r")

    def check_interrupted_write_retry(self, item, **fdopen_kwargs):
        """Check that a buffered write, when it gets interrupted (either
        returning a partial result or EINTR), properly invokes the signal
        handler and retries if the latter returned successfully.

        *item* is repeated support.PIPE_MAX_SIZE times to build the payload.
        *fdopen_kwargs* are passed to self.io.open(); ``closefd`` is forced
        to False.
        """
        select = import_helper.import_module("select")

        # A quantity that exceeds the buffer size of an anonymous pipe's
        # write end.
        N = support.PIPE_MAX_SIZE
        r, w = os.pipe()
        fdopen_kwargs["closefd"] = False

        # We need a separate thread to read from the pipe and allow the
        # write() to finish.  This thread is started after the SIGALRM is
        # received (forcing a first EINTR in write()).
        read_results = []
        write_finished = False
        error = None
        def _read():
            try:
                while not write_finished:
                    while r in select.select([r], [], [], 1.0)[0]:
                        s = os.read(r, 1024)
                        read_results.append(s)
            except BaseException as exc:
                # Hand the failure over to the main thread, which asserts
                # that ``error`` is still None.
                nonlocal error
                error = exc
        t = threading.Thread(target=_read)
        t.daemon = True
        def alarm1(sig, frame):
            # First alarm: switch to the second handler and re-arm.
            signal.signal(signal.SIGALRM, alarm2)
            signal.alarm(1)
        def alarm2(sig, frame):
            # Second alarm: start draining the pipe.
            t.start()

        large_data = item * N
        signal.signal(signal.SIGALRM, alarm1)
        # Bound before the try block so that the cleanup below does not
        # raise UnboundLocalError (hiding the real error) if open() fails.
        wio = None
        try:
            wio = self.io.open(w, **fdopen_kwargs)
            signal.alarm(1)
            # Expected behaviour:
            # - first raw write() is partial (because of the limited pipe buffer
            #   and the first alarm)
            # - second raw write() returns EINTR (because of the second alarm)
            # - subsequent write()s are successful (either partial or complete)
            written = wio.write(large_data)
            self.assertEqual(N, written)

            wio.flush()
            write_finished = True
            t.join()

            self.assertIsNone(error)
            self.assertEqual(N, sum(len(x) for x in read_results))
        finally:
            signal.alarm(0)
            write_finished = True
            os.close(w)
            os.close(r)
            # This is deliberate. If we didn't close the file descriptor
            # before closing wio, wio would try to flush its internal
            # buffer, and could block (in case of failure).
            if wio is not None:
                try:
                    wio.close()
                except OSError as e:
                    if e.errno != errno.EBADF:
                        raise

    def test_interrupted_write_retry_buffered(self):
        self.check_interrupted_write_retry(b"x", mode="wb")

    def test_interrupted_write_retry_text(self):
        self.check_interrupted_write_retry("x", mode="w", encoding="latin1")
4563
Antoine Pitrouf3b68b32010-12-03 18:41:39 +00004564
class CSignalsTest(SignalsTest):
    # SignalsTest with ``io`` bound to the ``io`` module.
    io = io
4567
class PySignalsTest(SignalsTest):
    # SignalsTest with ``io`` bound to ``pyio``.
    io = pyio

    # Handling reentrancy issues would slow down _pyio even more, so the
    # tests are disabled.
    test_reentrant_write_buffered = None
    test_reentrant_write_text = None
4575
Antoine Pitroub46b9d52010-08-21 19:09:32 +00004576
def load_tests(*args):
    """Build the test suite for this module (unittest ``load_tests`` hook).

    Before the suite is assembled, every test class whose name starts with
    "C" receives the public names of ``io`` plus the ``C``-prefixed mock
    classes as class attributes, and every class whose name starts with
    "Py" receives the ``pyio`` names plus the ``Py``-prefixed mocks.  Other
    classes are left untouched.  The positional arguments supplied by
    unittest are accepted but ignored.

    Returns a ``unittest.TestSuite`` with the tests of all listed classes.
    """
    tests = (CIOTest, PyIOTest, APIMismatchTest,
             CBufferedReaderTest, PyBufferedReaderTest,
             CBufferedWriterTest, PyBufferedWriterTest,
             CBufferedRWPairTest, PyBufferedRWPairTest,
             CBufferedRandomTest, PyBufferedRandomTest,
             StatefulIncrementalDecoderTest,
             CIncrementalNewlineDecoderTest, PyIncrementalNewlineDecoderTest,
             CTextIOWrapperTest, PyTextIOWrapperTest,
             CMiscIOTest, PyMiscIOTest,
             CSignalsTest, PySignalsTest,
             )

    # Put the namespaces of the IO module we are testing and some useful mock
    # classes in the __dict__ of each test.
    mocks = (MockRawIO, MisbehavedRawIO, MockFileIO, CloseFailureIO,
             MockNonBlockWriterIO, MockUnseekableIO, MockRawIOWithoutRead,
             SlowFlushRawIO)
    all_members = io.__all__ + ["IncrementalNewlineDecoder"]
    c_io_ns = {name: getattr(io, name) for name in all_members}
    py_io_ns = {name: getattr(pyio, name) for name in all_members}
    globs = globals()
    # Each mock has a "C..." and a "Py..." variant defined at module level;
    # expose the matching variant under the unprefixed name.
    c_io_ns.update((x.__name__, globs["C" + x.__name__]) for x in mocks)
    py_io_ns.update((x.__name__, globs["Py" + x.__name__]) for x in mocks)
    # Avoid turning open into a bound method.
    py_io_ns["open"] = pyio.OpenWrapper
    for test in tests:
        if test.__name__.startswith("C"):
            for name, obj in c_io_ns.items():
                setattr(test, name, obj)
        elif test.__name__.startswith("Py"):
            for name, obj in py_io_ns.items():
                setattr(test, name, obj)

    # unittest.makeSuite() is deprecated since Python 3.11 and removed in
    # 3.13; the default loader builds the same per-class suites.
    loader = unittest.defaultTestLoader
    suite = unittest.TestSuite(
        [loader.loadTestsFromTestCase(test) for test in tests])
    return suite
Guido van Rossum28524c72007-02-27 05:47:44 +00004613
# Run the tests through unittest when this file is executed as a script.
if __name__ == "__main__":
    unittest.main()