blob: c0d67a17d8c6f1de9232fafa093c2ea0f8f3de1d [file] [log] [blame]
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001"""Unit tests for the io module."""
2
3# Tests of io are scattered over the test suite:
4# * test_bufio - tests file buffering
5# * test_memoryio - tests BytesIO and StringIO
6# * test_fileio - tests FileIO
7# * test_file - tests the file interface
8# * test_io - tests everything else in the io module
9# * test_univnewlines - tests universal newline support
10# * test_largefile - tests operations on a file greater than 2**32 bytes
11# (only enabled with -ulargefile)
12
13################################################################################
14# ATTENTION TEST WRITERS!!!
15################################################################################
16# When writing tests for io, it's important to test both the C and Python
17# implementations. This is usually done by writing a base test that refers to
Serhiy Storchakad65c9492015-11-02 14:10:23 +020018# the type it is testing as an attribute. Then it provides custom subclasses to
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000019# test both implementations. This file has lots of examples.
20################################################################################
Guido van Rossum68bbcd22007-02-27 17:19:33 +000021
Victor Stinnerf86a5e82012-06-05 13:43:22 +020022import abc
23import array
24import errno
25import locale
Guido van Rossum8358db22007-08-18 21:39:55 +000026import os
Victor Stinnerf86a5e82012-06-05 13:43:22 +020027import pickle
28import random
29import signal
Guido van Rossum34d69e52007-04-10 20:08:41 +000030import sys
Gregory P. Smithe5796c42018-12-30 20:17:57 -080031import sysconfig
Victor Stinner22eb6892019-06-26 00:51:05 +020032import textwrap
Antoine Pitroua6a4dc82017-09-07 18:56:24 +020033import threading
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +000034import time
Guido van Rossum28524c72007-02-27 05:47:44 +000035import unittest
Antoine Pitroue033e062010-10-29 10:38:18 +000036import warnings
Victor Stinnerf86a5e82012-06-05 13:43:22 +020037import weakref
Antoine Pitrou131a4892012-10-16 22:57:11 +020038from collections import deque, UserList
Victor Stinnerf86a5e82012-06-05 13:43:22 +020039from itertools import cycle, count
Benjamin Petersonee8712c2008-05-20 21:35:26 +000040from test import support
Victor Stinner22eb6892019-06-26 00:51:05 +020041from test.support.script_helper import (
42 assert_python_ok, assert_python_failure, run_python_until_end)
Hai Shie80697d2020-05-28 06:10:27 +080043from test.support import threading_helper
Serhiy Storchakab21d1552018-03-02 11:53:51 +020044from test.support import FakePath
Guido van Rossum76c5d4d2007-04-06 19:10:29 +000045
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +000046import codecs
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000047import io # C implementation of io
48import _pyio as pyio # Python implementation of io
Guido van Rossuma9e20242007-03-08 00:43:48 +000049
try:
    import ctypes
except ImportError:
    def byteslike(*pos, **kw):
        """Build a bytes-like object from ``bytes(*pos, **kw)``.

        Fallback used when ctypes cannot be imported: the data is wrapped
        in a signed-byte ``array.array``.
        """
        return array.array("b", bytes(*pos, **kw))
else:
    class EmptyStruct(ctypes.Structure):
        # Field-less structure; byteslike() resizes each instance so that
        # it can hold the requested data.
        pass

    def byteslike(*pos, **kw):
        """Create a bytes-like object having no string or sequence methods"""
        payload = bytes(*pos, **kw)
        holder = EmptyStruct()
        # Grow the structure's buffer to the payload size, then copy the
        # payload in through a byte-oriented memoryview.
        ctypes.resize(holder, len(payload))
        view = memoryview(holder).cast("B")
        view[:] = payload
        return holder
65
# Build-time configuration strings; an unset variable is treated as ''.
_cflags = sysconfig.get_config_var('CFLAGS') or ''
_config_args = sysconfig.get_config_var('CONFIG_ARGS') or ''
# True when either the compiler flags or the configure arguments mention
# the memory sanitizer.
MEMORY_SANITIZER = any((
    '-fsanitize=memory' in _cflags,
    '--with-memory-sanitizer' in _config_args,
))
72
# Does io.IOBase finalizer log the exception if the close() method fails?
# The exception is ignored silently by default in release build.
# The flag is set when sys.gettotalrefcount exists or when dev mode is on.
IOBASE_EMITS_UNRAISABLE = (
    sys.flags.dev_mode or hasattr(sys, "gettotalrefcount")
)
Victor Stinner44235042019-04-12 17:06:47 +020076
77
def _default_chunk_size():
    """Get the default TextIOWrapper chunk size"""
    # Open this very file in text mode purely to obtain a text wrapper
    # object and read its _CHUNK_SIZE attribute.
    with open(__file__, "r", encoding="latin-1") as probe:
        chunk_size = probe._CHUNK_SIZE
    return chunk_size
82
83
class MockRawIOWithoutRead:
    """A RawIO implementation without read(), so as to exercise the default
    RawIO.read() which calls readinto().

    *read_stack* is an iterable of chunks handed out in order by
    readinto(); a ``None`` entry makes one readinto() call return None.
    Everything passed to write() is recorded in ``_write_stack``.
    """

    def __init__(self, read_stack=()):
        self._read_stack = list(read_stack)
        self._write_stack = []
        # Total number of read attempts, and how many of them happened
        # after the read stack had already been exhausted.
        self._reads = 0
        self._extraneous_reads = 0

    def write(self, b):
        """Record a bytes copy of *b* and report it as fully written."""
        self._write_stack.append(bytes(b))
        return len(b)

    def writable(self):
        return True

    def fileno(self):
        return 42

    def readable(self):
        return True

    def seekable(self):
        return True

    def seek(self, pos, whence):
        return 0   # wrong but we gotta return something

    def tell(self):
        return 0   # same comment as above

    def readinto(self, buf):
        """Copy the next pending chunk into *buf*.

        Returns the number of bytes copied, 0 once the read stack is
        empty, or None when the next stack entry is None.  A chunk larger
        than *buf* is split: the unread tail stays at the front of the
        stack for the next call.
        """
        self._reads += 1
        max_len = len(buf)
        try:
            data = self._read_stack[0]
        except IndexError:
            self._extraneous_reads += 1
            return 0
        if data is None:
            del self._read_stack[0]
            return None
        n = len(data)
        if n <= max_len:
            del self._read_stack[0]
            buf[:n] = data
            return n
        else:
            buf[:] = data[:max_len]
            self._read_stack[0] = data[max_len:]
            return max_len

    def truncate(self, pos=None):
        return pos

class CMockRawIOWithoutRead(MockRawIOWithoutRead, io.RawIOBase):
    pass

class PyMockRawIOWithoutRead(MockRawIOWithoutRead, pyio.RawIOBase):
    pass


class MockRawIO(MockRawIOWithoutRead):
    """MockRawIOWithoutRead plus an explicit read() that pops whole chunks."""

    def read(self, n=None):
        """Return the next entry of the read stack, ignoring *n*.

        Once the stack is exhausted, count an extraneous read and return
        b"".
        """
        self._reads += 1
        try:
            return self._read_stack.pop(0)
        except IndexError:
            # Only "stack is empty" means end of data.  Anything else
            # (KeyboardInterrupt, a genuine bug) must propagate instead of
            # being reported as an extraneous read.
            self._extraneous_reads += 1
            return b""

class CMockRawIO(MockRawIO, io.RawIOBase):
    pass

class PyMockRawIO(MockRawIO, pyio.RawIOBase):
    pass
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000162
Guido van Rossuma9e20242007-03-08 00:43:48 +0000163
class MisbehavedRawIO(MockRawIO):
    """Raw IO mock whose methods deliberately report bogus results."""

    def write(self, b):
        # Claim twice as many bytes as the parent reported.
        reported = super().write(b)
        return reported * 2

    def read(self, n=None):
        # Hand back the parent's result repeated twice.
        chunk = super().read(n)
        return chunk * 2

    def seek(self, pos, whence):
        return -123

    def tell(self):
        return -456

    def readinto(self, buf):
        # Let the parent fill the buffer, then report five times the
        # buffer size.
        super().readinto(buf)
        return 5 * len(buf)

class CMisbehavedRawIO(MisbehavedRawIO, io.RawIOBase):
    """MisbehavedRawIO on top of the C RawIOBase."""

class PyMisbehavedRawIO(MisbehavedRawIO, pyio.RawIOBase):
    """MisbehavedRawIO on top of the pure-Python RawIOBase."""
186
187
class SlowFlushRawIO(MockRawIO):
    """Raw IO mock whose flush() signals an event and then sleeps."""

    def __init__(self):
        # Set by flush() as soon as it is entered.
        self.in_flush = threading.Event()
        super().__init__()

    def flush(self):
        entered = self.in_flush
        entered.set()
        time.sleep(0.25)

class CSlowFlushRawIO(SlowFlushRawIO, io.RawIOBase):
    """SlowFlushRawIO on top of the C RawIOBase."""

class PySlowFlushRawIO(SlowFlushRawIO, pyio.RawIOBase):
    """SlowFlushRawIO on top of the pure-Python RawIOBase."""
202
203
class CloseFailureIO(MockRawIO):
    """Raw IO mock whose first close() marks it closed and raises OSError."""

    closed = 0

    def close(self):
        # Later calls are silent no-ops.
        if self.closed:
            return
        self.closed = 1
        raise OSError

class CCloseFailureIO(CloseFailureIO, io.RawIOBase):
    """CloseFailureIO on top of the C RawIOBase."""

class PyCloseFailureIO(CloseFailureIO, pyio.RawIOBase):
    """CloseFailureIO on top of the pure-Python RawIOBase."""
217
218
class MockFileIO:
    """Mixin that logs the outcome of every read()/readinto() call.

    ``read_history`` receives the length of each read() result (None when
    the result is None) and the raw return value of each readinto().
    """

    def __init__(self, data):
        self.read_history = []
        super().__init__(data)

    def read(self, n=None):
        chunk = super().read(n)
        if chunk is None:
            self.read_history.append(None)
        else:
            self.read_history.append(len(chunk))
        return chunk

    def readinto(self, b):
        count = super().readinto(b)
        self.read_history.append(count)
        return count

class CMockFileIO(MockFileIO, io.BytesIO):
    """MockFileIO backed by the C BytesIO."""

class PyMockFileIO(MockFileIO, pyio.BytesIO):
    """MockFileIO backed by the pure-Python BytesIO."""
240
241
class MockUnseekableIO:
    """Mixin that makes a stream report itself as not seekable.

    seek(), tell() and truncate() all raise the UnsupportedOperation
    class supplied by the concrete subclass.
    """

    def _not_seekable(self):
        # Build (but do not raise) the error shared by the methods below.
        return self.UnsupportedOperation("not seekable")

    def seekable(self):
        return False

    def seek(self, *args):
        raise self._not_seekable()

    def tell(self, *args):
        raise self._not_seekable()

    def truncate(self, *args):
        raise self._not_seekable()

class CMockUnseekableIO(MockUnseekableIO, io.BytesIO):
    """Unseekable BytesIO using the C implementation."""
    UnsupportedOperation = io.UnsupportedOperation

class PyMockUnseekableIO(MockUnseekableIO, pyio.BytesIO):
    """Unseekable BytesIO using the pure-Python implementation."""
    UnsupportedOperation = pyio.UnsupportedOperation
260
261
class MockNonBlockWriterIO:
    """Writable raw IO mock that can simulate a write that would block."""

    def __init__(self):
        self._write_stack = []
        self._blocker_char = None

    def pop_written(self):
        """Return everything written so far and forget it."""
        written = b"".join(self._write_stack)
        self._write_stack.clear()
        return written

    def block_on(self, char):
        """Block when a given char is encountered."""
        self._blocker_char = char

    def readable(self):
        return True

    def seekable(self):
        return True

    def seek(self, pos, whence=0):
        # naive implementation, enough for tests
        return 0

    def writable(self):
        return True

    def write(self, b):
        """Accept *b*, stopping short at the configured blocker char.

        Returns the number of bytes accepted, or None when the blocker is
        the very first byte (which also disarms the blocker).
        """
        payload = bytes(b)
        blocker = self._blocker_char
        if blocker:
            try:
                cut = payload.index(blocker)
            except ValueError:
                cut = -1    # blocker absent: accept the whole payload
            if cut > 0:
                # write data up to the first blocker
                self._write_stack.append(payload[:cut])
                return cut
            if cut == 0:
                # cancel blocker and indicate would block
                self._blocker_char = None
                return None
        self._write_stack.append(payload)
        return len(payload)

class CMockNonBlockWriterIO(MockNonBlockWriterIO, io.RawIOBase):
    """MockNonBlockWriterIO on top of the C RawIOBase."""
    BlockingIOError = io.BlockingIOError

class PyMockNonBlockWriterIO(MockNonBlockWriterIO, pyio.RawIOBase):
    """MockNonBlockWriterIO on top of the pure-Python RawIOBase."""
    BlockingIOError = pyio.BlockingIOError
315
Guido van Rossuma9e20242007-03-08 00:43:48 +0000316
Guido van Rossum28524c72007-02-27 05:47:44 +0000317class IOTest(unittest.TestCase):
318
Neal Norwitze7789b12008-03-24 06:18:09 +0000319 def setUp(self):
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000320 support.unlink(support.TESTFN)
Neal Norwitze7789b12008-03-24 06:18:09 +0000321
Guido van Rossum4d0f5a42007-03-07 22:59:39 +0000322 def tearDown(self):
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000323 support.unlink(support.TESTFN)
Guido van Rossum4d0f5a42007-03-07 22:59:39 +0000324
    def write_ops(self, f):
        """Run a fixed write/seek/tell/truncate scenario against *f*.

        *f* must be a writable, seekable binary stream positioned at 0.
        The assertions depend on each other's side effects, so statement
        order matters.
        """
        self.assertEqual(f.write(b"blah."), 5)
        # Truncating must not move the stream position.
        f.truncate(0)
        self.assertEqual(f.tell(), 5)
        f.seek(0)

        self.assertEqual(f.write(b"blah."), 5)
        self.assertEqual(f.seek(0), 0)
        self.assertEqual(f.write(b"Hello."), 6)
        self.assertEqual(f.tell(), 6)
        # whence=1: relative to the current position.
        self.assertEqual(f.seek(-1, 1), 5)
        self.assertEqual(f.tell(), 5)
        buffer = bytearray(b" world\n\n\n")
        self.assertEqual(f.write(buffer), 9)
        buffer[:] = b"*" * 9  # Overwrite our copy of the data
        self.assertEqual(f.seek(0), 0)
        self.assertEqual(f.write(b"h"), 1)
        # whence=2: relative to the end of the stream.
        self.assertEqual(f.seek(-1, 2), 13)
        self.assertEqual(f.tell(), 13)

        # truncate() returns the new size and leaves the position alone.
        self.assertEqual(f.truncate(12), 12)
        self.assertEqual(f.tell(), 13)
        # A float offset is rejected.
        self.assertRaises(TypeError, f.seek, 0.0)
Guido van Rossum28524c72007-02-27 05:47:44 +0000348
    def read_ops(self, f, buffered=False):
        """Run a fixed read/readinto/seek/tell scenario against *f*.

        *f* must be a readable, seekable binary stream positioned at 0 and
        containing b"hello world\\n".  When *buffered* is true, read()
        without a size and readinto1() are exercised as well.
        """
        data = f.read(5)
        self.assertEqual(data, b"hello")
        # readinto() must accept a bytes-like object that is not a
        # bytes/bytearray.
        data = byteslike(data)
        self.assertEqual(f.readinto(data), 5)
        self.assertEqual(bytes(data), b" worl")
        data = bytearray(5)
        # Only two bytes remain; the buffer itself keeps its size.
        self.assertEqual(f.readinto(data), 2)
        self.assertEqual(len(data), 5)
        self.assertEqual(data[:2], b"d\n")
        self.assertEqual(f.seek(0), 0)
        self.assertEqual(f.read(20), b"hello world\n")
        # At end of stream.
        self.assertEqual(f.read(1), b"")
        self.assertEqual(f.readinto(byteslike(b"x")), 0)
        self.assertEqual(f.seek(-6, 2), 6)
        self.assertEqual(f.read(5), b"world")
        # Zero-sized reads.
        self.assertEqual(f.read(0), b"")
        self.assertEqual(f.readinto(byteslike()), 0)
        self.assertEqual(f.seek(-6, 1), 5)
        self.assertEqual(f.read(5), b" worl")
        self.assertEqual(f.tell(), 10)
        # A float offset is rejected.
        self.assertRaises(TypeError, f.seek, 0.0)
        if buffered:
            f.seek(0)
            self.assertEqual(f.read(), b"hello world\n")
            f.seek(6)
            self.assertEqual(f.read(), b"world\n")
            self.assertEqual(f.read(), b"")
            f.seek(0)
            data = byteslike(5)
            self.assertEqual(f.readinto1(data), 5)
            self.assertEqual(bytes(data), b"hello")
Guido van Rossum9b76da62007-04-11 01:09:03 +0000381
Guido van Rossum34d69e52007-04-10 20:08:41 +0000382 LARGE = 2**31
383
Guido van Rossum53807da2007-04-10 19:01:47 +0000384 def large_file_ops(self, f):
385 assert f.readable()
386 assert f.writable()
Xavier de Gaye877f0362016-11-17 09:20:28 +0100387 try:
388 self.assertEqual(f.seek(self.LARGE), self.LARGE)
389 except (OverflowError, ValueError):
390 self.skipTest("no largefile support")
Guido van Rossum34d69e52007-04-10 20:08:41 +0000391 self.assertEqual(f.tell(), self.LARGE)
Guido van Rossum53807da2007-04-10 19:01:47 +0000392 self.assertEqual(f.write(b"xxx"), 3)
Guido van Rossum34d69e52007-04-10 20:08:41 +0000393 self.assertEqual(f.tell(), self.LARGE + 3)
394 self.assertEqual(f.seek(-1, 1), self.LARGE + 2)
Guido van Rossum87429772007-04-10 21:06:59 +0000395 self.assertEqual(f.truncate(), self.LARGE + 2)
Guido van Rossum34d69e52007-04-10 20:08:41 +0000396 self.assertEqual(f.tell(), self.LARGE + 2)
397 self.assertEqual(f.seek(0, 2), self.LARGE + 2)
Guido van Rossum87429772007-04-10 21:06:59 +0000398 self.assertEqual(f.truncate(self.LARGE + 1), self.LARGE + 1)
Antoine Pitrou905a2ff2010-01-31 22:47:27 +0000399 self.assertEqual(f.tell(), self.LARGE + 2)
Guido van Rossum34d69e52007-04-10 20:08:41 +0000400 self.assertEqual(f.seek(0, 2), self.LARGE + 1)
401 self.assertEqual(f.seek(-1, 2), self.LARGE)
Guido van Rossum53807da2007-04-10 19:01:47 +0000402 self.assertEqual(f.read(2), b"x")
403
Benjamin Peterson81971ea2009-05-14 22:01:31 +0000404 def test_invalid_operations(self):
405 # Try writing on a file opened in read mode and vice-versa.
Antoine Pitrou0d739d72010-09-05 23:01:12 +0000406 exc = self.UnsupportedOperation
Benjamin Peterson81971ea2009-05-14 22:01:31 +0000407 for mode in ("w", "wb"):
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000408 with self.open(support.TESTFN, mode) as fp:
Antoine Pitrou0d739d72010-09-05 23:01:12 +0000409 self.assertRaises(exc, fp.read)
410 self.assertRaises(exc, fp.readline)
411 with self.open(support.TESTFN, "wb", buffering=0) as fp:
412 self.assertRaises(exc, fp.read)
413 self.assertRaises(exc, fp.readline)
414 with self.open(support.TESTFN, "rb", buffering=0) as fp:
415 self.assertRaises(exc, fp.write, b"blah")
416 self.assertRaises(exc, fp.writelines, [b"blah\n"])
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000417 with self.open(support.TESTFN, "rb") as fp:
Antoine Pitrou0d739d72010-09-05 23:01:12 +0000418 self.assertRaises(exc, fp.write, b"blah")
419 self.assertRaises(exc, fp.writelines, [b"blah\n"])
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000420 with self.open(support.TESTFN, "r") as fp:
Antoine Pitrou0d739d72010-09-05 23:01:12 +0000421 self.assertRaises(exc, fp.write, "blah")
422 self.assertRaises(exc, fp.writelines, ["blah\n"])
423 # Non-zero seeking from current or end pos
424 self.assertRaises(exc, fp.seek, 1, self.SEEK_CUR)
425 self.assertRaises(exc, fp.seek, -1, self.SEEK_END)
Benjamin Peterson81971ea2009-05-14 22:01:31 +0000426
    def test_optional_abilities(self):
        # Test for OSError when optional APIs are not supported
        # The purpose of this test is to try fileno(), reading, writing and
        # seeking operations with various objects that indicate they do not
        # support these operations.
        #
        # Each factory below is paired with an "abilities" string in which
        # "f" means fileno() works, "r" readable, "w" writable and
        # "s" seekable.

        def pipe_reader():
            [r, w] = os.pipe()
            os.close(w)  # So that read() is harmless
            return self.FileIO(r, "r")

        def pipe_writer():
            [r, w] = os.pipe()
            self.addCleanup(os.close, r)
            # Guarantee that we can write into the pipe without blocking
            thread = threading.Thread(target=os.read, args=(r, 100))
            thread.start()
            self.addCleanup(thread.join)
            return self.FileIO(w, "w")

        def buffered_reader():
            return self.BufferedReader(self.MockUnseekableIO())

        def buffered_writer():
            return self.BufferedWriter(self.MockUnseekableIO())

        def buffered_random():
            return self.BufferedRandom(self.BytesIO())

        def buffered_rw_pair():
            return self.BufferedRWPair(self.MockUnseekableIO(),
                self.MockUnseekableIO())

        def text_reader():
            # Override the write side with the BufferedIOBase versions.
            class UnseekableReader(self.MockUnseekableIO):
                writable = self.BufferedIOBase.writable
                write = self.BufferedIOBase.write
            return self.TextIOWrapper(UnseekableReader(), "ascii")

        def text_writer():
            # Override the read side with the BufferedIOBase versions.
            class UnseekableWriter(self.MockUnseekableIO):
                readable = self.BufferedIOBase.readable
                read = self.BufferedIOBase.read
            return self.TextIOWrapper(UnseekableWriter(), "ascii")

        tests = (
            (pipe_reader, "fr"), (pipe_writer, "fw"),
            (buffered_reader, "r"), (buffered_writer, "w"),
            (buffered_random, "rws"), (buffered_rw_pair, "rw"),
            (text_reader, "r"), (text_writer, "w"),
            (self.BytesIO, "rws"), (self.StringIO, "rws"),
        )
        for [test, abilities] in tests:
            with self.subTest(test), test() as obj:
                readable = "r" in abilities
                self.assertEqual(obj.readable(), readable)
                writable = "w" in abilities
                self.assertEqual(obj.writable(), writable)

                # Pick a payload of the type the object accepts.
                if isinstance(obj, self.TextIOBase):
                    data = "3"
                elif isinstance(obj, (self.BufferedIOBase, self.RawIOBase)):
                    data = b"3"
                else:
                    self.fail("Unknown base class")

                if "f" in abilities:
                    obj.fileno()
                else:
                    self.assertRaises(OSError, obj.fileno)

                if readable:
                    obj.read(1)
                    obj.read()
                else:
                    self.assertRaises(OSError, obj.read, 1)
                    self.assertRaises(OSError, obj.read)

                if writable:
                    obj.write(data)
                else:
                    self.assertRaises(OSError, obj.write, data)

                if sys.platform.startswith("win") and test in (
                        pipe_reader, pipe_writer):
                    # Pipes seem to appear as seekable on Windows
                    continue
                seekable = "s" in abilities
                self.assertEqual(obj.seekable(), seekable)

                if seekable:
                    obj.tell()
                    obj.seek(0)
                else:
                    self.assertRaises(OSError, obj.tell)
                    self.assertRaises(OSError, obj.seek, 0)

                # truncate() needs both write and seek support.
                if writable and seekable:
                    obj.truncate()
                    obj.truncate(0)
                else:
                    self.assertRaises(OSError, obj.truncate)
                    self.assertRaises(OSError, obj.truncate, 0)
