blob: 35013b6a090cbb9a5afcf3604ce79b33c3e96cde [file] [log] [blame]
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001"""Unit tests for the io module."""
2
3# Tests of io are scattered over the test suite:
4# * test_bufio - tests file buffering
5# * test_memoryio - tests BytesIO and StringIO
6# * test_fileio - tests FileIO
7# * test_file - tests the file interface
8# * test_io - tests everything else in the io module
9# * test_univnewlines - tests universal newline support
10# * test_largefile - tests operations on a file greater than 2**32 bytes
11# (only enabled with -ulargefile)
12
13################################################################################
14# ATTENTION TEST WRITERS!!!
15################################################################################
16# When writing tests for io, it's important to test both the C and Python
17# implementations. This is usually done by writing a base test that refers to
Serhiy Storchakad65c9492015-11-02 14:10:23 +020018# the type it is testing as an attribute. Then it provides custom subclasses to
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000019# test both implementations. This file has lots of examples.
20################################################################################
Guido van Rossum68bbcd22007-02-27 17:19:33 +000021
Victor Stinnerf86a5e82012-06-05 13:43:22 +020022import abc
23import array
24import errno
25import locale
Guido van Rossum8358db22007-08-18 21:39:55 +000026import os
Victor Stinnerf86a5e82012-06-05 13:43:22 +020027import pickle
28import random
29import signal
Guido van Rossum34d69e52007-04-10 20:08:41 +000030import sys
Gregory P. Smithe5796c42018-12-30 20:17:57 -080031import sysconfig
Victor Stinner22eb6892019-06-26 00:51:05 +020032import textwrap
Antoine Pitroua6a4dc82017-09-07 18:56:24 +020033import threading
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +000034import time
Guido van Rossum28524c72007-02-27 05:47:44 +000035import unittest
Antoine Pitroue033e062010-10-29 10:38:18 +000036import warnings
Victor Stinnerf86a5e82012-06-05 13:43:22 +020037import weakref
Antoine Pitrou131a4892012-10-16 22:57:11 +020038from collections import deque, UserList
Victor Stinnerf86a5e82012-06-05 13:43:22 +020039from itertools import cycle, count
Benjamin Petersonee8712c2008-05-20 21:35:26 +000040from test import support
Victor Stinner22eb6892019-06-26 00:51:05 +020041from test.support.script_helper import (
42 assert_python_ok, assert_python_failure, run_python_until_end)
Hai Shi883bc632020-07-06 17:12:49 +080043from test.support import import_helper
44from test.support import os_helper
Hai Shie80697d2020-05-28 06:10:27 +080045from test.support import threading_helper
Hai Shi883bc632020-07-06 17:12:49 +080046from test.support import warnings_helper
47from test.support.os_helper import FakePath
Guido van Rossum76c5d4d2007-04-06 19:10:29 +000048
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +000049import codecs
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000050import io # C implementation of io
51import _pyio as pyio # Python implementation of io
Guido van Rossuma9e20242007-03-08 00:43:48 +000052
Martin Panter6bb91f32016-05-28 00:41:57 +000053try:
54 import ctypes
55except ImportError:
56 def byteslike(*pos, **kw):
57 return array.array("b", bytes(*pos, **kw))
58else:
59 def byteslike(*pos, **kw):
60 """Create a bytes-like object having no string or sequence methods"""
61 data = bytes(*pos, **kw)
62 obj = EmptyStruct()
63 ctypes.resize(obj, len(data))
64 memoryview(obj).cast("B")[:] = data
65 return obj
66 class EmptyStruct(ctypes.Structure):
67 pass
68
Gregory P. Smithe5796c42018-12-30 20:17:57 -080069_cflags = sysconfig.get_config_var('CFLAGS') or ''
70_config_args = sysconfig.get_config_var('CONFIG_ARGS') or ''
71MEMORY_SANITIZER = (
72 '-fsanitize=memory' in _cflags or
73 '--with-memory-sanitizer' in _config_args
74)
75
Miss Islington (bot)be200c32021-09-14 11:58:19 -070076ADDRESS_SANITIZER = (
77 '-fsanitize=address' in _cflags
78)
79
Victor Stinnerbc2aa812019-05-23 03:45:09 +020080# Does io.IOBase finalizer log the exception if the close() method fails?
81# The exception is ignored silently by default in release build.
82IOBASE_EMITS_UNRAISABLE = (hasattr(sys, "gettotalrefcount") or sys.flags.dev_mode)
Victor Stinner44235042019-04-12 17:06:47 +020083
84
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000085def _default_chunk_size():
86 """Get the default TextIOWrapper chunk size"""
Marc-André Lemburg8f36af72011-02-25 15:42:01 +000087 with open(__file__, "r", encoding="latin-1") as f:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000088 return f._CHUNK_SIZE
89
90
Antoine Pitrou328ec742010-09-14 18:37:24 +000091class MockRawIOWithoutRead:
92 """A RawIO implementation without read(), so as to exercise the default
93 RawIO.read() which calls readinto()."""
Guido van Rossuma9e20242007-03-08 00:43:48 +000094
Guido van Rossum76c5d4d2007-04-06 19:10:29 +000095 def __init__(self, read_stack=()):
96 self._read_stack = list(read_stack)
97 self._write_stack = []
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000098 self._reads = 0
Antoine Pitrou32cfede2010-08-11 13:31:33 +000099 self._extraneous_reads = 0
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000100
Guido van Rossum01a27522007-03-07 01:00:12 +0000101 def write(self, b):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000102 self._write_stack.append(bytes(b))
Guido van Rossum01a27522007-03-07 01:00:12 +0000103 return len(b)
104
105 def writable(self):
106 return True
107
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000108 def fileno(self):
109 return 42
110
111 def readable(self):
112 return True
113
Guido van Rossum01a27522007-03-07 01:00:12 +0000114 def seekable(self):
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000115 return True
116
Guido van Rossum01a27522007-03-07 01:00:12 +0000117 def seek(self, pos, whence):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000118 return 0 # wrong but we gotta return something
Guido van Rossum01a27522007-03-07 01:00:12 +0000119
120 def tell(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000121 return 0 # same comment as above
122
123 def readinto(self, buf):
124 self._reads += 1
125 max_len = len(buf)
126 try:
127 data = self._read_stack[0]
128 except IndexError:
Antoine Pitrou32cfede2010-08-11 13:31:33 +0000129 self._extraneous_reads += 1
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000130 return 0
131 if data is None:
132 del self._read_stack[0]
133 return None
134 n = len(data)
135 if len(data) <= max_len:
136 del self._read_stack[0]
137 buf[:n] = data
138 return n
139 else:
140 buf[:] = data[:max_len]
141 self._read_stack[0] = data[max_len:]
142 return max_len
143
144 def truncate(self, pos=None):
145 return pos
146
Antoine Pitrou328ec742010-09-14 18:37:24 +0000147class CMockRawIOWithoutRead(MockRawIOWithoutRead, io.RawIOBase):
148 pass
149
150class PyMockRawIOWithoutRead(MockRawIOWithoutRead, pyio.RawIOBase):
151 pass
152
153
154class MockRawIO(MockRawIOWithoutRead):
155
156 def read(self, n=None):
157 self._reads += 1
158 try:
159 return self._read_stack.pop(0)
160 except:
161 self._extraneous_reads += 1
162 return b""
163
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000164class CMockRawIO(MockRawIO, io.RawIOBase):
165 pass
166
167class PyMockRawIO(MockRawIO, pyio.RawIOBase):
168 pass
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000169
Guido van Rossuma9e20242007-03-08 00:43:48 +0000170
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000171class MisbehavedRawIO(MockRawIO):
172 def write(self, b):
173 return super().write(b) * 2
174
175 def read(self, n=None):
176 return super().read(n) * 2
177
178 def seek(self, pos, whence):
179 return -123
180
181 def tell(self):
182 return -456
183
184 def readinto(self, buf):
185 super().readinto(buf)
186 return len(buf) * 5
187
188class CMisbehavedRawIO(MisbehavedRawIO, io.RawIOBase):
189 pass
190
191class PyMisbehavedRawIO(MisbehavedRawIO, pyio.RawIOBase):
192 pass
193
194
benfogle9703f092017-11-10 16:03:40 -0500195class SlowFlushRawIO(MockRawIO):
196 def __init__(self):
197 super().__init__()
198 self.in_flush = threading.Event()
199
200 def flush(self):
201 self.in_flush.set()
202 time.sleep(0.25)
203
204class CSlowFlushRawIO(SlowFlushRawIO, io.RawIOBase):
205 pass
206
207class PySlowFlushRawIO(SlowFlushRawIO, pyio.RawIOBase):
208 pass
209
210
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000211class CloseFailureIO(MockRawIO):
212 closed = 0
213
214 def close(self):
215 if not self.closed:
216 self.closed = 1
Andrew Svetlovf7a17b42012-12-25 16:47:37 +0200217 raise OSError
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000218
219class CCloseFailureIO(CloseFailureIO, io.RawIOBase):
220 pass
221
222class PyCloseFailureIO(CloseFailureIO, pyio.RawIOBase):
223 pass
224
225
226class MockFileIO:
Guido van Rossum78892e42007-04-06 17:31:18 +0000227
228 def __init__(self, data):
229 self.read_history = []
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000230 super().__init__(data)
Guido van Rossum78892e42007-04-06 17:31:18 +0000231
232 def read(self, n=None):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000233 res = super().read(n)
Guido van Rossum78892e42007-04-06 17:31:18 +0000234 self.read_history.append(None if res is None else len(res))
235 return res
236
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000237 def readinto(self, b):
238 res = super().readinto(b)
239 self.read_history.append(res)
240 return res
Guido van Rossum78892e42007-04-06 17:31:18 +0000241
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000242class CMockFileIO(MockFileIO, io.BytesIO):
243 pass
Guido van Rossuma9e20242007-03-08 00:43:48 +0000244
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000245class PyMockFileIO(MockFileIO, pyio.BytesIO):
246 pass
247
248
Antoine Pitrou0d739d72010-09-05 23:01:12 +0000249class MockUnseekableIO:
250 def seekable(self):
251 return False
252
253 def seek(self, *args):
254 raise self.UnsupportedOperation("not seekable")
255
256 def tell(self, *args):
257 raise self.UnsupportedOperation("not seekable")
258
Martin Panter754aab22016-03-31 07:21:56 +0000259 def truncate(self, *args):
260 raise self.UnsupportedOperation("not seekable")
261
Antoine Pitrou0d739d72010-09-05 23:01:12 +0000262class CMockUnseekableIO(MockUnseekableIO, io.BytesIO):
263 UnsupportedOperation = io.UnsupportedOperation
264
265class PyMockUnseekableIO(MockUnseekableIO, pyio.BytesIO):
266 UnsupportedOperation = pyio.UnsupportedOperation
267
268
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000269class MockNonBlockWriterIO:
270
271 def __init__(self):
Guido van Rossum01a27522007-03-07 01:00:12 +0000272 self._write_stack = []
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000273 self._blocker_char = None
Guido van Rossuma9e20242007-03-08 00:43:48 +0000274
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000275 def pop_written(self):
276 s = b"".join(self._write_stack)
277 self._write_stack[:] = []
278 return s
279
280 def block_on(self, char):
281 """Block when a given char is encountered."""
282 self._blocker_char = char
283
284 def readable(self):
285 return True
286
287 def seekable(self):
288 return True
Guido van Rossuma9e20242007-03-08 00:43:48 +0000289
Victor Stinnerb589cef2019-06-11 03:10:59 +0200290 def seek(self, pos, whence=0):
291 # naive implementation, enough for tests
292 return 0
293
Guido van Rossum01a27522007-03-07 01:00:12 +0000294 def writable(self):
295 return True
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000296
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000297 def write(self, b):
298 b = bytes(b)
299 n = -1
300 if self._blocker_char:
301 try:
302 n = b.index(self._blocker_char)
303 except ValueError:
304 pass
305 else:
Antoine Pitrou58fcf9f2011-11-21 20:16:44 +0100306 if n > 0:
307 # write data up to the first blocker
308 self._write_stack.append(b[:n])
309 return n
310 else:
311 # cancel blocker and indicate would block
312 self._blocker_char = None
313 return None
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000314 self._write_stack.append(b)
315 return len(b)
316
317class CMockNonBlockWriterIO(MockNonBlockWriterIO, io.RawIOBase):
318 BlockingIOError = io.BlockingIOError
319
320class PyMockNonBlockWriterIO(MockNonBlockWriterIO, pyio.RawIOBase):
321 BlockingIOError = pyio.BlockingIOError
322
Guido van Rossuma9e20242007-03-08 00:43:48 +0000323
Guido van Rossum28524c72007-02-27 05:47:44 +0000324class IOTest(unittest.TestCase):
325
Neal Norwitze7789b12008-03-24 06:18:09 +0000326 def setUp(self):
Hai Shi883bc632020-07-06 17:12:49 +0800327 os_helper.unlink(os_helper.TESTFN)
Neal Norwitze7789b12008-03-24 06:18:09 +0000328
Guido van Rossum4d0f5a42007-03-07 22:59:39 +0000329 def tearDown(self):
Hai Shi883bc632020-07-06 17:12:49 +0800330 os_helper.unlink(os_helper.TESTFN)
Guido van Rossum4d0f5a42007-03-07 22:59:39 +0000331
Guido van Rossum28524c72007-02-27 05:47:44 +0000332 def write_ops(self, f):
Guido van Rossum87429772007-04-10 21:06:59 +0000333 self.assertEqual(f.write(b"blah."), 5)
Antoine Pitrou905a2ff2010-01-31 22:47:27 +0000334 f.truncate(0)
335 self.assertEqual(f.tell(), 5)
336 f.seek(0)
337
338 self.assertEqual(f.write(b"blah."), 5)
Guido van Rossum87429772007-04-10 21:06:59 +0000339 self.assertEqual(f.seek(0), 0)
340 self.assertEqual(f.write(b"Hello."), 6)
Guido van Rossum28524c72007-02-27 05:47:44 +0000341 self.assertEqual(f.tell(), 6)
Guido van Rossum87429772007-04-10 21:06:59 +0000342 self.assertEqual(f.seek(-1, 1), 5)
Guido van Rossum28524c72007-02-27 05:47:44 +0000343 self.assertEqual(f.tell(), 5)
Martin Panter6bb91f32016-05-28 00:41:57 +0000344 buffer = bytearray(b" world\n\n\n")
345 self.assertEqual(f.write(buffer), 9)
346 buffer[:] = b"*" * 9 # Overwrite our copy of the data
Guido van Rossum87429772007-04-10 21:06:59 +0000347 self.assertEqual(f.seek(0), 0)
Guido van Rossum2b08b382007-05-08 20:18:39 +0000348 self.assertEqual(f.write(b"h"), 1)
Guido van Rossum87429772007-04-10 21:06:59 +0000349 self.assertEqual(f.seek(-1, 2), 13)
350 self.assertEqual(f.tell(), 13)
Antoine Pitrou905a2ff2010-01-31 22:47:27 +0000351
Guido van Rossum87429772007-04-10 21:06:59 +0000352 self.assertEqual(f.truncate(12), 12)
Antoine Pitrou905a2ff2010-01-31 22:47:27 +0000353 self.assertEqual(f.tell(), 13)
Christian Heimes8e42a0a2007-11-08 18:04:45 +0000354 self.assertRaises(TypeError, f.seek, 0.0)
Guido van Rossum28524c72007-02-27 05:47:44 +0000355
Guido van Rossum9b76da62007-04-11 01:09:03 +0000356 def read_ops(self, f, buffered=False):
357 data = f.read(5)
358 self.assertEqual(data, b"hello")
Martin Panter6bb91f32016-05-28 00:41:57 +0000359 data = byteslike(data)
Guido van Rossum9b76da62007-04-11 01:09:03 +0000360 self.assertEqual(f.readinto(data), 5)
Martin Panter6bb91f32016-05-28 00:41:57 +0000361 self.assertEqual(bytes(data), b" worl")
362 data = bytearray(5)
Guido van Rossum9b76da62007-04-11 01:09:03 +0000363 self.assertEqual(f.readinto(data), 2)
364 self.assertEqual(len(data), 5)
365 self.assertEqual(data[:2], b"d\n")
366 self.assertEqual(f.seek(0), 0)
367 self.assertEqual(f.read(20), b"hello world\n")
368 self.assertEqual(f.read(1), b"")
Martin Panter6bb91f32016-05-28 00:41:57 +0000369 self.assertEqual(f.readinto(byteslike(b"x")), 0)
Guido van Rossum9b76da62007-04-11 01:09:03 +0000370 self.assertEqual(f.seek(-6, 2), 6)
371 self.assertEqual(f.read(5), b"world")
372 self.assertEqual(f.read(0), b"")
Martin Panter6bb91f32016-05-28 00:41:57 +0000373 self.assertEqual(f.readinto(byteslike()), 0)
Guido van Rossum9b76da62007-04-11 01:09:03 +0000374 self.assertEqual(f.seek(-6, 1), 5)
375 self.assertEqual(f.read(5), b" worl")
376 self.assertEqual(f.tell(), 10)
Christian Heimes8e42a0a2007-11-08 18:04:45 +0000377 self.assertRaises(TypeError, f.seek, 0.0)
Guido van Rossum9b76da62007-04-11 01:09:03 +0000378 if buffered:
379 f.seek(0)
380 self.assertEqual(f.read(), b"hello world\n")
381 f.seek(6)
382 self.assertEqual(f.read(), b"world\n")
383 self.assertEqual(f.read(), b"")
Martin Panter6bb91f32016-05-28 00:41:57 +0000384 f.seek(0)
385 data = byteslike(5)
386 self.assertEqual(f.readinto1(data), 5)
387 self.assertEqual(bytes(data), b"hello")
Guido van Rossum9b76da62007-04-11 01:09:03 +0000388
Guido van Rossum34d69e52007-04-10 20:08:41 +0000389 LARGE = 2**31
390
Guido van Rossum53807da2007-04-10 19:01:47 +0000391 def large_file_ops(self, f):
392 assert f.readable()
393 assert f.writable()
Xavier de Gaye877f0362016-11-17 09:20:28 +0100394 try:
395 self.assertEqual(f.seek(self.LARGE), self.LARGE)
396 except (OverflowError, ValueError):
397 self.skipTest("no largefile support")
Guido van Rossum34d69e52007-04-10 20:08:41 +0000398 self.assertEqual(f.tell(), self.LARGE)
Guido van Rossum53807da2007-04-10 19:01:47 +0000399 self.assertEqual(f.write(b"xxx"), 3)
Guido van Rossum34d69e52007-04-10 20:08:41 +0000400 self.assertEqual(f.tell(), self.LARGE + 3)
401 self.assertEqual(f.seek(-1, 1), self.LARGE + 2)
Guido van Rossum87429772007-04-10 21:06:59 +0000402 self.assertEqual(f.truncate(), self.LARGE + 2)
Guido van Rossum34d69e52007-04-10 20:08:41 +0000403 self.assertEqual(f.tell(), self.LARGE + 2)
404 self.assertEqual(f.seek(0, 2), self.LARGE + 2)
Guido van Rossum87429772007-04-10 21:06:59 +0000405 self.assertEqual(f.truncate(self.LARGE + 1), self.LARGE + 1)
Antoine Pitrou905a2ff2010-01-31 22:47:27 +0000406 self.assertEqual(f.tell(), self.LARGE + 2)
Guido van Rossum34d69e52007-04-10 20:08:41 +0000407 self.assertEqual(f.seek(0, 2), self.LARGE + 1)
408 self.assertEqual(f.seek(-1, 2), self.LARGE)
Guido van Rossum53807da2007-04-10 19:01:47 +0000409 self.assertEqual(f.read(2), b"x")
410
Benjamin Peterson81971ea2009-05-14 22:01:31 +0000411 def test_invalid_operations(self):
412 # Try writing on a file opened in read mode and vice-versa.
Antoine Pitrou0d739d72010-09-05 23:01:12 +0000413 exc = self.UnsupportedOperation
Inada Naoki58cffba2021-04-01 11:25:04 +0900414 with self.open(os_helper.TESTFN, "w", encoding="utf-8") as fp:
415 self.assertRaises(exc, fp.read)
416 self.assertRaises(exc, fp.readline)
417 with self.open(os_helper.TESTFN, "wb") as fp:
418 self.assertRaises(exc, fp.read)
419 self.assertRaises(exc, fp.readline)
Hai Shi883bc632020-07-06 17:12:49 +0800420 with self.open(os_helper.TESTFN, "wb", buffering=0) as fp:
Antoine Pitrou0d739d72010-09-05 23:01:12 +0000421 self.assertRaises(exc, fp.read)
422 self.assertRaises(exc, fp.readline)
Hai Shi883bc632020-07-06 17:12:49 +0800423 with self.open(os_helper.TESTFN, "rb", buffering=0) as fp:
Antoine Pitrou0d739d72010-09-05 23:01:12 +0000424 self.assertRaises(exc, fp.write, b"blah")
425 self.assertRaises(exc, fp.writelines, [b"blah\n"])
Hai Shi883bc632020-07-06 17:12:49 +0800426 with self.open(os_helper.TESTFN, "rb") as fp:
Antoine Pitrou0d739d72010-09-05 23:01:12 +0000427 self.assertRaises(exc, fp.write, b"blah")
428 self.assertRaises(exc, fp.writelines, [b"blah\n"])
Inada Naoki58cffba2021-04-01 11:25:04 +0900429 with self.open(os_helper.TESTFN, "r", encoding="utf-8") as fp:
Antoine Pitrou0d739d72010-09-05 23:01:12 +0000430 self.assertRaises(exc, fp.write, "blah")
431 self.assertRaises(exc, fp.writelines, ["blah\n"])
432 # Non-zero seeking from current or end pos
433 self.assertRaises(exc, fp.seek, 1, self.SEEK_CUR)
434 self.assertRaises(exc, fp.seek, -1, self.SEEK_END)
Benjamin Peterson81971ea2009-05-14 22:01:31 +0000435
Martin Panter754aab22016-03-31 07:21:56 +0000436 def test_optional_abilities(self):
437 # Test for OSError when optional APIs are not supported
438 # The purpose of this test is to try fileno(), reading, writing and
439 # seeking operations with various objects that indicate they do not
440 # support these operations.
441
442 def pipe_reader():
443 [r, w] = os.pipe()
444 os.close(w) # So that read() is harmless
445 return self.FileIO(r, "r")
446
447 def pipe_writer():
448 [r, w] = os.pipe()
449 self.addCleanup(os.close, r)
450 # Guarantee that we can write into the pipe without blocking
451 thread = threading.Thread(target=os.read, args=(r, 100))
452 thread.start()
453 self.addCleanup(thread.join)
454 return self.FileIO(w, "w")
455
456 def buffered_reader():
457 return self.BufferedReader(self.MockUnseekableIO())
458
459 def buffered_writer():
460 return self.BufferedWriter(self.MockUnseekableIO())
461
462 def buffered_random():
463 return self.BufferedRandom(self.BytesIO())
464
465 def buffered_rw_pair():
466 return self.BufferedRWPair(self.MockUnseekableIO(),
467 self.MockUnseekableIO())
468
469 def text_reader():
470 class UnseekableReader(self.MockUnseekableIO):
471 writable = self.BufferedIOBase.writable
472 write = self.BufferedIOBase.write
473 return self.TextIOWrapper(UnseekableReader(), "ascii")
474
475 def text_writer():
476 class UnseekableWriter(self.MockUnseekableIO):
477 readable = self.BufferedIOBase.readable
478 read = self.BufferedIOBase.read
479 return self.TextIOWrapper(UnseekableWriter(), "ascii")
480
481 tests = (
482 (pipe_reader, "fr"), (pipe_writer, "fw"),
483 (buffered_reader, "r"), (buffered_writer, "w"),
484 (buffered_random, "rws"), (buffered_rw_pair, "rw"),
485 (text_reader, "r"), (text_writer, "w"),
486 (self.BytesIO, "rws"), (self.StringIO, "rws"),
487 )
488 for [test, abilities] in tests:
Martin Panter754aab22016-03-31 07:21:56 +0000489 with self.subTest(test), test() as obj:
490 readable = "r" in abilities
491 self.assertEqual(obj.readable(), readable)
492 writable = "w" in abilities
493 self.assertEqual(obj.writable(), writable)
Martin Panter754aab22016-03-31 07:21:56 +0000494
495 if isinstance(obj, self.TextIOBase):
496 data = "3"
497 elif isinstance(obj, (self.BufferedIOBase, self.RawIOBase)):
498 data = b"3"
499 else:
500 self.fail("Unknown base class")
501
502 if "f" in abilities:
503 obj.fileno()
504 else:
505 self.assertRaises(OSError, obj.fileno)
506
507 if readable:
508 obj.read(1)
509 obj.read()
510 else:
511 self.assertRaises(OSError, obj.read, 1)
512 self.assertRaises(OSError, obj.read)
513
514 if writable:
515 obj.write(data)
516 else:
517 self.assertRaises(OSError, obj.write, data)
518
Martin Panter3ee147f2016-03-31 21:05:31 +0000519 if sys.platform.startswith("win") and test in (
Martin Panter0950e6a2016-03-31 10:31:30 +0000520 pipe_reader, pipe_writer):
521 # Pipes seem to appear as seekable on Windows
522 continue
523 seekable = "s" in abilities
524 self.assertEqual(obj.seekable(), seekable)
525
Martin Panter754aab22016-03-31 07:21:56 +0000526 if seekable:
527 obj.tell()
528 obj.seek(0)
529 else:
530 self.assertRaises(OSError, obj.tell)
531 self.assertRaises(OSError, obj.seek, 0)
532
533 if writable and seekable:
534 obj.truncate()
535 obj.truncate(0)
536 else:
537 self.assertRaises(OSError, obj.truncate)
538 self.assertRaises(OSError, obj.truncate, 0)
539
Antoine Pitrou13348842012-01-29 18:36:34 +0100540 def test_open_handles_NUL_chars(self):
541 fn_with_NUL = 'foo\0bar'
Inada Naoki58cffba2021-04-01 11:25:04 +0900542 self.assertRaises(ValueError, self.open, fn_with_NUL, 'w', encoding="utf-8")
Victor Stinner47b45572016-03-25 09:07:07 +0100543
544 bytes_fn = bytes(fn_with_NUL, 'ascii')
545 with warnings.catch_warnings():
546 warnings.simplefilter("ignore", DeprecationWarning)
Inada Naoki58cffba2021-04-01 11:25:04 +0900547 self.assertRaises(ValueError, self.open, bytes_fn, 'w', encoding="utf-8")
Antoine Pitrou13348842012-01-29 18:36:34 +0100548
Guido van Rossum28524c72007-02-27 05:47:44 +0000549 def test_raw_file_io(self):
Hai Shi883bc632020-07-06 17:12:49 +0800550 with self.open(os_helper.TESTFN, "wb", buffering=0) as f:
Benjamin Peterson45cec322009-04-24 23:14:50 +0000551 self.assertEqual(f.readable(), False)
552 self.assertEqual(f.writable(), True)
553 self.assertEqual(f.seekable(), True)
554 self.write_ops(f)
Hai Shi883bc632020-07-06 17:12:49 +0800555 with self.open(os_helper.TESTFN, "rb", buffering=0) as f:
Benjamin Peterson45cec322009-04-24 23:14:50 +0000556 self.assertEqual(f.readable(), True)
557 self.assertEqual(f.writable(), False)
558 self.assertEqual(f.seekable(), True)
559 self.read_ops(f)
Guido van Rossum28524c72007-02-27 05:47:44 +0000560
Guido van Rossum87429772007-04-10 21:06:59 +0000561 def test_buffered_file_io(self):
Hai Shi883bc632020-07-06 17:12:49 +0800562 with self.open(os_helper.TESTFN, "wb") as f:
Benjamin Peterson45cec322009-04-24 23:14:50 +0000563 self.assertEqual(f.readable(), False)
564 self.assertEqual(f.writable(), True)
565 self.assertEqual(f.seekable(), True)
566 self.write_ops(f)
Hai Shi883bc632020-07-06 17:12:49 +0800567 with self.open(os_helper.TESTFN, "rb") as f:
Benjamin Peterson45cec322009-04-24 23:14:50 +0000568 self.assertEqual(f.readable(), True)
569 self.assertEqual(f.writable(), False)
570 self.assertEqual(f.seekable(), True)
571 self.read_ops(f, True)
Guido van Rossum87429772007-04-10 21:06:59 +0000572
Guido van Rossum48fc58a2007-06-07 23:45:37 +0000573 def test_readline(self):
Hai Shi883bc632020-07-06 17:12:49 +0800574 with self.open(os_helper.TESTFN, "wb") as f:
Benjamin Peterson45cec322009-04-24 23:14:50 +0000575 f.write(b"abc\ndef\nxyzzy\nfoo\x00bar\nanother line")
Hai Shi883bc632020-07-06 17:12:49 +0800576 with self.open(os_helper.TESTFN, "rb") as f:
Benjamin Peterson45cec322009-04-24 23:14:50 +0000577 self.assertEqual(f.readline(), b"abc\n")
578 self.assertEqual(f.readline(10), b"def\n")
579 self.assertEqual(f.readline(2), b"xy")
580 self.assertEqual(f.readline(4), b"zzy\n")
581 self.assertEqual(f.readline(), b"foo\x00bar\n")
Benjamin Petersonbf5ff762009-12-13 19:25:34 +0000582 self.assertEqual(f.readline(None), b"another line")
Benjamin Peterson45cec322009-04-24 23:14:50 +0000583 self.assertRaises(TypeError, f.readline, 5.3)
Inada Naoki58cffba2021-04-01 11:25:04 +0900584 with self.open(os_helper.TESTFN, "r", encoding="utf-8") as f:
Benjamin Peterson45cec322009-04-24 23:14:50 +0000585 self.assertRaises(TypeError, f.readline, 5.3)
Guido van Rossum48fc58a2007-06-07 23:45:37 +0000586
Serhiy Storchakabf623ae2017-04-19 20:03:52 +0300587 def test_readline_nonsizeable(self):
588 # Issue #30061
589 # Crash when readline() returns an object without __len__
590 class R(self.IOBase):
591 def readline(self):
592 return None
593 self.assertRaises((TypeError, StopIteration), next, R())
594
595 def test_next_nonsizeable(self):
596 # Issue #30061
597 # Crash when __next__() returns an object without __len__
598 class R(self.IOBase):
599 def __next__(self):
600 return None
601 self.assertRaises(TypeError, R().readlines, 1)
602
Guido van Rossum28524c72007-02-27 05:47:44 +0000603 def test_raw_bytes_io(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000604 f = self.BytesIO()
Guido van Rossum28524c72007-02-27 05:47:44 +0000605 self.write_ops(f)
606 data = f.getvalue()
607 self.assertEqual(data, b"hello world\n")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000608 f = self.BytesIO(data)
Guido van Rossum9b76da62007-04-11 01:09:03 +0000609 self.read_ops(f, True)
Guido van Rossum28524c72007-02-27 05:47:44 +0000610
Guido van Rossum53807da2007-04-10 19:01:47 +0000611 def test_large_file_ops(self):
Mike53f7a7c2017-12-14 14:04:53 +0300612 # On Windows and Mac OSX this test consumes large resources; It takes
Victor Stinner8c663fd2017-11-08 14:44:44 -0800613 # a long time to build the >2 GiB file and takes >2 GiB of disk space
Guido van Rossum34d69e52007-04-10 20:08:41 +0000614 # therefore the resource must be enabled to run this test.
Victor Stinner937ee9e2018-06-26 02:11:06 +0200615 if sys.platform[:3] == 'win' or sys.platform == 'darwin':
Zachary Ware9fe6d862013-12-08 00:20:35 -0600616 support.requires(
617 'largefile',
618 'test requires %s bytes and a long time to run' % self.LARGE)
Hai Shi883bc632020-07-06 17:12:49 +0800619 with self.open(os_helper.TESTFN, "w+b", 0) as f:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000620 self.large_file_ops(f)
Hai Shi883bc632020-07-06 17:12:49 +0800621 with self.open(os_helper.TESTFN, "w+b") as f:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000622 self.large_file_ops(f)
Guido van Rossum87429772007-04-10 21:06:59 +0000623
624 def test_with_open(self):
Alexey Izbysheva2670562018-10-20 03:22:31 +0300625 for bufsize in (0, 100):
Guido van Rossum87429772007-04-10 21:06:59 +0000626 f = None
Hai Shi883bc632020-07-06 17:12:49 +0800627 with self.open(os_helper.TESTFN, "wb", bufsize) as f:
Guido van Rossum1f2ca562007-08-27 20:44:15 +0000628 f.write(b"xxx")
Guido van Rossum87429772007-04-10 21:06:59 +0000629 self.assertEqual(f.closed, True)
630 f = None
631 try:
Hai Shi883bc632020-07-06 17:12:49 +0800632 with self.open(os_helper.TESTFN, "wb", bufsize) as f:
Guido van Rossum87429772007-04-10 21:06:59 +0000633 1/0
634 except ZeroDivisionError:
635 self.assertEqual(f.closed, True)
636 else:
637 self.fail("1/0 didn't raise an exception")
638
Antoine Pitrou08838b62009-01-21 00:55:13 +0000639 # issue 5008
640 def test_append_mode_tell(self):
Hai Shi883bc632020-07-06 17:12:49 +0800641 with self.open(os_helper.TESTFN, "wb") as f:
Antoine Pitrou08838b62009-01-21 00:55:13 +0000642 f.write(b"xxx")
Hai Shi883bc632020-07-06 17:12:49 +0800643 with self.open(os_helper.TESTFN, "ab", buffering=0) as f:
Antoine Pitrou08838b62009-01-21 00:55:13 +0000644 self.assertEqual(f.tell(), 3)
Hai Shi883bc632020-07-06 17:12:49 +0800645 with self.open(os_helper.TESTFN, "ab") as f:
Antoine Pitrou08838b62009-01-21 00:55:13 +0000646 self.assertEqual(f.tell(), 3)
Inada Naoki58cffba2021-04-01 11:25:04 +0900647 with self.open(os_helper.TESTFN, "a", encoding="utf-8") as f:
Serhiy Storchaka0dcd80a2015-08-02 15:17:49 +0300648 self.assertGreater(f.tell(), 0)
Antoine Pitrou08838b62009-01-21 00:55:13 +0000649
Guido van Rossum87429772007-04-10 21:06:59 +0000650 def test_destructor(self):
651 record = []
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000652 class MyFileIO(self.FileIO):
Guido van Rossum87429772007-04-10 21:06:59 +0000653 def __del__(self):
654 record.append(1)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000655 try:
656 f = super().__del__
657 except AttributeError:
658 pass
659 else:
660 f()
Guido van Rossum87429772007-04-10 21:06:59 +0000661 def close(self):
662 record.append(2)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000663 super().close()
Guido van Rossum87429772007-04-10 21:06:59 +0000664 def flush(self):
665 record.append(3)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000666 super().flush()
Hai Shi883bc632020-07-06 17:12:49 +0800667 with warnings_helper.check_warnings(('', ResourceWarning)):
668 f = MyFileIO(os_helper.TESTFN, "wb")
Brett Cannon5a9e91b2010-10-29 23:53:03 +0000669 f.write(b"xxx")
670 del f
671 support.gc_collect()
672 self.assertEqual(record, [1, 2, 3])
Hai Shi883bc632020-07-06 17:12:49 +0800673 with self.open(os_helper.TESTFN, "rb") as f:
Brett Cannon5a9e91b2010-10-29 23:53:03 +0000674 self.assertEqual(f.read(), b"xxx")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000675
676 def _check_base_destructor(self, base):
677 record = []
678 class MyIO(base):
679 def __init__(self):
680 # This exercises the availability of attributes on object
681 # destruction.
682 # (in the C version, close() is called by the tp_dealloc
683 # function, not by __del__)
684 self.on_del = 1
685 self.on_close = 2
686 self.on_flush = 3
687 def __del__(self):
688 record.append(self.on_del)
689 try:
690 f = super().__del__
691 except AttributeError:
692 pass
693 else:
694 f()
695 def close(self):
696 record.append(self.on_close)
697 super().close()
698 def flush(self):
699 record.append(self.on_flush)
700 super().flush()
701 f = MyIO()
Guido van Rossum87429772007-04-10 21:06:59 +0000702 del f
Benjamin Peterson24fb1d02009-04-24 23:26:21 +0000703 support.gc_collect()
Guido van Rossum87429772007-04-10 21:06:59 +0000704 self.assertEqual(record, [1, 2, 3])
705
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000706 def test_IOBase_destructor(self):
707 self._check_base_destructor(self.IOBase)
708
709 def test_RawIOBase_destructor(self):
710 self._check_base_destructor(self.RawIOBase)
711
712 def test_BufferedIOBase_destructor(self):
713 self._check_base_destructor(self.BufferedIOBase)
714
715 def test_TextIOBase_destructor(self):
716 self._check_base_destructor(self.TextIOBase)
717
Guido van Rossum87429772007-04-10 21:06:59 +0000718 def test_close_flushes(self):
Hai Shi883bc632020-07-06 17:12:49 +0800719 with self.open(os_helper.TESTFN, "wb") as f:
Benjamin Peterson45cec322009-04-24 23:14:50 +0000720 f.write(b"xxx")
Hai Shi883bc632020-07-06 17:12:49 +0800721 with self.open(os_helper.TESTFN, "rb") as f:
Benjamin Peterson45cec322009-04-24 23:14:50 +0000722 self.assertEqual(f.read(), b"xxx")
Guido van Rossuma9e20242007-03-08 00:43:48 +0000723
Guido van Rossumd4103952007-04-12 05:44:49 +0000724 def test_array_writes(self):
725 a = array.array('i', range(10))
Antoine Pitrou1ce3eb52010-09-01 20:29:34 +0000726 n = len(a.tobytes())
Martin Panter6bb91f32016-05-28 00:41:57 +0000727 def check(f):
728 with f:
729 self.assertEqual(f.write(a), n)
730 f.writelines((a,))
731 check(self.BytesIO())
Hai Shi883bc632020-07-06 17:12:49 +0800732 check(self.FileIO(os_helper.TESTFN, "w"))
Martin Panter6bb91f32016-05-28 00:41:57 +0000733 check(self.BufferedWriter(self.MockRawIO()))
734 check(self.BufferedRandom(self.MockRawIO()))
735 check(self.BufferedRWPair(self.MockRawIO(), self.MockRawIO()))
Guido van Rossumd4103952007-04-12 05:44:49 +0000736
Guido van Rossum2dced8b2007-10-30 17:27:30 +0000737 def test_closefd(self):
Hai Shi883bc632020-07-06 17:12:49 +0800738 self.assertRaises(ValueError, self.open, os_helper.TESTFN, 'w',
Inada Naoki58cffba2021-04-01 11:25:04 +0900739 encoding="utf-8", closefd=False)
Guido van Rossuma9e20242007-03-08 00:43:48 +0000740
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000741 def test_read_closed(self):
Inada Naoki58cffba2021-04-01 11:25:04 +0900742 with self.open(os_helper.TESTFN, "w", encoding="utf-8") as f:
Christian Heimesecc42a22008-11-05 19:30:32 +0000743 f.write("egg\n")
Inada Naoki58cffba2021-04-01 11:25:04 +0900744 with self.open(os_helper.TESTFN, "r", encoding="utf-8") as f:
745 file = self.open(f.fileno(), "r", encoding="utf-8", closefd=False)
Christian Heimesecc42a22008-11-05 19:30:32 +0000746 self.assertEqual(file.read(), "egg\n")
747 file.seek(0)
748 file.close()
749 self.assertRaises(ValueError, file.read)
Hai Shi883bc632020-07-06 17:12:49 +0800750 with self.open(os_helper.TESTFN, "rb") as f:
Philipp Gesangcb1c0742020-02-04 22:25:16 +0100751 file = self.open(f.fileno(), "rb", closefd=False)
752 self.assertEqual(file.read()[:3], b"egg")
753 file.close()
754 self.assertRaises(ValueError, file.readinto, bytearray(1))
Christian Heimesecc42a22008-11-05 19:30:32 +0000755
756 def test_no_closefd_with_filename(self):
757 # can't use closefd in combination with a file name
Inada Naoki58cffba2021-04-01 11:25:04 +0900758 self.assertRaises(ValueError, self.open, os_helper.TESTFN, "r",
759 encoding="utf-8", closefd=False)
Christian Heimesecc42a22008-11-05 19:30:32 +0000760
761 def test_closefd_attr(self):
Hai Shi883bc632020-07-06 17:12:49 +0800762 with self.open(os_helper.TESTFN, "wb") as f:
Christian Heimesecc42a22008-11-05 19:30:32 +0000763 f.write(b"egg\n")
Inada Naoki58cffba2021-04-01 11:25:04 +0900764 with self.open(os_helper.TESTFN, "r", encoding="utf-8") as f:
Christian Heimesecc42a22008-11-05 19:30:32 +0000765 self.assertEqual(f.buffer.raw.closefd, True)
Inada Naoki58cffba2021-04-01 11:25:04 +0900766 file = self.open(f.fileno(), "r", encoding="utf-8", closefd=False)
Christian Heimesecc42a22008-11-05 19:30:32 +0000767 self.assertEqual(file.buffer.raw.closefd, False)
768
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000769 def test_garbage_collection(self):
770 # FileIO objects are collected, and collecting them flushes
771 # all data to disk.
Hai Shi883bc632020-07-06 17:12:49 +0800772 with warnings_helper.check_warnings(('', ResourceWarning)):
773 f = self.FileIO(os_helper.TESTFN, "wb")
Benjamin Petersonebe5d8a2010-10-31 01:30:11 +0000774 f.write(b"abcxxx")
775 f.f = f
776 wr = weakref.ref(f)
777 del f
778 support.gc_collect()
Serhiy Storchaka0dcd80a2015-08-02 15:17:49 +0300779 self.assertIsNone(wr(), wr)
Hai Shi883bc632020-07-06 17:12:49 +0800780 with self.open(os_helper.TESTFN, "rb") as f:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000781 self.assertEqual(f.read(), b"abcxxx")
Christian Heimesecc42a22008-11-05 19:30:32 +0000782
Antoine Pitrou7d037a72009-03-29 18:55:12 +0000783 def test_unbounded_file(self):
784 # Issue #1174606: reading from an unbounded stream such as /dev/zero.
785 zero = "/dev/zero"
786 if not os.path.exists(zero):
Antoine Pitrouc50cb8e2009-04-19 00:10:36 +0000787 self.skipTest("{0} does not exist".format(zero))
Antoine Pitrou7d037a72009-03-29 18:55:12 +0000788 if sys.maxsize > 0x7FFFFFFF:
Antoine Pitrouc50cb8e2009-04-19 00:10:36 +0000789 self.skipTest("test can only run in a 32-bit address space")
Antoine Pitrou7d037a72009-03-29 18:55:12 +0000790 if support.real_max_memuse < support._2G:
Victor Stinner8c663fd2017-11-08 14:44:44 -0800791 self.skipTest("test requires at least 2 GiB of memory")
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000792 with self.open(zero, "rb", buffering=0) as f:
Antoine Pitrou7d037a72009-03-29 18:55:12 +0000793 self.assertRaises(OverflowError, f.read)
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000794 with self.open(zero, "rb") as f:
Antoine Pitrou7d037a72009-03-29 18:55:12 +0000795 self.assertRaises(OverflowError, f.read)
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000796 with self.open(zero, "r") as f:
Antoine Pitrou7d037a72009-03-29 18:55:12 +0000797 self.assertRaises(OverflowError, f.read)
798
Serhiy Storchakaa3712a92015-02-21 00:35:09 +0200799 def check_flush_error_on_close(self, *args, **kwargs):
800 # Test that the file is closed despite failed flush
801 # and that flush() is called before file closed.
802 f = self.open(*args, **kwargs)
803 closed = []
Antoine Pitrou6be88762010-05-03 16:48:20 +0000804 def bad_flush():
Serhiy Storchakaa3712a92015-02-21 00:35:09 +0200805 closed[:] = [f.closed]
Andrew Svetlovf7a17b42012-12-25 16:47:37 +0200806 raise OSError()
Antoine Pitrou6be88762010-05-03 16:48:20 +0000807 f.flush = bad_flush
Andrew Svetlovf7a17b42012-12-25 16:47:37 +0200808 self.assertRaises(OSError, f.close) # exception not swallowed
Benjamin Peterson68623612012-12-20 11:53:11 -0600809 self.assertTrue(f.closed)
Serhiy Storchakaa3712a92015-02-21 00:35:09 +0200810 self.assertTrue(closed) # flush() called
811 self.assertFalse(closed[0]) # flush() called before file closed
Serhiy Storchakac26a1a42015-02-23 00:28:38 +0200812 f.flush = lambda: None # break reference loop
Serhiy Storchakaa3712a92015-02-21 00:35:09 +0200813
814 def test_flush_error_on_close(self):
815 # raw file
816 # Issue #5700: io.FileIO calls flush() after file closed
Hai Shi883bc632020-07-06 17:12:49 +0800817 self.check_flush_error_on_close(os_helper.TESTFN, 'wb', buffering=0)
818 fd = os.open(os_helper.TESTFN, os.O_WRONLY|os.O_CREAT)
Serhiy Storchakaa3712a92015-02-21 00:35:09 +0200819 self.check_flush_error_on_close(fd, 'wb', buffering=0)
Hai Shi883bc632020-07-06 17:12:49 +0800820 fd = os.open(os_helper.TESTFN, os.O_WRONLY|os.O_CREAT)
Serhiy Storchakaa3712a92015-02-21 00:35:09 +0200821 self.check_flush_error_on_close(fd, 'wb', buffering=0, closefd=False)
822 os.close(fd)
823 # buffered io
Hai Shi883bc632020-07-06 17:12:49 +0800824 self.check_flush_error_on_close(os_helper.TESTFN, 'wb')
825 fd = os.open(os_helper.TESTFN, os.O_WRONLY|os.O_CREAT)
Serhiy Storchakaa3712a92015-02-21 00:35:09 +0200826 self.check_flush_error_on_close(fd, 'wb')
Hai Shi883bc632020-07-06 17:12:49 +0800827 fd = os.open(os_helper.TESTFN, os.O_WRONLY|os.O_CREAT)
Serhiy Storchakaa3712a92015-02-21 00:35:09 +0200828 self.check_flush_error_on_close(fd, 'wb', closefd=False)
829 os.close(fd)
830 # text io
Inada Naoki58cffba2021-04-01 11:25:04 +0900831 self.check_flush_error_on_close(os_helper.TESTFN, 'w', encoding="utf-8")
Hai Shi883bc632020-07-06 17:12:49 +0800832 fd = os.open(os_helper.TESTFN, os.O_WRONLY|os.O_CREAT)
Inada Naoki58cffba2021-04-01 11:25:04 +0900833 self.check_flush_error_on_close(fd, 'w', encoding="utf-8")
Hai Shi883bc632020-07-06 17:12:49 +0800834 fd = os.open(os_helper.TESTFN, os.O_WRONLY|os.O_CREAT)
Inada Naoki58cffba2021-04-01 11:25:04 +0900835 self.check_flush_error_on_close(fd, 'w', encoding="utf-8", closefd=False)
Serhiy Storchakaa3712a92015-02-21 00:35:09 +0200836 os.close(fd)
Antoine Pitrou6be88762010-05-03 16:48:20 +0000837
838 def test_multi_close(self):
Hai Shi883bc632020-07-06 17:12:49 +0800839 f = self.open(os_helper.TESTFN, "wb", buffering=0)
Antoine Pitrou6be88762010-05-03 16:48:20 +0000840 f.close()
841 f.close()
842 f.close()
843 self.assertRaises(ValueError, f.flush)
844
Antoine Pitrou328ec742010-09-14 18:37:24 +0000845 def test_RawIOBase_read(self):
Sanyam Khurana1b74f9b2017-12-11 19:12:09 +0530846 # Exercise the default limited RawIOBase.read(n) implementation (which
847 # calls readinto() internally).
Antoine Pitrou328ec742010-09-14 18:37:24 +0000848 rawio = self.MockRawIOWithoutRead((b"abc", b"d", None, b"efg", None))
849 self.assertEqual(rawio.read(2), b"ab")
850 self.assertEqual(rawio.read(2), b"c")
851 self.assertEqual(rawio.read(2), b"d")
852 self.assertEqual(rawio.read(2), None)
853 self.assertEqual(rawio.read(2), b"ef")
854 self.assertEqual(rawio.read(2), b"g")
855 self.assertEqual(rawio.read(2), None)
856 self.assertEqual(rawio.read(2), b"")
857
Benjamin Petersonf6f3a352011-09-03 09:26:20 -0400858 def test_types_have_dict(self):
859 test = (
860 self.IOBase(),
861 self.RawIOBase(),
862 self.TextIOBase(),
863 self.StringIO(),
864 self.BytesIO()
865 )
866 for obj in test:
867 self.assertTrue(hasattr(obj, "__dict__"))
868
Ross Lagerwall59142db2011-10-31 20:34:46 +0200869 def test_opener(self):
Inada Naoki58cffba2021-04-01 11:25:04 +0900870 with self.open(os_helper.TESTFN, "w", encoding="utf-8") as f:
Ross Lagerwall59142db2011-10-31 20:34:46 +0200871 f.write("egg\n")
Hai Shi883bc632020-07-06 17:12:49 +0800872 fd = os.open(os_helper.TESTFN, os.O_RDONLY)
Ross Lagerwall59142db2011-10-31 20:34:46 +0200873 def opener(path, flags):
874 return fd
Inada Naoki58cffba2021-04-01 11:25:04 +0900875 with self.open("non-existent", "r", encoding="utf-8", opener=opener) as f:
Ross Lagerwall59142db2011-10-31 20:34:46 +0200876 self.assertEqual(f.read(), "egg\n")
877
Barry Warsaw480e2852016-06-08 17:47:26 -0400878 def test_bad_opener_negative_1(self):
879 # Issue #27066.
880 def badopener(fname, flags):
881 return -1
882 with self.assertRaises(ValueError) as cm:
883 open('non-existent', 'r', opener=badopener)
884 self.assertEqual(str(cm.exception), 'opener returned -1')
885
886 def test_bad_opener_other_negative(self):
887 # Issue #27066.
888 def badopener(fname, flags):
889 return -2
890 with self.assertRaises(ValueError) as cm:
891 open('non-existent', 'r', opener=badopener)
892 self.assertEqual(str(cm.exception), 'opener returned -2')
893
Hynek Schlawack2cc71562012-05-25 10:05:53 +0200894 def test_fileio_closefd(self):
895 # Issue #4841
896 with self.open(__file__, 'rb') as f1, \
897 self.open(__file__, 'rb') as f2:
898 fileio = self.FileIO(f1.fileno(), closefd=False)
899 # .__init__() must not close f1
900 fileio.__init__(f2.fileno(), closefd=False)
901 f1.readline()
902 # .close() must not close f2
903 fileio.close()
904 f2.readline()
905
Serhiy Storchakaf10063e2014-06-09 13:32:34 +0300906 def test_nonbuffered_textio(self):
Hai Shi883bc632020-07-06 17:12:49 +0800907 with warnings_helper.check_no_resource_warning(self):
Serhiy Storchakaf10063e2014-06-09 13:32:34 +0300908 with self.assertRaises(ValueError):
Inada Naoki58cffba2021-04-01 11:25:04 +0900909 self.open(os_helper.TESTFN, 'w', encoding="utf-8", buffering=0)
Serhiy Storchakaf10063e2014-06-09 13:32:34 +0300910
911 def test_invalid_newline(self):
Hai Shi883bc632020-07-06 17:12:49 +0800912 with warnings_helper.check_no_resource_warning(self):
Serhiy Storchakaf10063e2014-06-09 13:32:34 +0300913 with self.assertRaises(ValueError):
Inada Naoki58cffba2021-04-01 11:25:04 +0900914 self.open(os_helper.TESTFN, 'w', encoding="utf-8", newline='invalid')
Serhiy Storchakaf10063e2014-06-09 13:32:34 +0300915
Martin Panter6bb91f32016-05-28 00:41:57 +0000916 def test_buffered_readinto_mixin(self):
917 # Test the implementation provided by BufferedIOBase
918 class Stream(self.BufferedIOBase):
919 def read(self, size):
920 return b"12345"
921 read1 = read
922 stream = Stream()
923 for method in ("readinto", "readinto1"):
924 with self.subTest(method):
925 buffer = byteslike(5)
926 self.assertEqual(getattr(stream, method)(buffer), 5)
927 self.assertEqual(bytes(buffer), b"12345")
928
Ethan Furmand62548a2016-06-04 14:38:43 -0700929 def test_fspath_support(self):
Ethan Furmand62548a2016-06-04 14:38:43 -0700930 def check_path_succeeds(path):
Inada Naoki58cffba2021-04-01 11:25:04 +0900931 with self.open(path, "w", encoding="utf-8") as f:
Ethan Furmand62548a2016-06-04 14:38:43 -0700932 f.write("egg\n")
933
Inada Naoki58cffba2021-04-01 11:25:04 +0900934 with self.open(path, "r", encoding="utf-8") as f:
Ethan Furmand62548a2016-06-04 14:38:43 -0700935 self.assertEqual(f.read(), "egg\n")
936
Hai Shi883bc632020-07-06 17:12:49 +0800937 check_path_succeeds(FakePath(os_helper.TESTFN))
Serhiy Storchaka67987ac2020-07-27 20:58:35 +0300938 check_path_succeeds(FakePath(os.fsencode(os_helper.TESTFN)))
Ethan Furmand62548a2016-06-04 14:38:43 -0700939
Inada Naoki58cffba2021-04-01 11:25:04 +0900940 with self.open(os_helper.TESTFN, "w", encoding="utf-8") as f:
Serhiy Storchakab21d1552018-03-02 11:53:51 +0200941 bad_path = FakePath(f.fileno())
942 with self.assertRaises(TypeError):
Inada Naoki58cffba2021-04-01 11:25:04 +0900943 self.open(bad_path, 'w', encoding="utf-8")
Serhiy Storchakab21d1552018-03-02 11:53:51 +0200944
945 bad_path = FakePath(None)
Brett Cannonc78ca1e2016-06-24 12:03:43 -0700946 with self.assertRaises(TypeError):
Inada Naoki58cffba2021-04-01 11:25:04 +0900947 self.open(bad_path, 'w', encoding="utf-8")
Ethan Furmand62548a2016-06-04 14:38:43 -0700948
Serhiy Storchakab21d1552018-03-02 11:53:51 +0200949 bad_path = FakePath(FloatingPointError)
950 with self.assertRaises(FloatingPointError):
Inada Naoki58cffba2021-04-01 11:25:04 +0900951 self.open(bad_path, 'w', encoding="utf-8")
Serhiy Storchakab21d1552018-03-02 11:53:51 +0200952
Ethan Furmand62548a2016-06-04 14:38:43 -0700953 # ensure that refcounting is correct with some error conditions
954 with self.assertRaisesRegex(ValueError, 'read/write/append mode'):
Inada Naoki58cffba2021-04-01 11:25:04 +0900955 self.open(FakePath(os_helper.TESTFN), 'rwxa', encoding="utf-8")
Ethan Furmand62548a2016-06-04 14:38:43 -0700956
Sanyam Khurana1b74f9b2017-12-11 19:12:09 +0530957 def test_RawIOBase_readall(self):
958 # Exercise the default unlimited RawIOBase.read() and readall()
959 # implementations.
960 rawio = self.MockRawIOWithoutRead((b"abc", b"d", b"efg"))
961 self.assertEqual(rawio.read(), b"abcdefg")
962 rawio = self.MockRawIOWithoutRead((b"abc", b"d", b"efg"))
963 self.assertEqual(rawio.readall(), b"abcdefg")
964
965 def test_BufferedIOBase_readinto(self):
966 # Exercise the default BufferedIOBase.readinto() and readinto1()
967 # implementations (which call read() or read1() internally).
968 class Reader(self.BufferedIOBase):
969 def __init__(self, avail):
970 self.avail = avail
971 def read(self, size):
972 result = self.avail[:size]
973 self.avail = self.avail[size:]
974 return result
975 def read1(self, size):
976 """Returns no more than 5 bytes at once"""
977 return self.read(min(size, 5))
978 tests = (
979 # (test method, total data available, read buffer size, expected
980 # read size)
981 ("readinto", 10, 5, 5),
982 ("readinto", 10, 6, 6), # More than read1() can return
983 ("readinto", 5, 6, 5), # Buffer larger than total available
984 ("readinto", 6, 7, 6),
985 ("readinto", 10, 0, 0), # Empty buffer
986 ("readinto1", 10, 5, 5), # Result limited to single read1() call
987 ("readinto1", 10, 6, 5), # Buffer larger than read1() can return
988 ("readinto1", 5, 6, 5), # Buffer larger than total available
989 ("readinto1", 6, 7, 5),
990 ("readinto1", 10, 0, 0), # Empty buffer
991 )
992 UNUSED_BYTE = 0x81
993 for test in tests:
994 with self.subTest(test):
995 method, avail, request, result = test
996 reader = Reader(bytes(range(avail)))
997 buffer = bytearray((UNUSED_BYTE,) * request)
998 method = getattr(reader, method)
999 self.assertEqual(method(buffer), result)
1000 self.assertEqual(len(buffer), request)
1001 self.assertSequenceEqual(buffer[:result], range(result))
1002 unused = (UNUSED_BYTE,) * (request - result)
1003 self.assertSequenceEqual(buffer[result:], unused)
1004 self.assertEqual(len(reader.avail), avail - result)
1005
Zackery Spytz28f07362018-07-17 00:31:44 -06001006 def test_close_assert(self):
1007 class R(self.IOBase):
1008 def __setattr__(self, name, value):
1009 pass
1010 def flush(self):
1011 raise OSError()
1012 f = R()
1013 # This would cause an assertion failure.
1014 self.assertRaises(OSError, f.close)
1015
Victor Stinner472f7942019-04-12 21:58:24 +02001016 # Silence destructor error
1017 R.flush = lambda self: None
1018
Hynek Schlawack2cc71562012-05-25 10:05:53 +02001019
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001020class CIOTest(IOTest):
Antoine Pitrou84f1b172011-07-12 21:57:15 +02001021
1022 def test_IOBase_finalize(self):
1023 # Issue #12149: segmentation fault on _PyIOBase_finalize when both a
1024 # class which inherits IOBase and an object of this class are caught
1025 # in a reference cycle and close() is already in the method cache.
1026 class MyIO(self.IOBase):
1027 def close(self):
1028 pass
1029
1030 # create an instance to populate the method cache
1031 MyIO()
1032 obj = MyIO()
1033 obj.obj = obj
1034 wr = weakref.ref(obj)
1035 del MyIO
1036 del obj
1037 support.gc_collect()
Serhiy Storchaka0dcd80a2015-08-02 15:17:49 +03001038 self.assertIsNone(wr(), wr)
Guido van Rossuma9e20242007-03-08 00:43:48 +00001039
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001040class PyIOTest(IOTest):
1041 pass
Guido van Rossum68bbcd22007-02-27 17:19:33 +00001042
Guido van Rossuma9e20242007-03-08 00:43:48 +00001043
Gregory P. Smith1bef9072015-04-14 13:24:34 -07001044@support.cpython_only
1045class APIMismatchTest(unittest.TestCase):
Gregory P. Smith054b0652015-04-14 12:58:05 -07001046
Gregory P. Smith054b0652015-04-14 12:58:05 -07001047 def test_RawIOBase_io_in_pyio_match(self):
1048 """Test that pyio RawIOBase class has all c RawIOBase methods"""
Antoine Pitrou45d61562015-05-20 21:50:59 +02001049 mismatch = support.detect_api_mismatch(pyio.RawIOBase, io.RawIOBase,
1050 ignore=('__weakref__',))
Gregory P. Smith054b0652015-04-14 12:58:05 -07001051 self.assertEqual(mismatch, set(), msg='Python RawIOBase does not have all C RawIOBase methods')
1052
1053 def test_RawIOBase_pyio_in_io_match(self):
1054 """Test that c RawIOBase class has all pyio RawIOBase methods"""
1055 mismatch = support.detect_api_mismatch(io.RawIOBase, pyio.RawIOBase)
1056 self.assertEqual(mismatch, set(), msg='C RawIOBase does not have all Python RawIOBase methods')
1057
1058
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001059class CommonBufferedTests:
1060 # Tests common to BufferedReader, BufferedWriter and BufferedRandom
1061
Benjamin Petersond2e0c792009-05-01 20:40:59 +00001062 def test_detach(self):
1063 raw = self.MockRawIO()
1064 buf = self.tp(raw)
1065 self.assertIs(buf.detach(), raw)
1066 self.assertRaises(ValueError, buf.detach)
1067
Benjamin Peterson10e76b62014-12-21 20:51:50 -06001068 repr(buf) # Should still work
1069
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001070 def test_fileno(self):
1071 rawio = self.MockRawIO()
1072 bufio = self.tp(rawio)
1073
Ezio Melottib3aedd42010-11-20 19:04:17 +00001074 self.assertEqual(42, bufio.fileno())
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001075
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001076 def test_invalid_args(self):
1077 rawio = self.MockRawIO()
1078 bufio = self.tp(rawio)
1079 # Invalid whence
1080 self.assertRaises(ValueError, bufio.seek, 0, -1)
Jesus Cea94363612012-06-22 18:32:07 +02001081 self.assertRaises(ValueError, bufio.seek, 0, 9)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001082
1083 def test_override_destructor(self):
1084 tp = self.tp
1085 record = []
1086 class MyBufferedIO(tp):
1087 def __del__(self):
1088 record.append(1)
1089 try:
1090 f = super().__del__
1091 except AttributeError:
1092 pass
1093 else:
1094 f()
1095 def close(self):
1096 record.append(2)
1097 super().close()
1098 def flush(self):
1099 record.append(3)
1100 super().flush()
1101 rawio = self.MockRawIO()
1102 bufio = MyBufferedIO(rawio)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001103 del bufio
Benjamin Peterson24fb1d02009-04-24 23:26:21 +00001104 support.gc_collect()
Martin Panter754aab22016-03-31 07:21:56 +00001105 self.assertEqual(record, [1, 2, 3])
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001106
1107 def test_context_manager(self):
1108 # Test usability as a context manager
1109 rawio = self.MockRawIO()
1110 bufio = self.tp(rawio)
1111 def _with():
1112 with bufio:
1113 pass
1114 _with()
1115 # bufio should now be closed, and using it a second time should raise
1116 # a ValueError.
1117 self.assertRaises(ValueError, _with)
1118
1119 def test_error_through_destructor(self):
1120 # Test that the exception state is not modified by a destructor,
1121 # even if close() fails.
1122 rawio = self.CloseFailureIO()
Victor Stinner6dbbe742019-05-25 00:09:38 +02001123 with support.catch_unraisable_exception() as cm:
1124 with self.assertRaises(AttributeError):
1125 self.tp(rawio).xyzzy
Victor Stinnerbc2aa812019-05-23 03:45:09 +02001126
1127 if not IOBASE_EMITS_UNRAISABLE:
1128 self.assertIsNone(cm.unraisable)
1129 elif cm.unraisable is not None:
1130 self.assertEqual(cm.unraisable.exc_type, OSError)
Guido van Rossum78892e42007-04-06 17:31:18 +00001131
Antoine Pitrou716c4442009-05-23 19:04:03 +00001132 def test_repr(self):
1133 raw = self.MockRawIO()
1134 b = self.tp(raw)
Max Bernsteinccb7ca72019-05-21 10:09:21 -07001135 clsname = r"(%s\.)?%s" % (self.tp.__module__, self.tp.__qualname__)
1136 self.assertRegex(repr(b), "<%s>" % clsname)
Antoine Pitrou716c4442009-05-23 19:04:03 +00001137 raw.name = "dummy"
Max Bernsteinccb7ca72019-05-21 10:09:21 -07001138 self.assertRegex(repr(b), "<%s name='dummy'>" % clsname)
Antoine Pitrou716c4442009-05-23 19:04:03 +00001139 raw.name = b"dummy"
Max Bernsteinccb7ca72019-05-21 10:09:21 -07001140 self.assertRegex(repr(b), "<%s name=b'dummy'>" % clsname)
Antoine Pitrou716c4442009-05-23 19:04:03 +00001141
Serhiy Storchakaa5af6e12017-03-19 19:25:29 +02001142 def test_recursive_repr(self):
1143 # Issue #25455
1144 raw = self.MockRawIO()
1145 b = self.tp(raw)
1146 with support.swap_attr(raw, 'name', b):
1147 try:
1148 repr(b) # Should not crash
1149 except RuntimeError:
1150 pass
1151
Antoine Pitrou6be88762010-05-03 16:48:20 +00001152 def test_flush_error_on_close(self):
Serhiy Storchakaa3712a92015-02-21 00:35:09 +02001153 # Test that buffered file is closed despite failed flush
1154 # and that flush() is called before file closed.
Antoine Pitrou6be88762010-05-03 16:48:20 +00001155 raw = self.MockRawIO()
Serhiy Storchakaa3712a92015-02-21 00:35:09 +02001156 closed = []
Antoine Pitrou6be88762010-05-03 16:48:20 +00001157 def bad_flush():
Serhiy Storchakaa3712a92015-02-21 00:35:09 +02001158 closed[:] = [b.closed, raw.closed]
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02001159 raise OSError()
Antoine Pitrou6be88762010-05-03 16:48:20 +00001160 raw.flush = bad_flush
1161 b = self.tp(raw)
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02001162 self.assertRaises(OSError, b.close) # exception not swallowed
Benjamin Peterson68623612012-12-20 11:53:11 -06001163 self.assertTrue(b.closed)
Serhiy Storchakaa3712a92015-02-21 00:35:09 +02001164 self.assertTrue(raw.closed)
1165 self.assertTrue(closed) # flush() called
1166 self.assertFalse(closed[0]) # flush() called before file closed
1167 self.assertFalse(closed[1])
Serhiy Storchakac26a1a42015-02-23 00:28:38 +02001168 raw.flush = lambda: None # break reference loop
Benjamin Peterson68623612012-12-20 11:53:11 -06001169
1170 def test_close_error_on_close(self):
1171 raw = self.MockRawIO()
1172 def bad_flush():
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02001173 raise OSError('flush')
Benjamin Peterson68623612012-12-20 11:53:11 -06001174 def bad_close():
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02001175 raise OSError('close')
Benjamin Peterson68623612012-12-20 11:53:11 -06001176 raw.close = bad_close
1177 b = self.tp(raw)
1178 b.flush = bad_flush
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02001179 with self.assertRaises(OSError) as err: # exception not swallowed
Benjamin Peterson68623612012-12-20 11:53:11 -06001180 b.close()
1181 self.assertEqual(err.exception.args, ('close',))
Serhiy Storchaka8a8f7f92014-06-09 09:13:04 +03001182 self.assertIsInstance(err.exception.__context__, OSError)
Benjamin Peterson68623612012-12-20 11:53:11 -06001183 self.assertEqual(err.exception.__context__.args, ('flush',))
1184 self.assertFalse(b.closed)
Antoine Pitrou6be88762010-05-03 16:48:20 +00001185
Victor Stinner472f7942019-04-12 21:58:24 +02001186 # Silence destructor error
1187 raw.close = lambda: None
1188 b.flush = lambda: None
1189
Serhiy Storchaka8a8f7f92014-06-09 09:13:04 +03001190 def test_nonnormalized_close_error_on_close(self):
1191 # Issue #21677
1192 raw = self.MockRawIO()
1193 def bad_flush():
1194 raise non_existing_flush
1195 def bad_close():
1196 raise non_existing_close
1197 raw.close = bad_close
1198 b = self.tp(raw)
1199 b.flush = bad_flush
1200 with self.assertRaises(NameError) as err: # exception not swallowed
1201 b.close()
1202 self.assertIn('non_existing_close', str(err.exception))
1203 self.assertIsInstance(err.exception.__context__, NameError)
1204 self.assertIn('non_existing_flush', str(err.exception.__context__))
1205 self.assertFalse(b.closed)
1206
Victor Stinner472f7942019-04-12 21:58:24 +02001207 # Silence destructor error
1208 b.flush = lambda: None
1209 raw.close = lambda: None
1210
Antoine Pitrou6be88762010-05-03 16:48:20 +00001211 def test_multi_close(self):
1212 raw = self.MockRawIO()
1213 b = self.tp(raw)
1214 b.close()
1215 b.close()
1216 b.close()
1217 self.assertRaises(ValueError, b.flush)
1218
Antoine Pitrou0d739d72010-09-05 23:01:12 +00001219 def test_unseekable(self):
1220 bufio = self.tp(self.MockUnseekableIO(b"A" * 10))
1221 self.assertRaises(self.UnsupportedOperation, bufio.tell)
1222 self.assertRaises(self.UnsupportedOperation, bufio.seek, 0)
1223
Antoine Pitrou7f8f4182010-12-21 21:20:59 +00001224 def test_readonly_attributes(self):
1225 raw = self.MockRawIO()
1226 buf = self.tp(raw)
1227 x = self.MockRawIO()
1228 with self.assertRaises(AttributeError):
1229 buf.raw = x
1230
Guido van Rossum78892e42007-04-06 17:31:18 +00001231
Antoine Pitrou10f0c502012-07-29 19:02:46 +02001232class SizeofTest:
1233
1234 @support.cpython_only
1235 def test_sizeof(self):
1236 bufsize1 = 4096
1237 bufsize2 = 8192
1238 rawio = self.MockRawIO()
1239 bufio = self.tp(rawio, buffer_size=bufsize1)
1240 size = sys.getsizeof(bufio) - bufsize1
1241 rawio = self.MockRawIO()
1242 bufio = self.tp(rawio, buffer_size=bufsize2)
1243 self.assertEqual(sys.getsizeof(bufio), size + bufsize2)
1244
Jesus Ceadc469452012-10-04 12:37:56 +02001245 @support.cpython_only
1246 def test_buffer_freeing(self) :
1247 bufsize = 4096
1248 rawio = self.MockRawIO()
1249 bufio = self.tp(rawio, buffer_size=bufsize)
1250 size = sys.getsizeof(bufio) - bufsize
1251 bufio.close()
1252 self.assertEqual(sys.getsizeof(bufio), size)
Antoine Pitrou10f0c502012-07-29 19:02:46 +02001253
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001254class BufferedReaderTest(unittest.TestCase, CommonBufferedTests):
1255 read_mode = "rb"
Guido van Rossum78892e42007-04-06 17:31:18 +00001256
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001257 def test_constructor(self):
1258 rawio = self.MockRawIO([b"abc"])
1259 bufio = self.tp(rawio)
1260 bufio.__init__(rawio)
1261 bufio.__init__(rawio, buffer_size=1024)
1262 bufio.__init__(rawio, buffer_size=16)
Ezio Melottib3aedd42010-11-20 19:04:17 +00001263 self.assertEqual(b"abc", bufio.read())
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001264 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0)
1265 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16)
1266 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1)
1267 rawio = self.MockRawIO([b"abc"])
1268 bufio.__init__(rawio)
Ezio Melottib3aedd42010-11-20 19:04:17 +00001269 self.assertEqual(b"abc", bufio.read())
Guido van Rossum78892e42007-04-06 17:31:18 +00001270
Serhiy Storchaka61e24932014-02-12 10:52:35 +02001271 def test_uninitialized(self):
1272 bufio = self.tp.__new__(self.tp)
1273 del bufio
1274 bufio = self.tp.__new__(self.tp)
1275 self.assertRaisesRegex((ValueError, AttributeError),
1276 'uninitialized|has no attribute',
1277 bufio.read, 0)
1278 bufio.__init__(self.MockRawIO())
1279 self.assertEqual(bufio.read(0), b'')
1280
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001281 def test_read(self):
Benjamin Petersonbf5ff762009-12-13 19:25:34 +00001282 for arg in (None, 7):
1283 rawio = self.MockRawIO((b"abc", b"d", b"efg"))
1284 bufio = self.tp(rawio)
Ezio Melottib3aedd42010-11-20 19:04:17 +00001285 self.assertEqual(b"abcdefg", bufio.read(arg))
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001286 # Invalid args
1287 self.assertRaises(ValueError, bufio.read, -2)
Guido van Rossum68bbcd22007-02-27 17:19:33 +00001288
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001289 def test_read1(self):
1290 rawio = self.MockRawIO((b"abc", b"d", b"efg"))
1291 bufio = self.tp(rawio)
Ezio Melottib3aedd42010-11-20 19:04:17 +00001292 self.assertEqual(b"a", bufio.read(1))
1293 self.assertEqual(b"b", bufio.read1(1))
1294 self.assertEqual(rawio._reads, 1)
Martin Panterccb2c0e2016-10-20 23:48:14 +00001295 self.assertEqual(b"", bufio.read1(0))
Ezio Melottib3aedd42010-11-20 19:04:17 +00001296 self.assertEqual(b"c", bufio.read1(100))
1297 self.assertEqual(rawio._reads, 1)
1298 self.assertEqual(b"d", bufio.read1(100))
1299 self.assertEqual(rawio._reads, 2)
1300 self.assertEqual(b"efg", bufio.read1(100))
1301 self.assertEqual(rawio._reads, 3)
1302 self.assertEqual(b"", bufio.read1(100))
1303 self.assertEqual(rawio._reads, 4)
Martin Panterccb2c0e2016-10-20 23:48:14 +00001304
1305 def test_read1_arbitrary(self):
1306 rawio = self.MockRawIO((b"abc", b"d", b"efg"))
1307 bufio = self.tp(rawio)
1308 self.assertEqual(b"a", bufio.read(1))
1309 self.assertEqual(b"bc", bufio.read1())
1310 self.assertEqual(b"d", bufio.read1())
1311 self.assertEqual(b"efg", bufio.read1(-1))
1312 self.assertEqual(rawio._reads, 3)
1313 self.assertEqual(b"", bufio.read1())
1314 self.assertEqual(rawio._reads, 4)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001315
1316 def test_readinto(self):
1317 rawio = self.MockRawIO((b"abc", b"d", b"efg"))
1318 bufio = self.tp(rawio)
1319 b = bytearray(2)
Ezio Melottib3aedd42010-11-20 19:04:17 +00001320 self.assertEqual(bufio.readinto(b), 2)
1321 self.assertEqual(b, b"ab")
1322 self.assertEqual(bufio.readinto(b), 2)
1323 self.assertEqual(b, b"cd")
1324 self.assertEqual(bufio.readinto(b), 2)
1325 self.assertEqual(b, b"ef")
1326 self.assertEqual(bufio.readinto(b), 1)
1327 self.assertEqual(b, b"gf")
1328 self.assertEqual(bufio.readinto(b), 0)
1329 self.assertEqual(b, b"gf")
Antoine Pitrou3486a982011-05-12 01:57:53 +02001330 rawio = self.MockRawIO((b"abc", None))
1331 bufio = self.tp(rawio)
1332 self.assertEqual(bufio.readinto(b), 2)
1333 self.assertEqual(b, b"ab")
1334 self.assertEqual(bufio.readinto(b), 1)
1335 self.assertEqual(b, b"cb")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001336
Benjamin Petersona96fea02014-06-22 14:17:44 -07001337 def test_readinto1(self):
1338 buffer_size = 10
1339 rawio = self.MockRawIO((b"abc", b"de", b"fgh", b"jkl"))
1340 bufio = self.tp(rawio, buffer_size=buffer_size)
1341 b = bytearray(2)
1342 self.assertEqual(bufio.peek(3), b'abc')
1343 self.assertEqual(rawio._reads, 1)
1344 self.assertEqual(bufio.readinto1(b), 2)
1345 self.assertEqual(b, b"ab")
1346 self.assertEqual(rawio._reads, 1)
1347 self.assertEqual(bufio.readinto1(b), 1)
1348 self.assertEqual(b[:1], b"c")
1349 self.assertEqual(rawio._reads, 1)
1350 self.assertEqual(bufio.readinto1(b), 2)
1351 self.assertEqual(b, b"de")
1352 self.assertEqual(rawio._reads, 2)
1353 b = bytearray(2*buffer_size)
1354 self.assertEqual(bufio.peek(3), b'fgh')
1355 self.assertEqual(rawio._reads, 3)
1356 self.assertEqual(bufio.readinto1(b), 6)
1357 self.assertEqual(b[:6], b"fghjkl")
1358 self.assertEqual(rawio._reads, 4)
1359
1360 def test_readinto_array(self):
1361 buffer_size = 60
1362 data = b"a" * 26
1363 rawio = self.MockRawIO((data,))
1364 bufio = self.tp(rawio, buffer_size=buffer_size)
1365
1366 # Create an array with element size > 1 byte
1367 b = array.array('i', b'x' * 32)
1368 assert len(b) != 16
1369
1370 # Read into it. We should get as many *bytes* as we can fit into b
1371 # (which is more than the number of elements)
1372 n = bufio.readinto(b)
1373 self.assertGreater(n, len(b))
1374
1375 # Check that old contents of b are preserved
1376 bm = memoryview(b).cast('B')
1377 self.assertLess(n, len(bm))
1378 self.assertEqual(bm[:n], data[:n])
1379 self.assertEqual(bm[n:], b'x' * (len(bm[n:])))
1380
1381 def test_readinto1_array(self):
1382 buffer_size = 60
1383 data = b"a" * 26
1384 rawio = self.MockRawIO((data,))
1385 bufio = self.tp(rawio, buffer_size=buffer_size)
1386
1387 # Create an array with element size > 1 byte
1388 b = array.array('i', b'x' * 32)
1389 assert len(b) != 16
1390
1391 # Read into it. We should get as many *bytes* as we can fit into b
1392 # (which is more than the number of elements)
1393 n = bufio.readinto1(b)
1394 self.assertGreater(n, len(b))
1395
1396 # Check that old contents of b are preserved
1397 bm = memoryview(b).cast('B')
1398 self.assertLess(n, len(bm))
1399 self.assertEqual(bm[:n], data[:n])
1400 self.assertEqual(bm[n:], b'x' * (len(bm[n:])))
1401
Benjamin Petersonbf5ff762009-12-13 19:25:34 +00001402 def test_readlines(self):
1403 def bufio():
1404 rawio = self.MockRawIO((b"abc\n", b"d\n", b"ef"))
1405 return self.tp(rawio)
Ezio Melottib3aedd42010-11-20 19:04:17 +00001406 self.assertEqual(bufio().readlines(), [b"abc\n", b"d\n", b"ef"])
1407 self.assertEqual(bufio().readlines(5), [b"abc\n", b"d\n"])
1408 self.assertEqual(bufio().readlines(None), [b"abc\n", b"d\n", b"ef"])
Benjamin Petersonbf5ff762009-12-13 19:25:34 +00001409
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001410 def test_buffering(self):
Guido van Rossum78892e42007-04-06 17:31:18 +00001411 data = b"abcdefghi"
1412 dlen = len(data)
1413
1414 tests = [
1415 [ 100, [ 3, 1, 4, 8 ], [ dlen, 0 ] ],
1416 [ 100, [ 3, 3, 3], [ dlen ] ],
1417 [ 4, [ 1, 2, 4, 2 ], [ 4, 4, 1 ] ],
1418 ]
1419
1420 for bufsize, buf_read_sizes, raw_read_sizes in tests:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001421 rawio = self.MockFileIO(data)
1422 bufio = self.tp(rawio, buffer_size=bufsize)
Guido van Rossum78892e42007-04-06 17:31:18 +00001423 pos = 0
1424 for nbytes in buf_read_sizes:
Ezio Melottib3aedd42010-11-20 19:04:17 +00001425 self.assertEqual(bufio.read(nbytes), data[pos:pos+nbytes])
Guido van Rossum78892e42007-04-06 17:31:18 +00001426 pos += nbytes
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001427 # this is mildly implementation-dependent
Ezio Melottib3aedd42010-11-20 19:04:17 +00001428 self.assertEqual(rawio.read_history, raw_read_sizes)
Guido van Rossum78892e42007-04-06 17:31:18 +00001429
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001430 def test_read_non_blocking(self):
Guido van Rossum01a27522007-03-07 01:00:12 +00001431 # Inject some None's in there to simulate EWOULDBLOCK
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001432 rawio = self.MockRawIO((b"abc", b"d", None, b"efg", None, None, None))
1433 bufio = self.tp(rawio)
Ezio Melottib3aedd42010-11-20 19:04:17 +00001434 self.assertEqual(b"abcd", bufio.read(6))
1435 self.assertEqual(b"e", bufio.read(1))
1436 self.assertEqual(b"fg", bufio.read())
1437 self.assertEqual(b"", bufio.peek(1))
Victor Stinnera80987f2011-05-25 22:47:16 +02001438 self.assertIsNone(bufio.read())
Ezio Melottib3aedd42010-11-20 19:04:17 +00001439 self.assertEqual(b"", bufio.read())
Guido van Rossum01a27522007-03-07 01:00:12 +00001440
Victor Stinnera80987f2011-05-25 22:47:16 +02001441 rawio = self.MockRawIO((b"a", None, None))
1442 self.assertEqual(b"a", rawio.readall())
1443 self.assertIsNone(rawio.readall())
1444
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001445 def test_read_past_eof(self):
1446 rawio = self.MockRawIO((b"abc", b"d", b"efg"))
1447 bufio = self.tp(rawio)
Guido van Rossum68bbcd22007-02-27 17:19:33 +00001448
Ezio Melottib3aedd42010-11-20 19:04:17 +00001449 self.assertEqual(b"abcdefg", bufio.read(9000))
Guido van Rossum68bbcd22007-02-27 17:19:33 +00001450
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001451 def test_read_all(self):
1452 rawio = self.MockRawIO((b"abc", b"d", b"efg"))
1453 bufio = self.tp(rawio)
Guido van Rossum68bbcd22007-02-27 17:19:33 +00001454
Ezio Melottib3aedd42010-11-20 19:04:17 +00001455 self.assertEqual(b"abcdefg", bufio.read())
Guido van Rossum68bbcd22007-02-27 17:19:33 +00001456
Antoine Pitrou5bc4fa72010-10-14 15:34:31 +00001457 @support.requires_resource('cpu')
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001458 def test_threads(self):
Antoine Pitrou87695762008-08-14 22:44:29 +00001459 try:
1460 # Write out many bytes with exactly the same number of 0's,
1461 # 1's... 255's. This will help us check that concurrent reading
1462 # doesn't duplicate or forget contents.
1463 N = 1000
1464 l = list(range(256)) * N
1465 random.shuffle(l)
1466 s = bytes(bytearray(l))
Hai Shi883bc632020-07-06 17:12:49 +08001467 with self.open(os_helper.TESTFN, "wb") as f:
Antoine Pitrou87695762008-08-14 22:44:29 +00001468 f.write(s)
Hai Shi883bc632020-07-06 17:12:49 +08001469 with self.open(os_helper.TESTFN, self.read_mode, buffering=0) as raw:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001470 bufio = self.tp(raw, 8)
Antoine Pitrou87695762008-08-14 22:44:29 +00001471 errors = []
1472 results = []
1473 def f():
1474 try:
1475 # Intra-buffer read then buffer-flushing read
1476 for n in cycle([1, 19]):
1477 s = bufio.read(n)
1478 if not s:
1479 break
1480 # list.append() is atomic
1481 results.append(s)
1482 except Exception as e:
1483 errors.append(e)
1484 raise
1485 threads = [threading.Thread(target=f) for x in range(20)]
Hai Shie80697d2020-05-28 06:10:27 +08001486 with threading_helper.start_threads(threads):
Serhiy Storchaka263dcd22015-04-01 13:01:14 +03001487 time.sleep(0.02) # yield
Antoine Pitrou87695762008-08-14 22:44:29 +00001488 self.assertFalse(errors,
1489 "the following exceptions were caught: %r" % errors)
1490 s = b''.join(results)
1491 for i in range(256):
1492 c = bytes(bytearray([i]))
1493 self.assertEqual(s.count(c), N)
1494 finally:
Hai Shi883bc632020-07-06 17:12:49 +08001495 os_helper.unlink(os_helper.TESTFN)
Antoine Pitrou87695762008-08-14 22:44:29 +00001496
Antoine Pitrou1e44fec2011-10-04 12:26:20 +02001497 def test_unseekable(self):
1498 bufio = self.tp(self.MockUnseekableIO(b"A" * 10))
1499 self.assertRaises(self.UnsupportedOperation, bufio.tell)
1500 self.assertRaises(self.UnsupportedOperation, bufio.seek, 0)
1501 bufio.read(1)
1502 self.assertRaises(self.UnsupportedOperation, bufio.seek, 0)
1503 self.assertRaises(self.UnsupportedOperation, bufio.tell)
1504
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001505 def test_misbehaved_io(self):
1506 rawio = self.MisbehavedRawIO((b"abc", b"d", b"efg"))
1507 bufio = self.tp(rawio)
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02001508 self.assertRaises(OSError, bufio.seek, 0)
1509 self.assertRaises(OSError, bufio.tell)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001510
Victor Stinnerb589cef2019-06-11 03:10:59 +02001511 # Silence destructor error
1512 bufio.close = lambda: None
1513
Antoine Pitrou32cfede2010-08-11 13:31:33 +00001514 def test_no_extraneous_read(self):
1515 # Issue #9550; when the raw IO object has satisfied the read request,
1516 # we should not issue any additional reads, otherwise it may block
1517 # (e.g. socket).
1518 bufsize = 16
1519 for n in (2, bufsize - 1, bufsize, bufsize + 1, bufsize * 2):
1520 rawio = self.MockRawIO([b"x" * n])
1521 bufio = self.tp(rawio, bufsize)
1522 self.assertEqual(bufio.read(n), b"x" * n)
1523 # Simple case: one raw read is enough to satisfy the request.
1524 self.assertEqual(rawio._extraneous_reads, 0,
1525 "failed for {}: {} != 0".format(n, rawio._extraneous_reads))
1526 # A more complex case where two raw reads are needed to satisfy
1527 # the request.
1528 rawio = self.MockRawIO([b"x" * (n - 1), b"x"])
1529 bufio = self.tp(rawio, bufsize)
1530 self.assertEqual(bufio.read(n), b"x" * n)
1531 self.assertEqual(rawio._extraneous_reads, 0,
1532 "failed for {}: {} != 0".format(n, rawio._extraneous_reads))
1533
Berker Peksagd10d6ae2015-05-12 17:01:05 +03001534 def test_read_on_closed(self):
1535 # Issue #23796
1536 b = io.BufferedReader(io.BytesIO(b"12"))
1537 b.read(1)
1538 b.close()
1539 self.assertRaises(ValueError, b.peek)
1540 self.assertRaises(ValueError, b.read1, 1)
1541
Berker Peksagfd5116c2020-02-21 20:57:26 +03001542 def test_truncate_on_read_only(self):
1543 rawio = self.MockFileIO(b"abc")
1544 bufio = self.tp(rawio)
1545 self.assertFalse(bufio.writable())
1546 self.assertRaises(self.UnsupportedOperation, bufio.truncate)
1547 self.assertRaises(self.UnsupportedOperation, bufio.truncate, 0)
1548
Antoine Pitrou32cfede2010-08-11 13:31:33 +00001549
Antoine Pitrou10f0c502012-07-29 19:02:46 +02001550class CBufferedReaderTest(BufferedReaderTest, SizeofTest):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001551 tp = io.BufferedReader
1552
Miss Islington (bot)be200c32021-09-14 11:58:19 -07001553 @unittest.skipIf(MEMORY_SANITIZER or ADDRESS_SANITIZER, "sanitizer defaults to crashing "
Gregory P. Smithe5796c42018-12-30 20:17:57 -08001554 "instead of returning NULL for malloc failure.")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001555 def test_constructor(self):
1556 BufferedReaderTest.test_constructor(self)
1557 # The allocation can succeed on 32-bit builds, e.g. with more
Victor Stinner8c663fd2017-11-08 14:44:44 -08001558 # than 2 GiB RAM and a 64-bit kernel.
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001559 if sys.maxsize > 0x7FFFFFFF:
1560 rawio = self.MockRawIO()
1561 bufio = self.tp(rawio)
1562 self.assertRaises((OverflowError, MemoryError, ValueError),
1563 bufio.__init__, rawio, sys.maxsize)
1564
1565 def test_initialization(self):
1566 rawio = self.MockRawIO([b"abc"])
1567 bufio = self.tp(rawio)
1568 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0)
1569 self.assertRaises(ValueError, bufio.read)
1570 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16)
1571 self.assertRaises(ValueError, bufio.read)
1572 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1)
1573 self.assertRaises(ValueError, bufio.read)
1574
1575 def test_misbehaved_io_read(self):
1576 rawio = self.MisbehavedRawIO((b"abc", b"d", b"efg"))
1577 bufio = self.tp(rawio)
1578 # _pyio.BufferedReader seems to implement reading different, so that
1579 # checking this is not so easy.
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02001580 self.assertRaises(OSError, bufio.read, 10)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001581
1582 def test_garbage_collection(self):
1583 # C BufferedReader objects are collected.
1584 # The Python version has __del__, so it ends into gc.garbage instead
Hai Shi883bc632020-07-06 17:12:49 +08001585 self.addCleanup(os_helper.unlink, os_helper.TESTFN)
1586 with warnings_helper.check_warnings(('', ResourceWarning)):
1587 rawio = self.FileIO(os_helper.TESTFN, "w+b")
Antoine Pitrou796564c2013-07-30 19:59:21 +02001588 f = self.tp(rawio)
1589 f.f = f
1590 wr = weakref.ref(f)
1591 del f
1592 support.gc_collect()
Serhiy Storchaka0dcd80a2015-08-02 15:17:49 +03001593 self.assertIsNone(wr(), wr)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001594
R David Murray67bfe802013-02-23 21:51:05 -05001595 def test_args_error(self):
1596 # Issue #17275
1597 with self.assertRaisesRegex(TypeError, "BufferedReader"):
1598 self.tp(io.BytesIO(), 1024, 1024, 1024)
1599
David Szotten86663562020-06-16 00:53:57 +01001600 def test_bad_readinto_value(self):
1601 rawio = io.BufferedReader(io.BytesIO(b"12"))
1602 rawio.readinto = lambda buf: -1
1603 bufio = self.tp(rawio)
1604 with self.assertRaises(OSError) as cm:
1605 bufio.readline()
1606 self.assertIsNone(cm.exception.__cause__)
1607
1608 def test_bad_readinto_type(self):
1609 rawio = io.BufferedReader(io.BytesIO(b"12"))
1610 rawio.readinto = lambda buf: b''
1611 bufio = self.tp(rawio)
1612 with self.assertRaises(OSError) as cm:
1613 bufio.readline()
1614 self.assertIsInstance(cm.exception.__cause__, TypeError)
1615
R David Murray67bfe802013-02-23 21:51:05 -05001616
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001617class PyBufferedReaderTest(BufferedReaderTest):
1618 tp = pyio.BufferedReader
Antoine Pitrou87695762008-08-14 22:44:29 +00001619
Guido van Rossuma9e20242007-03-08 00:43:48 +00001620
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001621class BufferedWriterTest(unittest.TestCase, CommonBufferedTests):
1622 write_mode = "wb"
Guido van Rossuma9e20242007-03-08 00:43:48 +00001623
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001624 def test_constructor(self):
1625 rawio = self.MockRawIO()
1626 bufio = self.tp(rawio)
1627 bufio.__init__(rawio)
1628 bufio.__init__(rawio, buffer_size=1024)
1629 bufio.__init__(rawio, buffer_size=16)
Ezio Melottib3aedd42010-11-20 19:04:17 +00001630 self.assertEqual(3, bufio.write(b"abc"))
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001631 bufio.flush()
1632 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0)
1633 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16)
1634 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1)
1635 bufio.__init__(rawio)
Ezio Melottib3aedd42010-11-20 19:04:17 +00001636 self.assertEqual(3, bufio.write(b"ghi"))
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001637 bufio.flush()
Ezio Melottib3aedd42010-11-20 19:04:17 +00001638 self.assertEqual(b"".join(rawio._write_stack), b"abcghi")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001639
Serhiy Storchaka61e24932014-02-12 10:52:35 +02001640 def test_uninitialized(self):
1641 bufio = self.tp.__new__(self.tp)
1642 del bufio
1643 bufio = self.tp.__new__(self.tp)
1644 self.assertRaisesRegex((ValueError, AttributeError),
1645 'uninitialized|has no attribute',
1646 bufio.write, b'')
1647 bufio.__init__(self.MockRawIO())
1648 self.assertEqual(bufio.write(b''), 0)
1649
Benjamin Petersond2e0c792009-05-01 20:40:59 +00001650 def test_detach_flush(self):
1651 raw = self.MockRawIO()
1652 buf = self.tp(raw)
1653 buf.write(b"howdy!")
1654 self.assertFalse(raw._write_stack)
1655 buf.detach()
1656 self.assertEqual(raw._write_stack, [b"howdy!"])
1657
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001658 def test_write(self):
Guido van Rossum68bbcd22007-02-27 17:19:33 +00001659 # Write to the buffered IO but don't overflow the buffer.
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001660 writer = self.MockRawIO()
1661 bufio = self.tp(writer, 8)
Guido van Rossum76c5d4d2007-04-06 19:10:29 +00001662 bufio.write(b"abc")
Guido van Rossum76c5d4d2007-04-06 19:10:29 +00001663 self.assertFalse(writer._write_stack)
Martin Panter6bb91f32016-05-28 00:41:57 +00001664 buffer = bytearray(b"def")
1665 bufio.write(buffer)
1666 buffer[:] = b"***" # Overwrite our copy of the data
1667 bufio.flush()
1668 self.assertEqual(b"".join(writer._write_stack), b"abcdef")
Guido van Rossum68bbcd22007-02-27 17:19:33 +00001669
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001670 def test_write_overflow(self):
1671 writer = self.MockRawIO()
1672 bufio = self.tp(writer, 8)
1673 contents = b"abcdefghijklmnop"
1674 for n in range(0, len(contents), 3):
1675 bufio.write(contents[n:n+3])
1676 flushed = b"".join(writer._write_stack)
1677 # At least (total - 8) bytes were implicitly flushed, perhaps more
1678 # depending on the implementation.
Benjamin Petersonc9c0f202009-06-30 23:06:06 +00001679 self.assertTrue(flushed.startswith(contents[:-8]), flushed)
Guido van Rossum68bbcd22007-02-27 17:19:33 +00001680
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001681 def check_writes(self, intermediate_func):
1682 # Lots of writes, test the flushed output is as expected.
1683 contents = bytes(range(256)) * 1000
1684 n = 0
1685 writer = self.MockRawIO()
1686 bufio = self.tp(writer, 13)
1687 # Generator of write sizes: repeat each N 15 times then proceed to N+1
1688 def gen_sizes():
1689 for size in count(1):
1690 for i in range(15):
1691 yield size
1692 sizes = gen_sizes()
1693 while n < len(contents):
1694 size = min(next(sizes), len(contents) - n)
Ezio Melottib3aedd42010-11-20 19:04:17 +00001695 self.assertEqual(bufio.write(contents[n:n+size]), size)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001696 intermediate_func(bufio)
1697 n += size
1698 bufio.flush()
Ezio Melottib3aedd42010-11-20 19:04:17 +00001699 self.assertEqual(contents, b"".join(writer._write_stack))
Guido van Rossum68bbcd22007-02-27 17:19:33 +00001700
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001701 def test_writes(self):
1702 self.check_writes(lambda bufio: None)
Guido van Rossum68bbcd22007-02-27 17:19:33 +00001703
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001704 def test_writes_and_flushes(self):
1705 self.check_writes(lambda bufio: bufio.flush())
Guido van Rossum01a27522007-03-07 01:00:12 +00001706
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001707 def test_writes_and_seeks(self):
1708 def _seekabs(bufio):
1709 pos = bufio.tell()
1710 bufio.seek(pos + 1, 0)
1711 bufio.seek(pos - 1, 0)
1712 bufio.seek(pos, 0)
1713 self.check_writes(_seekabs)
1714 def _seekrel(bufio):
1715 pos = bufio.seek(0, 1)
1716 bufio.seek(+1, 1)
1717 bufio.seek(-1, 1)
1718 bufio.seek(pos, 0)
1719 self.check_writes(_seekrel)
Guido van Rossum01a27522007-03-07 01:00:12 +00001720
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001721 def test_writes_and_truncates(self):
1722 self.check_writes(lambda bufio: bufio.truncate(bufio.tell()))
Guido van Rossum01a27522007-03-07 01:00:12 +00001723
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001724 def test_write_non_blocking(self):
1725 raw = self.MockNonBlockWriterIO()
Benjamin Peterson59406a92009-03-26 17:10:29 +00001726 bufio = self.tp(raw, 8)
Guido van Rossum01a27522007-03-07 01:00:12 +00001727
Ezio Melottib3aedd42010-11-20 19:04:17 +00001728 self.assertEqual(bufio.write(b"abcd"), 4)
1729 self.assertEqual(bufio.write(b"efghi"), 5)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001730 # 1 byte will be written, the rest will be buffered
1731 raw.block_on(b"k")
Ezio Melottib3aedd42010-11-20 19:04:17 +00001732 self.assertEqual(bufio.write(b"jklmn"), 5)
Guido van Rossum01a27522007-03-07 01:00:12 +00001733
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001734 # 8 bytes will be written, 8 will be buffered and the rest will be lost
1735 raw.block_on(b"0")
1736 try:
1737 bufio.write(b"opqrwxyz0123456789")
1738 except self.BlockingIOError as e:
1739 written = e.characters_written
1740 else:
1741 self.fail("BlockingIOError should have been raised")
Ezio Melottib3aedd42010-11-20 19:04:17 +00001742 self.assertEqual(written, 16)
1743 self.assertEqual(raw.pop_written(),
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001744 b"abcdefghijklmnopqrwxyz")
Guido van Rossum01a27522007-03-07 01:00:12 +00001745
Ezio Melottib3aedd42010-11-20 19:04:17 +00001746 self.assertEqual(bufio.write(b"ABCDEFGHI"), 9)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001747 s = raw.pop_written()
1748 # Previously buffered bytes were flushed
1749 self.assertTrue(s.startswith(b"01234567A"), s)
Guido van Rossum01a27522007-03-07 01:00:12 +00001750
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001751 def test_write_and_rewind(self):
1752 raw = io.BytesIO()
1753 bufio = self.tp(raw, 4)
1754 self.assertEqual(bufio.write(b"abcdef"), 6)
1755 self.assertEqual(bufio.tell(), 6)
1756 bufio.seek(0, 0)
1757 self.assertEqual(bufio.write(b"XY"), 2)
1758 bufio.seek(6, 0)
1759 self.assertEqual(raw.getvalue(), b"XYcdef")
1760 self.assertEqual(bufio.write(b"123456"), 6)
1761 bufio.flush()
1762 self.assertEqual(raw.getvalue(), b"XYcdef123456")
Guido van Rossum68bbcd22007-02-27 17:19:33 +00001763
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001764 def test_flush(self):
1765 writer = self.MockRawIO()
1766 bufio = self.tp(writer, 8)
Guido van Rossum76c5d4d2007-04-06 19:10:29 +00001767 bufio.write(b"abc")
1768 bufio.flush()
Ezio Melottib3aedd42010-11-20 19:04:17 +00001769 self.assertEqual(b"abc", writer._write_stack[0])
Guido van Rossum68bbcd22007-02-27 17:19:33 +00001770
Antoine Pitrou131a4892012-10-16 22:57:11 +02001771 def test_writelines(self):
1772 l = [b'ab', b'cd', b'ef']
1773 writer = self.MockRawIO()
1774 bufio = self.tp(writer, 8)
1775 bufio.writelines(l)
1776 bufio.flush()
1777 self.assertEqual(b''.join(writer._write_stack), b'abcdef')
1778
1779 def test_writelines_userlist(self):
1780 l = UserList([b'ab', b'cd', b'ef'])
1781 writer = self.MockRawIO()
1782 bufio = self.tp(writer, 8)
1783 bufio.writelines(l)
1784 bufio.flush()
1785 self.assertEqual(b''.join(writer._write_stack), b'abcdef')
1786
1787 def test_writelines_error(self):
1788 writer = self.MockRawIO()
1789 bufio = self.tp(writer, 8)
1790 self.assertRaises(TypeError, bufio.writelines, [1, 2, 3])
1791 self.assertRaises(TypeError, bufio.writelines, None)
1792 self.assertRaises(TypeError, bufio.writelines, 'abc')
1793
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001794 def test_destructor(self):
1795 writer = self.MockRawIO()
1796 bufio = self.tp(writer, 8)
1797 bufio.write(b"abc")
1798 del bufio
Benjamin Peterson24fb1d02009-04-24 23:26:21 +00001799 support.gc_collect()
Ezio Melottib3aedd42010-11-20 19:04:17 +00001800 self.assertEqual(b"abc", writer._write_stack[0])
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001801
1802 def test_truncate(self):
1803 # Truncate implicitly flushes the buffer.
Hai Shi883bc632020-07-06 17:12:49 +08001804 self.addCleanup(os_helper.unlink, os_helper.TESTFN)
1805 with self.open(os_helper.TESTFN, self.write_mode, buffering=0) as raw:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001806 bufio = self.tp(raw, 8)
1807 bufio.write(b"abcdef")
1808 self.assertEqual(bufio.truncate(3), 3)
Antoine Pitrou905a2ff2010-01-31 22:47:27 +00001809 self.assertEqual(bufio.tell(), 6)
Hai Shi883bc632020-07-06 17:12:49 +08001810 with self.open(os_helper.TESTFN, "rb", buffering=0) as f:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001811 self.assertEqual(f.read(), b"abc")
1812
Nitish Chandra059f58c2018-01-28 21:30:09 +05301813 def test_truncate_after_write(self):
1814 # Ensure that truncate preserves the file position after
1815 # writes longer than the buffer size.
1816 # Issue: https://bugs.python.org/issue32228
Hai Shi883bc632020-07-06 17:12:49 +08001817 self.addCleanup(os_helper.unlink, os_helper.TESTFN)
1818 with self.open(os_helper.TESTFN, "wb") as f:
Nitish Chandra059f58c2018-01-28 21:30:09 +05301819 # Fill with some buffer
1820 f.write(b'\x00' * 10000)
1821 buffer_sizes = [8192, 4096, 200]
1822 for buffer_size in buffer_sizes:
Hai Shi883bc632020-07-06 17:12:49 +08001823 with self.open(os_helper.TESTFN, "r+b", buffering=buffer_size) as f:
Nitish Chandra059f58c2018-01-28 21:30:09 +05301824 f.write(b'\x00' * (buffer_size + 1))
1825 # After write write_pos and write_end are set to 0
1826 f.read(1)
1827 # read operation makes sure that pos != raw_pos
1828 f.truncate()
1829 self.assertEqual(f.tell(), buffer_size + 2)
1830
Antoine Pitrou5bc4fa72010-10-14 15:34:31 +00001831 @support.requires_resource('cpu')
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001832 def test_threads(self):
Antoine Pitrou87695762008-08-14 22:44:29 +00001833 try:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001834 # Write out many bytes from many threads and test they were
1835 # all flushed.
1836 N = 1000
1837 contents = bytes(range(256)) * N
1838 sizes = cycle([1, 19])
1839 n = 0
1840 queue = deque()
1841 while n < len(contents):
1842 size = next(sizes)
1843 queue.append(contents[n:n+size])
1844 n += size
1845 del contents
Antoine Pitrou87695762008-08-14 22:44:29 +00001846 # We use a real file object because it allows us to
1847 # exercise situations where the GIL is released before
1848 # writing the buffer to the raw streams. This is in addition
1849 # to concurrency issues due to switching threads in the middle
1850 # of Python code.
Hai Shi883bc632020-07-06 17:12:49 +08001851 with self.open(os_helper.TESTFN, self.write_mode, buffering=0) as raw:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001852 bufio = self.tp(raw, 8)
Antoine Pitrou87695762008-08-14 22:44:29 +00001853 errors = []
1854 def f():
1855 try:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001856 while True:
1857 try:
1858 s = queue.popleft()
1859 except IndexError:
1860 return
Antoine Pitrou87695762008-08-14 22:44:29 +00001861 bufio.write(s)
1862 except Exception as e:
1863 errors.append(e)
1864 raise
1865 threads = [threading.Thread(target=f) for x in range(20)]
Hai Shie80697d2020-05-28 06:10:27 +08001866 with threading_helper.start_threads(threads):
Serhiy Storchaka263dcd22015-04-01 13:01:14 +03001867 time.sleep(0.02) # yield
Antoine Pitrou87695762008-08-14 22:44:29 +00001868 self.assertFalse(errors,
1869 "the following exceptions were caught: %r" % errors)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001870 bufio.close()
Hai Shi883bc632020-07-06 17:12:49 +08001871 with self.open(os_helper.TESTFN, "rb") as f:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001872 s = f.read()
1873 for i in range(256):
Ezio Melottib3aedd42010-11-20 19:04:17 +00001874 self.assertEqual(s.count(bytes([i])), N)
Antoine Pitrou87695762008-08-14 22:44:29 +00001875 finally:
Hai Shi883bc632020-07-06 17:12:49 +08001876 os_helper.unlink(os_helper.TESTFN)
Antoine Pitrou87695762008-08-14 22:44:29 +00001877
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001878 def test_misbehaved_io(self):
1879 rawio = self.MisbehavedRawIO()
1880 bufio = self.tp(rawio, 5)
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02001881 self.assertRaises(OSError, bufio.seek, 0)
1882 self.assertRaises(OSError, bufio.tell)
1883 self.assertRaises(OSError, bufio.write, b"abcdef")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001884
Victor Stinnerb589cef2019-06-11 03:10:59 +02001885 # Silence destructor error
1886 bufio.close = lambda: None
1887
Florent Xicluna109d5732012-07-07 17:03:22 +02001888 def test_max_buffer_size_removal(self):
1889 with self.assertRaises(TypeError):
Benjamin Peterson59406a92009-03-26 17:10:29 +00001890 self.tp(self.MockRawIO(), 8, 12)
Benjamin Peterson59406a92009-03-26 17:10:29 +00001891
Benjamin Peterson68623612012-12-20 11:53:11 -06001892 def test_write_error_on_close(self):
1893 raw = self.MockRawIO()
1894 def bad_write(b):
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02001895 raise OSError()
Benjamin Peterson68623612012-12-20 11:53:11 -06001896 raw.write = bad_write
1897 b = self.tp(raw)
1898 b.write(b'spam')
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02001899 self.assertRaises(OSError, b.close) # exception not swallowed
Benjamin Peterson68623612012-12-20 11:53:11 -06001900 self.assertTrue(b.closed)
1901
benfogle9703f092017-11-10 16:03:40 -05001902 def test_slow_close_from_thread(self):
1903 # Issue #31976
1904 rawio = self.SlowFlushRawIO()
1905 bufio = self.tp(rawio, 8)
1906 t = threading.Thread(target=bufio.close)
1907 t.start()
1908 rawio.in_flush.wait()
1909 self.assertRaises(ValueError, bufio.write, b'spam')
1910 self.assertTrue(bufio.closed)
1911 t.join()
1912
1913
Benjamin Peterson59406a92009-03-26 17:10:29 +00001914
Antoine Pitrou10f0c502012-07-29 19:02:46 +02001915class CBufferedWriterTest(BufferedWriterTest, SizeofTest):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001916 tp = io.BufferedWriter
1917
Miss Islington (bot)be200c32021-09-14 11:58:19 -07001918 @unittest.skipIf(MEMORY_SANITIZER or ADDRESS_SANITIZER, "sanitizer defaults to crashing "
Gregory P. Smithe5796c42018-12-30 20:17:57 -08001919 "instead of returning NULL for malloc failure.")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001920 def test_constructor(self):
1921 BufferedWriterTest.test_constructor(self)
1922 # The allocation can succeed on 32-bit builds, e.g. with more
Victor Stinner8c663fd2017-11-08 14:44:44 -08001923 # than 2 GiB RAM and a 64-bit kernel.
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001924 if sys.maxsize > 0x7FFFFFFF:
1925 rawio = self.MockRawIO()
1926 bufio = self.tp(rawio)
1927 self.assertRaises((OverflowError, MemoryError, ValueError),
1928 bufio.__init__, rawio, sys.maxsize)
1929
1930 def test_initialization(self):
1931 rawio = self.MockRawIO()
1932 bufio = self.tp(rawio)
1933 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0)
1934 self.assertRaises(ValueError, bufio.write, b"def")
1935 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16)
1936 self.assertRaises(ValueError, bufio.write, b"def")
1937 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1)
1938 self.assertRaises(ValueError, bufio.write, b"def")
1939
1940 def test_garbage_collection(self):
1941 # C BufferedWriter objects are collected, and collecting them flushes
1942 # all data to disk.
1943 # The Python version has __del__, so it ends into gc.garbage instead
Hai Shi883bc632020-07-06 17:12:49 +08001944 self.addCleanup(os_helper.unlink, os_helper.TESTFN)
1945 with warnings_helper.check_warnings(('', ResourceWarning)):
1946 rawio = self.FileIO(os_helper.TESTFN, "w+b")
Antoine Pitrou796564c2013-07-30 19:59:21 +02001947 f = self.tp(rawio)
1948 f.write(b"123xxx")
1949 f.x = f
1950 wr = weakref.ref(f)
1951 del f
1952 support.gc_collect()
Serhiy Storchaka0dcd80a2015-08-02 15:17:49 +03001953 self.assertIsNone(wr(), wr)
Hai Shi883bc632020-07-06 17:12:49 +08001954 with self.open(os_helper.TESTFN, "rb") as f:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001955 self.assertEqual(f.read(), b"123xxx")
1956
R David Murray67bfe802013-02-23 21:51:05 -05001957 def test_args_error(self):
1958 # Issue #17275
1959 with self.assertRaisesRegex(TypeError, "BufferedWriter"):
1960 self.tp(io.BytesIO(), 1024, 1024, 1024)
1961
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001962
1963class PyBufferedWriterTest(BufferedWriterTest):
1964 tp = pyio.BufferedWriter
Guido van Rossuma9e20242007-03-08 00:43:48 +00001965
Guido van Rossum01a27522007-03-07 01:00:12 +00001966class BufferedRWPairTest(unittest.TestCase):
Guido van Rossuma9e20242007-03-08 00:43:48 +00001967
Antoine Pitroucf4c7492009-04-19 00:09:36 +00001968 def test_constructor(self):
1969 pair = self.tp(self.MockRawIO(), self.MockRawIO())
Benjamin Peterson92035012008-12-27 16:00:54 +00001970 self.assertFalse(pair.closed)
Guido van Rossum01a27522007-03-07 01:00:12 +00001971
Serhiy Storchaka61e24932014-02-12 10:52:35 +02001972 def test_uninitialized(self):
1973 pair = self.tp.__new__(self.tp)
1974 del pair
1975 pair = self.tp.__new__(self.tp)
1976 self.assertRaisesRegex((ValueError, AttributeError),
1977 'uninitialized|has no attribute',
1978 pair.read, 0)
1979 self.assertRaisesRegex((ValueError, AttributeError),
1980 'uninitialized|has no attribute',
1981 pair.write, b'')
1982 pair.__init__(self.MockRawIO(), self.MockRawIO())
1983 self.assertEqual(pair.read(0), b'')
1984 self.assertEqual(pair.write(b''), 0)
1985
Benjamin Petersond2e0c792009-05-01 20:40:59 +00001986 def test_detach(self):
1987 pair = self.tp(self.MockRawIO(), self.MockRawIO())
1988 self.assertRaises(self.UnsupportedOperation, pair.detach)
1989
Florent Xicluna109d5732012-07-07 17:03:22 +02001990 def test_constructor_max_buffer_size_removal(self):
1991 with self.assertRaises(TypeError):
Benjamin Peterson59406a92009-03-26 17:10:29 +00001992 self.tp(self.MockRawIO(), self.MockRawIO(), 8, 12)
Benjamin Peterson59406a92009-03-26 17:10:29 +00001993
Antoine Pitroucf4c7492009-04-19 00:09:36 +00001994 def test_constructor_with_not_readable(self):
1995 class NotReadable(MockRawIO):
1996 def readable(self):
1997 return False
1998
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02001999 self.assertRaises(OSError, self.tp, NotReadable(), self.MockRawIO())
Antoine Pitroucf4c7492009-04-19 00:09:36 +00002000
2001 def test_constructor_with_not_writeable(self):
2002 class NotWriteable(MockRawIO):
2003 def writable(self):
2004 return False
2005
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02002006 self.assertRaises(OSError, self.tp, self.MockRawIO(), NotWriteable())
Antoine Pitroucf4c7492009-04-19 00:09:36 +00002007
2008 def test_read(self):
2009 pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())
2010
2011 self.assertEqual(pair.read(3), b"abc")
2012 self.assertEqual(pair.read(1), b"d")
2013 self.assertEqual(pair.read(), b"ef")
Benjamin Petersonbf5ff762009-12-13 19:25:34 +00002014 pair = self.tp(self.BytesIO(b"abc"), self.MockRawIO())
2015 self.assertEqual(pair.read(None), b"abc")
2016
2017 def test_readlines(self):
2018 pair = lambda: self.tp(self.BytesIO(b"abc\ndef\nh"), self.MockRawIO())
2019 self.assertEqual(pair().readlines(), [b"abc\n", b"def\n", b"h"])
2020 self.assertEqual(pair().readlines(), [b"abc\n", b"def\n", b"h"])
2021 self.assertEqual(pair().readlines(5), [b"abc\n", b"def\n"])
Antoine Pitroucf4c7492009-04-19 00:09:36 +00002022
2023 def test_read1(self):
2024 # .read1() is delegated to the underlying reader object, so this test
2025 # can be shallow.
2026 pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())
2027
2028 self.assertEqual(pair.read1(3), b"abc")
Martin Panterccb2c0e2016-10-20 23:48:14 +00002029 self.assertEqual(pair.read1(), b"def")
Antoine Pitroucf4c7492009-04-19 00:09:36 +00002030
2031 def test_readinto(self):
Martin Panter6bb91f32016-05-28 00:41:57 +00002032 for method in ("readinto", "readinto1"):
2033 with self.subTest(method):
2034 pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())
Antoine Pitroucf4c7492009-04-19 00:09:36 +00002035
Serhiy Storchaka5f1a5182016-09-11 14:41:02 +03002036 data = byteslike(b'\0' * 5)
Martin Panter6bb91f32016-05-28 00:41:57 +00002037 self.assertEqual(getattr(pair, method)(data), 5)
2038 self.assertEqual(bytes(data), b"abcde")
Antoine Pitroucf4c7492009-04-19 00:09:36 +00002039
2040 def test_write(self):
2041 w = self.MockRawIO()
2042 pair = self.tp(self.MockRawIO(), w)
2043
2044 pair.write(b"abc")
2045 pair.flush()
Martin Panter6bb91f32016-05-28 00:41:57 +00002046 buffer = bytearray(b"def")
2047 pair.write(buffer)
2048 buffer[:] = b"***" # Overwrite our copy of the data
Antoine Pitroucf4c7492009-04-19 00:09:36 +00002049 pair.flush()
2050 self.assertEqual(w._write_stack, [b"abc", b"def"])
2051
2052 def test_peek(self):
2053 pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())
2054
2055 self.assertTrue(pair.peek(3).startswith(b"abc"))
2056 self.assertEqual(pair.read(3), b"abc")
2057
2058 def test_readable(self):
2059 pair = self.tp(self.MockRawIO(), self.MockRawIO())
2060 self.assertTrue(pair.readable())
2061
2062 def test_writeable(self):
2063 pair = self.tp(self.MockRawIO(), self.MockRawIO())
2064 self.assertTrue(pair.writable())
2065
2066 def test_seekable(self):
2067 # BufferedRWPairs are never seekable, even if their readers and writers
2068 # are.
2069 pair = self.tp(self.MockRawIO(), self.MockRawIO())
2070 self.assertFalse(pair.seekable())
2071
2072 # .flush() is delegated to the underlying writer object and has been
2073 # tested in the test_write method.
2074
2075 def test_close_and_closed(self):
2076 pair = self.tp(self.MockRawIO(), self.MockRawIO())
2077 self.assertFalse(pair.closed)
2078 pair.close()
2079 self.assertTrue(pair.closed)
2080
Serhiy Storchaka7665be62015-03-24 23:21:57 +02002081 def test_reader_close_error_on_close(self):
2082 def reader_close():
2083 reader_non_existing
2084 reader = self.MockRawIO()
2085 reader.close = reader_close
2086 writer = self.MockRawIO()
2087 pair = self.tp(reader, writer)
2088 with self.assertRaises(NameError) as err:
2089 pair.close()
2090 self.assertIn('reader_non_existing', str(err.exception))
2091 self.assertTrue(pair.closed)
2092 self.assertFalse(reader.closed)
2093 self.assertTrue(writer.closed)
2094
Victor Stinner472f7942019-04-12 21:58:24 +02002095 # Silence destructor error
2096 reader.close = lambda: None
2097
Serhiy Storchaka7665be62015-03-24 23:21:57 +02002098 def test_writer_close_error_on_close(self):
2099 def writer_close():
2100 writer_non_existing
2101 reader = self.MockRawIO()
2102 writer = self.MockRawIO()
2103 writer.close = writer_close
2104 pair = self.tp(reader, writer)
2105 with self.assertRaises(NameError) as err:
2106 pair.close()
2107 self.assertIn('writer_non_existing', str(err.exception))
2108 self.assertFalse(pair.closed)
2109 self.assertTrue(reader.closed)
2110 self.assertFalse(writer.closed)
2111
Victor Stinner472f7942019-04-12 21:58:24 +02002112 # Silence destructor error
2113 writer.close = lambda: None
Victor Stinner913fa1c2019-06-12 23:57:11 +02002114 writer = None
2115
Victor Stinner212646c2019-06-14 18:03:22 +02002116 # Ignore BufferedWriter (of the BufferedRWPair) unraisable exception
Victor Stinner913fa1c2019-06-12 23:57:11 +02002117 with support.catch_unraisable_exception():
Victor Stinner212646c2019-06-14 18:03:22 +02002118 # Ignore BufferedRWPair unraisable exception
2119 with support.catch_unraisable_exception():
2120 pair = None
2121 support.gc_collect()
Victor Stinner913fa1c2019-06-12 23:57:11 +02002122 support.gc_collect()
Victor Stinner472f7942019-04-12 21:58:24 +02002123
Serhiy Storchaka7665be62015-03-24 23:21:57 +02002124 def test_reader_writer_close_error_on_close(self):
2125 def reader_close():
2126 reader_non_existing
2127 def writer_close():
2128 writer_non_existing
2129 reader = self.MockRawIO()
2130 reader.close = reader_close
2131 writer = self.MockRawIO()
2132 writer.close = writer_close
2133 pair = self.tp(reader, writer)
2134 with self.assertRaises(NameError) as err:
2135 pair.close()
2136 self.assertIn('reader_non_existing', str(err.exception))
2137 self.assertIsInstance(err.exception.__context__, NameError)
2138 self.assertIn('writer_non_existing', str(err.exception.__context__))
2139 self.assertFalse(pair.closed)
2140 self.assertFalse(reader.closed)
2141 self.assertFalse(writer.closed)
2142
Victor Stinner472f7942019-04-12 21:58:24 +02002143 # Silence destructor error
2144 reader.close = lambda: None
2145 writer.close = lambda: None
2146
Antoine Pitroucf4c7492009-04-19 00:09:36 +00002147 def test_isatty(self):
2148 class SelectableIsAtty(MockRawIO):
2149 def __init__(self, isatty):
2150 MockRawIO.__init__(self)
2151 self._isatty = isatty
2152
2153 def isatty(self):
2154 return self._isatty
2155
2156 pair = self.tp(SelectableIsAtty(False), SelectableIsAtty(False))
2157 self.assertFalse(pair.isatty())
2158
2159 pair = self.tp(SelectableIsAtty(True), SelectableIsAtty(False))
2160 self.assertTrue(pair.isatty())
2161
2162 pair = self.tp(SelectableIsAtty(False), SelectableIsAtty(True))
2163 self.assertTrue(pair.isatty())
2164
2165 pair = self.tp(SelectableIsAtty(True), SelectableIsAtty(True))
2166 self.assertTrue(pair.isatty())
Guido van Rossum01a27522007-03-07 01:00:12 +00002167
Benjamin Petersonbbd0a322014-09-29 22:46:57 -04002168 def test_weakref_clearing(self):
2169 brw = self.tp(self.MockRawIO(), self.MockRawIO())
2170 ref = weakref.ref(brw)
2171 brw = None
2172 ref = None # Shouldn't segfault.
2173
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002174class CBufferedRWPairTest(BufferedRWPairTest):
2175 tp = io.BufferedRWPair
Guido van Rossuma9e20242007-03-08 00:43:48 +00002176
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002177class PyBufferedRWPairTest(BufferedRWPairTest):
2178 tp = pyio.BufferedRWPair
Guido van Rossuma9e20242007-03-08 00:43:48 +00002179
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002180
2181class BufferedRandomTest(BufferedReaderTest, BufferedWriterTest):
2182 read_mode = "rb+"
2183 write_mode = "wb+"
2184
2185 def test_constructor(self):
2186 BufferedReaderTest.test_constructor(self)
2187 BufferedWriterTest.test_constructor(self)
2188
Serhiy Storchaka61e24932014-02-12 10:52:35 +02002189 def test_uninitialized(self):
2190 BufferedReaderTest.test_uninitialized(self)
2191 BufferedWriterTest.test_uninitialized(self)
2192
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002193 def test_read_and_write(self):
2194 raw = self.MockRawIO((b"asdf", b"ghjk"))
Benjamin Peterson59406a92009-03-26 17:10:29 +00002195 rw = self.tp(raw, 8)
Guido van Rossum01a27522007-03-07 01:00:12 +00002196
2197 self.assertEqual(b"as", rw.read(2))
2198 rw.write(b"ddd")
2199 rw.write(b"eee")
Guido van Rossum76c5d4d2007-04-06 19:10:29 +00002200 self.assertFalse(raw._write_stack) # Buffer writes
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002201 self.assertEqual(b"ghjk", rw.read())
Ezio Melottib3aedd42010-11-20 19:04:17 +00002202 self.assertEqual(b"dddeee", raw._write_stack[0])
Guido van Rossum01a27522007-03-07 01:00:12 +00002203
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002204 def test_seek_and_tell(self):
2205 raw = self.BytesIO(b"asdfghjkl")
2206 rw = self.tp(raw)
Guido van Rossum01a27522007-03-07 01:00:12 +00002207
Ezio Melottib3aedd42010-11-20 19:04:17 +00002208 self.assertEqual(b"as", rw.read(2))
2209 self.assertEqual(2, rw.tell())
Guido van Rossum01a27522007-03-07 01:00:12 +00002210 rw.seek(0, 0)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002211 self.assertEqual(b"asdf", rw.read(4))
Guido van Rossum01a27522007-03-07 01:00:12 +00002212
Antoine Pitroue05565e2011-08-20 14:39:23 +02002213 rw.write(b"123f")
Guido van Rossum01a27522007-03-07 01:00:12 +00002214 rw.seek(0, 0)
Antoine Pitroue05565e2011-08-20 14:39:23 +02002215 self.assertEqual(b"asdf123fl", rw.read())
Ezio Melottib3aedd42010-11-20 19:04:17 +00002216 self.assertEqual(9, rw.tell())
Guido van Rossum01a27522007-03-07 01:00:12 +00002217 rw.seek(-4, 2)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002218 self.assertEqual(5, rw.tell())
Guido van Rossum01a27522007-03-07 01:00:12 +00002219 rw.seek(2, 1)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002220 self.assertEqual(7, rw.tell())
2221 self.assertEqual(b"fl", rw.read(11))
Antoine Pitroue05565e2011-08-20 14:39:23 +02002222 rw.flush()
2223 self.assertEqual(b"asdf123fl", raw.getvalue())
2224
Christian Heimes8e42a0a2007-11-08 18:04:45 +00002225 self.assertRaises(TypeError, rw.seek, 0.0)
Guido van Rossum01a27522007-03-07 01:00:12 +00002226
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002227 def check_flush_and_read(self, read_func):
2228 raw = self.BytesIO(b"abcdefghi")
2229 bufio = self.tp(raw)
2230
Ezio Melottib3aedd42010-11-20 19:04:17 +00002231 self.assertEqual(b"ab", read_func(bufio, 2))
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002232 bufio.write(b"12")
Ezio Melottib3aedd42010-11-20 19:04:17 +00002233 self.assertEqual(b"ef", read_func(bufio, 2))
2234 self.assertEqual(6, bufio.tell())
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002235 bufio.flush()
Ezio Melottib3aedd42010-11-20 19:04:17 +00002236 self.assertEqual(6, bufio.tell())
2237 self.assertEqual(b"ghi", read_func(bufio))
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002238 raw.seek(0, 0)
2239 raw.write(b"XYZ")
2240 # flush() resets the read buffer
2241 bufio.flush()
2242 bufio.seek(0, 0)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002243 self.assertEqual(b"XYZ", read_func(bufio, 3))
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002244
2245 def test_flush_and_read(self):
2246 self.check_flush_and_read(lambda bufio, *args: bufio.read(*args))
2247
2248 def test_flush_and_readinto(self):
2249 def _readinto(bufio, n=-1):
2250 b = bytearray(n if n >= 0 else 9999)
2251 n = bufio.readinto(b)
2252 return bytes(b[:n])
2253 self.check_flush_and_read(_readinto)
2254
2255 def test_flush_and_peek(self):
2256 def _peek(bufio, n=-1):
2257 # This relies on the fact that the buffer can contain the whole
2258 # raw stream, otherwise peek() can return less.
2259 b = bufio.peek(n)
2260 if n != -1:
2261 b = b[:n]
2262 bufio.seek(len(b), 1)
2263 return b
2264 self.check_flush_and_read(_peek)
2265
2266 def test_flush_and_write(self):
2267 raw = self.BytesIO(b"abcdefghi")
2268 bufio = self.tp(raw)
2269
2270 bufio.write(b"123")
2271 bufio.flush()
2272 bufio.write(b"45")
2273 bufio.flush()
2274 bufio.seek(0, 0)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002275 self.assertEqual(b"12345fghi", raw.getvalue())
2276 self.assertEqual(b"12345fghi", bufio.read())
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002277
2278 def test_threads(self):
2279 BufferedReaderTest.test_threads(self)
2280 BufferedWriterTest.test_threads(self)
2281
2282 def test_writes_and_peek(self):
2283 def _peek(bufio):
2284 bufio.peek(1)
2285 self.check_writes(_peek)
2286 def _peek(bufio):
2287 pos = bufio.tell()
2288 bufio.seek(-1, 1)
2289 bufio.peek(1)
2290 bufio.seek(pos, 0)
2291 self.check_writes(_peek)
2292
2293 def test_writes_and_reads(self):
2294 def _read(bufio):
2295 bufio.seek(-1, 1)
2296 bufio.read(1)
2297 self.check_writes(_read)
2298
2299 def test_writes_and_read1s(self):
2300 def _read1(bufio):
2301 bufio.seek(-1, 1)
2302 bufio.read1(1)
2303 self.check_writes(_read1)
2304
2305 def test_writes_and_readintos(self):
2306 def _read(bufio):
2307 bufio.seek(-1, 1)
2308 bufio.readinto(bytearray(1))
2309 self.check_writes(_read)
2310
Antoine Pitroua0ceb732009-08-06 20:29:56 +00002311 def test_write_after_readahead(self):
2312 # Issue #6629: writing after the buffer was filled by readahead should
2313 # first rewind the raw stream.
2314 for overwrite_size in [1, 5]:
2315 raw = self.BytesIO(b"A" * 10)
2316 bufio = self.tp(raw, 4)
2317 # Trigger readahead
2318 self.assertEqual(bufio.read(1), b"A")
2319 self.assertEqual(bufio.tell(), 1)
2320 # Overwriting should rewind the raw stream if it needs so
2321 bufio.write(b"B" * overwrite_size)
2322 self.assertEqual(bufio.tell(), overwrite_size + 1)
2323 # If the write size was smaller than the buffer size, flush() and
2324 # check that rewind happens.
2325 bufio.flush()
2326 self.assertEqual(bufio.tell(), overwrite_size + 1)
2327 s = raw.getvalue()
2328 self.assertEqual(s,
2329 b"A" + b"B" * overwrite_size + b"A" * (9 - overwrite_size))
2330
Antoine Pitrou7c404892011-05-13 00:13:33 +02002331 def test_write_rewind_write(self):
2332 # Various combinations of reading / writing / seeking backwards / writing again
2333 def mutate(bufio, pos1, pos2):
2334 assert pos2 >= pos1
2335 # Fill the buffer
2336 bufio.seek(pos1)
2337 bufio.read(pos2 - pos1)
2338 bufio.write(b'\x02')
2339 # This writes earlier than the previous write, but still inside
2340 # the buffer.
2341 bufio.seek(pos1)
2342 bufio.write(b'\x01')
2343
2344 b = b"\x80\x81\x82\x83\x84"
2345 for i in range(0, len(b)):
2346 for j in range(i, len(b)):
2347 raw = self.BytesIO(b)
2348 bufio = self.tp(raw, 100)
2349 mutate(bufio, i, j)
2350 bufio.flush()
2351 expected = bytearray(b)
2352 expected[j] = 2
2353 expected[i] = 1
2354 self.assertEqual(raw.getvalue(), expected,
2355 "failed result for i=%d, j=%d" % (i, j))
2356
Antoine Pitrou905a2ff2010-01-31 22:47:27 +00002357 def test_truncate_after_read_or_write(self):
2358 raw = self.BytesIO(b"A" * 10)
2359 bufio = self.tp(raw, 100)
2360 self.assertEqual(bufio.read(2), b"AA") # the read buffer gets filled
2361 self.assertEqual(bufio.truncate(), 2)
2362 self.assertEqual(bufio.write(b"BB"), 2) # the write buffer increases
2363 self.assertEqual(bufio.truncate(), 4)
2364
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002365 def test_misbehaved_io(self):
2366 BufferedReaderTest.test_misbehaved_io(self)
2367 BufferedWriterTest.test_misbehaved_io(self)
2368
Antoine Pitroue05565e2011-08-20 14:39:23 +02002369 def test_interleaved_read_write(self):
2370 # Test for issue #12213
2371 with self.BytesIO(b'abcdefgh') as raw:
2372 with self.tp(raw, 100) as f:
2373 f.write(b"1")
2374 self.assertEqual(f.read(1), b'b')
2375 f.write(b'2')
2376 self.assertEqual(f.read1(1), b'd')
2377 f.write(b'3')
2378 buf = bytearray(1)
2379 f.readinto(buf)
2380 self.assertEqual(buf, b'f')
2381 f.write(b'4')
2382 self.assertEqual(f.peek(1), b'h')
2383 f.flush()
2384 self.assertEqual(raw.getvalue(), b'1b2d3f4h')
2385
2386 with self.BytesIO(b'abc') as raw:
2387 with self.tp(raw, 100) as f:
2388 self.assertEqual(f.read(1), b'a')
2389 f.write(b"2")
2390 self.assertEqual(f.read(1), b'c')
2391 f.flush()
2392 self.assertEqual(raw.getvalue(), b'a2c')
2393
2394 def test_interleaved_readline_write(self):
2395 with self.BytesIO(b'ab\ncdef\ng\n') as raw:
2396 with self.tp(raw) as f:
2397 f.write(b'1')
2398 self.assertEqual(f.readline(), b'b\n')
2399 f.write(b'2')
2400 self.assertEqual(f.readline(), b'def\n')
2401 f.write(b'3')
2402 self.assertEqual(f.readline(), b'\n')
2403 f.flush()
2404 self.assertEqual(raw.getvalue(), b'1b\n2def\n3\n')
2405
Antoine Pitrou0d739d72010-09-05 23:01:12 +00002406 # You can't construct a BufferedRandom over a non-seekable stream.
2407 test_unseekable = None
2408
Berker Peksagfd5116c2020-02-21 20:57:26 +03002409 # writable() returns True, so there's no point to test it over
2410 # a writable stream.
2411 test_truncate_on_read_only = None
2412
R David Murray67bfe802013-02-23 21:51:05 -05002413
Antoine Pitrou10f0c502012-07-29 19:02:46 +02002414class CBufferedRandomTest(BufferedRandomTest, SizeofTest):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002415 tp = io.BufferedRandom
2416
Miss Islington (bot)be200c32021-09-14 11:58:19 -07002417 @unittest.skipIf(MEMORY_SANITIZER or ADDRESS_SANITIZER, "sanitizer defaults to crashing "
Gregory P. Smithe5796c42018-12-30 20:17:57 -08002418 "instead of returning NULL for malloc failure.")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002419 def test_constructor(self):
2420 BufferedRandomTest.test_constructor(self)
2421 # The allocation can succeed on 32-bit builds, e.g. with more
Victor Stinner8c663fd2017-11-08 14:44:44 -08002422 # than 2 GiB RAM and a 64-bit kernel.
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002423 if sys.maxsize > 0x7FFFFFFF:
2424 rawio = self.MockRawIO()
2425 bufio = self.tp(rawio)
2426 self.assertRaises((OverflowError, MemoryError, ValueError),
2427 bufio.__init__, rawio, sys.maxsize)
2428
2429 def test_garbage_collection(self):
2430 CBufferedReaderTest.test_garbage_collection(self)
2431 CBufferedWriterTest.test_garbage_collection(self)
2432
R David Murray67bfe802013-02-23 21:51:05 -05002433 def test_args_error(self):
2434 # Issue #17275
2435 with self.assertRaisesRegex(TypeError, "BufferedRandom"):
2436 self.tp(io.BytesIO(), 1024, 1024, 1024)
2437
2438
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002439class PyBufferedRandomTest(BufferedRandomTest):
2440 tp = pyio.BufferedRandom
2441
2442
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002443# To fully exercise seek/tell, the StatefulIncrementalDecoder has these
2444# properties:
2445# - A single output character can correspond to many bytes of input.
2446# - The number of input bytes to complete the character can be
2447# undetermined until the last input byte is received.
2448# - The number of input bytes can vary depending on previous input.
2449# - A single input byte can correspond to many characters of output.
2450# - The number of output characters can be undetermined until the
2451# last input byte is received.
2452# - The number of output characters can vary depending on previous input.
2453
2454class StatefulIncrementalDecoder(codecs.IncrementalDecoder):
2455 """
2456 For testing seek/tell behavior with a stateful, buffering decoder.
2457
2458 Input is a sequence of words. Words may be fixed-length (length set
2459 by input) or variable-length (period-terminated). In variable-length
2460 mode, extra periods are ignored. Possible words are:
2461 - 'i' followed by a number sets the input length, I (maximum 99).
2462 When I is set to 0, words are space-terminated.
2463 - 'o' followed by a number sets the output length, O (maximum 99).
2464 - Any other word is converted into a word followed by a period on
2465 the output. The output word consists of the input word truncated
2466 or padded out with hyphens to make its length equal to O. If O
2467 is 0, the word is output verbatim without truncating or padding.
2468 I and O are initially set to 1. When I changes, any buffered input is
2469 re-scanned according to the new I. EOF also terminates the last word.
2470 """
2471
2472 def __init__(self, errors='strict'):
Christian Heimesab568872008-03-23 02:11:13 +00002473 codecs.IncrementalDecoder.__init__(self, errors)
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002474 self.reset()
2475
2476 def __repr__(self):
2477 return '<SID %x>' % id(self)
2478
2479 def reset(self):
2480 self.i = 1
2481 self.o = 1
2482 self.buffer = bytearray()
2483
2484 def getstate(self):
2485 i, o = self.i ^ 1, self.o ^ 1 # so that flags = 0 after reset()
2486 return bytes(self.buffer), i*100 + o
2487
2488 def setstate(self, state):
2489 buffer, io = state
2490 self.buffer = bytearray(buffer)
2491 i, o = divmod(io, 100)
2492 self.i, self.o = i ^ 1, o ^ 1
2493
2494 def decode(self, input, final=False):
2495 output = ''
2496 for b in input:
2497 if self.i == 0: # variable-length, terminated with period
2498 if b == ord('.'):
2499 if self.buffer:
2500 output += self.process_word()
2501 else:
2502 self.buffer.append(b)
2503 else: # fixed-length, terminate after self.i bytes
2504 self.buffer.append(b)
2505 if len(self.buffer) == self.i:
2506 output += self.process_word()
2507 if final and self.buffer: # EOF terminates the last word
2508 output += self.process_word()
2509 return output
2510
2511 def process_word(self):
2512 output = ''
2513 if self.buffer[0] == ord('i'):
2514 self.i = min(99, int(self.buffer[1:] or 0)) # set input length
2515 elif self.buffer[0] == ord('o'):
2516 self.o = min(99, int(self.buffer[1:] or 0)) # set output length
2517 else:
2518 output = self.buffer.decode('ascii')
2519 if len(output) < self.o:
2520 output += '-'*self.o # pad out with hyphens
2521 if self.o:
2522 output = output[:self.o] # truncate to output length
2523 output += '.'
2524 self.buffer = bytearray()
2525 return output
2526
Benjamin Petersonad9d48d2008-04-02 21:49:44 +00002527 codecEnabled = False
2528
Hai Shi14cdc212020-10-26 02:38:33 +08002529
2530# bpo-41919: This method is separated from StatefulIncrementalDecoder to avoid a resource leak
2531# when registering codecs and cleanup functions.
2532def lookupTestDecoder(name):
2533 if StatefulIncrementalDecoder.codecEnabled and name == 'test_decoder':
2534 latin1 = codecs.lookup('latin-1')
2535 return codecs.CodecInfo(
2536 name='test_decoder', encode=latin1.encode, decode=None,
2537 incrementalencoder=None,
2538 streamreader=None, streamwriter=None,
2539 incrementaldecoder=StatefulIncrementalDecoder)
Benjamin Petersonad9d48d2008-04-02 21:49:44 +00002540
Benjamin Petersonad9d48d2008-04-02 21:49:44 +00002541
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002542class StatefulIncrementalDecoderTest(unittest.TestCase):
2543 """
2544 Make sure the StatefulIncrementalDecoder actually works.
2545 """
2546
2547 test_cases = [
Ka-Ping Yeed24a5b62008-03-20 10:51:27 +00002548 # I=1, O=1 (fixed-length input == fixed-length output)
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002549 (b'abcd', False, 'a.b.c.d.'),
Ka-Ping Yeed24a5b62008-03-20 10:51:27 +00002550 # I=0, O=0 (variable-length input, variable-length output)
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002551 (b'oiabcd', True, 'abcd.'),
Ka-Ping Yeed24a5b62008-03-20 10:51:27 +00002552 # I=0, O=0 (should ignore extra periods)
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002553 (b'oi...abcd...', True, 'abcd.'),
Ka-Ping Yeed24a5b62008-03-20 10:51:27 +00002554 # I=0, O=6 (variable-length input, fixed-length output)
2555 (b'i.o6.x.xyz.toolongtofit.', False, 'x-----.xyz---.toolon.'),
2556 # I=2, O=6 (fixed-length input < fixed-length output)
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002557 (b'i.i2.o6xyz', True, 'xy----.z-----.'),
Ka-Ping Yeed24a5b62008-03-20 10:51:27 +00002558 # I=6, O=3 (fixed-length input > fixed-length output)
2559 (b'i.o3.i6.abcdefghijklmnop', True, 'abc.ghi.mno.'),
2560 # I=0, then 3; O=29, then 15 (with longer output)
2561 (b'i.o29.a.b.cde.o15.abcdefghijabcdefghij.i3.a.b.c.d.ei00k.l.m', True,
2562 'a----------------------------.' +
2563 'b----------------------------.' +
2564 'cde--------------------------.' +
2565 'abcdefghijabcde.' +
2566 'a.b------------.' +
2567 '.c.------------.' +
2568 'd.e------------.' +
2569 'k--------------.' +
2570 'l--------------.' +
2571 'm--------------.')
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002572 ]
2573
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002574 def test_decoder(self):
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002575 # Try a few one-shot test cases.
2576 for input, eof, output in self.test_cases:
2577 d = StatefulIncrementalDecoder()
Ezio Melottib3aedd42010-11-20 19:04:17 +00002578 self.assertEqual(d.decode(input, eof), output)
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002579
2580 # Also test an unfinished decode, followed by forcing EOF.
2581 d = StatefulIncrementalDecoder()
Ezio Melottib3aedd42010-11-20 19:04:17 +00002582 self.assertEqual(d.decode(b'oiabcd'), '')
2583 self.assertEqual(d.decode(b'', 1), 'abcd.')
Guido van Rossum78892e42007-04-06 17:31:18 +00002584
2585class TextIOWrapperTest(unittest.TestCase):
Guido van Rossum9b76da62007-04-11 01:09:03 +00002586
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002587 def setUp(self):
2588 self.testdata = b"AAA\r\nBBB\rCCC\r\nDDD\nEEE\r\n"
2589 self.normalized = b"AAA\nBBB\nCCC\nDDD\nEEE\n".decode("ascii")
Hai Shi883bc632020-07-06 17:12:49 +08002590 os_helper.unlink(os_helper.TESTFN)
Hai Shi14cdc212020-10-26 02:38:33 +08002591 codecs.register(lookupTestDecoder)
2592 self.addCleanup(codecs.unregister, lookupTestDecoder)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002593
Guido van Rossumd0712812007-04-11 16:32:43 +00002594 def tearDown(self):
Hai Shi883bc632020-07-06 17:12:49 +08002595 os_helper.unlink(os_helper.TESTFN)
Guido van Rossum9b76da62007-04-11 01:09:03 +00002596
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002597 def test_constructor(self):
2598 r = self.BytesIO(b"\xc3\xa9\n\n")
2599 b = self.BufferedReader(r, 1000)
Inada Naoki58cffba2021-04-01 11:25:04 +09002600 t = self.TextIOWrapper(b, encoding="utf-8")
Marc-André Lemburg8f36af72011-02-25 15:42:01 +00002601 t.__init__(b, encoding="latin-1", newline="\r\n")
2602 self.assertEqual(t.encoding, "latin-1")
Ezio Melottib3aedd42010-11-20 19:04:17 +00002603 self.assertEqual(t.line_buffering, False)
Marc-André Lemburg8f36af72011-02-25 15:42:01 +00002604 t.__init__(b, encoding="utf-8", line_buffering=True)
2605 self.assertEqual(t.encoding, "utf-8")
Ezio Melottib3aedd42010-11-20 19:04:17 +00002606 self.assertEqual(t.line_buffering, True)
2607 self.assertEqual("\xe9\n", t.readline())
Inada Naokifb786922021-04-06 11:18:41 +09002608 self.assertRaises(TypeError, t.__init__, b, encoding="utf-8", newline=42)
2609 self.assertRaises(ValueError, t.__init__, b, encoding="utf-8", newline='xyzzy')
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002610
Serhiy Storchakaf24131f2015-04-16 11:19:43 +03002611 def test_uninitialized(self):
2612 t = self.TextIOWrapper.__new__(self.TextIOWrapper)
2613 del t
2614 t = self.TextIOWrapper.__new__(self.TextIOWrapper)
2615 self.assertRaises(Exception, repr, t)
2616 self.assertRaisesRegex((ValueError, AttributeError),
2617 'uninitialized|has no attribute',
2618 t.read, 0)
Inada Naoki58cffba2021-04-01 11:25:04 +09002619 t.__init__(self.MockRawIO(), encoding="utf-8")
Serhiy Storchakaf24131f2015-04-16 11:19:43 +03002620 self.assertEqual(t.read(0), '')
2621
Nick Coghlana9b15242014-02-04 22:11:18 +10002622 def test_non_text_encoding_codecs_are_rejected(self):
2623 # Ensure the constructor complains if passed a codec that isn't
2624 # marked as a text encoding
2625 # http://bugs.python.org/issue20404
2626 r = self.BytesIO()
2627 b = self.BufferedWriter(r)
2628 with self.assertRaisesRegex(LookupError, "is not a text encoding"):
2629 self.TextIOWrapper(b, encoding="hex")
2630
Benjamin Petersond2e0c792009-05-01 20:40:59 +00002631 def test_detach(self):
2632 r = self.BytesIO()
2633 b = self.BufferedWriter(r)
Inada Naoki58cffba2021-04-01 11:25:04 +09002634 t = self.TextIOWrapper(b, encoding="ascii")
Benjamin Petersond2e0c792009-05-01 20:40:59 +00002635 self.assertIs(t.detach(), b)
2636
2637 t = self.TextIOWrapper(b, encoding="ascii")
2638 t.write("howdy")
2639 self.assertFalse(r.getvalue())
2640 t.detach()
2641 self.assertEqual(r.getvalue(), b"howdy")
2642 self.assertRaises(ValueError, t.detach)
2643
Benjamin Peterson10e76b62014-12-21 20:51:50 -06002644 # Operations independent of the detached stream should still work
2645 repr(t)
2646 self.assertEqual(t.encoding, "ascii")
2647 self.assertEqual(t.errors, "strict")
2648 self.assertFalse(t.line_buffering)
Antoine Pitrou3c2817b2017-06-03 12:32:28 +02002649 self.assertFalse(t.write_through)
Benjamin Peterson10e76b62014-12-21 20:51:50 -06002650
Benjamin Petersonc4c0eae2009-03-09 00:07:03 +00002651 def test_repr(self):
2652 raw = self.BytesIO("hello".encode("utf-8"))
2653 b = self.BufferedReader(raw)
2654 t = self.TextIOWrapper(b, encoding="utf-8")
Antoine Pitrou716c4442009-05-23 19:04:03 +00002655 modname = self.TextIOWrapper.__module__
Max Bernsteinccb7ca72019-05-21 10:09:21 -07002656 self.assertRegex(repr(t),
2657 r"<(%s\.)?TextIOWrapper encoding='utf-8'>" % modname)
Antoine Pitrou716c4442009-05-23 19:04:03 +00002658 raw.name = "dummy"
Max Bernsteinccb7ca72019-05-21 10:09:21 -07002659 self.assertRegex(repr(t),
2660 r"<(%s\.)?TextIOWrapper name='dummy' encoding='utf-8'>" % modname)
Antoine Pitroua4815ca2011-01-09 20:38:15 +00002661 t.mode = "r"
Max Bernsteinccb7ca72019-05-21 10:09:21 -07002662 self.assertRegex(repr(t),
2663 r"<(%s\.)?TextIOWrapper name='dummy' mode='r' encoding='utf-8'>" % modname)
Antoine Pitrou716c4442009-05-23 19:04:03 +00002664 raw.name = b"dummy"
Max Bernsteinccb7ca72019-05-21 10:09:21 -07002665 self.assertRegex(repr(t),
2666 r"<(%s\.)?TextIOWrapper name=b'dummy' mode='r' encoding='utf-8'>" % modname)
Benjamin Petersonc4c0eae2009-03-09 00:07:03 +00002667
Benjamin Peterson10e76b62014-12-21 20:51:50 -06002668 t.buffer.detach()
2669 repr(t) # Should not raise an exception
2670
Serhiy Storchakaa5af6e12017-03-19 19:25:29 +02002671 def test_recursive_repr(self):
2672 # Issue #25455
2673 raw = self.BytesIO()
Inada Naoki58cffba2021-04-01 11:25:04 +09002674 t = self.TextIOWrapper(raw, encoding="utf-8")
Serhiy Storchakaa5af6e12017-03-19 19:25:29 +02002675 with support.swap_attr(raw, 'name', t):
2676 try:
2677 repr(t) # Should not crash
2678 except RuntimeError:
2679 pass
2680
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002681 def test_line_buffering(self):
2682 r = self.BytesIO()
2683 b = self.BufferedWriter(r, 1000)
Inada Naoki58cffba2021-04-01 11:25:04 +09002684 t = self.TextIOWrapper(b, encoding="utf-8", newline="\n", line_buffering=True)
Guido van Rossumf64db9f2007-12-06 01:04:26 +00002685 t.write("X")
Ezio Melottib3aedd42010-11-20 19:04:17 +00002686 self.assertEqual(r.getvalue(), b"") # No flush happened
Guido van Rossumf64db9f2007-12-06 01:04:26 +00002687 t.write("Y\nZ")
Ezio Melottib3aedd42010-11-20 19:04:17 +00002688 self.assertEqual(r.getvalue(), b"XY\nZ") # All got flushed
Guido van Rossumf64db9f2007-12-06 01:04:26 +00002689 t.write("A\rB")
Ezio Melottib3aedd42010-11-20 19:04:17 +00002690 self.assertEqual(r.getvalue(), b"XY\nZA\rB")
Guido van Rossumf64db9f2007-12-06 01:04:26 +00002691
Antoine Pitrou3c2817b2017-06-03 12:32:28 +02002692 def test_reconfigure_line_buffering(self):
2693 r = self.BytesIO()
2694 b = self.BufferedWriter(r, 1000)
Inada Naoki58cffba2021-04-01 11:25:04 +09002695 t = self.TextIOWrapper(b, encoding="utf-8", newline="\n", line_buffering=False)
Antoine Pitrou3c2817b2017-06-03 12:32:28 +02002696 t.write("AB\nC")
2697 self.assertEqual(r.getvalue(), b"")
2698
2699 t.reconfigure(line_buffering=True) # implicit flush
2700 self.assertEqual(r.getvalue(), b"AB\nC")
2701 t.write("DEF\nG")
2702 self.assertEqual(r.getvalue(), b"AB\nCDEF\nG")
2703 t.write("H")
2704 self.assertEqual(r.getvalue(), b"AB\nCDEF\nG")
2705 t.reconfigure(line_buffering=False) # implicit flush
2706 self.assertEqual(r.getvalue(), b"AB\nCDEF\nGH")
2707 t.write("IJ")
2708 self.assertEqual(r.getvalue(), b"AB\nCDEF\nGH")
2709
2710 # Keeping default value
2711 t.reconfigure()
2712 t.reconfigure(line_buffering=None)
2713 self.assertEqual(t.line_buffering, False)
2714 t.reconfigure(line_buffering=True)
2715 t.reconfigure()
2716 t.reconfigure(line_buffering=None)
2717 self.assertEqual(t.line_buffering, True)
2718
Victor Stinner91106cd2017-12-13 12:29:09 +01002719 @unittest.skipIf(sys.flags.utf8_mode, "utf-8 mode is enabled")
Victor Stinnerf86a5e82012-06-05 13:43:22 +02002720 def test_default_encoding(self):
2721 old_environ = dict(os.environ)
2722 try:
2723 # try to get a user preferred encoding different than the current
2724 # locale encoding to check that TextIOWrapper() uses the current
2725 # locale encoding and not the user preferred encoding
2726 for key in ('LC_ALL', 'LANG', 'LC_CTYPE'):
2727 if key in os.environ:
2728 del os.environ[key]
2729
2730 current_locale_encoding = locale.getpreferredencoding(False)
2731 b = self.BytesIO()
Inada Naoki58cffba2021-04-01 11:25:04 +09002732 with warnings.catch_warnings():
2733 warnings.simplefilter("ignore", EncodingWarning)
2734 t = self.TextIOWrapper(b)
Victor Stinnerf86a5e82012-06-05 13:43:22 +02002735 self.assertEqual(t.encoding, current_locale_encoding)
2736 finally:
2737 os.environ.clear()
2738 os.environ.update(old_environ)
2739
Serhiy Storchaka5cfc79d2014-02-07 10:06:39 +02002740 @support.cpython_only
Victor Stinner91106cd2017-12-13 12:29:09 +01002741 @unittest.skipIf(sys.flags.utf8_mode, "utf-8 mode is enabled")
Serhiy Storchaka78980432013-01-15 01:12:17 +02002742 def test_device_encoding(self):
Serhiy Storchaka5cfc79d2014-02-07 10:06:39 +02002743 # Issue 15989
2744 import _testcapi
Serhiy Storchaka78980432013-01-15 01:12:17 +02002745 b = self.BytesIO()
2746 b.fileno = lambda: _testcapi.INT_MAX + 1
Inada Naoki58cffba2021-04-01 11:25:04 +09002747 self.assertRaises(OverflowError, self.TextIOWrapper, b, encoding="locale")
Serhiy Storchaka78980432013-01-15 01:12:17 +02002748 b.fileno = lambda: _testcapi.UINT_MAX + 1
Inada Naoki58cffba2021-04-01 11:25:04 +09002749 self.assertRaises(OverflowError, self.TextIOWrapper, b, encoding="locale")
Serhiy Storchaka78980432013-01-15 01:12:17 +02002750
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002751 def test_encoding(self):
2752 # Check the encoding attribute is always set, and valid
2753 b = self.BytesIO()
Marc-André Lemburg8f36af72011-02-25 15:42:01 +00002754 t = self.TextIOWrapper(b, encoding="utf-8")
2755 self.assertEqual(t.encoding, "utf-8")
Inada Naoki58cffba2021-04-01 11:25:04 +09002756 with warnings.catch_warnings():
2757 warnings.simplefilter("ignore", EncodingWarning)
2758 t = self.TextIOWrapper(b)
Serhiy Storchaka0dcd80a2015-08-02 15:17:49 +03002759 self.assertIsNotNone(t.encoding)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002760 codecs.lookup(t.encoding)
2761
2762 def test_encoding_errors_reading(self):
Guido van Rossume7fc50f2007-12-03 22:54:21 +00002763 # (1) default
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002764 b = self.BytesIO(b"abc\n\xff\n")
2765 t = self.TextIOWrapper(b, encoding="ascii")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00002766 self.assertRaises(UnicodeError, t.read)
2767 # (2) explicit strict
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002768 b = self.BytesIO(b"abc\n\xff\n")
2769 t = self.TextIOWrapper(b, encoding="ascii", errors="strict")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00002770 self.assertRaises(UnicodeError, t.read)
2771 # (3) ignore
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002772 b = self.BytesIO(b"abc\n\xff\n")
2773 t = self.TextIOWrapper(b, encoding="ascii", errors="ignore")
Ezio Melottib3aedd42010-11-20 19:04:17 +00002774 self.assertEqual(t.read(), "abc\n\n")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00002775 # (4) replace
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002776 b = self.BytesIO(b"abc\n\xff\n")
2777 t = self.TextIOWrapper(b, encoding="ascii", errors="replace")
Ezio Melottib3aedd42010-11-20 19:04:17 +00002778 self.assertEqual(t.read(), "abc\n\ufffd\n")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00002779
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002780 def test_encoding_errors_writing(self):
Guido van Rossume7fc50f2007-12-03 22:54:21 +00002781 # (1) default
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002782 b = self.BytesIO()
2783 t = self.TextIOWrapper(b, encoding="ascii")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00002784 self.assertRaises(UnicodeError, t.write, "\xff")
2785 # (2) explicit strict
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002786 b = self.BytesIO()
2787 t = self.TextIOWrapper(b, encoding="ascii", errors="strict")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00002788 self.assertRaises(UnicodeError, t.write, "\xff")
2789 # (3) ignore
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002790 b = self.BytesIO()
2791 t = self.TextIOWrapper(b, encoding="ascii", errors="ignore",
Guido van Rossumf64db9f2007-12-06 01:04:26 +00002792 newline="\n")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00002793 t.write("abc\xffdef\n")
2794 t.flush()
Ezio Melottib3aedd42010-11-20 19:04:17 +00002795 self.assertEqual(b.getvalue(), b"abcdef\n")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00002796 # (4) replace
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002797 b = self.BytesIO()
2798 t = self.TextIOWrapper(b, encoding="ascii", errors="replace",
Guido van Rossumf64db9f2007-12-06 01:04:26 +00002799 newline="\n")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00002800 t.write("abc\xffdef\n")
2801 t.flush()
Ezio Melottib3aedd42010-11-20 19:04:17 +00002802 self.assertEqual(b.getvalue(), b"abc?def\n")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00002803
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002804 def test_newlines(self):
Guido van Rossum78892e42007-04-06 17:31:18 +00002805 input_lines = [ "unix\n", "windows\r\n", "os9\r", "last\n", "nonl" ]
2806
2807 tests = [
2808 [ None, [ 'unix\n', 'windows\n', 'os9\n', 'last\n', 'nonl' ] ],
Guido van Rossum8358db22007-08-18 21:39:55 +00002809 [ '', input_lines ],
2810 [ '\n', [ "unix\n", "windows\r\n", "os9\rlast\n", "nonl" ] ],
2811 [ '\r\n', [ "unix\nwindows\r\n", "os9\rlast\nnonl" ] ],
2812 [ '\r', [ "unix\nwindows\r", "\nos9\r", "last\nnonl" ] ],
Guido van Rossum78892e42007-04-06 17:31:18 +00002813 ]
Antoine Pitrou180a3362008-12-14 16:36:46 +00002814 encodings = (
2815 'utf-8', 'latin-1',
2816 'utf-16', 'utf-16-le', 'utf-16-be',
2817 'utf-32', 'utf-32-le', 'utf-32-be',
2818 )
Guido van Rossum78892e42007-04-06 17:31:18 +00002819
Guido van Rossum8358db22007-08-18 21:39:55 +00002820 # Try a range of buffer sizes to test the case where \r is the last
Guido van Rossum78892e42007-04-06 17:31:18 +00002821 # character in TextIOWrapper._pending_line.
2822 for encoding in encodings:
Guido van Rossum8358db22007-08-18 21:39:55 +00002823 # XXX: str.encode() should return bytes
2824 data = bytes(''.join(input_lines).encode(encoding))
Guido van Rossum78892e42007-04-06 17:31:18 +00002825 for do_reads in (False, True):
Guido van Rossum8358db22007-08-18 21:39:55 +00002826 for bufsize in range(1, 10):
2827 for newline, exp_lines in tests:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002828 bufio = self.BufferedReader(self.BytesIO(data), bufsize)
2829 textio = self.TextIOWrapper(bufio, newline=newline,
Guido van Rossum78892e42007-04-06 17:31:18 +00002830 encoding=encoding)
2831 if do_reads:
2832 got_lines = []
2833 while True:
Guido van Rossum76c5d4d2007-04-06 19:10:29 +00002834 c2 = textio.read(2)
Guido van Rossum78892e42007-04-06 17:31:18 +00002835 if c2 == '':
2836 break
Ezio Melottib3aedd42010-11-20 19:04:17 +00002837 self.assertEqual(len(c2), 2)
Guido van Rossum76c5d4d2007-04-06 19:10:29 +00002838 got_lines.append(c2 + textio.readline())
Guido van Rossum78892e42007-04-06 17:31:18 +00002839 else:
Guido van Rossum76c5d4d2007-04-06 19:10:29 +00002840 got_lines = list(textio)
Guido van Rossum78892e42007-04-06 17:31:18 +00002841
2842 for got_line, exp_line in zip(got_lines, exp_lines):
Ezio Melottib3aedd42010-11-20 19:04:17 +00002843 self.assertEqual(got_line, exp_line)
2844 self.assertEqual(len(got_lines), len(exp_lines))
Guido van Rossum78892e42007-04-06 17:31:18 +00002845
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002846 def test_newlines_input(self):
2847 testdata = b"AAA\nBB\x00B\nCCC\rDDD\rEEE\r\nFFF\r\nGGG"
Guido van Rossum8358db22007-08-18 21:39:55 +00002848 normalized = testdata.replace(b"\r\n", b"\n").replace(b"\r", b"\n")
2849 for newline, expected in [
Ezio Melottid8b509b2011-09-28 17:37:55 +03002850 (None, normalized.decode("ascii").splitlines(keepends=True)),
2851 ("", testdata.decode("ascii").splitlines(keepends=True)),
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002852 ("\n", ["AAA\n", "BB\x00B\n", "CCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]),
2853 ("\r\n", ["AAA\nBB\x00B\nCCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]),
2854 ("\r", ["AAA\nBB\x00B\nCCC\r", "DDD\r", "EEE\r", "\nFFF\r", "\nGGG"]),
Guido van Rossum8358db22007-08-18 21:39:55 +00002855 ]:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002856 buf = self.BytesIO(testdata)
2857 txt = self.TextIOWrapper(buf, encoding="ascii", newline=newline)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002858 self.assertEqual(txt.readlines(), expected)
Guido van Rossum8358db22007-08-18 21:39:55 +00002859 txt.seek(0)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002860 self.assertEqual(txt.read(), "".join(expected))
Guido van Rossum8358db22007-08-18 21:39:55 +00002861
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002862 def test_newlines_output(self):
2863 testdict = {
2864 "": b"AAA\nBBB\nCCC\nX\rY\r\nZ",
2865 "\n": b"AAA\nBBB\nCCC\nX\rY\r\nZ",
2866 "\r": b"AAA\rBBB\rCCC\rX\rY\r\rZ",
2867 "\r\n": b"AAA\r\nBBB\r\nCCC\r\nX\rY\r\r\nZ",
2868 }
2869 tests = [(None, testdict[os.linesep])] + sorted(testdict.items())
2870 for newline, expected in tests:
2871 buf = self.BytesIO()
2872 txt = self.TextIOWrapper(buf, encoding="ascii", newline=newline)
2873 txt.write("AAA\nB")
2874 txt.write("BB\nCCC\n")
2875 txt.write("X\rY\r\nZ")
2876 txt.flush()
Ezio Melottib3aedd42010-11-20 19:04:17 +00002877 self.assertEqual(buf.closed, False)
2878 self.assertEqual(buf.getvalue(), expected)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002879
2880 def test_destructor(self):
2881 l = []
2882 base = self.BytesIO
2883 class MyBytesIO(base):
2884 def close(self):
2885 l.append(self.getvalue())
2886 base.close(self)
2887 b = MyBytesIO()
2888 t = self.TextIOWrapper(b, encoding="ascii")
2889 t.write("abc")
2890 del t
Benjamin Peterson24fb1d02009-04-24 23:26:21 +00002891 support.gc_collect()
Ezio Melottib3aedd42010-11-20 19:04:17 +00002892 self.assertEqual([b"abc"], l)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002893
2894 def test_override_destructor(self):
2895 record = []
2896 class MyTextIO(self.TextIOWrapper):
2897 def __del__(self):
2898 record.append(1)
2899 try:
2900 f = super().__del__
2901 except AttributeError:
2902 pass
2903 else:
2904 f()
2905 def close(self):
2906 record.append(2)
2907 super().close()
2908 def flush(self):
2909 record.append(3)
2910 super().flush()
2911 b = self.BytesIO()
2912 t = MyTextIO(b, encoding="ascii")
2913 del t
Benjamin Peterson24fb1d02009-04-24 23:26:21 +00002914 support.gc_collect()
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002915 self.assertEqual(record, [1, 2, 3])
2916
2917 def test_error_through_destructor(self):
2918 # Test that the exception state is not modified by a destructor,
2919 # even if close() fails.
2920 rawio = self.CloseFailureIO()
Victor Stinner6dbbe742019-05-25 00:09:38 +02002921 with support.catch_unraisable_exception() as cm:
2922 with self.assertRaises(AttributeError):
Inada Naoki58cffba2021-04-01 11:25:04 +09002923 self.TextIOWrapper(rawio, encoding="utf-8").xyzzy
Victor Stinnerbc2aa812019-05-23 03:45:09 +02002924
2925 if not IOBASE_EMITS_UNRAISABLE:
2926 self.assertIsNone(cm.unraisable)
2927 elif cm.unraisable is not None:
2928 self.assertEqual(cm.unraisable.exc_type, OSError)
Guido van Rossum8358db22007-08-18 21:39:55 +00002929
Guido van Rossum9b76da62007-04-11 01:09:03 +00002930 # Systematic tests of the text I/O API
2931
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002932 def test_basic_io(self):
Guido van Rossum9b76da62007-04-11 01:09:03 +00002933 for chunksize in (1, 2, 3, 4, 5, 15, 16, 17, 31, 32, 33, 63, 64, 65):
Marc-André Lemburg8f36af72011-02-25 15:42:01 +00002934 for enc in "ascii", "latin-1", "utf-8" :# , "utf-16-be", "utf-16-le":
Hai Shi883bc632020-07-06 17:12:49 +08002935 f = self.open(os_helper.TESTFN, "w+", encoding=enc)
Guido van Rossum9b76da62007-04-11 01:09:03 +00002936 f._CHUNK_SIZE = chunksize
Ezio Melottib3aedd42010-11-20 19:04:17 +00002937 self.assertEqual(f.write("abc"), 3)
Guido van Rossum9b76da62007-04-11 01:09:03 +00002938 f.close()
Hai Shi883bc632020-07-06 17:12:49 +08002939 f = self.open(os_helper.TESTFN, "r+", encoding=enc)
Guido van Rossum9b76da62007-04-11 01:09:03 +00002940 f._CHUNK_SIZE = chunksize
Ezio Melottib3aedd42010-11-20 19:04:17 +00002941 self.assertEqual(f.tell(), 0)
2942 self.assertEqual(f.read(), "abc")
Guido van Rossum9b76da62007-04-11 01:09:03 +00002943 cookie = f.tell()
Ezio Melottib3aedd42010-11-20 19:04:17 +00002944 self.assertEqual(f.seek(0), 0)
2945 self.assertEqual(f.read(None), "abc")
Benjamin Petersonbf5ff762009-12-13 19:25:34 +00002946 f.seek(0)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002947 self.assertEqual(f.read(2), "ab")
2948 self.assertEqual(f.read(1), "c")
2949 self.assertEqual(f.read(1), "")
2950 self.assertEqual(f.read(), "")
2951 self.assertEqual(f.tell(), cookie)
2952 self.assertEqual(f.seek(0), 0)
2953 self.assertEqual(f.seek(0, 2), cookie)
2954 self.assertEqual(f.write("def"), 3)
2955 self.assertEqual(f.seek(cookie), cookie)
2956 self.assertEqual(f.read(), "def")
Guido van Rossum9b76da62007-04-11 01:09:03 +00002957 if enc.startswith("utf"):
2958 self.multi_line_test(f, enc)
2959 f.close()
2960
2961 def multi_line_test(self, f, enc):
2962 f.seek(0)
2963 f.truncate()
Guido van Rossumef87d6e2007-05-02 19:09:54 +00002964 sample = "s\xff\u0fff\uffff"
Guido van Rossum9b76da62007-04-11 01:09:03 +00002965 wlines = []
Guido van Rossumcba608c2007-04-11 14:19:59 +00002966 for size in (0, 1, 2, 3, 4, 5, 30, 31, 32, 33, 62, 63, 64, 65, 1000):
Guido van Rossum9b76da62007-04-11 01:09:03 +00002967 chars = []
Guido van Rossum805365e2007-05-07 22:24:25 +00002968 for i in range(size):
Guido van Rossum9b76da62007-04-11 01:09:03 +00002969 chars.append(sample[i % len(sample)])
Guido van Rossumef87d6e2007-05-02 19:09:54 +00002970 line = "".join(chars) + "\n"
Guido van Rossum9b76da62007-04-11 01:09:03 +00002971 wlines.append((f.tell(), line))
2972 f.write(line)
Guido van Rossum9b76da62007-04-11 01:09:03 +00002973 f.seek(0)
2974 rlines = []
2975 while True:
2976 pos = f.tell()
2977 line = f.readline()
2978 if not line:
Guido van Rossum9b76da62007-04-11 01:09:03 +00002979 break
2980 rlines.append((pos, line))
Ezio Melottib3aedd42010-11-20 19:04:17 +00002981 self.assertEqual(rlines, wlines)
Guido van Rossum9b76da62007-04-11 01:09:03 +00002982
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002983 def test_telling(self):
Hai Shi883bc632020-07-06 17:12:49 +08002984 f = self.open(os_helper.TESTFN, "w+", encoding="utf-8")
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +00002985 p0 = f.tell()
Guido van Rossumef87d6e2007-05-02 19:09:54 +00002986 f.write("\xff\n")
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +00002987 p1 = f.tell()
Guido van Rossumef87d6e2007-05-02 19:09:54 +00002988 f.write("\xff\n")
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +00002989 p2 = f.tell()
2990 f.seek(0)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002991 self.assertEqual(f.tell(), p0)
2992 self.assertEqual(f.readline(), "\xff\n")
2993 self.assertEqual(f.tell(), p1)
2994 self.assertEqual(f.readline(), "\xff\n")
2995 self.assertEqual(f.tell(), p2)
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +00002996 f.seek(0)
2997 for line in f:
Ezio Melottib3aedd42010-11-20 19:04:17 +00002998 self.assertEqual(line, "\xff\n")
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02002999 self.assertRaises(OSError, f.tell)
Ezio Melottib3aedd42010-11-20 19:04:17 +00003000 self.assertEqual(f.tell(), p2)
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +00003001 f.close()
3002
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003003 def test_seeking(self):
3004 chunk_size = _default_chunk_size()
Guido van Rossumd76e7792007-04-17 02:38:04 +00003005 prefix_size = chunk_size - 2
Guido van Rossumef87d6e2007-05-02 19:09:54 +00003006 u_prefix = "a" * prefix_size
Guido van Rossumd76e7792007-04-17 02:38:04 +00003007 prefix = bytes(u_prefix.encode("utf-8"))
Ezio Melottib3aedd42010-11-20 19:04:17 +00003008 self.assertEqual(len(u_prefix), len(prefix))
Guido van Rossumef87d6e2007-05-02 19:09:54 +00003009 u_suffix = "\u8888\n"
Guido van Rossumd76e7792007-04-17 02:38:04 +00003010 suffix = bytes(u_suffix.encode("utf-8"))
3011 line = prefix + suffix
Hai Shi883bc632020-07-06 17:12:49 +08003012 with self.open(os_helper.TESTFN, "wb") as f:
Benjamin Petersonebe5d8a2010-10-31 01:30:11 +00003013 f.write(line*2)
Hai Shi883bc632020-07-06 17:12:49 +08003014 with self.open(os_helper.TESTFN, "r", encoding="utf-8") as f:
Benjamin Petersonebe5d8a2010-10-31 01:30:11 +00003015 s = f.read(prefix_size)
Ezio Melottib3aedd42010-11-20 19:04:17 +00003016 self.assertEqual(s, str(prefix, "ascii"))
3017 self.assertEqual(f.tell(), prefix_size)
3018 self.assertEqual(f.readline(), u_suffix)
Guido van Rossumd76e7792007-04-17 02:38:04 +00003019
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003020 def test_seeking_too(self):
Guido van Rossumd76e7792007-04-17 02:38:04 +00003021 # Regression test for a specific bug
3022 data = b'\xe0\xbf\xbf\n'
Hai Shi883bc632020-07-06 17:12:49 +08003023 with self.open(os_helper.TESTFN, "wb") as f:
Benjamin Petersonebe5d8a2010-10-31 01:30:11 +00003024 f.write(data)
Hai Shi883bc632020-07-06 17:12:49 +08003025 with self.open(os_helper.TESTFN, "r", encoding="utf-8") as f:
Benjamin Petersonebe5d8a2010-10-31 01:30:11 +00003026 f._CHUNK_SIZE # Just test that it exists
3027 f._CHUNK_SIZE = 2
3028 f.readline()
3029 f.tell()
Guido van Rossumd76e7792007-04-17 02:38:04 +00003030
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003031 def test_seek_and_tell(self):
3032 #Test seek/tell using the StatefulIncrementalDecoder.
3033 # Make test faster by doing smaller seeks
3034 CHUNK_SIZE = 128
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00003035
Benjamin Peterson5fd871d2009-03-05 00:49:53 +00003036 def test_seek_and_tell_with_data(data, min_pos=0):
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00003037 """Tell/seek to various points within a data stream and ensure
3038 that the decoded data returned by read() is consistent."""
Hai Shi883bc632020-07-06 17:12:49 +08003039 f = self.open(os_helper.TESTFN, 'wb')
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00003040 f.write(data)
3041 f.close()
Hai Shi883bc632020-07-06 17:12:49 +08003042 f = self.open(os_helper.TESTFN, encoding='test_decoder')
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003043 f._CHUNK_SIZE = CHUNK_SIZE
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00003044 decoded = f.read()
3045 f.close()
3046
Neal Norwitze2b07052008-03-18 19:52:05 +00003047 for i in range(min_pos, len(decoded) + 1): # seek positions
3048 for j in [1, 5, len(decoded) - i]: # read lengths
Hai Shi883bc632020-07-06 17:12:49 +08003049 f = self.open(os_helper.TESTFN, encoding='test_decoder')
Ezio Melottib3aedd42010-11-20 19:04:17 +00003050 self.assertEqual(f.read(i), decoded[:i])
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00003051 cookie = f.tell()
Ezio Melottib3aedd42010-11-20 19:04:17 +00003052 self.assertEqual(f.read(j), decoded[i:i + j])
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00003053 f.seek(cookie)
Ezio Melottib3aedd42010-11-20 19:04:17 +00003054 self.assertEqual(f.read(), decoded[i:])
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00003055 f.close()
3056
Benjamin Petersonad9d48d2008-04-02 21:49:44 +00003057 # Enable the test decoder.
3058 StatefulIncrementalDecoder.codecEnabled = 1
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00003059
3060 # Run the tests.
3061 try:
3062 # Try each test case.
3063 for input, _, _ in StatefulIncrementalDecoderTest.test_cases:
Benjamin Peterson5fd871d2009-03-05 00:49:53 +00003064 test_seek_and_tell_with_data(input)
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00003065
3066 # Position each test case so that it crosses a chunk boundary.
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00003067 for input, _, _ in StatefulIncrementalDecoderTest.test_cases:
3068 offset = CHUNK_SIZE - len(input)//2
3069 prefix = b'.'*offset
3070 # Don't bother seeking into the prefix (takes too long).
3071 min_pos = offset*2
Benjamin Peterson5fd871d2009-03-05 00:49:53 +00003072 test_seek_and_tell_with_data(prefix + input, min_pos)
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00003073
3074 # Ensure our test decoder won't interfere with subsequent tests.
3075 finally:
Benjamin Petersonad9d48d2008-04-02 21:49:44 +00003076 StatefulIncrementalDecoder.codecEnabled = 0
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00003077
Christopher Thorneac22f6a2018-11-01 10:48:49 +00003078 def test_multibyte_seek_and_tell(self):
Hai Shi883bc632020-07-06 17:12:49 +08003079 f = self.open(os_helper.TESTFN, "w", encoding="euc_jp")
Christopher Thorneac22f6a2018-11-01 10:48:49 +00003080 f.write("AB\n\u3046\u3048\n")
3081 f.close()
3082
Hai Shi883bc632020-07-06 17:12:49 +08003083 f = self.open(os_helper.TESTFN, "r", encoding="euc_jp")
Christopher Thorneac22f6a2018-11-01 10:48:49 +00003084 self.assertEqual(f.readline(), "AB\n")
3085 p0 = f.tell()
3086 self.assertEqual(f.readline(), "\u3046\u3048\n")
3087 p1 = f.tell()
3088 f.seek(p0)
3089 self.assertEqual(f.readline(), "\u3046\u3048\n")
3090 self.assertEqual(f.tell(), p1)
3091 f.close()
3092
3093 def test_seek_with_encoder_state(self):
Hai Shi883bc632020-07-06 17:12:49 +08003094 f = self.open(os_helper.TESTFN, "w", encoding="euc_jis_2004")
Christopher Thorneac22f6a2018-11-01 10:48:49 +00003095 f.write("\u00e6\u0300")
3096 p0 = f.tell()
3097 f.write("\u00e6")
3098 f.seek(p0)
3099 f.write("\u0300")
3100 f.close()
3101
Hai Shi883bc632020-07-06 17:12:49 +08003102 f = self.open(os_helper.TESTFN, "r", encoding="euc_jis_2004")
Christopher Thorneac22f6a2018-11-01 10:48:49 +00003103 self.assertEqual(f.readline(), "\u00e6\u0300\u0300")
3104 f.close()
3105
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003106 def test_encoded_writes(self):
Alexandre Vassalottia38f73b2008-01-07 18:30:48 +00003107 data = "1234567890"
3108 tests = ("utf-16",
3109 "utf-16-le",
3110 "utf-16-be",
3111 "utf-32",
3112 "utf-32-le",
3113 "utf-32-be")
3114 for encoding in tests:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003115 buf = self.BytesIO()
3116 f = self.TextIOWrapper(buf, encoding=encoding)
Alexandre Vassalottia38f73b2008-01-07 18:30:48 +00003117 # Check if the BOM is written only once (see issue1753).
3118 f.write(data)
3119 f.write(data)
3120 f.seek(0)
Ezio Melottib3aedd42010-11-20 19:04:17 +00003121 self.assertEqual(f.read(), data * 2)
Benjamin Peterson9363a652009-03-05 00:42:09 +00003122 f.seek(0)
Ezio Melottib3aedd42010-11-20 19:04:17 +00003123 self.assertEqual(f.read(), data * 2)
3124 self.assertEqual(buf.getvalue(), (data * 2).encode(encoding))
Alexandre Vassalottia38f73b2008-01-07 18:30:48 +00003125
Benjamin Petersona1b49012009-03-31 23:11:32 +00003126 def test_unreadable(self):
3127 class UnReadable(self.BytesIO):
3128 def readable(self):
3129 return False
Inada Naoki58cffba2021-04-01 11:25:04 +09003130 txt = self.TextIOWrapper(UnReadable(), encoding="utf-8")
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02003131 self.assertRaises(OSError, txt.read)
Benjamin Petersona1b49012009-03-31 23:11:32 +00003132
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003133 def test_read_one_by_one(self):
Inada Naoki58cffba2021-04-01 11:25:04 +09003134 txt = self.TextIOWrapper(self.BytesIO(b"AA\r\nBB"), encoding="utf-8")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00003135 reads = ""
3136 while True:
3137 c = txt.read(1)
3138 if not c:
3139 break
3140 reads += c
Ezio Melottib3aedd42010-11-20 19:04:17 +00003141 self.assertEqual(reads, "AA\nBB")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00003142
Benjamin Petersonbf5ff762009-12-13 19:25:34 +00003143 def test_readlines(self):
Inada Naoki58cffba2021-04-01 11:25:04 +09003144 txt = self.TextIOWrapper(self.BytesIO(b"AA\nBB\nCC"), encoding="utf-8")
Benjamin Petersonbf5ff762009-12-13 19:25:34 +00003145 self.assertEqual(txt.readlines(), ["AA\n", "BB\n", "CC"])
3146 txt.seek(0)
3147 self.assertEqual(txt.readlines(None), ["AA\n", "BB\n", "CC"])
3148 txt.seek(0)
3149 self.assertEqual(txt.readlines(5), ["AA\n", "BB\n"])
3150
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00003151 # read in amounts equal to TextIOWrapper._CHUNK_SIZE which is 128.
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003152 def test_read_by_chunk(self):
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00003153 # make sure "\r\n" straddles 128 char boundary.
Inada Naoki58cffba2021-04-01 11:25:04 +09003154 txt = self.TextIOWrapper(self.BytesIO(b"A" * 127 + b"\r\nB"), encoding="utf-8")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00003155 reads = ""
3156 while True:
3157 c = txt.read(128)
3158 if not c:
3159 break
3160 reads += c
Ezio Melottib3aedd42010-11-20 19:04:17 +00003161 self.assertEqual(reads, "A"*127+"\nB")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00003162
Antoine Pitrou3ed2cb52012-10-16 23:02:27 +02003163 def test_writelines(self):
3164 l = ['ab', 'cd', 'ef']
3165 buf = self.BytesIO()
Inada Naoki58cffba2021-04-01 11:25:04 +09003166 txt = self.TextIOWrapper(buf, encoding="utf-8")
Antoine Pitrou3ed2cb52012-10-16 23:02:27 +02003167 txt.writelines(l)
3168 txt.flush()
3169 self.assertEqual(buf.getvalue(), b'abcdef')
3170
3171 def test_writelines_userlist(self):
3172 l = UserList(['ab', 'cd', 'ef'])
3173 buf = self.BytesIO()
Inada Naoki58cffba2021-04-01 11:25:04 +09003174 txt = self.TextIOWrapper(buf, encoding="utf-8")
Antoine Pitrou3ed2cb52012-10-16 23:02:27 +02003175 txt.writelines(l)
3176 txt.flush()
3177 self.assertEqual(buf.getvalue(), b'abcdef')
3178
3179 def test_writelines_error(self):
Inada Naoki58cffba2021-04-01 11:25:04 +09003180 txt = self.TextIOWrapper(self.BytesIO(), encoding="utf-8")
Antoine Pitrou3ed2cb52012-10-16 23:02:27 +02003181 self.assertRaises(TypeError, txt.writelines, [1, 2, 3])
3182 self.assertRaises(TypeError, txt.writelines, None)
3183 self.assertRaises(TypeError, txt.writelines, b'abc')
3184
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00003185 def test_issue1395_1(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003186 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00003187
3188 # read one char at a time
3189 reads = ""
3190 while True:
3191 c = txt.read(1)
3192 if not c:
3193 break
3194 reads += c
Ezio Melottib3aedd42010-11-20 19:04:17 +00003195 self.assertEqual(reads, self.normalized)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00003196
3197 def test_issue1395_2(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003198 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00003199 txt._CHUNK_SIZE = 4
3200
3201 reads = ""
3202 while True:
3203 c = txt.read(4)
3204 if not c:
3205 break
3206 reads += c
Ezio Melottib3aedd42010-11-20 19:04:17 +00003207 self.assertEqual(reads, self.normalized)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00003208
3209 def test_issue1395_3(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003210 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00003211 txt._CHUNK_SIZE = 4
3212
3213 reads = txt.read(4)
3214 reads += txt.read(4)
3215 reads += txt.readline()
3216 reads += txt.readline()
3217 reads += txt.readline()
Ezio Melottib3aedd42010-11-20 19:04:17 +00003218 self.assertEqual(reads, self.normalized)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00003219
3220 def test_issue1395_4(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003221 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00003222 txt._CHUNK_SIZE = 4
3223
3224 reads = txt.read(4)
3225 reads += txt.read()
Ezio Melottib3aedd42010-11-20 19:04:17 +00003226 self.assertEqual(reads, self.normalized)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00003227
3228 def test_issue1395_5(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003229 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00003230 txt._CHUNK_SIZE = 4
3231
3232 reads = txt.read(4)
3233 pos = txt.tell()
3234 txt.seek(0)
3235 txt.seek(pos)
Ezio Melottib3aedd42010-11-20 19:04:17 +00003236 self.assertEqual(txt.read(4), "BBB\n")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00003237
Ka-Ping Yeeddaa7062008-03-17 20:35:15 +00003238 def test_issue2282(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003239 buffer = self.BytesIO(self.testdata)
3240 txt = self.TextIOWrapper(buffer, encoding="ascii")
Ka-Ping Yeeddaa7062008-03-17 20:35:15 +00003241
3242 self.assertEqual(buffer.seekable(), txt.seekable())
3243
Antoine Pitroue4501852009-05-14 18:55:55 +00003244 def test_append_bom(self):
3245 # The BOM is not written again when appending to a non-empty file
Hai Shi883bc632020-07-06 17:12:49 +08003246 filename = os_helper.TESTFN
Antoine Pitroue4501852009-05-14 18:55:55 +00003247 for charset in ('utf-8-sig', 'utf-16', 'utf-32'):
3248 with self.open(filename, 'w', encoding=charset) as f:
3249 f.write('aaa')
3250 pos = f.tell()
3251 with self.open(filename, 'rb') as f:
Ezio Melottib3aedd42010-11-20 19:04:17 +00003252 self.assertEqual(f.read(), 'aaa'.encode(charset))
Antoine Pitroue4501852009-05-14 18:55:55 +00003253
3254 with self.open(filename, 'a', encoding=charset) as f:
3255 f.write('xxx')
3256 with self.open(filename, 'rb') as f:
Ezio Melottib3aedd42010-11-20 19:04:17 +00003257 self.assertEqual(f.read(), 'aaaxxx'.encode(charset))
Antoine Pitroue4501852009-05-14 18:55:55 +00003258
3259 def test_seek_bom(self):
3260 # Same test, but when seeking manually
Hai Shi883bc632020-07-06 17:12:49 +08003261 filename = os_helper.TESTFN
Antoine Pitroue4501852009-05-14 18:55:55 +00003262 for charset in ('utf-8-sig', 'utf-16', 'utf-32'):
3263 with self.open(filename, 'w', encoding=charset) as f:
3264 f.write('aaa')
3265 pos = f.tell()
3266 with self.open(filename, 'r+', encoding=charset) as f:
3267 f.seek(pos)
3268 f.write('zzz')
3269 f.seek(0)
3270 f.write('bbb')
3271 with self.open(filename, 'rb') as f:
Ezio Melottib3aedd42010-11-20 19:04:17 +00003272 self.assertEqual(f.read(), 'bbbzzz'.encode(charset))
Antoine Pitroue4501852009-05-14 18:55:55 +00003273
Antoine Pitrou85e3ee72015-04-13 20:01:21 +02003274 def test_seek_append_bom(self):
3275 # Same test, but first seek to the start and then to the end
Hai Shi883bc632020-07-06 17:12:49 +08003276 filename = os_helper.TESTFN
Antoine Pitrou85e3ee72015-04-13 20:01:21 +02003277 for charset in ('utf-8-sig', 'utf-16', 'utf-32'):
3278 with self.open(filename, 'w', encoding=charset) as f:
3279 f.write('aaa')
3280 with self.open(filename, 'a', encoding=charset) as f:
3281 f.seek(0)
3282 f.seek(0, self.SEEK_END)
3283 f.write('xxx')
3284 with self.open(filename, 'rb') as f:
3285 self.assertEqual(f.read(), 'aaaxxx'.encode(charset))
3286
Benjamin Peterson0926ad12009-06-06 18:02:12 +00003287 def test_errors_property(self):
Inada Naoki58cffba2021-04-01 11:25:04 +09003288 with self.open(os_helper.TESTFN, "w", encoding="utf-8") as f:
Benjamin Peterson0926ad12009-06-06 18:02:12 +00003289 self.assertEqual(f.errors, "strict")
Inada Naoki58cffba2021-04-01 11:25:04 +09003290 with self.open(os_helper.TESTFN, "w", encoding="utf-8", errors="replace") as f:
Benjamin Peterson0926ad12009-06-06 18:02:12 +00003291 self.assertEqual(f.errors, "replace")
3292
Brett Cannon31f59292011-02-21 19:29:56 +00003293 @support.no_tracing
Amaury Forgeot d'Arcccd686a2009-08-29 23:00:38 +00003294 def test_threads_write(self):
3295 # Issue6750: concurrent writes could duplicate data
3296 event = threading.Event()
Inada Naoki58cffba2021-04-01 11:25:04 +09003297 with self.open(os_helper.TESTFN, "w", encoding="utf-8", buffering=1) as f:
Amaury Forgeot d'Arcccd686a2009-08-29 23:00:38 +00003298 def run(n):
3299 text = "Thread%03d\n" % n
3300 event.wait()
3301 f.write(text)
Serhiy Storchaka263dcd22015-04-01 13:01:14 +03003302 threads = [threading.Thread(target=run, args=(x,))
Amaury Forgeot d'Arcccd686a2009-08-29 23:00:38 +00003303 for x in range(20)]
Hai Shie80697d2020-05-28 06:10:27 +08003304 with threading_helper.start_threads(threads, event.set):
Serhiy Storchaka263dcd22015-04-01 13:01:14 +03003305 time.sleep(0.02)
Inada Naoki58cffba2021-04-01 11:25:04 +09003306 with self.open(os_helper.TESTFN, encoding="utf-8") as f:
Amaury Forgeot d'Arcccd686a2009-08-29 23:00:38 +00003307 content = f.read()
3308 for n in range(20):
Ezio Melottib3aedd42010-11-20 19:04:17 +00003309 self.assertEqual(content.count("Thread%03d\n" % n), 1)
Amaury Forgeot d'Arcccd686a2009-08-29 23:00:38 +00003310
Antoine Pitrou6be88762010-05-03 16:48:20 +00003311 def test_flush_error_on_close(self):
Serhiy Storchakaa3712a92015-02-21 00:35:09 +02003312 # Test that text file is closed despite failed flush
3313 # and that flush() is called before file closed.
Antoine Pitrou6be88762010-05-03 16:48:20 +00003314 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Serhiy Storchakaa3712a92015-02-21 00:35:09 +02003315 closed = []
Antoine Pitrou6be88762010-05-03 16:48:20 +00003316 def bad_flush():
Serhiy Storchakaa3712a92015-02-21 00:35:09 +02003317 closed[:] = [txt.closed, txt.buffer.closed]
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02003318 raise OSError()
Antoine Pitrou6be88762010-05-03 16:48:20 +00003319 txt.flush = bad_flush
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02003320 self.assertRaises(OSError, txt.close) # exception not swallowed
Benjamin Peterson68623612012-12-20 11:53:11 -06003321 self.assertTrue(txt.closed)
Serhiy Storchakaa3712a92015-02-21 00:35:09 +02003322 self.assertTrue(txt.buffer.closed)
3323 self.assertTrue(closed) # flush() called
3324 self.assertFalse(closed[0]) # flush() called before file closed
3325 self.assertFalse(closed[1])
Serhiy Storchakac26a1a42015-02-23 00:28:38 +02003326 txt.flush = lambda: None # break reference loop
Antoine Pitrou6be88762010-05-03 16:48:20 +00003327
Serhiy Storchaka8a8f7f92014-06-09 09:13:04 +03003328 def test_close_error_on_close(self):
3329 buffer = self.BytesIO(self.testdata)
3330 def bad_flush():
3331 raise OSError('flush')
3332 def bad_close():
3333 raise OSError('close')
3334 buffer.close = bad_close
3335 txt = self.TextIOWrapper(buffer, encoding="ascii")
3336 txt.flush = bad_flush
3337 with self.assertRaises(OSError) as err: # exception not swallowed
3338 txt.close()
3339 self.assertEqual(err.exception.args, ('close',))
3340 self.assertIsInstance(err.exception.__context__, OSError)
3341 self.assertEqual(err.exception.__context__.args, ('flush',))
3342 self.assertFalse(txt.closed)
3343
Victor Stinner472f7942019-04-12 21:58:24 +02003344 # Silence destructor error
3345 buffer.close = lambda: None
3346 txt.flush = lambda: None
3347
Serhiy Storchaka8a8f7f92014-06-09 09:13:04 +03003348 def test_nonnormalized_close_error_on_close(self):
3349 # Issue #21677
3350 buffer = self.BytesIO(self.testdata)
3351 def bad_flush():
3352 raise non_existing_flush
3353 def bad_close():
3354 raise non_existing_close
3355 buffer.close = bad_close
3356 txt = self.TextIOWrapper(buffer, encoding="ascii")
3357 txt.flush = bad_flush
3358 with self.assertRaises(NameError) as err: # exception not swallowed
3359 txt.close()
3360 self.assertIn('non_existing_close', str(err.exception))
3361 self.assertIsInstance(err.exception.__context__, NameError)
3362 self.assertIn('non_existing_flush', str(err.exception.__context__))
3363 self.assertFalse(txt.closed)
3364
Victor Stinner472f7942019-04-12 21:58:24 +02003365 # Silence destructor error
3366 buffer.close = lambda: None
3367 txt.flush = lambda: None
3368
Antoine Pitrou6be88762010-05-03 16:48:20 +00003369 def test_multi_close(self):
3370 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
3371 txt.close()
3372 txt.close()
3373 txt.close()
3374 self.assertRaises(ValueError, txt.flush)
3375
Antoine Pitrou0d739d72010-09-05 23:01:12 +00003376 def test_unseekable(self):
Inada Naoki58cffba2021-04-01 11:25:04 +09003377 txt = self.TextIOWrapper(self.MockUnseekableIO(self.testdata), encoding="utf-8")
Antoine Pitrou0d739d72010-09-05 23:01:12 +00003378 self.assertRaises(self.UnsupportedOperation, txt.tell)
3379 self.assertRaises(self.UnsupportedOperation, txt.seek, 0)
3380
Antoine Pitrou7f8f4182010-12-21 21:20:59 +00003381 def test_readonly_attributes(self):
3382 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
3383 buf = self.BytesIO(self.testdata)
3384 with self.assertRaises(AttributeError):
3385 txt.buffer = buf
3386
Antoine Pitroue96ec682011-07-23 21:46:35 +02003387 def test_rawio(self):
3388 # Issue #12591: TextIOWrapper must work with raw I/O objects, so
3389 # that subprocess.Popen() can have the required unbuffered
3390 # semantics with universal_newlines=True.
3391 raw = self.MockRawIO([b'abc', b'def', b'ghi\njkl\nopq\n'])
3392 txt = self.TextIOWrapper(raw, encoding='ascii', newline='\n')
3393 # Reads
3394 self.assertEqual(txt.read(4), 'abcd')
3395 self.assertEqual(txt.readline(), 'efghi\n')
3396 self.assertEqual(list(txt), ['jkl\n', 'opq\n'])
3397
3398 def test_rawio_write_through(self):
3399 # Issue #12591: with write_through=True, writes don't need a flush
3400 raw = self.MockRawIO([b'abc', b'def', b'ghi\njkl\nopq\n'])
3401 txt = self.TextIOWrapper(raw, encoding='ascii', newline='\n',
3402 write_through=True)
3403 txt.write('1')
3404 txt.write('23\n4')
3405 txt.write('5')
3406 self.assertEqual(b''.join(raw._write_stack), b'123\n45')
3407
Antoine Pitrouc644e7c2014-05-09 00:24:50 +02003408 def test_bufio_write_through(self):
3409 # Issue #21396: write_through=True doesn't force a flush()
3410 # on the underlying binary buffered object.
3411 flush_called, write_called = [], []
3412 class BufferedWriter(self.BufferedWriter):
3413 def flush(self, *args, **kwargs):
3414 flush_called.append(True)
3415 return super().flush(*args, **kwargs)
3416 def write(self, *args, **kwargs):
3417 write_called.append(True)
3418 return super().write(*args, **kwargs)
3419
3420 rawio = self.BytesIO()
3421 data = b"a"
3422 bufio = BufferedWriter(rawio, len(data)*2)
3423 textio = self.TextIOWrapper(bufio, encoding='ascii',
3424 write_through=True)
3425 # write to the buffered io but don't overflow the buffer
3426 text = data.decode('ascii')
3427 textio.write(text)
3428
3429 # buffer.flush is not called with write_through=True
3430 self.assertFalse(flush_called)
3431 # buffer.write *is* called with write_through=True
3432 self.assertTrue(write_called)
3433 self.assertEqual(rawio.getvalue(), b"") # no flush
3434
3435 write_called = [] # reset
3436 textio.write(text * 10) # total content is larger than bufio buffer
3437 self.assertTrue(write_called)
3438 self.assertEqual(rawio.getvalue(), data * 11) # all flushed
3439
Antoine Pitrou3c2817b2017-06-03 12:32:28 +02003440 def test_reconfigure_write_through(self):
3441 raw = self.MockRawIO([])
3442 t = self.TextIOWrapper(raw, encoding='ascii', newline='\n')
3443 t.write('1')
3444 t.reconfigure(write_through=True) # implied flush
3445 self.assertEqual(t.write_through, True)
3446 self.assertEqual(b''.join(raw._write_stack), b'1')
3447 t.write('23')
3448 self.assertEqual(b''.join(raw._write_stack), b'123')
3449 t.reconfigure(write_through=False)
3450 self.assertEqual(t.write_through, False)
3451 t.write('45')
3452 t.flush()
3453 self.assertEqual(b''.join(raw._write_stack), b'12345')
3454 # Keeping default value
3455 t.reconfigure()
3456 t.reconfigure(write_through=None)
3457 self.assertEqual(t.write_through, False)
3458 t.reconfigure(write_through=True)
3459 t.reconfigure()
3460 t.reconfigure(write_through=None)
3461 self.assertEqual(t.write_through, True)
3462
Serhiy Storchaka94dc6732013-02-03 17:03:31 +02003463 def test_read_nonbytes(self):
3464 # Issue #17106
3465 # Crash when underlying read() returns non-bytes
Inada Naoki58cffba2021-04-01 11:25:04 +09003466 t = self.TextIOWrapper(self.StringIO('a'), encoding="utf-8")
Serhiy Storchaka94dc6732013-02-03 17:03:31 +02003467 self.assertRaises(TypeError, t.read, 1)
Inada Naoki58cffba2021-04-01 11:25:04 +09003468 t = self.TextIOWrapper(self.StringIO('a'), encoding="utf-8")
Serhiy Storchaka94dc6732013-02-03 17:03:31 +02003469 self.assertRaises(TypeError, t.readline)
Inada Naoki58cffba2021-04-01 11:25:04 +09003470 t = self.TextIOWrapper(self.StringIO('a'), encoding="utf-8")
Serhiy Storchaka94dc6732013-02-03 17:03:31 +02003471 self.assertRaises(TypeError, t.read)
3472
Oren Milmana5b4ea12017-08-25 21:14:54 +03003473 def test_illegal_encoder(self):
3474 # Issue 31271: Calling write() while the return value of encoder's
3475 # encode() is invalid shouldn't cause an assertion failure.
3476 rot13 = codecs.lookup("rot13")
3477 with support.swap_attr(rot13, '_is_text_encoding', True):
3478 t = io.TextIOWrapper(io.BytesIO(b'foo'), encoding="rot13")
3479 self.assertRaises(TypeError, t.write, 'bar')
3480
Serhiy Storchaka94dc6732013-02-03 17:03:31 +02003481 def test_illegal_decoder(self):
3482 # Issue #17106
Nick Coghlana9b15242014-02-04 22:11:18 +10003483 # Bypass the early encoding check added in issue 20404
3484 def _make_illegal_wrapper():
3485 quopri = codecs.lookup("quopri")
3486 quopri._is_text_encoding = True
3487 try:
3488 t = self.TextIOWrapper(self.BytesIO(b'aaaaaa'),
3489 newline='\n', encoding="quopri")
3490 finally:
3491 quopri._is_text_encoding = False
3492 return t
Serhiy Storchaka94dc6732013-02-03 17:03:31 +02003493 # Crash when decoder returns non-string
Nick Coghlana9b15242014-02-04 22:11:18 +10003494 t = _make_illegal_wrapper()
Serhiy Storchaka94dc6732013-02-03 17:03:31 +02003495 self.assertRaises(TypeError, t.read, 1)
Nick Coghlana9b15242014-02-04 22:11:18 +10003496 t = _make_illegal_wrapper()
Serhiy Storchaka94dc6732013-02-03 17:03:31 +02003497 self.assertRaises(TypeError, t.readline)
Nick Coghlana9b15242014-02-04 22:11:18 +10003498 t = _make_illegal_wrapper()
Serhiy Storchaka94dc6732013-02-03 17:03:31 +02003499 self.assertRaises(TypeError, t.read)
3500
Oren Milmanba7d7362017-08-29 11:58:27 +03003501 # Issue 31243: calling read() while the return value of decoder's
3502 # getstate() is invalid should neither crash the interpreter nor
3503 # raise a SystemError.
3504 def _make_very_illegal_wrapper(getstate_ret_val):
3505 class BadDecoder:
3506 def getstate(self):
3507 return getstate_ret_val
3508 def _get_bad_decoder(dummy):
3509 return BadDecoder()
3510 quopri = codecs.lookup("quopri")
3511 with support.swap_attr(quopri, 'incrementaldecoder',
3512 _get_bad_decoder):
3513 return _make_illegal_wrapper()
3514 t = _make_very_illegal_wrapper(42)
3515 self.assertRaises(TypeError, t.read, 42)
3516 t = _make_very_illegal_wrapper(())
3517 self.assertRaises(TypeError, t.read, 42)
3518 t = _make_very_illegal_wrapper((1, 2))
3519 self.assertRaises(TypeError, t.read, 42)
3520
Antoine Pitrou712cb732013-12-21 15:51:54 +01003521 def _check_create_at_shutdown(self, **kwargs):
3522 # Issue #20037: creating a TextIOWrapper at shutdown
3523 # shouldn't crash the interpreter.
3524 iomod = self.io.__name__
3525 code = """if 1:
3526 import codecs
3527 import {iomod} as io
3528
3529 # Avoid looking up codecs at shutdown
3530 codecs.lookup('utf-8')
3531
3532 class C:
3533 def __init__(self):
3534 self.buf = io.BytesIO()
3535 def __del__(self):
3536 io.TextIOWrapper(self.buf, **{kwargs})
3537 print("ok")
3538 c = C()
3539 """.format(iomod=iomod, kwargs=kwargs)
3540 return assert_python_ok("-c", code)
3541
3542 def test_create_at_shutdown_without_encoding(self):
3543 rc, out, err = self._check_create_at_shutdown()
3544 if err:
3545 # Can error out with a RuntimeError if the module state
3546 # isn't found.
Nick Coghlana9b15242014-02-04 22:11:18 +10003547 self.assertIn(self.shutdown_error, err.decode())
Antoine Pitrou712cb732013-12-21 15:51:54 +01003548 else:
3549 self.assertEqual("ok", out.decode().strip())
3550
3551 def test_create_at_shutdown_with_encoding(self):
3552 rc, out, err = self._check_create_at_shutdown(encoding='utf-8',
3553 errors='strict')
3554 self.assertFalse(err)
3555 self.assertEqual("ok", out.decode().strip())
3556
Antoine Pitroub8503892014-04-29 10:14:02 +02003557 def test_read_byteslike(self):
3558 r = MemviewBytesIO(b'Just some random string\n')
3559 t = self.TextIOWrapper(r, 'utf-8')
3560
3561 # TextIOwrapper will not read the full string, because
3562 # we truncate it to a multiple of the native int size
3563 # so that we can construct a more complex memoryview.
3564 bytes_val = _to_memoryview(r.getvalue()).tobytes()
3565
3566 self.assertEqual(t.read(200), bytes_val.decode('utf-8'))
3567
Benjamin Peterson6c14f232014-11-12 10:19:46 -05003568 def test_issue22849(self):
3569 class F(object):
3570 def readable(self): return True
3571 def writable(self): return True
3572 def seekable(self): return True
3573
3574 for i in range(10):
3575 try:
3576 self.TextIOWrapper(F(), encoding='utf-8')
3577 except Exception:
3578 pass
3579
3580 F.tell = lambda x: 0
3581 t = self.TextIOWrapper(F(), encoding='utf-8')
3582
INADA Naoki507434f2017-12-21 09:59:53 +09003583 def test_reconfigure_encoding_read(self):
3584 # latin1 -> utf8
3585 # (latin1 can decode utf-8 encoded string)
3586 data = 'abc\xe9\n'.encode('latin1') + 'd\xe9f\n'.encode('utf8')
3587 raw = self.BytesIO(data)
3588 txt = self.TextIOWrapper(raw, encoding='latin1', newline='\n')
3589 self.assertEqual(txt.readline(), 'abc\xe9\n')
3590 with self.assertRaises(self.UnsupportedOperation):
3591 txt.reconfigure(encoding='utf-8')
3592 with self.assertRaises(self.UnsupportedOperation):
3593 txt.reconfigure(newline=None)
3594
3595 def test_reconfigure_write_fromascii(self):
3596 # ascii has a specific encodefunc in the C implementation,
3597 # but utf-8-sig has not. Make sure that we get rid of the
3598 # cached encodefunc when we switch encoders.
3599 raw = self.BytesIO()
3600 txt = self.TextIOWrapper(raw, encoding='ascii', newline='\n')
3601 txt.write('foo\n')
3602 txt.reconfigure(encoding='utf-8-sig')
3603 txt.write('\xe9\n')
3604 txt.flush()
3605 self.assertEqual(raw.getvalue(), b'foo\n\xc3\xa9\n')
3606
3607 def test_reconfigure_write(self):
3608 # latin -> utf8
3609 raw = self.BytesIO()
3610 txt = self.TextIOWrapper(raw, encoding='latin1', newline='\n')
3611 txt.write('abc\xe9\n')
3612 txt.reconfigure(encoding='utf-8')
3613 self.assertEqual(raw.getvalue(), b'abc\xe9\n')
3614 txt.write('d\xe9f\n')
3615 txt.flush()
3616 self.assertEqual(raw.getvalue(), b'abc\xe9\nd\xc3\xa9f\n')
3617
3618 # ascii -> utf-8-sig: ensure that no BOM is written in the middle of
3619 # the file
3620 raw = self.BytesIO()
3621 txt = self.TextIOWrapper(raw, encoding='ascii', newline='\n')
3622 txt.write('abc\n')
3623 txt.reconfigure(encoding='utf-8-sig')
3624 txt.write('d\xe9f\n')
3625 txt.flush()
3626 self.assertEqual(raw.getvalue(), b'abc\nd\xc3\xa9f\n')
3627
3628 def test_reconfigure_write_non_seekable(self):
3629 raw = self.BytesIO()
3630 raw.seekable = lambda: False
3631 raw.seek = None
3632 txt = self.TextIOWrapper(raw, encoding='ascii', newline='\n')
3633 txt.write('abc\n')
3634 txt.reconfigure(encoding='utf-8-sig')
3635 txt.write('d\xe9f\n')
3636 txt.flush()
3637
3638 # If the raw stream is not seekable, there'll be a BOM
3639 self.assertEqual(raw.getvalue(), b'abc\n\xef\xbb\xbfd\xc3\xa9f\n')
3640
3641 def test_reconfigure_defaults(self):
3642 txt = self.TextIOWrapper(self.BytesIO(), 'ascii', 'replace', '\n')
3643 txt.reconfigure(encoding=None)
3644 self.assertEqual(txt.encoding, 'ascii')
3645 self.assertEqual(txt.errors, 'replace')
3646 txt.write('LF\n')
3647
3648 txt.reconfigure(newline='\r\n')
3649 self.assertEqual(txt.encoding, 'ascii')
3650 self.assertEqual(txt.errors, 'replace')
3651
3652 txt.reconfigure(errors='ignore')
3653 self.assertEqual(txt.encoding, 'ascii')
3654 self.assertEqual(txt.errors, 'ignore')
3655 txt.write('CRLF\n')
3656
3657 txt.reconfigure(encoding='utf-8', newline=None)
3658 self.assertEqual(txt.errors, 'strict')
3659 txt.seek(0)
3660 self.assertEqual(txt.read(), 'LF\nCRLF\n')
3661
3662 self.assertEqual(txt.detach().getvalue(), b'LF\nCRLF\r\n')
3663
3664 def test_reconfigure_newline(self):
3665 raw = self.BytesIO(b'CR\rEOF')
3666 txt = self.TextIOWrapper(raw, 'ascii', newline='\n')
3667 txt.reconfigure(newline=None)
3668 self.assertEqual(txt.readline(), 'CR\n')
3669 raw = self.BytesIO(b'CR\rEOF')
3670 txt = self.TextIOWrapper(raw, 'ascii', newline='\n')
3671 txt.reconfigure(newline='')
3672 self.assertEqual(txt.readline(), 'CR\r')
3673 raw = self.BytesIO(b'CR\rLF\nEOF')
3674 txt = self.TextIOWrapper(raw, 'ascii', newline='\r')
3675 txt.reconfigure(newline='\n')
3676 self.assertEqual(txt.readline(), 'CR\rLF\n')
3677 raw = self.BytesIO(b'LF\nCR\rEOF')
3678 txt = self.TextIOWrapper(raw, 'ascii', newline='\n')
3679 txt.reconfigure(newline='\r')
3680 self.assertEqual(txt.readline(), 'LF\nCR\r')
3681 raw = self.BytesIO(b'CR\rCRLF\r\nEOF')
3682 txt = self.TextIOWrapper(raw, 'ascii', newline='\r')
3683 txt.reconfigure(newline='\r\n')
3684 self.assertEqual(txt.readline(), 'CR\rCRLF\r\n')
3685
3686 txt = self.TextIOWrapper(self.BytesIO(), 'ascii', newline='\r')
3687 txt.reconfigure(newline=None)
3688 txt.write('linesep\n')
3689 txt.reconfigure(newline='')
3690 txt.write('LF\n')
3691 txt.reconfigure(newline='\n')
3692 txt.write('LF\n')
3693 txt.reconfigure(newline='\r')
3694 txt.write('CR\n')
3695 txt.reconfigure(newline='\r\n')
3696 txt.write('CRLF\n')
3697 expected = 'linesep' + os.linesep + 'LF\nLF\nCR\rCRLF\r\n'
3698 self.assertEqual(txt.detach().getvalue().decode('ascii'), expected)
3699
Zackery Spytz23db9352018-06-29 04:14:58 -06003700 def test_issue25862(self):
3701 # Assertion failures occurred in tell() after read() and write().
3702 t = self.TextIOWrapper(self.BytesIO(b'test'), encoding='ascii')
3703 t.read(1)
3704 t.read()
3705 t.tell()
3706 t = self.TextIOWrapper(self.BytesIO(b'test'), encoding='ascii')
3707 t.read(1)
3708 t.write('x')
3709 t.tell()
3710
Serhiy Storchaka94dc6732013-02-03 17:03:31 +02003711
Antoine Pitroub8503892014-04-29 10:14:02 +02003712class MemviewBytesIO(io.BytesIO):
3713 '''A BytesIO object whose read method returns memoryviews
3714 rather than bytes'''
3715
3716 def read1(self, len_):
3717 return _to_memoryview(super().read1(len_))
3718
3719 def read(self, len_):
3720 return _to_memoryview(super().read(len_))
3721
3722def _to_memoryview(buf):
3723 '''Convert bytes-object *buf* to a non-trivial memoryview'''
3724
3725 arr = array.array('i')
3726 idx = len(buf) - len(buf) % arr.itemsize
3727 arr.frombytes(buf[:idx])
3728 return memoryview(arr)
Benjamin Petersonc9c0f202009-06-30 23:06:06 +00003729
Benjamin Peterson6fd113c2014-11-12 10:23:44 -05003730
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003731class CTextIOWrapperTest(TextIOWrapperTest):
Antoine Pitrou712cb732013-12-21 15:51:54 +01003732 io = io
Eddie Elizondo4590f722020-02-04 02:29:25 -08003733 shutdown_error = "LookupError: unknown encoding: ascii"
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003734
3735 def test_initialization(self):
3736 r = self.BytesIO(b"\xc3\xa9\n\n")
3737 b = self.BufferedReader(r, 1000)
Inada Naoki58cffba2021-04-01 11:25:04 +09003738 t = self.TextIOWrapper(b, encoding="utf-8")
Inada Naokifb786922021-04-06 11:18:41 +09003739 self.assertRaises(ValueError, t.__init__, b, encoding="utf-8", newline='xyzzy')
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003740 self.assertRaises(ValueError, t.read)
3741
Benjamin Peterson10e76b62014-12-21 20:51:50 -06003742 t = self.TextIOWrapper.__new__(self.TextIOWrapper)
3743 self.assertRaises(Exception, repr, t)
3744
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003745 def test_garbage_collection(self):
3746 # C TextIOWrapper objects are collected, and collecting them flushes
3747 # all data to disk.
3748 # The Python version has __del__, so it ends in gc.garbage instead.
Hai Shi883bc632020-07-06 17:12:49 +08003749 with warnings_helper.check_warnings(('', ResourceWarning)):
3750 rawio = io.FileIO(os_helper.TESTFN, "wb")
Antoine Pitrou796564c2013-07-30 19:59:21 +02003751 b = self.BufferedWriter(rawio)
3752 t = self.TextIOWrapper(b, encoding="ascii")
3753 t.write("456def")
3754 t.x = t
3755 wr = weakref.ref(t)
3756 del t
3757 support.gc_collect()
Serhiy Storchaka0dcd80a2015-08-02 15:17:49 +03003758 self.assertIsNone(wr(), wr)
Hai Shi883bc632020-07-06 17:12:49 +08003759 with self.open(os_helper.TESTFN, "rb") as f:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003760 self.assertEqual(f.read(), b"456def")
3761
Charles-François Natali42c28cd2011-10-05 19:53:43 +02003762 def test_rwpair_cleared_before_textio(self):
3763 # Issue 13070: TextIOWrapper's finalization would crash when called
3764 # after the reference to the underlying BufferedRWPair's writer got
3765 # cleared by the GC.
3766 for i in range(1000):
3767 b1 = self.BufferedRWPair(self.MockRawIO(), self.MockRawIO())
3768 t1 = self.TextIOWrapper(b1, encoding="ascii")
3769 b2 = self.BufferedRWPair(self.MockRawIO(), self.MockRawIO())
3770 t2 = self.TextIOWrapper(b2, encoding="ascii")
3771 # circular references
3772 t1.buddy = t2
3773 t2.buddy = t1
3774 support.gc_collect()
3775
Zackery Spytz842acaa2018-12-17 07:52:45 -07003776 def test_del__CHUNK_SIZE_SystemError(self):
3777 t = self.TextIOWrapper(self.BytesIO(), encoding='ascii')
3778 with self.assertRaises(AttributeError):
3779 del t._CHUNK_SIZE
3780
Inada Naoki01806d52021-02-22 08:29:30 +09003781 def test_internal_buffer_size(self):
3782 # bpo-43260: TextIOWrapper's internal buffer should not store
3783 # data larger than chunk size.
3784 chunk_size = 8192 # default chunk size, updated later
3785
3786 class MockIO(self.MockRawIO):
3787 def write(self, data):
3788 if len(data) > chunk_size:
3789 raise RuntimeError
3790 return super().write(data)
3791
3792 buf = MockIO()
3793 t = self.TextIOWrapper(buf, encoding="ascii")
3794 chunk_size = t._CHUNK_SIZE
3795 t.write("abc")
3796 t.write("def")
3797 # default chunk size is 8192 bytes so t don't write data to buf.
3798 self.assertEqual([], buf._write_stack)
3799
3800 with self.assertRaises(RuntimeError):
3801 t.write("x"*(chunk_size+1))
3802
3803 self.assertEqual([b"abcdef"], buf._write_stack)
3804 t.write("ghi")
3805 t.write("x"*chunk_size)
3806 self.assertEqual([b"abcdef", b"ghi", b"x"*chunk_size], buf._write_stack)
3807
Charles-François Natali42c28cd2011-10-05 19:53:43 +02003808
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003809class PyTextIOWrapperTest(TextIOWrapperTest):
Antoine Pitrou712cb732013-12-21 15:51:54 +01003810 io = pyio
Steve Dower6d5cf8a2016-09-09 09:21:01 -07003811 shutdown_error = "LookupError: unknown encoding: ascii"
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003812
3813
3814class IncrementalNewlineDecoderTest(unittest.TestCase):
3815
3816 def check_newline_decoding_utf8(self, decoder):
Antoine Pitrou180a3362008-12-14 16:36:46 +00003817 # UTF-8 specific tests for a newline decoder
3818 def _check_decode(b, s, **kwargs):
3819 # We exercise getstate() / setstate() as well as decode()
3820 state = decoder.getstate()
Ezio Melottib3aedd42010-11-20 19:04:17 +00003821 self.assertEqual(decoder.decode(b, **kwargs), s)
Antoine Pitrou180a3362008-12-14 16:36:46 +00003822 decoder.setstate(state)
Ezio Melottib3aedd42010-11-20 19:04:17 +00003823 self.assertEqual(decoder.decode(b, **kwargs), s)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00003824
Antoine Pitrou180a3362008-12-14 16:36:46 +00003825 _check_decode(b'\xe8\xa2\x88', "\u8888")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00003826
Antoine Pitrou180a3362008-12-14 16:36:46 +00003827 _check_decode(b'\xe8', "")
3828 _check_decode(b'\xa2', "")
3829 _check_decode(b'\x88', "\u8888")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00003830
Antoine Pitrou180a3362008-12-14 16:36:46 +00003831 _check_decode(b'\xe8', "")
3832 _check_decode(b'\xa2', "")
3833 _check_decode(b'\x88', "\u8888")
3834
3835 _check_decode(b'\xe8', "")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00003836 self.assertRaises(UnicodeDecodeError, decoder.decode, b'', final=True)
3837
Antoine Pitrou180a3362008-12-14 16:36:46 +00003838 decoder.reset()
3839 _check_decode(b'\n', "\n")
3840 _check_decode(b'\r', "")
3841 _check_decode(b'', "\n", final=True)
3842 _check_decode(b'\r', "\n", final=True)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00003843
Antoine Pitrou180a3362008-12-14 16:36:46 +00003844 _check_decode(b'\r', "")
3845 _check_decode(b'a', "\na")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00003846
Antoine Pitrou180a3362008-12-14 16:36:46 +00003847 _check_decode(b'\r\r\n', "\n\n")
3848 _check_decode(b'\r', "")
3849 _check_decode(b'\r', "\n")
3850 _check_decode(b'\na', "\na")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00003851
Antoine Pitrou180a3362008-12-14 16:36:46 +00003852 _check_decode(b'\xe8\xa2\x88\r\n', "\u8888\n")
3853 _check_decode(b'\xe8\xa2\x88', "\u8888")
3854 _check_decode(b'\n', "\n")
3855 _check_decode(b'\xe8\xa2\x88\r', "\u8888")
3856 _check_decode(b'\n', "\n")
Guido van Rossum9b76da62007-04-11 01:09:03 +00003857
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003858 def check_newline_decoding(self, decoder, encoding):
Antoine Pitrou180a3362008-12-14 16:36:46 +00003859 result = []
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003860 if encoding is not None:
3861 encoder = codecs.getincrementalencoder(encoding)()
3862 def _decode_bytewise(s):
3863 # Decode one byte at a time
3864 for b in encoder.encode(s):
3865 result.append(decoder.decode(bytes([b])))
3866 else:
3867 encoder = None
3868 def _decode_bytewise(s):
3869 # Decode one char at a time
3870 for c in s:
3871 result.append(decoder.decode(c))
Ezio Melottib3aedd42010-11-20 19:04:17 +00003872 self.assertEqual(decoder.newlines, None)
Antoine Pitrou180a3362008-12-14 16:36:46 +00003873 _decode_bytewise("abc\n\r")
Ezio Melottib3aedd42010-11-20 19:04:17 +00003874 self.assertEqual(decoder.newlines, '\n')
Antoine Pitrou180a3362008-12-14 16:36:46 +00003875 _decode_bytewise("\nabc")
Ezio Melottib3aedd42010-11-20 19:04:17 +00003876 self.assertEqual(decoder.newlines, ('\n', '\r\n'))
Antoine Pitrou180a3362008-12-14 16:36:46 +00003877 _decode_bytewise("abc\r")
Ezio Melottib3aedd42010-11-20 19:04:17 +00003878 self.assertEqual(decoder.newlines, ('\n', '\r\n'))
Antoine Pitrou180a3362008-12-14 16:36:46 +00003879 _decode_bytewise("abc")
Ezio Melottib3aedd42010-11-20 19:04:17 +00003880 self.assertEqual(decoder.newlines, ('\r', '\n', '\r\n'))
Antoine Pitrou180a3362008-12-14 16:36:46 +00003881 _decode_bytewise("abc\r")
Ezio Melottib3aedd42010-11-20 19:04:17 +00003882 self.assertEqual("".join(result), "abc\n\nabcabc\nabcabc")
Antoine Pitrou180a3362008-12-14 16:36:46 +00003883 decoder.reset()
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003884 input = "abc"
3885 if encoder is not None:
3886 encoder.reset()
3887 input = encoder.encode(input)
Ezio Melottib3aedd42010-11-20 19:04:17 +00003888 self.assertEqual(decoder.decode(input), "abc")
3889 self.assertEqual(decoder.newlines, None)
Antoine Pitrou180a3362008-12-14 16:36:46 +00003890
3891 def test_newline_decoder(self):
3892 encodings = (
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003893 # None meaning the IncrementalNewlineDecoder takes unicode input
3894 # rather than bytes input
3895 None, 'utf-8', 'latin-1',
Antoine Pitrou180a3362008-12-14 16:36:46 +00003896 'utf-16', 'utf-16-le', 'utf-16-be',
3897 'utf-32', 'utf-32-le', 'utf-32-be',
3898 )
3899 for enc in encodings:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003900 decoder = enc and codecs.getincrementaldecoder(enc)()
3901 decoder = self.IncrementalNewlineDecoder(decoder, translate=True)
3902 self.check_newline_decoding(decoder, enc)
Alexandre Vassalotti472f07d2008-01-06 00:34:32 +00003903 decoder = codecs.getincrementaldecoder("utf-8")()
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003904 decoder = self.IncrementalNewlineDecoder(decoder, translate=True)
3905 self.check_newline_decoding_utf8(decoder)
Oren Milman1d1d3e92017-08-20 18:35:36 +03003906 self.assertRaises(TypeError, decoder.setstate, 42)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003907
Antoine Pitrou66913e22009-03-06 23:40:56 +00003908 def test_newline_bytes(self):
3909 # Issue 5433: Excessive optimization in IncrementalNewlineDecoder
3910 def _check(dec):
Ezio Melottib3aedd42010-11-20 19:04:17 +00003911 self.assertEqual(dec.newlines, None)
3912 self.assertEqual(dec.decode("\u0D00"), "\u0D00")
3913 self.assertEqual(dec.newlines, None)
3914 self.assertEqual(dec.decode("\u0A00"), "\u0A00")
3915 self.assertEqual(dec.newlines, None)
Antoine Pitrou66913e22009-03-06 23:40:56 +00003916 dec = self.IncrementalNewlineDecoder(None, translate=False)
3917 _check(dec)
3918 dec = self.IncrementalNewlineDecoder(None, translate=True)
3919 _check(dec)
3920
Xiang Zhangb08746b2018-10-31 19:49:16 +08003921 def test_translate(self):
3922 # issue 35062
3923 for translate in (-2, -1, 1, 2):
3924 decoder = codecs.getincrementaldecoder("utf-8")()
3925 decoder = self.IncrementalNewlineDecoder(decoder, translate)
3926 self.check_newline_decoding_utf8(decoder)
3927 decoder = codecs.getincrementaldecoder("utf-8")()
3928 decoder = self.IncrementalNewlineDecoder(decoder, translate=0)
3929 self.assertEqual(decoder.decode(b"\r\r\n"), "\r\r\n")
3930
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003931class CIncrementalNewlineDecoderTest(IncrementalNewlineDecoderTest):
3932 pass
3933
3934class PyIncrementalNewlineDecoderTest(IncrementalNewlineDecoderTest):
3935 pass
Antoine Pitrou180a3362008-12-14 16:36:46 +00003936
Alexandre Vassalotti472f07d2008-01-06 00:34:32 +00003937
Guido van Rossum01a27522007-03-07 01:00:12 +00003938# XXX Tests for open()
Guido van Rossum68bbcd22007-02-27 17:19:33 +00003939
Guido van Rossum5abbf752007-08-27 17:39:33 +00003940class MiscIOTest(unittest.TestCase):
3941
Barry Warsaw40e82462008-11-20 20:14:50 +00003942 def tearDown(self):
Hai Shi883bc632020-07-06 17:12:49 +08003943 os_helper.unlink(os_helper.TESTFN)
Barry Warsaw40e82462008-11-20 20:14:50 +00003944
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003945 def test___all__(self):
3946 for name in self.io.__all__:
3947 obj = getattr(self.io, name, None)
Serhiy Storchaka0dcd80a2015-08-02 15:17:49 +03003948 self.assertIsNotNone(obj, name)
Steve Dowerb82e17e2019-05-23 08:45:22 -07003949 if name in ("open", "open_code"):
Guido van Rossum5abbf752007-08-27 17:39:33 +00003950 continue
Benjamin Peterson6a52a9c2009-04-29 22:00:44 +00003951 elif "error" in name.lower() or name == "UnsupportedOperation":
Benjamin Petersonbfb95942009-04-02 01:13:40 +00003952 self.assertTrue(issubclass(obj, Exception), name)
3953 elif not name.startswith("SEEK_"):
3954 self.assertTrue(issubclass(obj, self.IOBase))
Benjamin Peterson65676e42008-11-05 21:42:45 +00003955
Barry Warsaw40e82462008-11-20 20:14:50 +00003956 def test_attributes(self):
Hai Shi883bc632020-07-06 17:12:49 +08003957 f = self.open(os_helper.TESTFN, "wb", buffering=0)
Ezio Melottib3aedd42010-11-20 19:04:17 +00003958 self.assertEqual(f.mode, "wb")
Barry Warsaw40e82462008-11-20 20:14:50 +00003959 f.close()
3960
Hai Shi883bc632020-07-06 17:12:49 +08003961 with warnings_helper.check_warnings(('', DeprecationWarning)):
Inada Naoki58cffba2021-04-01 11:25:04 +09003962 f = self.open(os_helper.TESTFN, "U", encoding="utf-8")
Hai Shi883bc632020-07-06 17:12:49 +08003963 self.assertEqual(f.name, os_helper.TESTFN)
3964 self.assertEqual(f.buffer.name, os_helper.TESTFN)
3965 self.assertEqual(f.buffer.raw.name, os_helper.TESTFN)
Victor Stinner942f7a22020-03-04 18:50:22 +01003966 self.assertEqual(f.mode, "U")
3967 self.assertEqual(f.buffer.mode, "rb")
3968 self.assertEqual(f.buffer.raw.mode, "rb")
3969 f.close()
3970
Inada Naoki58cffba2021-04-01 11:25:04 +09003971 f = self.open(os_helper.TESTFN, "w+", encoding="utf-8")
Ezio Melottib3aedd42010-11-20 19:04:17 +00003972 self.assertEqual(f.mode, "w+")
3973 self.assertEqual(f.buffer.mode, "rb+") # Does it really matter?
3974 self.assertEqual(f.buffer.raw.mode, "rb+")
Barry Warsaw40e82462008-11-20 20:14:50 +00003975
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003976 g = self.open(f.fileno(), "wb", closefd=False)
Ezio Melottib3aedd42010-11-20 19:04:17 +00003977 self.assertEqual(g.mode, "wb")
3978 self.assertEqual(g.raw.mode, "wb")
3979 self.assertEqual(g.name, f.fileno())
3980 self.assertEqual(g.raw.name, f.fileno())
Barry Warsaw40e82462008-11-20 20:14:50 +00003981 f.close()
3982 g.close()
3983
Benjamin Peterson74fa9f72019-11-12 14:51:34 -08003984 def test_open_pipe_with_append(self):
3985 # bpo-27805: Ignore ESPIPE from lseek() in open().
3986 r, w = os.pipe()
3987 self.addCleanup(os.close, r)
Inada Naoki58cffba2021-04-01 11:25:04 +09003988 f = self.open(w, 'a', encoding="utf-8")
Benjamin Peterson74fa9f72019-11-12 14:51:34 -08003989 self.addCleanup(f.close)
3990 # Check that the file is marked non-seekable. On Windows, however, lseek
3991 # somehow succeeds on pipes.
3992 if sys.platform != 'win32':
3993 self.assertFalse(f.seekable())
3994
Antoine Pitrou8043cf82009-01-09 19:54:29 +00003995 def test_io_after_close(self):
3996 for kwargs in [
3997 {"mode": "w"},
3998 {"mode": "wb"},
3999 {"mode": "w", "buffering": 1},
4000 {"mode": "w", "buffering": 2},
4001 {"mode": "wb", "buffering": 0},
4002 {"mode": "r"},
4003 {"mode": "rb"},
4004 {"mode": "r", "buffering": 1},
4005 {"mode": "r", "buffering": 2},
4006 {"mode": "rb", "buffering": 0},
4007 {"mode": "w+"},
4008 {"mode": "w+b"},
4009 {"mode": "w+", "buffering": 1},
4010 {"mode": "w+", "buffering": 2},
4011 {"mode": "w+b", "buffering": 0},
4012 ]:
Inada Naoki58cffba2021-04-01 11:25:04 +09004013 if "b" not in kwargs["mode"]:
4014 kwargs["encoding"] = "utf-8"
Hai Shi883bc632020-07-06 17:12:49 +08004015 f = self.open(os_helper.TESTFN, **kwargs)
Antoine Pitrou8043cf82009-01-09 19:54:29 +00004016 f.close()
4017 self.assertRaises(ValueError, f.flush)
4018 self.assertRaises(ValueError, f.fileno)
4019 self.assertRaises(ValueError, f.isatty)
4020 self.assertRaises(ValueError, f.__iter__)
4021 if hasattr(f, "peek"):
4022 self.assertRaises(ValueError, f.peek, 1)
4023 self.assertRaises(ValueError, f.read)
4024 if hasattr(f, "read1"):
4025 self.assertRaises(ValueError, f.read1, 1024)
Martin Panterccb2c0e2016-10-20 23:48:14 +00004026 self.assertRaises(ValueError, f.read1)
Victor Stinnerb79f28c2011-05-25 22:09:03 +02004027 if hasattr(f, "readall"):
4028 self.assertRaises(ValueError, f.readall)
Antoine Pitrou8043cf82009-01-09 19:54:29 +00004029 if hasattr(f, "readinto"):
4030 self.assertRaises(ValueError, f.readinto, bytearray(1024))
Benjamin Petersona96fea02014-06-22 14:17:44 -07004031 if hasattr(f, "readinto1"):
4032 self.assertRaises(ValueError, f.readinto1, bytearray(1024))
Antoine Pitrou8043cf82009-01-09 19:54:29 +00004033 self.assertRaises(ValueError, f.readline)
4034 self.assertRaises(ValueError, f.readlines)
Xiang Zhang026435c2017-04-15 12:47:28 +08004035 self.assertRaises(ValueError, f.readlines, 1)
Antoine Pitrou8043cf82009-01-09 19:54:29 +00004036 self.assertRaises(ValueError, f.seek, 0)
4037 self.assertRaises(ValueError, f.tell)
4038 self.assertRaises(ValueError, f.truncate)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00004039 self.assertRaises(ValueError, f.write,
4040 b"" if "b" in kwargs['mode'] else "")
Antoine Pitrou8043cf82009-01-09 19:54:29 +00004041 self.assertRaises(ValueError, f.writelines, [])
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00004042 self.assertRaises(ValueError, next, f)
Antoine Pitrou8043cf82009-01-09 19:54:29 +00004043
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00004044 def test_blockingioerror(self):
4045 # Various BlockingIOError issues
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00004046 class C(str):
4047 pass
4048 c = C("")
4049 b = self.BlockingIOError(1, c)
4050 c.b = b
4051 b.c = c
4052 wr = weakref.ref(c)
4053 del c, b
Benjamin Peterson24fb1d02009-04-24 23:26:21 +00004054 support.gc_collect()
Serhiy Storchaka0dcd80a2015-08-02 15:17:49 +03004055 self.assertIsNone(wr(), wr)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00004056
4057 def test_abcs(self):
4058 # Test the visible base classes are ABCs.
Ezio Melottie9615932010-01-24 19:26:24 +00004059 self.assertIsInstance(self.IOBase, abc.ABCMeta)
4060 self.assertIsInstance(self.RawIOBase, abc.ABCMeta)
4061 self.assertIsInstance(self.BufferedIOBase, abc.ABCMeta)
4062 self.assertIsInstance(self.TextIOBase, abc.ABCMeta)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00004063
4064 def _check_abc_inheritance(self, abcmodule):
Hai Shi883bc632020-07-06 17:12:49 +08004065 with self.open(os_helper.TESTFN, "wb", buffering=0) as f:
Ezio Melottie9615932010-01-24 19:26:24 +00004066 self.assertIsInstance(f, abcmodule.IOBase)
4067 self.assertIsInstance(f, abcmodule.RawIOBase)
4068 self.assertNotIsInstance(f, abcmodule.BufferedIOBase)
4069 self.assertNotIsInstance(f, abcmodule.TextIOBase)
Hai Shi883bc632020-07-06 17:12:49 +08004070 with self.open(os_helper.TESTFN, "wb") as f:
Ezio Melottie9615932010-01-24 19:26:24 +00004071 self.assertIsInstance(f, abcmodule.IOBase)
4072 self.assertNotIsInstance(f, abcmodule.RawIOBase)
4073 self.assertIsInstance(f, abcmodule.BufferedIOBase)
4074 self.assertNotIsInstance(f, abcmodule.TextIOBase)
Inada Naoki58cffba2021-04-01 11:25:04 +09004075 with self.open(os_helper.TESTFN, "w", encoding="utf-8") as f:
Ezio Melottie9615932010-01-24 19:26:24 +00004076 self.assertIsInstance(f, abcmodule.IOBase)
4077 self.assertNotIsInstance(f, abcmodule.RawIOBase)
4078 self.assertNotIsInstance(f, abcmodule.BufferedIOBase)
4079 self.assertIsInstance(f, abcmodule.TextIOBase)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00004080
4081 def test_abc_inheritance(self):
4082 # Test implementations inherit from their respective ABCs
4083 self._check_abc_inheritance(self)
4084
4085 def test_abc_inheritance_official(self):
4086 # Test implementations inherit from the official ABCs of the
4087 # baseline "io" module.
4088 self._check_abc_inheritance(io)
4089
Antoine Pitroue033e062010-10-29 10:38:18 +00004090 def _check_warn_on_dealloc(self, *args, **kwargs):
4091 f = open(*args, **kwargs)
4092 r = repr(f)
4093 with self.assertWarns(ResourceWarning) as cm:
4094 f = None
4095 support.gc_collect()
4096 self.assertIn(r, str(cm.warning.args[0]))
4097
4098 def test_warn_on_dealloc(self):
Hai Shi883bc632020-07-06 17:12:49 +08004099 self._check_warn_on_dealloc(os_helper.TESTFN, "wb", buffering=0)
4100 self._check_warn_on_dealloc(os_helper.TESTFN, "wb")
Inada Naoki58cffba2021-04-01 11:25:04 +09004101 self._check_warn_on_dealloc(os_helper.TESTFN, "w", encoding="utf-8")
Antoine Pitroue033e062010-10-29 10:38:18 +00004102
4103 def _check_warn_on_dealloc_fd(self, *args, **kwargs):
4104 fds = []
Benjamin Peterson556c7352010-10-31 01:35:43 +00004105 def cleanup_fds():
Antoine Pitroue033e062010-10-29 10:38:18 +00004106 for fd in fds:
4107 try:
4108 os.close(fd)
Andrew Svetlov3438fa42012-12-17 23:35:18 +02004109 except OSError as e:
Antoine Pitroue033e062010-10-29 10:38:18 +00004110 if e.errno != errno.EBADF:
4111 raise
Benjamin Peterson556c7352010-10-31 01:35:43 +00004112 self.addCleanup(cleanup_fds)
4113 r, w = os.pipe()
4114 fds += r, w
4115 self._check_warn_on_dealloc(r, *args, **kwargs)
4116 # When using closefd=False, there's no warning
4117 r, w = os.pipe()
4118 fds += r, w
Hai Shi883bc632020-07-06 17:12:49 +08004119 with warnings_helper.check_no_resource_warning(self):
Benjamin Peterson556c7352010-10-31 01:35:43 +00004120 open(r, *args, closefd=False, **kwargs)
Antoine Pitroue033e062010-10-29 10:38:18 +00004121
4122 def test_warn_on_dealloc_fd(self):
4123 self._check_warn_on_dealloc_fd("rb", buffering=0)
4124 self._check_warn_on_dealloc_fd("rb")
Inada Naoki58cffba2021-04-01 11:25:04 +09004125 self._check_warn_on_dealloc_fd("r", encoding="utf-8")
Antoine Pitroue033e062010-10-29 10:38:18 +00004126
4127
Antoine Pitrou243757e2010-11-05 21:15:39 +00004128 def test_pickling(self):
4129 # Pickling file objects is forbidden
4130 for kwargs in [
4131 {"mode": "w"},
4132 {"mode": "wb"},
4133 {"mode": "wb", "buffering": 0},
4134 {"mode": "r"},
4135 {"mode": "rb"},
4136 {"mode": "rb", "buffering": 0},
4137 {"mode": "w+"},
4138 {"mode": "w+b"},
4139 {"mode": "w+b", "buffering": 0},
4140 ]:
Inada Naoki58cffba2021-04-01 11:25:04 +09004141 if "b" not in kwargs["mode"]:
4142 kwargs["encoding"] = "utf-8"
Antoine Pitrou243757e2010-11-05 21:15:39 +00004143 for protocol in range(pickle.HIGHEST_PROTOCOL + 1):
Hai Shi883bc632020-07-06 17:12:49 +08004144 with self.open(os_helper.TESTFN, **kwargs) as f:
Antoine Pitrou243757e2010-11-05 21:15:39 +00004145 self.assertRaises(TypeError, pickle.dumps, f, protocol)
4146
Antoine Pitrou58fcf9f2011-11-21 20:16:44 +01004147 def test_nonblock_pipe_write_bigbuf(self):
4148 self._test_nonblock_pipe_write(16*1024)
4149
Antoine Pitrou58fcf9f2011-11-21 20:16:44 +01004150 def test_nonblock_pipe_write_smallbuf(self):
4151 self._test_nonblock_pipe_write(1024)
4152
Victor Stinner1db9e7b2014-07-29 22:32:47 +02004153 @unittest.skipUnless(hasattr(os, 'set_blocking'),
4154 'os.set_blocking() required for this test')
Antoine Pitrou58fcf9f2011-11-21 20:16:44 +01004155 def _test_nonblock_pipe_write(self, bufsize):
4156 sent = []
4157 received = []
4158 r, w = os.pipe()
Victor Stinner1db9e7b2014-07-29 22:32:47 +02004159 os.set_blocking(r, False)
4160 os.set_blocking(w, False)
Antoine Pitrou58fcf9f2011-11-21 20:16:44 +01004161
4162 # To exercise all code paths in the C implementation we need
4163 # to play with buffer sizes. For instance, if we choose a
4164 # buffer size less than or equal to _PIPE_BUF (4096 on Linux)
4165 # then we will never get a partial write of the buffer.
4166 rf = self.open(r, mode='rb', closefd=True, buffering=bufsize)
4167 wf = self.open(w, mode='wb', closefd=True, buffering=bufsize)
4168
4169 with rf, wf:
4170 for N in 9999, 73, 7574:
4171 try:
4172 i = 0
4173 while True:
4174 msg = bytes([i % 26 + 97]) * N
4175 sent.append(msg)
4176 wf.write(msg)
4177 i += 1
4178
4179 except self.BlockingIOError as e:
4180 self.assertEqual(e.args[0], errno.EAGAIN)
Antoine Pitrou7fe601c2011-11-21 20:22:01 +01004181 self.assertEqual(e.args[2], e.characters_written)
Antoine Pitrou58fcf9f2011-11-21 20:16:44 +01004182 sent[-1] = sent[-1][:e.characters_written]
4183 received.append(rf.read())
4184 msg = b'BLOCKED'
4185 wf.write(msg)
4186 sent.append(msg)
4187
4188 while True:
4189 try:
4190 wf.flush()
4191 break
4192 except self.BlockingIOError as e:
4193 self.assertEqual(e.args[0], errno.EAGAIN)
Antoine Pitrou7fe601c2011-11-21 20:22:01 +01004194 self.assertEqual(e.args[2], e.characters_written)
Antoine Pitrou58fcf9f2011-11-21 20:16:44 +01004195 self.assertEqual(e.characters_written, 0)
4196 received.append(rf.read())
4197
4198 received += iter(rf.read, None)
4199
4200 sent, received = b''.join(sent), b''.join(received)
Serhiy Storchaka0dcd80a2015-08-02 15:17:49 +03004201 self.assertEqual(sent, received)
Antoine Pitrou58fcf9f2011-11-21 20:16:44 +01004202 self.assertTrue(wf.closed)
4203 self.assertTrue(rf.closed)
4204
Charles-François Natalidc3044c2012-01-09 22:40:02 +01004205 def test_create_fail(self):
4206 # 'x' mode fails if file is existing
Inada Naoki58cffba2021-04-01 11:25:04 +09004207 with self.open(os_helper.TESTFN, 'w', encoding="utf-8"):
Charles-François Natalidc3044c2012-01-09 22:40:02 +01004208 pass
Inada Naoki58cffba2021-04-01 11:25:04 +09004209 self.assertRaises(FileExistsError, self.open, os_helper.TESTFN, 'x', encoding="utf-8")
Charles-François Natalidc3044c2012-01-09 22:40:02 +01004210
4211 def test_create_writes(self):
4212 # 'x' mode opens for writing
Hai Shi883bc632020-07-06 17:12:49 +08004213 with self.open(os_helper.TESTFN, 'xb') as f:
Charles-François Natalidc3044c2012-01-09 22:40:02 +01004214 f.write(b"spam")
Hai Shi883bc632020-07-06 17:12:49 +08004215 with self.open(os_helper.TESTFN, 'rb') as f:
Charles-François Natalidc3044c2012-01-09 22:40:02 +01004216 self.assertEqual(b"spam", f.read())
4217
Christian Heimes7b648752012-09-10 14:48:43 +02004218 def test_open_allargs(self):
4219 # there used to be a buffer overflow in the parser for rawmode
Inada Naoki58cffba2021-04-01 11:25:04 +09004220 self.assertRaises(ValueError, self.open, os_helper.TESTFN, 'rwax+', encoding="utf-8")
Christian Heimes7b648752012-09-10 14:48:43 +02004221
Victor Stinner22eb6892019-06-26 00:51:05 +02004222 def test_check_encoding_errors(self):
4223 # bpo-37388: open() and TextIOWrapper must check encoding and errors
4224 # arguments in dev mode
4225 mod = self.io.__name__
4226 filename = __file__
4227 invalid = 'Boom, Shaka Laka, Boom!'
4228 code = textwrap.dedent(f'''
4229 import sys
4230 from {mod} import open, TextIOWrapper
4231
4232 try:
4233 open({filename!r}, encoding={invalid!r})
4234 except LookupError:
4235 pass
4236 else:
4237 sys.exit(21)
4238
4239 try:
4240 open({filename!r}, errors={invalid!r})
4241 except LookupError:
4242 pass
4243 else:
4244 sys.exit(22)
4245
4246 fp = open({filename!r}, "rb")
4247 with fp:
4248 try:
4249 TextIOWrapper(fp, encoding={invalid!r})
4250 except LookupError:
4251 pass
4252 else:
4253 sys.exit(23)
4254
4255 try:
4256 TextIOWrapper(fp, errors={invalid!r})
4257 except LookupError:
4258 pass
4259 else:
4260 sys.exit(24)
4261
4262 sys.exit(10)
4263 ''')
4264 proc = assert_python_failure('-X', 'dev', '-c', code)
4265 self.assertEqual(proc.rc, 10, proc)
4266
Inada Naoki48274832021-03-29 12:28:14 +09004267 def test_check_encoding_warning(self):
4268 # PEP 597: Raise warning when encoding is not specified
4269 # and sys.flags.warn_default_encoding is set.
4270 mod = self.io.__name__
4271 filename = __file__
4272 code = textwrap.dedent(f'''\
4273 import sys
4274 from {mod} import open, TextIOWrapper
4275 import pathlib
4276
4277 with open({filename!r}) as f: # line 5
4278 pass
4279
4280 pathlib.Path({filename!r}).read_text() # line 8
4281 ''')
4282 proc = assert_python_ok('-X', 'warn_default_encoding', '-c', code)
4283 warnings = proc.err.splitlines()
4284 self.assertEqual(len(warnings), 2)
4285 self.assertTrue(
4286 warnings[0].startswith(b"<string>:5: EncodingWarning: "))
4287 self.assertTrue(
4288 warnings[1].startswith(b"<string>:8: EncodingWarning: "))
4289
Victor Stinner3bc694d2021-04-14 03:24:33 +02004290 @support.cpython_only
4291 # Depending if OpenWrapper was already created or not, the warning is
4292 # emitted or not. For example, the attribute is already created when this
4293 # test is run multiple times.
4294 @warnings_helper.ignore_warnings(category=DeprecationWarning)
4295 def test_openwrapper(self):
4296 self.assertIs(self.io.OpenWrapper, self.io.open)
4297
Christian Heimes7b648752012-09-10 14:48:43 +02004298
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00004299class CMiscIOTest(MiscIOTest):
4300 io = io
4301
Serhiy Storchaka37a79a12013-05-28 16:24:45 +03004302 def test_readinto_buffer_overflow(self):
4303 # Issue #18025
4304 class BadReader(self.io.BufferedIOBase):
4305 def read(self, n=-1):
4306 return b'x' * 10**6
4307 bufio = BadReader()
4308 b = bytearray(2)
4309 self.assertRaises(ValueError, bufio.readinto, b)
4310
Antoine Pitrou25f85d42015-04-13 19:41:47 +02004311 def check_daemon_threads_shutdown_deadlock(self, stream_name):
4312 # Issue #23309: deadlocks at shutdown should be avoided when a
4313 # daemon thread and the main thread both write to a file.
4314 code = """if 1:
4315 import sys
4316 import time
4317 import threading
Victor Stinner2a1aed02017-04-21 17:59:23 +02004318 from test.support import SuppressCrashReport
Antoine Pitrou25f85d42015-04-13 19:41:47 +02004319
4320 file = sys.{stream_name}
4321
4322 def run():
4323 while True:
4324 file.write('.')
4325 file.flush()
4326
Victor Stinner2a1aed02017-04-21 17:59:23 +02004327 crash = SuppressCrashReport()
4328 crash.__enter__()
4329 # don't call __exit__(): the crash occurs at Python shutdown
4330
Antoine Pitrou25f85d42015-04-13 19:41:47 +02004331 thread = threading.Thread(target=run)
4332 thread.daemon = True
4333 thread.start()
4334
4335 time.sleep(0.5)
4336 file.write('!')
4337 file.flush()
4338 """.format_map(locals())
4339 res, _ = run_python_until_end("-c", code)
4340 err = res.err.decode()
4341 if res.rc != 0:
4342 # Failure: should be a fatal error
Victor Stinner9e5d30c2020-03-07 00:54:20 +01004343 pattern = (r"Fatal Python error: _enter_buffered_busy: "
4344 r"could not acquire lock "
Max Bernsteinccb7ca72019-05-21 10:09:21 -07004345 r"for <(_io\.)?BufferedWriter name='<{stream_name}>'> "
4346 r"at interpreter shutdown, possibly due to "
4347 r"daemon threads".format_map(locals()))
4348 self.assertRegex(err, pattern)
Antoine Pitrou25f85d42015-04-13 19:41:47 +02004349 else:
4350 self.assertFalse(err.strip('.!'))
4351
4352 def test_daemon_threads_shutdown_stdout_deadlock(self):
4353 self.check_daemon_threads_shutdown_deadlock('stdout')
4354
4355 def test_daemon_threads_shutdown_stderr_deadlock(self):
4356 self.check_daemon_threads_shutdown_deadlock('stderr')
4357
4358
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00004359class PyMiscIOTest(MiscIOTest):
4360 io = pyio
Barry Warsaw40e82462008-11-20 20:14:50 +00004361
Antoine Pitroub46b9d52010-08-21 19:09:32 +00004362
4363@unittest.skipIf(os.name == 'nt', 'POSIX signals required for this test.')
4364class SignalsTest(unittest.TestCase):
4365
4366 def setUp(self):
4367 self.oldalrm = signal.signal(signal.SIGALRM, self.alarm_interrupt)
4368
4369 def tearDown(self):
4370 signal.signal(signal.SIGALRM, self.oldalrm)
4371
4372 def alarm_interrupt(self, sig, frame):
4373 1/0
4374
Antoine Pitroub46b9d52010-08-21 19:09:32 +00004375 def check_interrupted_write(self, item, bytes, **fdopen_kwargs):
4376 """Check that a partial write, when it gets interrupted, properly
Antoine Pitrou707ce822011-02-25 21:24:11 +00004377 invokes the signal handler, and bubbles up the exception raised
4378 in the latter."""
Serhiy Storchaka462c1f02021-09-08 18:08:57 +03004379
4380 # XXX This test has three flaws that appear when objects are
4381 # XXX not reference counted.
4382
4383 # - if wio.write() happens to trigger a garbage collection,
4384 # the signal exception may be raised when some __del__
4385 # method is running; it will not reach the assertRaises()
4386 # call.
4387
4388 # - more subtle, if the wio object is not destroyed at once
4389 # and survives this function, the next opened file is likely
4390 # to have the same fileno (since the file descriptor was
4391 # actively closed). When wio.__del__ is finally called, it
4392 # will close the other's test file... To trigger this with
4393 # CPython, try adding "global wio" in this function.
4394
4395 # - This happens only for streams created by the _pyio module,
4396 # because a wio.close() that fails still consider that the
4397 # file needs to be closed again. You can try adding an
4398 # "assert wio.closed" at the end of the function.
4399
4400 # Fortunately, a little gc.collect() seems to be enough to
4401 # work around all these issues.
4402 support.gc_collect() # For PyPy or other GCs.
4403
Antoine Pitroub46b9d52010-08-21 19:09:32 +00004404 read_results = []
4405 def _read():
4406 s = os.read(r, 1)
4407 read_results.append(s)
Victor Stinner05c9d312018-12-18 23:52:39 +01004408
Antoine Pitroub46b9d52010-08-21 19:09:32 +00004409 t = threading.Thread(target=_read)
4410 t.daemon = True
4411 r, w = os.pipe()
Benjamin Petersond8fc2e12010-10-31 01:19:53 +00004412 fdopen_kwargs["closefd"] = False
Victor Stinner9c182a12015-03-16 18:03:06 +01004413 large_data = item * (support.PIPE_MAX_SIZE // len(item) + 1)
Antoine Pitroub46b9d52010-08-21 19:09:32 +00004414 try:
4415 wio = self.io.open(w, **fdopen_kwargs)
Victor Stinner05c9d312018-12-18 23:52:39 +01004416 if hasattr(signal, 'pthread_sigmask'):
4417 # create the thread with SIGALRM signal blocked
4418 signal.pthread_sigmask(signal.SIG_BLOCK, [signal.SIGALRM])
4419 t.start()
4420 signal.pthread_sigmask(signal.SIG_UNBLOCK, [signal.SIGALRM])
4421 else:
4422 t.start()
4423
Antoine Pitroub46b9d52010-08-21 19:09:32 +00004424 # Fill the pipe enough that the write will be blocking.
4425 # It will be interrupted by the timer armed above. Since the
4426 # other thread has read one byte, the low-level write will
4427 # return with a successful (partial) result rather than an EINTR.
4428 # The buffered IO layer must check for pending signal
4429 # handlers, which in this case will invoke alarm_interrupt().
Victor Stinner775b2dd2013-07-15 19:53:13 +02004430 signal.alarm(1)
4431 try:
Victor Stinner9c182a12015-03-16 18:03:06 +01004432 self.assertRaises(ZeroDivisionError, wio.write, large_data)
Victor Stinner775b2dd2013-07-15 19:53:13 +02004433 finally:
4434 signal.alarm(0)
Serhiy Storchaka263dcd22015-04-01 13:01:14 +03004435 t.join()
Antoine Pitroub46b9d52010-08-21 19:09:32 +00004436 # We got one byte, get another one and check that it isn't a
4437 # repeat of the first one.
4438 read_results.append(os.read(r, 1))
4439 self.assertEqual(read_results, [bytes[0:1], bytes[1:2]])
4440 finally:
4441 os.close(w)
4442 os.close(r)
4443 # This is deliberate. If we didn't close the file descriptor
4444 # before closing wio, wio would try to flush its internal
4445 # buffer, and block again.
4446 try:
4447 wio.close()
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02004448 except OSError as e:
Antoine Pitroub46b9d52010-08-21 19:09:32 +00004449 if e.errno != errno.EBADF:
4450 raise
4451
4452 def test_interrupted_write_unbuffered(self):
4453 self.check_interrupted_write(b"xy", b"xy", mode="wb", buffering=0)
4454
4455 def test_interrupted_write_buffered(self):
4456 self.check_interrupted_write(b"xy", b"xy", mode="wb")
4457
4458 def test_interrupted_write_text(self):
4459 self.check_interrupted_write("xy", b"xy", mode="w", encoding="ascii")
4460
Brett Cannon31f59292011-02-21 19:29:56 +00004461 @support.no_tracing
Antoine Pitrouf3b68b32010-12-03 18:41:39 +00004462 def check_reentrant_write(self, data, **fdopen_kwargs):
4463 def on_alarm(*args):
4464 # Will be called reentrantly from the same thread
4465 wio.write(data)
4466 1/0
4467 signal.signal(signal.SIGALRM, on_alarm)
4468 r, w = os.pipe()
4469 wio = self.io.open(w, **fdopen_kwargs)
4470 try:
Gregory P. Smithb5ba2032012-06-25 01:13:32 -07004471 signal.alarm(1)
Antoine Pitrouf3b68b32010-12-03 18:41:39 +00004472 # Either the reentrant call to wio.write() fails with RuntimeError,
4473 # or the signal handler raises ZeroDivisionError.
4474 with self.assertRaises((ZeroDivisionError, RuntimeError)) as cm:
4475 while 1:
4476 for i in range(100):
4477 wio.write(data)
4478 wio.flush()
4479 # Make sure the buffer doesn't fill up and block further writes
4480 os.read(r, len(data) * 100)
4481 exc = cm.exception
4482 if isinstance(exc, RuntimeError):
4483 self.assertTrue(str(exc).startswith("reentrant call"), str(exc))
4484 finally:
Victor Stinner9abee722017-09-19 09:36:54 -07004485 signal.alarm(0)
Antoine Pitrouf3b68b32010-12-03 18:41:39 +00004486 wio.close()
4487 os.close(r)
4488
4489 def test_reentrant_write_buffered(self):
4490 self.check_reentrant_write(b"xy", mode="wb")
4491
4492 def test_reentrant_write_text(self):
4493 self.check_reentrant_write("xy", mode="w", encoding="ascii")
4494
Antoine Pitrou707ce822011-02-25 21:24:11 +00004495 def check_interrupted_read_retry(self, decode, **fdopen_kwargs):
4496 """Check that a buffered read, when it gets interrupted (either
4497 returning a partial result or EINTR), properly invokes the signal
4498 handler and retries if the latter returned successfully."""
4499 r, w = os.pipe()
4500 fdopen_kwargs["closefd"] = False
4501 def alarm_handler(sig, frame):
4502 os.write(w, b"bar")
4503 signal.signal(signal.SIGALRM, alarm_handler)
4504 try:
4505 rio = self.io.open(r, **fdopen_kwargs)
4506 os.write(w, b"foo")
Gregory P. Smithb5ba2032012-06-25 01:13:32 -07004507 signal.alarm(1)
Antoine Pitrou707ce822011-02-25 21:24:11 +00004508 # Expected behaviour:
4509 # - first raw read() returns partial b"foo"
4510 # - second raw read() returns EINTR
4511 # - third raw read() returns b"bar"
4512 self.assertEqual(decode(rio.read(6)), "foobar")
4513 finally:
Victor Stinner9abee722017-09-19 09:36:54 -07004514 signal.alarm(0)
Antoine Pitrou707ce822011-02-25 21:24:11 +00004515 rio.close()
4516 os.close(w)
4517 os.close(r)
4518
Antoine Pitrou20db5112011-08-19 20:32:34 +02004519 def test_interrupted_read_retry_buffered(self):
Antoine Pitrou707ce822011-02-25 21:24:11 +00004520 self.check_interrupted_read_retry(lambda x: x.decode('latin1'),
4521 mode="rb")
4522
Antoine Pitrou20db5112011-08-19 20:32:34 +02004523 def test_interrupted_read_retry_text(self):
Antoine Pitrou707ce822011-02-25 21:24:11 +00004524 self.check_interrupted_read_retry(lambda x: x,
Inada Naoki58cffba2021-04-01 11:25:04 +09004525 mode="r", encoding="latin1")
Antoine Pitrou707ce822011-02-25 21:24:11 +00004526
Antoine Pitrou707ce822011-02-25 21:24:11 +00004527 def check_interrupted_write_retry(self, item, **fdopen_kwargs):
4528 """Check that a buffered write, when it gets interrupted (either
4529 returning a partial result or EINTR), properly invokes the signal
4530 handler and retries if the latter returned successfully."""
Hai Shi883bc632020-07-06 17:12:49 +08004531 select = import_helper.import_module("select")
Victor Stinner6d3c09f2015-03-16 17:57:49 +01004532
Antoine Pitrou707ce822011-02-25 21:24:11 +00004533 # A quantity that exceeds the buffer size of an anonymous pipe's
4534 # write end.
Antoine Pitroue1a16742013-04-24 23:31:38 +02004535 N = support.PIPE_MAX_SIZE
Antoine Pitrou707ce822011-02-25 21:24:11 +00004536 r, w = os.pipe()
4537 fdopen_kwargs["closefd"] = False
Victor Stinner6d3c09f2015-03-16 17:57:49 +01004538
Antoine Pitrou707ce822011-02-25 21:24:11 +00004539 # We need a separate thread to read from the pipe and allow the
4540 # write() to finish. This thread is started after the SIGALRM is
4541 # received (forcing a first EINTR in write()).
4542 read_results = []
4543 write_finished = False
Serhiy Storchaka9db55002015-03-28 20:38:37 +02004544 error = None
Antoine Pitrou707ce822011-02-25 21:24:11 +00004545 def _read():
Serhiy Storchaka9db55002015-03-28 20:38:37 +02004546 try:
4547 while not write_finished:
4548 while r in select.select([r], [], [], 1.0)[0]:
4549 s = os.read(r, 1024)
4550 read_results.append(s)
4551 except BaseException as exc:
4552 nonlocal error
4553 error = exc
Antoine Pitrou707ce822011-02-25 21:24:11 +00004554 t = threading.Thread(target=_read)
4555 t.daemon = True
4556 def alarm1(sig, frame):
4557 signal.signal(signal.SIGALRM, alarm2)
Gregory P. Smithb5ba2032012-06-25 01:13:32 -07004558 signal.alarm(1)
Antoine Pitrou707ce822011-02-25 21:24:11 +00004559 def alarm2(sig, frame):
4560 t.start()
Victor Stinner6d3c09f2015-03-16 17:57:49 +01004561
4562 large_data = item * N
Antoine Pitrou707ce822011-02-25 21:24:11 +00004563 signal.signal(signal.SIGALRM, alarm1)
4564 try:
4565 wio = self.io.open(w, **fdopen_kwargs)
Gregory P. Smithb5ba2032012-06-25 01:13:32 -07004566 signal.alarm(1)
Antoine Pitrou707ce822011-02-25 21:24:11 +00004567 # Expected behaviour:
4568 # - first raw write() is partial (because of the limited pipe buffer
4569 # and the first alarm)
4570 # - second raw write() returns EINTR (because of the second alarm)
4571 # - subsequent write()s are successful (either partial or complete)
Victor Stinner6d3c09f2015-03-16 17:57:49 +01004572 written = wio.write(large_data)
4573 self.assertEqual(N, written)
4574
Antoine Pitrou707ce822011-02-25 21:24:11 +00004575 wio.flush()
4576 write_finished = True
4577 t.join()
Victor Stinner6d3c09f2015-03-16 17:57:49 +01004578
Serhiy Storchaka9db55002015-03-28 20:38:37 +02004579 self.assertIsNone(error)
Antoine Pitrou707ce822011-02-25 21:24:11 +00004580 self.assertEqual(N, sum(len(x) for x in read_results))
4581 finally:
Victor Stinner9abee722017-09-19 09:36:54 -07004582 signal.alarm(0)
Antoine Pitrou707ce822011-02-25 21:24:11 +00004583 write_finished = True
4584 os.close(w)
4585 os.close(r)
4586 # This is deliberate. If we didn't close the file descriptor
4587 # before closing wio, wio would try to flush its internal
4588 # buffer, and could block (in case of failure).
4589 try:
4590 wio.close()
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02004591 except OSError as e:
Antoine Pitrou707ce822011-02-25 21:24:11 +00004592 if e.errno != errno.EBADF:
4593 raise
4594
Antoine Pitrou20db5112011-08-19 20:32:34 +02004595 def test_interrupted_write_retry_buffered(self):
Antoine Pitrou707ce822011-02-25 21:24:11 +00004596 self.check_interrupted_write_retry(b"x", mode="wb")
4597
Antoine Pitrou20db5112011-08-19 20:32:34 +02004598 def test_interrupted_write_retry_text(self):
Antoine Pitrou707ce822011-02-25 21:24:11 +00004599 self.check_interrupted_write_retry("x", mode="w", encoding="latin1")
4600
Antoine Pitrouf3b68b32010-12-03 18:41:39 +00004601
Antoine Pitroub46b9d52010-08-21 19:09:32 +00004602class CSignalsTest(SignalsTest):
4603 io = io
4604
4605class PySignalsTest(SignalsTest):
4606 io = pyio
4607
Antoine Pitrouf3b68b32010-12-03 18:41:39 +00004608 # Handling reentrancy issues would slow down _pyio even more, so the
4609 # tests are disabled.
4610 test_reentrant_write_buffered = None
4611 test_reentrant_write_text = None
4612
Antoine Pitroub46b9d52010-08-21 19:09:32 +00004613
Ezio Melottidaa42c72013-03-23 16:30:16 +02004614def load_tests(*args):
Gregory P. Smith1bef9072015-04-14 13:24:34 -07004615 tests = (CIOTest, PyIOTest, APIMismatchTest,
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00004616 CBufferedReaderTest, PyBufferedReaderTest,
4617 CBufferedWriterTest, PyBufferedWriterTest,
4618 CBufferedRWPairTest, PyBufferedRWPairTest,
4619 CBufferedRandomTest, PyBufferedRandomTest,
4620 StatefulIncrementalDecoderTest,
4621 CIncrementalNewlineDecoderTest, PyIncrementalNewlineDecoderTest,
4622 CTextIOWrapperTest, PyTextIOWrapperTest,
Antoine Pitroub46b9d52010-08-21 19:09:32 +00004623 CMiscIOTest, PyMiscIOTest,
4624 CSignalsTest, PySignalsTest,
4625 )
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00004626
4627 # Put the namespaces of the IO module we are testing and some useful mock
4628 # classes in the __dict__ of each test.
4629 mocks = (MockRawIO, MisbehavedRawIO, MockFileIO, CloseFailureIO,
benfogle9703f092017-11-10 16:03:40 -05004630 MockNonBlockWriterIO, MockUnseekableIO, MockRawIOWithoutRead,
4631 SlowFlushRawIO)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00004632 all_members = io.__all__ + ["IncrementalNewlineDecoder"]
4633 c_io_ns = {name : getattr(io, name) for name in all_members}
4634 py_io_ns = {name : getattr(pyio, name) for name in all_members}
4635 globs = globals()
4636 c_io_ns.update((x.__name__, globs["C" + x.__name__]) for x in mocks)
4637 py_io_ns.update((x.__name__, globs["Py" + x.__name__]) for x in mocks)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00004638 for test in tests:
4639 if test.__name__.startswith("C"):
4640 for name, obj in c_io_ns.items():
4641 setattr(test, name, obj)
4642 elif test.__name__.startswith("Py"):
4643 for name, obj in py_io_ns.items():
4644 setattr(test, name, obj)
4645
Ezio Melottidaa42c72013-03-23 16:30:16 +02004646 suite = unittest.TestSuite([unittest.makeSuite(test) for test in tests])
4647 return suite
Guido van Rossum28524c72007-02-27 05:47:44 +00004648
4649if __name__ == "__main__":
Ezio Melottidaa42c72013-03-23 16:30:16 +02004650 unittest.main()