blob: fbaea3aaec3cbe48f35e23483484f8db8b8550ec [file] [log] [blame]
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001"""Unit tests for the io module."""
2
3# Tests of io are scattered over the test suite:
4# * test_bufio - tests file buffering
5# * test_memoryio - tests BytesIO and StringIO
6# * test_fileio - tests FileIO
7# * test_file - tests the file interface
8# * test_io - tests everything else in the io module
9# * test_univnewlines - tests universal newline support
10# * test_largefile - tests operations on a file greater than 2**32 bytes
11# (only enabled with -ulargefile)
12
13################################################################################
14# ATTENTION TEST WRITERS!!!
15################################################################################
16# When writing tests for io, it's important to test both the C and Python
17# implementations. This is usually done by writing a base test that refers to
Serhiy Storchakad65c9492015-11-02 14:10:23 +020018# the type it is testing as an attribute. Then it provides custom subclasses to
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000019# test both implementations. This file has lots of examples.
20################################################################################
Guido van Rossum68bbcd22007-02-27 17:19:33 +000021
Victor Stinnerf86a5e82012-06-05 13:43:22 +020022import abc
23import array
24import errno
25import locale
Guido van Rossum8358db22007-08-18 21:39:55 +000026import os
Victor Stinnerf86a5e82012-06-05 13:43:22 +020027import pickle
28import random
29import signal
Guido van Rossum34d69e52007-04-10 20:08:41 +000030import sys
Gregory P. Smithe5796c42018-12-30 20:17:57 -080031import sysconfig
Victor Stinner22eb6892019-06-26 00:51:05 +020032import textwrap
Antoine Pitroua6a4dc82017-09-07 18:56:24 +020033import threading
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +000034import time
Guido van Rossum28524c72007-02-27 05:47:44 +000035import unittest
Antoine Pitroue033e062010-10-29 10:38:18 +000036import warnings
Victor Stinnerf86a5e82012-06-05 13:43:22 +020037import weakref
Antoine Pitrou131a4892012-10-16 22:57:11 +020038from collections import deque, UserList
Victor Stinnerf86a5e82012-06-05 13:43:22 +020039from itertools import cycle, count
Benjamin Petersonee8712c2008-05-20 21:35:26 +000040from test import support
Victor Stinner22eb6892019-06-26 00:51:05 +020041from test.support.script_helper import (
42 assert_python_ok, assert_python_failure, run_python_until_end)
Hai Shi883bc632020-07-06 17:12:49 +080043from test.support import import_helper
44from test.support import os_helper
Hai Shie80697d2020-05-28 06:10:27 +080045from test.support import threading_helper
Hai Shi883bc632020-07-06 17:12:49 +080046from test.support import warnings_helper
47from test.support.os_helper import FakePath
Guido van Rossum76c5d4d2007-04-06 19:10:29 +000048
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +000049import codecs
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000050import io # C implementation of io
51import _pyio as pyio # Python implementation of io
Guido van Rossuma9e20242007-03-08 00:43:48 +000052
Martin Panter6bb91f32016-05-28 00:41:57 +000053try:
54 import ctypes
55except ImportError:
56 def byteslike(*pos, **kw):
57 return array.array("b", bytes(*pos, **kw))
58else:
59 def byteslike(*pos, **kw):
60 """Create a bytes-like object having no string or sequence methods"""
61 data = bytes(*pos, **kw)
62 obj = EmptyStruct()
63 ctypes.resize(obj, len(data))
64 memoryview(obj).cast("B")[:] = data
65 return obj
66 class EmptyStruct(ctypes.Structure):
67 pass
68
Gregory P. Smithe5796c42018-12-30 20:17:57 -080069_cflags = sysconfig.get_config_var('CFLAGS') or ''
70_config_args = sysconfig.get_config_var('CONFIG_ARGS') or ''
71MEMORY_SANITIZER = (
72 '-fsanitize=memory' in _cflags or
73 '--with-memory-sanitizer' in _config_args
74)
75
Victor Stinnerbc2aa812019-05-23 03:45:09 +020076# Does io.IOBase finalizer log the exception if the close() method fails?
77# The exception is ignored silently by default in release build.
78IOBASE_EMITS_UNRAISABLE = (hasattr(sys, "gettotalrefcount") or sys.flags.dev_mode)
Victor Stinner44235042019-04-12 17:06:47 +020079
80
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000081def _default_chunk_size():
82 """Get the default TextIOWrapper chunk size"""
Marc-André Lemburg8f36af72011-02-25 15:42:01 +000083 with open(__file__, "r", encoding="latin-1") as f:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000084 return f._CHUNK_SIZE
85
86
Antoine Pitrou328ec742010-09-14 18:37:24 +000087class MockRawIOWithoutRead:
88 """A RawIO implementation without read(), so as to exercise the default
89 RawIO.read() which calls readinto()."""
Guido van Rossuma9e20242007-03-08 00:43:48 +000090
Guido van Rossum76c5d4d2007-04-06 19:10:29 +000091 def __init__(self, read_stack=()):
92 self._read_stack = list(read_stack)
93 self._write_stack = []
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000094 self._reads = 0
Antoine Pitrou32cfede2010-08-11 13:31:33 +000095 self._extraneous_reads = 0
Guido van Rossum68bbcd22007-02-27 17:19:33 +000096
Guido van Rossum01a27522007-03-07 01:00:12 +000097 def write(self, b):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000098 self._write_stack.append(bytes(b))
Guido van Rossum01a27522007-03-07 01:00:12 +000099 return len(b)
100
101 def writable(self):
102 return True
103
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000104 def fileno(self):
105 return 42
106
107 def readable(self):
108 return True
109
Guido van Rossum01a27522007-03-07 01:00:12 +0000110 def seekable(self):
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000111 return True
112
Guido van Rossum01a27522007-03-07 01:00:12 +0000113 def seek(self, pos, whence):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000114 return 0 # wrong but we gotta return something
Guido van Rossum01a27522007-03-07 01:00:12 +0000115
116 def tell(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000117 return 0 # same comment as above
118
119 def readinto(self, buf):
120 self._reads += 1
121 max_len = len(buf)
122 try:
123 data = self._read_stack[0]
124 except IndexError:
Antoine Pitrou32cfede2010-08-11 13:31:33 +0000125 self._extraneous_reads += 1
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000126 return 0
127 if data is None:
128 del self._read_stack[0]
129 return None
130 n = len(data)
131 if len(data) <= max_len:
132 del self._read_stack[0]
133 buf[:n] = data
134 return n
135 else:
136 buf[:] = data[:max_len]
137 self._read_stack[0] = data[max_len:]
138 return max_len
139
140 def truncate(self, pos=None):
141 return pos
142
Antoine Pitrou328ec742010-09-14 18:37:24 +0000143class CMockRawIOWithoutRead(MockRawIOWithoutRead, io.RawIOBase):
144 pass
145
146class PyMockRawIOWithoutRead(MockRawIOWithoutRead, pyio.RawIOBase):
147 pass
148
149
150class MockRawIO(MockRawIOWithoutRead):
151
152 def read(self, n=None):
153 self._reads += 1
154 try:
155 return self._read_stack.pop(0)
156 except:
157 self._extraneous_reads += 1
158 return b""
159
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000160class CMockRawIO(MockRawIO, io.RawIOBase):
161 pass
162
163class PyMockRawIO(MockRawIO, pyio.RawIOBase):
164 pass
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000165
Guido van Rossuma9e20242007-03-08 00:43:48 +0000166
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000167class MisbehavedRawIO(MockRawIO):
168 def write(self, b):
169 return super().write(b) * 2
170
171 def read(self, n=None):
172 return super().read(n) * 2
173
174 def seek(self, pos, whence):
175 return -123
176
177 def tell(self):
178 return -456
179
180 def readinto(self, buf):
181 super().readinto(buf)
182 return len(buf) * 5
183
184class CMisbehavedRawIO(MisbehavedRawIO, io.RawIOBase):
185 pass
186
187class PyMisbehavedRawIO(MisbehavedRawIO, pyio.RawIOBase):
188 pass
189
190
benfogle9703f092017-11-10 16:03:40 -0500191class SlowFlushRawIO(MockRawIO):
192 def __init__(self):
193 super().__init__()
194 self.in_flush = threading.Event()
195
196 def flush(self):
197 self.in_flush.set()
198 time.sleep(0.25)
199
200class CSlowFlushRawIO(SlowFlushRawIO, io.RawIOBase):
201 pass
202
203class PySlowFlushRawIO(SlowFlushRawIO, pyio.RawIOBase):
204 pass
205
206
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000207class CloseFailureIO(MockRawIO):
208 closed = 0
209
210 def close(self):
211 if not self.closed:
212 self.closed = 1
Andrew Svetlovf7a17b42012-12-25 16:47:37 +0200213 raise OSError
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000214
215class CCloseFailureIO(CloseFailureIO, io.RawIOBase):
216 pass
217
218class PyCloseFailureIO(CloseFailureIO, pyio.RawIOBase):
219 pass
220
221
222class MockFileIO:
Guido van Rossum78892e42007-04-06 17:31:18 +0000223
224 def __init__(self, data):
225 self.read_history = []
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000226 super().__init__(data)
Guido van Rossum78892e42007-04-06 17:31:18 +0000227
228 def read(self, n=None):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000229 res = super().read(n)
Guido van Rossum78892e42007-04-06 17:31:18 +0000230 self.read_history.append(None if res is None else len(res))
231 return res
232
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000233 def readinto(self, b):
234 res = super().readinto(b)
235 self.read_history.append(res)
236 return res
Guido van Rossum78892e42007-04-06 17:31:18 +0000237
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000238class CMockFileIO(MockFileIO, io.BytesIO):
239 pass
Guido van Rossuma9e20242007-03-08 00:43:48 +0000240
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000241class PyMockFileIO(MockFileIO, pyio.BytesIO):
242 pass
243
244
Antoine Pitrou0d739d72010-09-05 23:01:12 +0000245class MockUnseekableIO:
246 def seekable(self):
247 return False
248
249 def seek(self, *args):
250 raise self.UnsupportedOperation("not seekable")
251
252 def tell(self, *args):
253 raise self.UnsupportedOperation("not seekable")
254
Martin Panter754aab22016-03-31 07:21:56 +0000255 def truncate(self, *args):
256 raise self.UnsupportedOperation("not seekable")
257
Antoine Pitrou0d739d72010-09-05 23:01:12 +0000258class CMockUnseekableIO(MockUnseekableIO, io.BytesIO):
259 UnsupportedOperation = io.UnsupportedOperation
260
261class PyMockUnseekableIO(MockUnseekableIO, pyio.BytesIO):
262 UnsupportedOperation = pyio.UnsupportedOperation
263
264
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000265class MockNonBlockWriterIO:
266
267 def __init__(self):
Guido van Rossum01a27522007-03-07 01:00:12 +0000268 self._write_stack = []
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000269 self._blocker_char = None
Guido van Rossuma9e20242007-03-08 00:43:48 +0000270
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000271 def pop_written(self):
272 s = b"".join(self._write_stack)
273 self._write_stack[:] = []
274 return s
275
276 def block_on(self, char):
277 """Block when a given char is encountered."""
278 self._blocker_char = char
279
280 def readable(self):
281 return True
282
283 def seekable(self):
284 return True
Guido van Rossuma9e20242007-03-08 00:43:48 +0000285
Victor Stinnerb589cef2019-06-11 03:10:59 +0200286 def seek(self, pos, whence=0):
287 # naive implementation, enough for tests
288 return 0
289
Guido van Rossum01a27522007-03-07 01:00:12 +0000290 def writable(self):
291 return True
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000292
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000293 def write(self, b):
294 b = bytes(b)
295 n = -1
296 if self._blocker_char:
297 try:
298 n = b.index(self._blocker_char)
299 except ValueError:
300 pass
301 else:
Antoine Pitrou58fcf9f2011-11-21 20:16:44 +0100302 if n > 0:
303 # write data up to the first blocker
304 self._write_stack.append(b[:n])
305 return n
306 else:
307 # cancel blocker and indicate would block
308 self._blocker_char = None
309 return None
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000310 self._write_stack.append(b)
311 return len(b)
312
313class CMockNonBlockWriterIO(MockNonBlockWriterIO, io.RawIOBase):
314 BlockingIOError = io.BlockingIOError
315
316class PyMockNonBlockWriterIO(MockNonBlockWriterIO, pyio.RawIOBase):
317 BlockingIOError = pyio.BlockingIOError
318
Guido van Rossuma9e20242007-03-08 00:43:48 +0000319
Guido van Rossum28524c72007-02-27 05:47:44 +0000320class IOTest(unittest.TestCase):
321
Neal Norwitze7789b12008-03-24 06:18:09 +0000322 def setUp(self):
Hai Shi883bc632020-07-06 17:12:49 +0800323 os_helper.unlink(os_helper.TESTFN)
Neal Norwitze7789b12008-03-24 06:18:09 +0000324
Guido van Rossum4d0f5a42007-03-07 22:59:39 +0000325 def tearDown(self):
Hai Shi883bc632020-07-06 17:12:49 +0800326 os_helper.unlink(os_helper.TESTFN)
Guido van Rossum4d0f5a42007-03-07 22:59:39 +0000327
Guido van Rossum28524c72007-02-27 05:47:44 +0000328 def write_ops(self, f):
Guido van Rossum87429772007-04-10 21:06:59 +0000329 self.assertEqual(f.write(b"blah."), 5)
Antoine Pitrou905a2ff2010-01-31 22:47:27 +0000330 f.truncate(0)
331 self.assertEqual(f.tell(), 5)
332 f.seek(0)
333
334 self.assertEqual(f.write(b"blah."), 5)
Guido van Rossum87429772007-04-10 21:06:59 +0000335 self.assertEqual(f.seek(0), 0)
336 self.assertEqual(f.write(b"Hello."), 6)
Guido van Rossum28524c72007-02-27 05:47:44 +0000337 self.assertEqual(f.tell(), 6)
Guido van Rossum87429772007-04-10 21:06:59 +0000338 self.assertEqual(f.seek(-1, 1), 5)
Guido van Rossum28524c72007-02-27 05:47:44 +0000339 self.assertEqual(f.tell(), 5)
Martin Panter6bb91f32016-05-28 00:41:57 +0000340 buffer = bytearray(b" world\n\n\n")
341 self.assertEqual(f.write(buffer), 9)
342 buffer[:] = b"*" * 9 # Overwrite our copy of the data
Guido van Rossum87429772007-04-10 21:06:59 +0000343 self.assertEqual(f.seek(0), 0)
Guido van Rossum2b08b382007-05-08 20:18:39 +0000344 self.assertEqual(f.write(b"h"), 1)
Guido van Rossum87429772007-04-10 21:06:59 +0000345 self.assertEqual(f.seek(-1, 2), 13)
346 self.assertEqual(f.tell(), 13)
Antoine Pitrou905a2ff2010-01-31 22:47:27 +0000347
Guido van Rossum87429772007-04-10 21:06:59 +0000348 self.assertEqual(f.truncate(12), 12)
Antoine Pitrou905a2ff2010-01-31 22:47:27 +0000349 self.assertEqual(f.tell(), 13)
Christian Heimes8e42a0a2007-11-08 18:04:45 +0000350 self.assertRaises(TypeError, f.seek, 0.0)
Guido van Rossum28524c72007-02-27 05:47:44 +0000351
Guido van Rossum9b76da62007-04-11 01:09:03 +0000352 def read_ops(self, f, buffered=False):
353 data = f.read(5)
354 self.assertEqual(data, b"hello")
Martin Panter6bb91f32016-05-28 00:41:57 +0000355 data = byteslike(data)
Guido van Rossum9b76da62007-04-11 01:09:03 +0000356 self.assertEqual(f.readinto(data), 5)
Martin Panter6bb91f32016-05-28 00:41:57 +0000357 self.assertEqual(bytes(data), b" worl")
358 data = bytearray(5)
Guido van Rossum9b76da62007-04-11 01:09:03 +0000359 self.assertEqual(f.readinto(data), 2)
360 self.assertEqual(len(data), 5)
361 self.assertEqual(data[:2], b"d\n")
362 self.assertEqual(f.seek(0), 0)
363 self.assertEqual(f.read(20), b"hello world\n")
364 self.assertEqual(f.read(1), b"")
Martin Panter6bb91f32016-05-28 00:41:57 +0000365 self.assertEqual(f.readinto(byteslike(b"x")), 0)
Guido van Rossum9b76da62007-04-11 01:09:03 +0000366 self.assertEqual(f.seek(-6, 2), 6)
367 self.assertEqual(f.read(5), b"world")
368 self.assertEqual(f.read(0), b"")
Martin Panter6bb91f32016-05-28 00:41:57 +0000369 self.assertEqual(f.readinto(byteslike()), 0)
Guido van Rossum9b76da62007-04-11 01:09:03 +0000370 self.assertEqual(f.seek(-6, 1), 5)
371 self.assertEqual(f.read(5), b" worl")
372 self.assertEqual(f.tell(), 10)
Christian Heimes8e42a0a2007-11-08 18:04:45 +0000373 self.assertRaises(TypeError, f.seek, 0.0)
Guido van Rossum9b76da62007-04-11 01:09:03 +0000374 if buffered:
375 f.seek(0)
376 self.assertEqual(f.read(), b"hello world\n")
377 f.seek(6)
378 self.assertEqual(f.read(), b"world\n")
379 self.assertEqual(f.read(), b"")
Martin Panter6bb91f32016-05-28 00:41:57 +0000380 f.seek(0)
381 data = byteslike(5)
382 self.assertEqual(f.readinto1(data), 5)
383 self.assertEqual(bytes(data), b"hello")
Guido van Rossum9b76da62007-04-11 01:09:03 +0000384
Guido van Rossum34d69e52007-04-10 20:08:41 +0000385 LARGE = 2**31
386
Guido van Rossum53807da2007-04-10 19:01:47 +0000387 def large_file_ops(self, f):
388 assert f.readable()
389 assert f.writable()
Xavier de Gaye877f0362016-11-17 09:20:28 +0100390 try:
391 self.assertEqual(f.seek(self.LARGE), self.LARGE)
392 except (OverflowError, ValueError):
393 self.skipTest("no largefile support")
Guido van Rossum34d69e52007-04-10 20:08:41 +0000394 self.assertEqual(f.tell(), self.LARGE)
Guido van Rossum53807da2007-04-10 19:01:47 +0000395 self.assertEqual(f.write(b"xxx"), 3)
Guido van Rossum34d69e52007-04-10 20:08:41 +0000396 self.assertEqual(f.tell(), self.LARGE + 3)
397 self.assertEqual(f.seek(-1, 1), self.LARGE + 2)
Guido van Rossum87429772007-04-10 21:06:59 +0000398 self.assertEqual(f.truncate(), self.LARGE + 2)
Guido van Rossum34d69e52007-04-10 20:08:41 +0000399 self.assertEqual(f.tell(), self.LARGE + 2)
400 self.assertEqual(f.seek(0, 2), self.LARGE + 2)
Guido van Rossum87429772007-04-10 21:06:59 +0000401 self.assertEqual(f.truncate(self.LARGE + 1), self.LARGE + 1)
Antoine Pitrou905a2ff2010-01-31 22:47:27 +0000402 self.assertEqual(f.tell(), self.LARGE + 2)
Guido van Rossum34d69e52007-04-10 20:08:41 +0000403 self.assertEqual(f.seek(0, 2), self.LARGE + 1)
404 self.assertEqual(f.seek(-1, 2), self.LARGE)
Guido van Rossum53807da2007-04-10 19:01:47 +0000405 self.assertEqual(f.read(2), b"x")
406
Benjamin Peterson81971ea2009-05-14 22:01:31 +0000407 def test_invalid_operations(self):
408 # Try writing on a file opened in read mode and vice-versa.
Antoine Pitrou0d739d72010-09-05 23:01:12 +0000409 exc = self.UnsupportedOperation
Benjamin Peterson81971ea2009-05-14 22:01:31 +0000410 for mode in ("w", "wb"):
Hai Shi883bc632020-07-06 17:12:49 +0800411 with self.open(os_helper.TESTFN, mode) as fp:
Antoine Pitrou0d739d72010-09-05 23:01:12 +0000412 self.assertRaises(exc, fp.read)
413 self.assertRaises(exc, fp.readline)
Hai Shi883bc632020-07-06 17:12:49 +0800414 with self.open(os_helper.TESTFN, "wb", buffering=0) as fp:
Antoine Pitrou0d739d72010-09-05 23:01:12 +0000415 self.assertRaises(exc, fp.read)
416 self.assertRaises(exc, fp.readline)
Hai Shi883bc632020-07-06 17:12:49 +0800417 with self.open(os_helper.TESTFN, "rb", buffering=0) as fp:
Antoine Pitrou0d739d72010-09-05 23:01:12 +0000418 self.assertRaises(exc, fp.write, b"blah")
419 self.assertRaises(exc, fp.writelines, [b"blah\n"])
Hai Shi883bc632020-07-06 17:12:49 +0800420 with self.open(os_helper.TESTFN, "rb") as fp:
Antoine Pitrou0d739d72010-09-05 23:01:12 +0000421 self.assertRaises(exc, fp.write, b"blah")
422 self.assertRaises(exc, fp.writelines, [b"blah\n"])
Hai Shi883bc632020-07-06 17:12:49 +0800423 with self.open(os_helper.TESTFN, "r") as fp:
Antoine Pitrou0d739d72010-09-05 23:01:12 +0000424 self.assertRaises(exc, fp.write, "blah")
425 self.assertRaises(exc, fp.writelines, ["blah\n"])
426 # Non-zero seeking from current or end pos
427 self.assertRaises(exc, fp.seek, 1, self.SEEK_CUR)
428 self.assertRaises(exc, fp.seek, -1, self.SEEK_END)
Benjamin Peterson81971ea2009-05-14 22:01:31 +0000429
Martin Panter754aab22016-03-31 07:21:56 +0000430 def test_optional_abilities(self):
431 # Test for OSError when optional APIs are not supported
432 # The purpose of this test is to try fileno(), reading, writing and
433 # seeking operations with various objects that indicate they do not
434 # support these operations.
435
436 def pipe_reader():
437 [r, w] = os.pipe()
438 os.close(w) # So that read() is harmless
439 return self.FileIO(r, "r")
440
441 def pipe_writer():
442 [r, w] = os.pipe()
443 self.addCleanup(os.close, r)
444 # Guarantee that we can write into the pipe without blocking
445 thread = threading.Thread(target=os.read, args=(r, 100))
446 thread.start()
447 self.addCleanup(thread.join)
448 return self.FileIO(w, "w")
449
450 def buffered_reader():
451 return self.BufferedReader(self.MockUnseekableIO())
452
453 def buffered_writer():
454 return self.BufferedWriter(self.MockUnseekableIO())
455
456 def buffered_random():
457 return self.BufferedRandom(self.BytesIO())
458
459 def buffered_rw_pair():
460 return self.BufferedRWPair(self.MockUnseekableIO(),
461 self.MockUnseekableIO())
462
463 def text_reader():
464 class UnseekableReader(self.MockUnseekableIO):
465 writable = self.BufferedIOBase.writable
466 write = self.BufferedIOBase.write
467 return self.TextIOWrapper(UnseekableReader(), "ascii")
468
469 def text_writer():
470 class UnseekableWriter(self.MockUnseekableIO):
471 readable = self.BufferedIOBase.readable
472 read = self.BufferedIOBase.read
473 return self.TextIOWrapper(UnseekableWriter(), "ascii")
474
475 tests = (
476 (pipe_reader, "fr"), (pipe_writer, "fw"),
477 (buffered_reader, "r"), (buffered_writer, "w"),
478 (buffered_random, "rws"), (buffered_rw_pair, "rw"),
479 (text_reader, "r"), (text_writer, "w"),
480 (self.BytesIO, "rws"), (self.StringIO, "rws"),
481 )
482 for [test, abilities] in tests:
Martin Panter754aab22016-03-31 07:21:56 +0000483 with self.subTest(test), test() as obj:
484 readable = "r" in abilities
485 self.assertEqual(obj.readable(), readable)
486 writable = "w" in abilities
487 self.assertEqual(obj.writable(), writable)
Martin Panter754aab22016-03-31 07:21:56 +0000488
489 if isinstance(obj, self.TextIOBase):
490 data = "3"
491 elif isinstance(obj, (self.BufferedIOBase, self.RawIOBase)):
492 data = b"3"
493 else:
494 self.fail("Unknown base class")
495
496 if "f" in abilities:
497 obj.fileno()
498 else:
499 self.assertRaises(OSError, obj.fileno)
500
501 if readable:
502 obj.read(1)
503 obj.read()
504 else:
505 self.assertRaises(OSError, obj.read, 1)
506 self.assertRaises(OSError, obj.read)
507
508 if writable:
509 obj.write(data)
510 else:
511 self.assertRaises(OSError, obj.write, data)
512
Martin Panter3ee147f2016-03-31 21:05:31 +0000513 if sys.platform.startswith("win") and test in (
Martin Panter0950e6a2016-03-31 10:31:30 +0000514 pipe_reader, pipe_writer):
515 # Pipes seem to appear as seekable on Windows
516 continue
517 seekable = "s" in abilities
518 self.assertEqual(obj.seekable(), seekable)
519
Martin Panter754aab22016-03-31 07:21:56 +0000520 if seekable:
521 obj.tell()
522 obj.seek(0)
523 else:
524 self.assertRaises(OSError, obj.tell)
525 self.assertRaises(OSError, obj.seek, 0)
526
527 if writable and seekable:
528 obj.truncate()
529 obj.truncate(0)
530 else:
531 self.assertRaises(OSError, obj.truncate)
532 self.assertRaises(OSError, obj.truncate, 0)
533
Antoine Pitrou13348842012-01-29 18:36:34 +0100534 def test_open_handles_NUL_chars(self):
535 fn_with_NUL = 'foo\0bar'
Serhiy Storchakad8a14472014-09-06 20:07:17 +0300536 self.assertRaises(ValueError, self.open, fn_with_NUL, 'w')
Victor Stinner47b45572016-03-25 09:07:07 +0100537
538 bytes_fn = bytes(fn_with_NUL, 'ascii')
539 with warnings.catch_warnings():
540 warnings.simplefilter("ignore", DeprecationWarning)
541 self.assertRaises(ValueError, self.open, bytes_fn, 'w')
Antoine Pitrou13348842012-01-29 18:36:34 +0100542
Guido van Rossum28524c72007-02-27 05:47:44 +0000543 def test_raw_file_io(self):
Hai Shi883bc632020-07-06 17:12:49 +0800544 with self.open(os_helper.TESTFN, "wb", buffering=0) as f:
Benjamin Peterson45cec322009-04-24 23:14:50 +0000545 self.assertEqual(f.readable(), False)
546 self.assertEqual(f.writable(), True)
547 self.assertEqual(f.seekable(), True)
548 self.write_ops(f)
Hai Shi883bc632020-07-06 17:12:49 +0800549 with self.open(os_helper.TESTFN, "rb", buffering=0) as f:
Benjamin Peterson45cec322009-04-24 23:14:50 +0000550 self.assertEqual(f.readable(), True)
551 self.assertEqual(f.writable(), False)
552 self.assertEqual(f.seekable(), True)
553 self.read_ops(f)
Guido van Rossum28524c72007-02-27 05:47:44 +0000554
Guido van Rossum87429772007-04-10 21:06:59 +0000555 def test_buffered_file_io(self):
Hai Shi883bc632020-07-06 17:12:49 +0800556 with self.open(os_helper.TESTFN, "wb") as f:
Benjamin Peterson45cec322009-04-24 23:14:50 +0000557 self.assertEqual(f.readable(), False)
558 self.assertEqual(f.writable(), True)
559 self.assertEqual(f.seekable(), True)
560 self.write_ops(f)
Hai Shi883bc632020-07-06 17:12:49 +0800561 with self.open(os_helper.TESTFN, "rb") as f:
Benjamin Peterson45cec322009-04-24 23:14:50 +0000562 self.assertEqual(f.readable(), True)
563 self.assertEqual(f.writable(), False)
564 self.assertEqual(f.seekable(), True)
565 self.read_ops(f, True)
Guido van Rossum87429772007-04-10 21:06:59 +0000566
Guido van Rossum48fc58a2007-06-07 23:45:37 +0000567 def test_readline(self):
Hai Shi883bc632020-07-06 17:12:49 +0800568 with self.open(os_helper.TESTFN, "wb") as f:
Benjamin Peterson45cec322009-04-24 23:14:50 +0000569 f.write(b"abc\ndef\nxyzzy\nfoo\x00bar\nanother line")
Hai Shi883bc632020-07-06 17:12:49 +0800570 with self.open(os_helper.TESTFN, "rb") as f:
Benjamin Peterson45cec322009-04-24 23:14:50 +0000571 self.assertEqual(f.readline(), b"abc\n")
572 self.assertEqual(f.readline(10), b"def\n")
573 self.assertEqual(f.readline(2), b"xy")
574 self.assertEqual(f.readline(4), b"zzy\n")
575 self.assertEqual(f.readline(), b"foo\x00bar\n")
Benjamin Petersonbf5ff762009-12-13 19:25:34 +0000576 self.assertEqual(f.readline(None), b"another line")
Benjamin Peterson45cec322009-04-24 23:14:50 +0000577 self.assertRaises(TypeError, f.readline, 5.3)
Hai Shi883bc632020-07-06 17:12:49 +0800578 with self.open(os_helper.TESTFN, "r") as f:
Benjamin Peterson45cec322009-04-24 23:14:50 +0000579 self.assertRaises(TypeError, f.readline, 5.3)
Guido van Rossum48fc58a2007-06-07 23:45:37 +0000580
Serhiy Storchakabf623ae2017-04-19 20:03:52 +0300581 def test_readline_nonsizeable(self):
582 # Issue #30061
583 # Crash when readline() returns an object without __len__
584 class R(self.IOBase):
585 def readline(self):
586 return None
587 self.assertRaises((TypeError, StopIteration), next, R())
588
589 def test_next_nonsizeable(self):
590 # Issue #30061
591 # Crash when __next__() returns an object without __len__
592 class R(self.IOBase):
593 def __next__(self):
594 return None
595 self.assertRaises(TypeError, R().readlines, 1)
596
Guido van Rossum28524c72007-02-27 05:47:44 +0000597 def test_raw_bytes_io(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000598 f = self.BytesIO()
Guido van Rossum28524c72007-02-27 05:47:44 +0000599 self.write_ops(f)
600 data = f.getvalue()
601 self.assertEqual(data, b"hello world\n")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000602 f = self.BytesIO(data)
Guido van Rossum9b76da62007-04-11 01:09:03 +0000603 self.read_ops(f, True)
Guido van Rossum28524c72007-02-27 05:47:44 +0000604
Guido van Rossum53807da2007-04-10 19:01:47 +0000605 def test_large_file_ops(self):
Mike53f7a7c2017-12-14 14:04:53 +0300606 # On Windows and Mac OSX this test consumes large resources; It takes
Victor Stinner8c663fd2017-11-08 14:44:44 -0800607 # a long time to build the >2 GiB file and takes >2 GiB of disk space
Guido van Rossum34d69e52007-04-10 20:08:41 +0000608 # therefore the resource must be enabled to run this test.
Victor Stinner937ee9e2018-06-26 02:11:06 +0200609 if sys.platform[:3] == 'win' or sys.platform == 'darwin':
Zachary Ware9fe6d862013-12-08 00:20:35 -0600610 support.requires(
611 'largefile',
612 'test requires %s bytes and a long time to run' % self.LARGE)
Hai Shi883bc632020-07-06 17:12:49 +0800613 with self.open(os_helper.TESTFN, "w+b", 0) as f:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000614 self.large_file_ops(f)
Hai Shi883bc632020-07-06 17:12:49 +0800615 with self.open(os_helper.TESTFN, "w+b") as f:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000616 self.large_file_ops(f)
Guido van Rossum87429772007-04-10 21:06:59 +0000617
618 def test_with_open(self):
Alexey Izbysheva2670562018-10-20 03:22:31 +0300619 for bufsize in (0, 100):
Guido van Rossum87429772007-04-10 21:06:59 +0000620 f = None
Hai Shi883bc632020-07-06 17:12:49 +0800621 with self.open(os_helper.TESTFN, "wb", bufsize) as f:
Guido van Rossum1f2ca562007-08-27 20:44:15 +0000622 f.write(b"xxx")
Guido van Rossum87429772007-04-10 21:06:59 +0000623 self.assertEqual(f.closed, True)
624 f = None
625 try:
Hai Shi883bc632020-07-06 17:12:49 +0800626 with self.open(os_helper.TESTFN, "wb", bufsize) as f:
Guido van Rossum87429772007-04-10 21:06:59 +0000627 1/0
628 except ZeroDivisionError:
629 self.assertEqual(f.closed, True)
630 else:
631 self.fail("1/0 didn't raise an exception")
632
Antoine Pitrou08838b62009-01-21 00:55:13 +0000633 # issue 5008
634 def test_append_mode_tell(self):
Hai Shi883bc632020-07-06 17:12:49 +0800635 with self.open(os_helper.TESTFN, "wb") as f:
Antoine Pitrou08838b62009-01-21 00:55:13 +0000636 f.write(b"xxx")
Hai Shi883bc632020-07-06 17:12:49 +0800637 with self.open(os_helper.TESTFN, "ab", buffering=0) as f:
Antoine Pitrou08838b62009-01-21 00:55:13 +0000638 self.assertEqual(f.tell(), 3)
Hai Shi883bc632020-07-06 17:12:49 +0800639 with self.open(os_helper.TESTFN, "ab") as f:
Antoine Pitrou08838b62009-01-21 00:55:13 +0000640 self.assertEqual(f.tell(), 3)
Hai Shi883bc632020-07-06 17:12:49 +0800641 with self.open(os_helper.TESTFN, "a") as f:
Serhiy Storchaka0dcd80a2015-08-02 15:17:49 +0300642 self.assertGreater(f.tell(), 0)
Antoine Pitrou08838b62009-01-21 00:55:13 +0000643
Guido van Rossum87429772007-04-10 21:06:59 +0000644 def test_destructor(self):
645 record = []
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000646 class MyFileIO(self.FileIO):
Guido van Rossum87429772007-04-10 21:06:59 +0000647 def __del__(self):
648 record.append(1)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000649 try:
650 f = super().__del__
651 except AttributeError:
652 pass
653 else:
654 f()
Guido van Rossum87429772007-04-10 21:06:59 +0000655 def close(self):
656 record.append(2)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000657 super().close()
Guido van Rossum87429772007-04-10 21:06:59 +0000658 def flush(self):
659 record.append(3)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000660 super().flush()
Hai Shi883bc632020-07-06 17:12:49 +0800661 with warnings_helper.check_warnings(('', ResourceWarning)):
662 f = MyFileIO(os_helper.TESTFN, "wb")
Brett Cannon5a9e91b2010-10-29 23:53:03 +0000663 f.write(b"xxx")
664 del f
665 support.gc_collect()
666 self.assertEqual(record, [1, 2, 3])
Hai Shi883bc632020-07-06 17:12:49 +0800667 with self.open(os_helper.TESTFN, "rb") as f:
Brett Cannon5a9e91b2010-10-29 23:53:03 +0000668 self.assertEqual(f.read(), b"xxx")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000669
670 def _check_base_destructor(self, base):
671 record = []
672 class MyIO(base):
673 def __init__(self):
674 # This exercises the availability of attributes on object
675 # destruction.
676 # (in the C version, close() is called by the tp_dealloc
677 # function, not by __del__)
678 self.on_del = 1
679 self.on_close = 2
680 self.on_flush = 3
681 def __del__(self):
682 record.append(self.on_del)
683 try:
684 f = super().__del__
685 except AttributeError:
686 pass
687 else:
688 f()
689 def close(self):
690 record.append(self.on_close)
691 super().close()
692 def flush(self):
693 record.append(self.on_flush)
694 super().flush()
695 f = MyIO()
Guido van Rossum87429772007-04-10 21:06:59 +0000696 del f
Benjamin Peterson24fb1d02009-04-24 23:26:21 +0000697 support.gc_collect()
Guido van Rossum87429772007-04-10 21:06:59 +0000698 self.assertEqual(record, [1, 2, 3])
699
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000700 def test_IOBase_destructor(self):
701 self._check_base_destructor(self.IOBase)
702
703 def test_RawIOBase_destructor(self):
704 self._check_base_destructor(self.RawIOBase)
705
706 def test_BufferedIOBase_destructor(self):
707 self._check_base_destructor(self.BufferedIOBase)
708
709 def test_TextIOBase_destructor(self):
710 self._check_base_destructor(self.TextIOBase)
711
Guido van Rossum87429772007-04-10 21:06:59 +0000712 def test_close_flushes(self):
Hai Shi883bc632020-07-06 17:12:49 +0800713 with self.open(os_helper.TESTFN, "wb") as f:
Benjamin Peterson45cec322009-04-24 23:14:50 +0000714 f.write(b"xxx")
Hai Shi883bc632020-07-06 17:12:49 +0800715 with self.open(os_helper.TESTFN, "rb") as f:
Benjamin Peterson45cec322009-04-24 23:14:50 +0000716 self.assertEqual(f.read(), b"xxx")
Guido van Rossuma9e20242007-03-08 00:43:48 +0000717
Guido van Rossumd4103952007-04-12 05:44:49 +0000718 def test_array_writes(self):
719 a = array.array('i', range(10))
Antoine Pitrou1ce3eb52010-09-01 20:29:34 +0000720 n = len(a.tobytes())
Martin Panter6bb91f32016-05-28 00:41:57 +0000721 def check(f):
722 with f:
723 self.assertEqual(f.write(a), n)
724 f.writelines((a,))
725 check(self.BytesIO())
Hai Shi883bc632020-07-06 17:12:49 +0800726 check(self.FileIO(os_helper.TESTFN, "w"))
Martin Panter6bb91f32016-05-28 00:41:57 +0000727 check(self.BufferedWriter(self.MockRawIO()))
728 check(self.BufferedRandom(self.MockRawIO()))
729 check(self.BufferedRWPair(self.MockRawIO(), self.MockRawIO()))
Guido van Rossumd4103952007-04-12 05:44:49 +0000730
Guido van Rossum2dced8b2007-10-30 17:27:30 +0000731 def test_closefd(self):
Hai Shi883bc632020-07-06 17:12:49 +0800732 self.assertRaises(ValueError, self.open, os_helper.TESTFN, 'w',
Guido van Rossum2dced8b2007-10-30 17:27:30 +0000733 closefd=False)
Guido van Rossuma9e20242007-03-08 00:43:48 +0000734
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000735 def test_read_closed(self):
Hai Shi883bc632020-07-06 17:12:49 +0800736 with self.open(os_helper.TESTFN, "w") as f:
Christian Heimesecc42a22008-11-05 19:30:32 +0000737 f.write("egg\n")
Hai Shi883bc632020-07-06 17:12:49 +0800738 with self.open(os_helper.TESTFN, "r") as f:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000739 file = self.open(f.fileno(), "r", closefd=False)
Christian Heimesecc42a22008-11-05 19:30:32 +0000740 self.assertEqual(file.read(), "egg\n")
741 file.seek(0)
742 file.close()
743 self.assertRaises(ValueError, file.read)
Hai Shi883bc632020-07-06 17:12:49 +0800744 with self.open(os_helper.TESTFN, "rb") as f:
Philipp Gesangcb1c0742020-02-04 22:25:16 +0100745 file = self.open(f.fileno(), "rb", closefd=False)
746 self.assertEqual(file.read()[:3], b"egg")
747 file.close()
748 self.assertRaises(ValueError, file.readinto, bytearray(1))
Christian Heimesecc42a22008-11-05 19:30:32 +0000749
750 def test_no_closefd_with_filename(self):
751 # can't use closefd in combination with a file name
Hai Shi883bc632020-07-06 17:12:49 +0800752 self.assertRaises(ValueError, self.open, os_helper.TESTFN, "r", closefd=False)
Christian Heimesecc42a22008-11-05 19:30:32 +0000753
754 def test_closefd_attr(self):
Hai Shi883bc632020-07-06 17:12:49 +0800755 with self.open(os_helper.TESTFN, "wb") as f:
Christian Heimesecc42a22008-11-05 19:30:32 +0000756 f.write(b"egg\n")
Hai Shi883bc632020-07-06 17:12:49 +0800757 with self.open(os_helper.TESTFN, "r") as f:
Christian Heimesecc42a22008-11-05 19:30:32 +0000758 self.assertEqual(f.buffer.raw.closefd, True)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000759 file = self.open(f.fileno(), "r", closefd=False)
Christian Heimesecc42a22008-11-05 19:30:32 +0000760 self.assertEqual(file.buffer.raw.closefd, False)
761
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000762 def test_garbage_collection(self):
763 # FileIO objects are collected, and collecting them flushes
764 # all data to disk.
Hai Shi883bc632020-07-06 17:12:49 +0800765 with warnings_helper.check_warnings(('', ResourceWarning)):
766 f = self.FileIO(os_helper.TESTFN, "wb")
Benjamin Petersonebe5d8a2010-10-31 01:30:11 +0000767 f.write(b"abcxxx")
768 f.f = f
769 wr = weakref.ref(f)
770 del f
771 support.gc_collect()
Serhiy Storchaka0dcd80a2015-08-02 15:17:49 +0300772 self.assertIsNone(wr(), wr)
Hai Shi883bc632020-07-06 17:12:49 +0800773 with self.open(os_helper.TESTFN, "rb") as f:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000774 self.assertEqual(f.read(), b"abcxxx")
Christian Heimesecc42a22008-11-05 19:30:32 +0000775
Antoine Pitrou7d037a72009-03-29 18:55:12 +0000776 def test_unbounded_file(self):
777 # Issue #1174606: reading from an unbounded stream such as /dev/zero.
778 zero = "/dev/zero"
779 if not os.path.exists(zero):
Antoine Pitrouc50cb8e2009-04-19 00:10:36 +0000780 self.skipTest("{0} does not exist".format(zero))
Antoine Pitrou7d037a72009-03-29 18:55:12 +0000781 if sys.maxsize > 0x7FFFFFFF:
Antoine Pitrouc50cb8e2009-04-19 00:10:36 +0000782 self.skipTest("test can only run in a 32-bit address space")
Antoine Pitrou7d037a72009-03-29 18:55:12 +0000783 if support.real_max_memuse < support._2G:
Victor Stinner8c663fd2017-11-08 14:44:44 -0800784 self.skipTest("test requires at least 2 GiB of memory")
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000785 with self.open(zero, "rb", buffering=0) as f:
Antoine Pitrou7d037a72009-03-29 18:55:12 +0000786 self.assertRaises(OverflowError, f.read)
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000787 with self.open(zero, "rb") as f:
Antoine Pitrou7d037a72009-03-29 18:55:12 +0000788 self.assertRaises(OverflowError, f.read)
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000789 with self.open(zero, "r") as f:
Antoine Pitrou7d037a72009-03-29 18:55:12 +0000790 self.assertRaises(OverflowError, f.read)
791
Serhiy Storchakaa3712a92015-02-21 00:35:09 +0200792 def check_flush_error_on_close(self, *args, **kwargs):
793 # Test that the file is closed despite failed flush
794 # and that flush() is called before file closed.
795 f = self.open(*args, **kwargs)
796 closed = []
Antoine Pitrou6be88762010-05-03 16:48:20 +0000797 def bad_flush():
Serhiy Storchakaa3712a92015-02-21 00:35:09 +0200798 closed[:] = [f.closed]
Andrew Svetlovf7a17b42012-12-25 16:47:37 +0200799 raise OSError()
Antoine Pitrou6be88762010-05-03 16:48:20 +0000800 f.flush = bad_flush
Andrew Svetlovf7a17b42012-12-25 16:47:37 +0200801 self.assertRaises(OSError, f.close) # exception not swallowed
Benjamin Peterson68623612012-12-20 11:53:11 -0600802 self.assertTrue(f.closed)
Serhiy Storchakaa3712a92015-02-21 00:35:09 +0200803 self.assertTrue(closed) # flush() called
804 self.assertFalse(closed[0]) # flush() called before file closed
Serhiy Storchakac26a1a42015-02-23 00:28:38 +0200805 f.flush = lambda: None # break reference loop
Serhiy Storchakaa3712a92015-02-21 00:35:09 +0200806
807 def test_flush_error_on_close(self):
808 # raw file
809 # Issue #5700: io.FileIO calls flush() after file closed
Hai Shi883bc632020-07-06 17:12:49 +0800810 self.check_flush_error_on_close(os_helper.TESTFN, 'wb', buffering=0)
811 fd = os.open(os_helper.TESTFN, os.O_WRONLY|os.O_CREAT)
Serhiy Storchakaa3712a92015-02-21 00:35:09 +0200812 self.check_flush_error_on_close(fd, 'wb', buffering=0)
Hai Shi883bc632020-07-06 17:12:49 +0800813 fd = os.open(os_helper.TESTFN, os.O_WRONLY|os.O_CREAT)
Serhiy Storchakaa3712a92015-02-21 00:35:09 +0200814 self.check_flush_error_on_close(fd, 'wb', buffering=0, closefd=False)
815 os.close(fd)
816 # buffered io
Hai Shi883bc632020-07-06 17:12:49 +0800817 self.check_flush_error_on_close(os_helper.TESTFN, 'wb')
818 fd = os.open(os_helper.TESTFN, os.O_WRONLY|os.O_CREAT)
Serhiy Storchakaa3712a92015-02-21 00:35:09 +0200819 self.check_flush_error_on_close(fd, 'wb')
Hai Shi883bc632020-07-06 17:12:49 +0800820 fd = os.open(os_helper.TESTFN, os.O_WRONLY|os.O_CREAT)
Serhiy Storchakaa3712a92015-02-21 00:35:09 +0200821 self.check_flush_error_on_close(fd, 'wb', closefd=False)
822 os.close(fd)
823 # text io
Hai Shi883bc632020-07-06 17:12:49 +0800824 self.check_flush_error_on_close(os_helper.TESTFN, 'w')
825 fd = os.open(os_helper.TESTFN, os.O_WRONLY|os.O_CREAT)
Serhiy Storchakaa3712a92015-02-21 00:35:09 +0200826 self.check_flush_error_on_close(fd, 'w')
Hai Shi883bc632020-07-06 17:12:49 +0800827 fd = os.open(os_helper.TESTFN, os.O_WRONLY|os.O_CREAT)
Serhiy Storchakaa3712a92015-02-21 00:35:09 +0200828 self.check_flush_error_on_close(fd, 'w', closefd=False)
829 os.close(fd)
Antoine Pitrou6be88762010-05-03 16:48:20 +0000830
831 def test_multi_close(self):
Hai Shi883bc632020-07-06 17:12:49 +0800832 f = self.open(os_helper.TESTFN, "wb", buffering=0)
Antoine Pitrou6be88762010-05-03 16:48:20 +0000833 f.close()
834 f.close()
835 f.close()
836 self.assertRaises(ValueError, f.flush)
837
Antoine Pitrou328ec742010-09-14 18:37:24 +0000838 def test_RawIOBase_read(self):
Sanyam Khurana1b74f9b2017-12-11 19:12:09 +0530839 # Exercise the default limited RawIOBase.read(n) implementation (which
840 # calls readinto() internally).
Antoine Pitrou328ec742010-09-14 18:37:24 +0000841 rawio = self.MockRawIOWithoutRead((b"abc", b"d", None, b"efg", None))
842 self.assertEqual(rawio.read(2), b"ab")
843 self.assertEqual(rawio.read(2), b"c")
844 self.assertEqual(rawio.read(2), b"d")
845 self.assertEqual(rawio.read(2), None)
846 self.assertEqual(rawio.read(2), b"ef")
847 self.assertEqual(rawio.read(2), b"g")
848 self.assertEqual(rawio.read(2), None)
849 self.assertEqual(rawio.read(2), b"")
850
Benjamin Petersonf6f3a352011-09-03 09:26:20 -0400851 def test_types_have_dict(self):
852 test = (
853 self.IOBase(),
854 self.RawIOBase(),
855 self.TextIOBase(),
856 self.StringIO(),
857 self.BytesIO()
858 )
859 for obj in test:
860 self.assertTrue(hasattr(obj, "__dict__"))
861
Ross Lagerwall59142db2011-10-31 20:34:46 +0200862 def test_opener(self):
Hai Shi883bc632020-07-06 17:12:49 +0800863 with self.open(os_helper.TESTFN, "w") as f:
Ross Lagerwall59142db2011-10-31 20:34:46 +0200864 f.write("egg\n")
Hai Shi883bc632020-07-06 17:12:49 +0800865 fd = os.open(os_helper.TESTFN, os.O_RDONLY)
Ross Lagerwall59142db2011-10-31 20:34:46 +0200866 def opener(path, flags):
867 return fd
868 with self.open("non-existent", "r", opener=opener) as f:
869 self.assertEqual(f.read(), "egg\n")
870
Barry Warsaw480e2852016-06-08 17:47:26 -0400871 def test_bad_opener_negative_1(self):
872 # Issue #27066.
873 def badopener(fname, flags):
874 return -1
875 with self.assertRaises(ValueError) as cm:
876 open('non-existent', 'r', opener=badopener)
877 self.assertEqual(str(cm.exception), 'opener returned -1')
878
879 def test_bad_opener_other_negative(self):
880 # Issue #27066.
881 def badopener(fname, flags):
882 return -2
883 with self.assertRaises(ValueError) as cm:
884 open('non-existent', 'r', opener=badopener)
885 self.assertEqual(str(cm.exception), 'opener returned -2')
886
Hynek Schlawack2cc71562012-05-25 10:05:53 +0200887 def test_fileio_closefd(self):
888 # Issue #4841
889 with self.open(__file__, 'rb') as f1, \
890 self.open(__file__, 'rb') as f2:
891 fileio = self.FileIO(f1.fileno(), closefd=False)
892 # .__init__() must not close f1
893 fileio.__init__(f2.fileno(), closefd=False)
894 f1.readline()
895 # .close() must not close f2
896 fileio.close()
897 f2.readline()
898
Serhiy Storchakaf10063e2014-06-09 13:32:34 +0300899 def test_nonbuffered_textio(self):
Hai Shi883bc632020-07-06 17:12:49 +0800900 with warnings_helper.check_no_resource_warning(self):
Serhiy Storchakaf10063e2014-06-09 13:32:34 +0300901 with self.assertRaises(ValueError):
Hai Shi883bc632020-07-06 17:12:49 +0800902 self.open(os_helper.TESTFN, 'w', buffering=0)
Serhiy Storchakaf10063e2014-06-09 13:32:34 +0300903
904 def test_invalid_newline(self):
Hai Shi883bc632020-07-06 17:12:49 +0800905 with warnings_helper.check_no_resource_warning(self):
Serhiy Storchakaf10063e2014-06-09 13:32:34 +0300906 with self.assertRaises(ValueError):
Hai Shi883bc632020-07-06 17:12:49 +0800907 self.open(os_helper.TESTFN, 'w', newline='invalid')
Serhiy Storchakaf10063e2014-06-09 13:32:34 +0300908
Martin Panter6bb91f32016-05-28 00:41:57 +0000909 def test_buffered_readinto_mixin(self):
910 # Test the implementation provided by BufferedIOBase
911 class Stream(self.BufferedIOBase):
912 def read(self, size):
913 return b"12345"
914 read1 = read
915 stream = Stream()
916 for method in ("readinto", "readinto1"):
917 with self.subTest(method):
918 buffer = byteslike(5)
919 self.assertEqual(getattr(stream, method)(buffer), 5)
920 self.assertEqual(bytes(buffer), b"12345")
921
Ethan Furmand62548a2016-06-04 14:38:43 -0700922 def test_fspath_support(self):
Ethan Furmand62548a2016-06-04 14:38:43 -0700923 def check_path_succeeds(path):
924 with self.open(path, "w") as f:
925 f.write("egg\n")
926
927 with self.open(path, "r") as f:
928 self.assertEqual(f.read(), "egg\n")
929
Hai Shi883bc632020-07-06 17:12:49 +0800930 check_path_succeeds(FakePath(os_helper.TESTFN))
Serhiy Storchaka67987ac2020-07-27 20:58:35 +0300931 check_path_succeeds(FakePath(os.fsencode(os_helper.TESTFN)))
Ethan Furmand62548a2016-06-04 14:38:43 -0700932
Hai Shi883bc632020-07-06 17:12:49 +0800933 with self.open(os_helper.TESTFN, "w") as f:
Serhiy Storchakab21d1552018-03-02 11:53:51 +0200934 bad_path = FakePath(f.fileno())
935 with self.assertRaises(TypeError):
936 self.open(bad_path, 'w')
937
938 bad_path = FakePath(None)
Brett Cannonc78ca1e2016-06-24 12:03:43 -0700939 with self.assertRaises(TypeError):
Ethan Furmand62548a2016-06-04 14:38:43 -0700940 self.open(bad_path, 'w')
941
Serhiy Storchakab21d1552018-03-02 11:53:51 +0200942 bad_path = FakePath(FloatingPointError)
943 with self.assertRaises(FloatingPointError):
944 self.open(bad_path, 'w')
945
Ethan Furmand62548a2016-06-04 14:38:43 -0700946 # ensure that refcounting is correct with some error conditions
947 with self.assertRaisesRegex(ValueError, 'read/write/append mode'):
Hai Shi883bc632020-07-06 17:12:49 +0800948 self.open(FakePath(os_helper.TESTFN), 'rwxa')
Ethan Furmand62548a2016-06-04 14:38:43 -0700949
Sanyam Khurana1b74f9b2017-12-11 19:12:09 +0530950 def test_RawIOBase_readall(self):
951 # Exercise the default unlimited RawIOBase.read() and readall()
952 # implementations.
953 rawio = self.MockRawIOWithoutRead((b"abc", b"d", b"efg"))
954 self.assertEqual(rawio.read(), b"abcdefg")
955 rawio = self.MockRawIOWithoutRead((b"abc", b"d", b"efg"))
956 self.assertEqual(rawio.readall(), b"abcdefg")
957
958 def test_BufferedIOBase_readinto(self):
959 # Exercise the default BufferedIOBase.readinto() and readinto1()
960 # implementations (which call read() or read1() internally).
961 class Reader(self.BufferedIOBase):
962 def __init__(self, avail):
963 self.avail = avail
964 def read(self, size):
965 result = self.avail[:size]
966 self.avail = self.avail[size:]
967 return result
968 def read1(self, size):
969 """Returns no more than 5 bytes at once"""
970 return self.read(min(size, 5))
971 tests = (
972 # (test method, total data available, read buffer size, expected
973 # read size)
974 ("readinto", 10, 5, 5),
975 ("readinto", 10, 6, 6), # More than read1() can return
976 ("readinto", 5, 6, 5), # Buffer larger than total available
977 ("readinto", 6, 7, 6),
978 ("readinto", 10, 0, 0), # Empty buffer
979 ("readinto1", 10, 5, 5), # Result limited to single read1() call
980 ("readinto1", 10, 6, 5), # Buffer larger than read1() can return
981 ("readinto1", 5, 6, 5), # Buffer larger than total available
982 ("readinto1", 6, 7, 5),
983 ("readinto1", 10, 0, 0), # Empty buffer
984 )
985 UNUSED_BYTE = 0x81
986 for test in tests:
987 with self.subTest(test):
988 method, avail, request, result = test
989 reader = Reader(bytes(range(avail)))
990 buffer = bytearray((UNUSED_BYTE,) * request)
991 method = getattr(reader, method)
992 self.assertEqual(method(buffer), result)
993 self.assertEqual(len(buffer), request)
994 self.assertSequenceEqual(buffer[:result], range(result))
995 unused = (UNUSED_BYTE,) * (request - result)
996 self.assertSequenceEqual(buffer[result:], unused)
997 self.assertEqual(len(reader.avail), avail - result)
998
Zackery Spytz28f07362018-07-17 00:31:44 -0600999 def test_close_assert(self):
1000 class R(self.IOBase):
1001 def __setattr__(self, name, value):
1002 pass
1003 def flush(self):
1004 raise OSError()
1005 f = R()
1006 # This would cause an assertion failure.
1007 self.assertRaises(OSError, f.close)
1008
Victor Stinner472f7942019-04-12 21:58:24 +02001009 # Silence destructor error
1010 R.flush = lambda self: None
1011
Hynek Schlawack2cc71562012-05-25 10:05:53 +02001012
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001013class CIOTest(IOTest):
Antoine Pitrou84f1b172011-07-12 21:57:15 +02001014
1015 def test_IOBase_finalize(self):
1016 # Issue #12149: segmentation fault on _PyIOBase_finalize when both a
1017 # class which inherits IOBase and an object of this class are caught
1018 # in a reference cycle and close() is already in the method cache.
1019 class MyIO(self.IOBase):
1020 def close(self):
1021 pass
1022
1023 # create an instance to populate the method cache
1024 MyIO()
1025 obj = MyIO()
1026 obj.obj = obj
1027 wr = weakref.ref(obj)
1028 del MyIO
1029 del obj
1030 support.gc_collect()
Serhiy Storchaka0dcd80a2015-08-02 15:17:49 +03001031 self.assertIsNone(wr(), wr)
Guido van Rossuma9e20242007-03-08 00:43:48 +00001032
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001033class PyIOTest(IOTest):
1034 pass
Guido van Rossum68bbcd22007-02-27 17:19:33 +00001035
Guido van Rossuma9e20242007-03-08 00:43:48 +00001036
Gregory P. Smith1bef9072015-04-14 13:24:34 -07001037@support.cpython_only
1038class APIMismatchTest(unittest.TestCase):
Gregory P. Smith054b0652015-04-14 12:58:05 -07001039
Gregory P. Smith054b0652015-04-14 12:58:05 -07001040 def test_RawIOBase_io_in_pyio_match(self):
1041 """Test that pyio RawIOBase class has all c RawIOBase methods"""
Antoine Pitrou45d61562015-05-20 21:50:59 +02001042 mismatch = support.detect_api_mismatch(pyio.RawIOBase, io.RawIOBase,
1043 ignore=('__weakref__',))
Gregory P. Smith054b0652015-04-14 12:58:05 -07001044 self.assertEqual(mismatch, set(), msg='Python RawIOBase does not have all C RawIOBase methods')
1045
1046 def test_RawIOBase_pyio_in_io_match(self):
1047 """Test that c RawIOBase class has all pyio RawIOBase methods"""
1048 mismatch = support.detect_api_mismatch(io.RawIOBase, pyio.RawIOBase)
1049 self.assertEqual(mismatch, set(), msg='C RawIOBase does not have all Python RawIOBase methods')
1050
1051
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001052class CommonBufferedTests:
1053 # Tests common to BufferedReader, BufferedWriter and BufferedRandom
1054
Benjamin Petersond2e0c792009-05-01 20:40:59 +00001055 def test_detach(self):
1056 raw = self.MockRawIO()
1057 buf = self.tp(raw)
1058 self.assertIs(buf.detach(), raw)
1059 self.assertRaises(ValueError, buf.detach)
1060
Benjamin Peterson10e76b62014-12-21 20:51:50 -06001061 repr(buf) # Should still work
1062
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001063 def test_fileno(self):
1064 rawio = self.MockRawIO()
1065 bufio = self.tp(rawio)
1066
Ezio Melottib3aedd42010-11-20 19:04:17 +00001067 self.assertEqual(42, bufio.fileno())
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001068
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001069 def test_invalid_args(self):
1070 rawio = self.MockRawIO()
1071 bufio = self.tp(rawio)
1072 # Invalid whence
1073 self.assertRaises(ValueError, bufio.seek, 0, -1)
Jesus Cea94363612012-06-22 18:32:07 +02001074 self.assertRaises(ValueError, bufio.seek, 0, 9)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001075
1076 def test_override_destructor(self):
1077 tp = self.tp
1078 record = []
1079 class MyBufferedIO(tp):
1080 def __del__(self):
1081 record.append(1)
1082 try:
1083 f = super().__del__
1084 except AttributeError:
1085 pass
1086 else:
1087 f()
1088 def close(self):
1089 record.append(2)
1090 super().close()
1091 def flush(self):
1092 record.append(3)
1093 super().flush()
1094 rawio = self.MockRawIO()
1095 bufio = MyBufferedIO(rawio)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001096 del bufio
Benjamin Peterson24fb1d02009-04-24 23:26:21 +00001097 support.gc_collect()
Martin Panter754aab22016-03-31 07:21:56 +00001098 self.assertEqual(record, [1, 2, 3])
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001099
1100 def test_context_manager(self):
1101 # Test usability as a context manager
1102 rawio = self.MockRawIO()
1103 bufio = self.tp(rawio)
1104 def _with():
1105 with bufio:
1106 pass
1107 _with()
1108 # bufio should now be closed, and using it a second time should raise
1109 # a ValueError.
1110 self.assertRaises(ValueError, _with)
1111
1112 def test_error_through_destructor(self):
1113 # Test that the exception state is not modified by a destructor,
1114 # even if close() fails.
1115 rawio = self.CloseFailureIO()
Victor Stinner6dbbe742019-05-25 00:09:38 +02001116 with support.catch_unraisable_exception() as cm:
1117 with self.assertRaises(AttributeError):
1118 self.tp(rawio).xyzzy
Victor Stinnerbc2aa812019-05-23 03:45:09 +02001119
1120 if not IOBASE_EMITS_UNRAISABLE:
1121 self.assertIsNone(cm.unraisable)
1122 elif cm.unraisable is not None:
1123 self.assertEqual(cm.unraisable.exc_type, OSError)
Guido van Rossum78892e42007-04-06 17:31:18 +00001124
Antoine Pitrou716c4442009-05-23 19:04:03 +00001125 def test_repr(self):
1126 raw = self.MockRawIO()
1127 b = self.tp(raw)
Max Bernsteinccb7ca72019-05-21 10:09:21 -07001128 clsname = r"(%s\.)?%s" % (self.tp.__module__, self.tp.__qualname__)
1129 self.assertRegex(repr(b), "<%s>" % clsname)
Antoine Pitrou716c4442009-05-23 19:04:03 +00001130 raw.name = "dummy"
Max Bernsteinccb7ca72019-05-21 10:09:21 -07001131 self.assertRegex(repr(b), "<%s name='dummy'>" % clsname)
Antoine Pitrou716c4442009-05-23 19:04:03 +00001132 raw.name = b"dummy"
Max Bernsteinccb7ca72019-05-21 10:09:21 -07001133 self.assertRegex(repr(b), "<%s name=b'dummy'>" % clsname)
Antoine Pitrou716c4442009-05-23 19:04:03 +00001134
Serhiy Storchakaa5af6e12017-03-19 19:25:29 +02001135 def test_recursive_repr(self):
1136 # Issue #25455
1137 raw = self.MockRawIO()
1138 b = self.tp(raw)
1139 with support.swap_attr(raw, 'name', b):
1140 try:
1141 repr(b) # Should not crash
1142 except RuntimeError:
1143 pass
1144
Antoine Pitrou6be88762010-05-03 16:48:20 +00001145 def test_flush_error_on_close(self):
Serhiy Storchakaa3712a92015-02-21 00:35:09 +02001146 # Test that buffered file is closed despite failed flush
1147 # and that flush() is called before file closed.
Antoine Pitrou6be88762010-05-03 16:48:20 +00001148 raw = self.MockRawIO()
Serhiy Storchakaa3712a92015-02-21 00:35:09 +02001149 closed = []
Antoine Pitrou6be88762010-05-03 16:48:20 +00001150 def bad_flush():
Serhiy Storchakaa3712a92015-02-21 00:35:09 +02001151 closed[:] = [b.closed, raw.closed]
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02001152 raise OSError()
Antoine Pitrou6be88762010-05-03 16:48:20 +00001153 raw.flush = bad_flush
1154 b = self.tp(raw)
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02001155 self.assertRaises(OSError, b.close) # exception not swallowed
Benjamin Peterson68623612012-12-20 11:53:11 -06001156 self.assertTrue(b.closed)
Serhiy Storchakaa3712a92015-02-21 00:35:09 +02001157 self.assertTrue(raw.closed)
1158 self.assertTrue(closed) # flush() called
1159 self.assertFalse(closed[0]) # flush() called before file closed
1160 self.assertFalse(closed[1])
Serhiy Storchakac26a1a42015-02-23 00:28:38 +02001161 raw.flush = lambda: None # break reference loop
Benjamin Peterson68623612012-12-20 11:53:11 -06001162
1163 def test_close_error_on_close(self):
1164 raw = self.MockRawIO()
1165 def bad_flush():
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02001166 raise OSError('flush')
Benjamin Peterson68623612012-12-20 11:53:11 -06001167 def bad_close():
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02001168 raise OSError('close')
Benjamin Peterson68623612012-12-20 11:53:11 -06001169 raw.close = bad_close
1170 b = self.tp(raw)
1171 b.flush = bad_flush
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02001172 with self.assertRaises(OSError) as err: # exception not swallowed
Benjamin Peterson68623612012-12-20 11:53:11 -06001173 b.close()
1174 self.assertEqual(err.exception.args, ('close',))
Serhiy Storchaka8a8f7f92014-06-09 09:13:04 +03001175 self.assertIsInstance(err.exception.__context__, OSError)
Benjamin Peterson68623612012-12-20 11:53:11 -06001176 self.assertEqual(err.exception.__context__.args, ('flush',))
1177 self.assertFalse(b.closed)
Antoine Pitrou6be88762010-05-03 16:48:20 +00001178
Victor Stinner472f7942019-04-12 21:58:24 +02001179 # Silence destructor error
1180 raw.close = lambda: None
1181 b.flush = lambda: None
1182
Serhiy Storchaka8a8f7f92014-06-09 09:13:04 +03001183 def test_nonnormalized_close_error_on_close(self):
1184 # Issue #21677
1185 raw = self.MockRawIO()
1186 def bad_flush():
1187 raise non_existing_flush
1188 def bad_close():
1189 raise non_existing_close
1190 raw.close = bad_close
1191 b = self.tp(raw)
1192 b.flush = bad_flush
1193 with self.assertRaises(NameError) as err: # exception not swallowed
1194 b.close()
1195 self.assertIn('non_existing_close', str(err.exception))
1196 self.assertIsInstance(err.exception.__context__, NameError)
1197 self.assertIn('non_existing_flush', str(err.exception.__context__))
1198 self.assertFalse(b.closed)
1199
Victor Stinner472f7942019-04-12 21:58:24 +02001200 # Silence destructor error
1201 b.flush = lambda: None
1202 raw.close = lambda: None
1203
Antoine Pitrou6be88762010-05-03 16:48:20 +00001204 def test_multi_close(self):
1205 raw = self.MockRawIO()
1206 b = self.tp(raw)
1207 b.close()
1208 b.close()
1209 b.close()
1210 self.assertRaises(ValueError, b.flush)
1211
Antoine Pitrou0d739d72010-09-05 23:01:12 +00001212 def test_unseekable(self):
1213 bufio = self.tp(self.MockUnseekableIO(b"A" * 10))
1214 self.assertRaises(self.UnsupportedOperation, bufio.tell)
1215 self.assertRaises(self.UnsupportedOperation, bufio.seek, 0)
1216
Antoine Pitrou7f8f4182010-12-21 21:20:59 +00001217 def test_readonly_attributes(self):
1218 raw = self.MockRawIO()
1219 buf = self.tp(raw)
1220 x = self.MockRawIO()
1221 with self.assertRaises(AttributeError):
1222 buf.raw = x
1223
Guido van Rossum78892e42007-04-06 17:31:18 +00001224
Antoine Pitrou10f0c502012-07-29 19:02:46 +02001225class SizeofTest:
1226
1227 @support.cpython_only
1228 def test_sizeof(self):
1229 bufsize1 = 4096
1230 bufsize2 = 8192
1231 rawio = self.MockRawIO()
1232 bufio = self.tp(rawio, buffer_size=bufsize1)
1233 size = sys.getsizeof(bufio) - bufsize1
1234 rawio = self.MockRawIO()
1235 bufio = self.tp(rawio, buffer_size=bufsize2)
1236 self.assertEqual(sys.getsizeof(bufio), size + bufsize2)
1237
Jesus Ceadc469452012-10-04 12:37:56 +02001238 @support.cpython_only
1239 def test_buffer_freeing(self) :
1240 bufsize = 4096
1241 rawio = self.MockRawIO()
1242 bufio = self.tp(rawio, buffer_size=bufsize)
1243 size = sys.getsizeof(bufio) - bufsize
1244 bufio.close()
1245 self.assertEqual(sys.getsizeof(bufio), size)
Antoine Pitrou10f0c502012-07-29 19:02:46 +02001246
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001247class BufferedReaderTest(unittest.TestCase, CommonBufferedTests):
1248 read_mode = "rb"
Guido van Rossum78892e42007-04-06 17:31:18 +00001249
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001250 def test_constructor(self):
1251 rawio = self.MockRawIO([b"abc"])
1252 bufio = self.tp(rawio)
1253 bufio.__init__(rawio)
1254 bufio.__init__(rawio, buffer_size=1024)
1255 bufio.__init__(rawio, buffer_size=16)
Ezio Melottib3aedd42010-11-20 19:04:17 +00001256 self.assertEqual(b"abc", bufio.read())
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001257 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0)
1258 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16)
1259 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1)
1260 rawio = self.MockRawIO([b"abc"])
1261 bufio.__init__(rawio)
Ezio Melottib3aedd42010-11-20 19:04:17 +00001262 self.assertEqual(b"abc", bufio.read())
Guido van Rossum78892e42007-04-06 17:31:18 +00001263
Serhiy Storchaka61e24932014-02-12 10:52:35 +02001264 def test_uninitialized(self):
1265 bufio = self.tp.__new__(self.tp)
1266 del bufio
1267 bufio = self.tp.__new__(self.tp)
1268 self.assertRaisesRegex((ValueError, AttributeError),
1269 'uninitialized|has no attribute',
1270 bufio.read, 0)
1271 bufio.__init__(self.MockRawIO())
1272 self.assertEqual(bufio.read(0), b'')
1273
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001274 def test_read(self):
Benjamin Petersonbf5ff762009-12-13 19:25:34 +00001275 for arg in (None, 7):
1276 rawio = self.MockRawIO((b"abc", b"d", b"efg"))
1277 bufio = self.tp(rawio)
Ezio Melottib3aedd42010-11-20 19:04:17 +00001278 self.assertEqual(b"abcdefg", bufio.read(arg))
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001279 # Invalid args
1280 self.assertRaises(ValueError, bufio.read, -2)
Guido van Rossum68bbcd22007-02-27 17:19:33 +00001281
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001282 def test_read1(self):
1283 rawio = self.MockRawIO((b"abc", b"d", b"efg"))
1284 bufio = self.tp(rawio)
Ezio Melottib3aedd42010-11-20 19:04:17 +00001285 self.assertEqual(b"a", bufio.read(1))
1286 self.assertEqual(b"b", bufio.read1(1))
1287 self.assertEqual(rawio._reads, 1)
Martin Panterccb2c0e2016-10-20 23:48:14 +00001288 self.assertEqual(b"", bufio.read1(0))
Ezio Melottib3aedd42010-11-20 19:04:17 +00001289 self.assertEqual(b"c", bufio.read1(100))
1290 self.assertEqual(rawio._reads, 1)
1291 self.assertEqual(b"d", bufio.read1(100))
1292 self.assertEqual(rawio._reads, 2)
1293 self.assertEqual(b"efg", bufio.read1(100))
1294 self.assertEqual(rawio._reads, 3)
1295 self.assertEqual(b"", bufio.read1(100))
1296 self.assertEqual(rawio._reads, 4)
Martin Panterccb2c0e2016-10-20 23:48:14 +00001297
1298 def test_read1_arbitrary(self):
1299 rawio = self.MockRawIO((b"abc", b"d", b"efg"))
1300 bufio = self.tp(rawio)
1301 self.assertEqual(b"a", bufio.read(1))
1302 self.assertEqual(b"bc", bufio.read1())
1303 self.assertEqual(b"d", bufio.read1())
1304 self.assertEqual(b"efg", bufio.read1(-1))
1305 self.assertEqual(rawio._reads, 3)
1306 self.assertEqual(b"", bufio.read1())
1307 self.assertEqual(rawio._reads, 4)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001308
1309 def test_readinto(self):
1310 rawio = self.MockRawIO((b"abc", b"d", b"efg"))
1311 bufio = self.tp(rawio)
1312 b = bytearray(2)
Ezio Melottib3aedd42010-11-20 19:04:17 +00001313 self.assertEqual(bufio.readinto(b), 2)
1314 self.assertEqual(b, b"ab")
1315 self.assertEqual(bufio.readinto(b), 2)
1316 self.assertEqual(b, b"cd")
1317 self.assertEqual(bufio.readinto(b), 2)
1318 self.assertEqual(b, b"ef")
1319 self.assertEqual(bufio.readinto(b), 1)
1320 self.assertEqual(b, b"gf")
1321 self.assertEqual(bufio.readinto(b), 0)
1322 self.assertEqual(b, b"gf")
Antoine Pitrou3486a982011-05-12 01:57:53 +02001323 rawio = self.MockRawIO((b"abc", None))
1324 bufio = self.tp(rawio)
1325 self.assertEqual(bufio.readinto(b), 2)
1326 self.assertEqual(b, b"ab")
1327 self.assertEqual(bufio.readinto(b), 1)
1328 self.assertEqual(b, b"cb")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001329
Benjamin Petersona96fea02014-06-22 14:17:44 -07001330 def test_readinto1(self):
1331 buffer_size = 10
1332 rawio = self.MockRawIO((b"abc", b"de", b"fgh", b"jkl"))
1333 bufio = self.tp(rawio, buffer_size=buffer_size)
1334 b = bytearray(2)
1335 self.assertEqual(bufio.peek(3), b'abc')
1336 self.assertEqual(rawio._reads, 1)
1337 self.assertEqual(bufio.readinto1(b), 2)
1338 self.assertEqual(b, b"ab")
1339 self.assertEqual(rawio._reads, 1)
1340 self.assertEqual(bufio.readinto1(b), 1)
1341 self.assertEqual(b[:1], b"c")
1342 self.assertEqual(rawio._reads, 1)
1343 self.assertEqual(bufio.readinto1(b), 2)
1344 self.assertEqual(b, b"de")
1345 self.assertEqual(rawio._reads, 2)
1346 b = bytearray(2*buffer_size)
1347 self.assertEqual(bufio.peek(3), b'fgh')
1348 self.assertEqual(rawio._reads, 3)
1349 self.assertEqual(bufio.readinto1(b), 6)
1350 self.assertEqual(b[:6], b"fghjkl")
1351 self.assertEqual(rawio._reads, 4)
1352
1353 def test_readinto_array(self):
1354 buffer_size = 60
1355 data = b"a" * 26
1356 rawio = self.MockRawIO((data,))
1357 bufio = self.tp(rawio, buffer_size=buffer_size)
1358
1359 # Create an array with element size > 1 byte
1360 b = array.array('i', b'x' * 32)
1361 assert len(b) != 16
1362
1363 # Read into it. We should get as many *bytes* as we can fit into b
1364 # (which is more than the number of elements)
1365 n = bufio.readinto(b)
1366 self.assertGreater(n, len(b))
1367
1368 # Check that old contents of b are preserved
1369 bm = memoryview(b).cast('B')
1370 self.assertLess(n, len(bm))
1371 self.assertEqual(bm[:n], data[:n])
1372 self.assertEqual(bm[n:], b'x' * (len(bm[n:])))
1373
1374 def test_readinto1_array(self):
1375 buffer_size = 60
1376 data = b"a" * 26
1377 rawio = self.MockRawIO((data,))
1378 bufio = self.tp(rawio, buffer_size=buffer_size)
1379
1380 # Create an array with element size > 1 byte
1381 b = array.array('i', b'x' * 32)
1382 assert len(b) != 16
1383
1384 # Read into it. We should get as many *bytes* as we can fit into b
1385 # (which is more than the number of elements)
1386 n = bufio.readinto1(b)
1387 self.assertGreater(n, len(b))
1388
1389 # Check that old contents of b are preserved
1390 bm = memoryview(b).cast('B')
1391 self.assertLess(n, len(bm))
1392 self.assertEqual(bm[:n], data[:n])
1393 self.assertEqual(bm[n:], b'x' * (len(bm[n:])))
1394
Benjamin Petersonbf5ff762009-12-13 19:25:34 +00001395 def test_readlines(self):
1396 def bufio():
1397 rawio = self.MockRawIO((b"abc\n", b"d\n", b"ef"))
1398 return self.tp(rawio)
Ezio Melottib3aedd42010-11-20 19:04:17 +00001399 self.assertEqual(bufio().readlines(), [b"abc\n", b"d\n", b"ef"])
1400 self.assertEqual(bufio().readlines(5), [b"abc\n", b"d\n"])
1401 self.assertEqual(bufio().readlines(None), [b"abc\n", b"d\n", b"ef"])
Benjamin Petersonbf5ff762009-12-13 19:25:34 +00001402
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001403 def test_buffering(self):
Guido van Rossum78892e42007-04-06 17:31:18 +00001404 data = b"abcdefghi"
1405 dlen = len(data)
1406
1407 tests = [
1408 [ 100, [ 3, 1, 4, 8 ], [ dlen, 0 ] ],
1409 [ 100, [ 3, 3, 3], [ dlen ] ],
1410 [ 4, [ 1, 2, 4, 2 ], [ 4, 4, 1 ] ],
1411 ]
1412
1413 for bufsize, buf_read_sizes, raw_read_sizes in tests:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001414 rawio = self.MockFileIO(data)
1415 bufio = self.tp(rawio, buffer_size=bufsize)
Guido van Rossum78892e42007-04-06 17:31:18 +00001416 pos = 0
1417 for nbytes in buf_read_sizes:
Ezio Melottib3aedd42010-11-20 19:04:17 +00001418 self.assertEqual(bufio.read(nbytes), data[pos:pos+nbytes])
Guido van Rossum78892e42007-04-06 17:31:18 +00001419 pos += nbytes
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001420 # this is mildly implementation-dependent
Ezio Melottib3aedd42010-11-20 19:04:17 +00001421 self.assertEqual(rawio.read_history, raw_read_sizes)
Guido van Rossum78892e42007-04-06 17:31:18 +00001422
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001423 def test_read_non_blocking(self):
Guido van Rossum01a27522007-03-07 01:00:12 +00001424 # Inject some None's in there to simulate EWOULDBLOCK
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001425 rawio = self.MockRawIO((b"abc", b"d", None, b"efg", None, None, None))
1426 bufio = self.tp(rawio)
Ezio Melottib3aedd42010-11-20 19:04:17 +00001427 self.assertEqual(b"abcd", bufio.read(6))
1428 self.assertEqual(b"e", bufio.read(1))
1429 self.assertEqual(b"fg", bufio.read())
1430 self.assertEqual(b"", bufio.peek(1))
Victor Stinnera80987f2011-05-25 22:47:16 +02001431 self.assertIsNone(bufio.read())
Ezio Melottib3aedd42010-11-20 19:04:17 +00001432 self.assertEqual(b"", bufio.read())
Guido van Rossum01a27522007-03-07 01:00:12 +00001433
Victor Stinnera80987f2011-05-25 22:47:16 +02001434 rawio = self.MockRawIO((b"a", None, None))
1435 self.assertEqual(b"a", rawio.readall())
1436 self.assertIsNone(rawio.readall())
1437
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001438 def test_read_past_eof(self):
1439 rawio = self.MockRawIO((b"abc", b"d", b"efg"))
1440 bufio = self.tp(rawio)
Guido van Rossum68bbcd22007-02-27 17:19:33 +00001441
Ezio Melottib3aedd42010-11-20 19:04:17 +00001442 self.assertEqual(b"abcdefg", bufio.read(9000))
Guido van Rossum68bbcd22007-02-27 17:19:33 +00001443
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001444 def test_read_all(self):
1445 rawio = self.MockRawIO((b"abc", b"d", b"efg"))
1446 bufio = self.tp(rawio)
Guido van Rossum68bbcd22007-02-27 17:19:33 +00001447
Ezio Melottib3aedd42010-11-20 19:04:17 +00001448 self.assertEqual(b"abcdefg", bufio.read())
Guido van Rossum68bbcd22007-02-27 17:19:33 +00001449
Antoine Pitrou5bc4fa72010-10-14 15:34:31 +00001450 @support.requires_resource('cpu')
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001451 def test_threads(self):
Antoine Pitrou87695762008-08-14 22:44:29 +00001452 try:
1453 # Write out many bytes with exactly the same number of 0's,
1454 # 1's... 255's. This will help us check that concurrent reading
1455 # doesn't duplicate or forget contents.
1456 N = 1000
1457 l = list(range(256)) * N
1458 random.shuffle(l)
1459 s = bytes(bytearray(l))
Hai Shi883bc632020-07-06 17:12:49 +08001460 with self.open(os_helper.TESTFN, "wb") as f:
Antoine Pitrou87695762008-08-14 22:44:29 +00001461 f.write(s)
Hai Shi883bc632020-07-06 17:12:49 +08001462 with self.open(os_helper.TESTFN, self.read_mode, buffering=0) as raw:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001463 bufio = self.tp(raw, 8)
Antoine Pitrou87695762008-08-14 22:44:29 +00001464 errors = []
1465 results = []
1466 def f():
1467 try:
1468 # Intra-buffer read then buffer-flushing read
1469 for n in cycle([1, 19]):
1470 s = bufio.read(n)
1471 if not s:
1472 break
1473 # list.append() is atomic
1474 results.append(s)
1475 except Exception as e:
1476 errors.append(e)
1477 raise
1478 threads = [threading.Thread(target=f) for x in range(20)]
Hai Shie80697d2020-05-28 06:10:27 +08001479 with threading_helper.start_threads(threads):
Serhiy Storchaka263dcd22015-04-01 13:01:14 +03001480 time.sleep(0.02) # yield
Antoine Pitrou87695762008-08-14 22:44:29 +00001481 self.assertFalse(errors,
1482 "the following exceptions were caught: %r" % errors)
1483 s = b''.join(results)
1484 for i in range(256):
1485 c = bytes(bytearray([i]))
1486 self.assertEqual(s.count(c), N)
1487 finally:
Hai Shi883bc632020-07-06 17:12:49 +08001488 os_helper.unlink(os_helper.TESTFN)
Antoine Pitrou87695762008-08-14 22:44:29 +00001489
Antoine Pitrou1e44fec2011-10-04 12:26:20 +02001490 def test_unseekable(self):
1491 bufio = self.tp(self.MockUnseekableIO(b"A" * 10))
1492 self.assertRaises(self.UnsupportedOperation, bufio.tell)
1493 self.assertRaises(self.UnsupportedOperation, bufio.seek, 0)
1494 bufio.read(1)
1495 self.assertRaises(self.UnsupportedOperation, bufio.seek, 0)
1496 self.assertRaises(self.UnsupportedOperation, bufio.tell)
1497
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001498 def test_misbehaved_io(self):
1499 rawio = self.MisbehavedRawIO((b"abc", b"d", b"efg"))
1500 bufio = self.tp(rawio)
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02001501 self.assertRaises(OSError, bufio.seek, 0)
1502 self.assertRaises(OSError, bufio.tell)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001503
Victor Stinnerb589cef2019-06-11 03:10:59 +02001504 # Silence destructor error
1505 bufio.close = lambda: None
1506
Antoine Pitrou32cfede2010-08-11 13:31:33 +00001507 def test_no_extraneous_read(self):
1508 # Issue #9550; when the raw IO object has satisfied the read request,
1509 # we should not issue any additional reads, otherwise it may block
1510 # (e.g. socket).
1511 bufsize = 16
1512 for n in (2, bufsize - 1, bufsize, bufsize + 1, bufsize * 2):
1513 rawio = self.MockRawIO([b"x" * n])
1514 bufio = self.tp(rawio, bufsize)
1515 self.assertEqual(bufio.read(n), b"x" * n)
1516 # Simple case: one raw read is enough to satisfy the request.
1517 self.assertEqual(rawio._extraneous_reads, 0,
1518 "failed for {}: {} != 0".format(n, rawio._extraneous_reads))
1519 # A more complex case where two raw reads are needed to satisfy
1520 # the request.
1521 rawio = self.MockRawIO([b"x" * (n - 1), b"x"])
1522 bufio = self.tp(rawio, bufsize)
1523 self.assertEqual(bufio.read(n), b"x" * n)
1524 self.assertEqual(rawio._extraneous_reads, 0,
1525 "failed for {}: {} != 0".format(n, rawio._extraneous_reads))
1526
Berker Peksagd10d6ae2015-05-12 17:01:05 +03001527 def test_read_on_closed(self):
1528 # Issue #23796
1529 b = io.BufferedReader(io.BytesIO(b"12"))
1530 b.read(1)
1531 b.close()
1532 self.assertRaises(ValueError, b.peek)
1533 self.assertRaises(ValueError, b.read1, 1)
1534
Berker Peksagfd5116c2020-02-21 20:57:26 +03001535 def test_truncate_on_read_only(self):
1536 rawio = self.MockFileIO(b"abc")
1537 bufio = self.tp(rawio)
1538 self.assertFalse(bufio.writable())
1539 self.assertRaises(self.UnsupportedOperation, bufio.truncate)
1540 self.assertRaises(self.UnsupportedOperation, bufio.truncate, 0)
1541
Antoine Pitrou32cfede2010-08-11 13:31:33 +00001542
Antoine Pitrou10f0c502012-07-29 19:02:46 +02001543class CBufferedReaderTest(BufferedReaderTest, SizeofTest):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001544 tp = io.BufferedReader
1545
Gregory P. Smithe5796c42018-12-30 20:17:57 -08001546 @unittest.skipIf(MEMORY_SANITIZER, "MSan defaults to crashing "
1547 "instead of returning NULL for malloc failure.")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001548 def test_constructor(self):
1549 BufferedReaderTest.test_constructor(self)
1550 # The allocation can succeed on 32-bit builds, e.g. with more
Victor Stinner8c663fd2017-11-08 14:44:44 -08001551 # than 2 GiB RAM and a 64-bit kernel.
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001552 if sys.maxsize > 0x7FFFFFFF:
1553 rawio = self.MockRawIO()
1554 bufio = self.tp(rawio)
1555 self.assertRaises((OverflowError, MemoryError, ValueError),
1556 bufio.__init__, rawio, sys.maxsize)
1557
1558 def test_initialization(self):
1559 rawio = self.MockRawIO([b"abc"])
1560 bufio = self.tp(rawio)
1561 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0)
1562 self.assertRaises(ValueError, bufio.read)
1563 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16)
1564 self.assertRaises(ValueError, bufio.read)
1565 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1)
1566 self.assertRaises(ValueError, bufio.read)
1567
1568 def test_misbehaved_io_read(self):
1569 rawio = self.MisbehavedRawIO((b"abc", b"d", b"efg"))
1570 bufio = self.tp(rawio)
1571 # _pyio.BufferedReader seems to implement reading different, so that
1572 # checking this is not so easy.
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02001573 self.assertRaises(OSError, bufio.read, 10)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001574
1575 def test_garbage_collection(self):
1576 # C BufferedReader objects are collected.
1577 # The Python version has __del__, so it ends into gc.garbage instead
Hai Shi883bc632020-07-06 17:12:49 +08001578 self.addCleanup(os_helper.unlink, os_helper.TESTFN)
1579 with warnings_helper.check_warnings(('', ResourceWarning)):
1580 rawio = self.FileIO(os_helper.TESTFN, "w+b")
Antoine Pitrou796564c2013-07-30 19:59:21 +02001581 f = self.tp(rawio)
1582 f.f = f
1583 wr = weakref.ref(f)
1584 del f
1585 support.gc_collect()
Serhiy Storchaka0dcd80a2015-08-02 15:17:49 +03001586 self.assertIsNone(wr(), wr)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001587
R David Murray67bfe802013-02-23 21:51:05 -05001588 def test_args_error(self):
1589 # Issue #17275
1590 with self.assertRaisesRegex(TypeError, "BufferedReader"):
1591 self.tp(io.BytesIO(), 1024, 1024, 1024)
1592
David Szotten86663562020-06-16 00:53:57 +01001593 def test_bad_readinto_value(self):
1594 rawio = io.BufferedReader(io.BytesIO(b"12"))
1595 rawio.readinto = lambda buf: -1
1596 bufio = self.tp(rawio)
1597 with self.assertRaises(OSError) as cm:
1598 bufio.readline()
1599 self.assertIsNone(cm.exception.__cause__)
1600
1601 def test_bad_readinto_type(self):
1602 rawio = io.BufferedReader(io.BytesIO(b"12"))
1603 rawio.readinto = lambda buf: b''
1604 bufio = self.tp(rawio)
1605 with self.assertRaises(OSError) as cm:
1606 bufio.readline()
1607 self.assertIsInstance(cm.exception.__cause__, TypeError)
1608
R David Murray67bfe802013-02-23 21:51:05 -05001609
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001610class PyBufferedReaderTest(BufferedReaderTest):
1611 tp = pyio.BufferedReader
Antoine Pitrou87695762008-08-14 22:44:29 +00001612
Guido van Rossuma9e20242007-03-08 00:43:48 +00001613
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001614class BufferedWriterTest(unittest.TestCase, CommonBufferedTests):
1615 write_mode = "wb"
Guido van Rossuma9e20242007-03-08 00:43:48 +00001616
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001617 def test_constructor(self):
1618 rawio = self.MockRawIO()
1619 bufio = self.tp(rawio)
1620 bufio.__init__(rawio)
1621 bufio.__init__(rawio, buffer_size=1024)
1622 bufio.__init__(rawio, buffer_size=16)
Ezio Melottib3aedd42010-11-20 19:04:17 +00001623 self.assertEqual(3, bufio.write(b"abc"))
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001624 bufio.flush()
1625 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0)
1626 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16)
1627 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1)
1628 bufio.__init__(rawio)
Ezio Melottib3aedd42010-11-20 19:04:17 +00001629 self.assertEqual(3, bufio.write(b"ghi"))
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001630 bufio.flush()
Ezio Melottib3aedd42010-11-20 19:04:17 +00001631 self.assertEqual(b"".join(rawio._write_stack), b"abcghi")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001632
Serhiy Storchaka61e24932014-02-12 10:52:35 +02001633 def test_uninitialized(self):
1634 bufio = self.tp.__new__(self.tp)
1635 del bufio
1636 bufio = self.tp.__new__(self.tp)
1637 self.assertRaisesRegex((ValueError, AttributeError),
1638 'uninitialized|has no attribute',
1639 bufio.write, b'')
1640 bufio.__init__(self.MockRawIO())
1641 self.assertEqual(bufio.write(b''), 0)
1642
Benjamin Petersond2e0c792009-05-01 20:40:59 +00001643 def test_detach_flush(self):
1644 raw = self.MockRawIO()
1645 buf = self.tp(raw)
1646 buf.write(b"howdy!")
1647 self.assertFalse(raw._write_stack)
1648 buf.detach()
1649 self.assertEqual(raw._write_stack, [b"howdy!"])
1650
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001651 def test_write(self):
Guido van Rossum68bbcd22007-02-27 17:19:33 +00001652 # Write to the buffered IO but don't overflow the buffer.
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001653 writer = self.MockRawIO()
1654 bufio = self.tp(writer, 8)
Guido van Rossum76c5d4d2007-04-06 19:10:29 +00001655 bufio.write(b"abc")
Guido van Rossum76c5d4d2007-04-06 19:10:29 +00001656 self.assertFalse(writer._write_stack)
Martin Panter6bb91f32016-05-28 00:41:57 +00001657 buffer = bytearray(b"def")
1658 bufio.write(buffer)
1659 buffer[:] = b"***" # Overwrite our copy of the data
1660 bufio.flush()
1661 self.assertEqual(b"".join(writer._write_stack), b"abcdef")
Guido van Rossum68bbcd22007-02-27 17:19:33 +00001662
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001663 def test_write_overflow(self):
1664 writer = self.MockRawIO()
1665 bufio = self.tp(writer, 8)
1666 contents = b"abcdefghijklmnop"
1667 for n in range(0, len(contents), 3):
1668 bufio.write(contents[n:n+3])
1669 flushed = b"".join(writer._write_stack)
1670 # At least (total - 8) bytes were implicitly flushed, perhaps more
1671 # depending on the implementation.
Benjamin Petersonc9c0f202009-06-30 23:06:06 +00001672 self.assertTrue(flushed.startswith(contents[:-8]), flushed)
Guido van Rossum68bbcd22007-02-27 17:19:33 +00001673
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001674 def check_writes(self, intermediate_func):
1675 # Lots of writes, test the flushed output is as expected.
1676 contents = bytes(range(256)) * 1000
1677 n = 0
1678 writer = self.MockRawIO()
1679 bufio = self.tp(writer, 13)
1680 # Generator of write sizes: repeat each N 15 times then proceed to N+1
1681 def gen_sizes():
1682 for size in count(1):
1683 for i in range(15):
1684 yield size
1685 sizes = gen_sizes()
1686 while n < len(contents):
1687 size = min(next(sizes), len(contents) - n)
Ezio Melottib3aedd42010-11-20 19:04:17 +00001688 self.assertEqual(bufio.write(contents[n:n+size]), size)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001689 intermediate_func(bufio)
1690 n += size
1691 bufio.flush()
Ezio Melottib3aedd42010-11-20 19:04:17 +00001692 self.assertEqual(contents, b"".join(writer._write_stack))
Guido van Rossum68bbcd22007-02-27 17:19:33 +00001693
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001694 def test_writes(self):
1695 self.check_writes(lambda bufio: None)
Guido van Rossum68bbcd22007-02-27 17:19:33 +00001696
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001697 def test_writes_and_flushes(self):
1698 self.check_writes(lambda bufio: bufio.flush())
Guido van Rossum01a27522007-03-07 01:00:12 +00001699
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001700 def test_writes_and_seeks(self):
1701 def _seekabs(bufio):
1702 pos = bufio.tell()
1703 bufio.seek(pos + 1, 0)
1704 bufio.seek(pos - 1, 0)
1705 bufio.seek(pos, 0)
1706 self.check_writes(_seekabs)
1707 def _seekrel(bufio):
1708 pos = bufio.seek(0, 1)
1709 bufio.seek(+1, 1)
1710 bufio.seek(-1, 1)
1711 bufio.seek(pos, 0)
1712 self.check_writes(_seekrel)
Guido van Rossum01a27522007-03-07 01:00:12 +00001713
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001714 def test_writes_and_truncates(self):
1715 self.check_writes(lambda bufio: bufio.truncate(bufio.tell()))
Guido van Rossum01a27522007-03-07 01:00:12 +00001716
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001717 def test_write_non_blocking(self):
1718 raw = self.MockNonBlockWriterIO()
Benjamin Peterson59406a92009-03-26 17:10:29 +00001719 bufio = self.tp(raw, 8)
Guido van Rossum01a27522007-03-07 01:00:12 +00001720
Ezio Melottib3aedd42010-11-20 19:04:17 +00001721 self.assertEqual(bufio.write(b"abcd"), 4)
1722 self.assertEqual(bufio.write(b"efghi"), 5)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001723 # 1 byte will be written, the rest will be buffered
1724 raw.block_on(b"k")
Ezio Melottib3aedd42010-11-20 19:04:17 +00001725 self.assertEqual(bufio.write(b"jklmn"), 5)
Guido van Rossum01a27522007-03-07 01:00:12 +00001726
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001727 # 8 bytes will be written, 8 will be buffered and the rest will be lost
1728 raw.block_on(b"0")
1729 try:
1730 bufio.write(b"opqrwxyz0123456789")
1731 except self.BlockingIOError as e:
1732 written = e.characters_written
1733 else:
1734 self.fail("BlockingIOError should have been raised")
Ezio Melottib3aedd42010-11-20 19:04:17 +00001735 self.assertEqual(written, 16)
1736 self.assertEqual(raw.pop_written(),
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001737 b"abcdefghijklmnopqrwxyz")
Guido van Rossum01a27522007-03-07 01:00:12 +00001738
Ezio Melottib3aedd42010-11-20 19:04:17 +00001739 self.assertEqual(bufio.write(b"ABCDEFGHI"), 9)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001740 s = raw.pop_written()
1741 # Previously buffered bytes were flushed
1742 self.assertTrue(s.startswith(b"01234567A"), s)
Guido van Rossum01a27522007-03-07 01:00:12 +00001743
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001744 def test_write_and_rewind(self):
1745 raw = io.BytesIO()
1746 bufio = self.tp(raw, 4)
1747 self.assertEqual(bufio.write(b"abcdef"), 6)
1748 self.assertEqual(bufio.tell(), 6)
1749 bufio.seek(0, 0)
1750 self.assertEqual(bufio.write(b"XY"), 2)
1751 bufio.seek(6, 0)
1752 self.assertEqual(raw.getvalue(), b"XYcdef")
1753 self.assertEqual(bufio.write(b"123456"), 6)
1754 bufio.flush()
1755 self.assertEqual(raw.getvalue(), b"XYcdef123456")
Guido van Rossum68bbcd22007-02-27 17:19:33 +00001756
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001757 def test_flush(self):
1758 writer = self.MockRawIO()
1759 bufio = self.tp(writer, 8)
Guido van Rossum76c5d4d2007-04-06 19:10:29 +00001760 bufio.write(b"abc")
1761 bufio.flush()
Ezio Melottib3aedd42010-11-20 19:04:17 +00001762 self.assertEqual(b"abc", writer._write_stack[0])
Guido van Rossum68bbcd22007-02-27 17:19:33 +00001763
Antoine Pitrou131a4892012-10-16 22:57:11 +02001764 def test_writelines(self):
1765 l = [b'ab', b'cd', b'ef']
1766 writer = self.MockRawIO()
1767 bufio = self.tp(writer, 8)
1768 bufio.writelines(l)
1769 bufio.flush()
1770 self.assertEqual(b''.join(writer._write_stack), b'abcdef')
1771
1772 def test_writelines_userlist(self):
1773 l = UserList([b'ab', b'cd', b'ef'])
1774 writer = self.MockRawIO()
1775 bufio = self.tp(writer, 8)
1776 bufio.writelines(l)
1777 bufio.flush()
1778 self.assertEqual(b''.join(writer._write_stack), b'abcdef')
1779
1780 def test_writelines_error(self):
1781 writer = self.MockRawIO()
1782 bufio = self.tp(writer, 8)
1783 self.assertRaises(TypeError, bufio.writelines, [1, 2, 3])
1784 self.assertRaises(TypeError, bufio.writelines, None)
1785 self.assertRaises(TypeError, bufio.writelines, 'abc')
1786
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001787 def test_destructor(self):
1788 writer = self.MockRawIO()
1789 bufio = self.tp(writer, 8)
1790 bufio.write(b"abc")
1791 del bufio
Benjamin Peterson24fb1d02009-04-24 23:26:21 +00001792 support.gc_collect()
Ezio Melottib3aedd42010-11-20 19:04:17 +00001793 self.assertEqual(b"abc", writer._write_stack[0])
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001794
1795 def test_truncate(self):
1796 # Truncate implicitly flushes the buffer.
Hai Shi883bc632020-07-06 17:12:49 +08001797 self.addCleanup(os_helper.unlink, os_helper.TESTFN)
1798 with self.open(os_helper.TESTFN, self.write_mode, buffering=0) as raw:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001799 bufio = self.tp(raw, 8)
1800 bufio.write(b"abcdef")
1801 self.assertEqual(bufio.truncate(3), 3)
Antoine Pitrou905a2ff2010-01-31 22:47:27 +00001802 self.assertEqual(bufio.tell(), 6)
Hai Shi883bc632020-07-06 17:12:49 +08001803 with self.open(os_helper.TESTFN, "rb", buffering=0) as f:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001804 self.assertEqual(f.read(), b"abc")
1805
Nitish Chandra059f58c2018-01-28 21:30:09 +05301806 def test_truncate_after_write(self):
1807 # Ensure that truncate preserves the file position after
1808 # writes longer than the buffer size.
1809 # Issue: https://bugs.python.org/issue32228
Hai Shi883bc632020-07-06 17:12:49 +08001810 self.addCleanup(os_helper.unlink, os_helper.TESTFN)
1811 with self.open(os_helper.TESTFN, "wb") as f:
Nitish Chandra059f58c2018-01-28 21:30:09 +05301812 # Fill with some buffer
1813 f.write(b'\x00' * 10000)
1814 buffer_sizes = [8192, 4096, 200]
1815 for buffer_size in buffer_sizes:
Hai Shi883bc632020-07-06 17:12:49 +08001816 with self.open(os_helper.TESTFN, "r+b", buffering=buffer_size) as f:
Nitish Chandra059f58c2018-01-28 21:30:09 +05301817 f.write(b'\x00' * (buffer_size + 1))
1818 # After write write_pos and write_end are set to 0
1819 f.read(1)
1820 # read operation makes sure that pos != raw_pos
1821 f.truncate()
1822 self.assertEqual(f.tell(), buffer_size + 2)
1823
Antoine Pitrou5bc4fa72010-10-14 15:34:31 +00001824 @support.requires_resource('cpu')
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001825 def test_threads(self):
Antoine Pitrou87695762008-08-14 22:44:29 +00001826 try:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001827 # Write out many bytes from many threads and test they were
1828 # all flushed.
1829 N = 1000
1830 contents = bytes(range(256)) * N
1831 sizes = cycle([1, 19])
1832 n = 0
1833 queue = deque()
1834 while n < len(contents):
1835 size = next(sizes)
1836 queue.append(contents[n:n+size])
1837 n += size
1838 del contents
Antoine Pitrou87695762008-08-14 22:44:29 +00001839 # We use a real file object because it allows us to
1840 # exercise situations where the GIL is released before
1841 # writing the buffer to the raw streams. This is in addition
1842 # to concurrency issues due to switching threads in the middle
1843 # of Python code.
Hai Shi883bc632020-07-06 17:12:49 +08001844 with self.open(os_helper.TESTFN, self.write_mode, buffering=0) as raw:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001845 bufio = self.tp(raw, 8)
Antoine Pitrou87695762008-08-14 22:44:29 +00001846 errors = []
1847 def f():
1848 try:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001849 while True:
1850 try:
1851 s = queue.popleft()
1852 except IndexError:
1853 return
Antoine Pitrou87695762008-08-14 22:44:29 +00001854 bufio.write(s)
1855 except Exception as e:
1856 errors.append(e)
1857 raise
1858 threads = [threading.Thread(target=f) for x in range(20)]
Hai Shie80697d2020-05-28 06:10:27 +08001859 with threading_helper.start_threads(threads):
Serhiy Storchaka263dcd22015-04-01 13:01:14 +03001860 time.sleep(0.02) # yield
Antoine Pitrou87695762008-08-14 22:44:29 +00001861 self.assertFalse(errors,
1862 "the following exceptions were caught: %r" % errors)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001863 bufio.close()
Hai Shi883bc632020-07-06 17:12:49 +08001864 with self.open(os_helper.TESTFN, "rb") as f:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001865 s = f.read()
1866 for i in range(256):
Ezio Melottib3aedd42010-11-20 19:04:17 +00001867 self.assertEqual(s.count(bytes([i])), N)
Antoine Pitrou87695762008-08-14 22:44:29 +00001868 finally:
Hai Shi883bc632020-07-06 17:12:49 +08001869 os_helper.unlink(os_helper.TESTFN)
Antoine Pitrou87695762008-08-14 22:44:29 +00001870
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001871 def test_misbehaved_io(self):
1872 rawio = self.MisbehavedRawIO()
1873 bufio = self.tp(rawio, 5)
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02001874 self.assertRaises(OSError, bufio.seek, 0)
1875 self.assertRaises(OSError, bufio.tell)
1876 self.assertRaises(OSError, bufio.write, b"abcdef")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001877
Victor Stinnerb589cef2019-06-11 03:10:59 +02001878 # Silence destructor error
1879 bufio.close = lambda: None
1880
Florent Xicluna109d5732012-07-07 17:03:22 +02001881 def test_max_buffer_size_removal(self):
1882 with self.assertRaises(TypeError):
Benjamin Peterson59406a92009-03-26 17:10:29 +00001883 self.tp(self.MockRawIO(), 8, 12)
Benjamin Peterson59406a92009-03-26 17:10:29 +00001884
Benjamin Peterson68623612012-12-20 11:53:11 -06001885 def test_write_error_on_close(self):
1886 raw = self.MockRawIO()
1887 def bad_write(b):
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02001888 raise OSError()
Benjamin Peterson68623612012-12-20 11:53:11 -06001889 raw.write = bad_write
1890 b = self.tp(raw)
1891 b.write(b'spam')
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02001892 self.assertRaises(OSError, b.close) # exception not swallowed
Benjamin Peterson68623612012-12-20 11:53:11 -06001893 self.assertTrue(b.closed)
1894
benfogle9703f092017-11-10 16:03:40 -05001895 def test_slow_close_from_thread(self):
1896 # Issue #31976
1897 rawio = self.SlowFlushRawIO()
1898 bufio = self.tp(rawio, 8)
1899 t = threading.Thread(target=bufio.close)
1900 t.start()
1901 rawio.in_flush.wait()
1902 self.assertRaises(ValueError, bufio.write, b'spam')
1903 self.assertTrue(bufio.closed)
1904 t.join()
1905
1906
Benjamin Peterson59406a92009-03-26 17:10:29 +00001907
Antoine Pitrou10f0c502012-07-29 19:02:46 +02001908class CBufferedWriterTest(BufferedWriterTest, SizeofTest):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001909 tp = io.BufferedWriter
1910
Gregory P. Smithe5796c42018-12-30 20:17:57 -08001911 @unittest.skipIf(MEMORY_SANITIZER, "MSan defaults to crashing "
1912 "instead of returning NULL for malloc failure.")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001913 def test_constructor(self):
1914 BufferedWriterTest.test_constructor(self)
1915 # The allocation can succeed on 32-bit builds, e.g. with more
Victor Stinner8c663fd2017-11-08 14:44:44 -08001916 # than 2 GiB RAM and a 64-bit kernel.
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001917 if sys.maxsize > 0x7FFFFFFF:
1918 rawio = self.MockRawIO()
1919 bufio = self.tp(rawio)
1920 self.assertRaises((OverflowError, MemoryError, ValueError),
1921 bufio.__init__, rawio, sys.maxsize)
1922
1923 def test_initialization(self):
1924 rawio = self.MockRawIO()
1925 bufio = self.tp(rawio)
1926 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0)
1927 self.assertRaises(ValueError, bufio.write, b"def")
1928 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16)
1929 self.assertRaises(ValueError, bufio.write, b"def")
1930 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1)
1931 self.assertRaises(ValueError, bufio.write, b"def")
1932
1933 def test_garbage_collection(self):
1934 # C BufferedWriter objects are collected, and collecting them flushes
1935 # all data to disk.
1936 # The Python version has __del__, so it ends into gc.garbage instead
Hai Shi883bc632020-07-06 17:12:49 +08001937 self.addCleanup(os_helper.unlink, os_helper.TESTFN)
1938 with warnings_helper.check_warnings(('', ResourceWarning)):
1939 rawio = self.FileIO(os_helper.TESTFN, "w+b")
Antoine Pitrou796564c2013-07-30 19:59:21 +02001940 f = self.tp(rawio)
1941 f.write(b"123xxx")
1942 f.x = f
1943 wr = weakref.ref(f)
1944 del f
1945 support.gc_collect()
Serhiy Storchaka0dcd80a2015-08-02 15:17:49 +03001946 self.assertIsNone(wr(), wr)
Hai Shi883bc632020-07-06 17:12:49 +08001947 with self.open(os_helper.TESTFN, "rb") as f:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001948 self.assertEqual(f.read(), b"123xxx")
1949
R David Murray67bfe802013-02-23 21:51:05 -05001950 def test_args_error(self):
1951 # Issue #17275
1952 with self.assertRaisesRegex(TypeError, "BufferedWriter"):
1953 self.tp(io.BytesIO(), 1024, 1024, 1024)
1954
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001955
1956class PyBufferedWriterTest(BufferedWriterTest):
1957 tp = pyio.BufferedWriter
Guido van Rossuma9e20242007-03-08 00:43:48 +00001958
Guido van Rossum01a27522007-03-07 01:00:12 +00001959class BufferedRWPairTest(unittest.TestCase):
Guido van Rossuma9e20242007-03-08 00:43:48 +00001960
Antoine Pitroucf4c7492009-04-19 00:09:36 +00001961 def test_constructor(self):
1962 pair = self.tp(self.MockRawIO(), self.MockRawIO())
Benjamin Peterson92035012008-12-27 16:00:54 +00001963 self.assertFalse(pair.closed)
Guido van Rossum01a27522007-03-07 01:00:12 +00001964
Serhiy Storchaka61e24932014-02-12 10:52:35 +02001965 def test_uninitialized(self):
1966 pair = self.tp.__new__(self.tp)
1967 del pair
1968 pair = self.tp.__new__(self.tp)
1969 self.assertRaisesRegex((ValueError, AttributeError),
1970 'uninitialized|has no attribute',
1971 pair.read, 0)
1972 self.assertRaisesRegex((ValueError, AttributeError),
1973 'uninitialized|has no attribute',
1974 pair.write, b'')
1975 pair.__init__(self.MockRawIO(), self.MockRawIO())
1976 self.assertEqual(pair.read(0), b'')
1977 self.assertEqual(pair.write(b''), 0)
1978
Benjamin Petersond2e0c792009-05-01 20:40:59 +00001979 def test_detach(self):
1980 pair = self.tp(self.MockRawIO(), self.MockRawIO())
1981 self.assertRaises(self.UnsupportedOperation, pair.detach)
1982
Florent Xicluna109d5732012-07-07 17:03:22 +02001983 def test_constructor_max_buffer_size_removal(self):
1984 with self.assertRaises(TypeError):
Benjamin Peterson59406a92009-03-26 17:10:29 +00001985 self.tp(self.MockRawIO(), self.MockRawIO(), 8, 12)
Benjamin Peterson59406a92009-03-26 17:10:29 +00001986
Antoine Pitroucf4c7492009-04-19 00:09:36 +00001987 def test_constructor_with_not_readable(self):
1988 class NotReadable(MockRawIO):
1989 def readable(self):
1990 return False
1991
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02001992 self.assertRaises(OSError, self.tp, NotReadable(), self.MockRawIO())
Antoine Pitroucf4c7492009-04-19 00:09:36 +00001993
1994 def test_constructor_with_not_writeable(self):
1995 class NotWriteable(MockRawIO):
1996 def writable(self):
1997 return False
1998
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02001999 self.assertRaises(OSError, self.tp, self.MockRawIO(), NotWriteable())
Antoine Pitroucf4c7492009-04-19 00:09:36 +00002000
2001 def test_read(self):
2002 pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())
2003
2004 self.assertEqual(pair.read(3), b"abc")
2005 self.assertEqual(pair.read(1), b"d")
2006 self.assertEqual(pair.read(), b"ef")
Benjamin Petersonbf5ff762009-12-13 19:25:34 +00002007 pair = self.tp(self.BytesIO(b"abc"), self.MockRawIO())
2008 self.assertEqual(pair.read(None), b"abc")
2009
2010 def test_readlines(self):
2011 pair = lambda: self.tp(self.BytesIO(b"abc\ndef\nh"), self.MockRawIO())
2012 self.assertEqual(pair().readlines(), [b"abc\n", b"def\n", b"h"])
2013 self.assertEqual(pair().readlines(), [b"abc\n", b"def\n", b"h"])
2014 self.assertEqual(pair().readlines(5), [b"abc\n", b"def\n"])
Antoine Pitroucf4c7492009-04-19 00:09:36 +00002015
2016 def test_read1(self):
2017 # .read1() is delegated to the underlying reader object, so this test
2018 # can be shallow.
2019 pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())
2020
2021 self.assertEqual(pair.read1(3), b"abc")
Martin Panterccb2c0e2016-10-20 23:48:14 +00002022 self.assertEqual(pair.read1(), b"def")
Antoine Pitroucf4c7492009-04-19 00:09:36 +00002023
2024 def test_readinto(self):
Martin Panter6bb91f32016-05-28 00:41:57 +00002025 for method in ("readinto", "readinto1"):
2026 with self.subTest(method):
2027 pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())
Antoine Pitroucf4c7492009-04-19 00:09:36 +00002028
Serhiy Storchaka5f1a5182016-09-11 14:41:02 +03002029 data = byteslike(b'\0' * 5)
Martin Panter6bb91f32016-05-28 00:41:57 +00002030 self.assertEqual(getattr(pair, method)(data), 5)
2031 self.assertEqual(bytes(data), b"abcde")
Antoine Pitroucf4c7492009-04-19 00:09:36 +00002032
2033 def test_write(self):
2034 w = self.MockRawIO()
2035 pair = self.tp(self.MockRawIO(), w)
2036
2037 pair.write(b"abc")
2038 pair.flush()
Martin Panter6bb91f32016-05-28 00:41:57 +00002039 buffer = bytearray(b"def")
2040 pair.write(buffer)
2041 buffer[:] = b"***" # Overwrite our copy of the data
Antoine Pitroucf4c7492009-04-19 00:09:36 +00002042 pair.flush()
2043 self.assertEqual(w._write_stack, [b"abc", b"def"])
2044
2045 def test_peek(self):
2046 pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())
2047
2048 self.assertTrue(pair.peek(3).startswith(b"abc"))
2049 self.assertEqual(pair.read(3), b"abc")
2050
2051 def test_readable(self):
2052 pair = self.tp(self.MockRawIO(), self.MockRawIO())
2053 self.assertTrue(pair.readable())
2054
2055 def test_writeable(self):
2056 pair = self.tp(self.MockRawIO(), self.MockRawIO())
2057 self.assertTrue(pair.writable())
2058
2059 def test_seekable(self):
2060 # BufferedRWPairs are never seekable, even if their readers and writers
2061 # are.
2062 pair = self.tp(self.MockRawIO(), self.MockRawIO())
2063 self.assertFalse(pair.seekable())
2064
2065 # .flush() is delegated to the underlying writer object and has been
2066 # tested in the test_write method.
2067
2068 def test_close_and_closed(self):
2069 pair = self.tp(self.MockRawIO(), self.MockRawIO())
2070 self.assertFalse(pair.closed)
2071 pair.close()
2072 self.assertTrue(pair.closed)
2073
Serhiy Storchaka7665be62015-03-24 23:21:57 +02002074 def test_reader_close_error_on_close(self):
2075 def reader_close():
2076 reader_non_existing
2077 reader = self.MockRawIO()
2078 reader.close = reader_close
2079 writer = self.MockRawIO()
2080 pair = self.tp(reader, writer)
2081 with self.assertRaises(NameError) as err:
2082 pair.close()
2083 self.assertIn('reader_non_existing', str(err.exception))
2084 self.assertTrue(pair.closed)
2085 self.assertFalse(reader.closed)
2086 self.assertTrue(writer.closed)
2087
Victor Stinner472f7942019-04-12 21:58:24 +02002088 # Silence destructor error
2089 reader.close = lambda: None
2090
Serhiy Storchaka7665be62015-03-24 23:21:57 +02002091 def test_writer_close_error_on_close(self):
2092 def writer_close():
2093 writer_non_existing
2094 reader = self.MockRawIO()
2095 writer = self.MockRawIO()
2096 writer.close = writer_close
2097 pair = self.tp(reader, writer)
2098 with self.assertRaises(NameError) as err:
2099 pair.close()
2100 self.assertIn('writer_non_existing', str(err.exception))
2101 self.assertFalse(pair.closed)
2102 self.assertTrue(reader.closed)
2103 self.assertFalse(writer.closed)
2104
Victor Stinner472f7942019-04-12 21:58:24 +02002105 # Silence destructor error
2106 writer.close = lambda: None
Victor Stinner913fa1c2019-06-12 23:57:11 +02002107 writer = None
2108
Victor Stinner212646c2019-06-14 18:03:22 +02002109 # Ignore BufferedWriter (of the BufferedRWPair) unraisable exception
Victor Stinner913fa1c2019-06-12 23:57:11 +02002110 with support.catch_unraisable_exception():
Victor Stinner212646c2019-06-14 18:03:22 +02002111 # Ignore BufferedRWPair unraisable exception
2112 with support.catch_unraisable_exception():
2113 pair = None
2114 support.gc_collect()
Victor Stinner913fa1c2019-06-12 23:57:11 +02002115 support.gc_collect()
Victor Stinner472f7942019-04-12 21:58:24 +02002116
Serhiy Storchaka7665be62015-03-24 23:21:57 +02002117 def test_reader_writer_close_error_on_close(self):
2118 def reader_close():
2119 reader_non_existing
2120 def writer_close():
2121 writer_non_existing
2122 reader = self.MockRawIO()
2123 reader.close = reader_close
2124 writer = self.MockRawIO()
2125 writer.close = writer_close
2126 pair = self.tp(reader, writer)
2127 with self.assertRaises(NameError) as err:
2128 pair.close()
2129 self.assertIn('reader_non_existing', str(err.exception))
2130 self.assertIsInstance(err.exception.__context__, NameError)
2131 self.assertIn('writer_non_existing', str(err.exception.__context__))
2132 self.assertFalse(pair.closed)
2133 self.assertFalse(reader.closed)
2134 self.assertFalse(writer.closed)
2135
Victor Stinner472f7942019-04-12 21:58:24 +02002136 # Silence destructor error
2137 reader.close = lambda: None
2138 writer.close = lambda: None
2139
Antoine Pitroucf4c7492009-04-19 00:09:36 +00002140 def test_isatty(self):
2141 class SelectableIsAtty(MockRawIO):
2142 def __init__(self, isatty):
2143 MockRawIO.__init__(self)
2144 self._isatty = isatty
2145
2146 def isatty(self):
2147 return self._isatty
2148
2149 pair = self.tp(SelectableIsAtty(False), SelectableIsAtty(False))
2150 self.assertFalse(pair.isatty())
2151
2152 pair = self.tp(SelectableIsAtty(True), SelectableIsAtty(False))
2153 self.assertTrue(pair.isatty())
2154
2155 pair = self.tp(SelectableIsAtty(False), SelectableIsAtty(True))
2156 self.assertTrue(pair.isatty())
2157
2158 pair = self.tp(SelectableIsAtty(True), SelectableIsAtty(True))
2159 self.assertTrue(pair.isatty())
Guido van Rossum01a27522007-03-07 01:00:12 +00002160
Benjamin Petersonbbd0a322014-09-29 22:46:57 -04002161 def test_weakref_clearing(self):
2162 brw = self.tp(self.MockRawIO(), self.MockRawIO())
2163 ref = weakref.ref(brw)
2164 brw = None
2165 ref = None # Shouldn't segfault.
2166
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002167class CBufferedRWPairTest(BufferedRWPairTest):
2168 tp = io.BufferedRWPair
Guido van Rossuma9e20242007-03-08 00:43:48 +00002169
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002170class PyBufferedRWPairTest(BufferedRWPairTest):
2171 tp = pyio.BufferedRWPair
Guido van Rossuma9e20242007-03-08 00:43:48 +00002172
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002173
2174class BufferedRandomTest(BufferedReaderTest, BufferedWriterTest):
2175 read_mode = "rb+"
2176 write_mode = "wb+"
2177
2178 def test_constructor(self):
2179 BufferedReaderTest.test_constructor(self)
2180 BufferedWriterTest.test_constructor(self)
2181
Serhiy Storchaka61e24932014-02-12 10:52:35 +02002182 def test_uninitialized(self):
2183 BufferedReaderTest.test_uninitialized(self)
2184 BufferedWriterTest.test_uninitialized(self)
2185
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002186 def test_read_and_write(self):
2187 raw = self.MockRawIO((b"asdf", b"ghjk"))
Benjamin Peterson59406a92009-03-26 17:10:29 +00002188 rw = self.tp(raw, 8)
Guido van Rossum01a27522007-03-07 01:00:12 +00002189
2190 self.assertEqual(b"as", rw.read(2))
2191 rw.write(b"ddd")
2192 rw.write(b"eee")
Guido van Rossum76c5d4d2007-04-06 19:10:29 +00002193 self.assertFalse(raw._write_stack) # Buffer writes
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002194 self.assertEqual(b"ghjk", rw.read())
Ezio Melottib3aedd42010-11-20 19:04:17 +00002195 self.assertEqual(b"dddeee", raw._write_stack[0])
Guido van Rossum01a27522007-03-07 01:00:12 +00002196
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002197 def test_seek_and_tell(self):
2198 raw = self.BytesIO(b"asdfghjkl")
2199 rw = self.tp(raw)
Guido van Rossum01a27522007-03-07 01:00:12 +00002200
Ezio Melottib3aedd42010-11-20 19:04:17 +00002201 self.assertEqual(b"as", rw.read(2))
2202 self.assertEqual(2, rw.tell())
Guido van Rossum01a27522007-03-07 01:00:12 +00002203 rw.seek(0, 0)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002204 self.assertEqual(b"asdf", rw.read(4))
Guido van Rossum01a27522007-03-07 01:00:12 +00002205
Antoine Pitroue05565e2011-08-20 14:39:23 +02002206 rw.write(b"123f")
Guido van Rossum01a27522007-03-07 01:00:12 +00002207 rw.seek(0, 0)
Antoine Pitroue05565e2011-08-20 14:39:23 +02002208 self.assertEqual(b"asdf123fl", rw.read())
Ezio Melottib3aedd42010-11-20 19:04:17 +00002209 self.assertEqual(9, rw.tell())
Guido van Rossum01a27522007-03-07 01:00:12 +00002210 rw.seek(-4, 2)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002211 self.assertEqual(5, rw.tell())
Guido van Rossum01a27522007-03-07 01:00:12 +00002212 rw.seek(2, 1)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002213 self.assertEqual(7, rw.tell())
2214 self.assertEqual(b"fl", rw.read(11))
Antoine Pitroue05565e2011-08-20 14:39:23 +02002215 rw.flush()
2216 self.assertEqual(b"asdf123fl", raw.getvalue())
2217
Christian Heimes8e42a0a2007-11-08 18:04:45 +00002218 self.assertRaises(TypeError, rw.seek, 0.0)
Guido van Rossum01a27522007-03-07 01:00:12 +00002219
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002220 def check_flush_and_read(self, read_func):
2221 raw = self.BytesIO(b"abcdefghi")
2222 bufio = self.tp(raw)
2223
Ezio Melottib3aedd42010-11-20 19:04:17 +00002224 self.assertEqual(b"ab", read_func(bufio, 2))
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002225 bufio.write(b"12")
Ezio Melottib3aedd42010-11-20 19:04:17 +00002226 self.assertEqual(b"ef", read_func(bufio, 2))
2227 self.assertEqual(6, bufio.tell())
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002228 bufio.flush()
Ezio Melottib3aedd42010-11-20 19:04:17 +00002229 self.assertEqual(6, bufio.tell())
2230 self.assertEqual(b"ghi", read_func(bufio))
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002231 raw.seek(0, 0)
2232 raw.write(b"XYZ")
2233 # flush() resets the read buffer
2234 bufio.flush()
2235 bufio.seek(0, 0)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002236 self.assertEqual(b"XYZ", read_func(bufio, 3))
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002237
2238 def test_flush_and_read(self):
2239 self.check_flush_and_read(lambda bufio, *args: bufio.read(*args))
2240
2241 def test_flush_and_readinto(self):
2242 def _readinto(bufio, n=-1):
2243 b = bytearray(n if n >= 0 else 9999)
2244 n = bufio.readinto(b)
2245 return bytes(b[:n])
2246 self.check_flush_and_read(_readinto)
2247
2248 def test_flush_and_peek(self):
2249 def _peek(bufio, n=-1):
2250 # This relies on the fact that the buffer can contain the whole
2251 # raw stream, otherwise peek() can return less.
2252 b = bufio.peek(n)
2253 if n != -1:
2254 b = b[:n]
2255 bufio.seek(len(b), 1)
2256 return b
2257 self.check_flush_and_read(_peek)
2258
2259 def test_flush_and_write(self):
2260 raw = self.BytesIO(b"abcdefghi")
2261 bufio = self.tp(raw)
2262
2263 bufio.write(b"123")
2264 bufio.flush()
2265 bufio.write(b"45")
2266 bufio.flush()
2267 bufio.seek(0, 0)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002268 self.assertEqual(b"12345fghi", raw.getvalue())
2269 self.assertEqual(b"12345fghi", bufio.read())
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002270
2271 def test_threads(self):
2272 BufferedReaderTest.test_threads(self)
2273 BufferedWriterTest.test_threads(self)
2274
2275 def test_writes_and_peek(self):
2276 def _peek(bufio):
2277 bufio.peek(1)
2278 self.check_writes(_peek)
2279 def _peek(bufio):
2280 pos = bufio.tell()
2281 bufio.seek(-1, 1)
2282 bufio.peek(1)
2283 bufio.seek(pos, 0)
2284 self.check_writes(_peek)
2285
2286 def test_writes_and_reads(self):
2287 def _read(bufio):
2288 bufio.seek(-1, 1)
2289 bufio.read(1)
2290 self.check_writes(_read)
2291
2292 def test_writes_and_read1s(self):
2293 def _read1(bufio):
2294 bufio.seek(-1, 1)
2295 bufio.read1(1)
2296 self.check_writes(_read1)
2297
2298 def test_writes_and_readintos(self):
2299 def _read(bufio):
2300 bufio.seek(-1, 1)
2301 bufio.readinto(bytearray(1))
2302 self.check_writes(_read)
2303
Antoine Pitroua0ceb732009-08-06 20:29:56 +00002304 def test_write_after_readahead(self):
2305 # Issue #6629: writing after the buffer was filled by readahead should
2306 # first rewind the raw stream.
2307 for overwrite_size in [1, 5]:
2308 raw = self.BytesIO(b"A" * 10)
2309 bufio = self.tp(raw, 4)
2310 # Trigger readahead
2311 self.assertEqual(bufio.read(1), b"A")
2312 self.assertEqual(bufio.tell(), 1)
2313 # Overwriting should rewind the raw stream if it needs so
2314 bufio.write(b"B" * overwrite_size)
2315 self.assertEqual(bufio.tell(), overwrite_size + 1)
2316 # If the write size was smaller than the buffer size, flush() and
2317 # check that rewind happens.
2318 bufio.flush()
2319 self.assertEqual(bufio.tell(), overwrite_size + 1)
2320 s = raw.getvalue()
2321 self.assertEqual(s,
2322 b"A" + b"B" * overwrite_size + b"A" * (9 - overwrite_size))
2323
Antoine Pitrou7c404892011-05-13 00:13:33 +02002324 def test_write_rewind_write(self):
2325 # Various combinations of reading / writing / seeking backwards / writing again
2326 def mutate(bufio, pos1, pos2):
2327 assert pos2 >= pos1
2328 # Fill the buffer
2329 bufio.seek(pos1)
2330 bufio.read(pos2 - pos1)
2331 bufio.write(b'\x02')
2332 # This writes earlier than the previous write, but still inside
2333 # the buffer.
2334 bufio.seek(pos1)
2335 bufio.write(b'\x01')
2336
2337 b = b"\x80\x81\x82\x83\x84"
2338 for i in range(0, len(b)):
2339 for j in range(i, len(b)):
2340 raw = self.BytesIO(b)
2341 bufio = self.tp(raw, 100)
2342 mutate(bufio, i, j)
2343 bufio.flush()
2344 expected = bytearray(b)
2345 expected[j] = 2
2346 expected[i] = 1
2347 self.assertEqual(raw.getvalue(), expected,
2348 "failed result for i=%d, j=%d" % (i, j))
2349
Antoine Pitrou905a2ff2010-01-31 22:47:27 +00002350 def test_truncate_after_read_or_write(self):
2351 raw = self.BytesIO(b"A" * 10)
2352 bufio = self.tp(raw, 100)
2353 self.assertEqual(bufio.read(2), b"AA") # the read buffer gets filled
2354 self.assertEqual(bufio.truncate(), 2)
2355 self.assertEqual(bufio.write(b"BB"), 2) # the write buffer increases
2356 self.assertEqual(bufio.truncate(), 4)
2357
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002358 def test_misbehaved_io(self):
2359 BufferedReaderTest.test_misbehaved_io(self)
2360 BufferedWriterTest.test_misbehaved_io(self)
2361
Antoine Pitroue05565e2011-08-20 14:39:23 +02002362 def test_interleaved_read_write(self):
2363 # Test for issue #12213
2364 with self.BytesIO(b'abcdefgh') as raw:
2365 with self.tp(raw, 100) as f:
2366 f.write(b"1")
2367 self.assertEqual(f.read(1), b'b')
2368 f.write(b'2')
2369 self.assertEqual(f.read1(1), b'd')
2370 f.write(b'3')
2371 buf = bytearray(1)
2372 f.readinto(buf)
2373 self.assertEqual(buf, b'f')
2374 f.write(b'4')
2375 self.assertEqual(f.peek(1), b'h')
2376 f.flush()
2377 self.assertEqual(raw.getvalue(), b'1b2d3f4h')
2378
2379 with self.BytesIO(b'abc') as raw:
2380 with self.tp(raw, 100) as f:
2381 self.assertEqual(f.read(1), b'a')
2382 f.write(b"2")
2383 self.assertEqual(f.read(1), b'c')
2384 f.flush()
2385 self.assertEqual(raw.getvalue(), b'a2c')
2386
2387 def test_interleaved_readline_write(self):
2388 with self.BytesIO(b'ab\ncdef\ng\n') as raw:
2389 with self.tp(raw) as f:
2390 f.write(b'1')
2391 self.assertEqual(f.readline(), b'b\n')
2392 f.write(b'2')
2393 self.assertEqual(f.readline(), b'def\n')
2394 f.write(b'3')
2395 self.assertEqual(f.readline(), b'\n')
2396 f.flush()
2397 self.assertEqual(raw.getvalue(), b'1b\n2def\n3\n')
2398
Antoine Pitrou0d739d72010-09-05 23:01:12 +00002399 # You can't construct a BufferedRandom over a non-seekable stream.
2400 test_unseekable = None
2401
Berker Peksagfd5116c2020-02-21 20:57:26 +03002402 # writable() returns True, so there's no point to test it over
2403 # a writable stream.
2404 test_truncate_on_read_only = None
2405
R David Murray67bfe802013-02-23 21:51:05 -05002406
Antoine Pitrou10f0c502012-07-29 19:02:46 +02002407class CBufferedRandomTest(BufferedRandomTest, SizeofTest):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002408 tp = io.BufferedRandom
2409
Gregory P. Smithe5796c42018-12-30 20:17:57 -08002410 @unittest.skipIf(MEMORY_SANITIZER, "MSan defaults to crashing "
2411 "instead of returning NULL for malloc failure.")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002412 def test_constructor(self):
2413 BufferedRandomTest.test_constructor(self)
2414 # The allocation can succeed on 32-bit builds, e.g. with more
Victor Stinner8c663fd2017-11-08 14:44:44 -08002415 # than 2 GiB RAM and a 64-bit kernel.
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002416 if sys.maxsize > 0x7FFFFFFF:
2417 rawio = self.MockRawIO()
2418 bufio = self.tp(rawio)
2419 self.assertRaises((OverflowError, MemoryError, ValueError),
2420 bufio.__init__, rawio, sys.maxsize)
2421
2422 def test_garbage_collection(self):
2423 CBufferedReaderTest.test_garbage_collection(self)
2424 CBufferedWriterTest.test_garbage_collection(self)
2425
R David Murray67bfe802013-02-23 21:51:05 -05002426 def test_args_error(self):
2427 # Issue #17275
2428 with self.assertRaisesRegex(TypeError, "BufferedRandom"):
2429 self.tp(io.BytesIO(), 1024, 1024, 1024)
2430
2431
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002432class PyBufferedRandomTest(BufferedRandomTest):
2433 tp = pyio.BufferedRandom
2434
2435
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002436# To fully exercise seek/tell, the StatefulIncrementalDecoder has these
2437# properties:
2438# - A single output character can correspond to many bytes of input.
2439# - The number of input bytes to complete the character can be
2440# undetermined until the last input byte is received.
2441# - The number of input bytes can vary depending on previous input.
2442# - A single input byte can correspond to many characters of output.
2443# - The number of output characters can be undetermined until the
2444# last input byte is received.
2445# - The number of output characters can vary depending on previous input.
2446
2447class StatefulIncrementalDecoder(codecs.IncrementalDecoder):
2448 """
2449 For testing seek/tell behavior with a stateful, buffering decoder.
2450
2451 Input is a sequence of words. Words may be fixed-length (length set
2452 by input) or variable-length (period-terminated). In variable-length
2453 mode, extra periods are ignored. Possible words are:
2454 - 'i' followed by a number sets the input length, I (maximum 99).
2455 When I is set to 0, words are space-terminated.
2456 - 'o' followed by a number sets the output length, O (maximum 99).
2457 - Any other word is converted into a word followed by a period on
2458 the output. The output word consists of the input word truncated
2459 or padded out with hyphens to make its length equal to O. If O
2460 is 0, the word is output verbatim without truncating or padding.
2461 I and O are initially set to 1. When I changes, any buffered input is
2462 re-scanned according to the new I. EOF also terminates the last word.
2463 """
2464
2465 def __init__(self, errors='strict'):
Christian Heimesab568872008-03-23 02:11:13 +00002466 codecs.IncrementalDecoder.__init__(self, errors)
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002467 self.reset()
2468
2469 def __repr__(self):
2470 return '<SID %x>' % id(self)
2471
2472 def reset(self):
2473 self.i = 1
2474 self.o = 1
2475 self.buffer = bytearray()
2476
2477 def getstate(self):
2478 i, o = self.i ^ 1, self.o ^ 1 # so that flags = 0 after reset()
2479 return bytes(self.buffer), i*100 + o
2480
2481 def setstate(self, state):
2482 buffer, io = state
2483 self.buffer = bytearray(buffer)
2484 i, o = divmod(io, 100)
2485 self.i, self.o = i ^ 1, o ^ 1
2486
2487 def decode(self, input, final=False):
2488 output = ''
2489 for b in input:
2490 if self.i == 0: # variable-length, terminated with period
2491 if b == ord('.'):
2492 if self.buffer:
2493 output += self.process_word()
2494 else:
2495 self.buffer.append(b)
2496 else: # fixed-length, terminate after self.i bytes
2497 self.buffer.append(b)
2498 if len(self.buffer) == self.i:
2499 output += self.process_word()
2500 if final and self.buffer: # EOF terminates the last word
2501 output += self.process_word()
2502 return output
2503
2504 def process_word(self):
2505 output = ''
2506 if self.buffer[0] == ord('i'):
2507 self.i = min(99, int(self.buffer[1:] or 0)) # set input length
2508 elif self.buffer[0] == ord('o'):
2509 self.o = min(99, int(self.buffer[1:] or 0)) # set output length
2510 else:
2511 output = self.buffer.decode('ascii')
2512 if len(output) < self.o:
2513 output += '-'*self.o # pad out with hyphens
2514 if self.o:
2515 output = output[:self.o] # truncate to output length
2516 output += '.'
2517 self.buffer = bytearray()
2518 return output
2519
Benjamin Petersonad9d48d2008-04-02 21:49:44 +00002520 codecEnabled = False
2521
2522 @classmethod
2523 def lookupTestDecoder(cls, name):
2524 if cls.codecEnabled and name == 'test_decoder':
Antoine Pitrou180a3362008-12-14 16:36:46 +00002525 latin1 = codecs.lookup('latin-1')
Benjamin Petersonad9d48d2008-04-02 21:49:44 +00002526 return codecs.CodecInfo(
Antoine Pitrou180a3362008-12-14 16:36:46 +00002527 name='test_decoder', encode=latin1.encode, decode=None,
Benjamin Petersonad9d48d2008-04-02 21:49:44 +00002528 incrementalencoder=None,
2529 streamreader=None, streamwriter=None,
2530 incrementaldecoder=cls)
2531
Benjamin Petersonad9d48d2008-04-02 21:49:44 +00002532
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002533class StatefulIncrementalDecoderTest(unittest.TestCase):
2534 """
2535 Make sure the StatefulIncrementalDecoder actually works.
2536 """
2537
2538 test_cases = [
Ka-Ping Yeed24a5b62008-03-20 10:51:27 +00002539 # I=1, O=1 (fixed-length input == fixed-length output)
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002540 (b'abcd', False, 'a.b.c.d.'),
Ka-Ping Yeed24a5b62008-03-20 10:51:27 +00002541 # I=0, O=0 (variable-length input, variable-length output)
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002542 (b'oiabcd', True, 'abcd.'),
Ka-Ping Yeed24a5b62008-03-20 10:51:27 +00002543 # I=0, O=0 (should ignore extra periods)
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002544 (b'oi...abcd...', True, 'abcd.'),
Ka-Ping Yeed24a5b62008-03-20 10:51:27 +00002545 # I=0, O=6 (variable-length input, fixed-length output)
2546 (b'i.o6.x.xyz.toolongtofit.', False, 'x-----.xyz---.toolon.'),
2547 # I=2, O=6 (fixed-length input < fixed-length output)
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002548 (b'i.i2.o6xyz', True, 'xy----.z-----.'),
Ka-Ping Yeed24a5b62008-03-20 10:51:27 +00002549 # I=6, O=3 (fixed-length input > fixed-length output)
2550 (b'i.o3.i6.abcdefghijklmnop', True, 'abc.ghi.mno.'),
2551 # I=0, then 3; O=29, then 15 (with longer output)
2552 (b'i.o29.a.b.cde.o15.abcdefghijabcdefghij.i3.a.b.c.d.ei00k.l.m', True,
2553 'a----------------------------.' +
2554 'b----------------------------.' +
2555 'cde--------------------------.' +
2556 'abcdefghijabcde.' +
2557 'a.b------------.' +
2558 '.c.------------.' +
2559 'd.e------------.' +
2560 'k--------------.' +
2561 'l--------------.' +
2562 'm--------------.')
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002563 ]
2564
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002565 def test_decoder(self):
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002566 # Try a few one-shot test cases.
2567 for input, eof, output in self.test_cases:
2568 d = StatefulIncrementalDecoder()
Ezio Melottib3aedd42010-11-20 19:04:17 +00002569 self.assertEqual(d.decode(input, eof), output)
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002570
2571 # Also test an unfinished decode, followed by forcing EOF.
2572 d = StatefulIncrementalDecoder()
Ezio Melottib3aedd42010-11-20 19:04:17 +00002573 self.assertEqual(d.decode(b'oiabcd'), '')
2574 self.assertEqual(d.decode(b'', 1), 'abcd.')
Guido van Rossum78892e42007-04-06 17:31:18 +00002575
2576class TextIOWrapperTest(unittest.TestCase):
Guido van Rossum9b76da62007-04-11 01:09:03 +00002577
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002578 def setUp(self):
2579 self.testdata = b"AAA\r\nBBB\rCCC\r\nDDD\nEEE\r\n"
2580 self.normalized = b"AAA\nBBB\nCCC\nDDD\nEEE\n".decode("ascii")
Hai Shi883bc632020-07-06 17:12:49 +08002581 os_helper.unlink(os_helper.TESTFN)
Hai Shic9f696c2020-10-16 16:34:15 +08002582 codecs.register(StatefulIncrementalDecoder.lookupTestDecoder)
2583 self.addCleanup(codecs.unregister,
2584 StatefulIncrementalDecoder.lookupTestDecoder)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002585
Guido van Rossumd0712812007-04-11 16:32:43 +00002586 def tearDown(self):
Hai Shi883bc632020-07-06 17:12:49 +08002587 os_helper.unlink(os_helper.TESTFN)
Guido van Rossum9b76da62007-04-11 01:09:03 +00002588
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002589 def test_constructor(self):
2590 r = self.BytesIO(b"\xc3\xa9\n\n")
2591 b = self.BufferedReader(r, 1000)
2592 t = self.TextIOWrapper(b)
Marc-André Lemburg8f36af72011-02-25 15:42:01 +00002593 t.__init__(b, encoding="latin-1", newline="\r\n")
2594 self.assertEqual(t.encoding, "latin-1")
Ezio Melottib3aedd42010-11-20 19:04:17 +00002595 self.assertEqual(t.line_buffering, False)
Marc-André Lemburg8f36af72011-02-25 15:42:01 +00002596 t.__init__(b, encoding="utf-8", line_buffering=True)
2597 self.assertEqual(t.encoding, "utf-8")
Ezio Melottib3aedd42010-11-20 19:04:17 +00002598 self.assertEqual(t.line_buffering, True)
2599 self.assertEqual("\xe9\n", t.readline())
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002600 self.assertRaises(TypeError, t.__init__, b, newline=42)
2601 self.assertRaises(ValueError, t.__init__, b, newline='xyzzy')
2602
Serhiy Storchakaf24131f2015-04-16 11:19:43 +03002603 def test_uninitialized(self):
2604 t = self.TextIOWrapper.__new__(self.TextIOWrapper)
2605 del t
2606 t = self.TextIOWrapper.__new__(self.TextIOWrapper)
2607 self.assertRaises(Exception, repr, t)
2608 self.assertRaisesRegex((ValueError, AttributeError),
2609 'uninitialized|has no attribute',
2610 t.read, 0)
2611 t.__init__(self.MockRawIO())
2612 self.assertEqual(t.read(0), '')
2613
Nick Coghlana9b15242014-02-04 22:11:18 +10002614 def test_non_text_encoding_codecs_are_rejected(self):
2615 # Ensure the constructor complains if passed a codec that isn't
2616 # marked as a text encoding
2617 # http://bugs.python.org/issue20404
2618 r = self.BytesIO()
2619 b = self.BufferedWriter(r)
2620 with self.assertRaisesRegex(LookupError, "is not a text encoding"):
2621 self.TextIOWrapper(b, encoding="hex")
2622
Benjamin Petersond2e0c792009-05-01 20:40:59 +00002623 def test_detach(self):
2624 r = self.BytesIO()
2625 b = self.BufferedWriter(r)
2626 t = self.TextIOWrapper(b)
2627 self.assertIs(t.detach(), b)
2628
2629 t = self.TextIOWrapper(b, encoding="ascii")
2630 t.write("howdy")
2631 self.assertFalse(r.getvalue())
2632 t.detach()
2633 self.assertEqual(r.getvalue(), b"howdy")
2634 self.assertRaises(ValueError, t.detach)
2635
Benjamin Peterson10e76b62014-12-21 20:51:50 -06002636 # Operations independent of the detached stream should still work
2637 repr(t)
2638 self.assertEqual(t.encoding, "ascii")
2639 self.assertEqual(t.errors, "strict")
2640 self.assertFalse(t.line_buffering)
Antoine Pitrou3c2817b2017-06-03 12:32:28 +02002641 self.assertFalse(t.write_through)
Benjamin Peterson10e76b62014-12-21 20:51:50 -06002642
Benjamin Petersonc4c0eae2009-03-09 00:07:03 +00002643 def test_repr(self):
2644 raw = self.BytesIO("hello".encode("utf-8"))
2645 b = self.BufferedReader(raw)
2646 t = self.TextIOWrapper(b, encoding="utf-8")
Antoine Pitrou716c4442009-05-23 19:04:03 +00002647 modname = self.TextIOWrapper.__module__
Max Bernsteinccb7ca72019-05-21 10:09:21 -07002648 self.assertRegex(repr(t),
2649 r"<(%s\.)?TextIOWrapper encoding='utf-8'>" % modname)
Antoine Pitrou716c4442009-05-23 19:04:03 +00002650 raw.name = "dummy"
Max Bernsteinccb7ca72019-05-21 10:09:21 -07002651 self.assertRegex(repr(t),
2652 r"<(%s\.)?TextIOWrapper name='dummy' encoding='utf-8'>" % modname)
Antoine Pitroua4815ca2011-01-09 20:38:15 +00002653 t.mode = "r"
Max Bernsteinccb7ca72019-05-21 10:09:21 -07002654 self.assertRegex(repr(t),
2655 r"<(%s\.)?TextIOWrapper name='dummy' mode='r' encoding='utf-8'>" % modname)
Antoine Pitrou716c4442009-05-23 19:04:03 +00002656 raw.name = b"dummy"
Max Bernsteinccb7ca72019-05-21 10:09:21 -07002657 self.assertRegex(repr(t),
2658 r"<(%s\.)?TextIOWrapper name=b'dummy' mode='r' encoding='utf-8'>" % modname)
Benjamin Petersonc4c0eae2009-03-09 00:07:03 +00002659
Benjamin Peterson10e76b62014-12-21 20:51:50 -06002660 t.buffer.detach()
2661 repr(t) # Should not raise an exception
2662
Serhiy Storchakaa5af6e12017-03-19 19:25:29 +02002663 def test_recursive_repr(self):
2664 # Issue #25455
2665 raw = self.BytesIO()
2666 t = self.TextIOWrapper(raw)
2667 with support.swap_attr(raw, 'name', t):
2668 try:
2669 repr(t) # Should not crash
2670 except RuntimeError:
2671 pass
2672
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002673 def test_line_buffering(self):
2674 r = self.BytesIO()
2675 b = self.BufferedWriter(r, 1000)
2676 t = self.TextIOWrapper(b, newline="\n", line_buffering=True)
Guido van Rossumf64db9f2007-12-06 01:04:26 +00002677 t.write("X")
Ezio Melottib3aedd42010-11-20 19:04:17 +00002678 self.assertEqual(r.getvalue(), b"") # No flush happened
Guido van Rossumf64db9f2007-12-06 01:04:26 +00002679 t.write("Y\nZ")
Ezio Melottib3aedd42010-11-20 19:04:17 +00002680 self.assertEqual(r.getvalue(), b"XY\nZ") # All got flushed
Guido van Rossumf64db9f2007-12-06 01:04:26 +00002681 t.write("A\rB")
Ezio Melottib3aedd42010-11-20 19:04:17 +00002682 self.assertEqual(r.getvalue(), b"XY\nZA\rB")
Guido van Rossumf64db9f2007-12-06 01:04:26 +00002683
Antoine Pitrou3c2817b2017-06-03 12:32:28 +02002684 def test_reconfigure_line_buffering(self):
2685 r = self.BytesIO()
2686 b = self.BufferedWriter(r, 1000)
2687 t = self.TextIOWrapper(b, newline="\n", line_buffering=False)
2688 t.write("AB\nC")
2689 self.assertEqual(r.getvalue(), b"")
2690
2691 t.reconfigure(line_buffering=True) # implicit flush
2692 self.assertEqual(r.getvalue(), b"AB\nC")
2693 t.write("DEF\nG")
2694 self.assertEqual(r.getvalue(), b"AB\nCDEF\nG")
2695 t.write("H")
2696 self.assertEqual(r.getvalue(), b"AB\nCDEF\nG")
2697 t.reconfigure(line_buffering=False) # implicit flush
2698 self.assertEqual(r.getvalue(), b"AB\nCDEF\nGH")
2699 t.write("IJ")
2700 self.assertEqual(r.getvalue(), b"AB\nCDEF\nGH")
2701
2702 # Keeping default value
2703 t.reconfigure()
2704 t.reconfigure(line_buffering=None)
2705 self.assertEqual(t.line_buffering, False)
2706 t.reconfigure(line_buffering=True)
2707 t.reconfigure()
2708 t.reconfigure(line_buffering=None)
2709 self.assertEqual(t.line_buffering, True)
2710
Victor Stinner91106cd2017-12-13 12:29:09 +01002711 @unittest.skipIf(sys.flags.utf8_mode, "utf-8 mode is enabled")
Victor Stinnerf86a5e82012-06-05 13:43:22 +02002712 def test_default_encoding(self):
2713 old_environ = dict(os.environ)
2714 try:
2715 # try to get a user preferred encoding different than the current
2716 # locale encoding to check that TextIOWrapper() uses the current
2717 # locale encoding and not the user preferred encoding
2718 for key in ('LC_ALL', 'LANG', 'LC_CTYPE'):
2719 if key in os.environ:
2720 del os.environ[key]
2721
2722 current_locale_encoding = locale.getpreferredencoding(False)
2723 b = self.BytesIO()
2724 t = self.TextIOWrapper(b)
2725 self.assertEqual(t.encoding, current_locale_encoding)
2726 finally:
2727 os.environ.clear()
2728 os.environ.update(old_environ)
2729
Serhiy Storchaka5cfc79d2014-02-07 10:06:39 +02002730 @support.cpython_only
Victor Stinner91106cd2017-12-13 12:29:09 +01002731 @unittest.skipIf(sys.flags.utf8_mode, "utf-8 mode is enabled")
Serhiy Storchaka78980432013-01-15 01:12:17 +02002732 def test_device_encoding(self):
Serhiy Storchaka5cfc79d2014-02-07 10:06:39 +02002733 # Issue 15989
2734 import _testcapi
Serhiy Storchaka78980432013-01-15 01:12:17 +02002735 b = self.BytesIO()
2736 b.fileno = lambda: _testcapi.INT_MAX + 1
2737 self.assertRaises(OverflowError, self.TextIOWrapper, b)
2738 b.fileno = lambda: _testcapi.UINT_MAX + 1
2739 self.assertRaises(OverflowError, self.TextIOWrapper, b)
2740
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002741 def test_encoding(self):
2742 # Check the encoding attribute is always set, and valid
2743 b = self.BytesIO()
Marc-André Lemburg8f36af72011-02-25 15:42:01 +00002744 t = self.TextIOWrapper(b, encoding="utf-8")
2745 self.assertEqual(t.encoding, "utf-8")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002746 t = self.TextIOWrapper(b)
Serhiy Storchaka0dcd80a2015-08-02 15:17:49 +03002747 self.assertIsNotNone(t.encoding)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002748 codecs.lookup(t.encoding)
2749
2750 def test_encoding_errors_reading(self):
Guido van Rossume7fc50f2007-12-03 22:54:21 +00002751 # (1) default
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002752 b = self.BytesIO(b"abc\n\xff\n")
2753 t = self.TextIOWrapper(b, encoding="ascii")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00002754 self.assertRaises(UnicodeError, t.read)
2755 # (2) explicit strict
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002756 b = self.BytesIO(b"abc\n\xff\n")
2757 t = self.TextIOWrapper(b, encoding="ascii", errors="strict")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00002758 self.assertRaises(UnicodeError, t.read)
2759 # (3) ignore
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002760 b = self.BytesIO(b"abc\n\xff\n")
2761 t = self.TextIOWrapper(b, encoding="ascii", errors="ignore")
Ezio Melottib3aedd42010-11-20 19:04:17 +00002762 self.assertEqual(t.read(), "abc\n\n")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00002763 # (4) replace
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002764 b = self.BytesIO(b"abc\n\xff\n")
2765 t = self.TextIOWrapper(b, encoding="ascii", errors="replace")
Ezio Melottib3aedd42010-11-20 19:04:17 +00002766 self.assertEqual(t.read(), "abc\n\ufffd\n")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00002767
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002768 def test_encoding_errors_writing(self):
Guido van Rossume7fc50f2007-12-03 22:54:21 +00002769 # (1) default
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002770 b = self.BytesIO()
2771 t = self.TextIOWrapper(b, encoding="ascii")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00002772 self.assertRaises(UnicodeError, t.write, "\xff")
2773 # (2) explicit strict
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002774 b = self.BytesIO()
2775 t = self.TextIOWrapper(b, encoding="ascii", errors="strict")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00002776 self.assertRaises(UnicodeError, t.write, "\xff")
2777 # (3) ignore
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002778 b = self.BytesIO()
2779 t = self.TextIOWrapper(b, encoding="ascii", errors="ignore",
Guido van Rossumf64db9f2007-12-06 01:04:26 +00002780 newline="\n")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00002781 t.write("abc\xffdef\n")
2782 t.flush()
Ezio Melottib3aedd42010-11-20 19:04:17 +00002783 self.assertEqual(b.getvalue(), b"abcdef\n")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00002784 # (4) replace
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002785 b = self.BytesIO()
2786 t = self.TextIOWrapper(b, encoding="ascii", errors="replace",
Guido van Rossumf64db9f2007-12-06 01:04:26 +00002787 newline="\n")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00002788 t.write("abc\xffdef\n")
2789 t.flush()
Ezio Melottib3aedd42010-11-20 19:04:17 +00002790 self.assertEqual(b.getvalue(), b"abc?def\n")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00002791
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002792 def test_newlines(self):
Guido van Rossum78892e42007-04-06 17:31:18 +00002793 input_lines = [ "unix\n", "windows\r\n", "os9\r", "last\n", "nonl" ]
2794
2795 tests = [
2796 [ None, [ 'unix\n', 'windows\n', 'os9\n', 'last\n', 'nonl' ] ],
Guido van Rossum8358db22007-08-18 21:39:55 +00002797 [ '', input_lines ],
2798 [ '\n', [ "unix\n", "windows\r\n", "os9\rlast\n", "nonl" ] ],
2799 [ '\r\n', [ "unix\nwindows\r\n", "os9\rlast\nnonl" ] ],
2800 [ '\r', [ "unix\nwindows\r", "\nos9\r", "last\nnonl" ] ],
Guido van Rossum78892e42007-04-06 17:31:18 +00002801 ]
Antoine Pitrou180a3362008-12-14 16:36:46 +00002802 encodings = (
2803 'utf-8', 'latin-1',
2804 'utf-16', 'utf-16-le', 'utf-16-be',
2805 'utf-32', 'utf-32-le', 'utf-32-be',
2806 )
Guido van Rossum78892e42007-04-06 17:31:18 +00002807
Guido van Rossum8358db22007-08-18 21:39:55 +00002808 # Try a range of buffer sizes to test the case where \r is the last
Guido van Rossum78892e42007-04-06 17:31:18 +00002809 # character in TextIOWrapper._pending_line.
2810 for encoding in encodings:
Guido van Rossum8358db22007-08-18 21:39:55 +00002811 # XXX: str.encode() should return bytes
2812 data = bytes(''.join(input_lines).encode(encoding))
Guido van Rossum78892e42007-04-06 17:31:18 +00002813 for do_reads in (False, True):
Guido van Rossum8358db22007-08-18 21:39:55 +00002814 for bufsize in range(1, 10):
2815 for newline, exp_lines in tests:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002816 bufio = self.BufferedReader(self.BytesIO(data), bufsize)
2817 textio = self.TextIOWrapper(bufio, newline=newline,
Guido van Rossum78892e42007-04-06 17:31:18 +00002818 encoding=encoding)
2819 if do_reads:
2820 got_lines = []
2821 while True:
Guido van Rossum76c5d4d2007-04-06 19:10:29 +00002822 c2 = textio.read(2)
Guido van Rossum78892e42007-04-06 17:31:18 +00002823 if c2 == '':
2824 break
Ezio Melottib3aedd42010-11-20 19:04:17 +00002825 self.assertEqual(len(c2), 2)
Guido van Rossum76c5d4d2007-04-06 19:10:29 +00002826 got_lines.append(c2 + textio.readline())
Guido van Rossum78892e42007-04-06 17:31:18 +00002827 else:
Guido van Rossum76c5d4d2007-04-06 19:10:29 +00002828 got_lines = list(textio)
Guido van Rossum78892e42007-04-06 17:31:18 +00002829
2830 for got_line, exp_line in zip(got_lines, exp_lines):
Ezio Melottib3aedd42010-11-20 19:04:17 +00002831 self.assertEqual(got_line, exp_line)
2832 self.assertEqual(len(got_lines), len(exp_lines))
Guido van Rossum78892e42007-04-06 17:31:18 +00002833
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002834 def test_newlines_input(self):
2835 testdata = b"AAA\nBB\x00B\nCCC\rDDD\rEEE\r\nFFF\r\nGGG"
Guido van Rossum8358db22007-08-18 21:39:55 +00002836 normalized = testdata.replace(b"\r\n", b"\n").replace(b"\r", b"\n")
2837 for newline, expected in [
Ezio Melottid8b509b2011-09-28 17:37:55 +03002838 (None, normalized.decode("ascii").splitlines(keepends=True)),
2839 ("", testdata.decode("ascii").splitlines(keepends=True)),
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002840 ("\n", ["AAA\n", "BB\x00B\n", "CCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]),
2841 ("\r\n", ["AAA\nBB\x00B\nCCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]),
2842 ("\r", ["AAA\nBB\x00B\nCCC\r", "DDD\r", "EEE\r", "\nFFF\r", "\nGGG"]),
Guido van Rossum8358db22007-08-18 21:39:55 +00002843 ]:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002844 buf = self.BytesIO(testdata)
2845 txt = self.TextIOWrapper(buf, encoding="ascii", newline=newline)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002846 self.assertEqual(txt.readlines(), expected)
Guido van Rossum8358db22007-08-18 21:39:55 +00002847 txt.seek(0)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002848 self.assertEqual(txt.read(), "".join(expected))
Guido van Rossum8358db22007-08-18 21:39:55 +00002849
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002850 def test_newlines_output(self):
2851 testdict = {
2852 "": b"AAA\nBBB\nCCC\nX\rY\r\nZ",
2853 "\n": b"AAA\nBBB\nCCC\nX\rY\r\nZ",
2854 "\r": b"AAA\rBBB\rCCC\rX\rY\r\rZ",
2855 "\r\n": b"AAA\r\nBBB\r\nCCC\r\nX\rY\r\r\nZ",
2856 }
2857 tests = [(None, testdict[os.linesep])] + sorted(testdict.items())
2858 for newline, expected in tests:
2859 buf = self.BytesIO()
2860 txt = self.TextIOWrapper(buf, encoding="ascii", newline=newline)
2861 txt.write("AAA\nB")
2862 txt.write("BB\nCCC\n")
2863 txt.write("X\rY\r\nZ")
2864 txt.flush()
Ezio Melottib3aedd42010-11-20 19:04:17 +00002865 self.assertEqual(buf.closed, False)
2866 self.assertEqual(buf.getvalue(), expected)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002867
2868 def test_destructor(self):
2869 l = []
2870 base = self.BytesIO
2871 class MyBytesIO(base):
2872 def close(self):
2873 l.append(self.getvalue())
2874 base.close(self)
2875 b = MyBytesIO()
2876 t = self.TextIOWrapper(b, encoding="ascii")
2877 t.write("abc")
2878 del t
Benjamin Peterson24fb1d02009-04-24 23:26:21 +00002879 support.gc_collect()
Ezio Melottib3aedd42010-11-20 19:04:17 +00002880 self.assertEqual([b"abc"], l)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002881
2882 def test_override_destructor(self):
2883 record = []
2884 class MyTextIO(self.TextIOWrapper):
2885 def __del__(self):
2886 record.append(1)
2887 try:
2888 f = super().__del__
2889 except AttributeError:
2890 pass
2891 else:
2892 f()
2893 def close(self):
2894 record.append(2)
2895 super().close()
2896 def flush(self):
2897 record.append(3)
2898 super().flush()
2899 b = self.BytesIO()
2900 t = MyTextIO(b, encoding="ascii")
2901 del t
Benjamin Peterson24fb1d02009-04-24 23:26:21 +00002902 support.gc_collect()
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002903 self.assertEqual(record, [1, 2, 3])
2904
2905 def test_error_through_destructor(self):
2906 # Test that the exception state is not modified by a destructor,
2907 # even if close() fails.
2908 rawio = self.CloseFailureIO()
Victor Stinner6dbbe742019-05-25 00:09:38 +02002909 with support.catch_unraisable_exception() as cm:
2910 with self.assertRaises(AttributeError):
2911 self.TextIOWrapper(rawio).xyzzy
Victor Stinnerbc2aa812019-05-23 03:45:09 +02002912
2913 if not IOBASE_EMITS_UNRAISABLE:
2914 self.assertIsNone(cm.unraisable)
2915 elif cm.unraisable is not None:
2916 self.assertEqual(cm.unraisable.exc_type, OSError)
Guido van Rossum8358db22007-08-18 21:39:55 +00002917
Guido van Rossum9b76da62007-04-11 01:09:03 +00002918 # Systematic tests of the text I/O API
2919
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002920 def test_basic_io(self):
Guido van Rossum9b76da62007-04-11 01:09:03 +00002921 for chunksize in (1, 2, 3, 4, 5, 15, 16, 17, 31, 32, 33, 63, 64, 65):
Marc-André Lemburg8f36af72011-02-25 15:42:01 +00002922 for enc in "ascii", "latin-1", "utf-8" :# , "utf-16-be", "utf-16-le":
Hai Shi883bc632020-07-06 17:12:49 +08002923 f = self.open(os_helper.TESTFN, "w+", encoding=enc)
Guido van Rossum9b76da62007-04-11 01:09:03 +00002924 f._CHUNK_SIZE = chunksize
Ezio Melottib3aedd42010-11-20 19:04:17 +00002925 self.assertEqual(f.write("abc"), 3)
Guido van Rossum9b76da62007-04-11 01:09:03 +00002926 f.close()
Hai Shi883bc632020-07-06 17:12:49 +08002927 f = self.open(os_helper.TESTFN, "r+", encoding=enc)
Guido van Rossum9b76da62007-04-11 01:09:03 +00002928 f._CHUNK_SIZE = chunksize
Ezio Melottib3aedd42010-11-20 19:04:17 +00002929 self.assertEqual(f.tell(), 0)
2930 self.assertEqual(f.read(), "abc")
Guido van Rossum9b76da62007-04-11 01:09:03 +00002931 cookie = f.tell()
Ezio Melottib3aedd42010-11-20 19:04:17 +00002932 self.assertEqual(f.seek(0), 0)
2933 self.assertEqual(f.read(None), "abc")
Benjamin Petersonbf5ff762009-12-13 19:25:34 +00002934 f.seek(0)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002935 self.assertEqual(f.read(2), "ab")
2936 self.assertEqual(f.read(1), "c")
2937 self.assertEqual(f.read(1), "")
2938 self.assertEqual(f.read(), "")
2939 self.assertEqual(f.tell(), cookie)
2940 self.assertEqual(f.seek(0), 0)
2941 self.assertEqual(f.seek(0, 2), cookie)
2942 self.assertEqual(f.write("def"), 3)
2943 self.assertEqual(f.seek(cookie), cookie)
2944 self.assertEqual(f.read(), "def")
Guido van Rossum9b76da62007-04-11 01:09:03 +00002945 if enc.startswith("utf"):
2946 self.multi_line_test(f, enc)
2947 f.close()
2948
2949 def multi_line_test(self, f, enc):
2950 f.seek(0)
2951 f.truncate()
Guido van Rossumef87d6e2007-05-02 19:09:54 +00002952 sample = "s\xff\u0fff\uffff"
Guido van Rossum9b76da62007-04-11 01:09:03 +00002953 wlines = []
Guido van Rossumcba608c2007-04-11 14:19:59 +00002954 for size in (0, 1, 2, 3, 4, 5, 30, 31, 32, 33, 62, 63, 64, 65, 1000):
Guido van Rossum9b76da62007-04-11 01:09:03 +00002955 chars = []
Guido van Rossum805365e2007-05-07 22:24:25 +00002956 for i in range(size):
Guido van Rossum9b76da62007-04-11 01:09:03 +00002957 chars.append(sample[i % len(sample)])
Guido van Rossumef87d6e2007-05-02 19:09:54 +00002958 line = "".join(chars) + "\n"
Guido van Rossum9b76da62007-04-11 01:09:03 +00002959 wlines.append((f.tell(), line))
2960 f.write(line)
Guido van Rossum9b76da62007-04-11 01:09:03 +00002961 f.seek(0)
2962 rlines = []
2963 while True:
2964 pos = f.tell()
2965 line = f.readline()
2966 if not line:
Guido van Rossum9b76da62007-04-11 01:09:03 +00002967 break
2968 rlines.append((pos, line))
Ezio Melottib3aedd42010-11-20 19:04:17 +00002969 self.assertEqual(rlines, wlines)
Guido van Rossum9b76da62007-04-11 01:09:03 +00002970
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002971 def test_telling(self):
Hai Shi883bc632020-07-06 17:12:49 +08002972 f = self.open(os_helper.TESTFN, "w+", encoding="utf-8")
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +00002973 p0 = f.tell()
Guido van Rossumef87d6e2007-05-02 19:09:54 +00002974 f.write("\xff\n")
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +00002975 p1 = f.tell()
Guido van Rossumef87d6e2007-05-02 19:09:54 +00002976 f.write("\xff\n")
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +00002977 p2 = f.tell()
2978 f.seek(0)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002979 self.assertEqual(f.tell(), p0)
2980 self.assertEqual(f.readline(), "\xff\n")
2981 self.assertEqual(f.tell(), p1)
2982 self.assertEqual(f.readline(), "\xff\n")
2983 self.assertEqual(f.tell(), p2)
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +00002984 f.seek(0)
2985 for line in f:
Ezio Melottib3aedd42010-11-20 19:04:17 +00002986 self.assertEqual(line, "\xff\n")
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02002987 self.assertRaises(OSError, f.tell)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002988 self.assertEqual(f.tell(), p2)
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +00002989 f.close()
2990
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002991 def test_seeking(self):
2992 chunk_size = _default_chunk_size()
Guido van Rossumd76e7792007-04-17 02:38:04 +00002993 prefix_size = chunk_size - 2
Guido van Rossumef87d6e2007-05-02 19:09:54 +00002994 u_prefix = "a" * prefix_size
Guido van Rossumd76e7792007-04-17 02:38:04 +00002995 prefix = bytes(u_prefix.encode("utf-8"))
Ezio Melottib3aedd42010-11-20 19:04:17 +00002996 self.assertEqual(len(u_prefix), len(prefix))
Guido van Rossumef87d6e2007-05-02 19:09:54 +00002997 u_suffix = "\u8888\n"
Guido van Rossumd76e7792007-04-17 02:38:04 +00002998 suffix = bytes(u_suffix.encode("utf-8"))
2999 line = prefix + suffix
Hai Shi883bc632020-07-06 17:12:49 +08003000 with self.open(os_helper.TESTFN, "wb") as f:
Benjamin Petersonebe5d8a2010-10-31 01:30:11 +00003001 f.write(line*2)
Hai Shi883bc632020-07-06 17:12:49 +08003002 with self.open(os_helper.TESTFN, "r", encoding="utf-8") as f:
Benjamin Petersonebe5d8a2010-10-31 01:30:11 +00003003 s = f.read(prefix_size)
Ezio Melottib3aedd42010-11-20 19:04:17 +00003004 self.assertEqual(s, str(prefix, "ascii"))
3005 self.assertEqual(f.tell(), prefix_size)
3006 self.assertEqual(f.readline(), u_suffix)
Guido van Rossumd76e7792007-04-17 02:38:04 +00003007
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003008 def test_seeking_too(self):
Guido van Rossumd76e7792007-04-17 02:38:04 +00003009 # Regression test for a specific bug
3010 data = b'\xe0\xbf\xbf\n'
Hai Shi883bc632020-07-06 17:12:49 +08003011 with self.open(os_helper.TESTFN, "wb") as f:
Benjamin Petersonebe5d8a2010-10-31 01:30:11 +00003012 f.write(data)
Hai Shi883bc632020-07-06 17:12:49 +08003013 with self.open(os_helper.TESTFN, "r", encoding="utf-8") as f:
Benjamin Petersonebe5d8a2010-10-31 01:30:11 +00003014 f._CHUNK_SIZE # Just test that it exists
3015 f._CHUNK_SIZE = 2
3016 f.readline()
3017 f.tell()
Guido van Rossumd76e7792007-04-17 02:38:04 +00003018
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003019 def test_seek_and_tell(self):
3020 #Test seek/tell using the StatefulIncrementalDecoder.
3021 # Make test faster by doing smaller seeks
3022 CHUNK_SIZE = 128
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00003023
Benjamin Peterson5fd871d2009-03-05 00:49:53 +00003024 def test_seek_and_tell_with_data(data, min_pos=0):
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00003025 """Tell/seek to various points within a data stream and ensure
3026 that the decoded data returned by read() is consistent."""
Hai Shi883bc632020-07-06 17:12:49 +08003027 f = self.open(os_helper.TESTFN, 'wb')
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00003028 f.write(data)
3029 f.close()
Hai Shi883bc632020-07-06 17:12:49 +08003030 f = self.open(os_helper.TESTFN, encoding='test_decoder')
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003031 f._CHUNK_SIZE = CHUNK_SIZE
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00003032 decoded = f.read()
3033 f.close()
3034
Neal Norwitze2b07052008-03-18 19:52:05 +00003035 for i in range(min_pos, len(decoded) + 1): # seek positions
3036 for j in [1, 5, len(decoded) - i]: # read lengths
Hai Shi883bc632020-07-06 17:12:49 +08003037 f = self.open(os_helper.TESTFN, encoding='test_decoder')
Ezio Melottib3aedd42010-11-20 19:04:17 +00003038 self.assertEqual(f.read(i), decoded[:i])
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00003039 cookie = f.tell()
Ezio Melottib3aedd42010-11-20 19:04:17 +00003040 self.assertEqual(f.read(j), decoded[i:i + j])
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00003041 f.seek(cookie)
Ezio Melottib3aedd42010-11-20 19:04:17 +00003042 self.assertEqual(f.read(), decoded[i:])
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00003043 f.close()
3044
Benjamin Petersonad9d48d2008-04-02 21:49:44 +00003045 # Enable the test decoder.
3046 StatefulIncrementalDecoder.codecEnabled = 1
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00003047
3048 # Run the tests.
3049 try:
3050 # Try each test case.
3051 for input, _, _ in StatefulIncrementalDecoderTest.test_cases:
Benjamin Peterson5fd871d2009-03-05 00:49:53 +00003052 test_seek_and_tell_with_data(input)
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00003053
3054 # Position each test case so that it crosses a chunk boundary.
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00003055 for input, _, _ in StatefulIncrementalDecoderTest.test_cases:
3056 offset = CHUNK_SIZE - len(input)//2
3057 prefix = b'.'*offset
3058 # Don't bother seeking into the prefix (takes too long).
3059 min_pos = offset*2
Benjamin Peterson5fd871d2009-03-05 00:49:53 +00003060 test_seek_and_tell_with_data(prefix + input, min_pos)
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00003061
3062 # Ensure our test decoder won't interfere with subsequent tests.
3063 finally:
Benjamin Petersonad9d48d2008-04-02 21:49:44 +00003064 StatefulIncrementalDecoder.codecEnabled = 0
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00003065
Christopher Thorneac22f6a2018-11-01 10:48:49 +00003066 def test_multibyte_seek_and_tell(self):
Hai Shi883bc632020-07-06 17:12:49 +08003067 f = self.open(os_helper.TESTFN, "w", encoding="euc_jp")
Christopher Thorneac22f6a2018-11-01 10:48:49 +00003068 f.write("AB\n\u3046\u3048\n")
3069 f.close()
3070
Hai Shi883bc632020-07-06 17:12:49 +08003071 f = self.open(os_helper.TESTFN, "r", encoding="euc_jp")
Christopher Thorneac22f6a2018-11-01 10:48:49 +00003072 self.assertEqual(f.readline(), "AB\n")
3073 p0 = f.tell()
3074 self.assertEqual(f.readline(), "\u3046\u3048\n")
3075 p1 = f.tell()
3076 f.seek(p0)
3077 self.assertEqual(f.readline(), "\u3046\u3048\n")
3078 self.assertEqual(f.tell(), p1)
3079 f.close()
3080
3081 def test_seek_with_encoder_state(self):
Hai Shi883bc632020-07-06 17:12:49 +08003082 f = self.open(os_helper.TESTFN, "w", encoding="euc_jis_2004")
Christopher Thorneac22f6a2018-11-01 10:48:49 +00003083 f.write("\u00e6\u0300")
3084 p0 = f.tell()
3085 f.write("\u00e6")
3086 f.seek(p0)
3087 f.write("\u0300")
3088 f.close()
3089
Hai Shi883bc632020-07-06 17:12:49 +08003090 f = self.open(os_helper.TESTFN, "r", encoding="euc_jis_2004")
Christopher Thorneac22f6a2018-11-01 10:48:49 +00003091 self.assertEqual(f.readline(), "\u00e6\u0300\u0300")
3092 f.close()
3093
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003094 def test_encoded_writes(self):
Alexandre Vassalottia38f73b2008-01-07 18:30:48 +00003095 data = "1234567890"
3096 tests = ("utf-16",
3097 "utf-16-le",
3098 "utf-16-be",
3099 "utf-32",
3100 "utf-32-le",
3101 "utf-32-be")
3102 for encoding in tests:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003103 buf = self.BytesIO()
3104 f = self.TextIOWrapper(buf, encoding=encoding)
Alexandre Vassalottia38f73b2008-01-07 18:30:48 +00003105 # Check if the BOM is written only once (see issue1753).
3106 f.write(data)
3107 f.write(data)
3108 f.seek(0)
Ezio Melottib3aedd42010-11-20 19:04:17 +00003109 self.assertEqual(f.read(), data * 2)
Benjamin Peterson9363a652009-03-05 00:42:09 +00003110 f.seek(0)
Ezio Melottib3aedd42010-11-20 19:04:17 +00003111 self.assertEqual(f.read(), data * 2)
3112 self.assertEqual(buf.getvalue(), (data * 2).encode(encoding))
Alexandre Vassalottia38f73b2008-01-07 18:30:48 +00003113
Benjamin Petersona1b49012009-03-31 23:11:32 +00003114 def test_unreadable(self):
3115 class UnReadable(self.BytesIO):
3116 def readable(self):
3117 return False
3118 txt = self.TextIOWrapper(UnReadable())
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02003119 self.assertRaises(OSError, txt.read)
Benjamin Petersona1b49012009-03-31 23:11:32 +00003120
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003121 def test_read_one_by_one(self):
3122 txt = self.TextIOWrapper(self.BytesIO(b"AA\r\nBB"))
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00003123 reads = ""
3124 while True:
3125 c = txt.read(1)
3126 if not c:
3127 break
3128 reads += c
Ezio Melottib3aedd42010-11-20 19:04:17 +00003129 self.assertEqual(reads, "AA\nBB")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00003130
Benjamin Petersonbf5ff762009-12-13 19:25:34 +00003131 def test_readlines(self):
3132 txt = self.TextIOWrapper(self.BytesIO(b"AA\nBB\nCC"))
3133 self.assertEqual(txt.readlines(), ["AA\n", "BB\n", "CC"])
3134 txt.seek(0)
3135 self.assertEqual(txt.readlines(None), ["AA\n", "BB\n", "CC"])
3136 txt.seek(0)
3137 self.assertEqual(txt.readlines(5), ["AA\n", "BB\n"])
3138
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00003139 # read in amounts equal to TextIOWrapper._CHUNK_SIZE which is 128.
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003140 def test_read_by_chunk(self):
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00003141 # make sure "\r\n" straddles 128 char boundary.
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003142 txt = self.TextIOWrapper(self.BytesIO(b"A" * 127 + b"\r\nB"))
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00003143 reads = ""
3144 while True:
3145 c = txt.read(128)
3146 if not c:
3147 break
3148 reads += c
Ezio Melottib3aedd42010-11-20 19:04:17 +00003149 self.assertEqual(reads, "A"*127+"\nB")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00003150
Antoine Pitrou3ed2cb52012-10-16 23:02:27 +02003151 def test_writelines(self):
3152 l = ['ab', 'cd', 'ef']
3153 buf = self.BytesIO()
3154 txt = self.TextIOWrapper(buf)
3155 txt.writelines(l)
3156 txt.flush()
3157 self.assertEqual(buf.getvalue(), b'abcdef')
3158
3159 def test_writelines_userlist(self):
3160 l = UserList(['ab', 'cd', 'ef'])
3161 buf = self.BytesIO()
3162 txt = self.TextIOWrapper(buf)
3163 txt.writelines(l)
3164 txt.flush()
3165 self.assertEqual(buf.getvalue(), b'abcdef')
3166
3167 def test_writelines_error(self):
3168 txt = self.TextIOWrapper(self.BytesIO())
3169 self.assertRaises(TypeError, txt.writelines, [1, 2, 3])
3170 self.assertRaises(TypeError, txt.writelines, None)
3171 self.assertRaises(TypeError, txt.writelines, b'abc')
3172
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00003173 def test_issue1395_1(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003174 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00003175
3176 # read one char at a time
3177 reads = ""
3178 while True:
3179 c = txt.read(1)
3180 if not c:
3181 break
3182 reads += c
Ezio Melottib3aedd42010-11-20 19:04:17 +00003183 self.assertEqual(reads, self.normalized)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00003184
3185 def test_issue1395_2(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003186 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00003187 txt._CHUNK_SIZE = 4
3188
3189 reads = ""
3190 while True:
3191 c = txt.read(4)
3192 if not c:
3193 break
3194 reads += c
Ezio Melottib3aedd42010-11-20 19:04:17 +00003195 self.assertEqual(reads, self.normalized)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00003196
3197 def test_issue1395_3(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003198 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00003199 txt._CHUNK_SIZE = 4
3200
3201 reads = txt.read(4)
3202 reads += txt.read(4)
3203 reads += txt.readline()
3204 reads += txt.readline()
3205 reads += txt.readline()
Ezio Melottib3aedd42010-11-20 19:04:17 +00003206 self.assertEqual(reads, self.normalized)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00003207
3208 def test_issue1395_4(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003209 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00003210 txt._CHUNK_SIZE = 4
3211
3212 reads = txt.read(4)
3213 reads += txt.read()
Ezio Melottib3aedd42010-11-20 19:04:17 +00003214 self.assertEqual(reads, self.normalized)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00003215
3216 def test_issue1395_5(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003217 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00003218 txt._CHUNK_SIZE = 4
3219
3220 reads = txt.read(4)
3221 pos = txt.tell()
3222 txt.seek(0)
3223 txt.seek(pos)
Ezio Melottib3aedd42010-11-20 19:04:17 +00003224 self.assertEqual(txt.read(4), "BBB\n")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00003225
Ka-Ping Yeeddaa7062008-03-17 20:35:15 +00003226 def test_issue2282(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003227 buffer = self.BytesIO(self.testdata)
3228 txt = self.TextIOWrapper(buffer, encoding="ascii")
Ka-Ping Yeeddaa7062008-03-17 20:35:15 +00003229
3230 self.assertEqual(buffer.seekable(), txt.seekable())
3231
Antoine Pitroue4501852009-05-14 18:55:55 +00003232 def test_append_bom(self):
3233 # The BOM is not written again when appending to a non-empty file
Hai Shi883bc632020-07-06 17:12:49 +08003234 filename = os_helper.TESTFN
Antoine Pitroue4501852009-05-14 18:55:55 +00003235 for charset in ('utf-8-sig', 'utf-16', 'utf-32'):
3236 with self.open(filename, 'w', encoding=charset) as f:
3237 f.write('aaa')
3238 pos = f.tell()
3239 with self.open(filename, 'rb') as f:
Ezio Melottib3aedd42010-11-20 19:04:17 +00003240 self.assertEqual(f.read(), 'aaa'.encode(charset))
Antoine Pitroue4501852009-05-14 18:55:55 +00003241
3242 with self.open(filename, 'a', encoding=charset) as f:
3243 f.write('xxx')
3244 with self.open(filename, 'rb') as f:
Ezio Melottib3aedd42010-11-20 19:04:17 +00003245 self.assertEqual(f.read(), 'aaaxxx'.encode(charset))
Antoine Pitroue4501852009-05-14 18:55:55 +00003246
3247 def test_seek_bom(self):
3248 # Same test, but when seeking manually
Hai Shi883bc632020-07-06 17:12:49 +08003249 filename = os_helper.TESTFN
Antoine Pitroue4501852009-05-14 18:55:55 +00003250 for charset in ('utf-8-sig', 'utf-16', 'utf-32'):
3251 with self.open(filename, 'w', encoding=charset) as f:
3252 f.write('aaa')
3253 pos = f.tell()
3254 with self.open(filename, 'r+', encoding=charset) as f:
3255 f.seek(pos)
3256 f.write('zzz')
3257 f.seek(0)
3258 f.write('bbb')
3259 with self.open(filename, 'rb') as f:
Ezio Melottib3aedd42010-11-20 19:04:17 +00003260 self.assertEqual(f.read(), 'bbbzzz'.encode(charset))
Antoine Pitroue4501852009-05-14 18:55:55 +00003261
Antoine Pitrou85e3ee72015-04-13 20:01:21 +02003262 def test_seek_append_bom(self):
3263 # Same test, but first seek to the start and then to the end
Hai Shi883bc632020-07-06 17:12:49 +08003264 filename = os_helper.TESTFN
Antoine Pitrou85e3ee72015-04-13 20:01:21 +02003265 for charset in ('utf-8-sig', 'utf-16', 'utf-32'):
3266 with self.open(filename, 'w', encoding=charset) as f:
3267 f.write('aaa')
3268 with self.open(filename, 'a', encoding=charset) as f:
3269 f.seek(0)
3270 f.seek(0, self.SEEK_END)
3271 f.write('xxx')
3272 with self.open(filename, 'rb') as f:
3273 self.assertEqual(f.read(), 'aaaxxx'.encode(charset))
3274
Benjamin Peterson0926ad12009-06-06 18:02:12 +00003275 def test_errors_property(self):
Hai Shi883bc632020-07-06 17:12:49 +08003276 with self.open(os_helper.TESTFN, "w") as f:
Benjamin Peterson0926ad12009-06-06 18:02:12 +00003277 self.assertEqual(f.errors, "strict")
Hai Shi883bc632020-07-06 17:12:49 +08003278 with self.open(os_helper.TESTFN, "w", errors="replace") as f:
Benjamin Peterson0926ad12009-06-06 18:02:12 +00003279 self.assertEqual(f.errors, "replace")
3280
Brett Cannon31f59292011-02-21 19:29:56 +00003281 @support.no_tracing
Amaury Forgeot d'Arcccd686a2009-08-29 23:00:38 +00003282 def test_threads_write(self):
3283 # Issue6750: concurrent writes could duplicate data
3284 event = threading.Event()
Hai Shi883bc632020-07-06 17:12:49 +08003285 with self.open(os_helper.TESTFN, "w", buffering=1) as f:
Amaury Forgeot d'Arcccd686a2009-08-29 23:00:38 +00003286 def run(n):
3287 text = "Thread%03d\n" % n
3288 event.wait()
3289 f.write(text)
Serhiy Storchaka263dcd22015-04-01 13:01:14 +03003290 threads = [threading.Thread(target=run, args=(x,))
Amaury Forgeot d'Arcccd686a2009-08-29 23:00:38 +00003291 for x in range(20)]
Hai Shie80697d2020-05-28 06:10:27 +08003292 with threading_helper.start_threads(threads, event.set):
Serhiy Storchaka263dcd22015-04-01 13:01:14 +03003293 time.sleep(0.02)
Hai Shi883bc632020-07-06 17:12:49 +08003294 with self.open(os_helper.TESTFN) as f:
Amaury Forgeot d'Arcccd686a2009-08-29 23:00:38 +00003295 content = f.read()
3296 for n in range(20):
Ezio Melottib3aedd42010-11-20 19:04:17 +00003297 self.assertEqual(content.count("Thread%03d\n" % n), 1)
Amaury Forgeot d'Arcccd686a2009-08-29 23:00:38 +00003298
Antoine Pitrou6be88762010-05-03 16:48:20 +00003299 def test_flush_error_on_close(self):
Serhiy Storchakaa3712a92015-02-21 00:35:09 +02003300 # Test that text file is closed despite failed flush
3301 # and that flush() is called before file closed.
Antoine Pitrou6be88762010-05-03 16:48:20 +00003302 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Serhiy Storchakaa3712a92015-02-21 00:35:09 +02003303 closed = []
Antoine Pitrou6be88762010-05-03 16:48:20 +00003304 def bad_flush():
Serhiy Storchakaa3712a92015-02-21 00:35:09 +02003305 closed[:] = [txt.closed, txt.buffer.closed]
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02003306 raise OSError()
Antoine Pitrou6be88762010-05-03 16:48:20 +00003307 txt.flush = bad_flush
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02003308 self.assertRaises(OSError, txt.close) # exception not swallowed
Benjamin Peterson68623612012-12-20 11:53:11 -06003309 self.assertTrue(txt.closed)
Serhiy Storchakaa3712a92015-02-21 00:35:09 +02003310 self.assertTrue(txt.buffer.closed)
3311 self.assertTrue(closed) # flush() called
3312 self.assertFalse(closed[0]) # flush() called before file closed
3313 self.assertFalse(closed[1])
Serhiy Storchakac26a1a42015-02-23 00:28:38 +02003314 txt.flush = lambda: None # break reference loop
Antoine Pitrou6be88762010-05-03 16:48:20 +00003315
Serhiy Storchaka8a8f7f92014-06-09 09:13:04 +03003316 def test_close_error_on_close(self):
3317 buffer = self.BytesIO(self.testdata)
3318 def bad_flush():
3319 raise OSError('flush')
3320 def bad_close():
3321 raise OSError('close')
3322 buffer.close = bad_close
3323 txt = self.TextIOWrapper(buffer, encoding="ascii")
3324 txt.flush = bad_flush
3325 with self.assertRaises(OSError) as err: # exception not swallowed
3326 txt.close()
3327 self.assertEqual(err.exception.args, ('close',))
3328 self.assertIsInstance(err.exception.__context__, OSError)
3329 self.assertEqual(err.exception.__context__.args, ('flush',))
3330 self.assertFalse(txt.closed)
3331
Victor Stinner472f7942019-04-12 21:58:24 +02003332 # Silence destructor error
3333 buffer.close = lambda: None
3334 txt.flush = lambda: None
3335
Serhiy Storchaka8a8f7f92014-06-09 09:13:04 +03003336 def test_nonnormalized_close_error_on_close(self):
3337 # Issue #21677
3338 buffer = self.BytesIO(self.testdata)
3339 def bad_flush():
3340 raise non_existing_flush
3341 def bad_close():
3342 raise non_existing_close
3343 buffer.close = bad_close
3344 txt = self.TextIOWrapper(buffer, encoding="ascii")
3345 txt.flush = bad_flush
3346 with self.assertRaises(NameError) as err: # exception not swallowed
3347 txt.close()
3348 self.assertIn('non_existing_close', str(err.exception))
3349 self.assertIsInstance(err.exception.__context__, NameError)
3350 self.assertIn('non_existing_flush', str(err.exception.__context__))
3351 self.assertFalse(txt.closed)
3352
Victor Stinner472f7942019-04-12 21:58:24 +02003353 # Silence destructor error
3354 buffer.close = lambda: None
3355 txt.flush = lambda: None
3356
Antoine Pitrou6be88762010-05-03 16:48:20 +00003357 def test_multi_close(self):
3358 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
3359 txt.close()
3360 txt.close()
3361 txt.close()
3362 self.assertRaises(ValueError, txt.flush)
3363
Antoine Pitrou0d739d72010-09-05 23:01:12 +00003364 def test_unseekable(self):
3365 txt = self.TextIOWrapper(self.MockUnseekableIO(self.testdata))
3366 self.assertRaises(self.UnsupportedOperation, txt.tell)
3367 self.assertRaises(self.UnsupportedOperation, txt.seek, 0)
3368
Antoine Pitrou7f8f4182010-12-21 21:20:59 +00003369 def test_readonly_attributes(self):
3370 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
3371 buf = self.BytesIO(self.testdata)
3372 with self.assertRaises(AttributeError):
3373 txt.buffer = buf
3374
Antoine Pitroue96ec682011-07-23 21:46:35 +02003375 def test_rawio(self):
3376 # Issue #12591: TextIOWrapper must work with raw I/O objects, so
3377 # that subprocess.Popen() can have the required unbuffered
3378 # semantics with universal_newlines=True.
3379 raw = self.MockRawIO([b'abc', b'def', b'ghi\njkl\nopq\n'])
3380 txt = self.TextIOWrapper(raw, encoding='ascii', newline='\n')
3381 # Reads
3382 self.assertEqual(txt.read(4), 'abcd')
3383 self.assertEqual(txt.readline(), 'efghi\n')
3384 self.assertEqual(list(txt), ['jkl\n', 'opq\n'])
3385
3386 def test_rawio_write_through(self):
3387 # Issue #12591: with write_through=True, writes don't need a flush
3388 raw = self.MockRawIO([b'abc', b'def', b'ghi\njkl\nopq\n'])
3389 txt = self.TextIOWrapper(raw, encoding='ascii', newline='\n',
3390 write_through=True)
3391 txt.write('1')
3392 txt.write('23\n4')
3393 txt.write('5')
3394 self.assertEqual(b''.join(raw._write_stack), b'123\n45')
3395
Antoine Pitrouc644e7c2014-05-09 00:24:50 +02003396 def test_bufio_write_through(self):
3397 # Issue #21396: write_through=True doesn't force a flush()
3398 # on the underlying binary buffered object.
3399 flush_called, write_called = [], []
3400 class BufferedWriter(self.BufferedWriter):
3401 def flush(self, *args, **kwargs):
3402 flush_called.append(True)
3403 return super().flush(*args, **kwargs)
3404 def write(self, *args, **kwargs):
3405 write_called.append(True)
3406 return super().write(*args, **kwargs)
3407
3408 rawio = self.BytesIO()
3409 data = b"a"
3410 bufio = BufferedWriter(rawio, len(data)*2)
3411 textio = self.TextIOWrapper(bufio, encoding='ascii',
3412 write_through=True)
3413 # write to the buffered io but don't overflow the buffer
3414 text = data.decode('ascii')
3415 textio.write(text)
3416
3417 # buffer.flush is not called with write_through=True
3418 self.assertFalse(flush_called)
3419 # buffer.write *is* called with write_through=True
3420 self.assertTrue(write_called)
3421 self.assertEqual(rawio.getvalue(), b"") # no flush
3422
3423 write_called = [] # reset
3424 textio.write(text * 10) # total content is larger than bufio buffer
3425 self.assertTrue(write_called)
3426 self.assertEqual(rawio.getvalue(), data * 11) # all flushed
3427
Antoine Pitrou3c2817b2017-06-03 12:32:28 +02003428 def test_reconfigure_write_through(self):
3429 raw = self.MockRawIO([])
3430 t = self.TextIOWrapper(raw, encoding='ascii', newline='\n')
3431 t.write('1')
3432 t.reconfigure(write_through=True) # implied flush
3433 self.assertEqual(t.write_through, True)
3434 self.assertEqual(b''.join(raw._write_stack), b'1')
3435 t.write('23')
3436 self.assertEqual(b''.join(raw._write_stack), b'123')
3437 t.reconfigure(write_through=False)
3438 self.assertEqual(t.write_through, False)
3439 t.write('45')
3440 t.flush()
3441 self.assertEqual(b''.join(raw._write_stack), b'12345')
3442 # Keeping default value
3443 t.reconfigure()
3444 t.reconfigure(write_through=None)
3445 self.assertEqual(t.write_through, False)
3446 t.reconfigure(write_through=True)
3447 t.reconfigure()
3448 t.reconfigure(write_through=None)
3449 self.assertEqual(t.write_through, True)
3450
Serhiy Storchaka94dc6732013-02-03 17:03:31 +02003451 def test_read_nonbytes(self):
3452 # Issue #17106
3453 # Crash when underlying read() returns non-bytes
3454 t = self.TextIOWrapper(self.StringIO('a'))
3455 self.assertRaises(TypeError, t.read, 1)
3456 t = self.TextIOWrapper(self.StringIO('a'))
3457 self.assertRaises(TypeError, t.readline)
3458 t = self.TextIOWrapper(self.StringIO('a'))
3459 self.assertRaises(TypeError, t.read)
3460
Oren Milmana5b4ea12017-08-25 21:14:54 +03003461 def test_illegal_encoder(self):
3462 # Issue 31271: Calling write() while the return value of encoder's
3463 # encode() is invalid shouldn't cause an assertion failure.
3464 rot13 = codecs.lookup("rot13")
3465 with support.swap_attr(rot13, '_is_text_encoding', True):
3466 t = io.TextIOWrapper(io.BytesIO(b'foo'), encoding="rot13")
3467 self.assertRaises(TypeError, t.write, 'bar')
3468
Serhiy Storchaka94dc6732013-02-03 17:03:31 +02003469 def test_illegal_decoder(self):
3470 # Issue #17106
Nick Coghlana9b15242014-02-04 22:11:18 +10003471 # Bypass the early encoding check added in issue 20404
3472 def _make_illegal_wrapper():
3473 quopri = codecs.lookup("quopri")
3474 quopri._is_text_encoding = True
3475 try:
3476 t = self.TextIOWrapper(self.BytesIO(b'aaaaaa'),
3477 newline='\n', encoding="quopri")
3478 finally:
3479 quopri._is_text_encoding = False
3480 return t
Serhiy Storchaka94dc6732013-02-03 17:03:31 +02003481 # Crash when decoder returns non-string
Nick Coghlana9b15242014-02-04 22:11:18 +10003482 t = _make_illegal_wrapper()
Serhiy Storchaka94dc6732013-02-03 17:03:31 +02003483 self.assertRaises(TypeError, t.read, 1)
Nick Coghlana9b15242014-02-04 22:11:18 +10003484 t = _make_illegal_wrapper()
Serhiy Storchaka94dc6732013-02-03 17:03:31 +02003485 self.assertRaises(TypeError, t.readline)
Nick Coghlana9b15242014-02-04 22:11:18 +10003486 t = _make_illegal_wrapper()
Serhiy Storchaka94dc6732013-02-03 17:03:31 +02003487 self.assertRaises(TypeError, t.read)
3488
Oren Milmanba7d7362017-08-29 11:58:27 +03003489 # Issue 31243: calling read() while the return value of decoder's
3490 # getstate() is invalid should neither crash the interpreter nor
3491 # raise a SystemError.
3492 def _make_very_illegal_wrapper(getstate_ret_val):
3493 class BadDecoder:
3494 def getstate(self):
3495 return getstate_ret_val
3496 def _get_bad_decoder(dummy):
3497 return BadDecoder()
3498 quopri = codecs.lookup("quopri")
3499 with support.swap_attr(quopri, 'incrementaldecoder',
3500 _get_bad_decoder):
3501 return _make_illegal_wrapper()
3502 t = _make_very_illegal_wrapper(42)
3503 self.assertRaises(TypeError, t.read, 42)
3504 t = _make_very_illegal_wrapper(())
3505 self.assertRaises(TypeError, t.read, 42)
3506 t = _make_very_illegal_wrapper((1, 2))
3507 self.assertRaises(TypeError, t.read, 42)
3508
Antoine Pitrou712cb732013-12-21 15:51:54 +01003509 def _check_create_at_shutdown(self, **kwargs):
3510 # Issue #20037: creating a TextIOWrapper at shutdown
3511 # shouldn't crash the interpreter.
3512 iomod = self.io.__name__
3513 code = """if 1:
3514 import codecs
3515 import {iomod} as io
3516
3517 # Avoid looking up codecs at shutdown
3518 codecs.lookup('utf-8')
3519
3520 class C:
3521 def __init__(self):
3522 self.buf = io.BytesIO()
3523 def __del__(self):
3524 io.TextIOWrapper(self.buf, **{kwargs})
3525 print("ok")
3526 c = C()
3527 """.format(iomod=iomod, kwargs=kwargs)
3528 return assert_python_ok("-c", code)
3529
3530 def test_create_at_shutdown_without_encoding(self):
3531 rc, out, err = self._check_create_at_shutdown()
3532 if err:
3533 # Can error out with a RuntimeError if the module state
3534 # isn't found.
Nick Coghlana9b15242014-02-04 22:11:18 +10003535 self.assertIn(self.shutdown_error, err.decode())
Antoine Pitrou712cb732013-12-21 15:51:54 +01003536 else:
3537 self.assertEqual("ok", out.decode().strip())
3538
3539 def test_create_at_shutdown_with_encoding(self):
3540 rc, out, err = self._check_create_at_shutdown(encoding='utf-8',
3541 errors='strict')
3542 self.assertFalse(err)
3543 self.assertEqual("ok", out.decode().strip())
3544
Antoine Pitroub8503892014-04-29 10:14:02 +02003545 def test_read_byteslike(self):
3546 r = MemviewBytesIO(b'Just some random string\n')
3547 t = self.TextIOWrapper(r, 'utf-8')
3548
3549 # TextIOwrapper will not read the full string, because
3550 # we truncate it to a multiple of the native int size
3551 # so that we can construct a more complex memoryview.
3552 bytes_val = _to_memoryview(r.getvalue()).tobytes()
3553
3554 self.assertEqual(t.read(200), bytes_val.decode('utf-8'))
3555
Benjamin Peterson6c14f232014-11-12 10:19:46 -05003556 def test_issue22849(self):
3557 class F(object):
3558 def readable(self): return True
3559 def writable(self): return True
3560 def seekable(self): return True
3561
3562 for i in range(10):
3563 try:
3564 self.TextIOWrapper(F(), encoding='utf-8')
3565 except Exception:
3566 pass
3567
3568 F.tell = lambda x: 0
3569 t = self.TextIOWrapper(F(), encoding='utf-8')
3570
INADA Naoki507434f2017-12-21 09:59:53 +09003571 def test_reconfigure_encoding_read(self):
3572 # latin1 -> utf8
3573 # (latin1 can decode utf-8 encoded string)
3574 data = 'abc\xe9\n'.encode('latin1') + 'd\xe9f\n'.encode('utf8')
3575 raw = self.BytesIO(data)
3576 txt = self.TextIOWrapper(raw, encoding='latin1', newline='\n')
3577 self.assertEqual(txt.readline(), 'abc\xe9\n')
3578 with self.assertRaises(self.UnsupportedOperation):
3579 txt.reconfigure(encoding='utf-8')
3580 with self.assertRaises(self.UnsupportedOperation):
3581 txt.reconfigure(newline=None)
3582
3583 def test_reconfigure_write_fromascii(self):
3584 # ascii has a specific encodefunc in the C implementation,
3585 # but utf-8-sig has not. Make sure that we get rid of the
3586 # cached encodefunc when we switch encoders.
3587 raw = self.BytesIO()
3588 txt = self.TextIOWrapper(raw, encoding='ascii', newline='\n')
3589 txt.write('foo\n')
3590 txt.reconfigure(encoding='utf-8-sig')
3591 txt.write('\xe9\n')
3592 txt.flush()
3593 self.assertEqual(raw.getvalue(), b'foo\n\xc3\xa9\n')
3594
3595 def test_reconfigure_write(self):
3596 # latin -> utf8
3597 raw = self.BytesIO()
3598 txt = self.TextIOWrapper(raw, encoding='latin1', newline='\n')
3599 txt.write('abc\xe9\n')
3600 txt.reconfigure(encoding='utf-8')
3601 self.assertEqual(raw.getvalue(), b'abc\xe9\n')
3602 txt.write('d\xe9f\n')
3603 txt.flush()
3604 self.assertEqual(raw.getvalue(), b'abc\xe9\nd\xc3\xa9f\n')
3605
3606 # ascii -> utf-8-sig: ensure that no BOM is written in the middle of
3607 # the file
3608 raw = self.BytesIO()
3609 txt = self.TextIOWrapper(raw, encoding='ascii', newline='\n')
3610 txt.write('abc\n')
3611 txt.reconfigure(encoding='utf-8-sig')
3612 txt.write('d\xe9f\n')
3613 txt.flush()
3614 self.assertEqual(raw.getvalue(), b'abc\nd\xc3\xa9f\n')
3615
3616 def test_reconfigure_write_non_seekable(self):
3617 raw = self.BytesIO()
3618 raw.seekable = lambda: False
3619 raw.seek = None
3620 txt = self.TextIOWrapper(raw, encoding='ascii', newline='\n')
3621 txt.write('abc\n')
3622 txt.reconfigure(encoding='utf-8-sig')
3623 txt.write('d\xe9f\n')
3624 txt.flush()
3625
3626 # If the raw stream is not seekable, there'll be a BOM
3627 self.assertEqual(raw.getvalue(), b'abc\n\xef\xbb\xbfd\xc3\xa9f\n')
3628
3629 def test_reconfigure_defaults(self):
3630 txt = self.TextIOWrapper(self.BytesIO(), 'ascii', 'replace', '\n')
3631 txt.reconfigure(encoding=None)
3632 self.assertEqual(txt.encoding, 'ascii')
3633 self.assertEqual(txt.errors, 'replace')
3634 txt.write('LF\n')
3635
3636 txt.reconfigure(newline='\r\n')
3637 self.assertEqual(txt.encoding, 'ascii')
3638 self.assertEqual(txt.errors, 'replace')
3639
3640 txt.reconfigure(errors='ignore')
3641 self.assertEqual(txt.encoding, 'ascii')
3642 self.assertEqual(txt.errors, 'ignore')
3643 txt.write('CRLF\n')
3644
3645 txt.reconfigure(encoding='utf-8', newline=None)
3646 self.assertEqual(txt.errors, 'strict')
3647 txt.seek(0)
3648 self.assertEqual(txt.read(), 'LF\nCRLF\n')
3649
3650 self.assertEqual(txt.detach().getvalue(), b'LF\nCRLF\r\n')
3651
3652 def test_reconfigure_newline(self):
3653 raw = self.BytesIO(b'CR\rEOF')
3654 txt = self.TextIOWrapper(raw, 'ascii', newline='\n')
3655 txt.reconfigure(newline=None)
3656 self.assertEqual(txt.readline(), 'CR\n')
3657 raw = self.BytesIO(b'CR\rEOF')
3658 txt = self.TextIOWrapper(raw, 'ascii', newline='\n')
3659 txt.reconfigure(newline='')
3660 self.assertEqual(txt.readline(), 'CR\r')
3661 raw = self.BytesIO(b'CR\rLF\nEOF')
3662 txt = self.TextIOWrapper(raw, 'ascii', newline='\r')
3663 txt.reconfigure(newline='\n')
3664 self.assertEqual(txt.readline(), 'CR\rLF\n')
3665 raw = self.BytesIO(b'LF\nCR\rEOF')
3666 txt = self.TextIOWrapper(raw, 'ascii', newline='\n')
3667 txt.reconfigure(newline='\r')
3668 self.assertEqual(txt.readline(), 'LF\nCR\r')
3669 raw = self.BytesIO(b'CR\rCRLF\r\nEOF')
3670 txt = self.TextIOWrapper(raw, 'ascii', newline='\r')
3671 txt.reconfigure(newline='\r\n')
3672 self.assertEqual(txt.readline(), 'CR\rCRLF\r\n')
3673
3674 txt = self.TextIOWrapper(self.BytesIO(), 'ascii', newline='\r')
3675 txt.reconfigure(newline=None)
3676 txt.write('linesep\n')
3677 txt.reconfigure(newline='')
3678 txt.write('LF\n')
3679 txt.reconfigure(newline='\n')
3680 txt.write('LF\n')
3681 txt.reconfigure(newline='\r')
3682 txt.write('CR\n')
3683 txt.reconfigure(newline='\r\n')
3684 txt.write('CRLF\n')
3685 expected = 'linesep' + os.linesep + 'LF\nLF\nCR\rCRLF\r\n'
3686 self.assertEqual(txt.detach().getvalue().decode('ascii'), expected)
3687
Zackery Spytz23db9352018-06-29 04:14:58 -06003688 def test_issue25862(self):
3689 # Assertion failures occurred in tell() after read() and write().
3690 t = self.TextIOWrapper(self.BytesIO(b'test'), encoding='ascii')
3691 t.read(1)
3692 t.read()
3693 t.tell()
3694 t = self.TextIOWrapper(self.BytesIO(b'test'), encoding='ascii')
3695 t.read(1)
3696 t.write('x')
3697 t.tell()
3698
Serhiy Storchaka94dc6732013-02-03 17:03:31 +02003699
Antoine Pitroub8503892014-04-29 10:14:02 +02003700class MemviewBytesIO(io.BytesIO):
3701 '''A BytesIO object whose read method returns memoryviews
3702 rather than bytes'''
3703
3704 def read1(self, len_):
3705 return _to_memoryview(super().read1(len_))
3706
3707 def read(self, len_):
3708 return _to_memoryview(super().read(len_))
3709
3710def _to_memoryview(buf):
3711 '''Convert bytes-object *buf* to a non-trivial memoryview'''
3712
3713 arr = array.array('i')
3714 idx = len(buf) - len(buf) % arr.itemsize
3715 arr.frombytes(buf[:idx])
3716 return memoryview(arr)
Benjamin Petersonc9c0f202009-06-30 23:06:06 +00003717
Benjamin Peterson6fd113c2014-11-12 10:23:44 -05003718
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003719class CTextIOWrapperTest(TextIOWrapperTest):
Antoine Pitrou712cb732013-12-21 15:51:54 +01003720 io = io
Eddie Elizondo4590f722020-02-04 02:29:25 -08003721 shutdown_error = "LookupError: unknown encoding: ascii"
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003722
3723 def test_initialization(self):
3724 r = self.BytesIO(b"\xc3\xa9\n\n")
3725 b = self.BufferedReader(r, 1000)
3726 t = self.TextIOWrapper(b)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003727 self.assertRaises(ValueError, t.__init__, b, newline='xyzzy')
3728 self.assertRaises(ValueError, t.read)
3729
Benjamin Peterson10e76b62014-12-21 20:51:50 -06003730 t = self.TextIOWrapper.__new__(self.TextIOWrapper)
3731 self.assertRaises(Exception, repr, t)
3732
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003733 def test_garbage_collection(self):
3734 # C TextIOWrapper objects are collected, and collecting them flushes
3735 # all data to disk.
3736 # The Python version has __del__, so it ends in gc.garbage instead.
Hai Shi883bc632020-07-06 17:12:49 +08003737 with warnings_helper.check_warnings(('', ResourceWarning)):
3738 rawio = io.FileIO(os_helper.TESTFN, "wb")
Antoine Pitrou796564c2013-07-30 19:59:21 +02003739 b = self.BufferedWriter(rawio)
3740 t = self.TextIOWrapper(b, encoding="ascii")
3741 t.write("456def")
3742 t.x = t
3743 wr = weakref.ref(t)
3744 del t
3745 support.gc_collect()
Serhiy Storchaka0dcd80a2015-08-02 15:17:49 +03003746 self.assertIsNone(wr(), wr)
Hai Shi883bc632020-07-06 17:12:49 +08003747 with self.open(os_helper.TESTFN, "rb") as f:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003748 self.assertEqual(f.read(), b"456def")
3749
Charles-François Natali42c28cd2011-10-05 19:53:43 +02003750 def test_rwpair_cleared_before_textio(self):
3751 # Issue 13070: TextIOWrapper's finalization would crash when called
3752 # after the reference to the underlying BufferedRWPair's writer got
3753 # cleared by the GC.
3754 for i in range(1000):
3755 b1 = self.BufferedRWPair(self.MockRawIO(), self.MockRawIO())
3756 t1 = self.TextIOWrapper(b1, encoding="ascii")
3757 b2 = self.BufferedRWPair(self.MockRawIO(), self.MockRawIO())
3758 t2 = self.TextIOWrapper(b2, encoding="ascii")
3759 # circular references
3760 t1.buddy = t2
3761 t2.buddy = t1
3762 support.gc_collect()
3763
Zackery Spytz842acaa2018-12-17 07:52:45 -07003764 def test_del__CHUNK_SIZE_SystemError(self):
3765 t = self.TextIOWrapper(self.BytesIO(), encoding='ascii')
3766 with self.assertRaises(AttributeError):
3767 del t._CHUNK_SIZE
3768
Charles-François Natali42c28cd2011-10-05 19:53:43 +02003769
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003770class PyTextIOWrapperTest(TextIOWrapperTest):
Antoine Pitrou712cb732013-12-21 15:51:54 +01003771 io = pyio
Steve Dower6d5cf8a2016-09-09 09:21:01 -07003772 shutdown_error = "LookupError: unknown encoding: ascii"
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003773
3774
3775class IncrementalNewlineDecoderTest(unittest.TestCase):
3776
3777 def check_newline_decoding_utf8(self, decoder):
Antoine Pitrou180a3362008-12-14 16:36:46 +00003778 # UTF-8 specific tests for a newline decoder
3779 def _check_decode(b, s, **kwargs):
3780 # We exercise getstate() / setstate() as well as decode()
3781 state = decoder.getstate()
Ezio Melottib3aedd42010-11-20 19:04:17 +00003782 self.assertEqual(decoder.decode(b, **kwargs), s)
Antoine Pitrou180a3362008-12-14 16:36:46 +00003783 decoder.setstate(state)
Ezio Melottib3aedd42010-11-20 19:04:17 +00003784 self.assertEqual(decoder.decode(b, **kwargs), s)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00003785
Antoine Pitrou180a3362008-12-14 16:36:46 +00003786 _check_decode(b'\xe8\xa2\x88', "\u8888")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00003787
Antoine Pitrou180a3362008-12-14 16:36:46 +00003788 _check_decode(b'\xe8', "")
3789 _check_decode(b'\xa2', "")
3790 _check_decode(b'\x88', "\u8888")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00003791
Antoine Pitrou180a3362008-12-14 16:36:46 +00003792 _check_decode(b'\xe8', "")
3793 _check_decode(b'\xa2', "")
3794 _check_decode(b'\x88', "\u8888")
3795
3796 _check_decode(b'\xe8', "")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00003797 self.assertRaises(UnicodeDecodeError, decoder.decode, b'', final=True)
3798
Antoine Pitrou180a3362008-12-14 16:36:46 +00003799 decoder.reset()
3800 _check_decode(b'\n', "\n")
3801 _check_decode(b'\r', "")
3802 _check_decode(b'', "\n", final=True)
3803 _check_decode(b'\r', "\n", final=True)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00003804
Antoine Pitrou180a3362008-12-14 16:36:46 +00003805 _check_decode(b'\r', "")
3806 _check_decode(b'a', "\na")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00003807
Antoine Pitrou180a3362008-12-14 16:36:46 +00003808 _check_decode(b'\r\r\n', "\n\n")
3809 _check_decode(b'\r', "")
3810 _check_decode(b'\r', "\n")
3811 _check_decode(b'\na', "\na")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00003812
Antoine Pitrou180a3362008-12-14 16:36:46 +00003813 _check_decode(b'\xe8\xa2\x88\r\n', "\u8888\n")
3814 _check_decode(b'\xe8\xa2\x88', "\u8888")
3815 _check_decode(b'\n', "\n")
3816 _check_decode(b'\xe8\xa2\x88\r', "\u8888")
3817 _check_decode(b'\n', "\n")
Guido van Rossum9b76da62007-04-11 01:09:03 +00003818
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003819 def check_newline_decoding(self, decoder, encoding):
Antoine Pitrou180a3362008-12-14 16:36:46 +00003820 result = []
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003821 if encoding is not None:
3822 encoder = codecs.getincrementalencoder(encoding)()
3823 def _decode_bytewise(s):
3824 # Decode one byte at a time
3825 for b in encoder.encode(s):
3826 result.append(decoder.decode(bytes([b])))
3827 else:
3828 encoder = None
3829 def _decode_bytewise(s):
3830 # Decode one char at a time
3831 for c in s:
3832 result.append(decoder.decode(c))
Ezio Melottib3aedd42010-11-20 19:04:17 +00003833 self.assertEqual(decoder.newlines, None)
Antoine Pitrou180a3362008-12-14 16:36:46 +00003834 _decode_bytewise("abc\n\r")
Ezio Melottib3aedd42010-11-20 19:04:17 +00003835 self.assertEqual(decoder.newlines, '\n')
Antoine Pitrou180a3362008-12-14 16:36:46 +00003836 _decode_bytewise("\nabc")
Ezio Melottib3aedd42010-11-20 19:04:17 +00003837 self.assertEqual(decoder.newlines, ('\n', '\r\n'))
Antoine Pitrou180a3362008-12-14 16:36:46 +00003838 _decode_bytewise("abc\r")
Ezio Melottib3aedd42010-11-20 19:04:17 +00003839 self.assertEqual(decoder.newlines, ('\n', '\r\n'))
Antoine Pitrou180a3362008-12-14 16:36:46 +00003840 _decode_bytewise("abc")
Ezio Melottib3aedd42010-11-20 19:04:17 +00003841 self.assertEqual(decoder.newlines, ('\r', '\n', '\r\n'))
Antoine Pitrou180a3362008-12-14 16:36:46 +00003842 _decode_bytewise("abc\r")
Ezio Melottib3aedd42010-11-20 19:04:17 +00003843 self.assertEqual("".join(result), "abc\n\nabcabc\nabcabc")
Antoine Pitrou180a3362008-12-14 16:36:46 +00003844 decoder.reset()
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003845 input = "abc"
3846 if encoder is not None:
3847 encoder.reset()
3848 input = encoder.encode(input)
Ezio Melottib3aedd42010-11-20 19:04:17 +00003849 self.assertEqual(decoder.decode(input), "abc")
3850 self.assertEqual(decoder.newlines, None)
Antoine Pitrou180a3362008-12-14 16:36:46 +00003851
3852 def test_newline_decoder(self):
3853 encodings = (
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003854 # None meaning the IncrementalNewlineDecoder takes unicode input
3855 # rather than bytes input
3856 None, 'utf-8', 'latin-1',
Antoine Pitrou180a3362008-12-14 16:36:46 +00003857 'utf-16', 'utf-16-le', 'utf-16-be',
3858 'utf-32', 'utf-32-le', 'utf-32-be',
3859 )
3860 for enc in encodings:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003861 decoder = enc and codecs.getincrementaldecoder(enc)()
3862 decoder = self.IncrementalNewlineDecoder(decoder, translate=True)
3863 self.check_newline_decoding(decoder, enc)
Alexandre Vassalotti472f07d2008-01-06 00:34:32 +00003864 decoder = codecs.getincrementaldecoder("utf-8")()
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003865 decoder = self.IncrementalNewlineDecoder(decoder, translate=True)
3866 self.check_newline_decoding_utf8(decoder)
Oren Milman1d1d3e92017-08-20 18:35:36 +03003867 self.assertRaises(TypeError, decoder.setstate, 42)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003868
Antoine Pitrou66913e22009-03-06 23:40:56 +00003869 def test_newline_bytes(self):
3870 # Issue 5433: Excessive optimization in IncrementalNewlineDecoder
3871 def _check(dec):
Ezio Melottib3aedd42010-11-20 19:04:17 +00003872 self.assertEqual(dec.newlines, None)
3873 self.assertEqual(dec.decode("\u0D00"), "\u0D00")
3874 self.assertEqual(dec.newlines, None)
3875 self.assertEqual(dec.decode("\u0A00"), "\u0A00")
3876 self.assertEqual(dec.newlines, None)
Antoine Pitrou66913e22009-03-06 23:40:56 +00003877 dec = self.IncrementalNewlineDecoder(None, translate=False)
3878 _check(dec)
3879 dec = self.IncrementalNewlineDecoder(None, translate=True)
3880 _check(dec)
3881
Xiang Zhangb08746b2018-10-31 19:49:16 +08003882 def test_translate(self):
3883 # issue 35062
3884 for translate in (-2, -1, 1, 2):
3885 decoder = codecs.getincrementaldecoder("utf-8")()
3886 decoder = self.IncrementalNewlineDecoder(decoder, translate)
3887 self.check_newline_decoding_utf8(decoder)
3888 decoder = codecs.getincrementaldecoder("utf-8")()
3889 decoder = self.IncrementalNewlineDecoder(decoder, translate=0)
3890 self.assertEqual(decoder.decode(b"\r\r\n"), "\r\r\n")
3891
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003892class CIncrementalNewlineDecoderTest(IncrementalNewlineDecoderTest):
3893 pass
3894
3895class PyIncrementalNewlineDecoderTest(IncrementalNewlineDecoderTest):
3896 pass
Antoine Pitrou180a3362008-12-14 16:36:46 +00003897
Alexandre Vassalotti472f07d2008-01-06 00:34:32 +00003898
Guido van Rossum01a27522007-03-07 01:00:12 +00003899# XXX Tests for open()
Guido van Rossum68bbcd22007-02-27 17:19:33 +00003900
Guido van Rossum5abbf752007-08-27 17:39:33 +00003901class MiscIOTest(unittest.TestCase):
3902
Barry Warsaw40e82462008-11-20 20:14:50 +00003903 def tearDown(self):
Hai Shi883bc632020-07-06 17:12:49 +08003904 os_helper.unlink(os_helper.TESTFN)
Barry Warsaw40e82462008-11-20 20:14:50 +00003905
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003906 def test___all__(self):
3907 for name in self.io.__all__:
3908 obj = getattr(self.io, name, None)
Serhiy Storchaka0dcd80a2015-08-02 15:17:49 +03003909 self.assertIsNotNone(obj, name)
Steve Dowerb82e17e2019-05-23 08:45:22 -07003910 if name in ("open", "open_code"):
Guido van Rossum5abbf752007-08-27 17:39:33 +00003911 continue
Benjamin Peterson6a52a9c2009-04-29 22:00:44 +00003912 elif "error" in name.lower() or name == "UnsupportedOperation":
Benjamin Petersonbfb95942009-04-02 01:13:40 +00003913 self.assertTrue(issubclass(obj, Exception), name)
3914 elif not name.startswith("SEEK_"):
3915 self.assertTrue(issubclass(obj, self.IOBase))
Benjamin Peterson65676e42008-11-05 21:42:45 +00003916
Barry Warsaw40e82462008-11-20 20:14:50 +00003917 def test_attributes(self):
Hai Shi883bc632020-07-06 17:12:49 +08003918 f = self.open(os_helper.TESTFN, "wb", buffering=0)
Ezio Melottib3aedd42010-11-20 19:04:17 +00003919 self.assertEqual(f.mode, "wb")
Barry Warsaw40e82462008-11-20 20:14:50 +00003920 f.close()
3921
Hai Shi883bc632020-07-06 17:12:49 +08003922 with warnings_helper.check_warnings(('', DeprecationWarning)):
3923 f = self.open(os_helper.TESTFN, "U")
3924 self.assertEqual(f.name, os_helper.TESTFN)
3925 self.assertEqual(f.buffer.name, os_helper.TESTFN)
3926 self.assertEqual(f.buffer.raw.name, os_helper.TESTFN)
Victor Stinner942f7a22020-03-04 18:50:22 +01003927 self.assertEqual(f.mode, "U")
3928 self.assertEqual(f.buffer.mode, "rb")
3929 self.assertEqual(f.buffer.raw.mode, "rb")
3930 f.close()
3931
Hai Shi883bc632020-07-06 17:12:49 +08003932 f = self.open(os_helper.TESTFN, "w+")
Ezio Melottib3aedd42010-11-20 19:04:17 +00003933 self.assertEqual(f.mode, "w+")
3934 self.assertEqual(f.buffer.mode, "rb+") # Does it really matter?
3935 self.assertEqual(f.buffer.raw.mode, "rb+")
Barry Warsaw40e82462008-11-20 20:14:50 +00003936
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003937 g = self.open(f.fileno(), "wb", closefd=False)
Ezio Melottib3aedd42010-11-20 19:04:17 +00003938 self.assertEqual(g.mode, "wb")
3939 self.assertEqual(g.raw.mode, "wb")
3940 self.assertEqual(g.name, f.fileno())
3941 self.assertEqual(g.raw.name, f.fileno())
Barry Warsaw40e82462008-11-20 20:14:50 +00003942 f.close()
3943 g.close()
3944
Benjamin Peterson74fa9f72019-11-12 14:51:34 -08003945 def test_open_pipe_with_append(self):
3946 # bpo-27805: Ignore ESPIPE from lseek() in open().
3947 r, w = os.pipe()
3948 self.addCleanup(os.close, r)
3949 f = self.open(w, 'a')
3950 self.addCleanup(f.close)
3951 # Check that the file is marked non-seekable. On Windows, however, lseek
3952 # somehow succeeds on pipes.
3953 if sys.platform != 'win32':
3954 self.assertFalse(f.seekable())
3955
Antoine Pitrou8043cf82009-01-09 19:54:29 +00003956 def test_io_after_close(self):
3957 for kwargs in [
3958 {"mode": "w"},
3959 {"mode": "wb"},
3960 {"mode": "w", "buffering": 1},
3961 {"mode": "w", "buffering": 2},
3962 {"mode": "wb", "buffering": 0},
3963 {"mode": "r"},
3964 {"mode": "rb"},
3965 {"mode": "r", "buffering": 1},
3966 {"mode": "r", "buffering": 2},
3967 {"mode": "rb", "buffering": 0},
3968 {"mode": "w+"},
3969 {"mode": "w+b"},
3970 {"mode": "w+", "buffering": 1},
3971 {"mode": "w+", "buffering": 2},
3972 {"mode": "w+b", "buffering": 0},
3973 ]:
Hai Shi883bc632020-07-06 17:12:49 +08003974 f = self.open(os_helper.TESTFN, **kwargs)
Antoine Pitrou8043cf82009-01-09 19:54:29 +00003975 f.close()
3976 self.assertRaises(ValueError, f.flush)
3977 self.assertRaises(ValueError, f.fileno)
3978 self.assertRaises(ValueError, f.isatty)
3979 self.assertRaises(ValueError, f.__iter__)
3980 if hasattr(f, "peek"):
3981 self.assertRaises(ValueError, f.peek, 1)
3982 self.assertRaises(ValueError, f.read)
3983 if hasattr(f, "read1"):
3984 self.assertRaises(ValueError, f.read1, 1024)
Martin Panterccb2c0e2016-10-20 23:48:14 +00003985 self.assertRaises(ValueError, f.read1)
Victor Stinnerb79f28c2011-05-25 22:09:03 +02003986 if hasattr(f, "readall"):
3987 self.assertRaises(ValueError, f.readall)
Antoine Pitrou8043cf82009-01-09 19:54:29 +00003988 if hasattr(f, "readinto"):
3989 self.assertRaises(ValueError, f.readinto, bytearray(1024))
Benjamin Petersona96fea02014-06-22 14:17:44 -07003990 if hasattr(f, "readinto1"):
3991 self.assertRaises(ValueError, f.readinto1, bytearray(1024))
Antoine Pitrou8043cf82009-01-09 19:54:29 +00003992 self.assertRaises(ValueError, f.readline)
3993 self.assertRaises(ValueError, f.readlines)
Xiang Zhang026435c2017-04-15 12:47:28 +08003994 self.assertRaises(ValueError, f.readlines, 1)
Antoine Pitrou8043cf82009-01-09 19:54:29 +00003995 self.assertRaises(ValueError, f.seek, 0)
3996 self.assertRaises(ValueError, f.tell)
3997 self.assertRaises(ValueError, f.truncate)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003998 self.assertRaises(ValueError, f.write,
3999 b"" if "b" in kwargs['mode'] else "")
Antoine Pitrou8043cf82009-01-09 19:54:29 +00004000 self.assertRaises(ValueError, f.writelines, [])
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00004001 self.assertRaises(ValueError, next, f)
Antoine Pitrou8043cf82009-01-09 19:54:29 +00004002
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00004003 def test_blockingioerror(self):
4004 # Various BlockingIOError issues
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00004005 class C(str):
4006 pass
4007 c = C("")
4008 b = self.BlockingIOError(1, c)
4009 c.b = b
4010 b.c = c
4011 wr = weakref.ref(c)
4012 del c, b
Benjamin Peterson24fb1d02009-04-24 23:26:21 +00004013 support.gc_collect()
Serhiy Storchaka0dcd80a2015-08-02 15:17:49 +03004014 self.assertIsNone(wr(), wr)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00004015
4016 def test_abcs(self):
4017 # Test the visible base classes are ABCs.
Ezio Melottie9615932010-01-24 19:26:24 +00004018 self.assertIsInstance(self.IOBase, abc.ABCMeta)
4019 self.assertIsInstance(self.RawIOBase, abc.ABCMeta)
4020 self.assertIsInstance(self.BufferedIOBase, abc.ABCMeta)
4021 self.assertIsInstance(self.TextIOBase, abc.ABCMeta)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00004022
4023 def _check_abc_inheritance(self, abcmodule):
Hai Shi883bc632020-07-06 17:12:49 +08004024 with self.open(os_helper.TESTFN, "wb", buffering=0) as f:
Ezio Melottie9615932010-01-24 19:26:24 +00004025 self.assertIsInstance(f, abcmodule.IOBase)
4026 self.assertIsInstance(f, abcmodule.RawIOBase)
4027 self.assertNotIsInstance(f, abcmodule.BufferedIOBase)
4028 self.assertNotIsInstance(f, abcmodule.TextIOBase)
Hai Shi883bc632020-07-06 17:12:49 +08004029 with self.open(os_helper.TESTFN, "wb") as f:
Ezio Melottie9615932010-01-24 19:26:24 +00004030 self.assertIsInstance(f, abcmodule.IOBase)
4031 self.assertNotIsInstance(f, abcmodule.RawIOBase)
4032 self.assertIsInstance(f, abcmodule.BufferedIOBase)
4033 self.assertNotIsInstance(f, abcmodule.TextIOBase)
Hai Shi883bc632020-07-06 17:12:49 +08004034 with self.open(os_helper.TESTFN, "w") as f:
Ezio Melottie9615932010-01-24 19:26:24 +00004035 self.assertIsInstance(f, abcmodule.IOBase)
4036 self.assertNotIsInstance(f, abcmodule.RawIOBase)
4037 self.assertNotIsInstance(f, abcmodule.BufferedIOBase)
4038 self.assertIsInstance(f, abcmodule.TextIOBase)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00004039
4040 def test_abc_inheritance(self):
4041 # Test implementations inherit from their respective ABCs
4042 self._check_abc_inheritance(self)
4043
4044 def test_abc_inheritance_official(self):
4045 # Test implementations inherit from the official ABCs of the
4046 # baseline "io" module.
4047 self._check_abc_inheritance(io)
4048
Antoine Pitroue033e062010-10-29 10:38:18 +00004049 def _check_warn_on_dealloc(self, *args, **kwargs):
4050 f = open(*args, **kwargs)
4051 r = repr(f)
4052 with self.assertWarns(ResourceWarning) as cm:
4053 f = None
4054 support.gc_collect()
4055 self.assertIn(r, str(cm.warning.args[0]))
4056
4057 def test_warn_on_dealloc(self):
Hai Shi883bc632020-07-06 17:12:49 +08004058 self._check_warn_on_dealloc(os_helper.TESTFN, "wb", buffering=0)
4059 self._check_warn_on_dealloc(os_helper.TESTFN, "wb")
4060 self._check_warn_on_dealloc(os_helper.TESTFN, "w")
Antoine Pitroue033e062010-10-29 10:38:18 +00004061
4062 def _check_warn_on_dealloc_fd(self, *args, **kwargs):
4063 fds = []
Benjamin Peterson556c7352010-10-31 01:35:43 +00004064 def cleanup_fds():
Antoine Pitroue033e062010-10-29 10:38:18 +00004065 for fd in fds:
4066 try:
4067 os.close(fd)
Andrew Svetlov3438fa42012-12-17 23:35:18 +02004068 except OSError as e:
Antoine Pitroue033e062010-10-29 10:38:18 +00004069 if e.errno != errno.EBADF:
4070 raise
Benjamin Peterson556c7352010-10-31 01:35:43 +00004071 self.addCleanup(cleanup_fds)
4072 r, w = os.pipe()
4073 fds += r, w
4074 self._check_warn_on_dealloc(r, *args, **kwargs)
4075 # When using closefd=False, there's no warning
4076 r, w = os.pipe()
4077 fds += r, w
Hai Shi883bc632020-07-06 17:12:49 +08004078 with warnings_helper.check_no_resource_warning(self):
Benjamin Peterson556c7352010-10-31 01:35:43 +00004079 open(r, *args, closefd=False, **kwargs)
Antoine Pitroue033e062010-10-29 10:38:18 +00004080
4081 def test_warn_on_dealloc_fd(self):
4082 self._check_warn_on_dealloc_fd("rb", buffering=0)
4083 self._check_warn_on_dealloc_fd("rb")
4084 self._check_warn_on_dealloc_fd("r")
4085
4086
Antoine Pitrou243757e2010-11-05 21:15:39 +00004087 def test_pickling(self):
4088 # Pickling file objects is forbidden
4089 for kwargs in [
4090 {"mode": "w"},
4091 {"mode": "wb"},
4092 {"mode": "wb", "buffering": 0},
4093 {"mode": "r"},
4094 {"mode": "rb"},
4095 {"mode": "rb", "buffering": 0},
4096 {"mode": "w+"},
4097 {"mode": "w+b"},
4098 {"mode": "w+b", "buffering": 0},
4099 ]:
4100 for protocol in range(pickle.HIGHEST_PROTOCOL + 1):
Hai Shi883bc632020-07-06 17:12:49 +08004101 with self.open(os_helper.TESTFN, **kwargs) as f:
Antoine Pitrou243757e2010-11-05 21:15:39 +00004102 self.assertRaises(TypeError, pickle.dumps, f, protocol)
4103
Antoine Pitrou58fcf9f2011-11-21 20:16:44 +01004104 def test_nonblock_pipe_write_bigbuf(self):
4105 self._test_nonblock_pipe_write(16*1024)
4106
Antoine Pitrou58fcf9f2011-11-21 20:16:44 +01004107 def test_nonblock_pipe_write_smallbuf(self):
4108 self._test_nonblock_pipe_write(1024)
4109
Victor Stinner1db9e7b2014-07-29 22:32:47 +02004110 @unittest.skipUnless(hasattr(os, 'set_blocking'),
4111 'os.set_blocking() required for this test')
Antoine Pitrou58fcf9f2011-11-21 20:16:44 +01004112 def _test_nonblock_pipe_write(self, bufsize):
4113 sent = []
4114 received = []
4115 r, w = os.pipe()
Victor Stinner1db9e7b2014-07-29 22:32:47 +02004116 os.set_blocking(r, False)
4117 os.set_blocking(w, False)
Antoine Pitrou58fcf9f2011-11-21 20:16:44 +01004118
4119 # To exercise all code paths in the C implementation we need
4120 # to play with buffer sizes. For instance, if we choose a
4121 # buffer size less than or equal to _PIPE_BUF (4096 on Linux)
4122 # then we will never get a partial write of the buffer.
4123 rf = self.open(r, mode='rb', closefd=True, buffering=bufsize)
4124 wf = self.open(w, mode='wb', closefd=True, buffering=bufsize)
4125
4126 with rf, wf:
4127 for N in 9999, 73, 7574:
4128 try:
4129 i = 0
4130 while True:
4131 msg = bytes([i % 26 + 97]) * N
4132 sent.append(msg)
4133 wf.write(msg)
4134 i += 1
4135
4136 except self.BlockingIOError as e:
4137 self.assertEqual(e.args[0], errno.EAGAIN)
Antoine Pitrou7fe601c2011-11-21 20:22:01 +01004138 self.assertEqual(e.args[2], e.characters_written)
Antoine Pitrou58fcf9f2011-11-21 20:16:44 +01004139 sent[-1] = sent[-1][:e.characters_written]
4140 received.append(rf.read())
4141 msg = b'BLOCKED'
4142 wf.write(msg)
4143 sent.append(msg)
4144
4145 while True:
4146 try:
4147 wf.flush()
4148 break
4149 except self.BlockingIOError as e:
4150 self.assertEqual(e.args[0], errno.EAGAIN)
Antoine Pitrou7fe601c2011-11-21 20:22:01 +01004151 self.assertEqual(e.args[2], e.characters_written)
Antoine Pitrou58fcf9f2011-11-21 20:16:44 +01004152 self.assertEqual(e.characters_written, 0)
4153 received.append(rf.read())
4154
4155 received += iter(rf.read, None)
4156
4157 sent, received = b''.join(sent), b''.join(received)
Serhiy Storchaka0dcd80a2015-08-02 15:17:49 +03004158 self.assertEqual(sent, received)
Antoine Pitrou58fcf9f2011-11-21 20:16:44 +01004159 self.assertTrue(wf.closed)
4160 self.assertTrue(rf.closed)
4161
Charles-François Natalidc3044c2012-01-09 22:40:02 +01004162 def test_create_fail(self):
4163 # 'x' mode fails if file is existing
Hai Shi883bc632020-07-06 17:12:49 +08004164 with self.open(os_helper.TESTFN, 'w'):
Charles-François Natalidc3044c2012-01-09 22:40:02 +01004165 pass
Hai Shi883bc632020-07-06 17:12:49 +08004166 self.assertRaises(FileExistsError, self.open, os_helper.TESTFN, 'x')
Charles-François Natalidc3044c2012-01-09 22:40:02 +01004167
4168 def test_create_writes(self):
4169 # 'x' mode opens for writing
Hai Shi883bc632020-07-06 17:12:49 +08004170 with self.open(os_helper.TESTFN, 'xb') as f:
Charles-François Natalidc3044c2012-01-09 22:40:02 +01004171 f.write(b"spam")
Hai Shi883bc632020-07-06 17:12:49 +08004172 with self.open(os_helper.TESTFN, 'rb') as f:
Charles-François Natalidc3044c2012-01-09 22:40:02 +01004173 self.assertEqual(b"spam", f.read())
4174
Christian Heimes7b648752012-09-10 14:48:43 +02004175 def test_open_allargs(self):
4176 # there used to be a buffer overflow in the parser for rawmode
Hai Shi883bc632020-07-06 17:12:49 +08004177 self.assertRaises(ValueError, self.open, os_helper.TESTFN, 'rwax+')
Christian Heimes7b648752012-09-10 14:48:43 +02004178
Victor Stinner22eb6892019-06-26 00:51:05 +02004179 def test_check_encoding_errors(self):
4180 # bpo-37388: open() and TextIOWrapper must check encoding and errors
4181 # arguments in dev mode
4182 mod = self.io.__name__
4183 filename = __file__
4184 invalid = 'Boom, Shaka Laka, Boom!'
4185 code = textwrap.dedent(f'''
4186 import sys
4187 from {mod} import open, TextIOWrapper
4188
4189 try:
4190 open({filename!r}, encoding={invalid!r})
4191 except LookupError:
4192 pass
4193 else:
4194 sys.exit(21)
4195
4196 try:
4197 open({filename!r}, errors={invalid!r})
4198 except LookupError:
4199 pass
4200 else:
4201 sys.exit(22)
4202
4203 fp = open({filename!r}, "rb")
4204 with fp:
4205 try:
4206 TextIOWrapper(fp, encoding={invalid!r})
4207 except LookupError:
4208 pass
4209 else:
4210 sys.exit(23)
4211
4212 try:
4213 TextIOWrapper(fp, errors={invalid!r})
4214 except LookupError:
4215 pass
4216 else:
4217 sys.exit(24)
4218
4219 sys.exit(10)
4220 ''')
4221 proc = assert_python_failure('-X', 'dev', '-c', code)
4222 self.assertEqual(proc.rc, 10, proc)
4223
Christian Heimes7b648752012-09-10 14:48:43 +02004224
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00004225class CMiscIOTest(MiscIOTest):
4226 io = io
4227
Serhiy Storchaka37a79a12013-05-28 16:24:45 +03004228 def test_readinto_buffer_overflow(self):
4229 # Issue #18025
4230 class BadReader(self.io.BufferedIOBase):
4231 def read(self, n=-1):
4232 return b'x' * 10**6
4233 bufio = BadReader()
4234 b = bytearray(2)
4235 self.assertRaises(ValueError, bufio.readinto, b)
4236
Antoine Pitrou25f85d42015-04-13 19:41:47 +02004237 def check_daemon_threads_shutdown_deadlock(self, stream_name):
4238 # Issue #23309: deadlocks at shutdown should be avoided when a
4239 # daemon thread and the main thread both write to a file.
4240 code = """if 1:
4241 import sys
4242 import time
4243 import threading
Victor Stinner2a1aed02017-04-21 17:59:23 +02004244 from test.support import SuppressCrashReport
Antoine Pitrou25f85d42015-04-13 19:41:47 +02004245
4246 file = sys.{stream_name}
4247
4248 def run():
4249 while True:
4250 file.write('.')
4251 file.flush()
4252
Victor Stinner2a1aed02017-04-21 17:59:23 +02004253 crash = SuppressCrashReport()
4254 crash.__enter__()
4255 # don't call __exit__(): the crash occurs at Python shutdown
4256
Antoine Pitrou25f85d42015-04-13 19:41:47 +02004257 thread = threading.Thread(target=run)
4258 thread.daemon = True
4259 thread.start()
4260
4261 time.sleep(0.5)
4262 file.write('!')
4263 file.flush()
4264 """.format_map(locals())
4265 res, _ = run_python_until_end("-c", code)
4266 err = res.err.decode()
4267 if res.rc != 0:
4268 # Failure: should be a fatal error
Victor Stinner9e5d30c2020-03-07 00:54:20 +01004269 pattern = (r"Fatal Python error: _enter_buffered_busy: "
4270 r"could not acquire lock "
Max Bernsteinccb7ca72019-05-21 10:09:21 -07004271 r"for <(_io\.)?BufferedWriter name='<{stream_name}>'> "
4272 r"at interpreter shutdown, possibly due to "
4273 r"daemon threads".format_map(locals()))
4274 self.assertRegex(err, pattern)
Antoine Pitrou25f85d42015-04-13 19:41:47 +02004275 else:
4276 self.assertFalse(err.strip('.!'))
4277
4278 def test_daemon_threads_shutdown_stdout_deadlock(self):
4279 self.check_daemon_threads_shutdown_deadlock('stdout')
4280
4281 def test_daemon_threads_shutdown_stderr_deadlock(self):
4282 self.check_daemon_threads_shutdown_deadlock('stderr')
4283
4284
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00004285class PyMiscIOTest(MiscIOTest):
4286 io = pyio
Barry Warsaw40e82462008-11-20 20:14:50 +00004287
Antoine Pitroub46b9d52010-08-21 19:09:32 +00004288
4289@unittest.skipIf(os.name == 'nt', 'POSIX signals required for this test.')
4290class SignalsTest(unittest.TestCase):
4291
4292 def setUp(self):
4293 self.oldalrm = signal.signal(signal.SIGALRM, self.alarm_interrupt)
4294
4295 def tearDown(self):
4296 signal.signal(signal.SIGALRM, self.oldalrm)
4297
4298 def alarm_interrupt(self, sig, frame):
4299 1/0
4300
Antoine Pitroub46b9d52010-08-21 19:09:32 +00004301 def check_interrupted_write(self, item, bytes, **fdopen_kwargs):
4302 """Check that a partial write, when it gets interrupted, properly
Antoine Pitrou707ce822011-02-25 21:24:11 +00004303 invokes the signal handler, and bubbles up the exception raised
4304 in the latter."""
Antoine Pitroub46b9d52010-08-21 19:09:32 +00004305 read_results = []
4306 def _read():
4307 s = os.read(r, 1)
4308 read_results.append(s)
Victor Stinner05c9d312018-12-18 23:52:39 +01004309
Antoine Pitroub46b9d52010-08-21 19:09:32 +00004310 t = threading.Thread(target=_read)
4311 t.daemon = True
4312 r, w = os.pipe()
Benjamin Petersond8fc2e12010-10-31 01:19:53 +00004313 fdopen_kwargs["closefd"] = False
Victor Stinner9c182a12015-03-16 18:03:06 +01004314 large_data = item * (support.PIPE_MAX_SIZE // len(item) + 1)
Antoine Pitroub46b9d52010-08-21 19:09:32 +00004315 try:
4316 wio = self.io.open(w, **fdopen_kwargs)
Victor Stinner05c9d312018-12-18 23:52:39 +01004317 if hasattr(signal, 'pthread_sigmask'):
4318 # create the thread with SIGALRM signal blocked
4319 signal.pthread_sigmask(signal.SIG_BLOCK, [signal.SIGALRM])
4320 t.start()
4321 signal.pthread_sigmask(signal.SIG_UNBLOCK, [signal.SIGALRM])
4322 else:
4323 t.start()
4324
Antoine Pitroub46b9d52010-08-21 19:09:32 +00004325 # Fill the pipe enough that the write will be blocking.
4326 # It will be interrupted by the timer armed above. Since the
4327 # other thread has read one byte, the low-level write will
4328 # return with a successful (partial) result rather than an EINTR.
4329 # The buffered IO layer must check for pending signal
4330 # handlers, which in this case will invoke alarm_interrupt().
Victor Stinner775b2dd2013-07-15 19:53:13 +02004331 signal.alarm(1)
4332 try:
Victor Stinner9c182a12015-03-16 18:03:06 +01004333 self.assertRaises(ZeroDivisionError, wio.write, large_data)
Victor Stinner775b2dd2013-07-15 19:53:13 +02004334 finally:
4335 signal.alarm(0)
Serhiy Storchaka263dcd22015-04-01 13:01:14 +03004336 t.join()
Antoine Pitroub46b9d52010-08-21 19:09:32 +00004337 # We got one byte, get another one and check that it isn't a
4338 # repeat of the first one.
4339 read_results.append(os.read(r, 1))
4340 self.assertEqual(read_results, [bytes[0:1], bytes[1:2]])
4341 finally:
4342 os.close(w)
4343 os.close(r)
4344 # This is deliberate. If we didn't close the file descriptor
4345 # before closing wio, wio would try to flush its internal
4346 # buffer, and block again.
4347 try:
4348 wio.close()
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02004349 except OSError as e:
Antoine Pitroub46b9d52010-08-21 19:09:32 +00004350 if e.errno != errno.EBADF:
4351 raise
4352
4353 def test_interrupted_write_unbuffered(self):
4354 self.check_interrupted_write(b"xy", b"xy", mode="wb", buffering=0)
4355
4356 def test_interrupted_write_buffered(self):
4357 self.check_interrupted_write(b"xy", b"xy", mode="wb")
4358
4359 def test_interrupted_write_text(self):
4360 self.check_interrupted_write("xy", b"xy", mode="w", encoding="ascii")
4361
Brett Cannon31f59292011-02-21 19:29:56 +00004362 @support.no_tracing
Antoine Pitrouf3b68b32010-12-03 18:41:39 +00004363 def check_reentrant_write(self, data, **fdopen_kwargs):
4364 def on_alarm(*args):
4365 # Will be called reentrantly from the same thread
4366 wio.write(data)
4367 1/0
4368 signal.signal(signal.SIGALRM, on_alarm)
4369 r, w = os.pipe()
4370 wio = self.io.open(w, **fdopen_kwargs)
4371 try:
Gregory P. Smithb5ba2032012-06-25 01:13:32 -07004372 signal.alarm(1)
Antoine Pitrouf3b68b32010-12-03 18:41:39 +00004373 # Either the reentrant call to wio.write() fails with RuntimeError,
4374 # or the signal handler raises ZeroDivisionError.
4375 with self.assertRaises((ZeroDivisionError, RuntimeError)) as cm:
4376 while 1:
4377 for i in range(100):
4378 wio.write(data)
4379 wio.flush()
4380 # Make sure the buffer doesn't fill up and block further writes
4381 os.read(r, len(data) * 100)
4382 exc = cm.exception
4383 if isinstance(exc, RuntimeError):
4384 self.assertTrue(str(exc).startswith("reentrant call"), str(exc))
4385 finally:
Victor Stinner9abee722017-09-19 09:36:54 -07004386 signal.alarm(0)
Antoine Pitrouf3b68b32010-12-03 18:41:39 +00004387 wio.close()
4388 os.close(r)
4389
4390 def test_reentrant_write_buffered(self):
4391 self.check_reentrant_write(b"xy", mode="wb")
4392
4393 def test_reentrant_write_text(self):
4394 self.check_reentrant_write("xy", mode="w", encoding="ascii")
4395
Antoine Pitrou707ce822011-02-25 21:24:11 +00004396 def check_interrupted_read_retry(self, decode, **fdopen_kwargs):
4397 """Check that a buffered read, when it gets interrupted (either
4398 returning a partial result or EINTR), properly invokes the signal
4399 handler and retries if the latter returned successfully."""
4400 r, w = os.pipe()
4401 fdopen_kwargs["closefd"] = False
4402 def alarm_handler(sig, frame):
4403 os.write(w, b"bar")
4404 signal.signal(signal.SIGALRM, alarm_handler)
4405 try:
4406 rio = self.io.open(r, **fdopen_kwargs)
4407 os.write(w, b"foo")
Gregory P. Smithb5ba2032012-06-25 01:13:32 -07004408 signal.alarm(1)
Antoine Pitrou707ce822011-02-25 21:24:11 +00004409 # Expected behaviour:
4410 # - first raw read() returns partial b"foo"
4411 # - second raw read() returns EINTR
4412 # - third raw read() returns b"bar"
4413 self.assertEqual(decode(rio.read(6)), "foobar")
4414 finally:
Victor Stinner9abee722017-09-19 09:36:54 -07004415 signal.alarm(0)
Antoine Pitrou707ce822011-02-25 21:24:11 +00004416 rio.close()
4417 os.close(w)
4418 os.close(r)
4419
Antoine Pitrou20db5112011-08-19 20:32:34 +02004420 def test_interrupted_read_retry_buffered(self):
Antoine Pitrou707ce822011-02-25 21:24:11 +00004421 self.check_interrupted_read_retry(lambda x: x.decode('latin1'),
4422 mode="rb")
4423
Antoine Pitrou20db5112011-08-19 20:32:34 +02004424 def test_interrupted_read_retry_text(self):
Antoine Pitrou707ce822011-02-25 21:24:11 +00004425 self.check_interrupted_read_retry(lambda x: x,
4426 mode="r")
4427
Antoine Pitrou707ce822011-02-25 21:24:11 +00004428 def check_interrupted_write_retry(self, item, **fdopen_kwargs):
4429 """Check that a buffered write, when it gets interrupted (either
4430 returning a partial result or EINTR), properly invokes the signal
4431 handler and retries if the latter returned successfully."""
Hai Shi883bc632020-07-06 17:12:49 +08004432 select = import_helper.import_module("select")
Victor Stinner6d3c09f2015-03-16 17:57:49 +01004433
Antoine Pitrou707ce822011-02-25 21:24:11 +00004434 # A quantity that exceeds the buffer size of an anonymous pipe's
4435 # write end.
Antoine Pitroue1a16742013-04-24 23:31:38 +02004436 N = support.PIPE_MAX_SIZE
Antoine Pitrou707ce822011-02-25 21:24:11 +00004437 r, w = os.pipe()
4438 fdopen_kwargs["closefd"] = False
Victor Stinner6d3c09f2015-03-16 17:57:49 +01004439
Antoine Pitrou707ce822011-02-25 21:24:11 +00004440 # We need a separate thread to read from the pipe and allow the
4441 # write() to finish. This thread is started after the SIGALRM is
4442 # received (forcing a first EINTR in write()).
4443 read_results = []
4444 write_finished = False
Serhiy Storchaka9db55002015-03-28 20:38:37 +02004445 error = None
Antoine Pitrou707ce822011-02-25 21:24:11 +00004446 def _read():
Serhiy Storchaka9db55002015-03-28 20:38:37 +02004447 try:
4448 while not write_finished:
4449 while r in select.select([r], [], [], 1.0)[0]:
4450 s = os.read(r, 1024)
4451 read_results.append(s)
4452 except BaseException as exc:
4453 nonlocal error
4454 error = exc
Antoine Pitrou707ce822011-02-25 21:24:11 +00004455 t = threading.Thread(target=_read)
4456 t.daemon = True
4457 def alarm1(sig, frame):
4458 signal.signal(signal.SIGALRM, alarm2)
Gregory P. Smithb5ba2032012-06-25 01:13:32 -07004459 signal.alarm(1)
Antoine Pitrou707ce822011-02-25 21:24:11 +00004460 def alarm2(sig, frame):
4461 t.start()
Victor Stinner6d3c09f2015-03-16 17:57:49 +01004462
4463 large_data = item * N
Antoine Pitrou707ce822011-02-25 21:24:11 +00004464 signal.signal(signal.SIGALRM, alarm1)
4465 try:
4466 wio = self.io.open(w, **fdopen_kwargs)
Gregory P. Smithb5ba2032012-06-25 01:13:32 -07004467 signal.alarm(1)
Antoine Pitrou707ce822011-02-25 21:24:11 +00004468 # Expected behaviour:
4469 # - first raw write() is partial (because of the limited pipe buffer
4470 # and the first alarm)
4471 # - second raw write() returns EINTR (because of the second alarm)
4472 # - subsequent write()s are successful (either partial or complete)
Victor Stinner6d3c09f2015-03-16 17:57:49 +01004473 written = wio.write(large_data)
4474 self.assertEqual(N, written)
4475
Antoine Pitrou707ce822011-02-25 21:24:11 +00004476 wio.flush()
4477 write_finished = True
4478 t.join()
Victor Stinner6d3c09f2015-03-16 17:57:49 +01004479
Serhiy Storchaka9db55002015-03-28 20:38:37 +02004480 self.assertIsNone(error)
Antoine Pitrou707ce822011-02-25 21:24:11 +00004481 self.assertEqual(N, sum(len(x) for x in read_results))
4482 finally:
Victor Stinner9abee722017-09-19 09:36:54 -07004483 signal.alarm(0)
Antoine Pitrou707ce822011-02-25 21:24:11 +00004484 write_finished = True
4485 os.close(w)
4486 os.close(r)
4487 # This is deliberate. If we didn't close the file descriptor
4488 # before closing wio, wio would try to flush its internal
4489 # buffer, and could block (in case of failure).
4490 try:
4491 wio.close()
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02004492 except OSError as e:
Antoine Pitrou707ce822011-02-25 21:24:11 +00004493 if e.errno != errno.EBADF:
4494 raise
4495
Antoine Pitrou20db5112011-08-19 20:32:34 +02004496 def test_interrupted_write_retry_buffered(self):
Antoine Pitrou707ce822011-02-25 21:24:11 +00004497 self.check_interrupted_write_retry(b"x", mode="wb")
4498
Antoine Pitrou20db5112011-08-19 20:32:34 +02004499 def test_interrupted_write_retry_text(self):
Antoine Pitrou707ce822011-02-25 21:24:11 +00004500 self.check_interrupted_write_retry("x", mode="w", encoding="latin1")
4501
Antoine Pitrouf3b68b32010-12-03 18:41:39 +00004502
Antoine Pitroub46b9d52010-08-21 19:09:32 +00004503class CSignalsTest(SignalsTest):
4504 io = io
4505
4506class PySignalsTest(SignalsTest):
4507 io = pyio
4508
Antoine Pitrouf3b68b32010-12-03 18:41:39 +00004509 # Handling reentrancy issues would slow down _pyio even more, so the
4510 # tests are disabled.
4511 test_reentrant_write_buffered = None
4512 test_reentrant_write_text = None
4513
Antoine Pitroub46b9d52010-08-21 19:09:32 +00004514
Ezio Melottidaa42c72013-03-23 16:30:16 +02004515def load_tests(*args):
Gregory P. Smith1bef9072015-04-14 13:24:34 -07004516 tests = (CIOTest, PyIOTest, APIMismatchTest,
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00004517 CBufferedReaderTest, PyBufferedReaderTest,
4518 CBufferedWriterTest, PyBufferedWriterTest,
4519 CBufferedRWPairTest, PyBufferedRWPairTest,
4520 CBufferedRandomTest, PyBufferedRandomTest,
4521 StatefulIncrementalDecoderTest,
4522 CIncrementalNewlineDecoderTest, PyIncrementalNewlineDecoderTest,
4523 CTextIOWrapperTest, PyTextIOWrapperTest,
Antoine Pitroub46b9d52010-08-21 19:09:32 +00004524 CMiscIOTest, PyMiscIOTest,
4525 CSignalsTest, PySignalsTest,
4526 )
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00004527
4528 # Put the namespaces of the IO module we are testing and some useful mock
4529 # classes in the __dict__ of each test.
4530 mocks = (MockRawIO, MisbehavedRawIO, MockFileIO, CloseFailureIO,
benfogle9703f092017-11-10 16:03:40 -05004531 MockNonBlockWriterIO, MockUnseekableIO, MockRawIOWithoutRead,
4532 SlowFlushRawIO)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00004533 all_members = io.__all__ + ["IncrementalNewlineDecoder"]
4534 c_io_ns = {name : getattr(io, name) for name in all_members}
4535 py_io_ns = {name : getattr(pyio, name) for name in all_members}
4536 globs = globals()
4537 c_io_ns.update((x.__name__, globs["C" + x.__name__]) for x in mocks)
4538 py_io_ns.update((x.__name__, globs["Py" + x.__name__]) for x in mocks)
4539 # Avoid turning open into a bound method.
4540 py_io_ns["open"] = pyio.OpenWrapper
4541 for test in tests:
4542 if test.__name__.startswith("C"):
4543 for name, obj in c_io_ns.items():
4544 setattr(test, name, obj)
4545 elif test.__name__.startswith("Py"):
4546 for name, obj in py_io_ns.items():
4547 setattr(test, name, obj)
4548
Ezio Melottidaa42c72013-03-23 16:30:16 +02004549 suite = unittest.TestSuite([unittest.makeSuite(test) for test in tests])
4550 return suite
Guido van Rossum28524c72007-02-27 05:47:44 +00004551
4552if __name__ == "__main__":
Ezio Melottidaa42c72013-03-23 16:30:16 +02004553 unittest.main()