530
Antoine Pitrou13348842012-01-29 18:36:34 +0100531 def test_open_handles_NUL_chars(self):
532 fn_with_NUL = 'foo\0bar'
Serhiy Storchakad8a14472014-09-06 20:07:17 +0300533 self.assertRaises(ValueError, self.open, fn_with_NUL, 'w')
Victor Stinner47b45572016-03-25 09:07:07 +0100534
535 bytes_fn = bytes(fn_with_NUL, 'ascii')
536 with warnings.catch_warnings():
537 warnings.simplefilter("ignore", DeprecationWarning)
538 self.assertRaises(ValueError, self.open, bytes_fn, 'w')
Antoine Pitrou13348842012-01-29 18:36:34 +0100539
Guido van Rossum28524c72007-02-27 05:47:44 +0000540 def test_raw_file_io(self):
Benjamin Peterson45cec322009-04-24 23:14:50 +0000541 with self.open(support.TESTFN, "wb", buffering=0) as f:
542 self.assertEqual(f.readable(), False)
543 self.assertEqual(f.writable(), True)
544 self.assertEqual(f.seekable(), True)
545 self.write_ops(f)
546 with self.open(support.TESTFN, "rb", buffering=0) as f:
547 self.assertEqual(f.readable(), True)
548 self.assertEqual(f.writable(), False)
549 self.assertEqual(f.seekable(), True)
550 self.read_ops(f)
Guido van Rossum28524c72007-02-27 05:47:44 +0000551
Guido van Rossum87429772007-04-10 21:06:59 +0000552 def test_buffered_file_io(self):
Benjamin Peterson45cec322009-04-24 23:14:50 +0000553 with self.open(support.TESTFN, "wb") as f:
554 self.assertEqual(f.readable(), False)
555 self.assertEqual(f.writable(), True)
556 self.assertEqual(f.seekable(), True)
557 self.write_ops(f)
558 with self.open(support.TESTFN, "rb") as f:
559 self.assertEqual(f.readable(), True)
560 self.assertEqual(f.writable(), False)
561 self.assertEqual(f.seekable(), True)
562 self.read_ops(f, True)
Guido van Rossum87429772007-04-10 21:06:59 +0000563
Guido van Rossum48fc58a2007-06-07 23:45:37 +0000564 def test_readline(self):
Benjamin Peterson45cec322009-04-24 23:14:50 +0000565 with self.open(support.TESTFN, "wb") as f:
566 f.write(b"abc\ndef\nxyzzy\nfoo\x00bar\nanother line")
567 with self.open(support.TESTFN, "rb") as f:
568 self.assertEqual(f.readline(), b"abc\n")
569 self.assertEqual(f.readline(10), b"def\n")
570 self.assertEqual(f.readline(2), b"xy")
571 self.assertEqual(f.readline(4), b"zzy\n")
572 self.assertEqual(f.readline(), b"foo\x00bar\n")
Benjamin Petersonbf5ff762009-12-13 19:25:34 +0000573 self.assertEqual(f.readline(None), b"another line")
Benjamin Peterson45cec322009-04-24 23:14:50 +0000574 self.assertRaises(TypeError, f.readline, 5.3)
575 with self.open(support.TESTFN, "r") as f:
576 self.assertRaises(TypeError, f.readline, 5.3)
Guido van Rossum48fc58a2007-06-07 23:45:37 +0000577
Serhiy Storchakabf623ae2017-04-19 20:03:52 +0300578 def test_readline_nonsizeable(self):
579 # Issue #30061
580 # Crash when readline() returns an object without __len__
581 class R(self.IOBase):
582 def readline(self):
583 return None
584 self.assertRaises((TypeError, StopIteration), next, R())
585
586 def test_next_nonsizeable(self):
587 # Issue #30061
588 # Crash when __next__() returns an object without __len__
589 class R(self.IOBase):
590 def __next__(self):
591 return None
592 self.assertRaises(TypeError, R().readlines, 1)
593
Guido van Rossum28524c72007-02-27 05:47:44 +0000594 def test_raw_bytes_io(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000595 f = self.BytesIO()
Guido van Rossum28524c72007-02-27 05:47:44 +0000596 self.write_ops(f)
597 data = f.getvalue()
598 self.assertEqual(data, b"hello world\n")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000599 f = self.BytesIO(data)
Guido van Rossum9b76da62007-04-11 01:09:03 +0000600 self.read_ops(f, True)
Guido van Rossum28524c72007-02-27 05:47:44 +0000601
Guido van Rossum53807da2007-04-10 19:01:47 +0000602 def test_large_file_ops(self):
Mike53f7a7c2017-12-14 14:04:53 +0300603 # On Windows and Mac OSX this test consumes large resources; It takes
Victor Stinner8c663fd2017-11-08 14:44:44 -0800604 # a long time to build the >2 GiB file and takes >2 GiB of disk space
Guido van Rossum34d69e52007-04-10 20:08:41 +0000605 # therefore the resource must be enabled to run this test.
Victor Stinner937ee9e2018-06-26 02:11:06 +0200606 if sys.platform[:3] == 'win' or sys.platform == 'darwin':
Zachary Ware9fe6d862013-12-08 00:20:35 -0600607 support.requires(
608 'largefile',
609 'test requires %s bytes and a long time to run' % self.LARGE)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000610 with self.open(support.TESTFN, "w+b", 0) as f:
611 self.large_file_ops(f)
612 with self.open(support.TESTFN, "w+b") as f:
613 self.large_file_ops(f)
Guido van Rossum87429772007-04-10 21:06:59 +0000614
615 def test_with_open(self):
Alexey Izbysheva2670562018-10-20 03:22:31 +0300616 for bufsize in (0, 100):
Guido van Rossum87429772007-04-10 21:06:59 +0000617 f = None
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000618 with self.open(support.TESTFN, "wb", bufsize) as f:
Guido van Rossum1f2ca562007-08-27 20:44:15 +0000619 f.write(b"xxx")
Guido van Rossum87429772007-04-10 21:06:59 +0000620 self.assertEqual(f.closed, True)
621 f = None
622 try:
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000623 with self.open(support.TESTFN, "wb", bufsize) as f:
Guido van Rossum87429772007-04-10 21:06:59 +0000624 1/0
625 except ZeroDivisionError:
626 self.assertEqual(f.closed, True)
627 else:
628 self.fail("1/0 didn't raise an exception")
629
Antoine Pitrou08838b62009-01-21 00:55:13 +0000630 # issue 5008
631 def test_append_mode_tell(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000632 with self.open(support.TESTFN, "wb") as f:
Antoine Pitrou08838b62009-01-21 00:55:13 +0000633 f.write(b"xxx")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000634 with self.open(support.TESTFN, "ab", buffering=0) as f:
Antoine Pitrou08838b62009-01-21 00:55:13 +0000635 self.assertEqual(f.tell(), 3)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000636 with self.open(support.TESTFN, "ab") as f:
Antoine Pitrou08838b62009-01-21 00:55:13 +0000637 self.assertEqual(f.tell(), 3)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000638 with self.open(support.TESTFN, "a") as f:
Serhiy Storchaka0dcd80a2015-08-02 15:17:49 +0300639 self.assertGreater(f.tell(), 0)
Antoine Pitrou08838b62009-01-21 00:55:13 +0000640
Guido van Rossum87429772007-04-10 21:06:59 +0000641 def test_destructor(self):
642 record = []
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000643 class MyFileIO(self.FileIO):
Guido van Rossum87429772007-04-10 21:06:59 +0000644 def __del__(self):
645 record.append(1)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000646 try:
647 f = super().__del__
648 except AttributeError:
649 pass
650 else:
651 f()
Guido van Rossum87429772007-04-10 21:06:59 +0000652 def close(self):
653 record.append(2)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000654 super().close()
Guido van Rossum87429772007-04-10 21:06:59 +0000655 def flush(self):
656 record.append(3)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000657 super().flush()
Brett Cannon5a9e91b2010-10-29 23:53:03 +0000658 with support.check_warnings(('', ResourceWarning)):
659 f = MyFileIO(support.TESTFN, "wb")
660 f.write(b"xxx")
661 del f
662 support.gc_collect()
663 self.assertEqual(record, [1, 2, 3])
664 with self.open(support.TESTFN, "rb") as f:
665 self.assertEqual(f.read(), b"xxx")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000666
667 def _check_base_destructor(self, base):
668 record = []
669 class MyIO(base):
670 def __init__(self):
671 # This exercises the availability of attributes on object
672 # destruction.
673 # (in the C version, close() is called by the tp_dealloc
674 # function, not by __del__)
675 self.on_del = 1
676 self.on_close = 2
677 self.on_flush = 3
678 def __del__(self):
679 record.append(self.on_del)
680 try:
681 f = super().__del__
682 except AttributeError:
683 pass
684 else:
685 f()
686 def close(self):
687 record.append(self.on_close)
688 super().close()
689 def flush(self):
690 record.append(self.on_flush)
691 super().flush()
692 f = MyIO()
Guido van Rossum87429772007-04-10 21:06:59 +0000693 del f
Benjamin Peterson24fb1d02009-04-24 23:26:21 +0000694 support.gc_collect()
Guido van Rossum87429772007-04-10 21:06:59 +0000695 self.assertEqual(record, [1, 2, 3])
696
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000697 def test_IOBase_destructor(self):
698 self._check_base_destructor(self.IOBase)
699
700 def test_RawIOBase_destructor(self):
701 self._check_base_destructor(self.RawIOBase)
702
703 def test_BufferedIOBase_destructor(self):
704 self._check_base_destructor(self.BufferedIOBase)
705
706 def test_TextIOBase_destructor(self):
707 self._check_base_destructor(self.TextIOBase)
708
Guido van Rossum87429772007-04-10 21:06:59 +0000709 def test_close_flushes(self):
Benjamin Peterson45cec322009-04-24 23:14:50 +0000710 with self.open(support.TESTFN, "wb") as f:
711 f.write(b"xxx")
712 with self.open(support.TESTFN, "rb") as f:
713 self.assertEqual(f.read(), b"xxx")
Guido van Rossuma9e20242007-03-08 00:43:48 +0000714
Guido van Rossumd4103952007-04-12 05:44:49 +0000715 def test_array_writes(self):
716 a = array.array('i', range(10))
Antoine Pitrou1ce3eb52010-09-01 20:29:34 +0000717 n = len(a.tobytes())
Martin Panter6bb91f32016-05-28 00:41:57 +0000718 def check(f):
719 with f:
720 self.assertEqual(f.write(a), n)
721 f.writelines((a,))
722 check(self.BytesIO())
723 check(self.FileIO(support.TESTFN, "w"))
724 check(self.BufferedWriter(self.MockRawIO()))
725 check(self.BufferedRandom(self.MockRawIO()))
726 check(self.BufferedRWPair(self.MockRawIO(), self.MockRawIO()))
Guido van Rossumd4103952007-04-12 05:44:49 +0000727
Guido van Rossum2dced8b2007-10-30 17:27:30 +0000728 def test_closefd(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000729 self.assertRaises(ValueError, self.open, support.TESTFN, 'w',
Guido van Rossum2dced8b2007-10-30 17:27:30 +0000730 closefd=False)
Guido van Rossuma9e20242007-03-08 00:43:48 +0000731
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000732 def test_read_closed(self):
733 with self.open(support.TESTFN, "w") as f:
Christian Heimesecc42a22008-11-05 19:30:32 +0000734 f.write("egg\n")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000735 with self.open(support.TESTFN, "r") as f:
736 file = self.open(f.fileno(), "r", closefd=False)
Christian Heimesecc42a22008-11-05 19:30:32 +0000737 self.assertEqual(file.read(), "egg\n")
738 file.seek(0)
739 file.close()
740 self.assertRaises(ValueError, file.read)
Philipp Gesangcb1c0742020-02-04 22:25:16 +0100741 with self.open(support.TESTFN, "rb") as f:
742 file = self.open(f.fileno(), "rb", closefd=False)
743 self.assertEqual(file.read()[:3], b"egg")
744 file.close()
745 self.assertRaises(ValueError, file.readinto, bytearray(1))
Christian Heimesecc42a22008-11-05 19:30:32 +0000746
747 def test_no_closefd_with_filename(self):
748 # can't use closefd in combination with a file name
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000749 self.assertRaises(ValueError, self.open, support.TESTFN, "r", closefd=False)
Christian Heimesecc42a22008-11-05 19:30:32 +0000750
751 def test_closefd_attr(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000752 with self.open(support.TESTFN, "wb") as f:
Christian Heimesecc42a22008-11-05 19:30:32 +0000753 f.write(b"egg\n")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000754 with self.open(support.TESTFN, "r") as f:
Christian Heimesecc42a22008-11-05 19:30:32 +0000755 self.assertEqual(f.buffer.raw.closefd, True)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000756 file = self.open(f.fileno(), "r", closefd=False)
Christian Heimesecc42a22008-11-05 19:30:32 +0000757 self.assertEqual(file.buffer.raw.closefd, False)
758
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000759 def test_garbage_collection(self):
760 # FileIO objects are collected, and collecting them flushes
761 # all data to disk.
Benjamin Petersonebe5d8a2010-10-31 01:30:11 +0000762 with support.check_warnings(('', ResourceWarning)):
763 f = self.FileIO(support.TESTFN, "wb")
764 f.write(b"abcxxx")
765 f.f = f
766 wr = weakref.ref(f)
767 del f
768 support.gc_collect()
Serhiy Storchaka0dcd80a2015-08-02 15:17:49 +0300769 self.assertIsNone(wr(), wr)
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000770 with self.open(support.TESTFN, "rb") as f:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000771 self.assertEqual(f.read(), b"abcxxx")
Christian Heimesecc42a22008-11-05 19:30:32 +0000772
Antoine Pitrou7d037a72009-03-29 18:55:12 +0000773 def test_unbounded_file(self):
774 # Issue #1174606: reading from an unbounded stream such as /dev/zero.
775 zero = "/dev/zero"
776 if not os.path.exists(zero):
Antoine Pitrouc50cb8e2009-04-19 00:10:36 +0000777 self.skipTest("{0} does not exist".format(zero))
Antoine Pitrou7d037a72009-03-29 18:55:12 +0000778 if sys.maxsize > 0x7FFFFFFF:
Antoine Pitrouc50cb8e2009-04-19 00:10:36 +0000779 self.skipTest("test can only run in a 32-bit address space")
Antoine Pitrou7d037a72009-03-29 18:55:12 +0000780 if support.real_max_memuse < support._2G:
Victor Stinner8c663fd2017-11-08 14:44:44 -0800781 self.skipTest("test requires at least 2 GiB of memory")
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000782 with self.open(zero, "rb", buffering=0) as f:
Antoine Pitrou7d037a72009-03-29 18:55:12 +0000783 self.assertRaises(OverflowError, f.read)
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000784 with self.open(zero, "rb") as f:
Antoine Pitrou7d037a72009-03-29 18:55:12 +0000785 self.assertRaises(OverflowError, f.read)
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000786 with self.open(zero, "r") as f:
Antoine Pitrou7d037a72009-03-29 18:55:12 +0000787 self.assertRaises(OverflowError, f.read)
788
Serhiy Storchakaa3712a92015-02-21 00:35:09 +0200789 def check_flush_error_on_close(self, *args, **kwargs):
790 # Test that the file is closed despite failed flush
791 # and that flush() is called before file closed.
792 f = self.open(*args, **kwargs)
793 closed = []
Antoine Pitrou6be88762010-05-03 16:48:20 +0000794 def bad_flush():
Serhiy Storchakaa3712a92015-02-21 00:35:09 +0200795 closed[:] = [f.closed]
Andrew Svetlovf7a17b42012-12-25 16:47:37 +0200796 raise OSError()
Antoine Pitrou6be88762010-05-03 16:48:20 +0000797 f.flush = bad_flush
Andrew Svetlovf7a17b42012-12-25 16:47:37 +0200798 self.assertRaises(OSError, f.close) # exception not swallowed
Benjamin Peterson68623612012-12-20 11:53:11 -0600799 self.assertTrue(f.closed)
Serhiy Storchakaa3712a92015-02-21 00:35:09 +0200800 self.assertTrue(closed) # flush() called
801 self.assertFalse(closed[0]) # flush() called before file closed
Serhiy Storchakac26a1a42015-02-23 00:28:38 +0200802 f.flush = lambda: None # break reference loop
Serhiy Storchakaa3712a92015-02-21 00:35:09 +0200803
804 def test_flush_error_on_close(self):
805 # raw file
806 # Issue #5700: io.FileIO calls flush() after file closed
807 self.check_flush_error_on_close(support.TESTFN, 'wb', buffering=0)
808 fd = os.open(support.TESTFN, os.O_WRONLY|os.O_CREAT)
809 self.check_flush_error_on_close(fd, 'wb', buffering=0)
810 fd = os.open(support.TESTFN, os.O_WRONLY|os.O_CREAT)
811 self.check_flush_error_on_close(fd, 'wb', buffering=0, closefd=False)
812 os.close(fd)
813 # buffered io
814 self.check_flush_error_on_close(support.TESTFN, 'wb')
815 fd = os.open(support.TESTFN, os.O_WRONLY|os.O_CREAT)
816 self.check_flush_error_on_close(fd, 'wb')
817 fd = os.open(support.TESTFN, os.O_WRONLY|os.O_CREAT)
818 self.check_flush_error_on_close(fd, 'wb', closefd=False)
819 os.close(fd)
820 # text io
821 self.check_flush_error_on_close(support.TESTFN, 'w')
822 fd = os.open(support.TESTFN, os.O_WRONLY|os.O_CREAT)
823 self.check_flush_error_on_close(fd, 'w')
824 fd = os.open(support.TESTFN, os.O_WRONLY|os.O_CREAT)
825 self.check_flush_error_on_close(fd, 'w', closefd=False)
826 os.close(fd)
Antoine Pitrou6be88762010-05-03 16:48:20 +0000827
828 def test_multi_close(self):
829 f = self.open(support.TESTFN, "wb", buffering=0)
830 f.close()
831 f.close()
832 f.close()
833 self.assertRaises(ValueError, f.flush)
834
Antoine Pitrou328ec742010-09-14 18:37:24 +0000835 def test_RawIOBase_read(self):
Sanyam Khurana1b74f9b2017-12-11 19:12:09 +0530836 # Exercise the default limited RawIOBase.read(n) implementation (which
837 # calls readinto() internally).
Antoine Pitrou328ec742010-09-14 18:37:24 +0000838 rawio = self.MockRawIOWithoutRead((b"abc", b"d", None, b"efg", None))
839 self.assertEqual(rawio.read(2), b"ab")
840 self.assertEqual(rawio.read(2), b"c")
841 self.assertEqual(rawio.read(2), b"d")
842 self.assertEqual(rawio.read(2), None)
843 self.assertEqual(rawio.read(2), b"ef")
844 self.assertEqual(rawio.read(2), b"g")
845 self.assertEqual(rawio.read(2), None)
846 self.assertEqual(rawio.read(2), b"")
847
Benjamin Petersonf6f3a352011-09-03 09:26:20 -0400848 def test_types_have_dict(self):
849 test = (
850 self.IOBase(),
851 self.RawIOBase(),
852 self.TextIOBase(),
853 self.StringIO(),
854 self.BytesIO()
855 )
856 for obj in test:
857 self.assertTrue(hasattr(obj, "__dict__"))
858
Ross Lagerwall59142db2011-10-31 20:34:46 +0200859 def test_opener(self):
860 with self.open(support.TESTFN, "w") as f:
861 f.write("egg\n")
862 fd = os.open(support.TESTFN, os.O_RDONLY)
863 def opener(path, flags):
864 return fd
865 with self.open("non-existent", "r", opener=opener) as f:
866 self.assertEqual(f.read(), "egg\n")
867
Barry Warsaw480e2852016-06-08 17:47:26 -0400868 def test_bad_opener_negative_1(self):
869 # Issue #27066.
870 def badopener(fname, flags):
871 return -1
872 with self.assertRaises(ValueError) as cm:
873 open('non-existent', 'r', opener=badopener)
874 self.assertEqual(str(cm.exception), 'opener returned -1')
875
876 def test_bad_opener_other_negative(self):
877 # Issue #27066.
878 def badopener(fname, flags):
879 return -2
880 with self.assertRaises(ValueError) as cm:
881 open('non-existent', 'r', opener=badopener)
882 self.assertEqual(str(cm.exception), 'opener returned -2')
883
Hynek Schlawack2cc71562012-05-25 10:05:53 +0200884 def test_fileio_closefd(self):
885 # Issue #4841
886 with self.open(__file__, 'rb') as f1, \
887 self.open(__file__, 'rb') as f2:
888 fileio = self.FileIO(f1.fileno(), closefd=False)
889 # .__init__() must not close f1
890 fileio.__init__(f2.fileno(), closefd=False)
891 f1.readline()
892 # .close() must not close f2
893 fileio.close()
894 f2.readline()
895
Serhiy Storchakaf10063e2014-06-09 13:32:34 +0300896 def test_nonbuffered_textio(self):
Serhiy Storchaka94a619d2016-02-11 13:11:44 +0200897 with support.check_no_resource_warning(self):
Serhiy Storchakaf10063e2014-06-09 13:32:34 +0300898 with self.assertRaises(ValueError):
899 self.open(support.TESTFN, 'w', buffering=0)
Serhiy Storchakaf10063e2014-06-09 13:32:34 +0300900
901 def test_invalid_newline(self):
Serhiy Storchaka94a619d2016-02-11 13:11:44 +0200902 with support.check_no_resource_warning(self):
Serhiy Storchakaf10063e2014-06-09 13:32:34 +0300903 with self.assertRaises(ValueError):
904 self.open(support.TESTFN, 'w', newline='invalid')
Serhiy Storchakaf10063e2014-06-09 13:32:34 +0300905
Martin Panter6bb91f32016-05-28 00:41:57 +0000906 def test_buffered_readinto_mixin(self):
907 # Test the implementation provided by BufferedIOBase
908 class Stream(self.BufferedIOBase):
909 def read(self, size):
910 return b"12345"
911 read1 = read
912 stream = Stream()
913 for method in ("readinto", "readinto1"):
914 with self.subTest(method):
915 buffer = byteslike(5)
916 self.assertEqual(getattr(stream, method)(buffer), 5)
917 self.assertEqual(bytes(buffer), b"12345")
918
Ethan Furmand62548a2016-06-04 14:38:43 -0700919 def test_fspath_support(self):
Ethan Furmand62548a2016-06-04 14:38:43 -0700920 def check_path_succeeds(path):
921 with self.open(path, "w") as f:
922 f.write("egg\n")
923
924 with self.open(path, "r") as f:
925 self.assertEqual(f.read(), "egg\n")
926
Serhiy Storchakab21d1552018-03-02 11:53:51 +0200927 check_path_succeeds(FakePath(support.TESTFN))
928 check_path_succeeds(FakePath(support.TESTFN.encode('utf-8')))
Ethan Furmand62548a2016-06-04 14:38:43 -0700929
Serhiy Storchakab21d1552018-03-02 11:53:51 +0200930 with self.open(support.TESTFN, "w") as f:
931 bad_path = FakePath(f.fileno())
932 with self.assertRaises(TypeError):
933 self.open(bad_path, 'w')
934
935 bad_path = FakePath(None)
Brett Cannonc78ca1e2016-06-24 12:03:43 -0700936 with self.assertRaises(TypeError):
Ethan Furmand62548a2016-06-04 14:38:43 -0700937 self.open(bad_path, 'w')
938
Serhiy Storchakab21d1552018-03-02 11:53:51 +0200939 bad_path = FakePath(FloatingPointError)
940 with self.assertRaises(FloatingPointError):
941 self.open(bad_path, 'w')
942
Ethan Furmand62548a2016-06-04 14:38:43 -0700943 # ensure that refcounting is correct with some error conditions
944 with self.assertRaisesRegex(ValueError, 'read/write/append mode'):
Serhiy Storchakab21d1552018-03-02 11:53:51 +0200945 self.open(FakePath(support.TESTFN), 'rwxa')
Ethan Furmand62548a2016-06-04 14:38:43 -0700946
Sanyam Khurana1b74f9b2017-12-11 19:12:09 +0530947 def test_RawIOBase_readall(self):
948 # Exercise the default unlimited RawIOBase.read() and readall()
949 # implementations.
950 rawio = self.MockRawIOWithoutRead((b"abc", b"d", b"efg"))
951 self.assertEqual(rawio.read(), b"abcdefg")
952 rawio = self.MockRawIOWithoutRead((b"abc", b"d", b"efg"))
953 self.assertEqual(rawio.readall(), b"abcdefg")
954
955 def test_BufferedIOBase_readinto(self):
956 # Exercise the default BufferedIOBase.readinto() and readinto1()
957 # implementations (which call read() or read1() internally).
958 class Reader(self.BufferedIOBase):
959 def __init__(self, avail):
960 self.avail = avail
961 def read(self, size):
962 result = self.avail[:size]
963 self.avail = self.avail[size:]
964 return result
965 def read1(self, size):
966 """Returns no more than 5 bytes at once"""
967 return self.read(min(size, 5))
968 tests = (
969 # (test method, total data available, read buffer size, expected
970 # read size)
971 ("readinto", 10, 5, 5),
972 ("readinto", 10, 6, 6), # More than read1() can return
973 ("readinto", 5, 6, 5), # Buffer larger than total available
974 ("readinto", 6, 7, 6),
975 ("readinto", 10, 0, 0), # Empty buffer
976 ("readinto1", 10, 5, 5), # Result limited to single read1() call
977 ("readinto1", 10, 6, 5), # Buffer larger than read1() can return
978 ("readinto1", 5, 6, 5), # Buffer larger than total available
979 ("readinto1", 6, 7, 5),
980 ("readinto1", 10, 0, 0), # Empty buffer
981 )
982 UNUSED_BYTE = 0x81
983 for test in tests:
984 with self.subTest(test):
985 method, avail, request, result = test
986 reader = Reader(bytes(range(avail)))
987 buffer = bytearray((UNUSED_BYTE,) * request)
988 method = getattr(reader, method)
989 self.assertEqual(method(buffer), result)
990 self.assertEqual(len(buffer), request)
991 self.assertSequenceEqual(buffer[:result], range(result))
992 unused = (UNUSED_BYTE,) * (request - result)
993 self.assertSequenceEqual(buffer[result:], unused)
994 self.assertEqual(len(reader.avail), avail - result)
995
Zackery Spytz28f07362018-07-17 00:31:44 -0600996 def test_close_assert(self):
997 class R(self.IOBase):
998 def __setattr__(self, name, value):
999 pass
1000 def flush(self):
1001 raise OSError()
1002 f = R()
1003 # This would cause an assertion failure.
1004 self.assertRaises(OSError, f.close)
1005
Victor Stinner472f7942019-04-12 21:58:24 +02001006 # Silence destructor error
1007 R.flush = lambda self: None
1008
Hynek Schlawack2cc71562012-05-25 10:05:53 +02001009
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001010class CIOTest(IOTest):
Antoine Pitrou84f1b172011-07-12 21:57:15 +02001011
1012 def test_IOBase_finalize(self):
1013 # Issue #12149: segmentation fault on _PyIOBase_finalize when both a
1014 # class which inherits IOBase and an object of this class are caught
1015 # in a reference cycle and close() is already in the method cache.
1016 class MyIO(self.IOBase):
1017 def close(self):
1018 pass
1019
1020 # create an instance to populate the method cache
1021 MyIO()
1022 obj = MyIO()
1023 obj.obj = obj
1024 wr = weakref.ref(obj)
1025 del MyIO
1026 del obj
1027 support.gc_collect()
Serhiy Storchaka0dcd80a2015-08-02 15:17:49 +03001028 self.assertIsNone(wr(), wr)
Guido van Rossuma9e20242007-03-08 00:43:48 +00001029
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001030class PyIOTest(IOTest):
1031 pass
Guido van Rossum68bbcd22007-02-27 17:19:33 +00001032
Guido van Rossuma9e20242007-03-08 00:43:48 +00001033
Gregory P. Smith1bef9072015-04-14 13:24:34 -07001034@support.cpython_only
1035class APIMismatchTest(unittest.TestCase):
Gregory P. Smith054b0652015-04-14 12:58:05 -07001036
Gregory P. Smith054b0652015-04-14 12:58:05 -07001037 def test_RawIOBase_io_in_pyio_match(self):
1038 """Test that pyio RawIOBase class has all c RawIOBase methods"""
Antoine Pitrou45d61562015-05-20 21:50:59 +02001039 mismatch = support.detect_api_mismatch(pyio.RawIOBase, io.RawIOBase,
1040 ignore=('__weakref__',))
Gregory P. Smith054b0652015-04-14 12:58:05 -07001041 self.assertEqual(mismatch, set(), msg='Python RawIOBase does not have all C RawIOBase methods')
1042
1043 def test_RawIOBase_pyio_in_io_match(self):
1044 """Test that c RawIOBase class has all pyio RawIOBase methods"""
1045 mismatch = support.detect_api_mismatch(io.RawIOBase, pyio.RawIOBase)
1046 self.assertEqual(mismatch, set(), msg='C RawIOBase does not have all Python RawIOBase methods')
1047
1048
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001049class CommonBufferedTests:
1050 # Tests common to BufferedReader, BufferedWriter and BufferedRandom
1051
Benjamin Petersond2e0c792009-05-01 20:40:59 +00001052 def test_detach(self):
1053 raw = self.MockRawIO()
1054 buf = self.tp(raw)
1055 self.assertIs(buf.detach(), raw)
1056 self.assertRaises(ValueError, buf.detach)
1057
Benjamin Peterson10e76b62014-12-21 20:51:50 -06001058 repr(buf) # Should still work
1059
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001060 def test_fileno(self):
1061 rawio = self.MockRawIO()
1062 bufio = self.tp(rawio)
1063
Ezio Melottib3aedd42010-11-20 19:04:17 +00001064 self.assertEqual(42, bufio.fileno())
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001065
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001066 def test_invalid_args(self):
1067 rawio = self.MockRawIO()
1068 bufio = self.tp(rawio)
1069 # Invalid whence
1070 self.assertRaises(ValueError, bufio.seek, 0, -1)
Jesus Cea94363612012-06-22 18:32:07 +02001071 self.assertRaises(ValueError, bufio.seek, 0, 9)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001072
1073 def test_override_destructor(self):
1074 tp = self.tp
1075 record = []
1076 class MyBufferedIO(tp):
1077 def __del__(self):
1078 record.append(1)
1079 try:
1080 f = super().__del__
1081 except AttributeError:
1082 pass
1083 else:
1084 f()
1085 def close(self):
1086 record.append(2)
1087 super().close()
1088 def flush(self):
1089 record.append(3)
1090 super().flush()
1091 rawio = self.MockRawIO()
1092 bufio = MyBufferedIO(rawio)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001093 del bufio
Benjamin Peterson24fb1d02009-04-24 23:26:21 +00001094 support.gc_collect()
Martin Panter754aab22016-03-31 07:21:56 +00001095 self.assertEqual(record, [1, 2, 3])
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001096
1097 def test_context_manager(self):
1098 # Test usability as a context manager
1099 rawio = self.MockRawIO()
1100 bufio = self.tp(rawio)
1101 def _with():
1102 with bufio:
1103 pass
1104 _with()
1105 # bufio should now be closed, and using it a second time should raise
1106 # a ValueError.
1107 self.assertRaises(ValueError, _with)
1108
1109 def test_error_through_destructor(self):
1110 # Test that the exception state is not modified by a destructor,
1111 # even if close() fails.
1112 rawio = self.CloseFailureIO()
Victor Stinner6dbbe742019-05-25 00:09:38 +02001113 with support.catch_unraisable_exception() as cm:
1114 with self.assertRaises(AttributeError):
1115 self.tp(rawio).xyzzy
Victor Stinnerbc2aa812019-05-23 03:45:09 +02001116
1117 if not IOBASE_EMITS_UNRAISABLE:
1118 self.assertIsNone(cm.unraisable)
1119 elif cm.unraisable is not None:
1120 self.assertEqual(cm.unraisable.exc_type, OSError)
Guido van Rossum78892e42007-04-06 17:31:18 +00001121
Antoine Pitrou716c4442009-05-23 19:04:03 +00001122 def test_repr(self):
1123 raw = self.MockRawIO()
1124 b = self.tp(raw)
Max Bernsteinccb7ca72019-05-21 10:09:21 -07001125 clsname = r"(%s\.)?%s" % (self.tp.__module__, self.tp.__qualname__)
1126 self.assertRegex(repr(b), "<%s>" % clsname)
Antoine Pitrou716c4442009-05-23 19:04:03 +00001127 raw.name = "dummy"
Max Bernsteinccb7ca72019-05-21 10:09:21 -07001128 self.assertRegex(repr(b), "<%s name='dummy'>" % clsname)
Antoine Pitrou716c4442009-05-23 19:04:03 +00001129 raw.name = b"dummy"
Max Bernsteinccb7ca72019-05-21 10:09:21 -07001130 self.assertRegex(repr(b), "<%s name=b'dummy'>" % clsname)
Antoine Pitrou716c4442009-05-23 19:04:03 +00001131
Serhiy Storchakaa5af6e12017-03-19 19:25:29 +02001132 def test_recursive_repr(self):
1133 # Issue #25455
1134 raw = self.MockRawIO()
1135 b = self.tp(raw)
1136 with support.swap_attr(raw, 'name', b):
1137 try:
1138 repr(b) # Should not crash
1139 except RuntimeError:
1140 pass
1141
Antoine Pitrou6be88762010-05-03 16:48:20 +00001142 def test_flush_error_on_close(self):
Serhiy Storchakaa3712a92015-02-21 00:35:09 +02001143 # Test that buffered file is closed despite failed flush
1144 # and that flush() is called before file closed.
Antoine Pitrou6be88762010-05-03 16:48:20 +00001145 raw = self.MockRawIO()
Serhiy Storchakaa3712a92015-02-21 00:35:09 +02001146 closed = []
Antoine Pitrou6be88762010-05-03 16:48:20 +00001147 def bad_flush():
Serhiy Storchakaa3712a92015-02-21 00:35:09 +02001148 closed[:] = [b.closed, raw.closed]
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02001149 raise OSError()
Antoine Pitrou6be88762010-05-03 16:48:20 +00001150 raw.flush = bad_flush
1151 b = self.tp(raw)
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02001152 self.assertRaises(OSError, b.close) # exception not swallowed
Benjamin Peterson68623612012-12-20 11:53:11 -06001153 self.assertTrue(b.closed)
Serhiy Storchakaa3712a92015-02-21 00:35:09 +02001154 self.assertTrue(raw.closed)
1155 self.assertTrue(closed) # flush() called
1156 self.assertFalse(closed[0]) # flush() called before file closed
1157 self.assertFalse(closed[1])
Serhiy Storchakac26a1a42015-02-23 00:28:38 +02001158 raw.flush = lambda: None # break reference loop
Benjamin Peterson68623612012-12-20 11:53:11 -06001159
1160 def test_close_error_on_close(self):
1161 raw = self.MockRawIO()
1162 def bad_flush():
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02001163 raise OSError('flush')
Benjamin Peterson68623612012-12-20 11:53:11 -06001164 def bad_close():
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02001165 raise OSError('close')
Benjamin Peterson68623612012-12-20 11:53:11 -06001166 raw.close = bad_close
1167 b = self.tp(raw)
1168 b.flush = bad_flush
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02001169 with self.assertRaises(OSError) as err: # exception not swallowed
Benjamin Peterson68623612012-12-20 11:53:11 -06001170 b.close()
1171 self.assertEqual(err.exception.args, ('close',))
Serhiy Storchaka8a8f7f92014-06-09 09:13:04 +03001172 self.assertIsInstance(err.exception.__context__, OSError)
Benjamin Peterson68623612012-12-20 11:53:11 -06001173 self.assertEqual(err.exception.__context__.args, ('flush',))
1174 self.assertFalse(b.closed)
Antoine Pitrou6be88762010-05-03 16:48:20 +00001175
Victor Stinner472f7942019-04-12 21:58:24 +02001176 # Silence destructor error
1177 raw.close = lambda: None
1178 b.flush = lambda: None
1179
Serhiy Storchaka8a8f7f92014-06-09 09:13:04 +03001180 def test_nonnormalized_close_error_on_close(self):
1181 # Issue #21677
1182 raw = self.MockRawIO()
1183 def bad_flush():
1184 raise non_existing_flush
1185 def bad_close():
1186 raise non_existing_close
1187 raw.close = bad_close
1188 b = self.tp(raw)
1189 b.flush = bad_flush
1190 with self.assertRaises(NameError) as err: # exception not swallowed
1191 b.close()
1192 self.assertIn('non_existing_close', str(err.exception))
1193 self.assertIsInstance(err.exception.__context__, NameError)
1194 self.assertIn('non_existing_flush', str(err.exception.__context__))
1195 self.assertFalse(b.closed)
1196
Victor Stinner472f7942019-04-12 21:58:24 +02001197 # Silence destructor error
1198 b.flush = lambda: None
1199 raw.close = lambda: None
1200
Antoine Pitrou6be88762010-05-03 16:48:20 +00001201 def test_multi_close(self):
1202 raw = self.MockRawIO()
1203 b = self.tp(raw)
1204 b.close()
1205 b.close()
1206 b.close()
1207 self.assertRaises(ValueError, b.flush)
1208
Antoine Pitrou0d739d72010-09-05 23:01:12 +00001209 def test_unseekable(self):
1210 bufio = self.tp(self.MockUnseekableIO(b"A" * 10))
1211 self.assertRaises(self.UnsupportedOperation, bufio.tell)
1212 self.assertRaises(self.UnsupportedOperation, bufio.seek, 0)
1213
Antoine Pitrou7f8f4182010-12-21 21:20:59 +00001214 def test_readonly_attributes(self):
1215 raw = self.MockRawIO()
1216 buf = self.tp(raw)
1217 x = self.MockRawIO()
1218 with self.assertRaises(AttributeError):
1219 buf.raw = x
1220
Guido van Rossum78892e42007-04-06 17:31:18 +00001221
Antoine Pitrou10f0c502012-07-29 19:02:46 +02001222class SizeofTest:
1223
1224 @support.cpython_only
1225 def test_sizeof(self):
1226 bufsize1 = 4096
1227 bufsize2 = 8192
1228 rawio = self.MockRawIO()
1229 bufio = self.tp(rawio, buffer_size=bufsize1)
1230 size = sys.getsizeof(bufio) - bufsize1
1231 rawio = self.MockRawIO()
1232 bufio = self.tp(rawio, buffer_size=bufsize2)
1233 self.assertEqual(sys.getsizeof(bufio), size + bufsize2)
1234
Jesus Ceadc469452012-10-04 12:37:56 +02001235 @support.cpython_only
1236 def test_buffer_freeing(self) :
1237 bufsize = 4096
1238 rawio = self.MockRawIO()
1239 bufio = self.tp(rawio, buffer_size=bufsize)
1240 size = sys.getsizeof(bufio) - bufsize
1241 bufio.close()
1242 self.assertEqual(sys.getsizeof(bufio), size)
Antoine Pitrou10f0c502012-07-29 19:02:46 +02001243
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001244class BufferedReaderTest(unittest.TestCase, CommonBufferedTests):
1245 read_mode = "rb"
Guido van Rossum78892e42007-04-06 17:31:18 +00001246
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001247 def test_constructor(self):
1248 rawio = self.MockRawIO([b"abc"])
1249 bufio = self.tp(rawio)
1250 bufio.__init__(rawio)
1251 bufio.__init__(rawio, buffer_size=1024)
1252 bufio.__init__(rawio, buffer_size=16)
Ezio Melottib3aedd42010-11-20 19:04:17 +00001253 self.assertEqual(b"abc", bufio.read())
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001254 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0)
1255 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16)
1256 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1)
1257 rawio = self.MockRawIO([b"abc"])
1258 bufio.__init__(rawio)
Ezio Melottib3aedd42010-11-20 19:04:17 +00001259 self.assertEqual(b"abc", bufio.read())
Guido van Rossum78892e42007-04-06 17:31:18 +00001260
Serhiy Storchaka61e24932014-02-12 10:52:35 +02001261 def test_uninitialized(self):
1262 bufio = self.tp.__new__(self.tp)
1263 del bufio
1264 bufio = self.tp.__new__(self.tp)
1265 self.assertRaisesRegex((ValueError, AttributeError),
1266 'uninitialized|has no attribute',
1267 bufio.read, 0)
1268 bufio.__init__(self.MockRawIO())
1269 self.assertEqual(bufio.read(0), b'')
1270
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001271 def test_read(self):
Benjamin Petersonbf5ff762009-12-13 19:25:34 +00001272 for arg in (None, 7):
1273 rawio = self.MockRawIO((b"abc", b"d", b"efg"))
1274 bufio = self.tp(rawio)
Ezio Melottib3aedd42010-11-20 19:04:17 +00001275 self.assertEqual(b"abcdefg", bufio.read(arg))
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001276 # Invalid args
1277 self.assertRaises(ValueError, bufio.read, -2)
Guido van Rossum68bbcd22007-02-27 17:19:33 +00001278
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001279 def test_read1(self):
1280 rawio = self.MockRawIO((b"abc", b"d", b"efg"))
1281 bufio = self.tp(rawio)
Ezio Melottib3aedd42010-11-20 19:04:17 +00001282 self.assertEqual(b"a", bufio.read(1))
1283 self.assertEqual(b"b", bufio.read1(1))
1284 self.assertEqual(rawio._reads, 1)
Martin Panterccb2c0e2016-10-20 23:48:14 +00001285 self.assertEqual(b"", bufio.read1(0))
Ezio Melottib3aedd42010-11-20 19:04:17 +00001286 self.assertEqual(b"c", bufio.read1(100))
1287 self.assertEqual(rawio._reads, 1)
1288 self.assertEqual(b"d", bufio.read1(100))
1289 self.assertEqual(rawio._reads, 2)
1290 self.assertEqual(b"efg", bufio.read1(100))
1291 self.assertEqual(rawio._reads, 3)
1292 self.assertEqual(b"", bufio.read1(100))
1293 self.assertEqual(rawio._reads, 4)
Martin Panterccb2c0e2016-10-20 23:48:14 +00001294
1295 def test_read1_arbitrary(self):
1296 rawio = self.MockRawIO((b"abc", b"d", b"efg"))
1297 bufio = self.tp(rawio)
1298 self.assertEqual(b"a", bufio.read(1))
1299 self.assertEqual(b"bc", bufio.read1())
1300 self.assertEqual(b"d", bufio.read1())
1301 self.assertEqual(b"efg", bufio.read1(-1))
1302 self.assertEqual(rawio._reads, 3)
1303 self.assertEqual(b"", bufio.read1())
1304 self.assertEqual(rawio._reads, 4)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001305
1306 def test_readinto(self):
1307 rawio = self.MockRawIO((b"abc", b"d", b"efg"))
1308 bufio = self.tp(rawio)
1309 b = bytearray(2)
Ezio Melottib3aedd42010-11-20 19:04:17 +00001310 self.assertEqual(bufio.readinto(b), 2)
1311 self.assertEqual(b, b"ab")
1312 self.assertEqual(bufio.readinto(b), 2)
1313 self.assertEqual(b, b"cd")
1314 self.assertEqual(bufio.readinto(b), 2)
1315 self.assertEqual(b, b"ef")
1316 self.assertEqual(bufio.readinto(b), 1)
1317 self.assertEqual(b, b"gf")
1318 self.assertEqual(bufio.readinto(b), 0)
1319 self.assertEqual(b, b"gf")
Antoine Pitrou3486a982011-05-12 01:57:53 +02001320 rawio = self.MockRawIO((b"abc", None))
1321 bufio = self.tp(rawio)
1322 self.assertEqual(bufio.readinto(b), 2)
1323 self.assertEqual(b, b"ab")
1324 self.assertEqual(bufio.readinto(b), 1)
1325 self.assertEqual(b, b"cb")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001326
Benjamin Petersona96fea02014-06-22 14:17:44 -07001327 def test_readinto1(self):
1328 buffer_size = 10
1329 rawio = self.MockRawIO((b"abc", b"de", b"fgh", b"jkl"))
1330 bufio = self.tp(rawio, buffer_size=buffer_size)
1331 b = bytearray(2)
1332 self.assertEqual(bufio.peek(3), b'abc')
1333 self.assertEqual(rawio._reads, 1)
1334 self.assertEqual(bufio.readinto1(b), 2)
1335 self.assertEqual(b, b"ab")
1336 self.assertEqual(rawio._reads, 1)
1337 self.assertEqual(bufio.readinto1(b), 1)
1338 self.assertEqual(b[:1], b"c")
1339 self.assertEqual(rawio._reads, 1)
1340 self.assertEqual(bufio.readinto1(b), 2)
1341 self.assertEqual(b, b"de")
1342 self.assertEqual(rawio._reads, 2)
1343 b = bytearray(2*buffer_size)
1344 self.assertEqual(bufio.peek(3), b'fgh')
1345 self.assertEqual(rawio._reads, 3)
1346 self.assertEqual(bufio.readinto1(b), 6)
1347 self.assertEqual(b[:6], b"fghjkl")
1348 self.assertEqual(rawio._reads, 4)
1349
1350 def test_readinto_array(self):
1351 buffer_size = 60
1352 data = b"a" * 26
1353 rawio = self.MockRawIO((data,))
1354 bufio = self.tp(rawio, buffer_size=buffer_size)
1355
1356 # Create an array with element size > 1 byte
1357 b = array.array('i', b'x' * 32)
1358 assert len(b) != 16
1359
1360 # Read into it. We should get as many *bytes* as we can fit into b
1361 # (which is more than the number of elements)
1362 n = bufio.readinto(b)
1363 self.assertGreater(n, len(b))
1364
1365 # Check that old contents of b are preserved
1366 bm = memoryview(b).cast('B')
1367 self.assertLess(n, len(bm))
1368 self.assertEqual(bm[:n], data[:n])
1369 self.assertEqual(bm[n:], b'x' * (len(bm[n:])))
1370
1371 def test_readinto1_array(self):
1372 buffer_size = 60
1373 data = b"a" * 26
1374 rawio = self.MockRawIO((data,))
1375 bufio = self.tp(rawio, buffer_size=buffer_size)
1376
1377 # Create an array with element size > 1 byte
1378 b = array.array('i', b'x' * 32)
1379 assert len(b) != 16
1380
1381 # Read into it. We should get as many *bytes* as we can fit into b
1382 # (which is more than the number of elements)
1383 n = bufio.readinto1(b)
1384 self.assertGreater(n, len(b))
1385
1386 # Check that old contents of b are preserved
1387 bm = memoryview(b).cast('B')
1388 self.assertLess(n, len(bm))
1389 self.assertEqual(bm[:n], data[:n])
1390 self.assertEqual(bm[n:], b'x' * (len(bm[n:])))
1391
Benjamin Petersonbf5ff762009-12-13 19:25:34 +00001392 def test_readlines(self):
1393 def bufio():
1394 rawio = self.MockRawIO((b"abc\n", b"d\n", b"ef"))
1395 return self.tp(rawio)
Ezio Melottib3aedd42010-11-20 19:04:17 +00001396 self.assertEqual(bufio().readlines(), [b"abc\n", b"d\n", b"ef"])
1397 self.assertEqual(bufio().readlines(5), [b"abc\n", b"d\n"])
1398 self.assertEqual(bufio().readlines(None), [b"abc\n", b"d\n", b"ef"])
Benjamin Petersonbf5ff762009-12-13 19:25:34 +00001399
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001400 def test_buffering(self):
Guido van Rossum78892e42007-04-06 17:31:18 +00001401 data = b"abcdefghi"
1402 dlen = len(data)
1403
1404 tests = [
1405 [ 100, [ 3, 1, 4, 8 ], [ dlen, 0 ] ],
1406 [ 100, [ 3, 3, 3], [ dlen ] ],
1407 [ 4, [ 1, 2, 4, 2 ], [ 4, 4, 1 ] ],
1408 ]
1409
1410 for bufsize, buf_read_sizes, raw_read_sizes in tests:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001411 rawio = self.MockFileIO(data)
1412 bufio = self.tp(rawio, buffer_size=bufsize)
Guido van Rossum78892e42007-04-06 17:31:18 +00001413 pos = 0
1414 for nbytes in buf_read_sizes:
Ezio Melottib3aedd42010-11-20 19:04:17 +00001415 self.assertEqual(bufio.read(nbytes), data[pos:pos+nbytes])
Guido van Rossum78892e42007-04-06 17:31:18 +00001416 pos += nbytes
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001417 # this is mildly implementation-dependent
Ezio Melottib3aedd42010-11-20 19:04:17 +00001418 self.assertEqual(rawio.read_history, raw_read_sizes)
Guido van Rossum78892e42007-04-06 17:31:18 +00001419
def test_read_non_blocking(self):
    # The None entries in the mock's script simulate EWOULDBLOCK.
    script = (b"abc", b"d", None, b"efg", None, None, None)
    reader = self.tp(self.MockRawIO(script))
    self.assertEqual(b"abcd", reader.read(6))
    self.assertEqual(b"e", reader.read(1))
    self.assertEqual(b"fg", reader.read())
    self.assertEqual(b"", reader.peek(1))
    # First a None result is expected, then an empty bytes object.
    self.assertIsNone(reader.read())
    self.assertEqual(b"", reader.read())

    # The same scenario directly on the raw object, via readall().
    raw = self.MockRawIO((b"a", None, None))
    self.assertEqual(b"a", raw.readall())
    self.assertIsNone(raw.readall())
1434
def test_read_past_eof(self):
    # Requesting far more bytes than the raw stream holds must return
    # everything that is available.
    reader = self.tp(self.MockRawIO((b"abc", b"d", b"efg")))

    self.assertEqual(b"abcdefg", reader.read(9000))
Guido van Rossum68bbcd22007-02-27 17:19:33 +00001440
def test_read_all(self):
    # read() without a size must concatenate every chunk of the raw stream.
    reader = self.tp(self.MockRawIO((b"abc", b"d", b"efg")))

    self.assertEqual(b"abcdefg", reader.read())
Guido van Rossum68bbcd22007-02-27 17:19:33 +00001446
@support.requires_resource('cpu')
def test_threads(self):
    # Twenty threads read concurrently from one buffered reader; afterwards
    # every byte value must have been seen exactly N times.
    try:
        # Write out many bytes with exactly the same number of 0's,
        # 1's... 255's. This will help us check that concurrent reading
        # doesn't duplicate or forget contents.
        N = 1000
        values = list(range(256)) * N
        random.shuffle(values)
        payload = bytes(values)
        with self.open(support.TESTFN, "wb") as out:
            out.write(payload)
        with self.open(support.TESTFN, self.read_mode, buffering=0) as raw:
            bufio = self.tp(raw, 8)
            errors = []
            results = []

            def worker():
                try:
                    # Intra-buffer read then buffer-flushing read
                    for size in cycle([1, 19]):
                        chunk = bufio.read(size)
                        if not chunk:
                            break
                        # list.append() is atomic
                        results.append(chunk)
                except Exception as exc:
                    # Record the failure for the main thread, then let it
                    # propagate out of the worker.
                    errors.append(exc)
                    raise

            threads = [threading.Thread(target=worker) for _ in range(20)]
            with threading_helper.start_threads(threads):
                time.sleep(0.02)  # yield
            self.assertFalse(errors,
                "the following exceptions were caught: %r" % errors)
            combined = b''.join(results)
            for value in range(256):
                self.assertEqual(combined.count(bytes([value])), N)
    finally:
        support.unlink(support.TESTFN)
1486
def test_unseekable(self):
    # tell() and seek() on top of an unseekable raw stream must raise
    # UnsupportedOperation, both before and after data has been read.
    bufio = self.tp(self.MockUnseekableIO(b"A" * 10))
    unsupported = self.UnsupportedOperation
    with self.assertRaises(unsupported):
        bufio.tell()
    with self.assertRaises(unsupported):
        bufio.seek(0)
    bufio.read(1)
    with self.assertRaises(unsupported):
        bufio.seek(0)
    with self.assertRaises(unsupported):
        bufio.tell()
1494
def test_misbehaved_io(self):
    # seek() and tell() on top of a misbehaving raw stream must fail with
    # OSError.
    raw = self.MisbehavedRawIO((b"abc", b"d", b"efg"))
    bufio = self.tp(raw)
    with self.assertRaises(OSError):
        bufio.seek(0)
    with self.assertRaises(OSError):
        bufio.tell()

    # Silence destructor error
    def _noop_close():
        return None
    bufio.close = _noop_close
1503
def test_no_extraneous_read(self):
    # Issue #9550; when the raw IO object has satisfied the read request,
    # we should not issue any additional reads, otherwise it may block
    # (e.g. socket).
    bufsize = 16
    for n in (2, bufsize - 1, bufsize, bufsize + 1, bufsize * 2):
        expected = b"x" * n
        scripts = (
            # Simple case: one raw read is enough to satisfy the request.
            [b"x" * n],
            # A more complex case where two raw reads are needed to
            # satisfy the request.
            [b"x" * (n - 1), b"x"],
        )
        for script in scripts:
            rawio = self.MockRawIO(script)
            bufio = self.tp(rawio, bufsize)
            self.assertEqual(bufio.read(n), expected)
            self.assertEqual(rawio._extraneous_reads, 0,
                "failed for {}: {} != 0".format(n, rawio._extraneous_reads))
1523
def test_read_on_closed(self):
    # Issue #23796
    # peek() and read1() on a closed reader must raise ValueError.
    # NOTE(review): the reader is built from io.BufferedReader directly,
    # not from self.tp, so every subclass exercises the same class here.
    reader = io.BufferedReader(io.BytesIO(b"12"))
    reader.read(1)
    reader.close()
    with self.assertRaises(ValueError):
        reader.peek()
    with self.assertRaises(ValueError):
        reader.read1(1)
1531
def test_truncate_on_read_only(self):
    # A buffered reader is not writable, so truncate() must be refused
    # whether or not a size is given.
    bufio = self.tp(self.MockFileIO(b"abc"))
    self.assertFalse(bufio.writable())
    with self.assertRaises(self.UnsupportedOperation):
        bufio.truncate()
    with self.assertRaises(self.UnsupportedOperation):
        bufio.truncate(0)
1538
Antoine Pitrou32cfede2010-08-11 13:31:33 +00001539
class CBufferedReaderTest(BufferedReaderTest, SizeofTest):
    # Runs the shared BufferedReader tests against io.BufferedReader and
    # adds checks that only apply to that implementation.
    tp = io.BufferedReader

    @unittest.skipIf(MEMORY_SANITIZER, "MSan defaults to crashing "
                     "instead of returning NULL for malloc failure.")
    def test_constructor(self):
        # Run the generic constructor checks, then try an absurdly large
        # buffer size on builds where sys.maxsize exceeds 32 bits.
        BufferedReaderTest.test_constructor(self)
        # The allocation can succeed on 32-bit builds, e.g. with more
        # than 2 GiB RAM and a 64-bit kernel.
        if sys.maxsize <= 0x7FFFFFFF:
            return
        rawio = self.MockRawIO()
        bufio = self.tp(rawio)
        expected_errors = (OverflowError, MemoryError, ValueError)
        with self.assertRaises(expected_errors):
            bufio.__init__(rawio, sys.maxsize)

    def test_initialization(self):
        # After each rejected re-initialisation the object must refuse to
        # read.
        rawio = self.MockRawIO([b"abc"])
        bufio = self.tp(rawio)
        for bad_size in (0, -16, -1):
            with self.assertRaises(ValueError):
                bufio.__init__(rawio, buffer_size=bad_size)
            with self.assertRaises(ValueError):
                bufio.read()

    def test_misbehaved_io_read(self):
        rawio = self.MisbehavedRawIO((b"abc", b"d", b"efg"))
        bufio = self.tp(rawio)
        # _pyio.BufferedReader seems to implement reading differently, so
        # that checking this is not so easy.
        with self.assertRaises(OSError):
            bufio.read(10)

    def test_garbage_collection(self):
        # C BufferedReader objects are collected.
        # The original note adds that the Python version has __del__, so it
        # ends up in gc.garbage instead.
        self.addCleanup(support.unlink, support.TESTFN)
        with support.check_warnings(('', ResourceWarning)):
            rawio = self.FileIO(support.TESTFN, "w+b")
            f = self.tp(rawio)
            # Create a reference cycle through the object itself.
            f.f = f
            wr = weakref.ref(f)
            del f
            support.gc_collect()
        self.assertIsNone(wr(), wr)

    def test_args_error(self):
        # Issue #17275
        # Too many positional arguments: the TypeError message must mention
        # "BufferedReader".
        with self.assertRaisesRegex(TypeError, "BufferedReader"):
            self.tp(io.BytesIO(), 1024, 1024, 1024)

    def test_bad_readinto_value(self):
        # A raw readinto() returning -1 must surface as an OSError that has
        # no __cause__.
        def readinto_negative(buf):
            return -1
        rawio = io.BufferedReader(io.BytesIO(b"12"))
        rawio.readinto = readinto_negative
        bufio = self.tp(rawio)
        with self.assertRaises(OSError) as cm:
            bufio.readline()
        self.assertIsNone(cm.exception.__cause__)

    def test_bad_readinto_type(self):
        # A raw readinto() returning a non-integer must surface as an
        # OSError whose __cause__ is a TypeError.
        def readinto_bytes(buf):
            return b''
        rawio = io.BufferedReader(io.BytesIO(b"12"))
        rawio.readinto = readinto_bytes
        bufio = self.tp(rawio)
        with self.assertRaises(OSError) as cm:
            bufio.readline()
        self.assertIsInstance(cm.exception.__cause__, TypeError)
1605
R David Murray67bfe802013-02-23 21:51:05 -05001606
class PyBufferedReaderTest(BufferedReaderTest):
    # Runs the shared BufferedReader tests against the pyio (Python)
    # implementation.
    tp = pyio.BufferedReader
Antoine Pitrou87695762008-08-14 22:44:29 +00001609
Guido van Rossuma9e20242007-03-08 00:43:48 +00001610
class BufferedWriterTest(unittest.TestCase, CommonBufferedTests):
    # Implementation-independent tests for BufferedWriter.  Concrete
    # subclasses set `tp` to the class under test; the mock raw streams,
    # `open` and the exception types are read from the test instance.
    write_mode = "wb"

    def test_constructor(self):
        # __init__ may be called repeatedly; invalid buffer sizes are
        # rejected and a later valid re-initialisation works again.
        rawio = self.MockRawIO()
        bufio = self.tp(rawio)
        bufio.__init__(rawio)
        for good_size in (1024, 16):
            bufio.__init__(rawio, buffer_size=good_size)
        self.assertEqual(3, bufio.write(b"abc"))
        bufio.flush()
        for bad_size in (0, -16, -1):
            with self.assertRaises(ValueError):
                bufio.__init__(rawio, buffer_size=bad_size)
        bufio.__init__(rawio)
        self.assertEqual(3, bufio.write(b"ghi"))
        bufio.flush()
        self.assertEqual(b"".join(rawio._write_stack), b"abcghi")

    def test_uninitialized(self):
        # An object created with __new__ only must be safe to delete, must
        # refuse writes, and must work once __init__ has run.
        bufio = self.tp.__new__(self.tp)
        del bufio
        bufio = self.tp.__new__(self.tp)
        with self.assertRaisesRegex((ValueError, AttributeError),
                                    'uninitialized|has no attribute'):
            bufio.write(b'')
        bufio.__init__(self.MockRawIO())
        self.assertEqual(bufio.write(b''), 0)

    def test_detach_flush(self):
        # detach() must push buffered data down to the raw stream.
        raw = self.MockRawIO()
        buf = self.tp(raw)
        buf.write(b"howdy!")
        self.assertFalse(raw._write_stack)
        buf.detach()
        self.assertEqual(raw._write_stack, [b"howdy!"])

    def test_write(self):
        # Write to the buffered IO but don't overflow the buffer.
        raw = self.MockRawIO()
        bufio = self.tp(raw, 8)
        bufio.write(b"abc")
        self.assertFalse(raw._write_stack)
        scratch = bytearray(b"def")
        bufio.write(scratch)
        scratch[:] = b"***"  # Overwrite our copy of the data
        bufio.flush()
        self.assertEqual(b"".join(raw._write_stack), b"abcdef")

    def test_write_overflow(self):
        # Writing more than the buffer can hold forces implicit flushes.
        raw = self.MockRawIO()
        bufio = self.tp(raw, 8)
        contents = b"abcdefghijklmnop"
        for start in range(0, len(contents), 3):
            bufio.write(contents[start:start + 3])
        flushed = b"".join(raw._write_stack)
        # At least (total - 8) bytes were implicitly flushed, perhaps more
        # depending on the implementation.
        self.assertTrue(flushed.startswith(contents[:-8]), flushed)

    def check_writes(self, intermediate_func):
        # Helper: perform lots of writes, calling intermediate_func(bufio)
        # after each one, and test the flushed output is as expected.
        contents = bytes(range(256)) * 1000
        written = 0
        raw = self.MockRawIO()
        bufio = self.tp(raw, 13)
        # Generator of write sizes: repeat each N 15 times then proceed
        # to N+1
        sizes = (size for size in count(1) for _ in range(15))
        while written < len(contents):
            size = min(next(sizes), len(contents) - written)
            chunk = contents[written:written + size]
            self.assertEqual(bufio.write(chunk), size)
            intermediate_func(bufio)
            written += size
        bufio.flush()
        self.assertEqual(contents, b"".join(raw._write_stack))

    def test_writes(self):
        # Plain writes with nothing done in between.
        def do_nothing(bufio):
            return None
        self.check_writes(do_nothing)

    def test_writes_and_flushes(self):
        # An explicit flush after every write.
        def flush_after_write(bufio):
            return bufio.flush()
        self.check_writes(flush_after_write)

    def test_writes_and_seeks(self):
        # Seek away and back after every write, first with absolute
        # offsets and then with relative ones.
        def _seekabs(bufio):
            pos = bufio.tell()
            for target in (pos + 1, pos - 1, pos):
                bufio.seek(target, 0)
        self.check_writes(_seekabs)

        def _seekrel(bufio):
            pos = bufio.seek(0, 1)
            bufio.seek(+1, 1)
            bufio.seek(-1, 1)
            bufio.seek(pos, 0)
        self.check_writes(_seekrel)

    def test_writes_and_truncates(self):
        # Truncate at the current position after every write.
        def truncate_here(bufio):
            return bufio.truncate(bufio.tell())
        self.check_writes(truncate_here)

    def test_write_non_blocking(self):
        # Writes on top of a raw stream that can be told to block.
        raw = self.MockNonBlockWriterIO()
        bufio = self.tp(raw, 8)

        self.assertEqual(bufio.write(b"abcd"), 4)
        self.assertEqual(bufio.write(b"efghi"), 5)
        # 1 byte will be written, the rest will be buffered
        raw.block_on(b"k")
        self.assertEqual(bufio.write(b"jklmn"), 5)

        # 8 bytes will be written, 8 will be buffered and the rest will be
        # lost
        raw.block_on(b"0")
        try:
            bufio.write(b"opqrwxyz0123456789")
        except self.BlockingIOError as exc:
            written = exc.characters_written
        else:
            self.fail("BlockingIOError should have been raised")
        self.assertEqual(written, 16)
        self.assertEqual(raw.pop_written(),
            b"abcdefghijklmnopqrwxyz")

        self.assertEqual(bufio.write(b"ABCDEFGHI"), 9)
        flushed = raw.pop_written()
        # Previously buffered bytes were flushed
        self.assertTrue(flushed.startswith(b"01234567A"), flushed)

    def test_write_and_rewind(self):
        # Overwrite the start of the data after seeking back, then append.
        raw = io.BytesIO()
        bufio = self.tp(raw, 4)
        self.assertEqual(bufio.write(b"abcdef"), 6)
        self.assertEqual(bufio.tell(), 6)
        bufio.seek(0, 0)
        self.assertEqual(bufio.write(b"XY"), 2)
        bufio.seek(6, 0)
        self.assertEqual(raw.getvalue(), b"XYcdef")
        self.assertEqual(bufio.write(b"123456"), 6)
        bufio.flush()
        self.assertEqual(raw.getvalue(), b"XYcdef123456")

    def test_flush(self):
        # flush() hands the buffered bytes to the raw stream.
        raw = self.MockRawIO()
        bufio = self.tp(raw, 8)
        bufio.write(b"abc")
        bufio.flush()
        self.assertEqual(b"abc", raw._write_stack[0])

    def test_writelines(self):
        # writelines() accepts a plain list of bytes objects.
        lines = [b'ab', b'cd', b'ef']
        raw = self.MockRawIO()
        bufio = self.tp(raw, 8)
        bufio.writelines(lines)
        bufio.flush()
        self.assertEqual(b''.join(raw._write_stack), b'abcdef')

    def test_writelines_userlist(self):
        # writelines() also accepts a UserList.
        lines = UserList([b'ab', b'cd', b'ef'])
        raw = self.MockRawIO()
        bufio = self.tp(raw, 8)
        bufio.writelines(lines)
        bufio.flush()
        self.assertEqual(b''.join(raw._write_stack), b'abcdef')

    def test_writelines_error(self):
        # Ints, None and a str are all rejected with TypeError.
        raw = self.MockRawIO()
        bufio = self.tp(raw, 8)
        for bad_lines in ([1, 2, 3], None, 'abc'):
            with self.assertRaises(TypeError):
                bufio.writelines(bad_lines)

    def test_destructor(self):
        # Dropping the last reference must flush the buffered bytes.
        raw = self.MockRawIO()
        bufio = self.tp(raw, 8)
        bufio.write(b"abc")
        del bufio
        support.gc_collect()
        self.assertEqual(b"abc", raw._write_stack[0])

    def test_truncate(self):
        # Truncate implicitly flushes the buffer.
        self.addCleanup(support.unlink, support.TESTFN)
        with self.open(support.TESTFN, self.write_mode, buffering=0) as raw:
            bufio = self.tp(raw, 8)
            bufio.write(b"abcdef")
            self.assertEqual(bufio.truncate(3), 3)
            # The position is expected to stay where the write left it.
            self.assertEqual(bufio.tell(), 6)
        with self.open(support.TESTFN, "rb", buffering=0) as result:
            self.assertEqual(result.read(), b"abc")

    def test_truncate_after_write(self):
        # Ensure that truncate preserves the file position after
        # writes longer than the buffer size.
        # Issue: https://bugs.python.org/issue32228
        self.addCleanup(support.unlink, support.TESTFN)
        with self.open(support.TESTFN, "wb") as f:
            # Fill with some buffer
            f.write(b'\x00' * 10000)
        for buffer_size in (8192, 4096, 200):
            with self.open(support.TESTFN, "r+b",
                           buffering=buffer_size) as f:
                f.write(b'\x00' * (buffer_size + 1))
                # After write write_pos and write_end are set to 0
                f.read(1)
                # read operation makes sure that pos != raw_pos
                f.truncate()
                self.assertEqual(f.tell(), buffer_size + 2)

    @support.requires_resource('cpu')
    def test_threads(self):
        try:
            # Write out many bytes from many threads and test they were
            # all flushed.
            N = 1000
            contents = bytes(range(256)) * N
            sizes = cycle([1, 19])
            offset = 0
            queue = deque()
            while offset < len(contents):
                size = next(sizes)
                queue.append(contents[offset:offset + size])
                offset += size
            del contents
            # We use a real file object because it allows us to
            # exercise situations where the GIL is released before
            # writing the buffer to the raw streams. This is in addition
            # to concurrency issues due to switching threads in the middle
            # of Python code.
            with self.open(support.TESTFN, self.write_mode,
                           buffering=0) as raw:
                bufio = self.tp(raw, 8)
                errors = []

                def worker():
                    try:
                        while True:
                            try:
                                chunk = queue.popleft()
                            except IndexError:
                                # Queue drained: this worker is done.
                                return
                            bufio.write(chunk)
                    except Exception as exc:
                        errors.append(exc)
                        raise

                threads = [threading.Thread(target=worker)
                           for _ in range(20)]
                with threading_helper.start_threads(threads):
                    time.sleep(0.02)  # yield
                self.assertFalse(errors,
                    "the following exceptions were caught: %r" % errors)
                bufio.close()
            with self.open(support.TESTFN, "rb") as result:
                data = result.read()
            for value in range(256):
                self.assertEqual(data.count(bytes([value])), N)
        finally:
            support.unlink(support.TESTFN)

    def test_misbehaved_io(self):
        # seek(), tell() and an over-long write on top of a misbehaving raw
        # stream must fail with OSError.
        rawio = self.MisbehavedRawIO()
        bufio = self.tp(rawio, 5)
        with self.assertRaises(OSError):
            bufio.seek(0)
        with self.assertRaises(OSError):
            bufio.tell()
        with self.assertRaises(OSError):
            bufio.write(b"abcdef")

        # Silence destructor error
        bufio.close = lambda: None

    def test_max_buffer_size_removal(self):
        # A third positional argument is no longer accepted.
        with self.assertRaises(TypeError):
            self.tp(self.MockRawIO(), 8, 12)

    def test_write_error_on_close(self):
        # An OSError raised by the raw write() during close() must reach
        # the caller, and the buffered object must still end up closed.
        raw = self.MockRawIO()

        def bad_write(b):
            raise OSError()

        raw.write = bad_write
        buffered = self.tp(raw)
        buffered.write(b'spam')
        self.assertRaises(OSError, buffered.close)  # exception not swallowed
        self.assertTrue(buffered.closed)

    def test_slow_close_from_thread(self):
        # Issue #31976
        # While another thread is inside close(), a write must be refused
        # and the object must already report itself as closed.
        rawio = self.SlowFlushRawIO()
        bufio = self.tp(rawio, 8)
        closer = threading.Thread(target=bufio.close)
        closer.start()
        try:
            # presumably in_flush is signalled once the raw flush has
            # started -- confirm against SlowFlushRawIO.
            rawio.in_flush.wait()
            self.assertRaises(ValueError, bufio.write, b'spam')
            self.assertTrue(bufio.closed)
        finally:
            # Always reap the closer thread, even when an assertion above
            # fails, so that no thread is left dangling after the test.
            closer.join()
1902
1903
Benjamin Peterson59406a92009-03-26 17:10:29 +00001904
class CBufferedWriterTest(BufferedWriterTest, SizeofTest):
    # Runs the shared BufferedWriter tests against io.BufferedWriter and
    # adds checks that only apply to that implementation.
    tp = io.BufferedWriter

    @unittest.skipIf(MEMORY_SANITIZER, "MSan defaults to crashing "
                     "instead of returning NULL for malloc failure.")
    def test_constructor(self):
        # Run the generic constructor checks, then try an absurdly large
        # buffer size on builds where sys.maxsize exceeds 32 bits.
        BufferedWriterTest.test_constructor(self)
        # The allocation can succeed on 32-bit builds, e.g. with more
        # than 2 GiB RAM and a 64-bit kernel.
        if sys.maxsize <= 0x7FFFFFFF:
            return
        rawio = self.MockRawIO()
        bufio = self.tp(rawio)
        expected_errors = (OverflowError, MemoryError, ValueError)
        with self.assertRaises(expected_errors):
            bufio.__init__(rawio, sys.maxsize)

    def test_initialization(self):
        # After each rejected re-initialisation the object must refuse to
        # write.
        rawio = self.MockRawIO()
        bufio = self.tp(rawio)
        for bad_size in (0, -16, -1):
            with self.assertRaises(ValueError):
                bufio.__init__(rawio, buffer_size=bad_size)
            with self.assertRaises(ValueError):
                bufio.write(b"def")

    def test_garbage_collection(self):
        # C BufferedWriter objects are collected, and collecting them
        # flushes all data to disk.
        # The original note adds that the Python version has __del__, so it
        # ends up in gc.garbage instead.
        self.addCleanup(support.unlink, support.TESTFN)
        with support.check_warnings(('', ResourceWarning)):
            rawio = self.FileIO(support.TESTFN, "w+b")
            f = self.tp(rawio)
            f.write(b"123xxx")
            # Create a reference cycle through the object itself.
            f.x = f
            wr = weakref.ref(f)
            del f
            support.gc_collect()
        self.assertIsNone(wr(), wr)
        with self.open(support.TESTFN, "rb") as result:
            self.assertEqual(result.read(), b"123xxx")

    def test_args_error(self):
        # Issue #17275
        # Too many positional arguments: the TypeError message must mention
        # "BufferedWriter".
        with self.assertRaisesRegex(TypeError, "BufferedWriter"):
            self.tp(io.BytesIO(), 1024, 1024, 1024)
1951
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001952
class PyBufferedWriterTest(BufferedWriterTest):
    # Runs the shared BufferedWriter tests against the pyio (Python)
    # implementation.
    tp = pyio.BufferedWriter
Guido van Rossuma9e20242007-03-08 00:43:48 +00001955
Guido van Rossum01a27522007-03-07 01:00:12 +00001956class BufferedRWPairTest(unittest.TestCase):
Guido van Rossuma9e20242007-03-08 00:43:48 +00001957
def test_constructor(self):
    # A freshly built pair must not report itself as closed.
    reader_raw = self.MockRawIO()
    writer_raw = self.MockRawIO()
    pair = self.tp(reader_raw, writer_raw)
    self.assertFalse(pair.closed)
Guido van Rossum01a27522007-03-07 01:00:12 +00001961
def test_uninitialized(self):
    # A pair created with __new__ only must be safe to delete, must refuse
    # reads and writes, and must work once __init__ has run.
    pair = self.tp.__new__(self.tp)
    del pair
    pair = self.tp.__new__(self.tp)
    expected_errors = (ValueError, AttributeError)
    message = 'uninitialized|has no attribute'
    with self.assertRaisesRegex(expected_errors, message):
        pair.read(0)
    with self.assertRaisesRegex(expected_errors, message):
        pair.write(b'')
    pair.__init__(self.MockRawIO(), self.MockRawIO())
    self.assertEqual(pair.read(0), b'')
    self.assertEqual(pair.write(b''), 0)
1975
def test_detach(self):
    # detach() is not supported on a reader/writer pair.
    pair = self.tp(self.MockRawIO(), self.MockRawIO())
    with self.assertRaises(self.UnsupportedOperation):
        pair.detach()
1979
def test_constructor_max_buffer_size_removal(self):
    # A fourth positional argument is no longer accepted.
    reader_raw = self.MockRawIO()
    writer_raw = self.MockRawIO()
    with self.assertRaises(TypeError):
        self.tp(reader_raw, writer_raw, 8, 12)
Benjamin Peterson59406a92009-03-26 17:10:29 +00001983
def test_constructor_with_not_readable(self):
    # The constructor must reject a reader whose readable() is False.
    class NotReadable(MockRawIO):
        def readable(self):
            return False

    bad_reader = NotReadable()
    good_writer = self.MockRawIO()
    with self.assertRaises(OSError):
        self.tp(bad_reader, good_writer)
Antoine Pitroucf4c7492009-04-19 00:09:36 +00001990
def test_constructor_with_not_writeable(self):
    # The constructor must reject a writer whose writable() is False.
    class NotWriteable(MockRawIO):
        def writable(self):
            return False

    good_reader = self.MockRawIO()
    bad_writer = NotWriteable()
    with self.assertRaises(OSError):
        self.tp(good_reader, bad_writer)
Antoine Pitroucf4c7492009-04-19 00:09:36 +00001997
def test_read(self):
    # Sized reads, a read to EOF, and read(None) on a fresh pair.
    pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())

    for size, expected in ((3, b"abc"), (1, b"d")):
        self.assertEqual(pair.read(size), expected)
    self.assertEqual(pair.read(), b"ef")
    fresh_pair = self.tp(self.BytesIO(b"abc"), self.MockRawIO())
    self.assertEqual(fresh_pair.read(None), b"abc")
