blob: 7b8511b66bf10f299bb9463710d9bdce946d40cc [file] [log] [blame]
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001"""Unit tests for the io module."""
2
3# Tests of io are scattered over the test suite:
4# * test_bufio - tests file buffering
5# * test_memoryio - tests BytesIO and StringIO
6# * test_fileio - tests FileIO
7# * test_file - tests the file interface
8# * test_io - tests everything else in the io module
9# * test_univnewlines - tests universal newline support
10# * test_largefile - tests operations on a file greater than 2**32 bytes
11# (only enabled with -ulargefile)
12
13################################################################################
14# ATTENTION TEST WRITERS!!!
15################################################################################
16# When writing tests for io, it's important to test both the C and Python
17# implementations. This is usually done by writing a base test that refers to
Serhiy Storchakad65c9492015-11-02 14:10:23 +020018# the type it is testing as an attribute. Then it provides custom subclasses to
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000019# test both implementations. This file has lots of examples.
20################################################################################
Guido van Rossum68bbcd22007-02-27 17:19:33 +000021
Victor Stinnerf86a5e82012-06-05 13:43:22 +020022import abc
23import array
24import errno
25import locale
Guido van Rossum8358db22007-08-18 21:39:55 +000026import os
Victor Stinnerf86a5e82012-06-05 13:43:22 +020027import pickle
28import random
29import signal
Guido van Rossum34d69e52007-04-10 20:08:41 +000030import sys
Gregory P. Smithe5796c42018-12-30 20:17:57 -080031import sysconfig
Victor Stinner22eb6892019-06-26 00:51:05 +020032import textwrap
Antoine Pitroua6a4dc82017-09-07 18:56:24 +020033import threading
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +000034import time
Guido van Rossum28524c72007-02-27 05:47:44 +000035import unittest
Antoine Pitroue033e062010-10-29 10:38:18 +000036import warnings
Victor Stinnerf86a5e82012-06-05 13:43:22 +020037import weakref
Antoine Pitrou131a4892012-10-16 22:57:11 +020038from collections import deque, UserList
Victor Stinnerf86a5e82012-06-05 13:43:22 +020039from itertools import cycle, count
Benjamin Petersonee8712c2008-05-20 21:35:26 +000040from test import support
Victor Stinner22eb6892019-06-26 00:51:05 +020041from test.support.script_helper import (
42 assert_python_ok, assert_python_failure, run_python_until_end)
Hai Shie80697d2020-05-28 06:10:27 +080043from test.support import threading_helper
Serhiy Storchakab21d1552018-03-02 11:53:51 +020044from test.support import FakePath
Guido van Rossum76c5d4d2007-04-06 19:10:29 +000045
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +000046import codecs
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000047import io # C implementation of io
48import _pyio as pyio # Python implementation of io
Guido van Rossuma9e20242007-03-08 00:43:48 +000049
Martin Panter6bb91f32016-05-28 00:41:57 +000050try:
51 import ctypes
52except ImportError:
53 def byteslike(*pos, **kw):
54 return array.array("b", bytes(*pos, **kw))
55else:
56 def byteslike(*pos, **kw):
57 """Create a bytes-like object having no string or sequence methods"""
58 data = bytes(*pos, **kw)
59 obj = EmptyStruct()
60 ctypes.resize(obj, len(data))
61 memoryview(obj).cast("B")[:] = data
62 return obj
63 class EmptyStruct(ctypes.Structure):
64 pass
65
Gregory P. Smithe5796c42018-12-30 20:17:57 -080066_cflags = sysconfig.get_config_var('CFLAGS') or ''
67_config_args = sysconfig.get_config_var('CONFIG_ARGS') or ''
68MEMORY_SANITIZER = (
69 '-fsanitize=memory' in _cflags or
70 '--with-memory-sanitizer' in _config_args
71)
72
Victor Stinnerbc2aa812019-05-23 03:45:09 +020073# Does io.IOBase finalizer log the exception if the close() method fails?
74# The exception is ignored silently by default in release build.
75IOBASE_EMITS_UNRAISABLE = (hasattr(sys, "gettotalrefcount") or sys.flags.dev_mode)
Victor Stinner44235042019-04-12 17:06:47 +020076
77
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000078def _default_chunk_size():
79 """Get the default TextIOWrapper chunk size"""
Marc-André Lemburg8f36af72011-02-25 15:42:01 +000080 with open(__file__, "r", encoding="latin-1") as f:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000081 return f._CHUNK_SIZE
82
83
Antoine Pitrou328ec742010-09-14 18:37:24 +000084class MockRawIOWithoutRead:
85 """A RawIO implementation without read(), so as to exercise the default
86 RawIO.read() which calls readinto()."""
Guido van Rossuma9e20242007-03-08 00:43:48 +000087
Guido van Rossum76c5d4d2007-04-06 19:10:29 +000088 def __init__(self, read_stack=()):
89 self._read_stack = list(read_stack)
90 self._write_stack = []
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000091 self._reads = 0
Antoine Pitrou32cfede2010-08-11 13:31:33 +000092 self._extraneous_reads = 0
Guido van Rossum68bbcd22007-02-27 17:19:33 +000093
Guido van Rossum01a27522007-03-07 01:00:12 +000094 def write(self, b):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000095 self._write_stack.append(bytes(b))
Guido van Rossum01a27522007-03-07 01:00:12 +000096 return len(b)
97
98 def writable(self):
99 return True
100
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000101 def fileno(self):
102 return 42
103
104 def readable(self):
105 return True
106
Guido van Rossum01a27522007-03-07 01:00:12 +0000107 def seekable(self):
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000108 return True
109
Guido van Rossum01a27522007-03-07 01:00:12 +0000110 def seek(self, pos, whence):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000111 return 0 # wrong but we gotta return something
Guido van Rossum01a27522007-03-07 01:00:12 +0000112
113 def tell(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000114 return 0 # same comment as above
115
116 def readinto(self, buf):
117 self._reads += 1
118 max_len = len(buf)
119 try:
120 data = self._read_stack[0]
121 except IndexError:
Antoine Pitrou32cfede2010-08-11 13:31:33 +0000122 self._extraneous_reads += 1
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000123 return 0
124 if data is None:
125 del self._read_stack[0]
126 return None
127 n = len(data)
128 if len(data) <= max_len:
129 del self._read_stack[0]
130 buf[:n] = data
131 return n
132 else:
133 buf[:] = data[:max_len]
134 self._read_stack[0] = data[max_len:]
135 return max_len
136
137 def truncate(self, pos=None):
138 return pos
139
Antoine Pitrou328ec742010-09-14 18:37:24 +0000140class CMockRawIOWithoutRead(MockRawIOWithoutRead, io.RawIOBase):
141 pass
142
143class PyMockRawIOWithoutRead(MockRawIOWithoutRead, pyio.RawIOBase):
144 pass
145
146
147class MockRawIO(MockRawIOWithoutRead):
148
149 def read(self, n=None):
150 self._reads += 1
151 try:
152 return self._read_stack.pop(0)
153 except:
154 self._extraneous_reads += 1
155 return b""
156
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000157class CMockRawIO(MockRawIO, io.RawIOBase):
158 pass
159
160class PyMockRawIO(MockRawIO, pyio.RawIOBase):
161 pass
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000162
Guido van Rossuma9e20242007-03-08 00:43:48 +0000163
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000164class MisbehavedRawIO(MockRawIO):
165 def write(self, b):
166 return super().write(b) * 2
167
168 def read(self, n=None):
169 return super().read(n) * 2
170
171 def seek(self, pos, whence):
172 return -123
173
174 def tell(self):
175 return -456
176
177 def readinto(self, buf):
178 super().readinto(buf)
179 return len(buf) * 5
180
181class CMisbehavedRawIO(MisbehavedRawIO, io.RawIOBase):
182 pass
183
184class PyMisbehavedRawIO(MisbehavedRawIO, pyio.RawIOBase):
185 pass
186
187
benfogle9703f092017-11-10 16:03:40 -0500188class SlowFlushRawIO(MockRawIO):
189 def __init__(self):
190 super().__init__()
191 self.in_flush = threading.Event()
192
193 def flush(self):
194 self.in_flush.set()
195 time.sleep(0.25)
196
197class CSlowFlushRawIO(SlowFlushRawIO, io.RawIOBase):
198 pass
199
200class PySlowFlushRawIO(SlowFlushRawIO, pyio.RawIOBase):
201 pass
202
203
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000204class CloseFailureIO(MockRawIO):
205 closed = 0
206
207 def close(self):
208 if not self.closed:
209 self.closed = 1
Andrew Svetlovf7a17b42012-12-25 16:47:37 +0200210 raise OSError
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000211
212class CCloseFailureIO(CloseFailureIO, io.RawIOBase):
213 pass
214
215class PyCloseFailureIO(CloseFailureIO, pyio.RawIOBase):
216 pass
217
218
219class MockFileIO:
Guido van Rossum78892e42007-04-06 17:31:18 +0000220
221 def __init__(self, data):
222 self.read_history = []
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000223 super().__init__(data)
Guido van Rossum78892e42007-04-06 17:31:18 +0000224
225 def read(self, n=None):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000226 res = super().read(n)
Guido van Rossum78892e42007-04-06 17:31:18 +0000227 self.read_history.append(None if res is None else len(res))
228 return res
229
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000230 def readinto(self, b):
231 res = super().readinto(b)
232 self.read_history.append(res)
233 return res
Guido van Rossum78892e42007-04-06 17:31:18 +0000234
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000235class CMockFileIO(MockFileIO, io.BytesIO):
236 pass
Guido van Rossuma9e20242007-03-08 00:43:48 +0000237
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000238class PyMockFileIO(MockFileIO, pyio.BytesIO):
239 pass
240
241
Antoine Pitrou0d739d72010-09-05 23:01:12 +0000242class MockUnseekableIO:
243 def seekable(self):
244 return False
245
246 def seek(self, *args):
247 raise self.UnsupportedOperation("not seekable")
248
249 def tell(self, *args):
250 raise self.UnsupportedOperation("not seekable")
251
Martin Panter754aab22016-03-31 07:21:56 +0000252 def truncate(self, *args):
253 raise self.UnsupportedOperation("not seekable")
254
Antoine Pitrou0d739d72010-09-05 23:01:12 +0000255class CMockUnseekableIO(MockUnseekableIO, io.BytesIO):
256 UnsupportedOperation = io.UnsupportedOperation
257
258class PyMockUnseekableIO(MockUnseekableIO, pyio.BytesIO):
259 UnsupportedOperation = pyio.UnsupportedOperation
260
261
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000262class MockNonBlockWriterIO:
263
264 def __init__(self):
Guido van Rossum01a27522007-03-07 01:00:12 +0000265 self._write_stack = []
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000266 self._blocker_char = None
Guido van Rossuma9e20242007-03-08 00:43:48 +0000267
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000268 def pop_written(self):
269 s = b"".join(self._write_stack)
270 self._write_stack[:] = []
271 return s
272
273 def block_on(self, char):
274 """Block when a given char is encountered."""
275 self._blocker_char = char
276
277 def readable(self):
278 return True
279
280 def seekable(self):
281 return True
Guido van Rossuma9e20242007-03-08 00:43:48 +0000282
Victor Stinnerb589cef2019-06-11 03:10:59 +0200283 def seek(self, pos, whence=0):
284 # naive implementation, enough for tests
285 return 0
286
Guido van Rossum01a27522007-03-07 01:00:12 +0000287 def writable(self):
288 return True
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000289
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000290 def write(self, b):
291 b = bytes(b)
292 n = -1
293 if self._blocker_char:
294 try:
295 n = b.index(self._blocker_char)
296 except ValueError:
297 pass
298 else:
Antoine Pitrou58fcf9f2011-11-21 20:16:44 +0100299 if n > 0:
300 # write data up to the first blocker
301 self._write_stack.append(b[:n])
302 return n
303 else:
304 # cancel blocker and indicate would block
305 self._blocker_char = None
306 return None
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000307 self._write_stack.append(b)
308 return len(b)
309
310class CMockNonBlockWriterIO(MockNonBlockWriterIO, io.RawIOBase):
311 BlockingIOError = io.BlockingIOError
312
313class PyMockNonBlockWriterIO(MockNonBlockWriterIO, pyio.RawIOBase):
314 BlockingIOError = pyio.BlockingIOError
315
Guido van Rossuma9e20242007-03-08 00:43:48 +0000316
Guido van Rossum28524c72007-02-27 05:47:44 +0000317class IOTest(unittest.TestCase):
318
Neal Norwitze7789b12008-03-24 06:18:09 +0000319 def setUp(self):
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000320 support.unlink(support.TESTFN)
Neal Norwitze7789b12008-03-24 06:18:09 +0000321
Guido van Rossum4d0f5a42007-03-07 22:59:39 +0000322 def tearDown(self):
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000323 support.unlink(support.TESTFN)
Guido van Rossum4d0f5a42007-03-07 22:59:39 +0000324
Guido van Rossum28524c72007-02-27 05:47:44 +0000325 def write_ops(self, f):
Guido van Rossum87429772007-04-10 21:06:59 +0000326 self.assertEqual(f.write(b"blah."), 5)
Antoine Pitrou905a2ff2010-01-31 22:47:27 +0000327 f.truncate(0)
328 self.assertEqual(f.tell(), 5)
329 f.seek(0)
330
331 self.assertEqual(f.write(b"blah."), 5)
Guido van Rossum87429772007-04-10 21:06:59 +0000332 self.assertEqual(f.seek(0), 0)
333 self.assertEqual(f.write(b"Hello."), 6)
Guido van Rossum28524c72007-02-27 05:47:44 +0000334 self.assertEqual(f.tell(), 6)
Guido van Rossum87429772007-04-10 21:06:59 +0000335 self.assertEqual(f.seek(-1, 1), 5)
Guido van Rossum28524c72007-02-27 05:47:44 +0000336 self.assertEqual(f.tell(), 5)
Martin Panter6bb91f32016-05-28 00:41:57 +0000337 buffer = bytearray(b" world\n\n\n")
338 self.assertEqual(f.write(buffer), 9)
339 buffer[:] = b"*" * 9 # Overwrite our copy of the data
Guido van Rossum87429772007-04-10 21:06:59 +0000340 self.assertEqual(f.seek(0), 0)
Guido van Rossum2b08b382007-05-08 20:18:39 +0000341 self.assertEqual(f.write(b"h"), 1)
Guido van Rossum87429772007-04-10 21:06:59 +0000342 self.assertEqual(f.seek(-1, 2), 13)
343 self.assertEqual(f.tell(), 13)
Antoine Pitrou905a2ff2010-01-31 22:47:27 +0000344
Guido van Rossum87429772007-04-10 21:06:59 +0000345 self.assertEqual(f.truncate(12), 12)
Antoine Pitrou905a2ff2010-01-31 22:47:27 +0000346 self.assertEqual(f.tell(), 13)
Christian Heimes8e42a0a2007-11-08 18:04:45 +0000347 self.assertRaises(TypeError, f.seek, 0.0)
Guido van Rossum28524c72007-02-27 05:47:44 +0000348
Guido van Rossum9b76da62007-04-11 01:09:03 +0000349 def read_ops(self, f, buffered=False):
350 data = f.read(5)
351 self.assertEqual(data, b"hello")
Martin Panter6bb91f32016-05-28 00:41:57 +0000352 data = byteslike(data)
Guido van Rossum9b76da62007-04-11 01:09:03 +0000353 self.assertEqual(f.readinto(data), 5)
Martin Panter6bb91f32016-05-28 00:41:57 +0000354 self.assertEqual(bytes(data), b" worl")
355 data = bytearray(5)
Guido van Rossum9b76da62007-04-11 01:09:03 +0000356 self.assertEqual(f.readinto(data), 2)
357 self.assertEqual(len(data), 5)
358 self.assertEqual(data[:2], b"d\n")
359 self.assertEqual(f.seek(0), 0)
360 self.assertEqual(f.read(20), b"hello world\n")
361 self.assertEqual(f.read(1), b"")
Martin Panter6bb91f32016-05-28 00:41:57 +0000362 self.assertEqual(f.readinto(byteslike(b"x")), 0)
Guido van Rossum9b76da62007-04-11 01:09:03 +0000363 self.assertEqual(f.seek(-6, 2), 6)
364 self.assertEqual(f.read(5), b"world")
365 self.assertEqual(f.read(0), b"")
Martin Panter6bb91f32016-05-28 00:41:57 +0000366 self.assertEqual(f.readinto(byteslike()), 0)
Guido van Rossum9b76da62007-04-11 01:09:03 +0000367 self.assertEqual(f.seek(-6, 1), 5)
368 self.assertEqual(f.read(5), b" worl")
369 self.assertEqual(f.tell(), 10)
Christian Heimes8e42a0a2007-11-08 18:04:45 +0000370 self.assertRaises(TypeError, f.seek, 0.0)
Guido van Rossum9b76da62007-04-11 01:09:03 +0000371 if buffered:
372 f.seek(0)
373 self.assertEqual(f.read(), b"hello world\n")
374 f.seek(6)
375 self.assertEqual(f.read(), b"world\n")
376 self.assertEqual(f.read(), b"")
Martin Panter6bb91f32016-05-28 00:41:57 +0000377 f.seek(0)
378 data = byteslike(5)
379 self.assertEqual(f.readinto1(data), 5)
380 self.assertEqual(bytes(data), b"hello")
Guido van Rossum9b76da62007-04-11 01:09:03 +0000381
Guido van Rossum34d69e52007-04-10 20:08:41 +0000382 LARGE = 2**31
383
Guido van Rossum53807da2007-04-10 19:01:47 +0000384 def large_file_ops(self, f):
385 assert f.readable()
386 assert f.writable()
Xavier de Gaye877f0362016-11-17 09:20:28 +0100387 try:
388 self.assertEqual(f.seek(self.LARGE), self.LARGE)
389 except (OverflowError, ValueError):
390 self.skipTest("no largefile support")
Guido van Rossum34d69e52007-04-10 20:08:41 +0000391 self.assertEqual(f.tell(), self.LARGE)
Guido van Rossum53807da2007-04-10 19:01:47 +0000392 self.assertEqual(f.write(b"xxx"), 3)
Guido van Rossum34d69e52007-04-10 20:08:41 +0000393 self.assertEqual(f.tell(), self.LARGE + 3)
394 self.assertEqual(f.seek(-1, 1), self.LARGE + 2)
Guido van Rossum87429772007-04-10 21:06:59 +0000395 self.assertEqual(f.truncate(), self.LARGE + 2)
Guido van Rossum34d69e52007-04-10 20:08:41 +0000396 self.assertEqual(f.tell(), self.LARGE + 2)
397 self.assertEqual(f.seek(0, 2), self.LARGE + 2)
Guido van Rossum87429772007-04-10 21:06:59 +0000398 self.assertEqual(f.truncate(self.LARGE + 1), self.LARGE + 1)
Antoine Pitrou905a2ff2010-01-31 22:47:27 +0000399 self.assertEqual(f.tell(), self.LARGE + 2)
Guido van Rossum34d69e52007-04-10 20:08:41 +0000400 self.assertEqual(f.seek(0, 2), self.LARGE + 1)
401 self.assertEqual(f.seek(-1, 2), self.LARGE)
Guido van Rossum53807da2007-04-10 19:01:47 +0000402 self.assertEqual(f.read(2), b"x")
403
Benjamin Peterson81971ea2009-05-14 22:01:31 +0000404 def test_invalid_operations(self):
405 # Try writing on a file opened in read mode and vice-versa.
Antoine Pitrou0d739d72010-09-05 23:01:12 +0000406 exc = self.UnsupportedOperation
Benjamin Peterson81971ea2009-05-14 22:01:31 +0000407 for mode in ("w", "wb"):
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000408 with self.open(support.TESTFN, mode) as fp:
Antoine Pitrou0d739d72010-09-05 23:01:12 +0000409 self.assertRaises(exc, fp.read)
410 self.assertRaises(exc, fp.readline)
411 with self.open(support.TESTFN, "wb", buffering=0) as fp:
412 self.assertRaises(exc, fp.read)
413 self.assertRaises(exc, fp.readline)
414 with self.open(support.TESTFN, "rb", buffering=0) as fp:
415 self.assertRaises(exc, fp.write, b"blah")
416 self.assertRaises(exc, fp.writelines, [b"blah\n"])
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000417 with self.open(support.TESTFN, "rb") as fp:
Antoine Pitrou0d739d72010-09-05 23:01:12 +0000418 self.assertRaises(exc, fp.write, b"blah")
419 self.assertRaises(exc, fp.writelines, [b"blah\n"])
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000420 with self.open(support.TESTFN, "r") as fp:
Antoine Pitrou0d739d72010-09-05 23:01:12 +0000421 self.assertRaises(exc, fp.write, "blah")
422 self.assertRaises(exc, fp.writelines, ["blah\n"])
423 # Non-zero seeking from current or end pos
424 self.assertRaises(exc, fp.seek, 1, self.SEEK_CUR)
425 self.assertRaises(exc, fp.seek, -1, self.SEEK_END)
Benjamin Peterson81971ea2009-05-14 22:01:31 +0000426
Martin Panter754aab22016-03-31 07:21:56 +0000427 def test_optional_abilities(self):
428 # Test for OSError when optional APIs are not supported
429 # The purpose of this test is to try fileno(), reading, writing and
430 # seeking operations with various objects that indicate they do not
431 # support these operations.
432
433 def pipe_reader():
434 [r, w] = os.pipe()
435 os.close(w) # So that read() is harmless
436 return self.FileIO(r, "r")
437
438 def pipe_writer():
439 [r, w] = os.pipe()
440 self.addCleanup(os.close, r)
441 # Guarantee that we can write into the pipe without blocking
442 thread = threading.Thread(target=os.read, args=(r, 100))
443 thread.start()
444 self.addCleanup(thread.join)
445 return self.FileIO(w, "w")
446
447 def buffered_reader():
448 return self.BufferedReader(self.MockUnseekableIO())
449
450 def buffered_writer():
451 return self.BufferedWriter(self.MockUnseekableIO())
452
453 def buffered_random():
454 return self.BufferedRandom(self.BytesIO())
455
456 def buffered_rw_pair():
457 return self.BufferedRWPair(self.MockUnseekableIO(),
458 self.MockUnseekableIO())
459
460 def text_reader():
461 class UnseekableReader(self.MockUnseekableIO):
462 writable = self.BufferedIOBase.writable
463 write = self.BufferedIOBase.write
464 return self.TextIOWrapper(UnseekableReader(), "ascii")
465
466 def text_writer():
467 class UnseekableWriter(self.MockUnseekableIO):
468 readable = self.BufferedIOBase.readable
469 read = self.BufferedIOBase.read
470 return self.TextIOWrapper(UnseekableWriter(), "ascii")
471
472 tests = (
473 (pipe_reader, "fr"), (pipe_writer, "fw"),
474 (buffered_reader, "r"), (buffered_writer, "w"),
475 (buffered_random, "rws"), (buffered_rw_pair, "rw"),
476 (text_reader, "r"), (text_writer, "w"),
477 (self.BytesIO, "rws"), (self.StringIO, "rws"),
478 )
479 for [test, abilities] in tests:
Martin Panter754aab22016-03-31 07:21:56 +0000480 with self.subTest(test), test() as obj:
481 readable = "r" in abilities
482 self.assertEqual(obj.readable(), readable)
483 writable = "w" in abilities
484 self.assertEqual(obj.writable(), writable)
Martin Panter754aab22016-03-31 07:21:56 +0000485
486 if isinstance(obj, self.TextIOBase):
487 data = "3"
488 elif isinstance(obj, (self.BufferedIOBase, self.RawIOBase)):
489 data = b"3"
490 else:
491 self.fail("Unknown base class")
492
493 if "f" in abilities:
494 obj.fileno()
495 else:
496 self.assertRaises(OSError, obj.fileno)
497
498 if readable:
499 obj.read(1)
500 obj.read()
501 else:
502 self.assertRaises(OSError, obj.read, 1)
503 self.assertRaises(OSError, obj.read)
504
505 if writable:
506 obj.write(data)
507 else:
508 self.assertRaises(OSError, obj.write, data)
509
Martin Panter3ee147f2016-03-31 21:05:31 +0000510 if sys.platform.startswith("win") and test in (
Martin Panter0950e6a2016-03-31 10:31:30 +0000511 pipe_reader, pipe_writer):
512 # Pipes seem to appear as seekable on Windows
513 continue
514 seekable = "s" in abilities
515 self.assertEqual(obj.seekable(), seekable)
516
Martin Panter754aab22016-03-31 07:21:56 +0000517 if seekable:
518 obj.tell()
519 obj.seek(0)
520 else:
521 self.assertRaises(OSError, obj.tell)
522 self.assertRaises(OSError, obj.seek, 0)
523
524 if writable and seekable:
525 obj.truncate()
526 obj.truncate(0)
527 else:
528 self.assertRaises(OSError, obj.truncate)
529 self.assertRaises(OSError, obj.truncate, 0)
530
Antoine Pitrou13348842012-01-29 18:36:34 +0100531 def test_open_handles_NUL_chars(self):
532 fn_with_NUL = 'foo\0bar'
Serhiy Storchakad8a14472014-09-06 20:07:17 +0300533 self.assertRaises(ValueError, self.open, fn_with_NUL, 'w')
Victor Stinner47b45572016-03-25 09:07:07 +0100534
535 bytes_fn = bytes(fn_with_NUL, 'ascii')
536 with warnings.catch_warnings():
537 warnings.simplefilter("ignore", DeprecationWarning)
538 self.assertRaises(ValueError, self.open, bytes_fn, 'w')
Antoine Pitrou13348842012-01-29 18:36:34 +0100539
Guido van Rossum28524c72007-02-27 05:47:44 +0000540 def test_raw_file_io(self):
Benjamin Peterson45cec322009-04-24 23:14:50 +0000541 with self.open(support.TESTFN, "wb", buffering=0) as f:
542 self.assertEqual(f.readable(), False)
543 self.assertEqual(f.writable(), True)
544 self.assertEqual(f.seekable(), True)
545 self.write_ops(f)
546 with self.open(support.TESTFN, "rb", buffering=0) as f:
547 self.assertEqual(f.readable(), True)
548 self.assertEqual(f.writable(), False)
549 self.assertEqual(f.seekable(), True)
550 self.read_ops(f)
Guido van Rossum28524c72007-02-27 05:47:44 +0000551
Guido van Rossum87429772007-04-10 21:06:59 +0000552 def test_buffered_file_io(self):
Benjamin Peterson45cec322009-04-24 23:14:50 +0000553 with self.open(support.TESTFN, "wb") as f:
554 self.assertEqual(f.readable(), False)
555 self.assertEqual(f.writable(), True)
556 self.assertEqual(f.seekable(), True)
557 self.write_ops(f)
558 with self.open(support.TESTFN, "rb") as f:
559 self.assertEqual(f.readable(), True)
560 self.assertEqual(f.writable(), False)
561 self.assertEqual(f.seekable(), True)
562 self.read_ops(f, True)
Guido van Rossum87429772007-04-10 21:06:59 +0000563
Guido van Rossum48fc58a2007-06-07 23:45:37 +0000564 def test_readline(self):
Benjamin Peterson45cec322009-04-24 23:14:50 +0000565 with self.open(support.TESTFN, "wb") as f:
566 f.write(b"abc\ndef\nxyzzy\nfoo\x00bar\nanother line")
567 with self.open(support.TESTFN, "rb") as f:
568 self.assertEqual(f.readline(), b"abc\n")
569 self.assertEqual(f.readline(10), b"def\n")
570 self.assertEqual(f.readline(2), b"xy")
571 self.assertEqual(f.readline(4), b"zzy\n")
572 self.assertEqual(f.readline(), b"foo\x00bar\n")
Benjamin Petersonbf5ff762009-12-13 19:25:34 +0000573 self.assertEqual(f.readline(None), b"another line")
Benjamin Peterson45cec322009-04-24 23:14:50 +0000574 self.assertRaises(TypeError, f.readline, 5.3)
575 with self.open(support.TESTFN, "r") as f:
576 self.assertRaises(TypeError, f.readline, 5.3)
Guido van Rossum48fc58a2007-06-07 23:45:37 +0000577
Serhiy Storchakabf623ae2017-04-19 20:03:52 +0300578 def test_readline_nonsizeable(self):
579 # Issue #30061
580 # Crash when readline() returns an object without __len__
581 class R(self.IOBase):
582 def readline(self):
583 return None
584 self.assertRaises((TypeError, StopIteration), next, R())
585
586 def test_next_nonsizeable(self):
587 # Issue #30061
588 # Crash when __next__() returns an object without __len__
589 class R(self.IOBase):
590 def __next__(self):
591 return None
592 self.assertRaises(TypeError, R().readlines, 1)
593
Guido van Rossum28524c72007-02-27 05:47:44 +0000594 def test_raw_bytes_io(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000595 f = self.BytesIO()
Guido van Rossum28524c72007-02-27 05:47:44 +0000596 self.write_ops(f)
597 data = f.getvalue()
598 self.assertEqual(data, b"hello world\n")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000599 f = self.BytesIO(data)
Guido van Rossum9b76da62007-04-11 01:09:03 +0000600 self.read_ops(f, True)
Guido van Rossum28524c72007-02-27 05:47:44 +0000601
Guido van Rossum53807da2007-04-10 19:01:47 +0000602 def test_large_file_ops(self):
Mike53f7a7c2017-12-14 14:04:53 +0300603 # On Windows and Mac OSX this test consumes large resources; It takes
Victor Stinner8c663fd2017-11-08 14:44:44 -0800604 # a long time to build the >2 GiB file and takes >2 GiB of disk space
Guido van Rossum34d69e52007-04-10 20:08:41 +0000605 # therefore the resource must be enabled to run this test.
Victor Stinner937ee9e2018-06-26 02:11:06 +0200606 if sys.platform[:3] == 'win' or sys.platform == 'darwin':
Zachary Ware9fe6d862013-12-08 00:20:35 -0600607 support.requires(
608 'largefile',
609 'test requires %s bytes and a long time to run' % self.LARGE)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000610 with self.open(support.TESTFN, "w+b", 0) as f:
611 self.large_file_ops(f)
612 with self.open(support.TESTFN, "w+b") as f:
613 self.large_file_ops(f)
Guido van Rossum87429772007-04-10 21:06:59 +0000614
615 def test_with_open(self):
Alexey Izbysheva2670562018-10-20 03:22:31 +0300616 for bufsize in (0, 100):
Guido van Rossum87429772007-04-10 21:06:59 +0000617 f = None
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000618 with self.open(support.TESTFN, "wb", bufsize) as f:
Guido van Rossum1f2ca562007-08-27 20:44:15 +0000619 f.write(b"xxx")
Guido van Rossum87429772007-04-10 21:06:59 +0000620 self.assertEqual(f.closed, True)
621 f = None
622 try:
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000623 with self.open(support.TESTFN, "wb", bufsize) as f:
Guido van Rossum87429772007-04-10 21:06:59 +0000624 1/0
625 except ZeroDivisionError:
626 self.assertEqual(f.closed, True)
627 else:
628 self.fail("1/0 didn't raise an exception")
629
Antoine Pitrou08838b62009-01-21 00:55:13 +0000630 # issue 5008
631 def test_append_mode_tell(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000632 with self.open(support.TESTFN, "wb") as f:
Antoine Pitrou08838b62009-01-21 00:55:13 +0000633 f.write(b"xxx")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000634 with self.open(support.TESTFN, "ab", buffering=0) as f:
Antoine Pitrou08838b62009-01-21 00:55:13 +0000635 self.assertEqual(f.tell(), 3)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000636 with self.open(support.TESTFN, "ab") as f:
Antoine Pitrou08838b62009-01-21 00:55:13 +0000637 self.assertEqual(f.tell(), 3)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000638 with self.open(support.TESTFN, "a") as f:
Serhiy Storchaka0dcd80a2015-08-02 15:17:49 +0300639 self.assertGreater(f.tell(), 0)
Antoine Pitrou08838b62009-01-21 00:55:13 +0000640
Guido van Rossum87429772007-04-10 21:06:59 +0000641 def test_destructor(self):
642 record = []
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000643 class MyFileIO(self.FileIO):
Guido van Rossum87429772007-04-10 21:06:59 +0000644 def __del__(self):
645 record.append(1)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000646 try:
647 f = super().__del__
648 except AttributeError:
649 pass
650 else:
651 f()
Guido van Rossum87429772007-04-10 21:06:59 +0000652 def close(self):
653 record.append(2)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000654 super().close()
Guido van Rossum87429772007-04-10 21:06:59 +0000655 def flush(self):
656 record.append(3)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000657 super().flush()
Brett Cannon5a9e91b2010-10-29 23:53:03 +0000658 with support.check_warnings(('', ResourceWarning)):
659 f = MyFileIO(support.TESTFN, "wb")
660 f.write(b"xxx")
661 del f
662 support.gc_collect()
663 self.assertEqual(record, [1, 2, 3])
664 with self.open(support.TESTFN, "rb") as f:
665 self.assertEqual(f.read(), b"xxx")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000666
667 def _check_base_destructor(self, base):
668 record = []
669 class MyIO(base):
670 def __init__(self):
671 # This exercises the availability of attributes on object
672 # destruction.
673 # (in the C version, close() is called by the tp_dealloc
674 # function, not by __del__)
675 self.on_del = 1
676 self.on_close = 2
677 self.on_flush = 3
678 def __del__(self):
679 record.append(self.on_del)
680 try:
681 f = super().__del__
682 except AttributeError:
683 pass
684 else:
685 f()
686 def close(self):
687 record.append(self.on_close)
688 super().close()
689 def flush(self):
690 record.append(self.on_flush)
691 super().flush()
692 f = MyIO()
Guido van Rossum87429772007-04-10 21:06:59 +0000693 del f
Benjamin Peterson24fb1d02009-04-24 23:26:21 +0000694 support.gc_collect()
Guido van Rossum87429772007-04-10 21:06:59 +0000695 self.assertEqual(record, [1, 2, 3])
696
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000697 def test_IOBase_destructor(self):
698 self._check_base_destructor(self.IOBase)
699
700 def test_RawIOBase_destructor(self):
701 self._check_base_destructor(self.RawIOBase)
702
703 def test_BufferedIOBase_destructor(self):
704 self._check_base_destructor(self.BufferedIOBase)
705
706 def test_TextIOBase_destructor(self):
707 self._check_base_destructor(self.TextIOBase)
708
Guido van Rossum87429772007-04-10 21:06:59 +0000709 def test_close_flushes(self):
Benjamin Peterson45cec322009-04-24 23:14:50 +0000710 with self.open(support.TESTFN, "wb") as f:
711 f.write(b"xxx")
712 with self.open(support.TESTFN, "rb") as f:
713 self.assertEqual(f.read(), b"xxx")
Guido van Rossuma9e20242007-03-08 00:43:48 +0000714
Guido van Rossumd4103952007-04-12 05:44:49 +0000715 def test_array_writes(self):
716 a = array.array('i', range(10))
Antoine Pitrou1ce3eb52010-09-01 20:29:34 +0000717 n = len(a.tobytes())
Martin Panter6bb91f32016-05-28 00:41:57 +0000718 def check(f):
719 with f:
720 self.assertEqual(f.write(a), n)
721 f.writelines((a,))
722 check(self.BytesIO())
723 check(self.FileIO(support.TESTFN, "w"))
724 check(self.BufferedWriter(self.MockRawIO()))
725 check(self.BufferedRandom(self.MockRawIO()))
726 check(self.BufferedRWPair(self.MockRawIO(), self.MockRawIO()))
Guido van Rossumd4103952007-04-12 05:44:49 +0000727
Guido van Rossum2dced8b2007-10-30 17:27:30 +0000728 def test_closefd(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000729 self.assertRaises(ValueError, self.open, support.TESTFN, 'w',
Guido van Rossum2dced8b2007-10-30 17:27:30 +0000730 closefd=False)
Guido van Rossuma9e20242007-03-08 00:43:48 +0000731
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000732 def test_read_closed(self):
733 with self.open(support.TESTFN, "w") as f:
Christian Heimesecc42a22008-11-05 19:30:32 +0000734 f.write("egg\n")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000735 with self.open(support.TESTFN, "r") as f:
736 file = self.open(f.fileno(), "r", closefd=False)
Christian Heimesecc42a22008-11-05 19:30:32 +0000737 self.assertEqual(file.read(), "egg\n")
738 file.seek(0)
739 file.close()
740 self.assertRaises(ValueError, file.read)
Philipp Gesangcb1c0742020-02-04 22:25:16 +0100741 with self.open(support.TESTFN, "rb") as f:
742 file = self.open(f.fileno(), "rb", closefd=False)
743 self.assertEqual(file.read()[:3], b"egg")
744 file.close()
745 self.assertRaises(ValueError, file.readinto, bytearray(1))
Christian Heimesecc42a22008-11-05 19:30:32 +0000746
747 def test_no_closefd_with_filename(self):
748 # can't use closefd in combination with a file name
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000749 self.assertRaises(ValueError, self.open, support.TESTFN, "r", closefd=False)
Christian Heimesecc42a22008-11-05 19:30:32 +0000750
751 def test_closefd_attr(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000752 with self.open(support.TESTFN, "wb") as f:
Christian Heimesecc42a22008-11-05 19:30:32 +0000753 f.write(b"egg\n")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000754 with self.open(support.TESTFN, "r") as f:
Christian Heimesecc42a22008-11-05 19:30:32 +0000755 self.assertEqual(f.buffer.raw.closefd, True)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000756 file = self.open(f.fileno(), "r", closefd=False)
Christian Heimesecc42a22008-11-05 19:30:32 +0000757 self.assertEqual(file.buffer.raw.closefd, False)
758
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000759 def test_garbage_collection(self):
760 # FileIO objects are collected, and collecting them flushes
761 # all data to disk.
Benjamin Petersonebe5d8a2010-10-31 01:30:11 +0000762 with support.check_warnings(('', ResourceWarning)):
763 f = self.FileIO(support.TESTFN, "wb")
764 f.write(b"abcxxx")
765 f.f = f
766 wr = weakref.ref(f)
767 del f
768 support.gc_collect()
Serhiy Storchaka0dcd80a2015-08-02 15:17:49 +0300769 self.assertIsNone(wr(), wr)
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000770 with self.open(support.TESTFN, "rb") as f:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000771 self.assertEqual(f.read(), b"abcxxx")
Christian Heimesecc42a22008-11-05 19:30:32 +0000772
Antoine Pitrou7d037a72009-03-29 18:55:12 +0000773 def test_unbounded_file(self):
774 # Issue #1174606: reading from an unbounded stream such as /dev/zero.
775 zero = "/dev/zero"
776 if not os.path.exists(zero):
Antoine Pitrouc50cb8e2009-04-19 00:10:36 +0000777 self.skipTest("{0} does not exist".format(zero))
Antoine Pitrou7d037a72009-03-29 18:55:12 +0000778 if sys.maxsize > 0x7FFFFFFF:
Antoine Pitrouc50cb8e2009-04-19 00:10:36 +0000779 self.skipTest("test can only run in a 32-bit address space")
Antoine Pitrou7d037a72009-03-29 18:55:12 +0000780 if support.real_max_memuse < support._2G:
Victor Stinner8c663fd2017-11-08 14:44:44 -0800781 self.skipTest("test requires at least 2 GiB of memory")
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000782 with self.open(zero, "rb", buffering=0) as f:
Antoine Pitrou7d037a72009-03-29 18:55:12 +0000783 self.assertRaises(OverflowError, f.read)
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000784 with self.open(zero, "rb") as f:
Antoine Pitrou7d037a72009-03-29 18:55:12 +0000785 self.assertRaises(OverflowError, f.read)
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000786 with self.open(zero, "r") as f:
Antoine Pitrou7d037a72009-03-29 18:55:12 +0000787 self.assertRaises(OverflowError, f.read)
788
Serhiy Storchakaa3712a92015-02-21 00:35:09 +0200789 def check_flush_error_on_close(self, *args, **kwargs):
790 # Test that the file is closed despite failed flush
791 # and that flush() is called before file closed.
792 f = self.open(*args, **kwargs)
793 closed = []
Antoine Pitrou6be88762010-05-03 16:48:20 +0000794 def bad_flush():
Serhiy Storchakaa3712a92015-02-21 00:35:09 +0200795 closed[:] = [f.closed]
Andrew Svetlovf7a17b42012-12-25 16:47:37 +0200796 raise OSError()
Antoine Pitrou6be88762010-05-03 16:48:20 +0000797 f.flush = bad_flush
Andrew Svetlovf7a17b42012-12-25 16:47:37 +0200798 self.assertRaises(OSError, f.close) # exception not swallowed
Benjamin Peterson68623612012-12-20 11:53:11 -0600799 self.assertTrue(f.closed)
Serhiy Storchakaa3712a92015-02-21 00:35:09 +0200800 self.assertTrue(closed) # flush() called
801 self.assertFalse(closed[0]) # flush() called before file closed
Serhiy Storchakac26a1a42015-02-23 00:28:38 +0200802 f.flush = lambda: None # break reference loop
Serhiy Storchakaa3712a92015-02-21 00:35:09 +0200803
804 def test_flush_error_on_close(self):
805 # raw file
806 # Issue #5700: io.FileIO calls flush() after file closed
807 self.check_flush_error_on_close(support.TESTFN, 'wb', buffering=0)
808 fd = os.open(support.TESTFN, os.O_WRONLY|os.O_CREAT)
809 self.check_flush_error_on_close(fd, 'wb', buffering=0)
810 fd = os.open(support.TESTFN, os.O_WRONLY|os.O_CREAT)
811 self.check_flush_error_on_close(fd, 'wb', buffering=0, closefd=False)
812 os.close(fd)
813 # buffered io
814 self.check_flush_error_on_close(support.TESTFN, 'wb')
815 fd = os.open(support.TESTFN, os.O_WRONLY|os.O_CREAT)
816 self.check_flush_error_on_close(fd, 'wb')
817 fd = os.open(support.TESTFN, os.O_WRONLY|os.O_CREAT)
818 self.check_flush_error_on_close(fd, 'wb', closefd=False)
819 os.close(fd)
820 # text io
821 self.check_flush_error_on_close(support.TESTFN, 'w')
822 fd = os.open(support.TESTFN, os.O_WRONLY|os.O_CREAT)
823 self.check_flush_error_on_close(fd, 'w')
824 fd = os.open(support.TESTFN, os.O_WRONLY|os.O_CREAT)
825 self.check_flush_error_on_close(fd, 'w', closefd=False)
826 os.close(fd)
Antoine Pitrou6be88762010-05-03 16:48:20 +0000827
828 def test_multi_close(self):
829 f = self.open(support.TESTFN, "wb", buffering=0)
830 f.close()
831 f.close()
832 f.close()
833 self.assertRaises(ValueError, f.flush)
834
Antoine Pitrou328ec742010-09-14 18:37:24 +0000835 def test_RawIOBase_read(self):
Sanyam Khurana1b74f9b2017-12-11 19:12:09 +0530836 # Exercise the default limited RawIOBase.read(n) implementation (which
837 # calls readinto() internally).
Antoine Pitrou328ec742010-09-14 18:37:24 +0000838 rawio = self.MockRawIOWithoutRead((b"abc", b"d", None, b"efg", None))
839 self.assertEqual(rawio.read(2), b"ab")
840 self.assertEqual(rawio.read(2), b"c")
841 self.assertEqual(rawio.read(2), b"d")
842 self.assertEqual(rawio.read(2), None)
843 self.assertEqual(rawio.read(2), b"ef")
844 self.assertEqual(rawio.read(2), b"g")
845 self.assertEqual(rawio.read(2), None)
846 self.assertEqual(rawio.read(2), b"")
847
Benjamin Petersonf6f3a352011-09-03 09:26:20 -0400848 def test_types_have_dict(self):
849 test = (
850 self.IOBase(),
851 self.RawIOBase(),
852 self.TextIOBase(),
853 self.StringIO(),
854 self.BytesIO()
855 )
856 for obj in test:
857 self.assertTrue(hasattr(obj, "__dict__"))
858
Ross Lagerwall59142db2011-10-31 20:34:46 +0200859 def test_opener(self):
860 with self.open(support.TESTFN, "w") as f:
861 f.write("egg\n")
862 fd = os.open(support.TESTFN, os.O_RDONLY)
863 def opener(path, flags):
864 return fd
865 with self.open("non-existent", "r", opener=opener) as f:
866 self.assertEqual(f.read(), "egg\n")
867
Barry Warsaw480e2852016-06-08 17:47:26 -0400868 def test_bad_opener_negative_1(self):
869 # Issue #27066.
870 def badopener(fname, flags):
871 return -1
872 with self.assertRaises(ValueError) as cm:
873 open('non-existent', 'r', opener=badopener)
874 self.assertEqual(str(cm.exception), 'opener returned -1')
875
876 def test_bad_opener_other_negative(self):
877 # Issue #27066.
878 def badopener(fname, flags):
879 return -2
880 with self.assertRaises(ValueError) as cm:
881 open('non-existent', 'r', opener=badopener)
882 self.assertEqual(str(cm.exception), 'opener returned -2')
883
Hynek Schlawack2cc71562012-05-25 10:05:53 +0200884 def test_fileio_closefd(self):
885 # Issue #4841
886 with self.open(__file__, 'rb') as f1, \
887 self.open(__file__, 'rb') as f2:
888 fileio = self.FileIO(f1.fileno(), closefd=False)
889 # .__init__() must not close f1
890 fileio.__init__(f2.fileno(), closefd=False)
891 f1.readline()
892 # .close() must not close f2
893 fileio.close()
894 f2.readline()
895
Serhiy Storchakaf10063e2014-06-09 13:32:34 +0300896 def test_nonbuffered_textio(self):
Serhiy Storchaka94a619d2016-02-11 13:11:44 +0200897 with support.check_no_resource_warning(self):
Serhiy Storchakaf10063e2014-06-09 13:32:34 +0300898 with self.assertRaises(ValueError):
899 self.open(support.TESTFN, 'w', buffering=0)
Serhiy Storchakaf10063e2014-06-09 13:32:34 +0300900
901 def test_invalid_newline(self):
Serhiy Storchaka94a619d2016-02-11 13:11:44 +0200902 with support.check_no_resource_warning(self):
Serhiy Storchakaf10063e2014-06-09 13:32:34 +0300903 with self.assertRaises(ValueError):
904 self.open(support.TESTFN, 'w', newline='invalid')
Serhiy Storchakaf10063e2014-06-09 13:32:34 +0300905
Martin Panter6bb91f32016-05-28 00:41:57 +0000906 def test_buffered_readinto_mixin(self):
907 # Test the implementation provided by BufferedIOBase
908 class Stream(self.BufferedIOBase):
909 def read(self, size):
910 return b"12345"
911 read1 = read
912 stream = Stream()
913 for method in ("readinto", "readinto1"):
914 with self.subTest(method):
915 buffer = byteslike(5)
916 self.assertEqual(getattr(stream, method)(buffer), 5)
917 self.assertEqual(bytes(buffer), b"12345")
918
Ethan Furmand62548a2016-06-04 14:38:43 -0700919 def test_fspath_support(self):
Ethan Furmand62548a2016-06-04 14:38:43 -0700920 def check_path_succeeds(path):
921 with self.open(path, "w") as f:
922 f.write("egg\n")
923
924 with self.open(path, "r") as f:
925 self.assertEqual(f.read(), "egg\n")
926
Serhiy Storchakab21d1552018-03-02 11:53:51 +0200927 check_path_succeeds(FakePath(support.TESTFN))
928 check_path_succeeds(FakePath(support.TESTFN.encode('utf-8')))
Ethan Furmand62548a2016-06-04 14:38:43 -0700929
Serhiy Storchakab21d1552018-03-02 11:53:51 +0200930 with self.open(support.TESTFN, "w") as f:
931 bad_path = FakePath(f.fileno())
932 with self.assertRaises(TypeError):
933 self.open(bad_path, 'w')
934
935 bad_path = FakePath(None)
Brett Cannonc78ca1e2016-06-24 12:03:43 -0700936 with self.assertRaises(TypeError):
Ethan Furmand62548a2016-06-04 14:38:43 -0700937 self.open(bad_path, 'w')
938
Serhiy Storchakab21d1552018-03-02 11:53:51 +0200939 bad_path = FakePath(FloatingPointError)
940 with self.assertRaises(FloatingPointError):
941 self.open(bad_path, 'w')
942
Ethan Furmand62548a2016-06-04 14:38:43 -0700943 # ensure that refcounting is correct with some error conditions
944 with self.assertRaisesRegex(ValueError, 'read/write/append mode'):
Serhiy Storchakab21d1552018-03-02 11:53:51 +0200945 self.open(FakePath(support.TESTFN), 'rwxa')
Ethan Furmand62548a2016-06-04 14:38:43 -0700946
Sanyam Khurana1b74f9b2017-12-11 19:12:09 +0530947 def test_RawIOBase_readall(self):
948 # Exercise the default unlimited RawIOBase.read() and readall()
949 # implementations.
950 rawio = self.MockRawIOWithoutRead((b"abc", b"d", b"efg"))
951 self.assertEqual(rawio.read(), b"abcdefg")
952 rawio = self.MockRawIOWithoutRead((b"abc", b"d", b"efg"))
953 self.assertEqual(rawio.readall(), b"abcdefg")
954
955 def test_BufferedIOBase_readinto(self):
956 # Exercise the default BufferedIOBase.readinto() and readinto1()
957 # implementations (which call read() or read1() internally).
958 class Reader(self.BufferedIOBase):
959 def __init__(self, avail):
960 self.avail = avail
961 def read(self, size):
962 result = self.avail[:size]
963 self.avail = self.avail[size:]
964 return result
965 def read1(self, size):
966 """Returns no more than 5 bytes at once"""
967 return self.read(min(size, 5))
968 tests = (
969 # (test method, total data available, read buffer size, expected
970 # read size)
971 ("readinto", 10, 5, 5),
972 ("readinto", 10, 6, 6), # More than read1() can return
973 ("readinto", 5, 6, 5), # Buffer larger than total available
974 ("readinto", 6, 7, 6),
975 ("readinto", 10, 0, 0), # Empty buffer
976 ("readinto1", 10, 5, 5), # Result limited to single read1() call
977 ("readinto1", 10, 6, 5), # Buffer larger than read1() can return
978 ("readinto1", 5, 6, 5), # Buffer larger than total available
979 ("readinto1", 6, 7, 5),
980 ("readinto1", 10, 0, 0), # Empty buffer
981 )
982 UNUSED_BYTE = 0x81
983 for test in tests:
984 with self.subTest(test):
985 method, avail, request, result = test
986 reader = Reader(bytes(range(avail)))
987 buffer = bytearray((UNUSED_BYTE,) * request)
988 method = getattr(reader, method)
989 self.assertEqual(method(buffer), result)
990 self.assertEqual(len(buffer), request)
991 self.assertSequenceEqual(buffer[:result], range(result))
992 unused = (UNUSED_BYTE,) * (request - result)
993 self.assertSequenceEqual(buffer[result:], unused)
994 self.assertEqual(len(reader.avail), avail - result)
995
Zackery Spytz28f07362018-07-17 00:31:44 -0600996 def test_close_assert(self):
997 class R(self.IOBase):
998 def __setattr__(self, name, value):
999 pass
1000 def flush(self):
1001 raise OSError()
1002 f = R()
1003 # This would cause an assertion failure.
1004 self.assertRaises(OSError, f.close)
1005
Victor Stinner472f7942019-04-12 21:58:24 +02001006 # Silence destructor error
1007 R.flush = lambda self: None
1008
Hynek Schlawack2cc71562012-05-25 10:05:53 +02001009
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001010class CIOTest(IOTest):
Antoine Pitrou84f1b172011-07-12 21:57:15 +02001011
1012 def test_IOBase_finalize(self):
1013 # Issue #12149: segmentation fault on _PyIOBase_finalize when both a
1014 # class which inherits IOBase and an object of this class are caught
1015 # in a reference cycle and close() is already in the method cache.
1016 class MyIO(self.IOBase):
1017 def close(self):
1018 pass
1019
1020 # create an instance to populate the method cache
1021 MyIO()
1022 obj = MyIO()
1023 obj.obj = obj
1024 wr = weakref.ref(obj)
1025 del MyIO
1026 del obj
1027 support.gc_collect()
Serhiy Storchaka0dcd80a2015-08-02 15:17:49 +03001028 self.assertIsNone(wr(), wr)
Guido van Rossuma9e20242007-03-08 00:43:48 +00001029
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001030class PyIOTest(IOTest):
1031 pass
Guido van Rossum68bbcd22007-02-27 17:19:33 +00001032
Guido van Rossuma9e20242007-03-08 00:43:48 +00001033
Gregory P. Smith1bef9072015-04-14 13:24:34 -07001034@support.cpython_only
1035class APIMismatchTest(unittest.TestCase):
Gregory P. Smith054b0652015-04-14 12:58:05 -07001036
Gregory P. Smith054b0652015-04-14 12:58:05 -07001037 def test_RawIOBase_io_in_pyio_match(self):
1038 """Test that pyio RawIOBase class has all c RawIOBase methods"""
Antoine Pitrou45d61562015-05-20 21:50:59 +02001039 mismatch = support.detect_api_mismatch(pyio.RawIOBase, io.RawIOBase,
1040 ignore=('__weakref__',))
Gregory P. Smith054b0652015-04-14 12:58:05 -07001041 self.assertEqual(mismatch, set(), msg='Python RawIOBase does not have all C RawIOBase methods')
1042
1043 def test_RawIOBase_pyio_in_io_match(self):
1044 """Test that c RawIOBase class has all pyio RawIOBase methods"""
1045 mismatch = support.detect_api_mismatch(io.RawIOBase, pyio.RawIOBase)
1046 self.assertEqual(mismatch, set(), msg='C RawIOBase does not have all Python RawIOBase methods')
1047
1048
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001049class CommonBufferedTests:
1050 # Tests common to BufferedReader, BufferedWriter and BufferedRandom
1051
Benjamin Petersond2e0c792009-05-01 20:40:59 +00001052 def test_detach(self):
1053 raw = self.MockRawIO()
1054 buf = self.tp(raw)
1055 self.assertIs(buf.detach(), raw)
1056 self.assertRaises(ValueError, buf.detach)
1057
Benjamin Peterson10e76b62014-12-21 20:51:50 -06001058 repr(buf) # Should still work
1059
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001060 def test_fileno(self):
1061 rawio = self.MockRawIO()
1062 bufio = self.tp(rawio)
1063
Ezio Melottib3aedd42010-11-20 19:04:17 +00001064 self.assertEqual(42, bufio.fileno())
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001065
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001066 def test_invalid_args(self):
1067 rawio = self.MockRawIO()
1068 bufio = self.tp(rawio)
1069 # Invalid whence
1070 self.assertRaises(ValueError, bufio.seek, 0, -1)
Jesus Cea94363612012-06-22 18:32:07 +02001071 self.assertRaises(ValueError, bufio.seek, 0, 9)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001072
1073 def test_override_destructor(self):
1074 tp = self.tp
1075 record = []
1076 class MyBufferedIO(tp):
1077 def __del__(self):
1078 record.append(1)
1079 try:
1080 f = super().__del__
1081 except AttributeError:
1082 pass
1083 else:
1084 f()
1085 def close(self):
1086 record.append(2)
1087 super().close()
1088 def flush(self):
1089 record.append(3)
1090 super().flush()
1091 rawio = self.MockRawIO()
1092 bufio = MyBufferedIO(rawio)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001093 del bufio
Benjamin Peterson24fb1d02009-04-24 23:26:21 +00001094 support.gc_collect()
Martin Panter754aab22016-03-31 07:21:56 +00001095 self.assertEqual(record, [1, 2, 3])
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001096
1097 def test_context_manager(self):
1098 # Test usability as a context manager
1099 rawio = self.MockRawIO()
1100 bufio = self.tp(rawio)
1101 def _with():
1102 with bufio:
1103 pass
1104 _with()
1105 # bufio should now be closed, and using it a second time should raise
1106 # a ValueError.
1107 self.assertRaises(ValueError, _with)
1108
1109 def test_error_through_destructor(self):
1110 # Test that the exception state is not modified by a destructor,
1111 # even if close() fails.
1112 rawio = self.CloseFailureIO()
Victor Stinner6dbbe742019-05-25 00:09:38 +02001113 with support.catch_unraisable_exception() as cm:
1114 with self.assertRaises(AttributeError):
1115 self.tp(rawio).xyzzy
Victor Stinnerbc2aa812019-05-23 03:45:09 +02001116
1117 if not IOBASE_EMITS_UNRAISABLE:
1118 self.assertIsNone(cm.unraisable)
1119 elif cm.unraisable is not None:
1120 self.assertEqual(cm.unraisable.exc_type, OSError)
Guido van Rossum78892e42007-04-06 17:31:18 +00001121
Antoine Pitrou716c4442009-05-23 19:04:03 +00001122 def test_repr(self):
1123 raw = self.MockRawIO()
1124 b = self.tp(raw)
Max Bernsteinccb7ca72019-05-21 10:09:21 -07001125 clsname = r"(%s\.)?%s" % (self.tp.__module__, self.tp.__qualname__)
1126 self.assertRegex(repr(b), "<%s>" % clsname)
Antoine Pitrou716c4442009-05-23 19:04:03 +00001127 raw.name = "dummy"
Max Bernsteinccb7ca72019-05-21 10:09:21 -07001128 self.assertRegex(repr(b), "<%s name='dummy'>" % clsname)
Antoine Pitrou716c4442009-05-23 19:04:03 +00001129 raw.name = b"dummy"
Max Bernsteinccb7ca72019-05-21 10:09:21 -07001130 self.assertRegex(repr(b), "<%s name=b'dummy'>" % clsname)
Antoine Pitrou716c4442009-05-23 19:04:03 +00001131
Serhiy Storchakaa5af6e12017-03-19 19:25:29 +02001132 def test_recursive_repr(self):
1133 # Issue #25455
1134 raw = self.MockRawIO()
1135 b = self.tp(raw)
1136 with support.swap_attr(raw, 'name', b):
1137 try:
1138 repr(b) # Should not crash
1139 except RuntimeError:
1140 pass
1141
Antoine Pitrou6be88762010-05-03 16:48:20 +00001142 def test_flush_error_on_close(self):
Serhiy Storchakaa3712a92015-02-21 00:35:09 +02001143 # Test that buffered file is closed despite failed flush
1144 # and that flush() is called before file closed.
Antoine Pitrou6be88762010-05-03 16:48:20 +00001145 raw = self.MockRawIO()
Serhiy Storchakaa3712a92015-02-21 00:35:09 +02001146 closed = []
Antoine Pitrou6be88762010-05-03 16:48:20 +00001147 def bad_flush():
Serhiy Storchakaa3712a92015-02-21 00:35:09 +02001148 closed[:] = [b.closed, raw.closed]
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02001149 raise OSError()
Antoine Pitrou6be88762010-05-03 16:48:20 +00001150 raw.flush = bad_flush
1151 b = self.tp(raw)
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02001152 self.assertRaises(OSError, b.close) # exception not swallowed
Benjamin Peterson68623612012-12-20 11:53:11 -06001153 self.assertTrue(b.closed)
Serhiy Storchakaa3712a92015-02-21 00:35:09 +02001154 self.assertTrue(raw.closed)
1155 self.assertTrue(closed) # flush() called
1156 self.assertFalse(closed[0]) # flush() called before file closed
1157 self.assertFalse(closed[1])
Serhiy Storchakac26a1a42015-02-23 00:28:38 +02001158 raw.flush = lambda: None # break reference loop
Benjamin Peterson68623612012-12-20 11:53:11 -06001159
1160 def test_close_error_on_close(self):
1161 raw = self.MockRawIO()
1162 def bad_flush():
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02001163 raise OSError('flush')
Benjamin Peterson68623612012-12-20 11:53:11 -06001164 def bad_close():
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02001165 raise OSError('close')
Benjamin Peterson68623612012-12-20 11:53:11 -06001166 raw.close = bad_close
1167 b = self.tp(raw)
1168 b.flush = bad_flush
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02001169 with self.assertRaises(OSError) as err: # exception not swallowed
Benjamin Peterson68623612012-12-20 11:53:11 -06001170 b.close()
1171 self.assertEqual(err.exception.args, ('close',))
Serhiy Storchaka8a8f7f92014-06-09 09:13:04 +03001172 self.assertIsInstance(err.exception.__context__, OSError)
Benjamin Peterson68623612012-12-20 11:53:11 -06001173 self.assertEqual(err.exception.__context__.args, ('flush',))
1174 self.assertFalse(b.closed)
Antoine Pitrou6be88762010-05-03 16:48:20 +00001175
Victor Stinner472f7942019-04-12 21:58:24 +02001176 # Silence destructor error
1177 raw.close = lambda: None
1178 b.flush = lambda: None
1179
Serhiy Storchaka8a8f7f92014-06-09 09:13:04 +03001180 def test_nonnormalized_close_error_on_close(self):
1181 # Issue #21677
1182 raw = self.MockRawIO()
1183 def bad_flush():
1184 raise non_existing_flush
1185 def bad_close():
1186 raise non_existing_close
1187 raw.close = bad_close
1188 b = self.tp(raw)
1189 b.flush = bad_flush
1190 with self.assertRaises(NameError) as err: # exception not swallowed
1191 b.close()
1192 self.assertIn('non_existing_close', str(err.exception))
1193 self.assertIsInstance(err.exception.__context__, NameError)
1194 self.assertIn('non_existing_flush', str(err.exception.__context__))
1195 self.assertFalse(b.closed)
1196
Victor Stinner472f7942019-04-12 21:58:24 +02001197 # Silence destructor error
1198 b.flush = lambda: None
1199 raw.close = lambda: None
1200
Antoine Pitrou6be88762010-05-03 16:48:20 +00001201 def test_multi_close(self):
1202 raw = self.MockRawIO()
1203 b = self.tp(raw)
1204 b.close()
1205 b.close()
1206 b.close()
1207 self.assertRaises(ValueError, b.flush)
1208
Antoine Pitrou0d739d72010-09-05 23:01:12 +00001209 def test_unseekable(self):
1210 bufio = self.tp(self.MockUnseekableIO(b"A" * 10))
1211 self.assertRaises(self.UnsupportedOperation, bufio.tell)
1212 self.assertRaises(self.UnsupportedOperation, bufio.seek, 0)
1213
Antoine Pitrou7f8f4182010-12-21 21:20:59 +00001214 def test_readonly_attributes(self):
1215 raw = self.MockRawIO()
1216 buf = self.tp(raw)
1217 x = self.MockRawIO()
1218 with self.assertRaises(AttributeError):
1219 buf.raw = x
1220
Guido van Rossum78892e42007-04-06 17:31:18 +00001221
Antoine Pitrou10f0c502012-07-29 19:02:46 +02001222class SizeofTest:
1223
1224 @support.cpython_only
1225 def test_sizeof(self):
1226 bufsize1 = 4096
1227 bufsize2 = 8192
1228 rawio = self.MockRawIO()
1229 bufio = self.tp(rawio, buffer_size=bufsize1)
1230 size = sys.getsizeof(bufio) - bufsize1
1231 rawio = self.MockRawIO()
1232 bufio = self.tp(rawio, buffer_size=bufsize2)
1233 self.assertEqual(sys.getsizeof(bufio), size + bufsize2)
1234
Jesus Ceadc469452012-10-04 12:37:56 +02001235 @support.cpython_only
1236 def test_buffer_freeing(self) :
1237 bufsize = 4096
1238 rawio = self.MockRawIO()
1239 bufio = self.tp(rawio, buffer_size=bufsize)
1240 size = sys.getsizeof(bufio) - bufsize
1241 bufio.close()
1242 self.assertEqual(sys.getsizeof(bufio), size)
Antoine Pitrou10f0c502012-07-29 19:02:46 +02001243
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001244class BufferedReaderTest(unittest.TestCase, CommonBufferedTests):
1245 read_mode = "rb"
Guido van Rossum78892e42007-04-06 17:31:18 +00001246
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001247 def test_constructor(self):
1248 rawio = self.MockRawIO([b"abc"])
1249 bufio = self.tp(rawio)
1250 bufio.__init__(rawio)
1251 bufio.__init__(rawio, buffer_size=1024)
1252 bufio.__init__(rawio, buffer_size=16)
Ezio Melottib3aedd42010-11-20 19:04:17 +00001253 self.assertEqual(b"abc", bufio.read())
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001254 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0)
1255 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16)
1256 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1)
1257 rawio = self.MockRawIO([b"abc"])
1258 bufio.__init__(rawio)
Ezio Melottib3aedd42010-11-20 19:04:17 +00001259 self.assertEqual(b"abc", bufio.read())
Guido van Rossum78892e42007-04-06 17:31:18 +00001260
Serhiy Storchaka61e24932014-02-12 10:52:35 +02001261 def test_uninitialized(self):
1262 bufio = self.tp.__new__(self.tp)
1263 del bufio
1264 bufio = self.tp.__new__(self.tp)
1265 self.assertRaisesRegex((ValueError, AttributeError),
1266 'uninitialized|has no attribute',
1267 bufio.read, 0)
1268 bufio.__init__(self.MockRawIO())
1269 self.assertEqual(bufio.read(0), b'')
1270
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001271 def test_read(self):
Benjamin Petersonbf5ff762009-12-13 19:25:34 +00001272 for arg in (None, 7):
1273 rawio = self.MockRawIO((b"abc", b"d", b"efg"))
1274 bufio = self.tp(rawio)
Ezio Melottib3aedd42010-11-20 19:04:17 +00001275 self.assertEqual(b"abcdefg", bufio.read(arg))
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001276 # Invalid args
1277 self.assertRaises(ValueError, bufio.read, -2)
Guido van Rossum68bbcd22007-02-27 17:19:33 +00001278
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001279 def test_read1(self):
1280 rawio = self.MockRawIO((b"abc", b"d", b"efg"))
1281 bufio = self.tp(rawio)
Ezio Melottib3aedd42010-11-20 19:04:17 +00001282 self.assertEqual(b"a", bufio.read(1))
1283 self.assertEqual(b"b", bufio.read1(1))
1284 self.assertEqual(rawio._reads, 1)
Martin Panterccb2c0e2016-10-20 23:48:14 +00001285 self.assertEqual(b"", bufio.read1(0))
Ezio Melottib3aedd42010-11-20 19:04:17 +00001286 self.assertEqual(b"c", bufio.read1(100))
1287 self.assertEqual(rawio._reads, 1)
1288 self.assertEqual(b"d", bufio.read1(100))
1289 self.assertEqual(rawio._reads, 2)
1290 self.assertEqual(b"efg", bufio.read1(100))
1291 self.assertEqual(rawio._reads, 3)
1292 self.assertEqual(b"", bufio.read1(100))
1293 self.assertEqual(rawio._reads, 4)
Martin Panterccb2c0e2016-10-20 23:48:14 +00001294
1295 def test_read1_arbitrary(self):
1296 rawio = self.MockRawIO((b"abc", b"d", b"efg"))
1297 bufio = self.tp(rawio)
1298 self.assertEqual(b"a", bufio.read(1))
1299 self.assertEqual(b"bc", bufio.read1())
1300 self.assertEqual(b"d", bufio.read1())
1301 self.assertEqual(b"efg", bufio.read1(-1))
1302 self.assertEqual(rawio._reads, 3)
1303 self.assertEqual(b"", bufio.read1())
1304 self.assertEqual(rawio._reads, 4)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001305
1306 def test_readinto(self):
1307 rawio = self.MockRawIO((b"abc", b"d", b"efg"))
1308 bufio = self.tp(rawio)
1309 b = bytearray(2)
Ezio Melottib3aedd42010-11-20 19:04:17 +00001310 self.assertEqual(bufio.readinto(b), 2)
1311 self.assertEqual(b, b"ab")
1312 self.assertEqual(bufio.readinto(b), 2)
1313 self.assertEqual(b, b"cd")
1314 self.assertEqual(bufio.readinto(b), 2)
1315 self.assertEqual(b, b"ef")
1316 self.assertEqual(bufio.readinto(b), 1)
1317 self.assertEqual(b, b"gf")
1318 self.assertEqual(bufio.readinto(b), 0)
1319 self.assertEqual(b, b"gf")
Antoine Pitrou3486a982011-05-12 01:57:53 +02001320 rawio = self.MockRawIO((b"abc", None))
1321 bufio = self.tp(rawio)
1322 self.assertEqual(bufio.readinto(b), 2)
1323 self.assertEqual(b, b"ab")
1324 self.assertEqual(bufio.readinto(b), 1)
1325 self.assertEqual(b, b"cb")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001326
Benjamin Petersona96fea02014-06-22 14:17:44 -07001327 def test_readinto1(self):
1328 buffer_size = 10
1329 rawio = self.MockRawIO((b"abc", b"de", b"fgh", b"jkl"))
1330 bufio = self.tp(rawio, buffer_size=buffer_size)
1331 b = bytearray(2)
1332 self.assertEqual(bufio.peek(3), b'abc')
1333 self.assertEqual(rawio._reads, 1)
1334 self.assertEqual(bufio.readinto1(b), 2)
1335 self.assertEqual(b, b"ab")
1336 self.assertEqual(rawio._reads, 1)
1337 self.assertEqual(bufio.readinto1(b), 1)
1338 self.assertEqual(b[:1], b"c")
1339 self.assertEqual(rawio._reads, 1)
1340 self.assertEqual(bufio.readinto1(b), 2)
1341 self.assertEqual(b, b"de")
1342 self.assertEqual(rawio._reads, 2)
1343 b = bytearray(2*buffer_size)
1344 self.assertEqual(bufio.peek(3), b'fgh')
1345 self.assertEqual(rawio._reads, 3)
1346 self.assertEqual(bufio.readinto1(b), 6)
1347 self.assertEqual(b[:6], b"fghjkl")
1348 self.assertEqual(rawio._reads, 4)
1349
1350 def test_readinto_array(self):
1351 buffer_size = 60
1352 data = b"a" * 26
1353 rawio = self.MockRawIO((data,))
1354 bufio = self.tp(rawio, buffer_size=buffer_size)
1355
1356 # Create an array with element size > 1 byte
1357 b = array.array('i', b'x' * 32)
1358 assert len(b) != 16
1359
1360 # Read into it. We should get as many *bytes* as we can fit into b
1361 # (which is more than the number of elements)
1362 n = bufio.readinto(b)
1363 self.assertGreater(n, len(b))
1364
1365 # Check that old contents of b are preserved
1366 bm = memoryview(b).cast('B')
1367 self.assertLess(n, len(bm))
1368 self.assertEqual(bm[:n], data[:n])
1369 self.assertEqual(bm[n:], b'x' * (len(bm[n:])))
1370
1371 def test_readinto1_array(self):
1372 buffer_size = 60
1373 data = b"a" * 26
1374 rawio = self.MockRawIO((data,))
1375 bufio = self.tp(rawio, buffer_size=buffer_size)
1376
1377 # Create an array with element size > 1 byte
1378 b = array.array('i', b'x' * 32)
1379 assert len(b) != 16
1380
1381 # Read into it. We should get as many *bytes* as we can fit into b
1382 # (which is more than the number of elements)
1383 n = bufio.readinto1(b)
1384 self.assertGreater(n, len(b))
1385
1386 # Check that old contents of b are preserved
1387 bm = memoryview(b).cast('B')
1388 self.assertLess(n, len(bm))
1389 self.assertEqual(bm[:n], data[:n])
1390 self.assertEqual(bm[n:], b'x' * (len(bm[n:])))
1391
Benjamin Petersonbf5ff762009-12-13 19:25:34 +00001392 def test_readlines(self):
1393 def bufio():
1394 rawio = self.MockRawIO((b"abc\n", b"d\n", b"ef"))
1395 return self.tp(rawio)
Ezio Melottib3aedd42010-11-20 19:04:17 +00001396 self.assertEqual(bufio().readlines(), [b"abc\n", b"d\n", b"ef"])
1397 self.assertEqual(bufio().readlines(5), [b"abc\n", b"d\n"])
1398 self.assertEqual(bufio().readlines(None), [b"abc\n", b"d\n", b"ef"])
Benjamin Petersonbf5ff762009-12-13 19:25:34 +00001399
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001400 def test_buffering(self):
Guido van Rossum78892e42007-04-06 17:31:18 +00001401 data = b"abcdefghi"
1402 dlen = len(data)
1403
1404 tests = [
1405 [ 100, [ 3, 1, 4, 8 ], [ dlen, 0 ] ],
1406 [ 100, [ 3, 3, 3], [ dlen ] ],
1407 [ 4, [ 1, 2, 4, 2 ], [ 4, 4, 1 ] ],
1408 ]
1409
1410 for bufsize, buf_read_sizes, raw_read_sizes in tests:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001411 rawio = self.MockFileIO(data)
1412 bufio = self.tp(rawio, buffer_size=bufsize)
Guido van Rossum78892e42007-04-06 17:31:18 +00001413 pos = 0
1414 for nbytes in buf_read_sizes:
Ezio Melottib3aedd42010-11-20 19:04:17 +00001415 self.assertEqual(bufio.read(nbytes), data[pos:pos+nbytes])
Guido van Rossum78892e42007-04-06 17:31:18 +00001416 pos += nbytes
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001417 # this is mildly implementation-dependent
Ezio Melottib3aedd42010-11-20 19:04:17 +00001418 self.assertEqual(rawio.read_history, raw_read_sizes)
Guido van Rossum78892e42007-04-06 17:31:18 +00001419
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001420 def test_read_non_blocking(self):
Guido van Rossum01a27522007-03-07 01:00:12 +00001421 # Inject some None's in there to simulate EWOULDBLOCK
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001422 rawio = self.MockRawIO((b"abc", b"d", None, b"efg", None, None, None))
1423 bufio = self.tp(rawio)
Ezio Melottib3aedd42010-11-20 19:04:17 +00001424 self.assertEqual(b"abcd", bufio.read(6))
1425 self.assertEqual(b"e", bufio.read(1))
1426 self.assertEqual(b"fg", bufio.read())
1427 self.assertEqual(b"", bufio.peek(1))
Victor Stinnera80987f2011-05-25 22:47:16 +02001428 self.assertIsNone(bufio.read())
Ezio Melottib3aedd42010-11-20 19:04:17 +00001429 self.assertEqual(b"", bufio.read())
Guido van Rossum01a27522007-03-07 01:00:12 +00001430
Victor Stinnera80987f2011-05-25 22:47:16 +02001431 rawio = self.MockRawIO((b"a", None, None))
1432 self.assertEqual(b"a", rawio.readall())
1433 self.assertIsNone(rawio.readall())
1434
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001435 def test_read_past_eof(self):
1436 rawio = self.MockRawIO((b"abc", b"d", b"efg"))
1437 bufio = self.tp(rawio)
Guido van Rossum68bbcd22007-02-27 17:19:33 +00001438
Ezio Melottib3aedd42010-11-20 19:04:17 +00001439 self.assertEqual(b"abcdefg", bufio.read(9000))
Guido van Rossum68bbcd22007-02-27 17:19:33 +00001440
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001441 def test_read_all(self):
1442 rawio = self.MockRawIO((b"abc", b"d", b"efg"))
1443 bufio = self.tp(rawio)
Guido van Rossum68bbcd22007-02-27 17:19:33 +00001444
Ezio Melottib3aedd42010-11-20 19:04:17 +00001445 self.assertEqual(b"abcdefg", bufio.read())
Guido van Rossum68bbcd22007-02-27 17:19:33 +00001446
Antoine Pitrou5bc4fa72010-10-14 15:34:31 +00001447 @support.requires_resource('cpu')
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001448 def test_threads(self):
Antoine Pitrou87695762008-08-14 22:44:29 +00001449 try:
1450 # Write out many bytes with exactly the same number of 0's,
1451 # 1's... 255's. This will help us check that concurrent reading
1452 # doesn't duplicate or forget contents.
1453 N = 1000
1454 l = list(range(256)) * N
1455 random.shuffle(l)
1456 s = bytes(bytearray(l))
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +00001457 with self.open(support.TESTFN, "wb") as f:
Antoine Pitrou87695762008-08-14 22:44:29 +00001458 f.write(s)
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +00001459 with self.open(support.TESTFN, self.read_mode, buffering=0) as raw:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001460 bufio = self.tp(raw, 8)
Antoine Pitrou87695762008-08-14 22:44:29 +00001461 errors = []
1462 results = []
1463 def f():
1464 try:
1465 # Intra-buffer read then buffer-flushing read
1466 for n in cycle([1, 19]):
1467 s = bufio.read(n)
1468 if not s:
1469 break
1470 # list.append() is atomic
1471 results.append(s)
1472 except Exception as e:
1473 errors.append(e)
1474 raise
1475 threads = [threading.Thread(target=f) for x in range(20)]
Hai Shie80697d2020-05-28 06:10:27 +08001476 with threading_helper.start_threads(threads):
Serhiy Storchaka263dcd22015-04-01 13:01:14 +03001477 time.sleep(0.02) # yield
Antoine Pitrou87695762008-08-14 22:44:29 +00001478 self.assertFalse(errors,
1479 "the following exceptions were caught: %r" % errors)
1480 s = b''.join(results)
1481 for i in range(256):
1482 c = bytes(bytearray([i]))
1483 self.assertEqual(s.count(c), N)
1484 finally:
1485 support.unlink(support.TESTFN)
1486
Antoine Pitrou1e44fec2011-10-04 12:26:20 +02001487 def test_unseekable(self):
1488 bufio = self.tp(self.MockUnseekableIO(b"A" * 10))
1489 self.assertRaises(self.UnsupportedOperation, bufio.tell)
1490 self.assertRaises(self.UnsupportedOperation, bufio.seek, 0)
1491 bufio.read(1)
1492 self.assertRaises(self.UnsupportedOperation, bufio.seek, 0)
1493 self.assertRaises(self.UnsupportedOperation, bufio.tell)
1494
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001495 def test_misbehaved_io(self):
1496 rawio = self.MisbehavedRawIO((b"abc", b"d", b"efg"))
1497 bufio = self.tp(rawio)
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02001498 self.assertRaises(OSError, bufio.seek, 0)
1499 self.assertRaises(OSError, bufio.tell)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001500
Victor Stinnerb589cef2019-06-11 03:10:59 +02001501 # Silence destructor error
1502 bufio.close = lambda: None
1503
Antoine Pitrou32cfede2010-08-11 13:31:33 +00001504 def test_no_extraneous_read(self):
1505 # Issue #9550; when the raw IO object has satisfied the read request,
1506 # we should not issue any additional reads, otherwise it may block
1507 # (e.g. socket).
1508 bufsize = 16
1509 for n in (2, bufsize - 1, bufsize, bufsize + 1, bufsize * 2):
1510 rawio = self.MockRawIO([b"x" * n])
1511 bufio = self.tp(rawio, bufsize)
1512 self.assertEqual(bufio.read(n), b"x" * n)
1513 # Simple case: one raw read is enough to satisfy the request.
1514 self.assertEqual(rawio._extraneous_reads, 0,
1515 "failed for {}: {} != 0".format(n, rawio._extraneous_reads))
1516 # A more complex case where two raw reads are needed to satisfy
1517 # the request.
1518 rawio = self.MockRawIO([b"x" * (n - 1), b"x"])
1519 bufio = self.tp(rawio, bufsize)
1520 self.assertEqual(bufio.read(n), b"x" * n)
1521 self.assertEqual(rawio._extraneous_reads, 0,
1522 "failed for {}: {} != 0".format(n, rawio._extraneous_reads))
1523
Berker Peksagd10d6ae2015-05-12 17:01:05 +03001524 def test_read_on_closed(self):
1525 # Issue #23796
1526 b = io.BufferedReader(io.BytesIO(b"12"))
1527 b.read(1)
1528 b.close()
1529 self.assertRaises(ValueError, b.peek)
1530 self.assertRaises(ValueError, b.read1, 1)
1531
Berker Peksagfd5116c2020-02-21 20:57:26 +03001532 def test_truncate_on_read_only(self):
1533 rawio = self.MockFileIO(b"abc")
1534 bufio = self.tp(rawio)
1535 self.assertFalse(bufio.writable())
1536 self.assertRaises(self.UnsupportedOperation, bufio.truncate)
1537 self.assertRaises(self.UnsupportedOperation, bufio.truncate, 0)
1538
Antoine Pitrou32cfede2010-08-11 13:31:33 +00001539
Antoine Pitrou10f0c502012-07-29 19:02:46 +02001540class CBufferedReaderTest(BufferedReaderTest, SizeofTest):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001541 tp = io.BufferedReader
1542
Gregory P. Smithe5796c42018-12-30 20:17:57 -08001543 @unittest.skipIf(MEMORY_SANITIZER, "MSan defaults to crashing "
1544 "instead of returning NULL for malloc failure.")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001545 def test_constructor(self):
1546 BufferedReaderTest.test_constructor(self)
1547 # The allocation can succeed on 32-bit builds, e.g. with more
Victor Stinner8c663fd2017-11-08 14:44:44 -08001548 # than 2 GiB RAM and a 64-bit kernel.
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001549 if sys.maxsize > 0x7FFFFFFF:
1550 rawio = self.MockRawIO()
1551 bufio = self.tp(rawio)
1552 self.assertRaises((OverflowError, MemoryError, ValueError),
1553 bufio.__init__, rawio, sys.maxsize)
1554
1555 def test_initialization(self):
1556 rawio = self.MockRawIO([b"abc"])
1557 bufio = self.tp(rawio)
1558 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0)
1559 self.assertRaises(ValueError, bufio.read)
1560 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16)
1561 self.assertRaises(ValueError, bufio.read)
1562 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1)
1563 self.assertRaises(ValueError, bufio.read)
1564
1565 def test_misbehaved_io_read(self):
1566 rawio = self.MisbehavedRawIO((b"abc", b"d", b"efg"))
1567 bufio = self.tp(rawio)
1568 # _pyio.BufferedReader seems to implement reading different, so that
1569 # checking this is not so easy.
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02001570 self.assertRaises(OSError, bufio.read, 10)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001571
1572 def test_garbage_collection(self):
1573 # C BufferedReader objects are collected.
1574 # The Python version has __del__, so it ends into gc.garbage instead
Serhiy Storchakae36837c2018-06-04 05:53:56 +03001575 self.addCleanup(support.unlink, support.TESTFN)
Antoine Pitrou796564c2013-07-30 19:59:21 +02001576 with support.check_warnings(('', ResourceWarning)):
1577 rawio = self.FileIO(support.TESTFN, "w+b")
1578 f = self.tp(rawio)
1579 f.f = f
1580 wr = weakref.ref(f)
1581 del f
1582 support.gc_collect()
Serhiy Storchaka0dcd80a2015-08-02 15:17:49 +03001583 self.assertIsNone(wr(), wr)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001584
R David Murray67bfe802013-02-23 21:51:05 -05001585 def test_args_error(self):
1586 # Issue #17275
1587 with self.assertRaisesRegex(TypeError, "BufferedReader"):
1588 self.tp(io.BytesIO(), 1024, 1024, 1024)
1589
1590
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001591class PyBufferedReaderTest(BufferedReaderTest):
1592 tp = pyio.BufferedReader
Antoine Pitrou87695762008-08-14 22:44:29 +00001593
Guido van Rossuma9e20242007-03-08 00:43:48 +00001594
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001595class BufferedWriterTest(unittest.TestCase, CommonBufferedTests):
1596 write_mode = "wb"
Guido van Rossuma9e20242007-03-08 00:43:48 +00001597
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001598 def test_constructor(self):
1599 rawio = self.MockRawIO()
1600 bufio = self.tp(rawio)
1601 bufio.__init__(rawio)
1602 bufio.__init__(rawio, buffer_size=1024)
1603 bufio.__init__(rawio, buffer_size=16)
Ezio Melottib3aedd42010-11-20 19:04:17 +00001604 self.assertEqual(3, bufio.write(b"abc"))
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001605 bufio.flush()
1606 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0)
1607 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16)
1608 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1)
1609 bufio.__init__(rawio)
Ezio Melottib3aedd42010-11-20 19:04:17 +00001610 self.assertEqual(3, bufio.write(b"ghi"))
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001611 bufio.flush()
Ezio Melottib3aedd42010-11-20 19:04:17 +00001612 self.assertEqual(b"".join(rawio._write_stack), b"abcghi")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001613
Serhiy Storchaka61e24932014-02-12 10:52:35 +02001614 def test_uninitialized(self):
1615 bufio = self.tp.__new__(self.tp)
1616 del bufio
1617 bufio = self.tp.__new__(self.tp)
1618 self.assertRaisesRegex((ValueError, AttributeError),
1619 'uninitialized|has no attribute',
1620 bufio.write, b'')
1621 bufio.__init__(self.MockRawIO())
1622 self.assertEqual(bufio.write(b''), 0)
1623
Benjamin Petersond2e0c792009-05-01 20:40:59 +00001624 def test_detach_flush(self):
1625 raw = self.MockRawIO()
1626 buf = self.tp(raw)
1627 buf.write(b"howdy!")
1628 self.assertFalse(raw._write_stack)
1629 buf.detach()
1630 self.assertEqual(raw._write_stack, [b"howdy!"])
1631
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001632 def test_write(self):
Guido van Rossum68bbcd22007-02-27 17:19:33 +00001633 # Write to the buffered IO but don't overflow the buffer.
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001634 writer = self.MockRawIO()
1635 bufio = self.tp(writer, 8)
Guido van Rossum76c5d4d2007-04-06 19:10:29 +00001636 bufio.write(b"abc")
Guido van Rossum76c5d4d2007-04-06 19:10:29 +00001637 self.assertFalse(writer._write_stack)
Martin Panter6bb91f32016-05-28 00:41:57 +00001638 buffer = bytearray(b"def")
1639 bufio.write(buffer)
1640 buffer[:] = b"***" # Overwrite our copy of the data
1641 bufio.flush()
1642 self.assertEqual(b"".join(writer._write_stack), b"abcdef")
Guido van Rossum68bbcd22007-02-27 17:19:33 +00001643
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001644 def test_write_overflow(self):
1645 writer = self.MockRawIO()
1646 bufio = self.tp(writer, 8)
1647 contents = b"abcdefghijklmnop"
1648 for n in range(0, len(contents), 3):
1649 bufio.write(contents[n:n+3])
1650 flushed = b"".join(writer._write_stack)
1651 # At least (total - 8) bytes were implicitly flushed, perhaps more
1652 # depending on the implementation.
Benjamin Petersonc9c0f202009-06-30 23:06:06 +00001653 self.assertTrue(flushed.startswith(contents[:-8]), flushed)
Guido van Rossum68bbcd22007-02-27 17:19:33 +00001654
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001655 def check_writes(self, intermediate_func):
1656 # Lots of writes, test the flushed output is as expected.
1657 contents = bytes(range(256)) * 1000
1658 n = 0
1659 writer = self.MockRawIO()
1660 bufio = self.tp(writer, 13)
1661 # Generator of write sizes: repeat each N 15 times then proceed to N+1
1662 def gen_sizes():
1663 for size in count(1):
1664 for i in range(15):
1665 yield size
1666 sizes = gen_sizes()
1667 while n < len(contents):
1668 size = min(next(sizes), len(contents) - n)
Ezio Melottib3aedd42010-11-20 19:04:17 +00001669 self.assertEqual(bufio.write(contents[n:n+size]), size)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001670 intermediate_func(bufio)
1671 n += size
1672 bufio.flush()
Ezio Melottib3aedd42010-11-20 19:04:17 +00001673 self.assertEqual(contents, b"".join(writer._write_stack))
Guido van Rossum68bbcd22007-02-27 17:19:33 +00001674
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001675 def test_writes(self):
1676 self.check_writes(lambda bufio: None)
Guido van Rossum68bbcd22007-02-27 17:19:33 +00001677
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001678 def test_writes_and_flushes(self):
1679 self.check_writes(lambda bufio: bufio.flush())
Guido van Rossum01a27522007-03-07 01:00:12 +00001680
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001681 def test_writes_and_seeks(self):
1682 def _seekabs(bufio):
1683 pos = bufio.tell()
1684 bufio.seek(pos + 1, 0)
1685 bufio.seek(pos - 1, 0)
1686 bufio.seek(pos, 0)
1687 self.check_writes(_seekabs)
1688 def _seekrel(bufio):
1689 pos = bufio.seek(0, 1)
1690 bufio.seek(+1, 1)
1691 bufio.seek(-1, 1)
1692 bufio.seek(pos, 0)
1693 self.check_writes(_seekrel)
Guido van Rossum01a27522007-03-07 01:00:12 +00001694
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001695 def test_writes_and_truncates(self):
1696 self.check_writes(lambda bufio: bufio.truncate(bufio.tell()))
Guido van Rossum01a27522007-03-07 01:00:12 +00001697
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001698 def test_write_non_blocking(self):
1699 raw = self.MockNonBlockWriterIO()
Benjamin Peterson59406a92009-03-26 17:10:29 +00001700 bufio = self.tp(raw, 8)
Guido van Rossum01a27522007-03-07 01:00:12 +00001701
Ezio Melottib3aedd42010-11-20 19:04:17 +00001702 self.assertEqual(bufio.write(b"abcd"), 4)
1703 self.assertEqual(bufio.write(b"efghi"), 5)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001704 # 1 byte will be written, the rest will be buffered
1705 raw.block_on(b"k")
Ezio Melottib3aedd42010-11-20 19:04:17 +00001706 self.assertEqual(bufio.write(b"jklmn"), 5)
Guido van Rossum01a27522007-03-07 01:00:12 +00001707
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001708 # 8 bytes will be written, 8 will be buffered and the rest will be lost
1709 raw.block_on(b"0")
1710 try:
1711 bufio.write(b"opqrwxyz0123456789")
1712 except self.BlockingIOError as e:
1713 written = e.characters_written
1714 else:
1715 self.fail("BlockingIOError should have been raised")
Ezio Melottib3aedd42010-11-20 19:04:17 +00001716 self.assertEqual(written, 16)
1717 self.assertEqual(raw.pop_written(),
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001718 b"abcdefghijklmnopqrwxyz")
Guido van Rossum01a27522007-03-07 01:00:12 +00001719
Ezio Melottib3aedd42010-11-20 19:04:17 +00001720 self.assertEqual(bufio.write(b"ABCDEFGHI"), 9)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001721 s = raw.pop_written()
1722 # Previously buffered bytes were flushed
1723 self.assertTrue(s.startswith(b"01234567A"), s)
Guido van Rossum01a27522007-03-07 01:00:12 +00001724
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001725 def test_write_and_rewind(self):
1726 raw = io.BytesIO()
1727 bufio = self.tp(raw, 4)
1728 self.assertEqual(bufio.write(b"abcdef"), 6)
1729 self.assertEqual(bufio.tell(), 6)
1730 bufio.seek(0, 0)
1731 self.assertEqual(bufio.write(b"XY"), 2)
1732 bufio.seek(6, 0)
1733 self.assertEqual(raw.getvalue(), b"XYcdef")
1734 self.assertEqual(bufio.write(b"123456"), 6)
1735 bufio.flush()
1736 self.assertEqual(raw.getvalue(), b"XYcdef123456")
Guido van Rossum68bbcd22007-02-27 17:19:33 +00001737
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001738 def test_flush(self):
1739 writer = self.MockRawIO()
1740 bufio = self.tp(writer, 8)
Guido van Rossum76c5d4d2007-04-06 19:10:29 +00001741 bufio.write(b"abc")
1742 bufio.flush()
Ezio Melottib3aedd42010-11-20 19:04:17 +00001743 self.assertEqual(b"abc", writer._write_stack[0])
Guido van Rossum68bbcd22007-02-27 17:19:33 +00001744
Antoine Pitrou131a4892012-10-16 22:57:11 +02001745 def test_writelines(self):
1746 l = [b'ab', b'cd', b'ef']
1747 writer = self.MockRawIO()
1748 bufio = self.tp(writer, 8)
1749 bufio.writelines(l)
1750 bufio.flush()
1751 self.assertEqual(b''.join(writer._write_stack), b'abcdef')
1752
1753 def test_writelines_userlist(self):
1754 l = UserList([b'ab', b'cd', b'ef'])
1755 writer = self.MockRawIO()
1756 bufio = self.tp(writer, 8)
1757 bufio.writelines(l)
1758 bufio.flush()
1759 self.assertEqual(b''.join(writer._write_stack), b'abcdef')
1760
1761 def test_writelines_error(self):
1762 writer = self.MockRawIO()
1763 bufio = self.tp(writer, 8)
1764 self.assertRaises(TypeError, bufio.writelines, [1, 2, 3])
1765 self.assertRaises(TypeError, bufio.writelines, None)
1766 self.assertRaises(TypeError, bufio.writelines, 'abc')
1767
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001768 def test_destructor(self):
1769 writer = self.MockRawIO()
1770 bufio = self.tp(writer, 8)
1771 bufio.write(b"abc")
1772 del bufio
Benjamin Peterson24fb1d02009-04-24 23:26:21 +00001773 support.gc_collect()
Ezio Melottib3aedd42010-11-20 19:04:17 +00001774 self.assertEqual(b"abc", writer._write_stack[0])
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001775
1776 def test_truncate(self):
1777 # Truncate implicitly flushes the buffer.
Serhiy Storchakae36837c2018-06-04 05:53:56 +03001778 self.addCleanup(support.unlink, support.TESTFN)
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +00001779 with self.open(support.TESTFN, self.write_mode, buffering=0) as raw:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001780 bufio = self.tp(raw, 8)
1781 bufio.write(b"abcdef")
1782 self.assertEqual(bufio.truncate(3), 3)
Antoine Pitrou905a2ff2010-01-31 22:47:27 +00001783 self.assertEqual(bufio.tell(), 6)
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +00001784 with self.open(support.TESTFN, "rb", buffering=0) as f:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001785 self.assertEqual(f.read(), b"abc")
1786
Nitish Chandra059f58c2018-01-28 21:30:09 +05301787 def test_truncate_after_write(self):
1788 # Ensure that truncate preserves the file position after
1789 # writes longer than the buffer size.
1790 # Issue: https://bugs.python.org/issue32228
Serhiy Storchakae36837c2018-06-04 05:53:56 +03001791 self.addCleanup(support.unlink, support.TESTFN)
Nitish Chandra059f58c2018-01-28 21:30:09 +05301792 with self.open(support.TESTFN, "wb") as f:
1793 # Fill with some buffer
1794 f.write(b'\x00' * 10000)
1795 buffer_sizes = [8192, 4096, 200]
1796 for buffer_size in buffer_sizes:
1797 with self.open(support.TESTFN, "r+b", buffering=buffer_size) as f:
1798 f.write(b'\x00' * (buffer_size + 1))
1799 # After write write_pos and write_end are set to 0
1800 f.read(1)
1801 # read operation makes sure that pos != raw_pos
1802 f.truncate()
1803 self.assertEqual(f.tell(), buffer_size + 2)
1804
Antoine Pitrou5bc4fa72010-10-14 15:34:31 +00001805 @support.requires_resource('cpu')
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001806 def test_threads(self):
Antoine Pitrou87695762008-08-14 22:44:29 +00001807 try:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001808 # Write out many bytes from many threads and test they were
1809 # all flushed.
1810 N = 1000
1811 contents = bytes(range(256)) * N
1812 sizes = cycle([1, 19])
1813 n = 0
1814 queue = deque()
1815 while n < len(contents):
1816 size = next(sizes)
1817 queue.append(contents[n:n+size])
1818 n += size
1819 del contents
Antoine Pitrou87695762008-08-14 22:44:29 +00001820 # We use a real file object because it allows us to
1821 # exercise situations where the GIL is released before
1822 # writing the buffer to the raw streams. This is in addition
1823 # to concurrency issues due to switching threads in the middle
1824 # of Python code.
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +00001825 with self.open(support.TESTFN, self.write_mode, buffering=0) as raw:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001826 bufio = self.tp(raw, 8)
Antoine Pitrou87695762008-08-14 22:44:29 +00001827 errors = []
1828 def f():
1829 try:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001830 while True:
1831 try:
1832 s = queue.popleft()
1833 except IndexError:
1834 return
Antoine Pitrou87695762008-08-14 22:44:29 +00001835 bufio.write(s)
1836 except Exception as e:
1837 errors.append(e)
1838 raise
1839 threads = [threading.Thread(target=f) for x in range(20)]
Hai Shie80697d2020-05-28 06:10:27 +08001840 with threading_helper.start_threads(threads):
Serhiy Storchaka263dcd22015-04-01 13:01:14 +03001841 time.sleep(0.02) # yield
Antoine Pitrou87695762008-08-14 22:44:29 +00001842 self.assertFalse(errors,
1843 "the following exceptions were caught: %r" % errors)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001844 bufio.close()
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +00001845 with self.open(support.TESTFN, "rb") as f:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001846 s = f.read()
1847 for i in range(256):
Ezio Melottib3aedd42010-11-20 19:04:17 +00001848 self.assertEqual(s.count(bytes([i])), N)
Antoine Pitrou87695762008-08-14 22:44:29 +00001849 finally:
1850 support.unlink(support.TESTFN)
1851
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001852 def test_misbehaved_io(self):
1853 rawio = self.MisbehavedRawIO()
1854 bufio = self.tp(rawio, 5)
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02001855 self.assertRaises(OSError, bufio.seek, 0)
1856 self.assertRaises(OSError, bufio.tell)
1857 self.assertRaises(OSError, bufio.write, b"abcdef")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001858
Victor Stinnerb589cef2019-06-11 03:10:59 +02001859 # Silence destructor error
1860 bufio.close = lambda: None
1861
Florent Xicluna109d5732012-07-07 17:03:22 +02001862 def test_max_buffer_size_removal(self):
1863 with self.assertRaises(TypeError):
Benjamin Peterson59406a92009-03-26 17:10:29 +00001864 self.tp(self.MockRawIO(), 8, 12)
Benjamin Peterson59406a92009-03-26 17:10:29 +00001865
Benjamin Peterson68623612012-12-20 11:53:11 -06001866 def test_write_error_on_close(self):
1867 raw = self.MockRawIO()
1868 def bad_write(b):
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02001869 raise OSError()
Benjamin Peterson68623612012-12-20 11:53:11 -06001870 raw.write = bad_write
1871 b = self.tp(raw)
1872 b.write(b'spam')
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02001873 self.assertRaises(OSError, b.close) # exception not swallowed
Benjamin Peterson68623612012-12-20 11:53:11 -06001874 self.assertTrue(b.closed)
1875
benfogle9703f092017-11-10 16:03:40 -05001876 def test_slow_close_from_thread(self):
1877 # Issue #31976
1878 rawio = self.SlowFlushRawIO()
1879 bufio = self.tp(rawio, 8)
1880 t = threading.Thread(target=bufio.close)
1881 t.start()
1882 rawio.in_flush.wait()
1883 self.assertRaises(ValueError, bufio.write, b'spam')
1884 self.assertTrue(bufio.closed)
1885 t.join()
1886
1887
Benjamin Peterson59406a92009-03-26 17:10:29 +00001888
Antoine Pitrou10f0c502012-07-29 19:02:46 +02001889class CBufferedWriterTest(BufferedWriterTest, SizeofTest):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001890 tp = io.BufferedWriter
1891
Gregory P. Smithe5796c42018-12-30 20:17:57 -08001892 @unittest.skipIf(MEMORY_SANITIZER, "MSan defaults to crashing "
1893 "instead of returning NULL for malloc failure.")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001894 def test_constructor(self):
1895 BufferedWriterTest.test_constructor(self)
1896 # The allocation can succeed on 32-bit builds, e.g. with more
Victor Stinner8c663fd2017-11-08 14:44:44 -08001897 # than 2 GiB RAM and a 64-bit kernel.
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001898 if sys.maxsize > 0x7FFFFFFF:
1899 rawio = self.MockRawIO()
1900 bufio = self.tp(rawio)
1901 self.assertRaises((OverflowError, MemoryError, ValueError),
1902 bufio.__init__, rawio, sys.maxsize)
1903
1904 def test_initialization(self):
1905 rawio = self.MockRawIO()
1906 bufio = self.tp(rawio)
1907 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0)
1908 self.assertRaises(ValueError, bufio.write, b"def")
1909 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16)
1910 self.assertRaises(ValueError, bufio.write, b"def")
1911 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1)
1912 self.assertRaises(ValueError, bufio.write, b"def")
1913
1914 def test_garbage_collection(self):
1915 # C BufferedWriter objects are collected, and collecting them flushes
1916 # all data to disk.
1917 # The Python version has __del__, so it ends into gc.garbage instead
Serhiy Storchakae36837c2018-06-04 05:53:56 +03001918 self.addCleanup(support.unlink, support.TESTFN)
Antoine Pitrou796564c2013-07-30 19:59:21 +02001919 with support.check_warnings(('', ResourceWarning)):
1920 rawio = self.FileIO(support.TESTFN, "w+b")
1921 f = self.tp(rawio)
1922 f.write(b"123xxx")
1923 f.x = f
1924 wr = weakref.ref(f)
1925 del f
1926 support.gc_collect()
Serhiy Storchaka0dcd80a2015-08-02 15:17:49 +03001927 self.assertIsNone(wr(), wr)
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +00001928 with self.open(support.TESTFN, "rb") as f:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001929 self.assertEqual(f.read(), b"123xxx")
1930
R David Murray67bfe802013-02-23 21:51:05 -05001931 def test_args_error(self):
1932 # Issue #17275
1933 with self.assertRaisesRegex(TypeError, "BufferedWriter"):
1934 self.tp(io.BytesIO(), 1024, 1024, 1024)
1935
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001936
1937class PyBufferedWriterTest(BufferedWriterTest):
1938 tp = pyio.BufferedWriter
Guido van Rossuma9e20242007-03-08 00:43:48 +00001939
Guido van Rossum01a27522007-03-07 01:00:12 +00001940class BufferedRWPairTest(unittest.TestCase):
Guido van Rossuma9e20242007-03-08 00:43:48 +00001941
Antoine Pitroucf4c7492009-04-19 00:09:36 +00001942 def test_constructor(self):
1943 pair = self.tp(self.MockRawIO(), self.MockRawIO())
Benjamin Peterson92035012008-12-27 16:00:54 +00001944 self.assertFalse(pair.closed)
Guido van Rossum01a27522007-03-07 01:00:12 +00001945
Serhiy Storchaka61e24932014-02-12 10:52:35 +02001946 def test_uninitialized(self):
1947 pair = self.tp.__new__(self.tp)
1948 del pair
1949 pair = self.tp.__new__(self.tp)
1950 self.assertRaisesRegex((ValueError, AttributeError),
1951 'uninitialized|has no attribute',
1952 pair.read, 0)
1953 self.assertRaisesRegex((ValueError, AttributeError),
1954 'uninitialized|has no attribute',
1955 pair.write, b'')
1956 pair.__init__(self.MockRawIO(), self.MockRawIO())
1957 self.assertEqual(pair.read(0), b'')
1958 self.assertEqual(pair.write(b''), 0)
1959
Benjamin Petersond2e0c792009-05-01 20:40:59 +00001960 def test_detach(self):
1961 pair = self.tp(self.MockRawIO(), self.MockRawIO())
1962 self.assertRaises(self.UnsupportedOperation, pair.detach)
1963
Florent Xicluna109d5732012-07-07 17:03:22 +02001964 def test_constructor_max_buffer_size_removal(self):
1965 with self.assertRaises(TypeError):
Benjamin Peterson59406a92009-03-26 17:10:29 +00001966 self.tp(self.MockRawIO(), self.MockRawIO(), 8, 12)
Benjamin Peterson59406a92009-03-26 17:10:29 +00001967
Antoine Pitroucf4c7492009-04-19 00:09:36 +00001968 def test_constructor_with_not_readable(self):
1969 class NotReadable(MockRawIO):
1970 def readable(self):
1971 return False
1972
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02001973 self.assertRaises(OSError, self.tp, NotReadable(), self.MockRawIO())
Antoine Pitroucf4c7492009-04-19 00:09:36 +00001974
1975 def test_constructor_with_not_writeable(self):
1976 class NotWriteable(MockRawIO):
1977 def writable(self):
1978 return False
1979
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02001980 self.assertRaises(OSError, self.tp, self.MockRawIO(), NotWriteable())
Antoine Pitroucf4c7492009-04-19 00:09:36 +00001981
1982 def test_read(self):
1983 pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())
1984
1985 self.assertEqual(pair.read(3), b"abc")
1986 self.assertEqual(pair.read(1), b"d")
1987 self.assertEqual(pair.read(), b"ef")
Benjamin Petersonbf5ff762009-12-13 19:25:34 +00001988 pair = self.tp(self.BytesIO(b"abc"), self.MockRawIO())
1989 self.assertEqual(pair.read(None), b"abc")
1990
1991 def test_readlines(self):
1992 pair = lambda: self.tp(self.BytesIO(b"abc\ndef\nh"), self.MockRawIO())
1993 self.assertEqual(pair().readlines(), [b"abc\n", b"def\n", b"h"])
1994 self.assertEqual(pair().readlines(), [b"abc\n", b"def\n", b"h"])
1995 self.assertEqual(pair().readlines(5), [b"abc\n", b"def\n"])
Antoine Pitroucf4c7492009-04-19 00:09:36 +00001996
1997 def test_read1(self):
1998 # .read1() is delegated to the underlying reader object, so this test
1999 # can be shallow.
2000 pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())
2001
2002 self.assertEqual(pair.read1(3), b"abc")
Martin Panterccb2c0e2016-10-20 23:48:14 +00002003 self.assertEqual(pair.read1(), b"def")
Antoine Pitroucf4c7492009-04-19 00:09:36 +00002004
2005 def test_readinto(self):
Martin Panter6bb91f32016-05-28 00:41:57 +00002006 for method in ("readinto", "readinto1"):
2007 with self.subTest(method):
2008 pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())
Antoine Pitroucf4c7492009-04-19 00:09:36 +00002009
Serhiy Storchaka5f1a5182016-09-11 14:41:02 +03002010 data = byteslike(b'\0' * 5)
Martin Panter6bb91f32016-05-28 00:41:57 +00002011 self.assertEqual(getattr(pair, method)(data), 5)
2012 self.assertEqual(bytes(data), b"abcde")
Antoine Pitroucf4c7492009-04-19 00:09:36 +00002013
2014 def test_write(self):
2015 w = self.MockRawIO()
2016 pair = self.tp(self.MockRawIO(), w)
2017
2018 pair.write(b"abc")
2019 pair.flush()
Martin Panter6bb91f32016-05-28 00:41:57 +00002020 buffer = bytearray(b"def")
2021 pair.write(buffer)
2022 buffer[:] = b"***" # Overwrite our copy of the data
Antoine Pitroucf4c7492009-04-19 00:09:36 +00002023 pair.flush()
2024 self.assertEqual(w._write_stack, [b"abc", b"def"])
2025
2026 def test_peek(self):
2027 pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())
2028
2029 self.assertTrue(pair.peek(3).startswith(b"abc"))
2030 self.assertEqual(pair.read(3), b"abc")
2031
2032 def test_readable(self):
2033 pair = self.tp(self.MockRawIO(), self.MockRawIO())
2034 self.assertTrue(pair.readable())
2035
2036 def test_writeable(self):
2037 pair = self.tp(self.MockRawIO(), self.MockRawIO())
2038 self.assertTrue(pair.writable())
2039
2040 def test_seekable(self):
2041 # BufferedRWPairs are never seekable, even if their readers and writers
2042 # are.
2043 pair = self.tp(self.MockRawIO(), self.MockRawIO())
2044 self.assertFalse(pair.seekable())
2045
2046 # .flush() is delegated to the underlying writer object and has been
2047 # tested in the test_write method.
2048
2049 def test_close_and_closed(self):
2050 pair = self.tp(self.MockRawIO(), self.MockRawIO())
2051 self.assertFalse(pair.closed)
2052 pair.close()
2053 self.assertTrue(pair.closed)
2054
Serhiy Storchaka7665be62015-03-24 23:21:57 +02002055 def test_reader_close_error_on_close(self):
2056 def reader_close():
2057 reader_non_existing
2058 reader = self.MockRawIO()
2059 reader.close = reader_close
2060 writer = self.MockRawIO()
2061 pair = self.tp(reader, writer)
2062 with self.assertRaises(NameError) as err:
2063 pair.close()
2064 self.assertIn('reader_non_existing', str(err.exception))
2065 self.assertTrue(pair.closed)
2066 self.assertFalse(reader.closed)
2067 self.assertTrue(writer.closed)
2068
Victor Stinner472f7942019-04-12 21:58:24 +02002069 # Silence destructor error
2070 reader.close = lambda: None
2071
Serhiy Storchaka7665be62015-03-24 23:21:57 +02002072 def test_writer_close_error_on_close(self):
2073 def writer_close():
2074 writer_non_existing
2075 reader = self.MockRawIO()
2076 writer = self.MockRawIO()
2077 writer.close = writer_close
2078 pair = self.tp(reader, writer)
2079 with self.assertRaises(NameError) as err:
2080 pair.close()
2081 self.assertIn('writer_non_existing', str(err.exception))
2082 self.assertFalse(pair.closed)
2083 self.assertTrue(reader.closed)
2084 self.assertFalse(writer.closed)
2085
Victor Stinner472f7942019-04-12 21:58:24 +02002086 # Silence destructor error
2087 writer.close = lambda: None
Victor Stinner913fa1c2019-06-12 23:57:11 +02002088 writer = None
2089
Victor Stinner212646c2019-06-14 18:03:22 +02002090 # Ignore BufferedWriter (of the BufferedRWPair) unraisable exception
Victor Stinner913fa1c2019-06-12 23:57:11 +02002091 with support.catch_unraisable_exception():
Victor Stinner212646c2019-06-14 18:03:22 +02002092 # Ignore BufferedRWPair unraisable exception
2093 with support.catch_unraisable_exception():
2094 pair = None
2095 support.gc_collect()
Victor Stinner913fa1c2019-06-12 23:57:11 +02002096 support.gc_collect()
Victor Stinner472f7942019-04-12 21:58:24 +02002097
Serhiy Storchaka7665be62015-03-24 23:21:57 +02002098 def test_reader_writer_close_error_on_close(self):
2099 def reader_close():
2100 reader_non_existing
2101 def writer_close():
2102 writer_non_existing
2103 reader = self.MockRawIO()
2104 reader.close = reader_close
2105 writer = self.MockRawIO()
2106 writer.close = writer_close
2107 pair = self.tp(reader, writer)
2108 with self.assertRaises(NameError) as err:
2109 pair.close()
2110 self.assertIn('reader_non_existing', str(err.exception))
2111 self.assertIsInstance(err.exception.__context__, NameError)
2112 self.assertIn('writer_non_existing', str(err.exception.__context__))
2113 self.assertFalse(pair.closed)
2114 self.assertFalse(reader.closed)
2115 self.assertFalse(writer.closed)
2116
Victor Stinner472f7942019-04-12 21:58:24 +02002117 # Silence destructor error
2118 reader.close = lambda: None
2119 writer.close = lambda: None
2120
Antoine Pitroucf4c7492009-04-19 00:09:36 +00002121 def test_isatty(self):
2122 class SelectableIsAtty(MockRawIO):
2123 def __init__(self, isatty):
2124 MockRawIO.__init__(self)
2125 self._isatty = isatty
2126
2127 def isatty(self):
2128 return self._isatty
2129
2130 pair = self.tp(SelectableIsAtty(False), SelectableIsAtty(False))
2131 self.assertFalse(pair.isatty())
2132
2133 pair = self.tp(SelectableIsAtty(True), SelectableIsAtty(False))
2134 self.assertTrue(pair.isatty())
2135
2136 pair = self.tp(SelectableIsAtty(False), SelectableIsAtty(True))
2137 self.assertTrue(pair.isatty())
2138
2139 pair = self.tp(SelectableIsAtty(True), SelectableIsAtty(True))
2140 self.assertTrue(pair.isatty())
Guido van Rossum01a27522007-03-07 01:00:12 +00002141
Benjamin Petersonbbd0a322014-09-29 22:46:57 -04002142 def test_weakref_clearing(self):
2143 brw = self.tp(self.MockRawIO(), self.MockRawIO())
2144 ref = weakref.ref(brw)
2145 brw = None
2146 ref = None # Shouldn't segfault.
2147
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002148class CBufferedRWPairTest(BufferedRWPairTest):
2149 tp = io.BufferedRWPair
Guido van Rossuma9e20242007-03-08 00:43:48 +00002150
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002151class PyBufferedRWPairTest(BufferedRWPairTest):
2152 tp = pyio.BufferedRWPair
Guido van Rossuma9e20242007-03-08 00:43:48 +00002153
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002154
2155class BufferedRandomTest(BufferedReaderTest, BufferedWriterTest):
2156 read_mode = "rb+"
2157 write_mode = "wb+"
2158
2159 def test_constructor(self):
2160 BufferedReaderTest.test_constructor(self)
2161 BufferedWriterTest.test_constructor(self)
2162
Serhiy Storchaka61e24932014-02-12 10:52:35 +02002163 def test_uninitialized(self):
2164 BufferedReaderTest.test_uninitialized(self)
2165 BufferedWriterTest.test_uninitialized(self)
2166
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002167 def test_read_and_write(self):
2168 raw = self.MockRawIO((b"asdf", b"ghjk"))
Benjamin Peterson59406a92009-03-26 17:10:29 +00002169 rw = self.tp(raw, 8)
Guido van Rossum01a27522007-03-07 01:00:12 +00002170
2171 self.assertEqual(b"as", rw.read(2))
2172 rw.write(b"ddd")
2173 rw.write(b"eee")
Guido van Rossum76c5d4d2007-04-06 19:10:29 +00002174 self.assertFalse(raw._write_stack) # Buffer writes
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002175 self.assertEqual(b"ghjk", rw.read())
Ezio Melottib3aedd42010-11-20 19:04:17 +00002176 self.assertEqual(b"dddeee", raw._write_stack[0])
Guido van Rossum01a27522007-03-07 01:00:12 +00002177
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002178 def test_seek_and_tell(self):
2179 raw = self.BytesIO(b"asdfghjkl")
2180 rw = self.tp(raw)
Guido van Rossum01a27522007-03-07 01:00:12 +00002181
Ezio Melottib3aedd42010-11-20 19:04:17 +00002182 self.assertEqual(b"as", rw.read(2))
2183 self.assertEqual(2, rw.tell())
Guido van Rossum01a27522007-03-07 01:00:12 +00002184 rw.seek(0, 0)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002185 self.assertEqual(b"asdf", rw.read(4))
Guido van Rossum01a27522007-03-07 01:00:12 +00002186
Antoine Pitroue05565e2011-08-20 14:39:23 +02002187 rw.write(b"123f")
Guido van Rossum01a27522007-03-07 01:00:12 +00002188 rw.seek(0, 0)
Antoine Pitroue05565e2011-08-20 14:39:23 +02002189 self.assertEqual(b"asdf123fl", rw.read())
Ezio Melottib3aedd42010-11-20 19:04:17 +00002190 self.assertEqual(9, rw.tell())
Guido van Rossum01a27522007-03-07 01:00:12 +00002191 rw.seek(-4, 2)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002192 self.assertEqual(5, rw.tell())
Guido van Rossum01a27522007-03-07 01:00:12 +00002193 rw.seek(2, 1)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002194 self.assertEqual(7, rw.tell())
2195 self.assertEqual(b"fl", rw.read(11))
Antoine Pitroue05565e2011-08-20 14:39:23 +02002196 rw.flush()
2197 self.assertEqual(b"asdf123fl", raw.getvalue())
2198
Christian Heimes8e42a0a2007-11-08 18:04:45 +00002199 self.assertRaises(TypeError, rw.seek, 0.0)
Guido van Rossum01a27522007-03-07 01:00:12 +00002200
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002201 def check_flush_and_read(self, read_func):
2202 raw = self.BytesIO(b"abcdefghi")
2203 bufio = self.tp(raw)
2204
Ezio Melottib3aedd42010-11-20 19:04:17 +00002205 self.assertEqual(b"ab", read_func(bufio, 2))
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002206 bufio.write(b"12")
Ezio Melottib3aedd42010-11-20 19:04:17 +00002207 self.assertEqual(b"ef", read_func(bufio, 2))
2208 self.assertEqual(6, bufio.tell())
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002209 bufio.flush()
Ezio Melottib3aedd42010-11-20 19:04:17 +00002210 self.assertEqual(6, bufio.tell())
2211 self.assertEqual(b"ghi", read_func(bufio))
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002212 raw.seek(0, 0)
2213 raw.write(b"XYZ")
2214 # flush() resets the read buffer
2215 bufio.flush()
2216 bufio.seek(0, 0)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002217 self.assertEqual(b"XYZ", read_func(bufio, 3))
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002218
2219 def test_flush_and_read(self):
2220 self.check_flush_and_read(lambda bufio, *args: bufio.read(*args))
2221
2222 def test_flush_and_readinto(self):
2223 def _readinto(bufio, n=-1):
2224 b = bytearray(n if n >= 0 else 9999)
2225 n = bufio.readinto(b)
2226 return bytes(b[:n])
2227 self.check_flush_and_read(_readinto)
2228
2229 def test_flush_and_peek(self):
2230 def _peek(bufio, n=-1):
2231 # This relies on the fact that the buffer can contain the whole
2232 # raw stream, otherwise peek() can return less.
2233 b = bufio.peek(n)
2234 if n != -1:
2235 b = b[:n]
2236 bufio.seek(len(b), 1)
2237 return b
2238 self.check_flush_and_read(_peek)
2239
2240 def test_flush_and_write(self):
2241 raw = self.BytesIO(b"abcdefghi")
2242 bufio = self.tp(raw)
2243
2244 bufio.write(b"123")
2245 bufio.flush()
2246 bufio.write(b"45")
2247 bufio.flush()
2248 bufio.seek(0, 0)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002249 self.assertEqual(b"12345fghi", raw.getvalue())
2250 self.assertEqual(b"12345fghi", bufio.read())
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002251
2252 def test_threads(self):
2253 BufferedReaderTest.test_threads(self)
2254 BufferedWriterTest.test_threads(self)
2255
2256 def test_writes_and_peek(self):
2257 def _peek(bufio):
2258 bufio.peek(1)
2259 self.check_writes(_peek)
2260 def _peek(bufio):
2261 pos = bufio.tell()
2262 bufio.seek(-1, 1)
2263 bufio.peek(1)
2264 bufio.seek(pos, 0)
2265 self.check_writes(_peek)
2266
2267 def test_writes_and_reads(self):
2268 def _read(bufio):
2269 bufio.seek(-1, 1)
2270 bufio.read(1)
2271 self.check_writes(_read)
2272
2273 def test_writes_and_read1s(self):
2274 def _read1(bufio):
2275 bufio.seek(-1, 1)
2276 bufio.read1(1)
2277 self.check_writes(_read1)
2278
2279 def test_writes_and_readintos(self):
2280 def _read(bufio):
2281 bufio.seek(-1, 1)
2282 bufio.readinto(bytearray(1))
2283 self.check_writes(_read)
2284
Antoine Pitroua0ceb732009-08-06 20:29:56 +00002285 def test_write_after_readahead(self):
2286 # Issue #6629: writing after the buffer was filled by readahead should
2287 # first rewind the raw stream.
2288 for overwrite_size in [1, 5]:
2289 raw = self.BytesIO(b"A" * 10)
2290 bufio = self.tp(raw, 4)
2291 # Trigger readahead
2292 self.assertEqual(bufio.read(1), b"A")
2293 self.assertEqual(bufio.tell(), 1)
2294 # Overwriting should rewind the raw stream if it needs so
2295 bufio.write(b"B" * overwrite_size)
2296 self.assertEqual(bufio.tell(), overwrite_size + 1)
2297 # If the write size was smaller than the buffer size, flush() and
2298 # check that rewind happens.
2299 bufio.flush()
2300 self.assertEqual(bufio.tell(), overwrite_size + 1)
2301 s = raw.getvalue()
2302 self.assertEqual(s,
2303 b"A" + b"B" * overwrite_size + b"A" * (9 - overwrite_size))
2304
Antoine Pitrou7c404892011-05-13 00:13:33 +02002305 def test_write_rewind_write(self):
2306 # Various combinations of reading / writing / seeking backwards / writing again
2307 def mutate(bufio, pos1, pos2):
2308 assert pos2 >= pos1
2309 # Fill the buffer
2310 bufio.seek(pos1)
2311 bufio.read(pos2 - pos1)
2312 bufio.write(b'\x02')
2313 # This writes earlier than the previous write, but still inside
2314 # the buffer.
2315 bufio.seek(pos1)
2316 bufio.write(b'\x01')
2317
2318 b = b"\x80\x81\x82\x83\x84"
2319 for i in range(0, len(b)):
2320 for j in range(i, len(b)):
2321 raw = self.BytesIO(b)
2322 bufio = self.tp(raw, 100)
2323 mutate(bufio, i, j)
2324 bufio.flush()
2325 expected = bytearray(b)
2326 expected[j] = 2
2327 expected[i] = 1
2328 self.assertEqual(raw.getvalue(), expected,
2329 "failed result for i=%d, j=%d" % (i, j))
2330
Antoine Pitrou905a2ff2010-01-31 22:47:27 +00002331 def test_truncate_after_read_or_write(self):
2332 raw = self.BytesIO(b"A" * 10)
2333 bufio = self.tp(raw, 100)
2334 self.assertEqual(bufio.read(2), b"AA") # the read buffer gets filled
2335 self.assertEqual(bufio.truncate(), 2)
2336 self.assertEqual(bufio.write(b"BB"), 2) # the write buffer increases
2337 self.assertEqual(bufio.truncate(), 4)
2338
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002339 def test_misbehaved_io(self):
2340 BufferedReaderTest.test_misbehaved_io(self)
2341 BufferedWriterTest.test_misbehaved_io(self)
2342
Antoine Pitroue05565e2011-08-20 14:39:23 +02002343 def test_interleaved_read_write(self):
2344 # Test for issue #12213
2345 with self.BytesIO(b'abcdefgh') as raw:
2346 with self.tp(raw, 100) as f:
2347 f.write(b"1")
2348 self.assertEqual(f.read(1), b'b')
2349 f.write(b'2')
2350 self.assertEqual(f.read1(1), b'd')
2351 f.write(b'3')
2352 buf = bytearray(1)
2353 f.readinto(buf)
2354 self.assertEqual(buf, b'f')
2355 f.write(b'4')
2356 self.assertEqual(f.peek(1), b'h')
2357 f.flush()
2358 self.assertEqual(raw.getvalue(), b'1b2d3f4h')
2359
2360 with self.BytesIO(b'abc') as raw:
2361 with self.tp(raw, 100) as f:
2362 self.assertEqual(f.read(1), b'a')
2363 f.write(b"2")
2364 self.assertEqual(f.read(1), b'c')
2365 f.flush()
2366 self.assertEqual(raw.getvalue(), b'a2c')
2367
2368 def test_interleaved_readline_write(self):
2369 with self.BytesIO(b'ab\ncdef\ng\n') as raw:
2370 with self.tp(raw) as f:
2371 f.write(b'1')
2372 self.assertEqual(f.readline(), b'b\n')
2373 f.write(b'2')
2374 self.assertEqual(f.readline(), b'def\n')
2375 f.write(b'3')
2376 self.assertEqual(f.readline(), b'\n')
2377 f.flush()
2378 self.assertEqual(raw.getvalue(), b'1b\n2def\n3\n')
2379
Antoine Pitrou0d739d72010-09-05 23:01:12 +00002380 # You can't construct a BufferedRandom over a non-seekable stream.
2381 test_unseekable = None
2382
Berker Peksagfd5116c2020-02-21 20:57:26 +03002383 # writable() returns True, so there's no point to test it over
2384 # a writable stream.
2385 test_truncate_on_read_only = None
2386
R David Murray67bfe802013-02-23 21:51:05 -05002387
Antoine Pitrou10f0c502012-07-29 19:02:46 +02002388class CBufferedRandomTest(BufferedRandomTest, SizeofTest):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002389 tp = io.BufferedRandom
2390
Gregory P. Smithe5796c42018-12-30 20:17:57 -08002391 @unittest.skipIf(MEMORY_SANITIZER, "MSan defaults to crashing "
2392 "instead of returning NULL for malloc failure.")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002393 def test_constructor(self):
2394 BufferedRandomTest.test_constructor(self)
2395 # The allocation can succeed on 32-bit builds, e.g. with more
Victor Stinner8c663fd2017-11-08 14:44:44 -08002396 # than 2 GiB RAM and a 64-bit kernel.
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002397 if sys.maxsize > 0x7FFFFFFF:
2398 rawio = self.MockRawIO()
2399 bufio = self.tp(rawio)
2400 self.assertRaises((OverflowError, MemoryError, ValueError),
2401 bufio.__init__, rawio, sys.maxsize)
2402
2403 def test_garbage_collection(self):
2404 CBufferedReaderTest.test_garbage_collection(self)
2405 CBufferedWriterTest.test_garbage_collection(self)
2406
R David Murray67bfe802013-02-23 21:51:05 -05002407 def test_args_error(self):
2408 # Issue #17275
2409 with self.assertRaisesRegex(TypeError, "BufferedRandom"):
2410 self.tp(io.BytesIO(), 1024, 1024, 1024)
2411
2412
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002413class PyBufferedRandomTest(BufferedRandomTest):
2414 tp = pyio.BufferedRandom
2415
2416
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002417# To fully exercise seek/tell, the StatefulIncrementalDecoder has these
2418# properties:
2419# - A single output character can correspond to many bytes of input.
2420# - The number of input bytes to complete the character can be
2421# undetermined until the last input byte is received.
2422# - The number of input bytes can vary depending on previous input.
2423# - A single input byte can correspond to many characters of output.
2424# - The number of output characters can be undetermined until the
2425# last input byte is received.
2426# - The number of output characters can vary depending on previous input.
2427
2428class StatefulIncrementalDecoder(codecs.IncrementalDecoder):
2429 """
2430 For testing seek/tell behavior with a stateful, buffering decoder.
2431
2432 Input is a sequence of words. Words may be fixed-length (length set
2433 by input) or variable-length (period-terminated). In variable-length
2434 mode, extra periods are ignored. Possible words are:
2435 - 'i' followed by a number sets the input length, I (maximum 99).
2436 When I is set to 0, words are space-terminated.
2437 - 'o' followed by a number sets the output length, O (maximum 99).
2438 - Any other word is converted into a word followed by a period on
2439 the output. The output word consists of the input word truncated
2440 or padded out with hyphens to make its length equal to O. If O
2441 is 0, the word is output verbatim without truncating or padding.
2442 I and O are initially set to 1. When I changes, any buffered input is
2443 re-scanned according to the new I. EOF also terminates the last word.
2444 """
2445
2446 def __init__(self, errors='strict'):
Christian Heimesab568872008-03-23 02:11:13 +00002447 codecs.IncrementalDecoder.__init__(self, errors)
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002448 self.reset()
2449
2450 def __repr__(self):
2451 return '<SID %x>' % id(self)
2452
2453 def reset(self):
2454 self.i = 1
2455 self.o = 1
2456 self.buffer = bytearray()
2457
2458 def getstate(self):
2459 i, o = self.i ^ 1, self.o ^ 1 # so that flags = 0 after reset()
2460 return bytes(self.buffer), i*100 + o
2461
2462 def setstate(self, state):
2463 buffer, io = state
2464 self.buffer = bytearray(buffer)
2465 i, o = divmod(io, 100)
2466 self.i, self.o = i ^ 1, o ^ 1
2467
2468 def decode(self, input, final=False):
2469 output = ''
2470 for b in input:
2471 if self.i == 0: # variable-length, terminated with period
2472 if b == ord('.'):
2473 if self.buffer:
2474 output += self.process_word()
2475 else:
2476 self.buffer.append(b)
2477 else: # fixed-length, terminate after self.i bytes
2478 self.buffer.append(b)
2479 if len(self.buffer) == self.i:
2480 output += self.process_word()
2481 if final and self.buffer: # EOF terminates the last word
2482 output += self.process_word()
2483 return output
2484
2485 def process_word(self):
2486 output = ''
2487 if self.buffer[0] == ord('i'):
2488 self.i = min(99, int(self.buffer[1:] or 0)) # set input length
2489 elif self.buffer[0] == ord('o'):
2490 self.o = min(99, int(self.buffer[1:] or 0)) # set output length
2491 else:
2492 output = self.buffer.decode('ascii')
2493 if len(output) < self.o:
2494 output += '-'*self.o # pad out with hyphens
2495 if self.o:
2496 output = output[:self.o] # truncate to output length
2497 output += '.'
2498 self.buffer = bytearray()
2499 return output
2500
Benjamin Petersonad9d48d2008-04-02 21:49:44 +00002501 codecEnabled = False
2502
2503 @classmethod
2504 def lookupTestDecoder(cls, name):
2505 if cls.codecEnabled and name == 'test_decoder':
Antoine Pitrou180a3362008-12-14 16:36:46 +00002506 latin1 = codecs.lookup('latin-1')
Benjamin Petersonad9d48d2008-04-02 21:49:44 +00002507 return codecs.CodecInfo(
Antoine Pitrou180a3362008-12-14 16:36:46 +00002508 name='test_decoder', encode=latin1.encode, decode=None,
Benjamin Petersonad9d48d2008-04-02 21:49:44 +00002509 incrementalencoder=None,
2510 streamreader=None, streamwriter=None,
2511 incrementaldecoder=cls)
2512
2513# Register the previous decoder for testing.
2514# Disabled by default, tests will enable it.
2515codecs.register(StatefulIncrementalDecoder.lookupTestDecoder)
2516
2517
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002518class StatefulIncrementalDecoderTest(unittest.TestCase):
2519 """
2520 Make sure the StatefulIncrementalDecoder actually works.
2521 """
2522
2523 test_cases = [
Ka-Ping Yeed24a5b62008-03-20 10:51:27 +00002524 # I=1, O=1 (fixed-length input == fixed-length output)
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002525 (b'abcd', False, 'a.b.c.d.'),
Ka-Ping Yeed24a5b62008-03-20 10:51:27 +00002526 # I=0, O=0 (variable-length input, variable-length output)
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002527 (b'oiabcd', True, 'abcd.'),
Ka-Ping Yeed24a5b62008-03-20 10:51:27 +00002528 # I=0, O=0 (should ignore extra periods)
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002529 (b'oi...abcd...', True, 'abcd.'),
Ka-Ping Yeed24a5b62008-03-20 10:51:27 +00002530 # I=0, O=6 (variable-length input, fixed-length output)
2531 (b'i.o6.x.xyz.toolongtofit.', False, 'x-----.xyz---.toolon.'),
2532 # I=2, O=6 (fixed-length input < fixed-length output)
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002533 (b'i.i2.o6xyz', True, 'xy----.z-----.'),
Ka-Ping Yeed24a5b62008-03-20 10:51:27 +00002534 # I=6, O=3 (fixed-length input > fixed-length output)
2535 (b'i.o3.i6.abcdefghijklmnop', True, 'abc.ghi.mno.'),
2536 # I=0, then 3; O=29, then 15 (with longer output)
2537 (b'i.o29.a.b.cde.o15.abcdefghijabcdefghij.i3.a.b.c.d.ei00k.l.m', True,
2538 'a----------------------------.' +
2539 'b----------------------------.' +
2540 'cde--------------------------.' +
2541 'abcdefghijabcde.' +
2542 'a.b------------.' +
2543 '.c.------------.' +
2544 'd.e------------.' +
2545 'k--------------.' +
2546 'l--------------.' +
2547 'm--------------.')
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002548 ]
2549
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002550 def test_decoder(self):
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002551 # Try a few one-shot test cases.
2552 for input, eof, output in self.test_cases:
2553 d = StatefulIncrementalDecoder()
Ezio Melottib3aedd42010-11-20 19:04:17 +00002554 self.assertEqual(d.decode(input, eof), output)
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002555
2556 # Also test an unfinished decode, followed by forcing EOF.
2557 d = StatefulIncrementalDecoder()
Ezio Melottib3aedd42010-11-20 19:04:17 +00002558 self.assertEqual(d.decode(b'oiabcd'), '')
2559 self.assertEqual(d.decode(b'', 1), 'abcd.')
Guido van Rossum78892e42007-04-06 17:31:18 +00002560
2561class TextIOWrapperTest(unittest.TestCase):
Guido van Rossum9b76da62007-04-11 01:09:03 +00002562
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002563 def setUp(self):
2564 self.testdata = b"AAA\r\nBBB\rCCC\r\nDDD\nEEE\r\n"
2565 self.normalized = b"AAA\nBBB\nCCC\nDDD\nEEE\n".decode("ascii")
Benjamin Petersonee8712c2008-05-20 21:35:26 +00002566 support.unlink(support.TESTFN)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002567
Guido van Rossumd0712812007-04-11 16:32:43 +00002568 def tearDown(self):
Benjamin Petersonee8712c2008-05-20 21:35:26 +00002569 support.unlink(support.TESTFN)
Guido van Rossum9b76da62007-04-11 01:09:03 +00002570
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002571 def test_constructor(self):
2572 r = self.BytesIO(b"\xc3\xa9\n\n")
2573 b = self.BufferedReader(r, 1000)
2574 t = self.TextIOWrapper(b)
Marc-André Lemburg8f36af72011-02-25 15:42:01 +00002575 t.__init__(b, encoding="latin-1", newline="\r\n")
2576 self.assertEqual(t.encoding, "latin-1")
Ezio Melottib3aedd42010-11-20 19:04:17 +00002577 self.assertEqual(t.line_buffering, False)
Marc-André Lemburg8f36af72011-02-25 15:42:01 +00002578 t.__init__(b, encoding="utf-8", line_buffering=True)
2579 self.assertEqual(t.encoding, "utf-8")
Ezio Melottib3aedd42010-11-20 19:04:17 +00002580 self.assertEqual(t.line_buffering, True)
2581 self.assertEqual("\xe9\n", t.readline())
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002582 self.assertRaises(TypeError, t.__init__, b, newline=42)
2583 self.assertRaises(ValueError, t.__init__, b, newline='xyzzy')
2584
Serhiy Storchakaf24131f2015-04-16 11:19:43 +03002585 def test_uninitialized(self):
2586 t = self.TextIOWrapper.__new__(self.TextIOWrapper)
2587 del t
2588 t = self.TextIOWrapper.__new__(self.TextIOWrapper)
2589 self.assertRaises(Exception, repr, t)
2590 self.assertRaisesRegex((ValueError, AttributeError),
2591 'uninitialized|has no attribute',
2592 t.read, 0)
2593 t.__init__(self.MockRawIO())
2594 self.assertEqual(t.read(0), '')
2595
Nick Coghlana9b15242014-02-04 22:11:18 +10002596 def test_non_text_encoding_codecs_are_rejected(self):
2597 # Ensure the constructor complains if passed a codec that isn't
2598 # marked as a text encoding
2599 # http://bugs.python.org/issue20404
2600 r = self.BytesIO()
2601 b = self.BufferedWriter(r)
2602 with self.assertRaisesRegex(LookupError, "is not a text encoding"):
2603 self.TextIOWrapper(b, encoding="hex")
2604
Benjamin Petersond2e0c792009-05-01 20:40:59 +00002605 def test_detach(self):
2606 r = self.BytesIO()
2607 b = self.BufferedWriter(r)
2608 t = self.TextIOWrapper(b)
2609 self.assertIs(t.detach(), b)
2610
2611 t = self.TextIOWrapper(b, encoding="ascii")
2612 t.write("howdy")
2613 self.assertFalse(r.getvalue())
2614 t.detach()
2615 self.assertEqual(r.getvalue(), b"howdy")
2616 self.assertRaises(ValueError, t.detach)
2617
Benjamin Peterson10e76b62014-12-21 20:51:50 -06002618 # Operations independent of the detached stream should still work
2619 repr(t)
2620 self.assertEqual(t.encoding, "ascii")
2621 self.assertEqual(t.errors, "strict")
2622 self.assertFalse(t.line_buffering)
Antoine Pitrou3c2817b2017-06-03 12:32:28 +02002623 self.assertFalse(t.write_through)
Benjamin Peterson10e76b62014-12-21 20:51:50 -06002624
Benjamin Petersonc4c0eae2009-03-09 00:07:03 +00002625 def test_repr(self):
2626 raw = self.BytesIO("hello".encode("utf-8"))
2627 b = self.BufferedReader(raw)
2628 t = self.TextIOWrapper(b, encoding="utf-8")
Antoine Pitrou716c4442009-05-23 19:04:03 +00002629 modname = self.TextIOWrapper.__module__
Max Bernsteinccb7ca72019-05-21 10:09:21 -07002630 self.assertRegex(repr(t),
2631 r"<(%s\.)?TextIOWrapper encoding='utf-8'>" % modname)
Antoine Pitrou716c4442009-05-23 19:04:03 +00002632 raw.name = "dummy"
Max Bernsteinccb7ca72019-05-21 10:09:21 -07002633 self.assertRegex(repr(t),
2634 r"<(%s\.)?TextIOWrapper name='dummy' encoding='utf-8'>" % modname)
Antoine Pitroua4815ca2011-01-09 20:38:15 +00002635 t.mode = "r"
Max Bernsteinccb7ca72019-05-21 10:09:21 -07002636 self.assertRegex(repr(t),
2637 r"<(%s\.)?TextIOWrapper name='dummy' mode='r' encoding='utf-8'>" % modname)
Antoine Pitrou716c4442009-05-23 19:04:03 +00002638 raw.name = b"dummy"
Max Bernsteinccb7ca72019-05-21 10:09:21 -07002639 self.assertRegex(repr(t),
2640 r"<(%s\.)?TextIOWrapper name=b'dummy' mode='r' encoding='utf-8'>" % modname)
Benjamin Petersonc4c0eae2009-03-09 00:07:03 +00002641
Benjamin Peterson10e76b62014-12-21 20:51:50 -06002642 t.buffer.detach()
2643 repr(t) # Should not raise an exception
2644
Serhiy Storchakaa5af6e12017-03-19 19:25:29 +02002645 def test_recursive_repr(self):
2646 # Issue #25455
2647 raw = self.BytesIO()
2648 t = self.TextIOWrapper(raw)
2649 with support.swap_attr(raw, 'name', t):
2650 try:
2651 repr(t) # Should not crash
2652 except RuntimeError:
2653 pass
2654
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002655 def test_line_buffering(self):
2656 r = self.BytesIO()
2657 b = self.BufferedWriter(r, 1000)
2658 t = self.TextIOWrapper(b, newline="\n", line_buffering=True)
Guido van Rossumf64db9f2007-12-06 01:04:26 +00002659 t.write("X")
Ezio Melottib3aedd42010-11-20 19:04:17 +00002660 self.assertEqual(r.getvalue(), b"") # No flush happened
Guido van Rossumf64db9f2007-12-06 01:04:26 +00002661 t.write("Y\nZ")
Ezio Melottib3aedd42010-11-20 19:04:17 +00002662 self.assertEqual(r.getvalue(), b"XY\nZ") # All got flushed
Guido van Rossumf64db9f2007-12-06 01:04:26 +00002663 t.write("A\rB")
Ezio Melottib3aedd42010-11-20 19:04:17 +00002664 self.assertEqual(r.getvalue(), b"XY\nZA\rB")
Guido van Rossumf64db9f2007-12-06 01:04:26 +00002665
Antoine Pitrou3c2817b2017-06-03 12:32:28 +02002666 def test_reconfigure_line_buffering(self):
2667 r = self.BytesIO()
2668 b = self.BufferedWriter(r, 1000)
2669 t = self.TextIOWrapper(b, newline="\n", line_buffering=False)
2670 t.write("AB\nC")
2671 self.assertEqual(r.getvalue(), b"")
2672
2673 t.reconfigure(line_buffering=True) # implicit flush
2674 self.assertEqual(r.getvalue(), b"AB\nC")
2675 t.write("DEF\nG")
2676 self.assertEqual(r.getvalue(), b"AB\nCDEF\nG")
2677 t.write("H")
2678 self.assertEqual(r.getvalue(), b"AB\nCDEF\nG")
2679 t.reconfigure(line_buffering=False) # implicit flush
2680 self.assertEqual(r.getvalue(), b"AB\nCDEF\nGH")
2681 t.write("IJ")
2682 self.assertEqual(r.getvalue(), b"AB\nCDEF\nGH")
2683
2684 # Keeping default value
2685 t.reconfigure()
2686 t.reconfigure(line_buffering=None)
2687 self.assertEqual(t.line_buffering, False)
2688 t.reconfigure(line_buffering=True)
2689 t.reconfigure()
2690 t.reconfigure(line_buffering=None)
2691 self.assertEqual(t.line_buffering, True)
2692
Victor Stinner91106cd2017-12-13 12:29:09 +01002693 @unittest.skipIf(sys.flags.utf8_mode, "utf-8 mode is enabled")
Victor Stinnerf86a5e82012-06-05 13:43:22 +02002694 def test_default_encoding(self):
2695 old_environ = dict(os.environ)
2696 try:
2697 # try to get a user preferred encoding different than the current
2698 # locale encoding to check that TextIOWrapper() uses the current
2699 # locale encoding and not the user preferred encoding
2700 for key in ('LC_ALL', 'LANG', 'LC_CTYPE'):
2701 if key in os.environ:
2702 del os.environ[key]
2703
2704 current_locale_encoding = locale.getpreferredencoding(False)
2705 b = self.BytesIO()
2706 t = self.TextIOWrapper(b)
2707 self.assertEqual(t.encoding, current_locale_encoding)
2708 finally:
2709 os.environ.clear()
2710 os.environ.update(old_environ)
2711
Serhiy Storchaka5cfc79d2014-02-07 10:06:39 +02002712 @support.cpython_only
Victor Stinner91106cd2017-12-13 12:29:09 +01002713 @unittest.skipIf(sys.flags.utf8_mode, "utf-8 mode is enabled")
Serhiy Storchaka78980432013-01-15 01:12:17 +02002714 def test_device_encoding(self):
Serhiy Storchaka5cfc79d2014-02-07 10:06:39 +02002715 # Issue 15989
2716 import _testcapi
Serhiy Storchaka78980432013-01-15 01:12:17 +02002717 b = self.BytesIO()
2718 b.fileno = lambda: _testcapi.INT_MAX + 1
2719 self.assertRaises(OverflowError, self.TextIOWrapper, b)
2720 b.fileno = lambda: _testcapi.UINT_MAX + 1
2721 self.assertRaises(OverflowError, self.TextIOWrapper, b)
2722
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002723 def test_encoding(self):
2724 # Check the encoding attribute is always set, and valid
2725 b = self.BytesIO()
Marc-André Lemburg8f36af72011-02-25 15:42:01 +00002726 t = self.TextIOWrapper(b, encoding="utf-8")
2727 self.assertEqual(t.encoding, "utf-8")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002728 t = self.TextIOWrapper(b)
Serhiy Storchaka0dcd80a2015-08-02 15:17:49 +03002729 self.assertIsNotNone(t.encoding)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002730 codecs.lookup(t.encoding)
2731
2732 def test_encoding_errors_reading(self):
Guido van Rossume7fc50f2007-12-03 22:54:21 +00002733 # (1) default
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002734 b = self.BytesIO(b"abc\n\xff\n")
2735 t = self.TextIOWrapper(b, encoding="ascii")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00002736 self.assertRaises(UnicodeError, t.read)
2737 # (2) explicit strict
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002738 b = self.BytesIO(b"abc\n\xff\n")
2739 t = self.TextIOWrapper(b, encoding="ascii", errors="strict")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00002740 self.assertRaises(UnicodeError, t.read)
2741 # (3) ignore
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002742 b = self.BytesIO(b"abc\n\xff\n")
2743 t = self.TextIOWrapper(b, encoding="ascii", errors="ignore")
Ezio Melottib3aedd42010-11-20 19:04:17 +00002744 self.assertEqual(t.read(), "abc\n\n")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00002745 # (4) replace
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002746 b = self.BytesIO(b"abc\n\xff\n")
2747 t = self.TextIOWrapper(b, encoding="ascii", errors="replace")
Ezio Melottib3aedd42010-11-20 19:04:17 +00002748 self.assertEqual(t.read(), "abc\n\ufffd\n")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00002749
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002750 def test_encoding_errors_writing(self):
Guido van Rossume7fc50f2007-12-03 22:54:21 +00002751 # (1) default
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002752 b = self.BytesIO()
2753 t = self.TextIOWrapper(b, encoding="ascii")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00002754 self.assertRaises(UnicodeError, t.write, "\xff")
2755 # (2) explicit strict
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002756 b = self.BytesIO()
2757 t = self.TextIOWrapper(b, encoding="ascii", errors="strict")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00002758 self.assertRaises(UnicodeError, t.write, "\xff")
2759 # (3) ignore
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002760 b = self.BytesIO()
2761 t = self.TextIOWrapper(b, encoding="ascii", errors="ignore",
Guido van Rossumf64db9f2007-12-06 01:04:26 +00002762 newline="\n")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00002763 t.write("abc\xffdef\n")
2764 t.flush()
Ezio Melottib3aedd42010-11-20 19:04:17 +00002765 self.assertEqual(b.getvalue(), b"abcdef\n")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00002766 # (4) replace
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002767 b = self.BytesIO()
2768 t = self.TextIOWrapper(b, encoding="ascii", errors="replace",
Guido van Rossumf64db9f2007-12-06 01:04:26 +00002769 newline="\n")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00002770 t.write("abc\xffdef\n")
2771 t.flush()
Ezio Melottib3aedd42010-11-20 19:04:17 +00002772 self.assertEqual(b.getvalue(), b"abc?def\n")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00002773
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002774 def test_newlines(self):
Guido van Rossum78892e42007-04-06 17:31:18 +00002775 input_lines = [ "unix\n", "windows\r\n", "os9\r", "last\n", "nonl" ]
2776
2777 tests = [
2778 [ None, [ 'unix\n', 'windows\n', 'os9\n', 'last\n', 'nonl' ] ],
Guido van Rossum8358db22007-08-18 21:39:55 +00002779 [ '', input_lines ],
2780 [ '\n', [ "unix\n", "windows\r\n", "os9\rlast\n", "nonl" ] ],
2781 [ '\r\n', [ "unix\nwindows\r\n", "os9\rlast\nnonl" ] ],
2782 [ '\r', [ "unix\nwindows\r", "\nos9\r", "last\nnonl" ] ],
Guido van Rossum78892e42007-04-06 17:31:18 +00002783 ]
Antoine Pitrou180a3362008-12-14 16:36:46 +00002784 encodings = (
2785 'utf-8', 'latin-1',
2786 'utf-16', 'utf-16-le', 'utf-16-be',
2787 'utf-32', 'utf-32-le', 'utf-32-be',
2788 )
Guido van Rossum78892e42007-04-06 17:31:18 +00002789
Guido van Rossum8358db22007-08-18 21:39:55 +00002790 # Try a range of buffer sizes to test the case where \r is the last
Guido van Rossum78892e42007-04-06 17:31:18 +00002791 # character in TextIOWrapper._pending_line.
2792 for encoding in encodings:
Guido van Rossum8358db22007-08-18 21:39:55 +00002793 # XXX: str.encode() should return bytes
2794 data = bytes(''.join(input_lines).encode(encoding))
Guido van Rossum78892e42007-04-06 17:31:18 +00002795 for do_reads in (False, True):
Guido van Rossum8358db22007-08-18 21:39:55 +00002796 for bufsize in range(1, 10):
2797 for newline, exp_lines in tests:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002798 bufio = self.BufferedReader(self.BytesIO(data), bufsize)
2799 textio = self.TextIOWrapper(bufio, newline=newline,
Guido van Rossum78892e42007-04-06 17:31:18 +00002800 encoding=encoding)
2801 if do_reads:
2802 got_lines = []
2803 while True:
Guido van Rossum76c5d4d2007-04-06 19:10:29 +00002804 c2 = textio.read(2)
Guido van Rossum78892e42007-04-06 17:31:18 +00002805 if c2 == '':
2806 break
Ezio Melottib3aedd42010-11-20 19:04:17 +00002807 self.assertEqual(len(c2), 2)
Guido van Rossum76c5d4d2007-04-06 19:10:29 +00002808 got_lines.append(c2 + textio.readline())
Guido van Rossum78892e42007-04-06 17:31:18 +00002809 else:
Guido van Rossum76c5d4d2007-04-06 19:10:29 +00002810 got_lines = list(textio)
Guido van Rossum78892e42007-04-06 17:31:18 +00002811
2812 for got_line, exp_line in zip(got_lines, exp_lines):
Ezio Melottib3aedd42010-11-20 19:04:17 +00002813 self.assertEqual(got_line, exp_line)
2814 self.assertEqual(len(got_lines), len(exp_lines))
Guido van Rossum78892e42007-04-06 17:31:18 +00002815
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002816 def test_newlines_input(self):
2817 testdata = b"AAA\nBB\x00B\nCCC\rDDD\rEEE\r\nFFF\r\nGGG"
Guido van Rossum8358db22007-08-18 21:39:55 +00002818 normalized = testdata.replace(b"\r\n", b"\n").replace(b"\r", b"\n")
2819 for newline, expected in [
Ezio Melottid8b509b2011-09-28 17:37:55 +03002820 (None, normalized.decode("ascii").splitlines(keepends=True)),
2821 ("", testdata.decode("ascii").splitlines(keepends=True)),
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002822 ("\n", ["AAA\n", "BB\x00B\n", "CCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]),
2823 ("\r\n", ["AAA\nBB\x00B\nCCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]),
2824 ("\r", ["AAA\nBB\x00B\nCCC\r", "DDD\r", "EEE\r", "\nFFF\r", "\nGGG"]),
Guido van Rossum8358db22007-08-18 21:39:55 +00002825 ]:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002826 buf = self.BytesIO(testdata)
2827 txt = self.TextIOWrapper(buf, encoding="ascii", newline=newline)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002828 self.assertEqual(txt.readlines(), expected)
Guido van Rossum8358db22007-08-18 21:39:55 +00002829 txt.seek(0)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002830 self.assertEqual(txt.read(), "".join(expected))
Guido van Rossum8358db22007-08-18 21:39:55 +00002831
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002832 def test_newlines_output(self):
2833 testdict = {
2834 "": b"AAA\nBBB\nCCC\nX\rY\r\nZ",
2835 "\n": b"AAA\nBBB\nCCC\nX\rY\r\nZ",
2836 "\r": b"AAA\rBBB\rCCC\rX\rY\r\rZ",
2837 "\r\n": b"AAA\r\nBBB\r\nCCC\r\nX\rY\r\r\nZ",
2838 }
2839 tests = [(None, testdict[os.linesep])] + sorted(testdict.items())
2840 for newline, expected in tests:
2841 buf = self.BytesIO()
2842 txt = self.TextIOWrapper(buf, encoding="ascii", newline=newline)
2843 txt.write("AAA\nB")
2844 txt.write("BB\nCCC\n")
2845 txt.write("X\rY\r\nZ")
2846 txt.flush()
Ezio Melottib3aedd42010-11-20 19:04:17 +00002847 self.assertEqual(buf.closed, False)
2848 self.assertEqual(buf.getvalue(), expected)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002849
2850 def test_destructor(self):
2851 l = []
2852 base = self.BytesIO
2853 class MyBytesIO(base):
2854 def close(self):
2855 l.append(self.getvalue())
2856 base.close(self)
2857 b = MyBytesIO()
2858 t = self.TextIOWrapper(b, encoding="ascii")
2859 t.write("abc")
2860 del t
Benjamin Peterson24fb1d02009-04-24 23:26:21 +00002861 support.gc_collect()
Ezio Melottib3aedd42010-11-20 19:04:17 +00002862 self.assertEqual([b"abc"], l)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002863
2864 def test_override_destructor(self):
2865 record = []
2866 class MyTextIO(self.TextIOWrapper):
2867 def __del__(self):
2868 record.append(1)
2869 try:
2870 f = super().__del__
2871 except AttributeError:
2872 pass
2873 else:
2874 f()
2875 def close(self):
2876 record.append(2)
2877 super().close()
2878 def flush(self):
2879 record.append(3)
2880 super().flush()
2881 b = self.BytesIO()
2882 t = MyTextIO(b, encoding="ascii")
2883 del t
Benjamin Peterson24fb1d02009-04-24 23:26:21 +00002884 support.gc_collect()
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002885 self.assertEqual(record, [1, 2, 3])
2886
2887 def test_error_through_destructor(self):
2888 # Test that the exception state is not modified by a destructor,
2889 # even if close() fails.
2890 rawio = self.CloseFailureIO()
Victor Stinner6dbbe742019-05-25 00:09:38 +02002891 with support.catch_unraisable_exception() as cm:
2892 with self.assertRaises(AttributeError):
2893 self.TextIOWrapper(rawio).xyzzy
Victor Stinnerbc2aa812019-05-23 03:45:09 +02002894
2895 if not IOBASE_EMITS_UNRAISABLE:
2896 self.assertIsNone(cm.unraisable)
2897 elif cm.unraisable is not None:
2898 self.assertEqual(cm.unraisable.exc_type, OSError)
Guido van Rossum8358db22007-08-18 21:39:55 +00002899
Guido van Rossum9b76da62007-04-11 01:09:03 +00002900 # Systematic tests of the text I/O API
2901
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002902 def test_basic_io(self):
Guido van Rossum9b76da62007-04-11 01:09:03 +00002903 for chunksize in (1, 2, 3, 4, 5, 15, 16, 17, 31, 32, 33, 63, 64, 65):
Marc-André Lemburg8f36af72011-02-25 15:42:01 +00002904 for enc in "ascii", "latin-1", "utf-8" :# , "utf-16-be", "utf-16-le":
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002905 f = self.open(support.TESTFN, "w+", encoding=enc)
Guido van Rossum9b76da62007-04-11 01:09:03 +00002906 f._CHUNK_SIZE = chunksize
Ezio Melottib3aedd42010-11-20 19:04:17 +00002907 self.assertEqual(f.write("abc"), 3)
Guido van Rossum9b76da62007-04-11 01:09:03 +00002908 f.close()
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002909 f = self.open(support.TESTFN, "r+", encoding=enc)
Guido van Rossum9b76da62007-04-11 01:09:03 +00002910 f._CHUNK_SIZE = chunksize
Ezio Melottib3aedd42010-11-20 19:04:17 +00002911 self.assertEqual(f.tell(), 0)
2912 self.assertEqual(f.read(), "abc")
Guido van Rossum9b76da62007-04-11 01:09:03 +00002913 cookie = f.tell()
Ezio Melottib3aedd42010-11-20 19:04:17 +00002914 self.assertEqual(f.seek(0), 0)
2915 self.assertEqual(f.read(None), "abc")
Benjamin Petersonbf5ff762009-12-13 19:25:34 +00002916 f.seek(0)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002917 self.assertEqual(f.read(2), "ab")
2918 self.assertEqual(f.read(1), "c")
2919 self.assertEqual(f.read(1), "")
2920 self.assertEqual(f.read(), "")
2921 self.assertEqual(f.tell(), cookie)
2922 self.assertEqual(f.seek(0), 0)
2923 self.assertEqual(f.seek(0, 2), cookie)
2924 self.assertEqual(f.write("def"), 3)
2925 self.assertEqual(f.seek(cookie), cookie)
2926 self.assertEqual(f.read(), "def")
Guido van Rossum9b76da62007-04-11 01:09:03 +00002927 if enc.startswith("utf"):
2928 self.multi_line_test(f, enc)
2929 f.close()
2930
2931 def multi_line_test(self, f, enc):
2932 f.seek(0)
2933 f.truncate()
Guido van Rossumef87d6e2007-05-02 19:09:54 +00002934 sample = "s\xff\u0fff\uffff"
Guido van Rossum9b76da62007-04-11 01:09:03 +00002935 wlines = []
Guido van Rossumcba608c2007-04-11 14:19:59 +00002936 for size in (0, 1, 2, 3, 4, 5, 30, 31, 32, 33, 62, 63, 64, 65, 1000):
Guido van Rossum9b76da62007-04-11 01:09:03 +00002937 chars = []
Guido van Rossum805365e2007-05-07 22:24:25 +00002938 for i in range(size):
Guido van Rossum9b76da62007-04-11 01:09:03 +00002939 chars.append(sample[i % len(sample)])
Guido van Rossumef87d6e2007-05-02 19:09:54 +00002940 line = "".join(chars) + "\n"
Guido van Rossum9b76da62007-04-11 01:09:03 +00002941 wlines.append((f.tell(), line))
2942 f.write(line)
Guido van Rossum9b76da62007-04-11 01:09:03 +00002943 f.seek(0)
2944 rlines = []
2945 while True:
2946 pos = f.tell()
2947 line = f.readline()
2948 if not line:
Guido van Rossum9b76da62007-04-11 01:09:03 +00002949 break
2950 rlines.append((pos, line))
Ezio Melottib3aedd42010-11-20 19:04:17 +00002951 self.assertEqual(rlines, wlines)
Guido van Rossum9b76da62007-04-11 01:09:03 +00002952
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002953 def test_telling(self):
Marc-André Lemburg8f36af72011-02-25 15:42:01 +00002954 f = self.open(support.TESTFN, "w+", encoding="utf-8")
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +00002955 p0 = f.tell()
Guido van Rossumef87d6e2007-05-02 19:09:54 +00002956 f.write("\xff\n")
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +00002957 p1 = f.tell()
Guido van Rossumef87d6e2007-05-02 19:09:54 +00002958 f.write("\xff\n")
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +00002959 p2 = f.tell()
2960 f.seek(0)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002961 self.assertEqual(f.tell(), p0)
2962 self.assertEqual(f.readline(), "\xff\n")
2963 self.assertEqual(f.tell(), p1)
2964 self.assertEqual(f.readline(), "\xff\n")
2965 self.assertEqual(f.tell(), p2)
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +00002966 f.seek(0)
2967 for line in f:
Ezio Melottib3aedd42010-11-20 19:04:17 +00002968 self.assertEqual(line, "\xff\n")
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02002969 self.assertRaises(OSError, f.tell)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002970 self.assertEqual(f.tell(), p2)
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +00002971 f.close()
2972
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002973 def test_seeking(self):
2974 chunk_size = _default_chunk_size()
Guido van Rossumd76e7792007-04-17 02:38:04 +00002975 prefix_size = chunk_size - 2
Guido van Rossumef87d6e2007-05-02 19:09:54 +00002976 u_prefix = "a" * prefix_size
Guido van Rossumd76e7792007-04-17 02:38:04 +00002977 prefix = bytes(u_prefix.encode("utf-8"))
Ezio Melottib3aedd42010-11-20 19:04:17 +00002978 self.assertEqual(len(u_prefix), len(prefix))
Guido van Rossumef87d6e2007-05-02 19:09:54 +00002979 u_suffix = "\u8888\n"
Guido van Rossumd76e7792007-04-17 02:38:04 +00002980 suffix = bytes(u_suffix.encode("utf-8"))
2981 line = prefix + suffix
Benjamin Petersonebe5d8a2010-10-31 01:30:11 +00002982 with self.open(support.TESTFN, "wb") as f:
2983 f.write(line*2)
2984 with self.open(support.TESTFN, "r", encoding="utf-8") as f:
2985 s = f.read(prefix_size)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002986 self.assertEqual(s, str(prefix, "ascii"))
2987 self.assertEqual(f.tell(), prefix_size)
2988 self.assertEqual(f.readline(), u_suffix)
Guido van Rossumd76e7792007-04-17 02:38:04 +00002989
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002990 def test_seeking_too(self):
Guido van Rossumd76e7792007-04-17 02:38:04 +00002991 # Regression test for a specific bug
2992 data = b'\xe0\xbf\xbf\n'
Benjamin Petersonebe5d8a2010-10-31 01:30:11 +00002993 with self.open(support.TESTFN, "wb") as f:
2994 f.write(data)
2995 with self.open(support.TESTFN, "r", encoding="utf-8") as f:
2996 f._CHUNK_SIZE # Just test that it exists
2997 f._CHUNK_SIZE = 2
2998 f.readline()
2999 f.tell()
Guido van Rossumd76e7792007-04-17 02:38:04 +00003000
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003001 def test_seek_and_tell(self):
3002 #Test seek/tell using the StatefulIncrementalDecoder.
3003 # Make test faster by doing smaller seeks
3004 CHUNK_SIZE = 128
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00003005
Benjamin Peterson5fd871d2009-03-05 00:49:53 +00003006 def test_seek_and_tell_with_data(data, min_pos=0):
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00003007 """Tell/seek to various points within a data stream and ensure
3008 that the decoded data returned by read() is consistent."""
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003009 f = self.open(support.TESTFN, 'wb')
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00003010 f.write(data)
3011 f.close()
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003012 f = self.open(support.TESTFN, encoding='test_decoder')
3013 f._CHUNK_SIZE = CHUNK_SIZE
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00003014 decoded = f.read()
3015 f.close()
3016
Neal Norwitze2b07052008-03-18 19:52:05 +00003017 for i in range(min_pos, len(decoded) + 1): # seek positions
3018 for j in [1, 5, len(decoded) - i]: # read lengths
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003019 f = self.open(support.TESTFN, encoding='test_decoder')
Ezio Melottib3aedd42010-11-20 19:04:17 +00003020 self.assertEqual(f.read(i), decoded[:i])
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00003021 cookie = f.tell()
Ezio Melottib3aedd42010-11-20 19:04:17 +00003022 self.assertEqual(f.read(j), decoded[i:i + j])
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00003023 f.seek(cookie)
Ezio Melottib3aedd42010-11-20 19:04:17 +00003024 self.assertEqual(f.read(), decoded[i:])
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00003025 f.close()
3026
Benjamin Petersonad9d48d2008-04-02 21:49:44 +00003027 # Enable the test decoder.
3028 StatefulIncrementalDecoder.codecEnabled = 1
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00003029
3030 # Run the tests.
3031 try:
3032 # Try each test case.
3033 for input, _, _ in StatefulIncrementalDecoderTest.test_cases:
Benjamin Peterson5fd871d2009-03-05 00:49:53 +00003034 test_seek_and_tell_with_data(input)
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00003035
3036 # Position each test case so that it crosses a chunk boundary.
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00003037 for input, _, _ in StatefulIncrementalDecoderTest.test_cases:
3038 offset = CHUNK_SIZE - len(input)//2
3039 prefix = b'.'*offset
3040 # Don't bother seeking into the prefix (takes too long).
3041 min_pos = offset*2
Benjamin Peterson5fd871d2009-03-05 00:49:53 +00003042 test_seek_and_tell_with_data(prefix + input, min_pos)
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00003043
3044 # Ensure our test decoder won't interfere with subsequent tests.
3045 finally:
Benjamin Petersonad9d48d2008-04-02 21:49:44 +00003046 StatefulIncrementalDecoder.codecEnabled = 0
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00003047
Christopher Thorneac22f6a2018-11-01 10:48:49 +00003048 def test_multibyte_seek_and_tell(self):
3049 f = self.open(support.TESTFN, "w", encoding="euc_jp")
3050 f.write("AB\n\u3046\u3048\n")
3051 f.close()
3052
3053 f = self.open(support.TESTFN, "r", encoding="euc_jp")
3054 self.assertEqual(f.readline(), "AB\n")
3055 p0 = f.tell()
3056 self.assertEqual(f.readline(), "\u3046\u3048\n")
3057 p1 = f.tell()
3058 f.seek(p0)
3059 self.assertEqual(f.readline(), "\u3046\u3048\n")
3060 self.assertEqual(f.tell(), p1)
3061 f.close()
3062
3063 def test_seek_with_encoder_state(self):
3064 f = self.open(support.TESTFN, "w", encoding="euc_jis_2004")
3065 f.write("\u00e6\u0300")
3066 p0 = f.tell()
3067 f.write("\u00e6")
3068 f.seek(p0)
3069 f.write("\u0300")
3070 f.close()
3071
3072 f = self.open(support.TESTFN, "r", encoding="euc_jis_2004")
3073 self.assertEqual(f.readline(), "\u00e6\u0300\u0300")
3074 f.close()
3075
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003076 def test_encoded_writes(self):
Alexandre Vassalottia38f73b2008-01-07 18:30:48 +00003077 data = "1234567890"
3078 tests = ("utf-16",
3079 "utf-16-le",
3080 "utf-16-be",
3081 "utf-32",
3082 "utf-32-le",
3083 "utf-32-be")
3084 for encoding in tests:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003085 buf = self.BytesIO()
3086 f = self.TextIOWrapper(buf, encoding=encoding)
Alexandre Vassalottia38f73b2008-01-07 18:30:48 +00003087 # Check if the BOM is written only once (see issue1753).
3088 f.write(data)
3089 f.write(data)
3090 f.seek(0)
Ezio Melottib3aedd42010-11-20 19:04:17 +00003091 self.assertEqual(f.read(), data * 2)
Benjamin Peterson9363a652009-03-05 00:42:09 +00003092 f.seek(0)
Ezio Melottib3aedd42010-11-20 19:04:17 +00003093 self.assertEqual(f.read(), data * 2)
3094 self.assertEqual(buf.getvalue(), (data * 2).encode(encoding))
Alexandre Vassalottia38f73b2008-01-07 18:30:48 +00003095
Benjamin Petersona1b49012009-03-31 23:11:32 +00003096 def test_unreadable(self):
3097 class UnReadable(self.BytesIO):
3098 def readable(self):
3099 return False
3100 txt = self.TextIOWrapper(UnReadable())
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02003101 self.assertRaises(OSError, txt.read)
Benjamin Petersona1b49012009-03-31 23:11:32 +00003102
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003103 def test_read_one_by_one(self):
3104 txt = self.TextIOWrapper(self.BytesIO(b"AA\r\nBB"))
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00003105 reads = ""
3106 while True:
3107 c = txt.read(1)
3108 if not c:
3109 break
3110 reads += c
Ezio Melottib3aedd42010-11-20 19:04:17 +00003111 self.assertEqual(reads, "AA\nBB")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00003112
Benjamin Petersonbf5ff762009-12-13 19:25:34 +00003113 def test_readlines(self):
3114 txt = self.TextIOWrapper(self.BytesIO(b"AA\nBB\nCC"))
3115 self.assertEqual(txt.readlines(), ["AA\n", "BB\n", "CC"])
3116 txt.seek(0)
3117 self.assertEqual(txt.readlines(None), ["AA\n", "BB\n", "CC"])
3118 txt.seek(0)
3119 self.assertEqual(txt.readlines(5), ["AA\n", "BB\n"])
3120
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00003121 # read in amounts equal to TextIOWrapper._CHUNK_SIZE which is 128.
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003122 def test_read_by_chunk(self):
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00003123 # make sure "\r\n" straddles 128 char boundary.
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003124 txt = self.TextIOWrapper(self.BytesIO(b"A" * 127 + b"\r\nB"))
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00003125 reads = ""
3126 while True:
3127 c = txt.read(128)
3128 if not c:
3129 break
3130 reads += c
Ezio Melottib3aedd42010-11-20 19:04:17 +00003131 self.assertEqual(reads, "A"*127+"\nB")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00003132
Antoine Pitrou3ed2cb52012-10-16 23:02:27 +02003133 def test_writelines(self):
3134 l = ['ab', 'cd', 'ef']
3135 buf = self.BytesIO()
3136 txt = self.TextIOWrapper(buf)
3137 txt.writelines(l)
3138 txt.flush()
3139 self.assertEqual(buf.getvalue(), b'abcdef')
3140
3141 def test_writelines_userlist(self):
3142 l = UserList(['ab', 'cd', 'ef'])
3143 buf = self.BytesIO()
3144 txt = self.TextIOWrapper(buf)
3145 txt.writelines(l)
3146 txt.flush()
3147 self.assertEqual(buf.getvalue(), b'abcdef')
3148
3149 def test_writelines_error(self):
3150 txt = self.TextIOWrapper(self.BytesIO())
3151 self.assertRaises(TypeError, txt.writelines, [1, 2, 3])
3152 self.assertRaises(TypeError, txt.writelines, None)
3153 self.assertRaises(TypeError, txt.writelines, b'abc')
3154
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00003155 def test_issue1395_1(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003156 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00003157
3158 # read one char at a time
3159 reads = ""
3160 while True:
3161 c = txt.read(1)
3162 if not c:
3163 break
3164 reads += c
Ezio Melottib3aedd42010-11-20 19:04:17 +00003165 self.assertEqual(reads, self.normalized)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00003166
3167 def test_issue1395_2(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003168 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00003169 txt._CHUNK_SIZE = 4
3170
3171 reads = ""
3172 while True:
3173 c = txt.read(4)
3174 if not c:
3175 break
3176 reads += c
Ezio Melottib3aedd42010-11-20 19:04:17 +00003177 self.assertEqual(reads, self.normalized)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00003178
3179 def test_issue1395_3(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003180 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00003181 txt._CHUNK_SIZE = 4
3182
3183 reads = txt.read(4)
3184 reads += txt.read(4)
3185 reads += txt.readline()
3186 reads += txt.readline()
3187 reads += txt.readline()
Ezio Melottib3aedd42010-11-20 19:04:17 +00003188 self.assertEqual(reads, self.normalized)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00003189
3190 def test_issue1395_4(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003191 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00003192 txt._CHUNK_SIZE = 4
3193
3194 reads = txt.read(4)
3195 reads += txt.read()
Ezio Melottib3aedd42010-11-20 19:04:17 +00003196 self.assertEqual(reads, self.normalized)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00003197
3198 def test_issue1395_5(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003199 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00003200 txt._CHUNK_SIZE = 4
3201
3202 reads = txt.read(4)
3203 pos = txt.tell()
3204 txt.seek(0)
3205 txt.seek(pos)
Ezio Melottib3aedd42010-11-20 19:04:17 +00003206 self.assertEqual(txt.read(4), "BBB\n")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00003207
Ka-Ping Yeeddaa7062008-03-17 20:35:15 +00003208 def test_issue2282(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003209 buffer = self.BytesIO(self.testdata)
3210 txt = self.TextIOWrapper(buffer, encoding="ascii")
Ka-Ping Yeeddaa7062008-03-17 20:35:15 +00003211
3212 self.assertEqual(buffer.seekable(), txt.seekable())
3213
Antoine Pitroue4501852009-05-14 18:55:55 +00003214 def test_append_bom(self):
3215 # The BOM is not written again when appending to a non-empty file
3216 filename = support.TESTFN
3217 for charset in ('utf-8-sig', 'utf-16', 'utf-32'):
3218 with self.open(filename, 'w', encoding=charset) as f:
3219 f.write('aaa')
3220 pos = f.tell()
3221 with self.open(filename, 'rb') as f:
Ezio Melottib3aedd42010-11-20 19:04:17 +00003222 self.assertEqual(f.read(), 'aaa'.encode(charset))
Antoine Pitroue4501852009-05-14 18:55:55 +00003223
3224 with self.open(filename, 'a', encoding=charset) as f:
3225 f.write('xxx')
3226 with self.open(filename, 'rb') as f:
Ezio Melottib3aedd42010-11-20 19:04:17 +00003227 self.assertEqual(f.read(), 'aaaxxx'.encode(charset))
Antoine Pitroue4501852009-05-14 18:55:55 +00003228
3229 def test_seek_bom(self):
3230 # Same test, but when seeking manually
3231 filename = support.TESTFN
3232 for charset in ('utf-8-sig', 'utf-16', 'utf-32'):
3233 with self.open(filename, 'w', encoding=charset) as f:
3234 f.write('aaa')
3235 pos = f.tell()
3236 with self.open(filename, 'r+', encoding=charset) as f:
3237 f.seek(pos)
3238 f.write('zzz')
3239 f.seek(0)
3240 f.write('bbb')
3241 with self.open(filename, 'rb') as f:
Ezio Melottib3aedd42010-11-20 19:04:17 +00003242 self.assertEqual(f.read(), 'bbbzzz'.encode(charset))
Antoine Pitroue4501852009-05-14 18:55:55 +00003243
Antoine Pitrou85e3ee72015-04-13 20:01:21 +02003244 def test_seek_append_bom(self):
3245 # Same test, but first seek to the start and then to the end
3246 filename = support.TESTFN
3247 for charset in ('utf-8-sig', 'utf-16', 'utf-32'):
3248 with self.open(filename, 'w', encoding=charset) as f:
3249 f.write('aaa')
3250 with self.open(filename, 'a', encoding=charset) as f:
3251 f.seek(0)
3252 f.seek(0, self.SEEK_END)
3253 f.write('xxx')
3254 with self.open(filename, 'rb') as f:
3255 self.assertEqual(f.read(), 'aaaxxx'.encode(charset))
3256
Benjamin Peterson0926ad12009-06-06 18:02:12 +00003257 def test_errors_property(self):
3258 with self.open(support.TESTFN, "w") as f:
3259 self.assertEqual(f.errors, "strict")
3260 with self.open(support.TESTFN, "w", errors="replace") as f:
3261 self.assertEqual(f.errors, "replace")
3262
Brett Cannon31f59292011-02-21 19:29:56 +00003263 @support.no_tracing
Amaury Forgeot d'Arcccd686a2009-08-29 23:00:38 +00003264 def test_threads_write(self):
3265 # Issue6750: concurrent writes could duplicate data
3266 event = threading.Event()
3267 with self.open(support.TESTFN, "w", buffering=1) as f:
3268 def run(n):
3269 text = "Thread%03d\n" % n
3270 event.wait()
3271 f.write(text)
Serhiy Storchaka263dcd22015-04-01 13:01:14 +03003272 threads = [threading.Thread(target=run, args=(x,))
Amaury Forgeot d'Arcccd686a2009-08-29 23:00:38 +00003273 for x in range(20)]
Hai Shie80697d2020-05-28 06:10:27 +08003274 with threading_helper.start_threads(threads, event.set):
Serhiy Storchaka263dcd22015-04-01 13:01:14 +03003275 time.sleep(0.02)
Amaury Forgeot d'Arcccd686a2009-08-29 23:00:38 +00003276 with self.open(support.TESTFN) as f:
3277 content = f.read()
3278 for n in range(20):
Ezio Melottib3aedd42010-11-20 19:04:17 +00003279 self.assertEqual(content.count("Thread%03d\n" % n), 1)
Amaury Forgeot d'Arcccd686a2009-08-29 23:00:38 +00003280
Antoine Pitrou6be88762010-05-03 16:48:20 +00003281 def test_flush_error_on_close(self):
Serhiy Storchakaa3712a92015-02-21 00:35:09 +02003282 # Test that text file is closed despite failed flush
3283 # and that flush() is called before file closed.
Antoine Pitrou6be88762010-05-03 16:48:20 +00003284 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Serhiy Storchakaa3712a92015-02-21 00:35:09 +02003285 closed = []
Antoine Pitrou6be88762010-05-03 16:48:20 +00003286 def bad_flush():
Serhiy Storchakaa3712a92015-02-21 00:35:09 +02003287 closed[:] = [txt.closed, txt.buffer.closed]
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02003288 raise OSError()
Antoine Pitrou6be88762010-05-03 16:48:20 +00003289 txt.flush = bad_flush
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02003290 self.assertRaises(OSError, txt.close) # exception not swallowed
Benjamin Peterson68623612012-12-20 11:53:11 -06003291 self.assertTrue(txt.closed)
Serhiy Storchakaa3712a92015-02-21 00:35:09 +02003292 self.assertTrue(txt.buffer.closed)
3293 self.assertTrue(closed) # flush() called
3294 self.assertFalse(closed[0]) # flush() called before file closed
3295 self.assertFalse(closed[1])
Serhiy Storchakac26a1a42015-02-23 00:28:38 +02003296 txt.flush = lambda: None # break reference loop
Antoine Pitrou6be88762010-05-03 16:48:20 +00003297
Serhiy Storchaka8a8f7f92014-06-09 09:13:04 +03003298 def test_close_error_on_close(self):
3299 buffer = self.BytesIO(self.testdata)
3300 def bad_flush():
3301 raise OSError('flush')
3302 def bad_close():
3303 raise OSError('close')
3304 buffer.close = bad_close
3305 txt = self.TextIOWrapper(buffer, encoding="ascii")
3306 txt.flush = bad_flush
3307 with self.assertRaises(OSError) as err: # exception not swallowed
3308 txt.close()
3309 self.assertEqual(err.exception.args, ('close',))
3310 self.assertIsInstance(err.exception.__context__, OSError)
3311 self.assertEqual(err.exception.__context__.args, ('flush',))
3312 self.assertFalse(txt.closed)
3313
Victor Stinner472f7942019-04-12 21:58:24 +02003314 # Silence destructor error
3315 buffer.close = lambda: None
3316 txt.flush = lambda: None
3317
Serhiy Storchaka8a8f7f92014-06-09 09:13:04 +03003318 def test_nonnormalized_close_error_on_close(self):
3319 # Issue #21677
3320 buffer = self.BytesIO(self.testdata)
3321 def bad_flush():
3322 raise non_existing_flush
3323 def bad_close():
3324 raise non_existing_close
3325 buffer.close = bad_close
3326 txt = self.TextIOWrapper(buffer, encoding="ascii")
3327 txt.flush = bad_flush
3328 with self.assertRaises(NameError) as err: # exception not swallowed
3329 txt.close()
3330 self.assertIn('non_existing_close', str(err.exception))
3331 self.assertIsInstance(err.exception.__context__, NameError)
3332 self.assertIn('non_existing_flush', str(err.exception.__context__))
3333 self.assertFalse(txt.closed)
3334
Victor Stinner472f7942019-04-12 21:58:24 +02003335 # Silence destructor error
3336 buffer.close = lambda: None
3337 txt.flush = lambda: None
3338
Antoine Pitrou6be88762010-05-03 16:48:20 +00003339 def test_multi_close(self):
3340 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
3341 txt.close()
3342 txt.close()
3343 txt.close()
3344 self.assertRaises(ValueError, txt.flush)
3345
Antoine Pitrou0d739d72010-09-05 23:01:12 +00003346 def test_unseekable(self):
3347 txt = self.TextIOWrapper(self.MockUnseekableIO(self.testdata))
3348 self.assertRaises(self.UnsupportedOperation, txt.tell)
3349 self.assertRaises(self.UnsupportedOperation, txt.seek, 0)
3350
Antoine Pitrou7f8f4182010-12-21 21:20:59 +00003351 def test_readonly_attributes(self):
3352 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
3353 buf = self.BytesIO(self.testdata)
3354 with self.assertRaises(AttributeError):
3355 txt.buffer = buf
3356
Antoine Pitroue96ec682011-07-23 21:46:35 +02003357 def test_rawio(self):
3358 # Issue #12591: TextIOWrapper must work with raw I/O objects, so
3359 # that subprocess.Popen() can have the required unbuffered
3360 # semantics with universal_newlines=True.
3361 raw = self.MockRawIO([b'abc', b'def', b'ghi\njkl\nopq\n'])
3362 txt = self.TextIOWrapper(raw, encoding='ascii', newline='\n')
3363 # Reads
3364 self.assertEqual(txt.read(4), 'abcd')
3365 self.assertEqual(txt.readline(), 'efghi\n')
3366 self.assertEqual(list(txt), ['jkl\n', 'opq\n'])
3367
3368 def test_rawio_write_through(self):
3369 # Issue #12591: with write_through=True, writes don't need a flush
3370 raw = self.MockRawIO([b'abc', b'def', b'ghi\njkl\nopq\n'])
3371 txt = self.TextIOWrapper(raw, encoding='ascii', newline='\n',
3372 write_through=True)
3373 txt.write('1')
3374 txt.write('23\n4')
3375 txt.write('5')
3376 self.assertEqual(b''.join(raw._write_stack), b'123\n45')
3377
Antoine Pitrouc644e7c2014-05-09 00:24:50 +02003378 def test_bufio_write_through(self):
3379 # Issue #21396: write_through=True doesn't force a flush()
3380 # on the underlying binary buffered object.
3381 flush_called, write_called = [], []
3382 class BufferedWriter(self.BufferedWriter):
3383 def flush(self, *args, **kwargs):
3384 flush_called.append(True)
3385 return super().flush(*args, **kwargs)
3386 def write(self, *args, **kwargs):
3387 write_called.append(True)
3388 return super().write(*args, **kwargs)
3389
3390 rawio = self.BytesIO()
3391 data = b"a"
3392 bufio = BufferedWriter(rawio, len(data)*2)
3393 textio = self.TextIOWrapper(bufio, encoding='ascii',
3394 write_through=True)
3395 # write to the buffered io but don't overflow the buffer
3396 text = data.decode('ascii')
3397 textio.write(text)
3398
3399 # buffer.flush is not called with write_through=True
3400 self.assertFalse(flush_called)
3401 # buffer.write *is* called with write_through=True
3402 self.assertTrue(write_called)
3403 self.assertEqual(rawio.getvalue(), b"") # no flush
3404
3405 write_called = [] # reset
3406 textio.write(text * 10) # total content is larger than bufio buffer
3407 self.assertTrue(write_called)
3408 self.assertEqual(rawio.getvalue(), data * 11) # all flushed
3409
Antoine Pitrou3c2817b2017-06-03 12:32:28 +02003410 def test_reconfigure_write_through(self):
3411 raw = self.MockRawIO([])
3412 t = self.TextIOWrapper(raw, encoding='ascii', newline='\n')
3413 t.write('1')
3414 t.reconfigure(write_through=True) # implied flush
3415 self.assertEqual(t.write_through, True)
3416 self.assertEqual(b''.join(raw._write_stack), b'1')
3417 t.write('23')
3418 self.assertEqual(b''.join(raw._write_stack), b'123')
3419 t.reconfigure(write_through=False)
3420 self.assertEqual(t.write_through, False)
3421 t.write('45')
3422 t.flush()
3423 self.assertEqual(b''.join(raw._write_stack), b'12345')
3424 # Keeping default value
3425 t.reconfigure()
3426 t.reconfigure(write_through=None)
3427 self.assertEqual(t.write_through, False)
3428 t.reconfigure(write_through=True)
3429 t.reconfigure()
3430 t.reconfigure(write_through=None)
3431 self.assertEqual(t.write_through, True)
3432
Serhiy Storchaka94dc6732013-02-03 17:03:31 +02003433 def test_read_nonbytes(self):
3434 # Issue #17106
3435 # Crash when underlying read() returns non-bytes
3436 t = self.TextIOWrapper(self.StringIO('a'))
3437 self.assertRaises(TypeError, t.read, 1)
3438 t = self.TextIOWrapper(self.StringIO('a'))
3439 self.assertRaises(TypeError, t.readline)
3440 t = self.TextIOWrapper(self.StringIO('a'))
3441 self.assertRaises(TypeError, t.read)
3442
Oren Milmana5b4ea12017-08-25 21:14:54 +03003443 def test_illegal_encoder(self):
3444 # Issue 31271: Calling write() while the return value of encoder's
3445 # encode() is invalid shouldn't cause an assertion failure.
3446 rot13 = codecs.lookup("rot13")
3447 with support.swap_attr(rot13, '_is_text_encoding', True):
3448 t = io.TextIOWrapper(io.BytesIO(b'foo'), encoding="rot13")
3449 self.assertRaises(TypeError, t.write, 'bar')
3450
Serhiy Storchaka94dc6732013-02-03 17:03:31 +02003451 def test_illegal_decoder(self):
3452 # Issue #17106
Nick Coghlana9b15242014-02-04 22:11:18 +10003453 # Bypass the early encoding check added in issue 20404
3454 def _make_illegal_wrapper():
3455 quopri = codecs.lookup("quopri")
3456 quopri._is_text_encoding = True
3457 try:
3458 t = self.TextIOWrapper(self.BytesIO(b'aaaaaa'),
3459 newline='\n', encoding="quopri")
3460 finally:
3461 quopri._is_text_encoding = False
3462 return t
Serhiy Storchaka94dc6732013-02-03 17:03:31 +02003463 # Crash when decoder returns non-string
Nick Coghlana9b15242014-02-04 22:11:18 +10003464 t = _make_illegal_wrapper()
Serhiy Storchaka94dc6732013-02-03 17:03:31 +02003465 self.assertRaises(TypeError, t.read, 1)
Nick Coghlana9b15242014-02-04 22:11:18 +10003466 t = _make_illegal_wrapper()
Serhiy Storchaka94dc6732013-02-03 17:03:31 +02003467 self.assertRaises(TypeError, t.readline)
Nick Coghlana9b15242014-02-04 22:11:18 +10003468 t = _make_illegal_wrapper()
Serhiy Storchaka94dc6732013-02-03 17:03:31 +02003469 self.assertRaises(TypeError, t.read)
3470
Oren Milmanba7d7362017-08-29 11:58:27 +03003471 # Issue 31243: calling read() while the return value of decoder's
3472 # getstate() is invalid should neither crash the interpreter nor
3473 # raise a SystemError.
3474 def _make_very_illegal_wrapper(getstate_ret_val):
3475 class BadDecoder:
3476 def getstate(self):
3477 return getstate_ret_val
3478 def _get_bad_decoder(dummy):
3479 return BadDecoder()
3480 quopri = codecs.lookup("quopri")
3481 with support.swap_attr(quopri, 'incrementaldecoder',
3482 _get_bad_decoder):
3483 return _make_illegal_wrapper()
3484 t = _make_very_illegal_wrapper(42)
3485 self.assertRaises(TypeError, t.read, 42)
3486 t = _make_very_illegal_wrapper(())
3487 self.assertRaises(TypeError, t.read, 42)
3488 t = _make_very_illegal_wrapper((1, 2))
3489 self.assertRaises(TypeError, t.read, 42)
3490
Antoine Pitrou712cb732013-12-21 15:51:54 +01003491 def _check_create_at_shutdown(self, **kwargs):
3492 # Issue #20037: creating a TextIOWrapper at shutdown
3493 # shouldn't crash the interpreter.
3494 iomod = self.io.__name__
3495 code = """if 1:
3496 import codecs
3497 import {iomod} as io
3498
3499 # Avoid looking up codecs at shutdown
3500 codecs.lookup('utf-8')
3501
3502 class C:
3503 def __init__(self):
3504 self.buf = io.BytesIO()
3505 def __del__(self):
3506 io.TextIOWrapper(self.buf, **{kwargs})
3507 print("ok")
3508 c = C()
3509 """.format(iomod=iomod, kwargs=kwargs)
3510 return assert_python_ok("-c", code)
3511
3512 def test_create_at_shutdown_without_encoding(self):
3513 rc, out, err = self._check_create_at_shutdown()
3514 if err:
3515 # Can error out with a RuntimeError if the module state
3516 # isn't found.
Nick Coghlana9b15242014-02-04 22:11:18 +10003517 self.assertIn(self.shutdown_error, err.decode())
Antoine Pitrou712cb732013-12-21 15:51:54 +01003518 else:
3519 self.assertEqual("ok", out.decode().strip())
3520
3521 def test_create_at_shutdown_with_encoding(self):
3522 rc, out, err = self._check_create_at_shutdown(encoding='utf-8',
3523 errors='strict')
3524 self.assertFalse(err)
3525 self.assertEqual("ok", out.decode().strip())
3526
Antoine Pitroub8503892014-04-29 10:14:02 +02003527 def test_read_byteslike(self):
3528 r = MemviewBytesIO(b'Just some random string\n')
3529 t = self.TextIOWrapper(r, 'utf-8')
3530
3531 # TextIOwrapper will not read the full string, because
3532 # we truncate it to a multiple of the native int size
3533 # so that we can construct a more complex memoryview.
3534 bytes_val = _to_memoryview(r.getvalue()).tobytes()
3535
3536 self.assertEqual(t.read(200), bytes_val.decode('utf-8'))
3537
Benjamin Peterson6c14f232014-11-12 10:19:46 -05003538 def test_issue22849(self):
3539 class F(object):
3540 def readable(self): return True
3541 def writable(self): return True
3542 def seekable(self): return True
3543
3544 for i in range(10):
3545 try:
3546 self.TextIOWrapper(F(), encoding='utf-8')
3547 except Exception:
3548 pass
3549
3550 F.tell = lambda x: 0
3551 t = self.TextIOWrapper(F(), encoding='utf-8')
3552
INADA Naoki507434f2017-12-21 09:59:53 +09003553 def test_reconfigure_encoding_read(self):
3554 # latin1 -> utf8
3555 # (latin1 can decode utf-8 encoded string)
3556 data = 'abc\xe9\n'.encode('latin1') + 'd\xe9f\n'.encode('utf8')
3557 raw = self.BytesIO(data)
3558 txt = self.TextIOWrapper(raw, encoding='latin1', newline='\n')
3559 self.assertEqual(txt.readline(), 'abc\xe9\n')
3560 with self.assertRaises(self.UnsupportedOperation):
3561 txt.reconfigure(encoding='utf-8')
3562 with self.assertRaises(self.UnsupportedOperation):
3563 txt.reconfigure(newline=None)
3564
3565 def test_reconfigure_write_fromascii(self):
3566 # ascii has a specific encodefunc in the C implementation,
3567 # but utf-8-sig has not. Make sure that we get rid of the
3568 # cached encodefunc when we switch encoders.
3569 raw = self.BytesIO()
3570 txt = self.TextIOWrapper(raw, encoding='ascii', newline='\n')
3571 txt.write('foo\n')
3572 txt.reconfigure(encoding='utf-8-sig')
3573 txt.write('\xe9\n')
3574 txt.flush()
3575 self.assertEqual(raw.getvalue(), b'foo\n\xc3\xa9\n')
3576
3577 def test_reconfigure_write(self):
3578 # latin -> utf8
3579 raw = self.BytesIO()
3580 txt = self.TextIOWrapper(raw, encoding='latin1', newline='\n')
3581 txt.write('abc\xe9\n')
3582 txt.reconfigure(encoding='utf-8')
3583 self.assertEqual(raw.getvalue(), b'abc\xe9\n')
3584 txt.write('d\xe9f\n')
3585 txt.flush()
3586 self.assertEqual(raw.getvalue(), b'abc\xe9\nd\xc3\xa9f\n')
3587
3588 # ascii -> utf-8-sig: ensure that no BOM is written in the middle of
3589 # the file
3590 raw = self.BytesIO()
3591 txt = self.TextIOWrapper(raw, encoding='ascii', newline='\n')
3592 txt.write('abc\n')
3593 txt.reconfigure(encoding='utf-8-sig')
3594 txt.write('d\xe9f\n')
3595 txt.flush()
3596 self.assertEqual(raw.getvalue(), b'abc\nd\xc3\xa9f\n')
3597
3598 def test_reconfigure_write_non_seekable(self):
3599 raw = self.BytesIO()
3600 raw.seekable = lambda: False
3601 raw.seek = None
3602 txt = self.TextIOWrapper(raw, encoding='ascii', newline='\n')
3603 txt.write('abc\n')
3604 txt.reconfigure(encoding='utf-8-sig')
3605 txt.write('d\xe9f\n')
3606 txt.flush()
3607
3608 # If the raw stream is not seekable, there'll be a BOM
3609 self.assertEqual(raw.getvalue(), b'abc\n\xef\xbb\xbfd\xc3\xa9f\n')
3610
3611 def test_reconfigure_defaults(self):
3612 txt = self.TextIOWrapper(self.BytesIO(), 'ascii', 'replace', '\n')
3613 txt.reconfigure(encoding=None)
3614 self.assertEqual(txt.encoding, 'ascii')
3615 self.assertEqual(txt.errors, 'replace')
3616 txt.write('LF\n')
3617
3618 txt.reconfigure(newline='\r\n')
3619 self.assertEqual(txt.encoding, 'ascii')
3620 self.assertEqual(txt.errors, 'replace')
3621
3622 txt.reconfigure(errors='ignore')
3623 self.assertEqual(txt.encoding, 'ascii')
3624 self.assertEqual(txt.errors, 'ignore')
3625 txt.write('CRLF\n')
3626
3627 txt.reconfigure(encoding='utf-8', newline=None)
3628 self.assertEqual(txt.errors, 'strict')
3629 txt.seek(0)
3630 self.assertEqual(txt.read(), 'LF\nCRLF\n')
3631
3632 self.assertEqual(txt.detach().getvalue(), b'LF\nCRLF\r\n')
3633
3634 def test_reconfigure_newline(self):
3635 raw = self.BytesIO(b'CR\rEOF')
3636 txt = self.TextIOWrapper(raw, 'ascii', newline='\n')
3637 txt.reconfigure(newline=None)
3638 self.assertEqual(txt.readline(), 'CR\n')
3639 raw = self.BytesIO(b'CR\rEOF')
3640 txt = self.TextIOWrapper(raw, 'ascii', newline='\n')
3641 txt.reconfigure(newline='')
3642 self.assertEqual(txt.readline(), 'CR\r')
3643 raw = self.BytesIO(b'CR\rLF\nEOF')
3644 txt = self.TextIOWrapper(raw, 'ascii', newline='\r')
3645 txt.reconfigure(newline='\n')
3646 self.assertEqual(txt.readline(), 'CR\rLF\n')
3647 raw = self.BytesIO(b'LF\nCR\rEOF')
3648 txt = self.TextIOWrapper(raw, 'ascii', newline='\n')
3649 txt.reconfigure(newline='\r')
3650 self.assertEqual(txt.readline(), 'LF\nCR\r')
3651 raw = self.BytesIO(b'CR\rCRLF\r\nEOF')
3652 txt = self.TextIOWrapper(raw, 'ascii', newline='\r')
3653 txt.reconfigure(newline='\r\n')
3654 self.assertEqual(txt.readline(), 'CR\rCRLF\r\n')
3655
3656 txt = self.TextIOWrapper(self.BytesIO(), 'ascii', newline='\r')
3657 txt.reconfigure(newline=None)
3658 txt.write('linesep\n')
3659 txt.reconfigure(newline='')
3660 txt.write('LF\n')
3661 txt.reconfigure(newline='\n')
3662 txt.write('LF\n')
3663 txt.reconfigure(newline='\r')
3664 txt.write('CR\n')
3665 txt.reconfigure(newline='\r\n')
3666 txt.write('CRLF\n')
3667 expected = 'linesep' + os.linesep + 'LF\nLF\nCR\rCRLF\r\n'
3668 self.assertEqual(txt.detach().getvalue().decode('ascii'), expected)
3669
Zackery Spytz23db9352018-06-29 04:14:58 -06003670 def test_issue25862(self):
3671 # Assertion failures occurred in tell() after read() and write().
3672 t = self.TextIOWrapper(self.BytesIO(b'test'), encoding='ascii')
3673 t.read(1)
3674 t.read()
3675 t.tell()
3676 t = self.TextIOWrapper(self.BytesIO(b'test'), encoding='ascii')
3677 t.read(1)
3678 t.write('x')
3679 t.tell()
3680
Serhiy Storchaka94dc6732013-02-03 17:03:31 +02003681
Antoine Pitroub8503892014-04-29 10:14:02 +02003682class MemviewBytesIO(io.BytesIO):
3683 '''A BytesIO object whose read method returns memoryviews
3684 rather than bytes'''
3685
3686 def read1(self, len_):
3687 return _to_memoryview(super().read1(len_))
3688
3689 def read(self, len_):
3690 return _to_memoryview(super().read(len_))
3691
3692def _to_memoryview(buf):
3693 '''Convert bytes-object *buf* to a non-trivial memoryview'''
3694
3695 arr = array.array('i')
3696 idx = len(buf) - len(buf) % arr.itemsize
3697 arr.frombytes(buf[:idx])
3698 return memoryview(arr)
Benjamin Petersonc9c0f202009-06-30 23:06:06 +00003699
Benjamin Peterson6fd113c2014-11-12 10:23:44 -05003700
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003701class CTextIOWrapperTest(TextIOWrapperTest):
Antoine Pitrou712cb732013-12-21 15:51:54 +01003702 io = io
Eddie Elizondo4590f722020-02-04 02:29:25 -08003703 shutdown_error = "LookupError: unknown encoding: ascii"
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003704
3705 def test_initialization(self):
3706 r = self.BytesIO(b"\xc3\xa9\n\n")
3707 b = self.BufferedReader(r, 1000)
3708 t = self.TextIOWrapper(b)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003709 self.assertRaises(ValueError, t.__init__, b, newline='xyzzy')
3710 self.assertRaises(ValueError, t.read)
3711
Benjamin Peterson10e76b62014-12-21 20:51:50 -06003712 t = self.TextIOWrapper.__new__(self.TextIOWrapper)
3713 self.assertRaises(Exception, repr, t)
3714
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003715 def test_garbage_collection(self):
3716 # C TextIOWrapper objects are collected, and collecting them flushes
3717 # all data to disk.
3718 # The Python version has __del__, so it ends in gc.garbage instead.
Antoine Pitrou796564c2013-07-30 19:59:21 +02003719 with support.check_warnings(('', ResourceWarning)):
3720 rawio = io.FileIO(support.TESTFN, "wb")
3721 b = self.BufferedWriter(rawio)
3722 t = self.TextIOWrapper(b, encoding="ascii")
3723 t.write("456def")
3724 t.x = t
3725 wr = weakref.ref(t)
3726 del t
3727 support.gc_collect()
Serhiy Storchaka0dcd80a2015-08-02 15:17:49 +03003728 self.assertIsNone(wr(), wr)
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +00003729 with self.open(support.TESTFN, "rb") as f:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003730 self.assertEqual(f.read(), b"456def")
3731
Charles-François Natali42c28cd2011-10-05 19:53:43 +02003732 def test_rwpair_cleared_before_textio(self):
3733 # Issue 13070: TextIOWrapper's finalization would crash when called
3734 # after the reference to the underlying BufferedRWPair's writer got
3735 # cleared by the GC.
3736 for i in range(1000):
3737 b1 = self.BufferedRWPair(self.MockRawIO(), self.MockRawIO())
3738 t1 = self.TextIOWrapper(b1, encoding="ascii")
3739 b2 = self.BufferedRWPair(self.MockRawIO(), self.MockRawIO())
3740 t2 = self.TextIOWrapper(b2, encoding="ascii")
3741 # circular references
3742 t1.buddy = t2
3743 t2.buddy = t1
3744 support.gc_collect()
3745
Zackery Spytz842acaa2018-12-17 07:52:45 -07003746 def test_del__CHUNK_SIZE_SystemError(self):
3747 t = self.TextIOWrapper(self.BytesIO(), encoding='ascii')
3748 with self.assertRaises(AttributeError):
3749 del t._CHUNK_SIZE
3750
Charles-François Natali42c28cd2011-10-05 19:53:43 +02003751
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003752class PyTextIOWrapperTest(TextIOWrapperTest):
Antoine Pitrou712cb732013-12-21 15:51:54 +01003753 io = pyio
Steve Dower6d5cf8a2016-09-09 09:21:01 -07003754 shutdown_error = "LookupError: unknown encoding: ascii"
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003755
3756
3757class IncrementalNewlineDecoderTest(unittest.TestCase):
3758
3759 def check_newline_decoding_utf8(self, decoder):
Antoine Pitrou180a3362008-12-14 16:36:46 +00003760 # UTF-8 specific tests for a newline decoder
3761 def _check_decode(b, s, **kwargs):
3762 # We exercise getstate() / setstate() as well as decode()
3763 state = decoder.getstate()
Ezio Melottib3aedd42010-11-20 19:04:17 +00003764 self.assertEqual(decoder.decode(b, **kwargs), s)
Antoine Pitrou180a3362008-12-14 16:36:46 +00003765 decoder.setstate(state)
Ezio Melottib3aedd42010-11-20 19:04:17 +00003766 self.assertEqual(decoder.decode(b, **kwargs), s)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00003767
Antoine Pitrou180a3362008-12-14 16:36:46 +00003768 _check_decode(b'\xe8\xa2\x88', "\u8888")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00003769
Antoine Pitrou180a3362008-12-14 16:36:46 +00003770 _check_decode(b'\xe8', "")
3771 _check_decode(b'\xa2', "")
3772 _check_decode(b'\x88', "\u8888")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00003773
Antoine Pitrou180a3362008-12-14 16:36:46 +00003774 _check_decode(b'\xe8', "")
3775 _check_decode(b'\xa2', "")
3776 _check_decode(b'\x88', "\u8888")
3777
3778 _check_decode(b'\xe8', "")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00003779 self.assertRaises(UnicodeDecodeError, decoder.decode, b'', final=True)
3780
Antoine Pitrou180a3362008-12-14 16:36:46 +00003781 decoder.reset()
3782 _check_decode(b'\n', "\n")
3783 _check_decode(b'\r', "")
3784 _check_decode(b'', "\n", final=True)
3785 _check_decode(b'\r', "\n", final=True)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00003786
Antoine Pitrou180a3362008-12-14 16:36:46 +00003787 _check_decode(b'\r', "")
3788 _check_decode(b'a', "\na")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00003789
Antoine Pitrou180a3362008-12-14 16:36:46 +00003790 _check_decode(b'\r\r\n', "\n\n")
3791 _check_decode(b'\r', "")
3792 _check_decode(b'\r', "\n")
3793 _check_decode(b'\na', "\na")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00003794
Antoine Pitrou180a3362008-12-14 16:36:46 +00003795 _check_decode(b'\xe8\xa2\x88\r\n', "\u8888\n")
3796 _check_decode(b'\xe8\xa2\x88', "\u8888")
3797 _check_decode(b'\n', "\n")
3798 _check_decode(b'\xe8\xa2\x88\r', "\u8888")
3799 _check_decode(b'\n', "\n")
Guido van Rossum9b76da62007-04-11 01:09:03 +00003800
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003801 def check_newline_decoding(self, decoder, encoding):
Antoine Pitrou180a3362008-12-14 16:36:46 +00003802 result = []
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003803 if encoding is not None:
3804 encoder = codecs.getincrementalencoder(encoding)()
3805 def _decode_bytewise(s):
3806 # Decode one byte at a time
3807 for b in encoder.encode(s):
3808 result.append(decoder.decode(bytes([b])))
3809 else:
3810 encoder = None
3811 def _decode_bytewise(s):
3812 # Decode one char at a time
3813 for c in s:
3814 result.append(decoder.decode(c))
Ezio Melottib3aedd42010-11-20 19:04:17 +00003815 self.assertEqual(decoder.newlines, None)
Antoine Pitrou180a3362008-12-14 16:36:46 +00003816 _decode_bytewise("abc\n\r")
Ezio Melottib3aedd42010-11-20 19:04:17 +00003817 self.assertEqual(decoder.newlines, '\n')
Antoine Pitrou180a3362008-12-14 16:36:46 +00003818 _decode_bytewise("\nabc")
Ezio Melottib3aedd42010-11-20 19:04:17 +00003819 self.assertEqual(decoder.newlines, ('\n', '\r\n'))
Antoine Pitrou180a3362008-12-14 16:36:46 +00003820 _decode_bytewise("abc\r")
Ezio Melottib3aedd42010-11-20 19:04:17 +00003821 self.assertEqual(decoder.newlines, ('\n', '\r\n'))
Antoine Pitrou180a3362008-12-14 16:36:46 +00003822 _decode_bytewise("abc")
Ezio Melottib3aedd42010-11-20 19:04:17 +00003823 self.assertEqual(decoder.newlines, ('\r', '\n', '\r\n'))
Antoine Pitrou180a3362008-12-14 16:36:46 +00003824 _decode_bytewise("abc\r")
Ezio Melottib3aedd42010-11-20 19:04:17 +00003825 self.assertEqual("".join(result), "abc\n\nabcabc\nabcabc")
Antoine Pitrou180a3362008-12-14 16:36:46 +00003826 decoder.reset()
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003827 input = "abc"
3828 if encoder is not None:
3829 encoder.reset()
3830 input = encoder.encode(input)
Ezio Melottib3aedd42010-11-20 19:04:17 +00003831 self.assertEqual(decoder.decode(input), "abc")
3832 self.assertEqual(decoder.newlines, None)
Antoine Pitrou180a3362008-12-14 16:36:46 +00003833
3834 def test_newline_decoder(self):
3835 encodings = (
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003836 # None meaning the IncrementalNewlineDecoder takes unicode input
3837 # rather than bytes input
3838 None, 'utf-8', 'latin-1',
Antoine Pitrou180a3362008-12-14 16:36:46 +00003839 'utf-16', 'utf-16-le', 'utf-16-be',
3840 'utf-32', 'utf-32-le', 'utf-32-be',
3841 )
3842 for enc in encodings:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003843 decoder = enc and codecs.getincrementaldecoder(enc)()
3844 decoder = self.IncrementalNewlineDecoder(decoder, translate=True)
3845 self.check_newline_decoding(decoder, enc)
Alexandre Vassalotti472f07d2008-01-06 00:34:32 +00003846 decoder = codecs.getincrementaldecoder("utf-8")()
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003847 decoder = self.IncrementalNewlineDecoder(decoder, translate=True)
3848 self.check_newline_decoding_utf8(decoder)
Oren Milman1d1d3e92017-08-20 18:35:36 +03003849 self.assertRaises(TypeError, decoder.setstate, 42)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003850
Antoine Pitrou66913e22009-03-06 23:40:56 +00003851 def test_newline_bytes(self):
3852 # Issue 5433: Excessive optimization in IncrementalNewlineDecoder
3853 def _check(dec):
Ezio Melottib3aedd42010-11-20 19:04:17 +00003854 self.assertEqual(dec.newlines, None)
3855 self.assertEqual(dec.decode("\u0D00"), "\u0D00")
3856 self.assertEqual(dec.newlines, None)
3857 self.assertEqual(dec.decode("\u0A00"), "\u0A00")
3858 self.assertEqual(dec.newlines, None)
Antoine Pitrou66913e22009-03-06 23:40:56 +00003859 dec = self.IncrementalNewlineDecoder(None, translate=False)
3860 _check(dec)
3861 dec = self.IncrementalNewlineDecoder(None, translate=True)
3862 _check(dec)
3863
Xiang Zhangb08746b2018-10-31 19:49:16 +08003864 def test_translate(self):
3865 # issue 35062
3866 for translate in (-2, -1, 1, 2):
3867 decoder = codecs.getincrementaldecoder("utf-8")()
3868 decoder = self.IncrementalNewlineDecoder(decoder, translate)
3869 self.check_newline_decoding_utf8(decoder)
3870 decoder = codecs.getincrementaldecoder("utf-8")()
3871 decoder = self.IncrementalNewlineDecoder(decoder, translate=0)
3872 self.assertEqual(decoder.decode(b"\r\r\n"), "\r\r\n")
3873
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003874class CIncrementalNewlineDecoderTest(IncrementalNewlineDecoderTest):
3875 pass
3876
3877class PyIncrementalNewlineDecoderTest(IncrementalNewlineDecoderTest):
3878 pass
Antoine Pitrou180a3362008-12-14 16:36:46 +00003879
Alexandre Vassalotti472f07d2008-01-06 00:34:32 +00003880
Guido van Rossum01a27522007-03-07 01:00:12 +00003881# XXX Tests for open()
Guido van Rossum68bbcd22007-02-27 17:19:33 +00003882
Guido van Rossum5abbf752007-08-27 17:39:33 +00003883class MiscIOTest(unittest.TestCase):
3884
Barry Warsaw40e82462008-11-20 20:14:50 +00003885 def tearDown(self):
3886 support.unlink(support.TESTFN)
3887
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003888 def test___all__(self):
3889 for name in self.io.__all__:
3890 obj = getattr(self.io, name, None)
Serhiy Storchaka0dcd80a2015-08-02 15:17:49 +03003891 self.assertIsNotNone(obj, name)
Steve Dowerb82e17e2019-05-23 08:45:22 -07003892 if name in ("open", "open_code"):
Guido van Rossum5abbf752007-08-27 17:39:33 +00003893 continue
Benjamin Peterson6a52a9c2009-04-29 22:00:44 +00003894 elif "error" in name.lower() or name == "UnsupportedOperation":
Benjamin Petersonbfb95942009-04-02 01:13:40 +00003895 self.assertTrue(issubclass(obj, Exception), name)
3896 elif not name.startswith("SEEK_"):
3897 self.assertTrue(issubclass(obj, self.IOBase))
Benjamin Peterson65676e42008-11-05 21:42:45 +00003898
Barry Warsaw40e82462008-11-20 20:14:50 +00003899 def test_attributes(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003900 f = self.open(support.TESTFN, "wb", buffering=0)
Ezio Melottib3aedd42010-11-20 19:04:17 +00003901 self.assertEqual(f.mode, "wb")
Barry Warsaw40e82462008-11-20 20:14:50 +00003902 f.close()
3903
Victor Stinner942f7a22020-03-04 18:50:22 +01003904 with support.check_warnings(('', DeprecationWarning)):
3905 f = self.open(support.TESTFN, "U")
3906 self.assertEqual(f.name, support.TESTFN)
3907 self.assertEqual(f.buffer.name, support.TESTFN)
3908 self.assertEqual(f.buffer.raw.name, support.TESTFN)
3909 self.assertEqual(f.mode, "U")
3910 self.assertEqual(f.buffer.mode, "rb")
3911 self.assertEqual(f.buffer.raw.mode, "rb")
3912 f.close()
3913
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003914 f = self.open(support.TESTFN, "w+")
Ezio Melottib3aedd42010-11-20 19:04:17 +00003915 self.assertEqual(f.mode, "w+")
3916 self.assertEqual(f.buffer.mode, "rb+") # Does it really matter?
3917 self.assertEqual(f.buffer.raw.mode, "rb+")
Barry Warsaw40e82462008-11-20 20:14:50 +00003918
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003919 g = self.open(f.fileno(), "wb", closefd=False)
Ezio Melottib3aedd42010-11-20 19:04:17 +00003920 self.assertEqual(g.mode, "wb")
3921 self.assertEqual(g.raw.mode, "wb")
3922 self.assertEqual(g.name, f.fileno())
3923 self.assertEqual(g.raw.name, f.fileno())
Barry Warsaw40e82462008-11-20 20:14:50 +00003924 f.close()
3925 g.close()
3926
Benjamin Peterson74fa9f72019-11-12 14:51:34 -08003927 def test_open_pipe_with_append(self):
3928 # bpo-27805: Ignore ESPIPE from lseek() in open().
3929 r, w = os.pipe()
3930 self.addCleanup(os.close, r)
3931 f = self.open(w, 'a')
3932 self.addCleanup(f.close)
3933 # Check that the file is marked non-seekable. On Windows, however, lseek
3934 # somehow succeeds on pipes.
3935 if sys.platform != 'win32':
3936 self.assertFalse(f.seekable())
3937
Antoine Pitrou8043cf82009-01-09 19:54:29 +00003938 def test_io_after_close(self):
3939 for kwargs in [
3940 {"mode": "w"},
3941 {"mode": "wb"},
3942 {"mode": "w", "buffering": 1},
3943 {"mode": "w", "buffering": 2},
3944 {"mode": "wb", "buffering": 0},
3945 {"mode": "r"},
3946 {"mode": "rb"},
3947 {"mode": "r", "buffering": 1},
3948 {"mode": "r", "buffering": 2},
3949 {"mode": "rb", "buffering": 0},
3950 {"mode": "w+"},
3951 {"mode": "w+b"},
3952 {"mode": "w+", "buffering": 1},
3953 {"mode": "w+", "buffering": 2},
3954 {"mode": "w+b", "buffering": 0},
3955 ]:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003956 f = self.open(support.TESTFN, **kwargs)
Antoine Pitrou8043cf82009-01-09 19:54:29 +00003957 f.close()
3958 self.assertRaises(ValueError, f.flush)
3959 self.assertRaises(ValueError, f.fileno)
3960 self.assertRaises(ValueError, f.isatty)
3961 self.assertRaises(ValueError, f.__iter__)
3962 if hasattr(f, "peek"):
3963 self.assertRaises(ValueError, f.peek, 1)
3964 self.assertRaises(ValueError, f.read)
3965 if hasattr(f, "read1"):
3966 self.assertRaises(ValueError, f.read1, 1024)
Martin Panterccb2c0e2016-10-20 23:48:14 +00003967 self.assertRaises(ValueError, f.read1)
Victor Stinnerb79f28c2011-05-25 22:09:03 +02003968 if hasattr(f, "readall"):
3969 self.assertRaises(ValueError, f.readall)
Antoine Pitrou8043cf82009-01-09 19:54:29 +00003970 if hasattr(f, "readinto"):
3971 self.assertRaises(ValueError, f.readinto, bytearray(1024))
Benjamin Petersona96fea02014-06-22 14:17:44 -07003972 if hasattr(f, "readinto1"):
3973 self.assertRaises(ValueError, f.readinto1, bytearray(1024))
Antoine Pitrou8043cf82009-01-09 19:54:29 +00003974 self.assertRaises(ValueError, f.readline)
3975 self.assertRaises(ValueError, f.readlines)
Xiang Zhang026435c2017-04-15 12:47:28 +08003976 self.assertRaises(ValueError, f.readlines, 1)
Antoine Pitrou8043cf82009-01-09 19:54:29 +00003977 self.assertRaises(ValueError, f.seek, 0)
3978 self.assertRaises(ValueError, f.tell)
3979 self.assertRaises(ValueError, f.truncate)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003980 self.assertRaises(ValueError, f.write,
3981 b"" if "b" in kwargs['mode'] else "")
Antoine Pitrou8043cf82009-01-09 19:54:29 +00003982 self.assertRaises(ValueError, f.writelines, [])
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003983 self.assertRaises(ValueError, next, f)
Antoine Pitrou8043cf82009-01-09 19:54:29 +00003984
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003985 def test_blockingioerror(self):
3986 # Various BlockingIOError issues
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003987 class C(str):
3988 pass
3989 c = C("")
3990 b = self.BlockingIOError(1, c)
3991 c.b = b
3992 b.c = c
3993 wr = weakref.ref(c)
3994 del c, b
Benjamin Peterson24fb1d02009-04-24 23:26:21 +00003995 support.gc_collect()
Serhiy Storchaka0dcd80a2015-08-02 15:17:49 +03003996 self.assertIsNone(wr(), wr)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003997
3998 def test_abcs(self):
3999 # Test the visible base classes are ABCs.
Ezio Melottie9615932010-01-24 19:26:24 +00004000 self.assertIsInstance(self.IOBase, abc.ABCMeta)
4001 self.assertIsInstance(self.RawIOBase, abc.ABCMeta)
4002 self.assertIsInstance(self.BufferedIOBase, abc.ABCMeta)
4003 self.assertIsInstance(self.TextIOBase, abc.ABCMeta)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00004004
4005 def _check_abc_inheritance(self, abcmodule):
4006 with self.open(support.TESTFN, "wb", buffering=0) as f:
Ezio Melottie9615932010-01-24 19:26:24 +00004007 self.assertIsInstance(f, abcmodule.IOBase)
4008 self.assertIsInstance(f, abcmodule.RawIOBase)
4009 self.assertNotIsInstance(f, abcmodule.BufferedIOBase)
4010 self.assertNotIsInstance(f, abcmodule.TextIOBase)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00004011 with self.open(support.TESTFN, "wb") as f:
Ezio Melottie9615932010-01-24 19:26:24 +00004012 self.assertIsInstance(f, abcmodule.IOBase)
4013 self.assertNotIsInstance(f, abcmodule.RawIOBase)
4014 self.assertIsInstance(f, abcmodule.BufferedIOBase)
4015 self.assertNotIsInstance(f, abcmodule.TextIOBase)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00004016 with self.open(support.TESTFN, "w") as f:
Ezio Melottie9615932010-01-24 19:26:24 +00004017 self.assertIsInstance(f, abcmodule.IOBase)
4018 self.assertNotIsInstance(f, abcmodule.RawIOBase)
4019 self.assertNotIsInstance(f, abcmodule.BufferedIOBase)
4020 self.assertIsInstance(f, abcmodule.TextIOBase)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00004021
4022 def test_abc_inheritance(self):
4023 # Test implementations inherit from their respective ABCs
4024 self._check_abc_inheritance(self)
4025
4026 def test_abc_inheritance_official(self):
4027 # Test implementations inherit from the official ABCs of the
4028 # baseline "io" module.
4029 self._check_abc_inheritance(io)
4030
Antoine Pitroue033e062010-10-29 10:38:18 +00004031 def _check_warn_on_dealloc(self, *args, **kwargs):
4032 f = open(*args, **kwargs)
4033 r = repr(f)
4034 with self.assertWarns(ResourceWarning) as cm:
4035 f = None
4036 support.gc_collect()
4037 self.assertIn(r, str(cm.warning.args[0]))
4038
4039 def test_warn_on_dealloc(self):
4040 self._check_warn_on_dealloc(support.TESTFN, "wb", buffering=0)
4041 self._check_warn_on_dealloc(support.TESTFN, "wb")
4042 self._check_warn_on_dealloc(support.TESTFN, "w")
4043
4044 def _check_warn_on_dealloc_fd(self, *args, **kwargs):
4045 fds = []
Benjamin Peterson556c7352010-10-31 01:35:43 +00004046 def cleanup_fds():
Antoine Pitroue033e062010-10-29 10:38:18 +00004047 for fd in fds:
4048 try:
4049 os.close(fd)
Andrew Svetlov3438fa42012-12-17 23:35:18 +02004050 except OSError as e:
Antoine Pitroue033e062010-10-29 10:38:18 +00004051 if e.errno != errno.EBADF:
4052 raise
Benjamin Peterson556c7352010-10-31 01:35:43 +00004053 self.addCleanup(cleanup_fds)
4054 r, w = os.pipe()
4055 fds += r, w
4056 self._check_warn_on_dealloc(r, *args, **kwargs)
4057 # When using closefd=False, there's no warning
4058 r, w = os.pipe()
4059 fds += r, w
Serhiy Storchaka94a619d2016-02-11 13:11:44 +02004060 with support.check_no_resource_warning(self):
Benjamin Peterson556c7352010-10-31 01:35:43 +00004061 open(r, *args, closefd=False, **kwargs)
Antoine Pitroue033e062010-10-29 10:38:18 +00004062
4063 def test_warn_on_dealloc_fd(self):
4064 self._check_warn_on_dealloc_fd("rb", buffering=0)
4065 self._check_warn_on_dealloc_fd("rb")
4066 self._check_warn_on_dealloc_fd("r")
4067
4068
Antoine Pitrou243757e2010-11-05 21:15:39 +00004069 def test_pickling(self):
4070 # Pickling file objects is forbidden
4071 for kwargs in [
4072 {"mode": "w"},
4073 {"mode": "wb"},
4074 {"mode": "wb", "buffering": 0},
4075 {"mode": "r"},
4076 {"mode": "rb"},
4077 {"mode": "rb", "buffering": 0},
4078 {"mode": "w+"},
4079 {"mode": "w+b"},
4080 {"mode": "w+b", "buffering": 0},
4081 ]:
4082 for protocol in range(pickle.HIGHEST_PROTOCOL + 1):
4083 with self.open(support.TESTFN, **kwargs) as f:
4084 self.assertRaises(TypeError, pickle.dumps, f, protocol)
4085
Antoine Pitrou58fcf9f2011-11-21 20:16:44 +01004086 def test_nonblock_pipe_write_bigbuf(self):
4087 self._test_nonblock_pipe_write(16*1024)
4088
Antoine Pitrou58fcf9f2011-11-21 20:16:44 +01004089 def test_nonblock_pipe_write_smallbuf(self):
4090 self._test_nonblock_pipe_write(1024)
4091
Victor Stinner1db9e7b2014-07-29 22:32:47 +02004092 @unittest.skipUnless(hasattr(os, 'set_blocking'),
4093 'os.set_blocking() required for this test')
Antoine Pitrou58fcf9f2011-11-21 20:16:44 +01004094 def _test_nonblock_pipe_write(self, bufsize):
4095 sent = []
4096 received = []
4097 r, w = os.pipe()
Victor Stinner1db9e7b2014-07-29 22:32:47 +02004098 os.set_blocking(r, False)
4099 os.set_blocking(w, False)
Antoine Pitrou58fcf9f2011-11-21 20:16:44 +01004100
4101 # To exercise all code paths in the C implementation we need
4102 # to play with buffer sizes. For instance, if we choose a
4103 # buffer size less than or equal to _PIPE_BUF (4096 on Linux)
4104 # then we will never get a partial write of the buffer.
4105 rf = self.open(r, mode='rb', closefd=True, buffering=bufsize)
4106 wf = self.open(w, mode='wb', closefd=True, buffering=bufsize)
4107
4108 with rf, wf:
4109 for N in 9999, 73, 7574:
4110 try:
4111 i = 0
4112 while True:
4113 msg = bytes([i % 26 + 97]) * N
4114 sent.append(msg)
4115 wf.write(msg)
4116 i += 1
4117
4118 except self.BlockingIOError as e:
4119 self.assertEqual(e.args[0], errno.EAGAIN)
Antoine Pitrou7fe601c2011-11-21 20:22:01 +01004120 self.assertEqual(e.args[2], e.characters_written)
Antoine Pitrou58fcf9f2011-11-21 20:16:44 +01004121 sent[-1] = sent[-1][:e.characters_written]
4122 received.append(rf.read())
4123 msg = b'BLOCKED'
4124 wf.write(msg)
4125 sent.append(msg)
4126
4127 while True:
4128 try:
4129 wf.flush()
4130 break
4131 except self.BlockingIOError as e:
4132 self.assertEqual(e.args[0], errno.EAGAIN)
Antoine Pitrou7fe601c2011-11-21 20:22:01 +01004133 self.assertEqual(e.args[2], e.characters_written)
Antoine Pitrou58fcf9f2011-11-21 20:16:44 +01004134 self.assertEqual(e.characters_written, 0)
4135 received.append(rf.read())
4136
4137 received += iter(rf.read, None)
4138
4139 sent, received = b''.join(sent), b''.join(received)
Serhiy Storchaka0dcd80a2015-08-02 15:17:49 +03004140 self.assertEqual(sent, received)
Antoine Pitrou58fcf9f2011-11-21 20:16:44 +01004141 self.assertTrue(wf.closed)
4142 self.assertTrue(rf.closed)
4143
Charles-François Natalidc3044c2012-01-09 22:40:02 +01004144 def test_create_fail(self):
4145 # 'x' mode fails if file is existing
4146 with self.open(support.TESTFN, 'w'):
4147 pass
4148 self.assertRaises(FileExistsError, self.open, support.TESTFN, 'x')
4149
4150 def test_create_writes(self):
4151 # 'x' mode opens for writing
4152 with self.open(support.TESTFN, 'xb') as f:
4153 f.write(b"spam")
4154 with self.open(support.TESTFN, 'rb') as f:
4155 self.assertEqual(b"spam", f.read())
4156
Christian Heimes7b648752012-09-10 14:48:43 +02004157 def test_open_allargs(self):
4158 # there used to be a buffer overflow in the parser for rawmode
4159 self.assertRaises(ValueError, self.open, support.TESTFN, 'rwax+')
4160
Victor Stinner22eb6892019-06-26 00:51:05 +02004161 def test_check_encoding_errors(self):
4162 # bpo-37388: open() and TextIOWrapper must check encoding and errors
4163 # arguments in dev mode
4164 mod = self.io.__name__
4165 filename = __file__
4166 invalid = 'Boom, Shaka Laka, Boom!'
4167 code = textwrap.dedent(f'''
4168 import sys
4169 from {mod} import open, TextIOWrapper
4170
4171 try:
4172 open({filename!r}, encoding={invalid!r})
4173 except LookupError:
4174 pass
4175 else:
4176 sys.exit(21)
4177
4178 try:
4179 open({filename!r}, errors={invalid!r})
4180 except LookupError:
4181 pass
4182 else:
4183 sys.exit(22)
4184
4185 fp = open({filename!r}, "rb")
4186 with fp:
4187 try:
4188 TextIOWrapper(fp, encoding={invalid!r})
4189 except LookupError:
4190 pass
4191 else:
4192 sys.exit(23)
4193
4194 try:
4195 TextIOWrapper(fp, errors={invalid!r})
4196 except LookupError:
4197 pass
4198 else:
4199 sys.exit(24)
4200
4201 sys.exit(10)
4202 ''')
4203 proc = assert_python_failure('-X', 'dev', '-c', code)
4204 self.assertEqual(proc.rc, 10, proc)
4205
Christian Heimes7b648752012-09-10 14:48:43 +02004206
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00004207class CMiscIOTest(MiscIOTest):
4208 io = io
4209
Serhiy Storchaka37a79a12013-05-28 16:24:45 +03004210 def test_readinto_buffer_overflow(self):
4211 # Issue #18025
4212 class BadReader(self.io.BufferedIOBase):
4213 def read(self, n=-1):
4214 return b'x' * 10**6
4215 bufio = BadReader()
4216 b = bytearray(2)
4217 self.assertRaises(ValueError, bufio.readinto, b)
4218
Antoine Pitrou25f85d42015-04-13 19:41:47 +02004219 def check_daemon_threads_shutdown_deadlock(self, stream_name):
4220 # Issue #23309: deadlocks at shutdown should be avoided when a
4221 # daemon thread and the main thread both write to a file.
4222 code = """if 1:
4223 import sys
4224 import time
4225 import threading
Victor Stinner2a1aed02017-04-21 17:59:23 +02004226 from test.support import SuppressCrashReport
Antoine Pitrou25f85d42015-04-13 19:41:47 +02004227
4228 file = sys.{stream_name}
4229
4230 def run():
4231 while True:
4232 file.write('.')
4233 file.flush()
4234
Victor Stinner2a1aed02017-04-21 17:59:23 +02004235 crash = SuppressCrashReport()
4236 crash.__enter__()
4237 # don't call __exit__(): the crash occurs at Python shutdown
4238
Antoine Pitrou25f85d42015-04-13 19:41:47 +02004239 thread = threading.Thread(target=run)
4240 thread.daemon = True
4241 thread.start()
4242
4243 time.sleep(0.5)
4244 file.write('!')
4245 file.flush()
4246 """.format_map(locals())
4247 res, _ = run_python_until_end("-c", code)
4248 err = res.err.decode()
4249 if res.rc != 0:
4250 # Failure: should be a fatal error
Victor Stinner9e5d30c2020-03-07 00:54:20 +01004251 pattern = (r"Fatal Python error: _enter_buffered_busy: "
4252 r"could not acquire lock "
Max Bernsteinccb7ca72019-05-21 10:09:21 -07004253 r"for <(_io\.)?BufferedWriter name='<{stream_name}>'> "
4254 r"at interpreter shutdown, possibly due to "
4255 r"daemon threads".format_map(locals()))
4256 self.assertRegex(err, pattern)
Antoine Pitrou25f85d42015-04-13 19:41:47 +02004257 else:
4258 self.assertFalse(err.strip('.!'))
4259
4260 def test_daemon_threads_shutdown_stdout_deadlock(self):
4261 self.check_daemon_threads_shutdown_deadlock('stdout')
4262
4263 def test_daemon_threads_shutdown_stderr_deadlock(self):
4264 self.check_daemon_threads_shutdown_deadlock('stderr')
4265
4266
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00004267class PyMiscIOTest(MiscIOTest):
4268 io = pyio
Barry Warsaw40e82462008-11-20 20:14:50 +00004269
Antoine Pitroub46b9d52010-08-21 19:09:32 +00004270
4271@unittest.skipIf(os.name == 'nt', 'POSIX signals required for this test.')
4272class SignalsTest(unittest.TestCase):
4273
4274 def setUp(self):
4275 self.oldalrm = signal.signal(signal.SIGALRM, self.alarm_interrupt)
4276
4277 def tearDown(self):
4278 signal.signal(signal.SIGALRM, self.oldalrm)
4279
4280 def alarm_interrupt(self, sig, frame):
4281 1/0
4282
Antoine Pitroub46b9d52010-08-21 19:09:32 +00004283 def check_interrupted_write(self, item, bytes, **fdopen_kwargs):
4284 """Check that a partial write, when it gets interrupted, properly
Antoine Pitrou707ce822011-02-25 21:24:11 +00004285 invokes the signal handler, and bubbles up the exception raised
4286 in the latter."""
Antoine Pitroub46b9d52010-08-21 19:09:32 +00004287 read_results = []
4288 def _read():
4289 s = os.read(r, 1)
4290 read_results.append(s)
Victor Stinner05c9d312018-12-18 23:52:39 +01004291
Antoine Pitroub46b9d52010-08-21 19:09:32 +00004292 t = threading.Thread(target=_read)
4293 t.daemon = True
4294 r, w = os.pipe()
Benjamin Petersond8fc2e12010-10-31 01:19:53 +00004295 fdopen_kwargs["closefd"] = False
Victor Stinner9c182a12015-03-16 18:03:06 +01004296 large_data = item * (support.PIPE_MAX_SIZE // len(item) + 1)
Antoine Pitroub46b9d52010-08-21 19:09:32 +00004297 try:
4298 wio = self.io.open(w, **fdopen_kwargs)
Victor Stinner05c9d312018-12-18 23:52:39 +01004299 if hasattr(signal, 'pthread_sigmask'):
4300 # create the thread with SIGALRM signal blocked
4301 signal.pthread_sigmask(signal.SIG_BLOCK, [signal.SIGALRM])
4302 t.start()
4303 signal.pthread_sigmask(signal.SIG_UNBLOCK, [signal.SIGALRM])
4304 else:
4305 t.start()
4306
Antoine Pitroub46b9d52010-08-21 19:09:32 +00004307 # Fill the pipe enough that the write will be blocking.
4308 # It will be interrupted by the timer armed above. Since the
4309 # other thread has read one byte, the low-level write will
4310 # return with a successful (partial) result rather than an EINTR.
4311 # The buffered IO layer must check for pending signal
4312 # handlers, which in this case will invoke alarm_interrupt().
Victor Stinner775b2dd2013-07-15 19:53:13 +02004313 signal.alarm(1)
4314 try:
Victor Stinner9c182a12015-03-16 18:03:06 +01004315 self.assertRaises(ZeroDivisionError, wio.write, large_data)
Victor Stinner775b2dd2013-07-15 19:53:13 +02004316 finally:
4317 signal.alarm(0)
Serhiy Storchaka263dcd22015-04-01 13:01:14 +03004318 t.join()
Antoine Pitroub46b9d52010-08-21 19:09:32 +00004319 # We got one byte, get another one and check that it isn't a
4320 # repeat of the first one.
4321 read_results.append(os.read(r, 1))
4322 self.assertEqual(read_results, [bytes[0:1], bytes[1:2]])
4323 finally:
4324 os.close(w)
4325 os.close(r)
4326 # This is deliberate. If we didn't close the file descriptor
4327 # before closing wio, wio would try to flush its internal
4328 # buffer, and block again.
4329 try:
4330 wio.close()
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02004331 except OSError as e:
Antoine Pitroub46b9d52010-08-21 19:09:32 +00004332 if e.errno != errno.EBADF:
4333 raise
4334
4335 def test_interrupted_write_unbuffered(self):
4336 self.check_interrupted_write(b"xy", b"xy", mode="wb", buffering=0)
4337
4338 def test_interrupted_write_buffered(self):
4339 self.check_interrupted_write(b"xy", b"xy", mode="wb")
4340
4341 def test_interrupted_write_text(self):
4342 self.check_interrupted_write("xy", b"xy", mode="w", encoding="ascii")
4343
Brett Cannon31f59292011-02-21 19:29:56 +00004344 @support.no_tracing
Antoine Pitrouf3b68b32010-12-03 18:41:39 +00004345 def check_reentrant_write(self, data, **fdopen_kwargs):
4346 def on_alarm(*args):
4347 # Will be called reentrantly from the same thread
4348 wio.write(data)
4349 1/0
4350 signal.signal(signal.SIGALRM, on_alarm)
4351 r, w = os.pipe()
4352 wio = self.io.open(w, **fdopen_kwargs)
4353 try:
Gregory P. Smithb5ba2032012-06-25 01:13:32 -07004354 signal.alarm(1)
Antoine Pitrouf3b68b32010-12-03 18:41:39 +00004355 # Either the reentrant call to wio.write() fails with RuntimeError,
4356 # or the signal handler raises ZeroDivisionError.
4357 with self.assertRaises((ZeroDivisionError, RuntimeError)) as cm:
4358 while 1:
4359 for i in range(100):
4360 wio.write(data)
4361 wio.flush()
4362 # Make sure the buffer doesn't fill up and block further writes
4363 os.read(r, len(data) * 100)
4364 exc = cm.exception
4365 if isinstance(exc, RuntimeError):
4366 self.assertTrue(str(exc).startswith("reentrant call"), str(exc))
4367 finally:
Victor Stinner9abee722017-09-19 09:36:54 -07004368 signal.alarm(0)
Antoine Pitrouf3b68b32010-12-03 18:41:39 +00004369 wio.close()
4370 os.close(r)
4371
4372 def test_reentrant_write_buffered(self):
4373 self.check_reentrant_write(b"xy", mode="wb")
4374
4375 def test_reentrant_write_text(self):
4376 self.check_reentrant_write("xy", mode="w", encoding="ascii")
4377
Antoine Pitrou707ce822011-02-25 21:24:11 +00004378 def check_interrupted_read_retry(self, decode, **fdopen_kwargs):
4379 """Check that a buffered read, when it gets interrupted (either
4380 returning a partial result or EINTR), properly invokes the signal
4381 handler and retries if the latter returned successfully."""
4382 r, w = os.pipe()
4383 fdopen_kwargs["closefd"] = False
4384 def alarm_handler(sig, frame):
4385 os.write(w, b"bar")
4386 signal.signal(signal.SIGALRM, alarm_handler)
4387 try:
4388 rio = self.io.open(r, **fdopen_kwargs)
4389 os.write(w, b"foo")
Gregory P. Smithb5ba2032012-06-25 01:13:32 -07004390 signal.alarm(1)
Antoine Pitrou707ce822011-02-25 21:24:11 +00004391 # Expected behaviour:
4392 # - first raw read() returns partial b"foo"
4393 # - second raw read() returns EINTR
4394 # - third raw read() returns b"bar"
4395 self.assertEqual(decode(rio.read(6)), "foobar")
4396 finally:
Victor Stinner9abee722017-09-19 09:36:54 -07004397 signal.alarm(0)
Antoine Pitrou707ce822011-02-25 21:24:11 +00004398 rio.close()
4399 os.close(w)
4400 os.close(r)
4401
Antoine Pitrou20db5112011-08-19 20:32:34 +02004402 def test_interrupted_read_retry_buffered(self):
Antoine Pitrou707ce822011-02-25 21:24:11 +00004403 self.check_interrupted_read_retry(lambda x: x.decode('latin1'),
4404 mode="rb")
4405
Antoine Pitrou20db5112011-08-19 20:32:34 +02004406 def test_interrupted_read_retry_text(self):
Antoine Pitrou707ce822011-02-25 21:24:11 +00004407 self.check_interrupted_read_retry(lambda x: x,
4408 mode="r")
4409
Antoine Pitrou707ce822011-02-25 21:24:11 +00004410 def check_interrupted_write_retry(self, item, **fdopen_kwargs):
4411 """Check that a buffered write, when it gets interrupted (either
4412 returning a partial result or EINTR), properly invokes the signal
4413 handler and retries if the latter returned successfully."""
4414 select = support.import_module("select")
Victor Stinner6d3c09f2015-03-16 17:57:49 +01004415
Antoine Pitrou707ce822011-02-25 21:24:11 +00004416 # A quantity that exceeds the buffer size of an anonymous pipe's
4417 # write end.
Antoine Pitroue1a16742013-04-24 23:31:38 +02004418 N = support.PIPE_MAX_SIZE
Antoine Pitrou707ce822011-02-25 21:24:11 +00004419 r, w = os.pipe()
4420 fdopen_kwargs["closefd"] = False
Victor Stinner6d3c09f2015-03-16 17:57:49 +01004421
Antoine Pitrou707ce822011-02-25 21:24:11 +00004422 # We need a separate thread to read from the pipe and allow the
4423 # write() to finish. This thread is started after the SIGALRM is
4424 # received (forcing a first EINTR in write()).
4425 read_results = []
4426 write_finished = False
Serhiy Storchaka9db55002015-03-28 20:38:37 +02004427 error = None
Antoine Pitrou707ce822011-02-25 21:24:11 +00004428 def _read():
Serhiy Storchaka9db55002015-03-28 20:38:37 +02004429 try:
4430 while not write_finished:
4431 while r in select.select([r], [], [], 1.0)[0]:
4432 s = os.read(r, 1024)
4433 read_results.append(s)
4434 except BaseException as exc:
4435 nonlocal error
4436 error = exc
Antoine Pitrou707ce822011-02-25 21:24:11 +00004437 t = threading.Thread(target=_read)
4438 t.daemon = True
4439 def alarm1(sig, frame):
4440 signal.signal(signal.SIGALRM, alarm2)
Gregory P. Smithb5ba2032012-06-25 01:13:32 -07004441 signal.alarm(1)
Antoine Pitrou707ce822011-02-25 21:24:11 +00004442 def alarm2(sig, frame):
4443 t.start()
Victor Stinner6d3c09f2015-03-16 17:57:49 +01004444
4445 large_data = item * N
Antoine Pitrou707ce822011-02-25 21:24:11 +00004446 signal.signal(signal.SIGALRM, alarm1)
4447 try:
4448 wio = self.io.open(w, **fdopen_kwargs)
Gregory P. Smithb5ba2032012-06-25 01:13:32 -07004449 signal.alarm(1)
Antoine Pitrou707ce822011-02-25 21:24:11 +00004450 # Expected behaviour:
4451 # - first raw write() is partial (because of the limited pipe buffer
4452 # and the first alarm)
4453 # - second raw write() returns EINTR (because of the second alarm)
4454 # - subsequent write()s are successful (either partial or complete)
Victor Stinner6d3c09f2015-03-16 17:57:49 +01004455 written = wio.write(large_data)
4456 self.assertEqual(N, written)
4457
Antoine Pitrou707ce822011-02-25 21:24:11 +00004458 wio.flush()
4459 write_finished = True
4460 t.join()
Victor Stinner6d3c09f2015-03-16 17:57:49 +01004461
Serhiy Storchaka9db55002015-03-28 20:38:37 +02004462 self.assertIsNone(error)
Antoine Pitrou707ce822011-02-25 21:24:11 +00004463 self.assertEqual(N, sum(len(x) for x in read_results))
4464 finally:
Victor Stinner9abee722017-09-19 09:36:54 -07004465 signal.alarm(0)
Antoine Pitrou707ce822011-02-25 21:24:11 +00004466 write_finished = True
4467 os.close(w)
4468 os.close(r)
4469 # This is deliberate. If we didn't close the file descriptor
4470 # before closing wio, wio would try to flush its internal
4471 # buffer, and could block (in case of failure).
4472 try:
4473 wio.close()
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02004474 except OSError as e:
Antoine Pitrou707ce822011-02-25 21:24:11 +00004475 if e.errno != errno.EBADF:
4476 raise
4477
Antoine Pitrou20db5112011-08-19 20:32:34 +02004478 def test_interrupted_write_retry_buffered(self):
Antoine Pitrou707ce822011-02-25 21:24:11 +00004479 self.check_interrupted_write_retry(b"x", mode="wb")
4480
Antoine Pitrou20db5112011-08-19 20:32:34 +02004481 def test_interrupted_write_retry_text(self):
Antoine Pitrou707ce822011-02-25 21:24:11 +00004482 self.check_interrupted_write_retry("x", mode="w", encoding="latin1")
4483
Antoine Pitrouf3b68b32010-12-03 18:41:39 +00004484
Antoine Pitroub46b9d52010-08-21 19:09:32 +00004485class CSignalsTest(SignalsTest):
4486 io = io
4487
4488class PySignalsTest(SignalsTest):
4489 io = pyio
4490
Antoine Pitrouf3b68b32010-12-03 18:41:39 +00004491 # Handling reentrancy issues would slow down _pyio even more, so the
4492 # tests are disabled.
4493 test_reentrant_write_buffered = None
4494 test_reentrant_write_text = None
4495
Antoine Pitroub46b9d52010-08-21 19:09:32 +00004496
Ezio Melottidaa42c72013-03-23 16:30:16 +02004497def load_tests(*args):
Gregory P. Smith1bef9072015-04-14 13:24:34 -07004498 tests = (CIOTest, PyIOTest, APIMismatchTest,
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00004499 CBufferedReaderTest, PyBufferedReaderTest,
4500 CBufferedWriterTest, PyBufferedWriterTest,
4501 CBufferedRWPairTest, PyBufferedRWPairTest,
4502 CBufferedRandomTest, PyBufferedRandomTest,
4503 StatefulIncrementalDecoderTest,
4504 CIncrementalNewlineDecoderTest, PyIncrementalNewlineDecoderTest,
4505 CTextIOWrapperTest, PyTextIOWrapperTest,
Antoine Pitroub46b9d52010-08-21 19:09:32 +00004506 CMiscIOTest, PyMiscIOTest,
4507 CSignalsTest, PySignalsTest,
4508 )
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00004509
4510 # Put the namespaces of the IO module we are testing and some useful mock
4511 # classes in the __dict__ of each test.
4512 mocks = (MockRawIO, MisbehavedRawIO, MockFileIO, CloseFailureIO,
benfogle9703f092017-11-10 16:03:40 -05004513 MockNonBlockWriterIO, MockUnseekableIO, MockRawIOWithoutRead,
4514 SlowFlushRawIO)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00004515 all_members = io.__all__ + ["IncrementalNewlineDecoder"]
4516 c_io_ns = {name : getattr(io, name) for name in all_members}
4517 py_io_ns = {name : getattr(pyio, name) for name in all_members}
4518 globs = globals()
4519 c_io_ns.update((x.__name__, globs["C" + x.__name__]) for x in mocks)
4520 py_io_ns.update((x.__name__, globs["Py" + x.__name__]) for x in mocks)
4521 # Avoid turning open into a bound method.
4522 py_io_ns["open"] = pyio.OpenWrapper
4523 for test in tests:
4524 if test.__name__.startswith("C"):
4525 for name, obj in c_io_ns.items():
4526 setattr(test, name, obj)
4527 elif test.__name__.startswith("Py"):
4528 for name, obj in py_io_ns.items():
4529 setattr(test, name, obj)
4530
Ezio Melottidaa42c72013-03-23 16:30:16 +02004531 suite = unittest.TestSuite([unittest.makeSuite(test) for test in tests])
4532 return suite
Guido van Rossum28524c72007-02-27 05:47:44 +00004533
4534if __name__ == "__main__":
Ezio Melottidaa42c72013-03-23 16:30:16 +02004535 unittest.main()