2006
def test_readlines(self):
    # readlines() on a pair with no hint, with hint=None and with a
    # byte-count hint.  Every call gets a fresh pair so one read cannot
    # influence the next.
    def make_pair():
        return self.tp(self.BytesIO(b"abc\ndef\nh"), self.MockRawIO())
    every_line = [b"abc\n", b"def\n", b"h"]
    self.assertEqual(make_pair().readlines(), every_line)
    # This assertion used to repeat the no-argument call verbatim; it now
    # covers hint=None, matching the BufferedReader readlines test and the
    # read(None) check in test_read.
    self.assertEqual(make_pair().readlines(None), every_line)
    # With a hint of 5 only the first two lines are expected back.
    self.assertEqual(make_pair().readlines(5), [b"abc\n", b"def\n"])
Antoine Pitroucf4c7492009-04-19 00:09:36 +00002012
def test_read1(self):
    # .read1() is delegated to the underlying reader object, so this test
    # can be shallow.
    source = self.BytesIO(b"abcdef")
    pair = self.tp(source, self.MockRawIO())

    self.assertEqual(pair.read1(3), b"abc")
    self.assertEqual(pair.read1(), b"def")
Antoine Pitroucf4c7492009-04-19 00:09:36 +00002020
def test_readinto(self):
    # readinto() and readinto1() must both fill a 5-byte buffer from the
    # start of the reader's data.
    for method in ("readinto", "readinto1"):
        with self.subTest(method):
            pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())

            target = byteslike(b'\0' * 5)
            fill = getattr(pair, method)
            self.assertEqual(fill(target), 5)
            self.assertEqual(bytes(target), b"abcde")
Antoine Pitroucf4c7492009-04-19 00:09:36 +00002029
def test_write(self):
    # Writes reach the writer side on flush, and mutating the caller's
    # buffer afterwards must not change what gets written.
    sink = self.MockRawIO()
    pair = self.tp(self.MockRawIO(), sink)

    pair.write(b"abc")
    pair.flush()
    scratch = bytearray(b"def")
    pair.write(scratch)
    scratch[:] = b"***"  # Overwrite our copy of the data
    pair.flush()
    self.assertEqual(sink._write_stack, [b"abc", b"def"])
2041
def test_peek(self):
    # peek() must show the upcoming bytes without consuming them.
    source = self.BytesIO(b"abcdef")
    pair = self.tp(source, self.MockRawIO())

    peeked = pair.peek(3)
    self.assertTrue(peeked.startswith(b"abc"))
    self.assertEqual(pair.read(3), b"abc")
2047
def test_readable(self):
    # A pair built from two mock raw streams reports itself as readable.
    reader_raw = self.MockRawIO()
    writer_raw = self.MockRawIO()
    pair = self.tp(reader_raw, writer_raw)
    self.assertTrue(pair.readable())
2051
def test_writeable(self):
    # A pair built from two mock raw streams reports itself as writable.
    reader_raw = self.MockRawIO()
    writer_raw = self.MockRawIO()
    pair = self.tp(reader_raw, writer_raw)
    self.assertTrue(pair.writable())
2055
def test_seekable(self):
    # BufferedRWPairs are never seekable, even if their readers and writers
    # are.
    reader_raw = self.MockRawIO()
    writer_raw = self.MockRawIO()
    pair = self.tp(reader_raw, writer_raw)
    self.assertFalse(pair.seekable())
2061
2062 # .flush() is delegated to the underlying writer object and has been
2063 # tested in the test_write method.
2064
def test_close_and_closed(self):
    # The `closed` attribute must flip from false to true across close().
    reader_raw = self.MockRawIO()
    writer_raw = self.MockRawIO()
    pair = self.tp(reader_raw, writer_raw)
    self.assertFalse(pair.closed)
    pair.close()
    self.assertTrue(pair.closed)
2070
def test_reader_close_error_on_close(self):
    # When the reader's close() fails, pair.close() must re-raise that
    # error; the writer is still closed and the pair reports closed.
    def reader_close():
        # Deliberately references an undefined name so the call raises
        # NameError.
        reader_non_existing
    reader = self.MockRawIO()
    reader.close = reader_close
    writer = self.MockRawIO()
    pair = self.tp(reader, writer)
    with self.assertRaises(NameError) as caught:
        pair.close()
    message = str(caught.exception)
    self.assertIn('reader_non_existing', message)
    self.assertTrue(pair.closed)
    self.assertFalse(reader.closed)
    self.assertTrue(writer.closed)

    # Silence destructor error
    def _noop_close():
        return None
    reader.close = _noop_close
2087
def test_writer_close_error_on_close(self):
    # When only the writer's close() fails, the error propagates out of
    # pair.close(); the reader is closed but the pair and the writer are
    # not reported as closed.
    def failing_close():
        writer_non_existing  # undefined on purpose: raises NameError

    good_reader = self.MockRawIO()
    broken_writer = self.MockRawIO()
    broken_writer.close = failing_close
    rw_pair = self.tp(good_reader, broken_writer)

    with self.assertRaises(NameError) as caught:
        rw_pair.close()
    message = str(caught.exception)
    self.assertIn('writer_non_existing', message)
    self.assertFalse(rw_pair.closed)
    self.assertTrue(good_reader.closed)
    self.assertFalse(broken_writer.closed)

    # Silence destructor error
    broken_writer.close = lambda: None
    broken_writer = None

    # Ignore BufferedWriter (of the BufferedRWPair) unraisable exception
    with support.catch_unraisable_exception():
        # Ignore BufferedRWPair unraisable exception
        with support.catch_unraisable_exception():
            rw_pair = None
            support.gc_collect()
        support.gc_collect()
Victor Stinner472f7942019-04-12 21:58:24 +02002113
def test_reader_writer_close_error_on_close(self):
    # When both halves fail to close, the reader's error is the one
    # raised and the writer's error is chained as its __context__.
    def failing_reader_close():
        reader_non_existing  # undefined on purpose: raises NameError

    def failing_writer_close():
        writer_non_existing  # undefined on purpose: raises NameError

    broken_reader = self.MockRawIO()
    broken_reader.close = failing_reader_close
    broken_writer = self.MockRawIO()
    broken_writer.close = failing_writer_close
    rw_pair = self.tp(broken_reader, broken_writer)

    with self.assertRaises(NameError) as caught:
        rw_pair.close()
    raised = caught.exception
    self.assertIn('reader_non_existing', str(raised))
    self.assertIsInstance(raised.__context__, NameError)
    self.assertIn('writer_non_existing', str(raised.__context__))
    self.assertFalse(rw_pair.closed)
    self.assertFalse(broken_reader.closed)
    self.assertFalse(broken_writer.closed)

    # Silence destructor error
    broken_reader.close = lambda: None
    broken_writer.close = lambda: None
2136
def test_isatty(self):
    # The pair is a tty as soon as either half says it is one.
    class SelectableIsAtty(MockRawIO):
        # Raw stream whose isatty() answer is chosen at construction.
        def __init__(self, isatty):
            MockRawIO.__init__(self)
            self._isatty = isatty

        def isatty(self):
            return self._isatty

    # ((reader answer, writer answer), assertion applied to the pair)
    scenarios = [
        ((False, False), self.assertFalse),
        ((True, False), self.assertTrue),
        ((False, True), self.assertTrue),
        ((True, True), self.assertTrue),
    ]
    for (reader_tty, writer_tty), check in scenarios:
        pair = self.tp(SelectableIsAtty(reader_tty),
                       SelectableIsAtty(writer_tty))
        check(pair.isatty())
Guido van Rossum01a27522007-03-07 01:00:12 +00002157
def test_weakref_clearing(self):
    # Drop the pair first, then the weak reference that pointed at it.
    rw_pair = self.tp(self.MockRawIO(), self.MockRawIO())
    weak = weakref.ref(rw_pair)
    rw_pair = None
    weak = None  # Shouldn't segfault.
2163
class CBufferedRWPairTest(BufferedRWPairTest):
    # Run the shared BufferedRWPair tests against io.BufferedRWPair.
    tp = io.BufferedRWPair
Guido van Rossuma9e20242007-03-08 00:43:48 +00002166
class PyBufferedRWPairTest(BufferedRWPairTest):
    # Run the shared BufferedRWPair tests against pyio.BufferedRWPair.
    tp = pyio.BufferedRWPair
Guido van Rossuma9e20242007-03-08 00:43:48 +00002169
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002170
class BufferedRandomTest(BufferedReaderTest, BufferedWriterTest):
    # Tests for a buffered stream that is both read and written.
    # Concrete subclasses set ``tp`` to the implementation under test.
    read_mode = "rb+"
    write_mode = "wb+"

    def test_constructor(self):
        # Both parents define this test; run each one explicitly.
        for parent in (BufferedReaderTest, BufferedWriterTest):
            parent.test_constructor(self)

    def test_uninitialized(self):
        for parent in (BufferedReaderTest, BufferedWriterTest):
            parent.test_uninitialized(self)

    def test_read_and_write(self):
        mock_raw = self.MockRawIO((b"asdf", b"ghjk"))
        stream = self.tp(mock_raw, 8)

        self.assertEqual(b"as", stream.read(2))
        for chunk in (b"ddd", b"eee"):
            stream.write(chunk)
        self.assertFalse(mock_raw._write_stack)  # Buffer writes
        self.assertEqual(b"ghjk", stream.read())
        self.assertEqual(b"dddeee", mock_raw._write_stack[0])

    def test_seek_and_tell(self):
        backing = self.BytesIO(b"asdfghjkl")
        stream = self.tp(backing)

        self.assertEqual(b"as", stream.read(2))
        self.assertEqual(2, stream.tell())
        stream.seek(0, 0)
        self.assertEqual(b"asdf", stream.read(4))

        # Overwrite four bytes in the middle, then re-read everything.
        stream.write(b"123f")
        stream.seek(0, 0)
        self.assertEqual(b"asdf123fl", stream.read())
        self.assertEqual(9, stream.tell())
        stream.seek(-4, 2)
        self.assertEqual(5, stream.tell())
        stream.seek(2, 1)
        self.assertEqual(7, stream.tell())
        self.assertEqual(b"fl", stream.read(11))
        stream.flush()
        self.assertEqual(b"asdf123fl", backing.getvalue())

        with self.assertRaises(TypeError):
            stream.seek(0.0)

    def check_flush_and_read(self, read_func):
        # Shared scenario: interleave reads (through ``read_func``),
        # writes and flushes, checking positions and data at each step.
        backing = self.BytesIO(b"abcdefghi")
        stream = self.tp(backing)

        self.assertEqual(b"ab", read_func(stream, 2))
        stream.write(b"12")
        self.assertEqual(b"ef", read_func(stream, 2))
        self.assertEqual(6, stream.tell())
        stream.flush()
        self.assertEqual(6, stream.tell())
        self.assertEqual(b"ghi", read_func(stream))
        backing.seek(0, 0)
        backing.write(b"XYZ")
        # flush() resets the read buffer
        stream.flush()
        stream.seek(0, 0)
        self.assertEqual(b"XYZ", read_func(stream, 3))

    def test_flush_and_read(self):
        def _read(bufio, *args):
            return bufio.read(*args)
        self.check_flush_and_read(_read)

    def test_flush_and_readinto(self):
        def _readinto(bufio, n=-1):
            size = n if n >= 0 else 9999
            target = bytearray(size)
            count = bufio.readinto(target)
            return bytes(target[:count])
        self.check_flush_and_read(_readinto)

    def test_flush_and_peek(self):
        def _peek(bufio, n=-1):
            # This relies on the fact that the buffer can contain the whole
            # raw stream, otherwise peek() can return less.
            data = bufio.peek(n)
            if n != -1:
                data = data[:n]
            # peek() does not advance the position, so do it by hand.
            bufio.seek(len(data), 1)
            return data
        self.check_flush_and_read(_peek)

    def test_flush_and_write(self):
        backing = self.BytesIO(b"abcdefghi")
        stream = self.tp(backing)

        for chunk in (b"123", b"45"):
            stream.write(chunk)
            stream.flush()
        stream.seek(0, 0)
        expected = b"12345fghi"
        self.assertEqual(expected, backing.getvalue())
        self.assertEqual(expected, stream.read())

    def test_threads(self):
        for parent in (BufferedReaderTest, BufferedWriterTest):
            parent.test_threads(self)

    def test_writes_and_peek(self):
        def _peek_in_place(bufio):
            bufio.peek(1)
        self.check_writes(_peek_in_place)

        def _peek_one_back(bufio):
            saved = bufio.tell()
            bufio.seek(-1, 1)
            bufio.peek(1)
            bufio.seek(saved, 0)
        self.check_writes(_peek_one_back)

    def test_writes_and_reads(self):
        def _step_back_and_read(bufio):
            bufio.seek(-1, 1)
            bufio.read(1)
        self.check_writes(_step_back_and_read)

    def test_writes_and_read1s(self):
        def _step_back_and_read1(bufio):
            bufio.seek(-1, 1)
            bufio.read1(1)
        self.check_writes(_step_back_and_read1)

    def test_writes_and_readintos(self):
        def _step_back_and_readinto(bufio):
            bufio.seek(-1, 1)
            bufio.readinto(bytearray(1))
        self.check_writes(_step_back_and_readinto)

    def test_write_after_readahead(self):
        # Issue #6629: writing after the buffer was filled by readahead should
        # first rewind the raw stream.
        for overwrite_size in [1, 5]:
            raw = self.BytesIO(b"A" * 10)
            bufio = self.tp(raw, 4)
            end_pos = overwrite_size + 1
            # Trigger readahead
            self.assertEqual(bufio.read(1), b"A")
            self.assertEqual(bufio.tell(), 1)
            # Overwriting should rewind the raw stream if it needs so
            bufio.write(b"B" * overwrite_size)
            self.assertEqual(bufio.tell(), end_pos)
            # If the write size was smaller than the buffer size, flush() and
            # check that rewind happens.
            bufio.flush()
            self.assertEqual(bufio.tell(), end_pos)
            expected = (b"A" + b"B" * overwrite_size
                        + b"A" * (9 - overwrite_size))
            self.assertEqual(raw.getvalue(), expected)

    def test_write_rewind_write(self):
        # Various combinations of reading / writing / seeking backwards / writing again
        def mutate(bufio, pos1, pos2):
            assert pos2 >= pos1
            # Fill the buffer
            bufio.seek(pos1)
            bufio.read(pos2 - pos1)
            bufio.write(b'\x02')
            # This writes earlier than the previous write, but still inside
            # the buffer.
            bufio.seek(pos1)
            bufio.write(b'\x01')

        original = b"\x80\x81\x82\x83\x84"
        size = len(original)
        for i in range(size):
            for j in range(i, size):
                raw = self.BytesIO(original)
                bufio = self.tp(raw, 100)
                mutate(bufio, i, j)
                bufio.flush()
                # Apply the same two writes, in the same order, to a copy.
                expected = bytearray(original)
                expected[j] = 2
                expected[i] = 1
                self.assertEqual(raw.getvalue(), expected,
                                 "failed result for i=%d, j=%d" % (i, j))

    def test_truncate_after_read_or_write(self):
        backing = self.BytesIO(b"A" * 10)
        stream = self.tp(backing, 100)
        self.assertEqual(stream.read(2), b"AA")  # the read buffer gets filled
        self.assertEqual(stream.truncate(), 2)
        self.assertEqual(stream.write(b"BB"), 2)  # the write buffer increases
        self.assertEqual(stream.truncate(), 4)

    def test_misbehaved_io(self):
        for parent in (BufferedReaderTest, BufferedWriterTest):
            parent.test_misbehaved_io(self)

    def test_interleaved_read_write(self):
        # Test for issue #12213
        with self.BytesIO(b'abcdefgh') as raw, self.tp(raw, 100) as f:
            f.write(b"1")
            self.assertEqual(f.read(1), b'b')
            f.write(b'2')
            self.assertEqual(f.read1(1), b'd')
            f.write(b'3')
            one_byte = bytearray(1)
            f.readinto(one_byte)
            self.assertEqual(one_byte, b'f')
            f.write(b'4')
            self.assertEqual(f.peek(1), b'h')
            f.flush()
            self.assertEqual(raw.getvalue(), b'1b2d3f4h')

        with self.BytesIO(b'abc') as raw, self.tp(raw, 100) as f:
            self.assertEqual(f.read(1), b'a')
            f.write(b"2")
            self.assertEqual(f.read(1), b'c')
            f.flush()
            self.assertEqual(raw.getvalue(), b'a2c')

    def test_interleaved_readline_write(self):
        with self.BytesIO(b'ab\ncdef\ng\n') as raw, self.tp(raw) as f:
            f.write(b'1')
            self.assertEqual(f.readline(), b'b\n')
            f.write(b'2')
            self.assertEqual(f.readline(), b'def\n')
            f.write(b'3')
            self.assertEqual(f.readline(), b'\n')
            f.flush()
            self.assertEqual(raw.getvalue(), b'1b\n2def\n3\n')

    # You can't construct a BufferedRandom over a non-seekable stream.
    test_unseekable = None

    # writable() returns True, so there's no point to test it over
    # a writable stream.
    test_truncate_on_read_only = None
2402
R David Murray67bfe802013-02-23 21:51:05 -05002403
class CBufferedRandomTest(BufferedRandomTest, SizeofTest):
    # Run the BufferedRandom tests against io.BufferedRandom.
    tp = io.BufferedRandom

    @unittest.skipIf(MEMORY_SANITIZER, "MSan defaults to crashing "
                     "instead of returning NULL for malloc failure.")
    def test_constructor(self):
        BufferedRandomTest.test_constructor(self)
        if sys.maxsize <= 0x7FFFFFFF:
            # The allocation can succeed on 32-bit builds, e.g. with more
            # than 2 GiB RAM and a 64-bit kernel.
            return
        rawio = self.MockRawIO()
        bufio = self.tp(rawio)
        # Re-initialising with a sys.maxsize buffer size must fail.
        with self.assertRaises((OverflowError, MemoryError, ValueError)):
            bufio.__init__(rawio, sys.maxsize)

    def test_garbage_collection(self):
        # Reuse the garbage-collection checks of both C test classes.
        for c_test_class in (CBufferedReaderTest, CBufferedWriterTest):
            c_test_class.test_garbage_collection(self)

    def test_args_error(self):
        # Issue #17275
        too_many_args = (io.BytesIO(), 1024, 1024, 1024)
        with self.assertRaisesRegex(TypeError, "BufferedRandom"):
            self.tp(*too_many_args)
2427
2428
class PyBufferedRandomTest(BufferedRandomTest):
    # Run the shared BufferedRandom tests against pyio.BufferedRandom.
    tp = pyio.BufferedRandom
2431
2432
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002433# To fully exercise seek/tell, the StatefulIncrementalDecoder has these
2434# properties:
2435# - A single output character can correspond to many bytes of input.
2436# - The number of input bytes to complete the character can be
2437# undetermined until the last input byte is received.
2438# - The number of input bytes can vary depending on previous input.
2439# - A single input byte can correspond to many characters of output.
2440# - The number of output characters can be undetermined until the
2441# last input byte is received.
2442# - The number of output characters can vary depending on previous input.
2443
class StatefulIncrementalDecoder(codecs.IncrementalDecoder):
    """
    For testing seek/tell behavior with a stateful, buffering decoder.

    Input is a sequence of words.  Words may be fixed-length (length set
    by input) or variable-length (period-terminated).  In variable-length
    mode, extra periods are ignored.  Possible words are:
      - 'i' followed by a number sets the input length, I (maximum 99).
        When I is set to 0, words are period-terminated.
      - 'o' followed by a number sets the output length, O (maximum 99).
      - Any other word is converted into a word followed by a period on
        the output.  The output word consists of the input word truncated
        or padded out with hyphens to make its length equal to O.  If O
        is 0, the word is output verbatim without truncating or padding.
    A bare 'i' or 'o' with no number sets the value to 0.
    I and O are initially set to 1.  A new I or O applies to the bytes
    that follow the word that set it.  EOF also terminates the last word.
    """

    def __init__(self, errors='strict'):
        codecs.IncrementalDecoder.__init__(self, errors)
        self.reset()

    def __repr__(self):
        return '<SID %x>' % id(self)

    def reset(self):
        """Return to the initial state: I = 1, O = 1, nothing buffered."""
        self.i = 1
        self.o = 1
        self.buffer = bytearray()

    def getstate(self):
        """Return (buffered bytes, flags) with flags = (I^1)*100 + (O^1)."""
        i, o = self.i ^ 1, self.o ^ 1  # so that flags = 0 after reset()
        return bytes(self.buffer), i*100 + o

    def setstate(self, state):
        """Restore a (buffered bytes, flags) pair produced by getstate()."""
        buffer, io = state
        self.buffer = bytearray(buffer)
        # Undo the packing done by getstate().
        i, o = divmod(io, 100)
        self.i, self.o = i ^ 1, o ^ 1

    def decode(self, input, final=False):
        """Consume the bytes of *input* and return the text they produce.

        Bytes of an unfinished word stay buffered for the next call
        unless *final* is true, in which case the pending word is
        emitted as well.
        """
        output = ''
        for b in input:
            if self.i == 0:  # variable-length, terminated with period
                if b == ord('.'):
                    # A period with nothing buffered is an extra
                    # separator and is ignored.
                    if self.buffer:
                        output += self.process_word()
                else:
                    self.buffer.append(b)
            else:  # fixed-length, terminate after self.i bytes
                self.buffer.append(b)
                if len(self.buffer) == self.i:
                    output += self.process_word()
        if final and self.buffer:  # EOF terminates the last word
            output += self.process_word()
        return output

    def process_word(self):
        """Handle the buffered word, clear the buffer, return its output.

        Control words ('i...' and 'o...') update I or O and produce no
        output; any other word is padded or truncated to O characters
        (left as is when O is 0) and followed by a period.  The buffer
        must not be empty when this is called.
        """
        output = ''
        if self.buffer[0] == ord('i'):
            self.i = min(99, int(self.buffer[1:] or 0))  # set input length
        elif self.buffer[0] == ord('o'):
            self.o = min(99, int(self.buffer[1:] or 0))  # set output length
        else:
            output = self.buffer.decode('ascii')
            if len(output) < self.o:
                output += '-'*self.o  # pad out with hyphens
            if self.o:
                output = output[:self.o]  # truncate to output length
            output += '.'
        self.buffer = bytearray()
        return output

    # Off by default so the codec search function below is inert
    # until this flag is switched on.
    codecEnabled = False

    @classmethod
    def lookupTestDecoder(cls, name):
        """Codec search function for the 'test_decoder' codec.

        Returns a CodecInfo that uses this class as incremental decoder
        and latin-1 for encoding, or None when the codec is disabled or
        *name* is a different codec.
        """
        if cls.codecEnabled and name == 'test_decoder':
            latin1 = codecs.lookup('latin-1')
            return codecs.CodecInfo(
                name='test_decoder', encode=latin1.encode, decode=None,
                incrementalencoder=None,
                streamreader=None, streamwriter=None,
                incrementaldecoder=cls)
        return None
2528
# Register the previous decoder for testing.
# Disabled by default, tests will enable it: the search function
# returns None for every name until
# StatefulIncrementalDecoder.codecEnabled is set to a true value.
codecs.register(StatefulIncrementalDecoder.lookupTestDecoder)
2532
2533
class StatefulIncrementalDecoderTest(unittest.TestCase):
    """
    Make sure the StatefulIncrementalDecoder actually works.
    """

    # Each entry is (input bytes, final flag, expected decoded text).
    test_cases = [
        # I=1, O=1 (fixed-length input == fixed-length output)
        (b'abcd', False, 'a.b.c.d.'),
        # I=0, O=0 (variable-length input, variable-length output)
        (b'oiabcd', True, 'abcd.'),
        # I=0, O=0 (should ignore extra periods)
        (b'oi...abcd...', True, 'abcd.'),
        # I=0, O=6 (variable-length input, fixed-length output)
        (b'i.o6.x.xyz.toolongtofit.', False, 'x-----.xyz---.toolon.'),
        # I=2, O=6 (fixed-length input < fixed-length output)
        (b'i.i2.o6xyz', True, 'xy----.z-----.'),
        # I=6, O=3 (fixed-length input > fixed-length output)
        (b'i.o3.i6.abcdefghijklmnop', True, 'abc.ghi.mno.'),
        # I=0, then 3; O=29, then 15 (with longer output)
        (b'i.o29.a.b.cde.o15.abcdefghijabcdefghij.i3.a.b.c.d.ei00k.l.m', True,
         'a----------------------------.'
         'b----------------------------.'
         'cde--------------------------.'
         'abcdefghijabcde.'
         'a.b------------.'
         '.c.------------.'
         'd.e------------.'
         'k--------------.'
         'l--------------.'
         'm--------------.')
    ]

    def test_decoder(self):
        # Try a few one-shot test cases.
        for raw_bytes, at_eof, expected in self.test_cases:
            decoder = StatefulIncrementalDecoder()
            self.assertEqual(decoder.decode(raw_bytes, at_eof), expected)

        # Also test an unfinished decode, followed by forcing EOF.
        decoder = StatefulIncrementalDecoder()
        pending = decoder.decode(b'oiabcd')
        self.assertEqual(pending, '')
        flushed = decoder.decode(b'', 1)
        self.assertEqual(flushed, 'abcd.')
Guido van Rossum78892e42007-04-06 17:31:18 +00002576
2577class TextIOWrapperTest(unittest.TestCase):
Guido van Rossum9b76da62007-04-11 01:09:03 +00002578
def setUp(self):
    # Sample bytes mixing "\r\n", "\r" and "\n" line endings, and the
    # same text with every line ending written as "\n".
    self.testdata = b"AAA\r\nBBB\rCCC\r\nDDD\nEEE\r\n"
    self.normalized = b"AAA\nBBB\nCCC\nDDD\nEEE\n".decode("ascii")
    # Start each test without a leftover scratch file.
    support.unlink(support.TESTFN)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002583
def tearDown(self):
    # Remove the scratch file a test may have created.
    support.unlink(support.TESTFN)
Guido van Rossum9b76da62007-04-11 01:09:03 +00002586
def test_constructor(self):
    # Re-running __init__ on a live wrapper must replace its settings,
    # and invalid ``newline`` arguments must be rejected.
    raw = self.BytesIO(b"\xc3\xa9\n\n")
    buffered = self.BufferedReader(raw, 1000)
    t = self.TextIOWrapper(buffered)

    t.__init__(buffered, encoding="latin-1", newline="\r\n")
    self.assertEqual(t.encoding, "latin-1")
    self.assertEqual(t.line_buffering, False)

    t.__init__(buffered, encoding="utf-8", line_buffering=True)
    self.assertEqual(t.encoding, "utf-8")
    self.assertEqual(t.line_buffering, True)
    self.assertEqual("\xe9\n", t.readline())

    with self.assertRaises(TypeError):
        t.__init__(buffered, newline=42)
    with self.assertRaises(ValueError):
        t.__init__(buffered, newline='xyzzy')
2600
def test_uninitialized(self):
    # An object made with __new__ only can be deleted, fails cleanly
    # when used, and works once __init__ has been run.
    wrapper_type = self.TextIOWrapper
    t = wrapper_type.__new__(wrapper_type)
    del t

    t = wrapper_type.__new__(wrapper_type)
    with self.assertRaises(Exception):
        repr(t)
    with self.assertRaisesRegex((ValueError, AttributeError),
                                'uninitialized|has no attribute'):
        t.read(0)

    t.__init__(self.MockRawIO())
    self.assertEqual(t.read(0), '')
2611
def test_non_text_encoding_codecs_are_rejected(self):
    # Ensure the constructor complains if passed a codec that isn't
    # marked as a text encoding
    # http://bugs.python.org/issue20404
    buffered = self.BufferedWriter(self.BytesIO())
    expected_message = "is not a text encoding"
    with self.assertRaisesRegex(LookupError, expected_message):
        self.TextIOWrapper(buffered, encoding="hex")
2620
def test_detach(self):
    raw = self.BytesIO()
    buffered = self.BufferedWriter(raw)

    # detach() hands back the underlying buffer object.
    t = self.TextIOWrapper(buffered)
    self.assertIs(t.detach(), buffered)

    # Pending text reaches the raw stream on detach, and a second
    # detach is an error.
    t = self.TextIOWrapper(buffered, encoding="ascii")
    t.write("howdy")
    self.assertFalse(raw.getvalue())
    t.detach()
    self.assertEqual(raw.getvalue(), b"howdy")
    with self.assertRaises(ValueError):
        t.detach()

    # Operations independent of the detached stream should still work
    repr(t)
    self.assertEqual(t.encoding, "ascii")
    self.assertEqual(t.errors, "strict")
    self.assertFalse(t.line_buffering)
    self.assertFalse(t.write_through)
Benjamin Peterson10e76b62014-12-21 20:51:50 -06002640
def test_repr(self):
    raw = self.BytesIO("hello".encode("utf-8"))
    buffered = self.BufferedReader(raw)
    t = self.TextIOWrapper(buffered, encoding="utf-8")
    modname = self.TextIOWrapper.__module__

    def check_repr(pattern):
        # ``pattern`` has one %s slot for the optional module prefix.
        self.assertRegex(repr(t), pattern % modname)

    check_repr(r"<(%s\.)?TextIOWrapper encoding='utf-8'>")
    raw.name = "dummy"
    check_repr(r"<(%s\.)?TextIOWrapper name='dummy' encoding='utf-8'>")
    t.mode = "r"
    check_repr(
        r"<(%s\.)?TextIOWrapper name='dummy' mode='r' encoding='utf-8'>")
    raw.name = b"dummy"
    check_repr(
        r"<(%s\.)?TextIOWrapper name=b'dummy' mode='r' encoding='utf-8'>")

    t.buffer.detach()
    repr(t)  # Should not raise an exception
2660
def test_recursive_repr(self):
    # Issue #25455
    raw = self.BytesIO()
    wrapper = self.TextIOWrapper(raw)
    # Make the raw stream's name point back at the wrapper, so repr()
    # of the wrapper refers to itself.
    with support.swap_attr(raw, 'name', wrapper):
        try:
            repr(wrapper)  # Should not crash
        except RuntimeError:
            # A RuntimeError is an accepted outcome here.
            pass
2670
def test_line_buffering(self):
    raw = self.BytesIO()
    buffered = self.BufferedWriter(raw, 1000)
    t = self.TextIOWrapper(buffered, newline="\n", line_buffering=True)

    # (text written, raw contents expected right after that write)
    steps = [
        ("X", b""),                  # No flush happened
        ("Y\nZ", b"XY\nZ"),          # All got flushed
        ("A\rB", b"XY\nZA\rB"),
    ]
    for text, expected_raw in steps:
        t.write(text)
        self.assertEqual(raw.getvalue(), expected_raw)
Guido van Rossumf64db9f2007-12-06 01:04:26 +00002681
def test_reconfigure_line_buffering(self):
    raw = self.BytesIO()
    buffered = self.BufferedWriter(raw, 1000)
    t = self.TextIOWrapper(buffered, newline="\n", line_buffering=False)

    def write_and_expect(text, expected_raw):
        t.write(text)
        self.assertEqual(raw.getvalue(), expected_raw)

    write_and_expect("AB\nC", b"")

    t.reconfigure(line_buffering=True)   # implicit flush
    self.assertEqual(raw.getvalue(), b"AB\nC")
    write_and_expect("DEF\nG", b"AB\nCDEF\nG")
    write_and_expect("H", b"AB\nCDEF\nG")
    t.reconfigure(line_buffering=False)   # implicit flush
    self.assertEqual(raw.getvalue(), b"AB\nCDEF\nGH")
    write_and_expect("IJ", b"AB\nCDEF\nGH")

    # Keeping default value
    def reconfigure_without_change():
        t.reconfigure()
        t.reconfigure(line_buffering=None)

    reconfigure_without_change()
    self.assertEqual(t.line_buffering, False)
    t.reconfigure(line_buffering=True)
    reconfigure_without_change()
    self.assertEqual(t.line_buffering, True)
2708
@unittest.skipIf(sys.flags.utf8_mode, "utf-8 mode is enabled")
def test_default_encoding(self):
    saved_environ = dict(os.environ)
    try:
        # try to get a user preferred encoding different than the current
        # locale encoding to check that TextIOWrapper() uses the current
        # locale encoding and not the user preferred encoding
        for locale_var in ('LC_ALL', 'LANG', 'LC_CTYPE'):
            os.environ.pop(locale_var, None)

        expected_encoding = locale.getpreferredencoding(False)
        wrapper = self.TextIOWrapper(self.BytesIO())
        self.assertEqual(wrapper.encoding, expected_encoding)
    finally:
        # Put the environment back exactly as it was on entry.
        os.environ.clear()
        os.environ.update(saved_environ)
2727
@support.cpython_only
@unittest.skipIf(sys.flags.utf8_mode, "utf-8 mode is enabled")
def test_device_encoding(self):
    # Issue 15989
    import _testcapi
    raw = self.BytesIO()
    # A fileno() result one past INT_MAX, then one past UINT_MAX, must
    # make the constructor raise OverflowError.
    for out_of_range in (_testcapi.INT_MAX + 1, _testcapi.UINT_MAX + 1):
        raw.fileno = lambda value=out_of_range: value
        with self.assertRaises(OverflowError):
            self.TextIOWrapper(raw)
2738
def test_encoding(self):
    # Check the encoding attribute is always set, and valid
    raw = self.BytesIO()

    # Explicit encoding is reported back unchanged.
    t = self.TextIOWrapper(raw, encoding="utf-8")
    self.assertEqual(t.encoding, "utf-8")

    # Without an explicit encoding the attribute still names a codec
    # that codecs.lookup() accepts.
    t = self.TextIOWrapper(raw)
    chosen = t.encoding
    self.assertIsNotNone(chosen)
    codecs.lookup(chosen)
2747
def test_encoding_errors_reading(self):
    undecodable = b"abc\n\xff\n"

    # (1) default and (2) explicit strict: reading raises.
    for extra in ({}, {"errors": "strict"}):
        t = self.TextIOWrapper(self.BytesIO(undecodable),
                               encoding="ascii", **extra)
        self.assertRaises(UnicodeError, t.read)

    # (3) ignore and (4) replace: reading succeeds with altered text.
    lenient_cases = (
        ("ignore", "abc\n\n"),
        ("replace", "abc\n\ufffd\n"),
    )
    for handler, expected_text in lenient_cases:
        t = self.TextIOWrapper(self.BytesIO(undecodable),
                               encoding="ascii", errors=handler)
        self.assertEqual(t.read(), expected_text)
Guido van Rossume7fc50f2007-12-03 22:54:21 +00002765
def test_encoding_errors_writing(self):
    # (1) default and (2) explicit strict: writing raises.
    for extra in ({}, {"errors": "strict"}):
        t = self.TextIOWrapper(self.BytesIO(), encoding="ascii", **extra)
        self.assertRaises(UnicodeError, t.write, "\xff")

    # (3) ignore and (4) replace: writing succeeds with altered bytes.
    lenient_cases = (
        ("ignore", b"abcdef\n"),
        ("replace", b"abc?def\n"),
    )
    for handler, expected_bytes in lenient_cases:
        raw = self.BytesIO()
        t = self.TextIOWrapper(raw, encoding="ascii", errors=handler,
                               newline="\n")
        t.write("abc\xffdef\n")
        t.flush()
        self.assertEqual(raw.getvalue(), expected_bytes)
Guido van Rossume7fc50f2007-12-03 22:54:21 +00002789
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002790 def test_newlines(self):
Guido van Rossum78892e42007-04-06 17:31:18 +00002791 input_lines = [ "unix\n", "windows\r\n", "os9\r", "last\n", "nonl" ]
2792
2793 tests = [
2794 [ None, [ 'unix\n', 'windows\n', 'os9\n', 'last\n', 'nonl' ] ],
Guido van Rossum8358db22007-08-18 21:39:55 +00002795 [ '', input_lines ],
2796 [ '\n', [ "unix\n", "windows\r\n", "os9\rlast\n", "nonl" ] ],
2797 [ '\r\n', [ "unix\nwindows\r\n", "os9\rlast\nnonl" ] ],
2798 [ '\r', [ "unix\nwindows\r", "\nos9\r", "last\nnonl" ] ],
Guido van Rossum78892e42007-04-06 17:31:18 +00002799 ]
Antoine Pitrou180a3362008-12-14 16:36:46 +00002800 encodings = (
2801 'utf-8', 'latin-1',
2802 'utf-16', 'utf-16-le', 'utf-16-be',
2803 'utf-32', 'utf-32-le', 'utf-32-be',
2804 )
Guido van Rossum78892e42007-04-06 17:31:18 +00002805
Guido van Rossum8358db22007-08-18 21:39:55 +00002806 # Try a range of buffer sizes to test the case where \r is the last
Guido van Rossum78892e42007-04-06 17:31:18 +00002807 # character in TextIOWrapper._pending_line.
2808 for encoding in encodings:
Guido van Rossum8358db22007-08-18 21:39:55 +00002809 # XXX: str.encode() should return bytes
2810 data = bytes(''.join(input_lines).encode(encoding))
Guido van Rossum78892e42007-04-06 17:31:18 +00002811 for do_reads in (False, True):
Guido van Rossum8358db22007-08-18 21:39:55 +00002812 for bufsize in range(1, 10):
2813 for newline, exp_lines in tests:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002814 bufio = self.BufferedReader(self.BytesIO(data), bufsize)
2815 textio = self.TextIOWrapper(bufio, newline=newline,
Guido van Rossum78892e42007-04-06 17:31:18 +00002816 encoding=encoding)
2817 if do_reads:
2818 got_lines = []
2819 while True:
Guido van Rossum76c5d4d2007-04-06 19:10:29 +00002820 c2 = textio.read(2)
Guido van Rossum78892e42007-04-06 17:31:18 +00002821 if c2 == '':
2822 break
Ezio Melottib3aedd42010-11-20 19:04:17 +00002823 self.assertEqual(len(c2), 2)
Guido van Rossum76c5d4d2007-04-06 19:10:29 +00002824 got_lines.append(c2 + textio.readline())
Guido van Rossum78892e42007-04-06 17:31:18 +00002825 else:
Guido van Rossum76c5d4d2007-04-06 19:10:29 +00002826 got_lines = list(textio)
Guido van Rossum78892e42007-04-06 17:31:18 +00002827
2828 for got_line, exp_line in zip(got_lines, exp_lines):
Ezio Melottib3aedd42010-11-20 19:04:17 +00002829 self.assertEqual(got_line, exp_line)
2830 self.assertEqual(len(got_lines), len(exp_lines))
Guido van Rossum78892e42007-04-06 17:31:18 +00002831
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002832 def test_newlines_input(self):
2833 testdata = b"AAA\nBB\x00B\nCCC\rDDD\rEEE\r\nFFF\r\nGGG"
Guido van Rossum8358db22007-08-18 21:39:55 +00002834 normalized = testdata.replace(b"\r\n", b"\n").replace(b"\r", b"\n")
2835 for newline, expected in [
Ezio Melottid8b509b2011-09-28 17:37:55 +03002836 (None, normalized.decode("ascii").splitlines(keepends=True)),
2837 ("", testdata.decode("ascii").splitlines(keepends=True)),
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002838 ("\n", ["AAA\n", "BB\x00B\n", "CCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]),
2839 ("\r\n", ["AAA\nBB\x00B\nCCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]),
2840 ("\r", ["AAA\nBB\x00B\nCCC\r", "DDD\r", "EEE\r", "\nFFF\r", "\nGGG"]),
Guido van Rossum8358db22007-08-18 21:39:55 +00002841 ]:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002842 buf = self.BytesIO(testdata)
2843 txt = self.TextIOWrapper(buf, encoding="ascii", newline=newline)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002844 self.assertEqual(txt.readlines(), expected)
Guido van Rossum8358db22007-08-18 21:39:55 +00002845 txt.seek(0)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002846 self.assertEqual(txt.read(), "".join(expected))
Guido van Rossum8358db22007-08-18 21:39:55 +00002847
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002848 def test_newlines_output(self):
2849 testdict = {
2850 "": b"AAA\nBBB\nCCC\nX\rY\r\nZ",
2851 "\n": b"AAA\nBBB\nCCC\nX\rY\r\nZ",
2852 "\r": b"AAA\rBBB\rCCC\rX\rY\r\rZ",
2853 "\r\n": b"AAA\r\nBBB\r\nCCC\r\nX\rY\r\r\nZ",
2854 }
2855 tests = [(None, testdict[os.linesep])] + sorted(testdict.items())
2856 for newline, expected in tests:
2857 buf = self.BytesIO()
2858 txt = self.TextIOWrapper(buf, encoding="ascii", newline=newline)
2859 txt.write("AAA\nB")
2860 txt.write("BB\nCCC\n")
2861 txt.write("X\rY\r\nZ")
2862 txt.flush()
Ezio Melottib3aedd42010-11-20 19:04:17 +00002863 self.assertEqual(buf.closed, False)
2864 self.assertEqual(buf.getvalue(), expected)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002865
def test_destructor(self):
    # Dropping the last reference to a wrapper must deliver the pending
    # text to the underlying buffer by the time that buffer is closed.
    seen_on_close = []

    class MyBytesIO(self.BytesIO):
        def close(self):
            # Capture the contents as they are when close() is reached.
            seen_on_close.append(self.getvalue())
            super().close()

    raw = MyBytesIO()
    wrapper = self.TextIOWrapper(raw, encoding="ascii")
    wrapper.write("abc")
    del wrapper
    support.gc_collect()
    self.assertEqual([b"abc"], seen_on_close)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002879
def test_override_destructor(self):
    # A subclass overriding __del__, close() and flush() must see them
    # invoked in exactly that order when the wrapper is collected.
    calls = []

    class MyTextIO(self.TextIOWrapper):
        def __del__(self):
            calls.append(1)
            # Chain to the inherited __del__ only if there is one.
            inherited = getattr(super(), "__del__", None)
            if inherited is not None:
                inherited()

        def close(self):
            calls.append(2)
            super().close()

        def flush(self):
            calls.append(3)
            super().flush()

    raw = self.BytesIO()
    wrapper = MyTextIO(raw, encoding="ascii")
    del wrapper
    support.gc_collect()
    self.assertEqual(calls, [1, 2, 3])
2902
def test_error_through_destructor(self):
    # Test that the exception state is not modified by a destructor,
    # even if close() fails.
    rawio = self.CloseFailureIO()
    with support.catch_unraisable_exception() as cm:
        # The temporary wrapper is dropped while AttributeError is in
        # flight; that AttributeError is what must reach the caller.
        with self.assertRaises(AttributeError):
            self.TextIOWrapper(rawio).xyzzy

        # cm.unraisable is inspected while the context manager is still
        # active.
        if IOBASE_EMITS_UNRAISABLE:
            if cm.unraisable is not None:
                self.assertEqual(cm.unraisable.exc_type, OSError)
        else:
            self.assertIsNone(cm.unraisable)
Guido van Rossum8358db22007-08-18 21:39:55 +00002915
Guido van Rossum9b76da62007-04-11 01:09:03 +00002916 # Systematic tests of the text I/O API
2917
def test_basic_io(self):
    # Round-trip "abc"/"def" through write, read, seek and tell for a
    # range of chunk sizes and encodings.  Each file is opened in a
    # ``with`` block so that a failing assertion cannot leak the handle.
    for chunksize in (1, 2, 3, 4, 5, 15, 16, 17, 31, 32, 33, 63, 64, 65):
        # utf-16-be and utf-16-le are not covered by this loop.
        for enc in "ascii", "latin-1", "utf-8":
            with self.open(support.TESTFN, "w+", encoding=enc) as f:
                f._CHUNK_SIZE = chunksize
                self.assertEqual(f.write("abc"), 3)
            with self.open(support.TESTFN, "r+", encoding=enc) as f:
                f._CHUNK_SIZE = chunksize
                self.assertEqual(f.tell(), 0)
                self.assertEqual(f.read(), "abc")
                # Position at end of file, reused for the checks below.
                cookie = f.tell()
                self.assertEqual(f.seek(0), 0)
                self.assertEqual(f.read(None), "abc")
                f.seek(0)
                self.assertEqual(f.read(2), "ab")
                self.assertEqual(f.read(1), "c")
                self.assertEqual(f.read(1), "")
                self.assertEqual(f.read(), "")
                self.assertEqual(f.tell(), cookie)
                self.assertEqual(f.seek(0), 0)
                self.assertEqual(f.seek(0, 2), cookie)
                self.assertEqual(f.write("def"), 3)
                self.assertEqual(f.seek(cookie), cookie)
                self.assertEqual(f.read(), "def")
                if enc.startswith("utf"):
                    self.multi_line_test(f, enc)
2946
def multi_line_test(self, f, enc):
    # Helper: rewrite *f* with lines of many different lengths, recording
    # tell() before each write, then read them back and check that every
    # line comes back at the position it was written.  *enc* is accepted
    # but not used here.
    f.seek(0)
    f.truncate()
    sample = "s\xff\u0fff\uffff"
    written = []
    for size in (0, 1, 2, 3, 4, 5, 30, 31, 32, 33, 62, 63, 64, 65, 1000):
        # Cycle through the sample characters to get *size* of them.
        body = "".join(sample[i % len(sample)] for i in range(size))
        text = body + "\n"
        written.append((f.tell(), text))
        f.write(text)
    f.seek(0)
    read_back = []
    while True:
        # tell() has to be taken before the line is consumed.
        offset = f.tell()
        text = f.readline()
        if not text:
            break
        read_back.append((offset, text))
    self.assertEqual(read_back, written)
Guido van Rossum9b76da62007-04-11 01:09:03 +00002968
def test_telling(self):
    # tell() must report the same positions while reading as it did
    # while writing, and must raise OSError while the file is being
    # iterated.  The file is opened in a ``with`` block so that a failing
    # assertion cannot leak the handle.
    with self.open(support.TESTFN, "w+", encoding="utf-8") as f:
        p0 = f.tell()
        f.write("\xff\n")
        p1 = f.tell()
        f.write("\xff\n")
        p2 = f.tell()
        f.seek(0)
        self.assertEqual(f.tell(), p0)
        self.assertEqual(f.readline(), "\xff\n")
        self.assertEqual(f.tell(), p1)
        self.assertEqual(f.readline(), "\xff\n")
        self.assertEqual(f.tell(), p2)
        f.seek(0)
        for line in f:
            self.assertEqual(line, "\xff\n")
            # tell() is expected to fail inside the iteration ...
            self.assertRaises(OSError, f.tell)
        # ... and to work again once the iteration is over.
        self.assertEqual(f.tell(), p2)
2988
def test_seeking(self):
    # The ASCII prefix is two bytes shorter than _default_chunk_size(),
    # so the three-byte character that follows extends past that size.
    ascii_len = _default_chunk_size() - 2
    ascii_text = "a" * ascii_len
    ascii_bytes = ascii_text.encode("utf-8")
    self.assertEqual(len(ascii_text), len(ascii_bytes))
    tail_text = "\u8888\n"
    record = ascii_bytes + tail_text.encode("utf-8")
    with self.open(support.TESTFN, "wb") as f:
        f.write(record * 2)
    with self.open(support.TESTFN, "r", encoding="utf-8") as f:
        self.assertEqual(f.read(ascii_len), ascii_bytes.decode("ascii"))
        self.assertEqual(f.tell(), ascii_len)
        self.assertEqual(f.readline(), tail_text)
Guido van Rossumd76e7792007-04-17 02:38:04 +00003005
def test_seeking_too(self):
    # Regression test for a specific bug: readline() followed by tell()
    # on a three-byte UTF-8 character with a chunk size of 2.  The test
    # passes as long as nothing raises.
    payload = b'\xe0\xbf\xbf\n'
    with self.open(support.TESTFN, "wb") as writer:
        writer.write(payload)
    with self.open(support.TESTFN, "r", encoding="utf-8") as reader:
        reader._CHUNK_SIZE  # Just test that it exists
        reader._CHUNK_SIZE = 2
        reader.readline()
        reader.tell()
Guido van Rossumd76e7792007-04-17 02:38:04 +00003016
def test_seek_and_tell(self):
    # Test seek/tell using the StatefulIncrementalDecoder.
    # Make test faster by doing smaller seeks
    CHUNK_SIZE = 128

    def check_seek_and_tell(data, min_pos=0):
        """Tell/seek to various points within a data stream and ensure
        that the decoded data returned by read() is consistent."""
        # Every file is opened in a ``with`` block so that a failing
        # assertion cannot leak the handle.
        with self.open(support.TESTFN, 'wb') as f:
            f.write(data)
        with self.open(support.TESTFN, encoding='test_decoder') as f:
            f._CHUNK_SIZE = CHUNK_SIZE
            decoded = f.read()

        for i in range(min_pos, len(decoded) + 1):  # seek positions
            for j in [1, 5, len(decoded) - i]:  # read lengths
                with self.open(support.TESTFN,
                               encoding='test_decoder') as f:
                    self.assertEqual(f.read(i), decoded[:i])
                    cookie = f.tell()
                    self.assertEqual(f.read(j), decoded[i:i + j])
                    f.seek(cookie)
                    self.assertEqual(f.read(), decoded[i:])

    # Enable the test decoder.
    StatefulIncrementalDecoder.codecEnabled = 1

    # Run the tests.
    try:
        # Try each test case.
        for data, _, _ in StatefulIncrementalDecoderTest.test_cases:
            check_seek_and_tell(data)

        # Position each test case so that it crosses a chunk boundary.
        for data, _, _ in StatefulIncrementalDecoderTest.test_cases:
            offset = CHUNK_SIZE - len(data)//2
            prefix = b'.'*offset
            # Don't bother seeking into the prefix (takes too long).
            min_pos = offset*2
            check_seek_and_tell(prefix + data, min_pos)

    # Ensure our test decoder won't interfere with subsequent tests.
    finally:
        StatefulIncrementalDecoder.codecEnabled = 0
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00003063
def test_multibyte_seek_and_tell(self):
    # A position taken with tell() in a euc_jp file must bring seek()
    # back to the start of the same line.  Both files are opened in
    # ``with`` blocks so that a failing assertion cannot leak a handle.
    with self.open(support.TESTFN, "w", encoding="euc_jp") as f:
        f.write("AB\n\u3046\u3048\n")

    with self.open(support.TESTFN, "r", encoding="euc_jp") as f:
        self.assertEqual(f.readline(), "AB\n")
        p0 = f.tell()
        self.assertEqual(f.readline(), "\u3046\u3048\n")
        p1 = f.tell()
        # Go back to the second line and read it again.
        f.seek(p0)
        self.assertEqual(f.readline(), "\u3046\u3048\n")
        self.assertEqual(f.tell(), p1)
3078
def test_seek_with_encoder_state(self):
    # Seek back to a position recorded with tell() while writing
    # euc_jis_2004 text, overwrite from there, and check the result.
    # Both files are opened in ``with`` blocks so that a failing
    # assertion cannot leak a handle.
    with self.open(support.TESTFN, "w", encoding="euc_jis_2004") as f:
        f.write("\u00e6\u0300")
        p0 = f.tell()
        f.write("\u00e6")
        # Overwrite the character just written with a combining mark.
        f.seek(p0)
        f.write("\u0300")

    with self.open(support.TESTFN, "r", encoding="euc_jis_2004") as f:
        self.assertEqual(f.readline(), "\u00e6\u0300\u0300")
3091
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003092 def test_encoded_writes(self):
Alexandre Vassalottia38f73b2008-01-07 18:30:48 +00003093 data = "1234567890"
3094 tests = ("utf-16",
3095 "utf-16-le",
3096 "utf-16-be",
3097 "utf-32",
3098 "utf-32-le",
3099 "utf-32-be")
3100 for encoding in tests:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003101 buf = self.BytesIO()
3102 f = self.TextIOWrapper(buf, encoding=encoding)
Alexandre Vassalottia38f73b2008-01-07 18:30:48 +00003103 # Check if the BOM is written only once (see issue1753).
3104 f.write(data)
3105 f.write(data)
3106 f.seek(0)
Ezio Melottib3aedd42010-11-20 19:04:17 +00003107 self.assertEqual(f.read(), data * 2)
Benjamin Peterson9363a652009-03-05 00:42:09 +00003108 f.seek(0)
Ezio Melottib3aedd42010-11-20 19:04:17 +00003109 self.assertEqual(f.read(), data * 2)
3110 self.assertEqual(buf.getvalue(), (data * 2).encode(encoding))
Alexandre Vassalottia38f73b2008-01-07 18:30:48 +00003111
Benjamin Petersona1b49012009-03-31 23:11:32 +00003112 def test_unreadable(self):
3113 class UnReadable(self.BytesIO):
3114 def readable(self):
3115 return False
3116 txt = self.TextIOWrapper(UnReadable())
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02003117 self.assertRaises(OSError, txt.read)
Benjamin Petersona1b49012009-03-31 23:11:32 +00003118
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003119 def test_read_one_by_one(self):
3120 txt = self.TextIOWrapper(self.BytesIO(b"AA\r\nBB"))
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00003121 reads = ""
3122 while True:
3123 c = txt.read(1)
3124 if not c:
3125 break
3126 reads += c
Ezio Melottib3aedd42010-11-20 19:04:17 +00003127 self.assertEqual(reads, "AA\nBB")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00003128
Benjamin Petersonbf5ff762009-12-13 19:25:34 +00003129 def test_readlines(self):
3130 txt = self.TextIOWrapper(self.BytesIO(b"AA\nBB\nCC"))
3131 self.assertEqual(txt.readlines(), ["AA\n", "BB\n", "CC"])
3132 txt.seek(0)
3133 self.assertEqual(txt.readlines(None), ["AA\n", "BB\n", "CC"])
3134 txt.seek(0)
3135 self.assertEqual(txt.readlines(5), ["AA\n", "BB\n"])
3136
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00003137 # read in amounts equal to TextIOWrapper._CHUNK_SIZE which is 128.
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003138 def test_read_by_chunk(self):
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00003139 # make sure "\r\n" straddles 128 char boundary.
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003140 txt = self.TextIOWrapper(self.BytesIO(b"A" * 127 + b"\r\nB"))
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00003141 reads = ""
3142 while True:
3143 c = txt.read(128)
3144 if not c:
3145 break
3146 reads += c
Ezio Melottib3aedd42010-11-20 19:04:17 +00003147 self.assertEqual(reads, "A"*127+"\nB")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00003148
Antoine Pitrou3ed2cb52012-10-16 23:02:27 +02003149 def test_writelines(self):
3150 l = ['ab', 'cd', 'ef']
3151 buf = self.BytesIO()
3152 txt = self.TextIOWrapper(buf)
3153 txt.writelines(l)
3154 txt.flush()
3155 self.assertEqual(buf.getvalue(), b'abcdef')
3156
3157 def test_writelines_userlist(self):
3158 l = UserList(['ab', 'cd', 'ef'])
3159 buf = self.BytesIO()
3160 txt = self.TextIOWrapper(buf)
3161 txt.writelines(l)
3162 txt.flush()
3163 self.assertEqual(buf.getvalue(), b'abcdef')
3164
3165 def test_writelines_error(self):
3166 txt = self.TextIOWrapper(self.BytesIO())
3167 self.assertRaises(TypeError, txt.writelines, [1, 2, 3])
3168 self.assertRaises(TypeError, txt.writelines, None)
3169 self.assertRaises(TypeError, txt.writelines, b'abc')
3170
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00003171 def test_issue1395_1(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003172 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00003173
3174 # read one char at a time
3175 reads = ""
3176 while True:
3177 c = txt.read(1)
3178 if not c:
3179 break
3180 reads += c
Ezio Melottib3aedd42010-11-20 19:04:17 +00003181 self.assertEqual(reads, self.normalized)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00003182
3183 def test_issue1395_2(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003184 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00003185 txt._CHUNK_SIZE = 4
3186
3187 reads = ""
3188 while True:
3189 c = txt.read(4)
3190 if not c:
3191 break
3192 reads += c
Ezio Melottib3aedd42010-11-20 19:04:17 +00003193 self.assertEqual(reads, self.normalized)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00003194
3195 def test_issue1395_3(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003196 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00003197 txt._CHUNK_SIZE = 4
3198
3199 reads = txt.read(4)
3200 reads += txt.read(4)
3201 reads += txt.readline()
3202 reads += txt.readline()
3203 reads += txt.readline()
Ezio Melottib3aedd42010-11-20 19:04:17 +00003204 self.assertEqual(reads, self.normalized)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00003205
3206 def test_issue1395_4(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003207 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00003208 txt._CHUNK_SIZE = 4
3209
3210 reads = txt.read(4)
3211 reads += txt.read()
Ezio Melottib3aedd42010-11-20 19:04:17 +00003212 self.assertEqual(reads, self.normalized)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00003213
3214 def test_issue1395_5(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003215 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00003216 txt._CHUNK_SIZE = 4
3217
3218 reads = txt.read(4)
3219 pos = txt.tell()
3220 txt.seek(0)
3221 txt.seek(pos)
Ezio Melottib3aedd42010-11-20 19:04:17 +00003222 self.assertEqual(txt.read(4), "BBB\n")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00003223
Ka-Ping Yeeddaa7062008-03-17 20:35:15 +00003224 def test_issue2282(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003225 buffer = self.BytesIO(self.testdata)
3226 txt = self.TextIOWrapper(buffer, encoding="ascii")
Ka-Ping Yeeddaa7062008-03-17 20:35:15 +00003227
3228 self.assertEqual(buffer.seekable(), txt.seekable())
3229
def test_append_bom(self):
    # The BOM is not written again when appending to a non-empty file
    filename = support.TESTFN
    for charset in ('utf-8-sig', 'utf-16', 'utf-32'):
        with self.open(filename, 'w', encoding=charset) as f:
            f.write('aaa')
            # Call tell() after the write; the position itself is not
            # needed by this test.
            f.tell()
        with self.open(filename, 'rb') as f:
            self.assertEqual(f.read(), 'aaa'.encode(charset))

        with self.open(filename, 'a', encoding=charset) as f:
            f.write('xxx')
        with self.open(filename, 'rb') as f:
            self.assertEqual(f.read(), 'aaaxxx'.encode(charset))
Antoine Pitroue4501852009-05-14 18:55:55 +00003244
def test_seek_bom(self):
    # The BOM must not be written again when seeking manually inside a
    # file that already has content.
    path = support.TESTFN
    for charset in ('utf-8-sig', 'utf-16', 'utf-32'):
        with self.open(path, 'w', encoding=charset) as f:
            f.write('aaa')
            end_of_text = f.tell()
        with self.open(path, 'r+', encoding=charset) as f:
            # First append at the old end, then overwrite from the start.
            for position, text in ((end_of_text, 'zzz'), (0, 'bbb')):
                f.seek(position)
                f.write(text)
        with self.open(path, 'rb') as f:
            self.assertEqual(f.read(), 'bbbzzz'.encode(charset))
Antoine Pitroue4501852009-05-14 18:55:55 +00003259
def test_seek_append_bom(self):
    # The BOM must not be written again when an append-mode file is
    # first seeked to the start and then to the end before writing.
    path = support.TESTFN
    for charset in ('utf-8-sig', 'utf-16', 'utf-32'):
        with self.open(path, 'w', encoding=charset) as f:
            f.write('aaa')
        with self.open(path, 'a', encoding=charset) as f:
            f.seek(0)
            f.seek(0, self.SEEK_END)
            f.write('xxx')
        with self.open(path, 'rb') as f:
            raw = f.read()
        self.assertEqual(raw, 'aaaxxx'.encode(charset))
3272
def test_errors_property(self):
    # ``errors`` reports "strict" when not given, otherwise the value
    # passed to open().
    cases = (({}, "strict"), ({"errors": "replace"}, "replace"))
    for options, expected in cases:
        with self.open(support.TESTFN, "w", **options) as f:
            self.assertEqual(f.errors, expected)
3278
@support.no_tracing
def test_threads_write(self):
    # Issue6750: concurrent writes could duplicate data
    thread_count = 20
    go = threading.Event()
    with self.open(support.TESTFN, "w", buffering=1) as f:
        def worker(n):
            # Build the line first so that, once released, each thread
            # has only the write itself left to do.
            text = "Thread%03d\n" % n
            go.wait()
            f.write(text)
        threads = [threading.Thread(target=worker, args=(n,))
                   for n in range(thread_count)]
        # go.set is handed to start_threads(), which is what releases
        # the waiting workers.
        with threading_helper.start_threads(threads, go.set):
            time.sleep(0.02)
    with self.open(support.TESTFN) as f:
        content = f.read()
    # Every thread's line must appear exactly once.
    for n in range(thread_count):
        self.assertEqual(content.count("Thread%03d\n" % n), 1)
Amaury Forgeot d'Arcccd686a2009-08-29 23:00:38 +00003296
Antoine Pitrou6be88762010-05-03 16:48:20 +00003297 def test_flush_error_on_close(self):
Serhiy Storchakaa3712a92015-02-21 00:35:09 +02003298 # Test that text file is closed despite failed flush
3299 # and that flush() is called before file closed.
Antoine Pitrou6be88762010-05-03 16:48:20 +00003300 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Serhiy Storchakaa3712a92015-02-21 00:35:09 +02003301 closed = []
Antoine Pitrou6be88762010-05-03 16:48:20 +00003302 def bad_flush():
Serhiy Storchakaa3712a92015-02-21 00:35:09 +02003303 closed[:] = [txt.closed, txt.buffer.closed]
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02003304 raise OSError()
Antoine Pitrou6be88762010-05-03 16:48:20 +00003305 txt.flush = bad_flush
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02003306 self.assertRaises(OSError, txt.close) # exception not swallowed
Benjamin Peterson68623612012-12-20 11:53:11 -06003307 self.assertTrue(txt.closed)
Serhiy Storchakaa3712a92015-02-21 00:35:09 +02003308 self.assertTrue(txt.buffer.closed)
3309 self.assertTrue(closed) # flush() called
3310 self.assertFalse(closed[0]) # flush() called before file closed
3311 self.assertFalse(closed[1])
Serhiy Storchakac26a1a42015-02-23 00:28:38 +02003312 txt.flush = lambda: None # break reference loop
Antoine Pitrou6be88762010-05-03 16:48:20 +00003313
Serhiy Storchaka8a8f7f92014-06-09 09:13:04 +03003314 def test_close_error_on_close(self):
3315 buffer = self.BytesIO(self.testdata)
3316 def bad_flush():
3317 raise OSError('flush')
3318 def bad_close():
3319 raise OSError('close')
3320 buffer.close = bad_close
3321 txt = self.TextIOWrapper(buffer, encoding="ascii")
3322 txt.flush = bad_flush
3323 with self.assertRaises(OSError) as err: # exception not swallowed
3324 txt.close()
3325 self.assertEqual(err.exception.args, ('close',))
3326 self.assertIsInstance(err.exception.__context__, OSError)
3327 self.assertEqual(err.exception.__context__.args, ('flush',))
3328 self.assertFalse(txt.closed)
3329
Victor Stinner472f7942019-04-12 21:58:24 +02003330 # Silence destructor error
3331 buffer.close = lambda: None
3332 txt.flush = lambda: None
3333
Serhiy Storchaka8a8f7f92014-06-09 09:13:04 +03003334 def test_nonnormalized_close_error_on_close(self):
3335 # Issue #21677
3336 buffer = self.BytesIO(self.testdata)
3337 def bad_flush():
3338 raise non_existing_flush
3339 def bad_close():
3340 raise non_existing_close
3341 buffer.close = bad_close
3342 txt = self.TextIOWrapper(buffer, encoding="ascii")
3343 txt.flush = bad_flush
3344 with self.assertRaises(NameError) as err: # exception not swallowed
3345 txt.close()
3346 self.assertIn('non_existing_close', str(err.exception))
3347 self.assertIsInstance(err.exception.__context__, NameError)
3348 self.assertIn('non_existing_flush', str(err.exception.__context__))
3349 self.assertFalse(txt.closed)
3350
Victor Stinner472f7942019-04-12 21:58:24 +02003351 # Silence destructor error
3352 buffer.close = lambda: None
3353 txt.flush = lambda: None
3354
Antoine Pitrou6be88762010-05-03 16:48:20 +00003355 def test_multi_close(self):
3356 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
3357 txt.close()
3358 txt.close()
3359 txt.close()
3360 self.assertRaises(ValueError, txt.flush)
3361
Antoine Pitrou0d739d72010-09-05 23:01:12 +00003362 def test_unseekable(self):
3363 txt = self.TextIOWrapper(self.MockUnseekableIO(self.testdata))
3364 self.assertRaises(self.UnsupportedOperation, txt.tell)
3365 self.assertRaises(self.UnsupportedOperation, txt.seek, 0)
3366
Antoine Pitrou7f8f4182010-12-21 21:20:59 +00003367 def test_readonly_attributes(self):
3368 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
3369 buf = self.BytesIO(self.testdata)
3370 with self.assertRaises(AttributeError):
3371 txt.buffer = buf
3372
def test_rawio(self):
    # Issue #12591: TextIOWrapper must work with raw I/O objects, so
    # that subprocess.Popen() can have the required unbuffered
    # semantics with universal_newlines=True.
    chunks = [b'abc', b'def', b'ghi\njkl\nopq\n']
    txt = self.TextIOWrapper(self.MockRawIO(chunks), encoding='ascii',
                             newline='\n')
    # Reads: a sized read, the rest of that line, then plain iteration.
    self.assertEqual(txt.read(4), 'abcd')
    self.assertEqual(txt.readline(), 'efghi\n')
    self.assertEqual(list(txt), ['jkl\n', 'opq\n'])
3383
def test_rawio_write_through(self):
    # Issue #12591: with write_through=True, writes don't need a flush
    raw = self.MockRawIO([b'abc', b'def', b'ghi\njkl\nopq\n'])
    txt = self.TextIOWrapper(raw, encoding='ascii', newline='\n',
                             write_through=True)
    for fragment in ('1', '23\n4', '5'):
        txt.write(fragment)
    # Everything must already be in the raw object's write stack.
    self.assertEqual(b''.join(raw._write_stack), b'123\n45')
3393
Antoine Pitrouc644e7c2014-05-09 00:24:50 +02003394 def test_bufio_write_through(self):
3395 # Issue #21396: write_through=True doesn't force a flush()
3396 # on the underlying binary buffered object.
3397 flush_called, write_called = [], []
3398 class BufferedWriter(self.BufferedWriter):
3399 def flush(self, *args, **kwargs):
3400 flush_called.append(True)
3401 return super().flush(*args, **kwargs)
3402 def write(self, *args, **kwargs):
3403 write_called.append(True)
3404 return super().write(*args, **kwargs)
3405
3406 rawio = self.BytesIO()
3407 data = b"a"
3408 bufio = BufferedWriter(rawio, len(data)*2)
3409 textio = self.TextIOWrapper(bufio, encoding='ascii',
3410 write_through=True)
3411 # write to the buffered io but don't overflow the buffer
3412 text = data.decode('ascii')
3413 textio.write(text)
3414
3415 # buffer.flush is not called with write_through=True
3416 self.assertFalse(flush_called)
3417 # buffer.write *is* called with write_through=True
3418 self.assertTrue(write_called)
3419 self.assertEqual(rawio.getvalue(), b"") # no flush
3420
3421 write_called = [] # reset
3422 textio.write(text * 10) # total content is larger than bufio buffer
3423 self.assertTrue(write_called)
3424 self.assertEqual(rawio.getvalue(), data * 11) # all flushed
3425
def test_reconfigure_write_through(self):
    # reconfigure(write_through=...) must switch the mode, and leaving
    # the argument out (or passing None) must keep the current mode.
    raw = self.MockRawIO([])
    t = self.TextIOWrapper(raw, encoding='ascii', newline='\n')

    def written():
        # Everything the raw object has received so far.
        return b''.join(raw._write_stack)

    t.write('1')
    t.reconfigure(write_through=True)  # implied flush
    self.assertEqual(t.write_through, True)
    self.assertEqual(written(), b'1')
    t.write('23')
    self.assertEqual(written(), b'123')
    t.reconfigure(write_through=False)
    self.assertEqual(t.write_through, False)
    t.write('45')
    t.flush()
    self.assertEqual(written(), b'12345')
    # Keeping default value
    t.reconfigure()
    t.reconfigure(write_through=None)
    self.assertEqual(t.write_through, False)
    t.reconfigure(write_through=True)
    t.reconfigure()
    t.reconfigure(write_through=None)
    self.assertEqual(t.write_through, True)
3448
def test_read_nonbytes(self):
    # Issue #17106
    # Crash when underlying read() returns non-bytes: each reading
    # entry point must raise TypeError instead.
    read_calls = (
        ('read', (1,)),
        ('readline', ()),
        ('read', ()),
    )
    for method_name, call_args in read_calls:
        wrapper = self.TextIOWrapper(self.StringIO('a'))
        reader = getattr(wrapper, method_name)
        self.assertRaises(TypeError, reader, *call_args)
3458
def test_illegal_encoder(self):
    # Issue 31271: Calling write() while the return value of encoder's
    # encode() is invalid shouldn't cause an assertion failure.
    codec_info = codecs.lookup("rot13")
    # rot13 is temporarily flagged as a text encoding so that the
    # wrapper can be constructed with it.
    with support.swap_attr(codec_info, '_is_text_encoding', True):
        wrapper = io.TextIOWrapper(io.BytesIO(b'foo'), encoding="rot13")
    with self.assertRaises(TypeError):
        wrapper.write('bar')
3466
def test_illegal_decoder(self):
    # Issue #17106
    # Bypass the early encoding check added in issue 20404
    def _make_illegal_wrapper():
        # Build a TextIOWrapper around the quopri codec, which is only
        # accepted while it is flagged as a text encoding.
        quopri = codecs.lookup("quopri")
        # swap_attr() puts back whatever value the flag had before,
        # including when the constructor raises, rather than forcing
        # it to False.
        with support.swap_attr(quopri, '_is_text_encoding', True):
            return self.TextIOWrapper(self.BytesIO(b'aaaaaa'),
                                      newline='\n', encoding="quopri")

    # Crash when decoder returns non-string
    t = _make_illegal_wrapper()
    self.assertRaises(TypeError, t.read, 1)
    t = _make_illegal_wrapper()
    self.assertRaises(TypeError, t.readline)
    t = _make_illegal_wrapper()
    self.assertRaises(TypeError, t.read)

    # Issue 31243: calling read() while the return value of decoder's
    # getstate() is invalid should neither crash the interpreter nor
    # raise a SystemError.
    def _make_very_illegal_wrapper(getstate_ret_val):
        # Same as above, but the codec's incremental decoder is swapped
        # for one whose getstate() returns *getstate_ret_val*.
        class BadDecoder:
            def getstate(self):
                return getstate_ret_val

        def _get_bad_decoder(dummy):
            return BadDecoder()

        quopri = codecs.lookup("quopri")
        with support.swap_attr(quopri, 'incrementaldecoder',
                               _get_bad_decoder):
            return _make_illegal_wrapper()

    for bad_state in (42, (), (1, 2)):
        t = _make_very_illegal_wrapper(bad_state)
        self.assertRaises(TypeError, t.read, 42)
3506
def _check_create_at_shutdown(self, **kwargs):
    # Issue #20037: creating a TextIOWrapper at shutdown
    # shouldn't crash the interpreter.
    #
    # Runs a child interpreter whose only live object creates a
    # TextIOWrapper (with **kwargs) from its __del__, and returns the
    # result of assert_python_ok() for that run.
    script_template = """if 1:
            import codecs
            import {iomod} as io

            # Avoid looking up codecs at shutdown
            codecs.lookup('utf-8')

            class C:
                def __init__(self):
                    self.buf = io.BytesIO()
                def __del__(self):
                    io.TextIOWrapper(self.buf, **{kwargs})
                    print("ok")
            c = C()
            """
    script = script_template.format(iomod=self.io.__name__, kwargs=kwargs)
    return assert_python_ok("-c", script)
3527
def test_create_at_shutdown_without_encoding(self):
    _, stdout, stderr = self._check_create_at_shutdown()
    if not stderr:
        self.assertEqual("ok", stdout.decode().strip())
        return
    # Can error out with a RuntimeError if the module state
    # isn't found.
    self.assertIn(self.shutdown_error, stderr.decode())
3536
def test_create_at_shutdown_with_encoding(self):
    # With encoding and errors given explicitly the child must print
    # "ok" and write nothing to stderr.
    wrapper_options = {'encoding': 'utf-8', 'errors': 'strict'}
    _, stdout, stderr = self._check_create_at_shutdown(**wrapper_options)
    self.assertFalse(stderr)
    self.assertEqual("ok", stdout.decode().strip())
3542
def test_read_byteslike(self):
    source = MemviewBytesIO(b'Just some random string\n')

    # TextIOWrapper will not read the full string, because
    # we truncate it to a multiple of the native int size
    # so that we can construct a more complex memoryview.
    truncated = _to_memoryview(source.getvalue()).tobytes()
    expected_text = truncated.decode('utf-8')

    wrapper = self.TextIOWrapper(source, 'utf-8')
    self.assertEqual(wrapper.read(200), expected_text)
3553
def test_issue22849(self):
    # Construct a wrapper repeatedly around an object that only has the
    # three capability methods, ignoring any error, then once more
    # after giving it a tell() method.
    class F(object):
        def readable(self):
            return True

        def writable(self):
            return True

        def seekable(self):
            return True

    for _ in range(10):
        try:
            self.TextIOWrapper(F(), encoding='utf-8')
        except Exception:
            # Failures are expected here and deliberately ignored.
            pass

    F.tell = lambda x: 0
    t = self.TextIOWrapper(F(), encoding='utf-8')
3568
def test_reconfigure_encoding_read(self):
    # latin1 -> utf8
    # (latin1 can decode utf-8 encoded string)
    latin1_line = 'abc\xe9\n'.encode('latin1')
    utf8_line = 'd\xe9f\n'.encode('utf8')
    stream = self.BytesIO(latin1_line + utf8_line)
    wrapper = self.TextIOWrapper(stream, encoding='latin1', newline='\n')
    self.assertEqual(wrapper.readline(), 'abc\xe9\n')

    # After a read, neither the encoding nor the newline mode may be
    # changed.
    for rejected in ({'encoding': 'utf-8'}, {'newline': None}):
        with self.assertRaises(self.UnsupportedOperation):
            wrapper.reconfigure(**rejected)
3580
def test_reconfigure_write_fromascii(self):
    # ascii has a specific encodefunc in the C implementation,
    # but utf-8-sig has not.  Make sure that we get rid of the
    # cached encodefunc when we switch encoders.
    sink = self.BytesIO()
    wrapper = self.TextIOWrapper(sink, encoding='ascii', newline='\n')
    wrapper.write('foo\n')

    wrapper.reconfigure(encoding='utf-8-sig')
    wrapper.write('\xe9\n')
    wrapper.flush()

    expected = b'foo\n\xc3\xa9\n'
    self.assertEqual(sink.getvalue(), expected)
3592
def test_reconfigure_write(self):
    # latin -> utf8
    sink = self.BytesIO()
    wrapper = self.TextIOWrapper(sink, encoding='latin1', newline='\n')
    wrapper.write('abc\xe9\n')
    wrapper.reconfigure(encoding='utf-8')
    # Text written before the switch has already reached the sink,
    # encoded as latin1.
    self.assertEqual(sink.getvalue(), b'abc\xe9\n')
    wrapper.write('d\xe9f\n')
    wrapper.flush()
    self.assertEqual(sink.getvalue(), b'abc\xe9\nd\xc3\xa9f\n')

    # ascii -> utf-8-sig: ensure that no BOM is written in the middle of
    # the file
    sink = self.BytesIO()
    wrapper = self.TextIOWrapper(sink, encoding='ascii', newline='\n')
    wrapper.write('abc\n')
    wrapper.reconfigure(encoding='utf-8-sig')
    wrapper.write('d\xe9f\n')
    wrapper.flush()
    self.assertEqual(sink.getvalue(), b'abc\nd\xc3\xa9f\n')
3613
def test_reconfigure_write_non_seekable(self):
    # Make the underlying stream report itself as non-seekable and
    # remove its seek() method.
    sink = self.BytesIO()
    sink.seekable = lambda: False
    sink.seek = None

    wrapper = self.TextIOWrapper(sink, encoding='ascii', newline='\n')
    wrapper.write('abc\n')
    wrapper.reconfigure(encoding='utf-8-sig')
    wrapper.write('d\xe9f\n')
    wrapper.flush()

    # If the raw stream is not seekable, there'll be a BOM
    with_bom = b'abc\n\xef\xbb\xbfd\xc3\xa9f\n'
    self.assertEqual(sink.getvalue(), with_bom)
3626
def test_reconfigure_defaults(self):
    # Settings not mentioned in a reconfigure() call keep their value;
    # changing the encoding resets errors to 'strict'.
    wrapper = self.TextIOWrapper(self.BytesIO(), 'ascii', 'replace', '\n')

    def check_settings(encoding, errors):
        self.assertEqual(wrapper.encoding, encoding)
        self.assertEqual(wrapper.errors, errors)

    wrapper.reconfigure(encoding=None)
    check_settings('ascii', 'replace')
    wrapper.write('LF\n')

    wrapper.reconfigure(newline='\r\n')
    check_settings('ascii', 'replace')

    wrapper.reconfigure(errors='ignore')
    check_settings('ascii', 'ignore')
    wrapper.write('CRLF\n')

    wrapper.reconfigure(encoding='utf-8', newline=None)
    self.assertEqual(wrapper.errors, 'strict')
    wrapper.seek(0)
    self.assertEqual(wrapper.read(), 'LF\nCRLF\n')

    raw_bytes = wrapper.detach().getvalue()
    self.assertEqual(raw_bytes, b'LF\nCRLF\r\n')
3649
def test_reconfigure_newline(self):
    # Reading: (raw bytes, initial newline, reconfigured newline,
    # expected first line).
    read_cases = (
        (b'CR\rEOF', '\n', None, 'CR\n'),
        (b'CR\rEOF', '\n', '', 'CR\r'),
        (b'CR\rLF\nEOF', '\r', '\n', 'CR\rLF\n'),
        (b'LF\nCR\rEOF', '\n', '\r', 'LF\nCR\r'),
        (b'CR\rCRLF\r\nEOF', '\r', '\r\n', 'CR\rCRLF\r\n'),
    )
    for data, initial, updated, first_line in read_cases:
        stream = self.BytesIO(data)
        wrapper = self.TextIOWrapper(stream, 'ascii', newline=initial)
        wrapper.reconfigure(newline=updated)
        self.assertEqual(wrapper.readline(), first_line)

    # Writing: switch the newline mode before each write and check how
    # every '\n' ends up encoded.
    wrapper = self.TextIOWrapper(self.BytesIO(), 'ascii', newline='\r')
    write_steps = (
        (None, 'linesep\n'),
        ('', 'LF\n'),
        ('\n', 'LF\n'),
        ('\r', 'CR\n'),
        ('\r\n', 'CRLF\n'),
    )
    for newline_mode, text in write_steps:
        wrapper.reconfigure(newline=newline_mode)
        wrapper.write(text)
    expected = 'linesep' + os.linesep + 'LF\nLF\nCR\rCRLF\r\n'
    produced = wrapper.detach().getvalue().decode('ascii')
    self.assertEqual(produced, expected)
3685
def test_issue25862(self):
    # Assertion failures occurred in tell() after read() and write().
    def fresh_wrapper():
        return self.TextIOWrapper(self.BytesIO(b'test'), encoding='ascii')

    # tell() after a partial read followed by a full read.
    reader = fresh_wrapper()
    reader.read(1)
    reader.read()
    reader.tell()

    # tell() after a partial read followed by a write.
    mixed = fresh_wrapper()
    mixed.read(1)
    mixed.write('x')
    mixed.tell()
3696
Serhiy Storchaka94dc6732013-02-03 17:03:31 +02003697
class MemviewBytesIO(io.BytesIO):
    '''BytesIO variant whose read() and read1() hand back memoryview
    objects (built by _to_memoryview) instead of bytes'''

    def read1(self, len_):
        chunk = super().read1(len_)
        return _to_memoryview(chunk)

    def read(self, len_):
        chunk = super().read(len_)
        return _to_memoryview(chunk)
3707
3708def _to_memoryview(buf):
3709 '''Convert bytes-object *buf* to a non-trivial memoryview'''
3710
3711 arr = array.array('i')
3712 idx = len(buf) - len(buf) % arr.itemsize
3713 arr.frombytes(buf[:idx])
3714 return memoryview(arr)
Benjamin Petersonc9c0f202009-06-30 23:06:06 +00003715
Benjamin Peterson6fd113c2014-11-12 10:23:44 -05003716
class CTextIOWrapperTest(TextIOWrapperTest):
    # Runs the shared TextIOWrapper tests against the ``io`` module and
    # adds a few checks of its own.
    io = io
    shutdown_error = "LookupError: unknown encoding: ascii"

    def test_initialization(self):
        raw = self.BytesIO(b"\xc3\xa9\n\n")
        buffered = self.BufferedReader(raw, 1000)
        wrapper = self.TextIOWrapper(buffered)
        # A failed re-initialization leaves the wrapper unusable.
        with self.assertRaises(ValueError):
            wrapper.__init__(buffered, newline='xyzzy')
        with self.assertRaises(ValueError):
            wrapper.read()

        # repr() of an instance that never went through __init__ raises.
        wrapper = self.TextIOWrapper.__new__(self.TextIOWrapper)
        with self.assertRaises(Exception):
            repr(wrapper)

    def test_garbage_collection(self):
        # C TextIOWrapper objects are collected, and collecting them flushes
        # all data to disk.
        # The Python version has __del__, so it ends in gc.garbage instead.
        with support.check_warnings(('', ResourceWarning)):
            raw_file = io.FileIO(support.TESTFN, "wb")
            buffered = self.BufferedWriter(raw_file)
            text = self.TextIOWrapper(buffered, encoding="ascii")
            text.write("456def")
            # Self-reference: only the cycle collector can free it.
            text.x = text
            text_ref = weakref.ref(text)
            del text
            support.gc_collect()
        self.assertIsNone(text_ref(), text_ref)
        with self.open(support.TESTFN, "rb") as saved:
            self.assertEqual(saved.read(), b"456def")

    def test_rwpair_cleared_before_textio(self):
        # Issue 13070: TextIOWrapper's finalization would crash when called
        # after the reference to the underlying BufferedRWPair's writer got
        # cleared by the GC.
        for _ in range(1000):
            pair1 = self.BufferedRWPair(self.MockRawIO(), self.MockRawIO())
            text1 = self.TextIOWrapper(pair1, encoding="ascii")
            pair2 = self.BufferedRWPair(self.MockRawIO(), self.MockRawIO())
            text2 = self.TextIOWrapper(pair2, encoding="ascii")
            # circular references
            text1.buddy = text2
            text2.buddy = text1
        support.gc_collect()

    def test_del__CHUNK_SIZE_SystemError(self):
        # Deleting _CHUNK_SIZE must raise AttributeError.
        wrapper = self.TextIOWrapper(self.BytesIO(), encoding='ascii')
        with self.assertRaises(AttributeError):
            del wrapper._CHUNK_SIZE
3766
Charles-François Natali42c28cd2011-10-05 19:53:43 +02003767
class PyTextIOWrapperTest(TextIOWrapperTest):
    # Runs the shared TextIOWrapper tests against the ``pyio`` module.
    shutdown_error = "LookupError: unknown encoding: ascii"
    io = pyio
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003771
3772
class IncrementalNewlineDecoderTest(unittest.TestCase):

    def check_newline_decoding_utf8(self, decoder):
        # UTF-8 specific tests for a newline decoder
        def verify(data, expected, **kwargs):
            # We exercise getstate() / setstate() as well as decode():
            # the same input is decoded twice from the same saved state.
            saved = decoder.getstate()
            self.assertEqual(decoder.decode(data, **kwargs), expected)
            decoder.setstate(saved)
            self.assertEqual(decoder.decode(data, **kwargs), expected)

        # One three-byte character, whole and then split byte by byte.
        split_char = [(b'\xe8', ""), (b'\xa2', ""), (b'\x88', "\u8888")]
        multibyte_steps = ([(b'\xe8\xa2\x88', "\u8888")]
                           + split_char * 2
                           + [(b'\xe8', "")])
        for data, expected in multibyte_steps:
            verify(data, expected)
        # The character is still incomplete when the input ends.
        self.assertRaises(UnicodeDecodeError, decoder.decode, b'', final=True)

        decoder.reset()
        verify(b'\n', "\n")
        verify(b'\r', "")
        verify(b'', "\n", final=True)
        verify(b'\r', "\n", final=True)

        newline_steps = [
            (b'\r', ""),
            (b'a', "\na"),

            (b'\r\r\n', "\n\n"),
            (b'\r', ""),
            (b'\r', "\n"),
            (b'\na', "\na"),

            (b'\xe8\xa2\x88\r\n', "\u8888\n"),
            (b'\xe8\xa2\x88', "\u8888"),
            (b'\n', "\n"),
            (b'\xe8\xa2\x88\r', "\u8888"),
            (b'\n', "\n"),
        ]
        for data, expected in newline_steps:
            verify(data, expected)

    def check_newline_decoding(self, decoder, encoding):
        # Feed text to *decoder* one unit at a time and check both the
        # translated output and the ``newlines`` attribute as it grows.
        pieces = []
        if encoding is None:
            encoder = None
            def feed(text):
                # Decode one char at a time
                for char in text:
                    pieces.append(decoder.decode(char))
        else:
            encoder = codecs.getincrementalencoder(encoding)()
            def feed(text):
                # Decode one byte at a time
                for byte in encoder.encode(text):
                    pieces.append(decoder.decode(bytes([byte])))

        self.assertEqual(decoder.newlines, None)
        progression = (
            ("abc\n\r", '\n'),
            ("\nabc", ('\n', '\r\n')),
            ("abc\r", ('\n', '\r\n')),
            ("abc", ('\r', '\n', '\r\n')),
        )
        for text, seen_newlines in progression:
            feed(text)
            self.assertEqual(decoder.newlines, seen_newlines)
        feed("abc\r")
        self.assertEqual("".join(pieces), "abc\n\nabcabc\nabcabc")

        # After reset() the record of seen newlines is cleared.
        decoder.reset()
        payload = "abc"
        if encoder is not None:
            encoder.reset()
            payload = encoder.encode(payload)
        self.assertEqual(decoder.decode(payload), "abc")
        self.assertEqual(decoder.newlines, None)

    def test_newline_decoder(self):
        encodings = (
            # None meaning the IncrementalNewlineDecoder takes unicode input
            # rather than bytes input
            None, 'utf-8', 'latin-1',
            'utf-16', 'utf-16-le', 'utf-16-be',
            'utf-32', 'utf-32-le', 'utf-32-be',
        )
        for enc in encodings:
            inner = codecs.getincrementaldecoder(enc)() if enc else enc
            wrapped = self.IncrementalNewlineDecoder(inner, translate=True)
            self.check_newline_decoding(wrapped, enc)

        utf8_decoder = codecs.getincrementaldecoder("utf-8")()
        wrapped = self.IncrementalNewlineDecoder(utf8_decoder, translate=True)
        self.check_newline_decoding_utf8(wrapped)
        self.assertRaises(TypeError, wrapped.setstate, 42)

    def test_newline_bytes(self):
        # Issue 5433: Excessive optimization in IncrementalNewlineDecoder
        def check(dec):
            # U+0D00 and U+0A00 must pass through unchanged and must not
            # be recorded as newlines.
            self.assertEqual(dec.newlines, None)
            for char in ("\u0D00", "\u0A00"):
                self.assertEqual(dec.decode(char), char)
                self.assertEqual(dec.newlines, None)

        for translate in (False, True):
            check(self.IncrementalNewlineDecoder(None, translate=translate))

    def test_translate(self):
        # issue 35062
        # Any non-zero value of translate enables translation ...
        for translate in (-2, -1, 1, 2):
            inner = codecs.getincrementaldecoder("utf-8")()
            wrapped = self.IncrementalNewlineDecoder(inner, translate)
            self.check_newline_decoding_utf8(wrapped)
        # ... while zero leaves the newlines as they are.
        inner = codecs.getincrementaldecoder("utf-8")()
        wrapped = self.IncrementalNewlineDecoder(inner, translate=0)
        self.assertEqual(wrapped.decode(b"\r\r\n"), "\r\r\n")
3889
class CIncrementalNewlineDecoderTest(IncrementalNewlineDecoderTest):
    # Adds nothing of its own; the tests are all inherited.
    # NOTE(review): presumably bound to the C implementation elsewhere
    # in this file -- confirm.
    pass
3892
class PyIncrementalNewlineDecoderTest(IncrementalNewlineDecoderTest):
    # Adds nothing of its own; the tests are all inherited.
    # NOTE(review): presumably bound to the pure-Python implementation
    # elsewhere in this file -- confirm.
    pass
Antoine Pitrou180a3362008-12-14 16:36:46 +00003895
Alexandre Vassalotti472f07d2008-01-06 00:34:32 +00003896
Guido van Rossum01a27522007-03-07 01:00:12 +00003897# XXX Tests for open()
Guido van Rossum68bbcd22007-02-27 17:19:33 +00003898
Guido van Rossum5abbf752007-08-27 17:39:33 +00003899class MiscIOTest(unittest.TestCase):
3900
def tearDown(self):
    # Remove the scratch file that the tests open.
    scratch_path = support.TESTFN
    support.unlink(scratch_path)
3903
def test___all__(self):
    # Every name in __all__ must exist.  Exception-like names must be
    # Exception subclasses; everything else, apart from the open
    # functions and the SEEK_* constants, must derive from IOBase.
    for name in self.io.__all__:
        obj = getattr(self.io, name, None)
        self.assertIsNotNone(obj, name)
        if name in ("open", "open_code"):
            continue
        if name == "UnsupportedOperation" or "error" in name.lower():
            self.assertTrue(issubclass(obj, Exception), name)
            continue
        if name.startswith("SEEK_"):
            continue
        self.assertTrue(issubclass(obj, self.IOBase))
Benjamin Peterson65676e42008-11-05 21:42:45 +00003914
def test_attributes(self):
    # Check the ``mode`` and ``name`` attributes at each layer of the
    # objects returned by open().  Every file is managed by a ``with``
    # block so that it is closed even when an assertion fails.
    with self.open(support.TESTFN, "wb", buffering=0) as f:
        self.assertEqual(f.mode, "wb")

    # Opening with mode "U" emits a DeprecationWarning.
    with support.check_warnings(('', DeprecationWarning)):
        f = self.open(support.TESTFN, "U")
    with f:
        self.assertEqual(f.name, support.TESTFN)
        self.assertEqual(f.buffer.name, support.TESTFN)
        self.assertEqual(f.buffer.raw.name, support.TESTFN)
        self.assertEqual(f.mode, "U")
        self.assertEqual(f.buffer.mode, "rb")
        self.assertEqual(f.buffer.raw.mode, "rb")

    with self.open(support.TESTFN, "w+") as f:
        self.assertEqual(f.mode, "w+")
        self.assertEqual(f.buffer.mode, "rb+")  # Does it really matter?
        self.assertEqual(f.buffer.raw.mode, "rb+")

        # g shares f's file descriptor; with closefd=False, closing g
        # leaves the descriptor open for f.
        with self.open(f.fileno(), "wb", closefd=False) as g:
            self.assertEqual(g.mode, "wb")
            self.assertEqual(g.raw.mode, "wb")
            self.assertEqual(g.name, f.fileno())
            self.assertEqual(g.raw.name, f.fileno())
3942
def test_open_pipe_with_append(self):
    # bpo-27805: Ignore ESPIPE from lseek() in open().
    read_fd, write_fd = os.pipe()
    self.addCleanup(os.close, read_fd)
    pipe_file = self.open(write_fd, 'a')
    self.addCleanup(pipe_file.close)
    # Check that the file is marked non-seekable. On Windows, however, lseek
    # somehow succeeds on pipes.
    on_windows = sys.platform == 'win32'
    if not on_windows:
        self.assertFalse(pipe_file.seekable())
3953
def test_io_after_close(self):
    # Every I/O method of a closed file must raise ValueError, for each
    # combination of mode and buffering below.
    for base_mode in ("w", "r", "w+"):
        binary_mode = base_mode + "b"
        open_variants = (
            {"mode": base_mode},
            {"mode": binary_mode},
            {"mode": base_mode, "buffering": 1},
            {"mode": base_mode, "buffering": 2},
            {"mode": binary_mode, "buffering": 0},
        )
        for kwargs in open_variants:
            f = self.open(support.TESTFN, **kwargs)
            f.close()
            empty = b"" if "b" in kwargs['mode'] else ""
            # (method name, call arguments, skip when the method is absent)
            checks = (
                ("flush", (), False),
                ("fileno", (), False),
                ("isatty", (), False),
                ("__iter__", (), False),
                ("peek", (1,), True),
                ("read", (), False),
                ("read1", (1024,), True),
                ("read1", (), True),
                ("readall", (), True),
                ("readinto", (bytearray(1024),), True),
                ("readinto1", (bytearray(1024),), True),
                ("readline", (), False),
                ("readlines", (), False),
                ("readlines", (1,), False),
                ("seek", (0,), False),
                ("tell", (), False),
                ("truncate", (), False),
                ("write", (empty,), False),
                ("writelines", ([],), False),
            )
            for method_name, call_args, optional in checks:
                if optional and not hasattr(f, method_name):
                    continue
                method = getattr(f, method_name)
                self.assertRaises(ValueError, method, *call_args)
            self.assertRaises(ValueError, next, f)
Antoine Pitrou8043cf82009-01-09 19:54:29 +00004000
def test_blockingioerror(self):
    # Various BlockingIOError issues
    class AttrStr(str):
        # A str subclass, so that instances accept attributes.
        pass

    message = AttrStr("")
    error = self.BlockingIOError(1, message)
    # Tie the two objects into a reference cycle.
    message.b = error
    error.c = message
    message_ref = weakref.ref(message)
    del message, error
    support.gc_collect()
    self.assertIsNone(message_ref(), message_ref)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00004013
def test_abcs(self):
    # Test the visible base classes are ABCs.
    visible_bases = (
        self.IOBase,
        self.RawIOBase,
        self.BufferedIOBase,
        self.TextIOBase,
    )
    for base in visible_bases:
        self.assertIsInstance(base, abc.ABCMeta)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00004020
def _check_abc_inheritance(self, abcmodule):
    # Open the scratch file as a raw, a buffered and a text file and
    # check that each is an instance of abcmodule.IOBase and of exactly
    # one of abcmodule's three layer ABCs.
    layer_names = ("RawIOBase", "BufferedIOBase", "TextIOBase")
    scenarios = (
        ("wb", {"buffering": 0}, "RawIOBase"),
        ("wb", {}, "BufferedIOBase"),
        ("w", {}, "TextIOBase"),
    )
    for mode, extra, expected_layer in scenarios:
        with self.open(support.TESTFN, mode, **extra) as f:
            self.assertIsInstance(f, abcmodule.IOBase)
            for layer_name in layer_names:
                layer = getattr(abcmodule, layer_name)
                if layer_name == expected_layer:
                    self.assertIsInstance(f, layer)
                else:
                    self.assertNotIsInstance(f, layer)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00004037
def test_abc_inheritance(self):
    # Test implementations inherit from their respective ABCs
    # (the ABCs are looked up on this test case itself).
    abc_source = self
    self._check_abc_inheritance(abc_source)
4041
def test_abc_inheritance_official(self):
    # Test implementations inherit from the official ABCs of the
    # baseline "io" module.
    abc_source = io
    self._check_abc_inheritance(abc_source)
4046
def _check_warn_on_dealloc(self, *args, **kwargs):
    # Open a file, drop the only reference to it without closing it and
    # check that a ResourceWarning mentioning the file's repr is emitted.
    # NOTE(review): this calls the builtin open() rather than self.open()
    # -- confirm that is intended.
    leaked = open(*args, **kwargs)
    leaked_repr = repr(leaked)
    with self.assertWarns(ResourceWarning) as caught:
        del leaked
        support.gc_collect()
    warning_text = str(caught.warning.args[0])
    self.assertIn(leaked_repr, warning_text)
4054
def test_warn_on_dealloc(self):
    # Unbuffered binary, buffered binary and text files.
    variants = (
        ("wb", {"buffering": 0}),
        ("wb", {}),
        ("w", {}),
    )
    for mode, extra in variants:
        self._check_warn_on_dealloc(support.TESTFN, mode, **extra)
4059
def _check_warn_on_dealloc_fd(self, *args, **kwargs):
    # Same check as _check_warn_on_dealloc(), for a file opened from a
    # pipe descriptor; also check that nothing is warned about when the
    # file does not own the descriptor.
    opened_fds = []

    def close_opened_fds():
        for fd in opened_fds:
            try:
                os.close(fd)
            except OSError as exc:
                # EBADF is tolerated: the descriptor is already closed.
                if exc.errno != errno.EBADF:
                    raise

    self.addCleanup(close_opened_fds)

    read_fd, write_fd = os.pipe()
    opened_fds.extend((read_fd, write_fd))
    self._check_warn_on_dealloc(read_fd, *args, **kwargs)

    # When using closefd=False, there's no warning
    read_fd, write_fd = os.pipe()
    opened_fds.extend((read_fd, write_fd))
    with support.check_no_resource_warning(self):
        open(read_fd, *args, closefd=False, **kwargs)
Antoine Pitroue033e062010-10-29 10:38:18 +00004078
def test_warn_on_dealloc_fd(self):
    # Unbuffered binary, buffered binary and text files.
    variants = (
        ("rb", {"buffering": 0}),
        ("rb", {}),
        ("r", {}),
    )
    for mode, extra in variants:
        self._check_warn_on_dealloc_fd(mode, **extra)
4083
4084
def test_pickling(self):
    # Pickling file objects is forbidden
    # The write modes come first, so the file exists by the time it is
    # opened for reading.
    protocols = range(pickle.HIGHEST_PROTOCOL + 1)
    for base_mode in ("w", "r", "w+"):
        binary_mode = base_mode + "b"
        open_variants = (
            {"mode": base_mode},
            {"mode": binary_mode},
            {"mode": binary_mode, "buffering": 0},
        )
        for kwargs in open_variants:
            for protocol in protocols:
                with self.open(support.TESTFN, **kwargs) as f:
                    with self.assertRaises(TypeError):
                        pickle.dumps(f, protocol)
4101
def test_nonblock_pipe_write_bigbuf(self):
    # Run the non-blocking pipe scenario with a 16 KiB buffer.
    bufsize = 16 * 1024
    self._test_nonblock_pipe_write(bufsize)
4104
def test_nonblock_pipe_write_smallbuf(self):
    # Run the non-blocking pipe scenario with a 1 KiB buffer.
    bufsize = 1024
    self._test_nonblock_pipe_write(bufsize)
4107
@unittest.skipUnless(hasattr(os, 'set_blocking'),
                     'os.set_blocking() required for this test')
def _test_nonblock_pipe_write(self, bufsize):
    # Write through a non-blocking pipe using buffered files of size
    # *bufsize* until the writer raises BlockingIOError, drain the pipe,
    # and finally check that everything sent was received unchanged.
    sent = []
    received = []
    r, w = os.pipe()
    os.set_blocking(r, False)
    os.set_blocking(w, False)

    # To exercise all code paths in the C implementation we need
    # to play with buffer sizes.  For instance, if we choose a
    # buffer size less than or equal to _PIPE_BUF (4096 on Linux)
    # then we will never get a partial write of the buffer.
    rf = self.open(r, mode='rb', closefd=True, buffering=bufsize)
    wf = self.open(w, mode='wb', closefd=True, buffering=bufsize)

    with rf, wf:
        for N in 9999, 73, 7574:
            try:
                i = 0
                # Keep writing until the write raises BlockingIOError.
                while True:
                    # N copies of one lowercase letter, cycling a-z.
                    msg = bytes([i % 26 + 97]) * N
                    sent.append(msg)
                    wf.write(msg)
                    i += 1

            except self.BlockingIOError as e:
                self.assertEqual(e.args[0], errno.EAGAIN)
                self.assertEqual(e.args[2], e.characters_written)
                # Only the accepted part of the last message counts
                # as sent.
                sent[-1] = sent[-1][:e.characters_written]
                # Read what is available from the pipe, then write a
                # marker message.
                received.append(rf.read())
                msg = b'BLOCKED'
                wf.write(msg)
                sent.append(msg)

        # Retry flush() until it succeeds, reading from the pipe after
        # each blocked attempt.
        while True:
            try:
                wf.flush()
                break
            except self.BlockingIOError as e:
                self.assertEqual(e.args[0], errno.EAGAIN)
                self.assertEqual(e.args[2], e.characters_written)
                self.assertEqual(e.characters_written, 0)
                received.append(rf.read())

        # Collect the remaining data: call rf.read() repeatedly until
        # it returns None.
        received += iter(rf.read, None)

    sent, received = b''.join(sent), b''.join(received)
    self.assertEqual(sent, received)
    # Leaving the ``with`` block must have closed both files.
    self.assertTrue(wf.closed)
    self.assertTrue(rf.closed)
4159
def test_create_fail(self):
    # 'x' mode fails if file is existing
    self.open(support.TESTFN, 'w').close()
    with self.assertRaises(FileExistsError):
        self.open(support.TESTFN, 'x')
4165
def test_create_writes(self):
    # 'x' mode opens for writing
    payload = b"spam"
    with self.open(support.TESTFN, 'xb') as created:
        created.write(payload)
    with self.open(support.TESTFN, 'rb') as reopened:
        content = reopened.read()
    self.assertEqual(payload, content)
4172
def test_open_allargs(self):
    # there used to be a buffer overflow in the parser for rawmode
    with self.assertRaises(ValueError):
        self.open(support.TESTFN, 'rwax+')
4176
def test_check_encoding_errors(self):
    # bpo-37388: open() and TextIOWrapper must check encoding and errors
    # arguments in dev mode.
    #
    # The child script exits with 21..24 at the first check that does not
    # raise LookupError, and with 10 when all four checks behave.
    invalid = 'Boom, Shaka Laka, Boom!'
    filename = __file__
    mod = self.io.__name__
    code = textwrap.dedent(f'''
        import sys
        from {mod} import open, TextIOWrapper

        try:
            open({filename!r}, encoding={invalid!r})
        except LookupError:
            pass
        else:
            sys.exit(21)

        try:
            open({filename!r}, errors={invalid!r})
        except LookupError:
            pass
        else:
            sys.exit(22)

        fp = open({filename!r}, "rb")
        with fp:
            try:
                TextIOWrapper(fp, encoding={invalid!r})
            except LookupError:
                pass
            else:
                sys.exit(23)

            try:
                TextIOWrapper(fp, errors={invalid!r})
            except LookupError:
                pass
            else:
                sys.exit(24)

        sys.exit(10)
    ''')
    child_args = ('-X', 'dev', '-c', code)
    proc = assert_python_failure(*child_args)
    self.assertEqual(proc.rc, 10, proc)
4221
Christian Heimes7b648752012-09-10 14:48:43 +02004222
class CMiscIOTest(MiscIOTest):
    # Miscellaneous io tests run against the C implementation, plus a few
    # checks that only apply to it.
    io = io

    def test_readinto_buffer_overflow(self):
        # Issue #18025
        class BadReader(self.io.BufferedIOBase):
            def read(self, n=-1):
                # Always hand back far more data than was asked for.
                return b'x' * 10**6

        target = bytearray(2)
        with self.assertRaises(ValueError):
            BadReader().readinto(target)

    def check_daemon_threads_shutdown_deadlock(self, stream_name):
        # Issue #23309: deadlocks at shutdown should be avoided when a
        # daemon thread and the main thread both write to a file.
        code = """if 1:
            import sys
            import time
            import threading
            from test.support import SuppressCrashReport

            file = sys.{stream_name}

            def run():
                while True:
                    file.write('.')
                    file.flush()

            crash = SuppressCrashReport()
            crash.__enter__()
            # don't call __exit__(): the crash occurs at Python shutdown

            thread = threading.Thread(target=run)
            thread.daemon = True
            thread.start()

            time.sleep(0.5)
            file.write('!')
            file.flush()
            """.format(stream_name=stream_name)
        res, _ = run_python_until_end("-c", code)
        err = res.err.decode()
        if res.rc == 0:
            # Clean exit: stderr may only hold the characters written by
            # the child itself.
            self.assertFalse(err.strip('.!'))
            return

        # Failure: should be a fatal error
        pattern = (r"Fatal Python error: _enter_buffered_busy: "
                   r"could not acquire lock "
                   r"for <(_io\.)?BufferedWriter name='<{stream_name}>'> "
                   r"at interpreter shutdown, possibly due to "
                   r"daemon threads").format(stream_name=stream_name)
        self.assertRegex(err, pattern)

    def test_daemon_threads_shutdown_stdout_deadlock(self):
        self.check_daemon_threads_shutdown_deadlock('stdout')

    def test_daemon_threads_shutdown_stderr_deadlock(self):
        self.check_daemon_threads_shutdown_deadlock('stderr')
4281
4282
class PyMiscIOTest(MiscIOTest):
    # Miscellaneous io tests run against the pure-Python implementation.
    io = pyio
Barry Warsaw40e82462008-11-20 20:14:50 +00004285
Antoine Pitroub46b9d52010-08-21 19:09:32 +00004286
@unittest.skipIf(os.name == 'nt', 'POSIX signals required for this test.')
class SignalsTest(unittest.TestCase):
    # Tests for how io objects behave when SIGALRM is delivered during a
    # read or write.  Concrete subclasses set the ``io`` class attribute to
    # the module under test.

    def setUp(self):
        # Install the failing handler, remembering the previous one.
        self.oldalrm = signal.signal(signal.SIGALRM, self.alarm_interrupt)

    def tearDown(self):
        signal.signal(signal.SIGALRM, self.oldalrm)

    def alarm_interrupt(self, sig, frame):
        # Deliberately raises ZeroDivisionError from the signal handler.
        1/0

    @staticmethod
    def _close_after_fd_closed(fileobj):
        """Close *fileobj* whose file descriptor has already been closed.

        EBADF is ignored; any other OSError propagates.  Nothing is done
        when *fileobj* is None, i.e. when open() never succeeded.
        """
        if fileobj is None:
            return
        try:
            fileobj.close()
        except OSError as e:
            if e.errno != errno.EBADF:
                raise

    def check_interrupted_write(self, item, bytes, **fdopen_kwargs):
        """Check that a partial write, when it gets interrupted, properly
        invokes the signal handler, and bubbles up the exception raised
        in the latter.

        *item* is the unit written through the io object, repeated until it
        exceeds support.PIPE_MAX_SIZE.  *bytes* is what is expected to show
        up in the pipe; its first two bytes are compared with the data read
        back.  *fdopen_kwargs* are passed to self.io.open().
        """
        read_results = []
        def _read():
            s = os.read(r, 1)
            read_results.append(s)

        t = threading.Thread(target=_read)
        t.daemon = True
        r, w = os.pipe()
        fdopen_kwargs["closefd"] = False
        large_data = item * (support.PIPE_MAX_SIZE // len(item) + 1)
        # Bound before the try block so that the cleanup code never trips
        # over an unbound local (hiding the real error) if open() fails.
        wio = None
        try:
            wio = self.io.open(w, **fdopen_kwargs)
            if hasattr(signal, 'pthread_sigmask'):
                # create the thread with SIGALRM signal blocked
                signal.pthread_sigmask(signal.SIG_BLOCK, [signal.SIGALRM])
                try:
                    t.start()
                finally:
                    # Unblock even if the thread could not be started.
                    signal.pthread_sigmask(signal.SIG_UNBLOCK,
                                           [signal.SIGALRM])
            else:
                t.start()

            # Fill the pipe enough that the write will be blocking.
            # It will be interrupted by the timer armed below. Since the
            # other thread has read one byte, the low-level write will
            # return with a successful (partial) result rather than an EINTR.
            # The buffered IO layer must check for pending signal
            # handlers, which in this case will invoke alarm_interrupt().
            signal.alarm(1)
            try:
                self.assertRaises(ZeroDivisionError, wio.write, large_data)
            finally:
                signal.alarm(0)
                t.join()
            # We got one byte, get another one and check that it isn't a
            # repeat of the first one.
            read_results.append(os.read(r, 1))
            self.assertEqual(read_results, [bytes[0:1], bytes[1:2]])
        finally:
            os.close(w)
            os.close(r)
            # This is deliberate. If we didn't close the file descriptor
            # before closing wio, wio would try to flush its internal
            # buffer, and block again.
            self._close_after_fd_closed(wio)

    def test_interrupted_write_unbuffered(self):
        self.check_interrupted_write(b"xy", b"xy", mode="wb", buffering=0)

    def test_interrupted_write_buffered(self):
        self.check_interrupted_write(b"xy", b"xy", mode="wb")

    def test_interrupted_write_text(self):
        self.check_interrupted_write("xy", b"xy", mode="w", encoding="ascii")

    @support.no_tracing
    def check_reentrant_write(self, data, **fdopen_kwargs):
        """Check the outcome of a signal handler writing to a file object
        that the interrupted code is itself writing to.

        Either the reentrant write raises RuntimeError ("reentrant call
        ...") or the handler's ZeroDivisionError propagates.
        """
        def on_alarm(*args):
            # Will be called reentrantly from the same thread
            wio.write(data)
            1/0
        signal.signal(signal.SIGALRM, on_alarm)
        r, w = os.pipe()
        wio = self.io.open(w, **fdopen_kwargs)
        try:
            signal.alarm(1)
            # Either the reentrant call to wio.write() fails with RuntimeError,
            # or the signal handler raises ZeroDivisionError.
            with self.assertRaises((ZeroDivisionError, RuntimeError)) as cm:
                while 1:
                    for i in range(100):
                        wio.write(data)
                        wio.flush()
                    # Make sure the buffer doesn't fill up and block further writes
                    os.read(r, len(data) * 100)
            exc = cm.exception
            if isinstance(exc, RuntimeError):
                self.assertTrue(str(exc).startswith("reentrant call"), str(exc))
        finally:
            signal.alarm(0)
            wio.close()
            os.close(r)

    def test_reentrant_write_buffered(self):
        self.check_reentrant_write(b"xy", mode="wb")

    def test_reentrant_write_text(self):
        self.check_reentrant_write("xy", mode="w", encoding="ascii")

    def check_interrupted_read_retry(self, decode, **fdopen_kwargs):
        """Check that a buffered read, when it gets interrupted (either
        returning a partial result or EINTR), properly invokes the signal
        handler and retries if the latter returned successfully.

        *decode* converts the value returned by read() to str so it can be
        compared with "foobar".
        """
        r, w = os.pipe()
        fdopen_kwargs["closefd"] = False
        def alarm_handler(sig, frame):
            os.write(w, b"bar")
        signal.signal(signal.SIGALRM, alarm_handler)
        # Bound before the try block so cleanup works even if open() fails.
        rio = None
        try:
            rio = self.io.open(r, **fdopen_kwargs)
            os.write(w, b"foo")
            signal.alarm(1)
            # Expected behaviour:
            # - first raw read() returns partial b"foo"
            # - second raw read() returns EINTR
            # - third raw read() returns b"bar"
            self.assertEqual(decode(rio.read(6)), "foobar")
        finally:
            signal.alarm(0)
            if rio is not None:
                rio.close()
            os.close(w)
            os.close(r)

    def test_interrupted_read_retry_buffered(self):
        self.check_interrupted_read_retry(lambda x: x.decode('latin1'),
                                          mode="rb")

    def test_interrupted_read_retry_text(self):
        self.check_interrupted_read_retry(lambda x: x,
                                          mode="r")

    def check_interrupted_write_retry(self, item, **fdopen_kwargs):
        """Check that a buffered write, when it gets interrupted (either
        returning a partial result or EINTR), properly invokes the signal
        handler and retries if the latter returned successfully.

        *item* is a single unit (one byte or one character) that is
        repeated support.PIPE_MAX_SIZE times to build the payload.
        """
        select = support.import_module("select")

        # A quantity that exceeds the buffer size of an anonymous pipe's
        # write end.
        N = support.PIPE_MAX_SIZE
        r, w = os.pipe()
        fdopen_kwargs["closefd"] = False

        # We need a separate thread to read from the pipe and allow the
        # write() to finish.  This thread is started after the SIGALRM is
        # received (forcing a first EINTR in write()).
        read_results = []
        write_finished = False
        error = None
        def _read():
            try:
                while not write_finished:
                    while r in select.select([r], [], [], 1.0)[0]:
                        s = os.read(r, 1024)
                        read_results.append(s)
            except BaseException as exc:
                # Hand the failure over to the main thread.
                nonlocal error
                error = exc
        t = threading.Thread(target=_read)
        t.daemon = True
        def alarm1(sig, frame):
            # First alarm: re-arm with the handler that starts the reader.
            signal.signal(signal.SIGALRM, alarm2)
            signal.alarm(1)
        def alarm2(sig, frame):
            t.start()

        large_data = item * N
        signal.signal(signal.SIGALRM, alarm1)
        # Bound before the try block so cleanup works even if open() fails.
        wio = None
        try:
            wio = self.io.open(w, **fdopen_kwargs)
            signal.alarm(1)
            # Expected behaviour:
            # - first raw write() is partial (because of the limited pipe buffer
            #   and the first alarm)
            # - second raw write() returns EINTR (because of the second alarm)
            # - subsequent write()s are successful (either partial or complete)
            written = wio.write(large_data)
            self.assertEqual(N, written)

            wio.flush()
            write_finished = True
            t.join()

            self.assertIsNone(error)
            self.assertEqual(N, sum(len(x) for x in read_results))
        finally:
            signal.alarm(0)
            write_finished = True
            os.close(w)
            os.close(r)
            # This is deliberate. If we didn't close the file descriptor
            # before closing wio, wio would try to flush its internal
            # buffer, and could block (in case of failure).
            self._close_after_fd_closed(wio)

    def test_interrupted_write_retry_buffered(self):
        self.check_interrupted_write_retry(b"x", mode="wb")

    def test_interrupted_write_retry_text(self):
        self.check_interrupted_write_retry("x", mode="w", encoding="latin1")
4499
Antoine Pitrouf3b68b32010-12-03 18:41:39 +00004500
class CSignalsTest(SignalsTest):
    # Signal tests run against the C implementation.
    io = io
4503
class PySignalsTest(SignalsTest):
    # Signal tests run against the pure-Python implementation.
    io = pyio

    # Handling reentrancy issues would slow down _pyio even more, so the
    # tests are disabled.
    test_reentrant_write_text = None
    test_reentrant_write_buffered = None
4511
Antoine Pitroub46b9d52010-08-21 19:09:32 +00004512
def load_tests(*args):
    """unittest ``load_tests`` hook: build the suite of io test cases.

    Before the suite is built, every test class whose name starts with "C"
    or "Py" gets the public names of the matching io module (plus
    IncrementalNewlineDecoder) and the matching mock classes set as class
    attributes.

    When unittest calls this hook, the first positional argument is the
    active TestLoader and it is used to collect the tests; otherwise
    unittest.defaultTestLoader is used.  Returns a unittest.TestSuite.
    """
    tests = (CIOTest, PyIOTest, APIMismatchTest,
             CBufferedReaderTest, PyBufferedReaderTest,
             CBufferedWriterTest, PyBufferedWriterTest,
             CBufferedRWPairTest, PyBufferedRWPairTest,
             CBufferedRandomTest, PyBufferedRandomTest,
             StatefulIncrementalDecoderTest,
             CIncrementalNewlineDecoderTest, PyIncrementalNewlineDecoderTest,
             CTextIOWrapperTest, PyTextIOWrapperTest,
             CMiscIOTest, PyMiscIOTest,
             CSignalsTest, PySignalsTest,
             )

    # Put the namespaces of the IO module we are testing and some useful mock
    # classes in the __dict__ of each test.
    mocks = (MockRawIO, MisbehavedRawIO, MockFileIO, CloseFailureIO,
             MockNonBlockWriterIO, MockUnseekableIO, MockRawIOWithoutRead,
             SlowFlushRawIO)
    all_members = io.__all__ + ["IncrementalNewlineDecoder"]
    c_io_ns = {name: getattr(io, name) for name in all_members}
    py_io_ns = {name: getattr(pyio, name) for name in all_members}
    globs = globals()
    c_io_ns.update((x.__name__, globs["C" + x.__name__]) for x in mocks)
    py_io_ns.update((x.__name__, globs["Py" + x.__name__]) for x in mocks)
    # Avoid turning open into a bound method.
    py_io_ns["open"] = pyio.OpenWrapper
    for test in tests:
        if test.__name__.startswith("C"):
            namespace = c_io_ns
        elif test.__name__.startswith("Py"):
            namespace = py_io_ns
        else:
            # Implementation-independent test: nothing to inject.
            continue
        for name, obj in namespace.items():
            setattr(test, name, obj)

    # unittest.makeSuite() is deprecated since Python 3.11 and removed in
    # 3.13; go through a TestLoader instead.
    loader = args[0] if args else None
    if not isinstance(loader, unittest.TestLoader):
        loader = unittest.defaultTestLoader
    return unittest.TestSuite(
        loader.loadTestsFromTestCase(test) for test in tests)
Guido van Rossum28524c72007-02-27 05:47:44 +00004549
# Run the tests when this file is executed directly as a script.
if __name__ == "__main__":
    unittest.main()