blob: 0bfa4d249e9a6bfcfec8303d2b53f2d60a7d8a92 [file] [log] [blame]
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001"""Unit tests for the io module."""
2
3# Tests of io are scattered over the test suite:
4# * test_bufio - tests file buffering
5# * test_memoryio - tests BytesIO and StringIO
6# * test_fileio - tests FileIO
7# * test_file - tests the file interface
8# * test_io - tests everything else in the io module
9# * test_univnewlines - tests universal newline support
10# * test_largefile - tests operations on a file greater than 2**32 bytes
11# (only enabled with -ulargefile)
12
13################################################################################
14# ATTENTION TEST WRITERS!!!
15################################################################################
16# When writing tests for io, it's important to test both the C and Python
17# implementations. This is usually done by writing a base test that refers to
Serhiy Storchakad65c9492015-11-02 14:10:23 +020018# the type it is testing as an attribute. Then it provides custom subclasses to
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000019# test both implementations. This file has lots of examples.
20################################################################################
Guido van Rossum68bbcd22007-02-27 17:19:33 +000021
Victor Stinnerf86a5e82012-06-05 13:43:22 +020022import abc
23import array
24import errno
25import locale
Guido van Rossum8358db22007-08-18 21:39:55 +000026import os
Victor Stinnerf86a5e82012-06-05 13:43:22 +020027import pickle
28import random
29import signal
Guido van Rossum34d69e52007-04-10 20:08:41 +000030import sys
Gregory P. Smithe5796c42018-12-30 20:17:57 -080031import sysconfig
Victor Stinner22eb6892019-06-26 00:51:05 +020032import textwrap
Antoine Pitroua6a4dc82017-09-07 18:56:24 +020033import threading
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +000034import time
Guido van Rossum28524c72007-02-27 05:47:44 +000035import unittest
Antoine Pitroue033e062010-10-29 10:38:18 +000036import warnings
Victor Stinnerf86a5e82012-06-05 13:43:22 +020037import weakref
Antoine Pitrou131a4892012-10-16 22:57:11 +020038from collections import deque, UserList
Victor Stinnerf86a5e82012-06-05 13:43:22 +020039from itertools import cycle, count
Benjamin Petersonee8712c2008-05-20 21:35:26 +000040from test import support
Victor Stinner22eb6892019-06-26 00:51:05 +020041from test.support.script_helper import (
42 assert_python_ok, assert_python_failure, run_python_until_end)
Serhiy Storchakab21d1552018-03-02 11:53:51 +020043from test.support import FakePath
Guido van Rossum76c5d4d2007-04-06 19:10:29 +000044
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +000045import codecs
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000046import io # C implementation of io
47import _pyio as pyio # Python implementation of io
Guido van Rossuma9e20242007-03-08 00:43:48 +000048
Martin Panter6bb91f32016-05-28 00:41:57 +000049try:
50 import ctypes
51except ImportError:
52 def byteslike(*pos, **kw):
53 return array.array("b", bytes(*pos, **kw))
54else:
55 def byteslike(*pos, **kw):
56 """Create a bytes-like object having no string or sequence methods"""
57 data = bytes(*pos, **kw)
58 obj = EmptyStruct()
59 ctypes.resize(obj, len(data))
60 memoryview(obj).cast("B")[:] = data
61 return obj
62 class EmptyStruct(ctypes.Structure):
63 pass
64
Gregory P. Smithe5796c42018-12-30 20:17:57 -080065_cflags = sysconfig.get_config_var('CFLAGS') or ''
66_config_args = sysconfig.get_config_var('CONFIG_ARGS') or ''
67MEMORY_SANITIZER = (
68 '-fsanitize=memory' in _cflags or
69 '--with-memory-sanitizer' in _config_args
70)
71
Victor Stinnerbc2aa812019-05-23 03:45:09 +020072# Does io.IOBase finalizer log the exception if the close() method fails?
73# The exception is ignored silently by default in release build.
74IOBASE_EMITS_UNRAISABLE = (hasattr(sys, "gettotalrefcount") or sys.flags.dev_mode)
Victor Stinner44235042019-04-12 17:06:47 +020075
76
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000077def _default_chunk_size():
78 """Get the default TextIOWrapper chunk size"""
Marc-André Lemburg8f36af72011-02-25 15:42:01 +000079 with open(__file__, "r", encoding="latin-1") as f:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000080 return f._CHUNK_SIZE
81
82
Antoine Pitrou328ec742010-09-14 18:37:24 +000083class MockRawIOWithoutRead:
84 """A RawIO implementation without read(), so as to exercise the default
85 RawIO.read() which calls readinto()."""
Guido van Rossuma9e20242007-03-08 00:43:48 +000086
Guido van Rossum76c5d4d2007-04-06 19:10:29 +000087 def __init__(self, read_stack=()):
88 self._read_stack = list(read_stack)
89 self._write_stack = []
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000090 self._reads = 0
Antoine Pitrou32cfede2010-08-11 13:31:33 +000091 self._extraneous_reads = 0
Guido van Rossum68bbcd22007-02-27 17:19:33 +000092
Guido van Rossum01a27522007-03-07 01:00:12 +000093 def write(self, b):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000094 self._write_stack.append(bytes(b))
Guido van Rossum01a27522007-03-07 01:00:12 +000095 return len(b)
96
97 def writable(self):
98 return True
99
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000100 def fileno(self):
101 return 42
102
103 def readable(self):
104 return True
105
Guido van Rossum01a27522007-03-07 01:00:12 +0000106 def seekable(self):
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000107 return True
108
Guido van Rossum01a27522007-03-07 01:00:12 +0000109 def seek(self, pos, whence):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000110 return 0 # wrong but we gotta return something
Guido van Rossum01a27522007-03-07 01:00:12 +0000111
112 def tell(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000113 return 0 # same comment as above
114
115 def readinto(self, buf):
116 self._reads += 1
117 max_len = len(buf)
118 try:
119 data = self._read_stack[0]
120 except IndexError:
Antoine Pitrou32cfede2010-08-11 13:31:33 +0000121 self._extraneous_reads += 1
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000122 return 0
123 if data is None:
124 del self._read_stack[0]
125 return None
126 n = len(data)
127 if len(data) <= max_len:
128 del self._read_stack[0]
129 buf[:n] = data
130 return n
131 else:
132 buf[:] = data[:max_len]
133 self._read_stack[0] = data[max_len:]
134 return max_len
135
136 def truncate(self, pos=None):
137 return pos
138
Antoine Pitrou328ec742010-09-14 18:37:24 +0000139class CMockRawIOWithoutRead(MockRawIOWithoutRead, io.RawIOBase):
140 pass
141
142class PyMockRawIOWithoutRead(MockRawIOWithoutRead, pyio.RawIOBase):
143 pass
144
145
146class MockRawIO(MockRawIOWithoutRead):
147
148 def read(self, n=None):
149 self._reads += 1
150 try:
151 return self._read_stack.pop(0)
152 except:
153 self._extraneous_reads += 1
154 return b""
155
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000156class CMockRawIO(MockRawIO, io.RawIOBase):
157 pass
158
159class PyMockRawIO(MockRawIO, pyio.RawIOBase):
160 pass
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000161
Guido van Rossuma9e20242007-03-08 00:43:48 +0000162
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000163class MisbehavedRawIO(MockRawIO):
164 def write(self, b):
165 return super().write(b) * 2
166
167 def read(self, n=None):
168 return super().read(n) * 2
169
170 def seek(self, pos, whence):
171 return -123
172
173 def tell(self):
174 return -456
175
176 def readinto(self, buf):
177 super().readinto(buf)
178 return len(buf) * 5
179
180class CMisbehavedRawIO(MisbehavedRawIO, io.RawIOBase):
181 pass
182
183class PyMisbehavedRawIO(MisbehavedRawIO, pyio.RawIOBase):
184 pass
185
186
benfogle9703f092017-11-10 16:03:40 -0500187class SlowFlushRawIO(MockRawIO):
188 def __init__(self):
189 super().__init__()
190 self.in_flush = threading.Event()
191
192 def flush(self):
193 self.in_flush.set()
194 time.sleep(0.25)
195
196class CSlowFlushRawIO(SlowFlushRawIO, io.RawIOBase):
197 pass
198
199class PySlowFlushRawIO(SlowFlushRawIO, pyio.RawIOBase):
200 pass
201
202
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000203class CloseFailureIO(MockRawIO):
204 closed = 0
205
206 def close(self):
207 if not self.closed:
208 self.closed = 1
Andrew Svetlovf7a17b42012-12-25 16:47:37 +0200209 raise OSError
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000210
211class CCloseFailureIO(CloseFailureIO, io.RawIOBase):
212 pass
213
214class PyCloseFailureIO(CloseFailureIO, pyio.RawIOBase):
215 pass
216
217
218class MockFileIO:
Guido van Rossum78892e42007-04-06 17:31:18 +0000219
220 def __init__(self, data):
221 self.read_history = []
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000222 super().__init__(data)
Guido van Rossum78892e42007-04-06 17:31:18 +0000223
224 def read(self, n=None):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000225 res = super().read(n)
Guido van Rossum78892e42007-04-06 17:31:18 +0000226 self.read_history.append(None if res is None else len(res))
227 return res
228
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000229 def readinto(self, b):
230 res = super().readinto(b)
231 self.read_history.append(res)
232 return res
Guido van Rossum78892e42007-04-06 17:31:18 +0000233
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000234class CMockFileIO(MockFileIO, io.BytesIO):
235 pass
Guido van Rossuma9e20242007-03-08 00:43:48 +0000236
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000237class PyMockFileIO(MockFileIO, pyio.BytesIO):
238 pass
239
240
Antoine Pitrou0d739d72010-09-05 23:01:12 +0000241class MockUnseekableIO:
242 def seekable(self):
243 return False
244
245 def seek(self, *args):
246 raise self.UnsupportedOperation("not seekable")
247
248 def tell(self, *args):
249 raise self.UnsupportedOperation("not seekable")
250
Martin Panter754aab22016-03-31 07:21:56 +0000251 def truncate(self, *args):
252 raise self.UnsupportedOperation("not seekable")
253
Antoine Pitrou0d739d72010-09-05 23:01:12 +0000254class CMockUnseekableIO(MockUnseekableIO, io.BytesIO):
255 UnsupportedOperation = io.UnsupportedOperation
256
257class PyMockUnseekableIO(MockUnseekableIO, pyio.BytesIO):
258 UnsupportedOperation = pyio.UnsupportedOperation
259
260
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000261class MockNonBlockWriterIO:
262
263 def __init__(self):
Guido van Rossum01a27522007-03-07 01:00:12 +0000264 self._write_stack = []
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000265 self._blocker_char = None
Guido van Rossuma9e20242007-03-08 00:43:48 +0000266
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000267 def pop_written(self):
268 s = b"".join(self._write_stack)
269 self._write_stack[:] = []
270 return s
271
272 def block_on(self, char):
273 """Block when a given char is encountered."""
274 self._blocker_char = char
275
276 def readable(self):
277 return True
278
279 def seekable(self):
280 return True
Guido van Rossuma9e20242007-03-08 00:43:48 +0000281
Victor Stinnerb589cef2019-06-11 03:10:59 +0200282 def seek(self, pos, whence=0):
283 # naive implementation, enough for tests
284 return 0
285
Guido van Rossum01a27522007-03-07 01:00:12 +0000286 def writable(self):
287 return True
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000288
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000289 def write(self, b):
290 b = bytes(b)
291 n = -1
292 if self._blocker_char:
293 try:
294 n = b.index(self._blocker_char)
295 except ValueError:
296 pass
297 else:
Antoine Pitrou58fcf9f2011-11-21 20:16:44 +0100298 if n > 0:
299 # write data up to the first blocker
300 self._write_stack.append(b[:n])
301 return n
302 else:
303 # cancel blocker and indicate would block
304 self._blocker_char = None
305 return None
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000306 self._write_stack.append(b)
307 return len(b)
308
309class CMockNonBlockWriterIO(MockNonBlockWriterIO, io.RawIOBase):
310 BlockingIOError = io.BlockingIOError
311
312class PyMockNonBlockWriterIO(MockNonBlockWriterIO, pyio.RawIOBase):
313 BlockingIOError = pyio.BlockingIOError
314
Guido van Rossuma9e20242007-03-08 00:43:48 +0000315
Guido van Rossum28524c72007-02-27 05:47:44 +0000316class IOTest(unittest.TestCase):
317
Neal Norwitze7789b12008-03-24 06:18:09 +0000318 def setUp(self):
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000319 support.unlink(support.TESTFN)
Neal Norwitze7789b12008-03-24 06:18:09 +0000320
Guido van Rossum4d0f5a42007-03-07 22:59:39 +0000321 def tearDown(self):
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000322 support.unlink(support.TESTFN)
Guido van Rossum4d0f5a42007-03-07 22:59:39 +0000323
Guido van Rossum28524c72007-02-27 05:47:44 +0000324 def write_ops(self, f):
Guido van Rossum87429772007-04-10 21:06:59 +0000325 self.assertEqual(f.write(b"blah."), 5)
Antoine Pitrou905a2ff2010-01-31 22:47:27 +0000326 f.truncate(0)
327 self.assertEqual(f.tell(), 5)
328 f.seek(0)
329
330 self.assertEqual(f.write(b"blah."), 5)
Guido van Rossum87429772007-04-10 21:06:59 +0000331 self.assertEqual(f.seek(0), 0)
332 self.assertEqual(f.write(b"Hello."), 6)
Guido van Rossum28524c72007-02-27 05:47:44 +0000333 self.assertEqual(f.tell(), 6)
Guido van Rossum87429772007-04-10 21:06:59 +0000334 self.assertEqual(f.seek(-1, 1), 5)
Guido van Rossum28524c72007-02-27 05:47:44 +0000335 self.assertEqual(f.tell(), 5)
Martin Panter6bb91f32016-05-28 00:41:57 +0000336 buffer = bytearray(b" world\n\n\n")
337 self.assertEqual(f.write(buffer), 9)
338 buffer[:] = b"*" * 9 # Overwrite our copy of the data
Guido van Rossum87429772007-04-10 21:06:59 +0000339 self.assertEqual(f.seek(0), 0)
Guido van Rossum2b08b382007-05-08 20:18:39 +0000340 self.assertEqual(f.write(b"h"), 1)
Guido van Rossum87429772007-04-10 21:06:59 +0000341 self.assertEqual(f.seek(-1, 2), 13)
342 self.assertEqual(f.tell(), 13)
Antoine Pitrou905a2ff2010-01-31 22:47:27 +0000343
Guido van Rossum87429772007-04-10 21:06:59 +0000344 self.assertEqual(f.truncate(12), 12)
Antoine Pitrou905a2ff2010-01-31 22:47:27 +0000345 self.assertEqual(f.tell(), 13)
Christian Heimes8e42a0a2007-11-08 18:04:45 +0000346 self.assertRaises(TypeError, f.seek, 0.0)
Guido van Rossum28524c72007-02-27 05:47:44 +0000347
Guido van Rossum9b76da62007-04-11 01:09:03 +0000348 def read_ops(self, f, buffered=False):
349 data = f.read(5)
350 self.assertEqual(data, b"hello")
Martin Panter6bb91f32016-05-28 00:41:57 +0000351 data = byteslike(data)
Guido van Rossum9b76da62007-04-11 01:09:03 +0000352 self.assertEqual(f.readinto(data), 5)
Martin Panter6bb91f32016-05-28 00:41:57 +0000353 self.assertEqual(bytes(data), b" worl")
354 data = bytearray(5)
Guido van Rossum9b76da62007-04-11 01:09:03 +0000355 self.assertEqual(f.readinto(data), 2)
356 self.assertEqual(len(data), 5)
357 self.assertEqual(data[:2], b"d\n")
358 self.assertEqual(f.seek(0), 0)
359 self.assertEqual(f.read(20), b"hello world\n")
360 self.assertEqual(f.read(1), b"")
Martin Panter6bb91f32016-05-28 00:41:57 +0000361 self.assertEqual(f.readinto(byteslike(b"x")), 0)
Guido van Rossum9b76da62007-04-11 01:09:03 +0000362 self.assertEqual(f.seek(-6, 2), 6)
363 self.assertEqual(f.read(5), b"world")
364 self.assertEqual(f.read(0), b"")
Martin Panter6bb91f32016-05-28 00:41:57 +0000365 self.assertEqual(f.readinto(byteslike()), 0)
Guido van Rossum9b76da62007-04-11 01:09:03 +0000366 self.assertEqual(f.seek(-6, 1), 5)
367 self.assertEqual(f.read(5), b" worl")
368 self.assertEqual(f.tell(), 10)
Christian Heimes8e42a0a2007-11-08 18:04:45 +0000369 self.assertRaises(TypeError, f.seek, 0.0)
Guido van Rossum9b76da62007-04-11 01:09:03 +0000370 if buffered:
371 f.seek(0)
372 self.assertEqual(f.read(), b"hello world\n")
373 f.seek(6)
374 self.assertEqual(f.read(), b"world\n")
375 self.assertEqual(f.read(), b"")
Martin Panter6bb91f32016-05-28 00:41:57 +0000376 f.seek(0)
377 data = byteslike(5)
378 self.assertEqual(f.readinto1(data), 5)
379 self.assertEqual(bytes(data), b"hello")
Guido van Rossum9b76da62007-04-11 01:09:03 +0000380
Guido van Rossum34d69e52007-04-10 20:08:41 +0000381 LARGE = 2**31
382
Guido van Rossum53807da2007-04-10 19:01:47 +0000383 def large_file_ops(self, f):
384 assert f.readable()
385 assert f.writable()
Xavier de Gaye877f0362016-11-17 09:20:28 +0100386 try:
387 self.assertEqual(f.seek(self.LARGE), self.LARGE)
388 except (OverflowError, ValueError):
389 self.skipTest("no largefile support")
Guido van Rossum34d69e52007-04-10 20:08:41 +0000390 self.assertEqual(f.tell(), self.LARGE)
Guido van Rossum53807da2007-04-10 19:01:47 +0000391 self.assertEqual(f.write(b"xxx"), 3)
Guido van Rossum34d69e52007-04-10 20:08:41 +0000392 self.assertEqual(f.tell(), self.LARGE + 3)
393 self.assertEqual(f.seek(-1, 1), self.LARGE + 2)
Guido van Rossum87429772007-04-10 21:06:59 +0000394 self.assertEqual(f.truncate(), self.LARGE + 2)
Guido van Rossum34d69e52007-04-10 20:08:41 +0000395 self.assertEqual(f.tell(), self.LARGE + 2)
396 self.assertEqual(f.seek(0, 2), self.LARGE + 2)
Guido van Rossum87429772007-04-10 21:06:59 +0000397 self.assertEqual(f.truncate(self.LARGE + 1), self.LARGE + 1)
Antoine Pitrou905a2ff2010-01-31 22:47:27 +0000398 self.assertEqual(f.tell(), self.LARGE + 2)
Guido van Rossum34d69e52007-04-10 20:08:41 +0000399 self.assertEqual(f.seek(0, 2), self.LARGE + 1)
400 self.assertEqual(f.seek(-1, 2), self.LARGE)
Guido van Rossum53807da2007-04-10 19:01:47 +0000401 self.assertEqual(f.read(2), b"x")
402
Benjamin Peterson81971ea2009-05-14 22:01:31 +0000403 def test_invalid_operations(self):
404 # Try writing on a file opened in read mode and vice-versa.
Antoine Pitrou0d739d72010-09-05 23:01:12 +0000405 exc = self.UnsupportedOperation
Benjamin Peterson81971ea2009-05-14 22:01:31 +0000406 for mode in ("w", "wb"):
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000407 with self.open(support.TESTFN, mode) as fp:
Antoine Pitrou0d739d72010-09-05 23:01:12 +0000408 self.assertRaises(exc, fp.read)
409 self.assertRaises(exc, fp.readline)
410 with self.open(support.TESTFN, "wb", buffering=0) as fp:
411 self.assertRaises(exc, fp.read)
412 self.assertRaises(exc, fp.readline)
413 with self.open(support.TESTFN, "rb", buffering=0) as fp:
414 self.assertRaises(exc, fp.write, b"blah")
415 self.assertRaises(exc, fp.writelines, [b"blah\n"])
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000416 with self.open(support.TESTFN, "rb") as fp:
Antoine Pitrou0d739d72010-09-05 23:01:12 +0000417 self.assertRaises(exc, fp.write, b"blah")
418 self.assertRaises(exc, fp.writelines, [b"blah\n"])
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000419 with self.open(support.TESTFN, "r") as fp:
Antoine Pitrou0d739d72010-09-05 23:01:12 +0000420 self.assertRaises(exc, fp.write, "blah")
421 self.assertRaises(exc, fp.writelines, ["blah\n"])
422 # Non-zero seeking from current or end pos
423 self.assertRaises(exc, fp.seek, 1, self.SEEK_CUR)
424 self.assertRaises(exc, fp.seek, -1, self.SEEK_END)
Benjamin Peterson81971ea2009-05-14 22:01:31 +0000425
Martin Panter754aab22016-03-31 07:21:56 +0000426 def test_optional_abilities(self):
427 # Test for OSError when optional APIs are not supported
428 # The purpose of this test is to try fileno(), reading, writing and
429 # seeking operations with various objects that indicate they do not
430 # support these operations.
431
432 def pipe_reader():
433 [r, w] = os.pipe()
434 os.close(w) # So that read() is harmless
435 return self.FileIO(r, "r")
436
437 def pipe_writer():
438 [r, w] = os.pipe()
439 self.addCleanup(os.close, r)
440 # Guarantee that we can write into the pipe without blocking
441 thread = threading.Thread(target=os.read, args=(r, 100))
442 thread.start()
443 self.addCleanup(thread.join)
444 return self.FileIO(w, "w")
445
446 def buffered_reader():
447 return self.BufferedReader(self.MockUnseekableIO())
448
449 def buffered_writer():
450 return self.BufferedWriter(self.MockUnseekableIO())
451
452 def buffered_random():
453 return self.BufferedRandom(self.BytesIO())
454
455 def buffered_rw_pair():
456 return self.BufferedRWPair(self.MockUnseekableIO(),
457 self.MockUnseekableIO())
458
459 def text_reader():
460 class UnseekableReader(self.MockUnseekableIO):
461 writable = self.BufferedIOBase.writable
462 write = self.BufferedIOBase.write
463 return self.TextIOWrapper(UnseekableReader(), "ascii")
464
465 def text_writer():
466 class UnseekableWriter(self.MockUnseekableIO):
467 readable = self.BufferedIOBase.readable
468 read = self.BufferedIOBase.read
469 return self.TextIOWrapper(UnseekableWriter(), "ascii")
470
471 tests = (
472 (pipe_reader, "fr"), (pipe_writer, "fw"),
473 (buffered_reader, "r"), (buffered_writer, "w"),
474 (buffered_random, "rws"), (buffered_rw_pair, "rw"),
475 (text_reader, "r"), (text_writer, "w"),
476 (self.BytesIO, "rws"), (self.StringIO, "rws"),
477 )
478 for [test, abilities] in tests:
Martin Panter754aab22016-03-31 07:21:56 +0000479 with self.subTest(test), test() as obj:
480 readable = "r" in abilities
481 self.assertEqual(obj.readable(), readable)
482 writable = "w" in abilities
483 self.assertEqual(obj.writable(), writable)
Martin Panter754aab22016-03-31 07:21:56 +0000484
485 if isinstance(obj, self.TextIOBase):
486 data = "3"
487 elif isinstance(obj, (self.BufferedIOBase, self.RawIOBase)):
488 data = b"3"
489 else:
490 self.fail("Unknown base class")
491
492 if "f" in abilities:
493 obj.fileno()
494 else:
495 self.assertRaises(OSError, obj.fileno)
496
497 if readable:
498 obj.read(1)
499 obj.read()
500 else:
501 self.assertRaises(OSError, obj.read, 1)
502 self.assertRaises(OSError, obj.read)
503
504 if writable:
505 obj.write(data)
506 else:
507 self.assertRaises(OSError, obj.write, data)
508
Martin Panter3ee147f2016-03-31 21:05:31 +0000509 if sys.platform.startswith("win") and test in (
Martin Panter0950e6a2016-03-31 10:31:30 +0000510 pipe_reader, pipe_writer):
511 # Pipes seem to appear as seekable on Windows
512 continue
513 seekable = "s" in abilities
514 self.assertEqual(obj.seekable(), seekable)
515
Martin Panter754aab22016-03-31 07:21:56 +0000516 if seekable:
517 obj.tell()
518 obj.seek(0)
519 else:
520 self.assertRaises(OSError, obj.tell)
521 self.assertRaises(OSError, obj.seek, 0)
522
523 if writable and seekable:
524 obj.truncate()
525 obj.truncate(0)
526 else:
527 self.assertRaises(OSError, obj.truncate)
528 self.assertRaises(OSError, obj.truncate, 0)
529
Antoine Pitrou13348842012-01-29 18:36:34 +0100530 def test_open_handles_NUL_chars(self):
531 fn_with_NUL = 'foo\0bar'
Serhiy Storchakad8a14472014-09-06 20:07:17 +0300532 self.assertRaises(ValueError, self.open, fn_with_NUL, 'w')
Victor Stinner47b45572016-03-25 09:07:07 +0100533
534 bytes_fn = bytes(fn_with_NUL, 'ascii')
535 with warnings.catch_warnings():
536 warnings.simplefilter("ignore", DeprecationWarning)
537 self.assertRaises(ValueError, self.open, bytes_fn, 'w')
Antoine Pitrou13348842012-01-29 18:36:34 +0100538
Guido van Rossum28524c72007-02-27 05:47:44 +0000539 def test_raw_file_io(self):
Benjamin Peterson45cec322009-04-24 23:14:50 +0000540 with self.open(support.TESTFN, "wb", buffering=0) as f:
541 self.assertEqual(f.readable(), False)
542 self.assertEqual(f.writable(), True)
543 self.assertEqual(f.seekable(), True)
544 self.write_ops(f)
545 with self.open(support.TESTFN, "rb", buffering=0) as f:
546 self.assertEqual(f.readable(), True)
547 self.assertEqual(f.writable(), False)
548 self.assertEqual(f.seekable(), True)
549 self.read_ops(f)
Guido van Rossum28524c72007-02-27 05:47:44 +0000550
Guido van Rossum87429772007-04-10 21:06:59 +0000551 def test_buffered_file_io(self):
Benjamin Peterson45cec322009-04-24 23:14:50 +0000552 with self.open(support.TESTFN, "wb") as f:
553 self.assertEqual(f.readable(), False)
554 self.assertEqual(f.writable(), True)
555 self.assertEqual(f.seekable(), True)
556 self.write_ops(f)
557 with self.open(support.TESTFN, "rb") as f:
558 self.assertEqual(f.readable(), True)
559 self.assertEqual(f.writable(), False)
560 self.assertEqual(f.seekable(), True)
561 self.read_ops(f, True)
Guido van Rossum87429772007-04-10 21:06:59 +0000562
Guido van Rossum48fc58a2007-06-07 23:45:37 +0000563 def test_readline(self):
Benjamin Peterson45cec322009-04-24 23:14:50 +0000564 with self.open(support.TESTFN, "wb") as f:
565 f.write(b"abc\ndef\nxyzzy\nfoo\x00bar\nanother line")
566 with self.open(support.TESTFN, "rb") as f:
567 self.assertEqual(f.readline(), b"abc\n")
568 self.assertEqual(f.readline(10), b"def\n")
569 self.assertEqual(f.readline(2), b"xy")
570 self.assertEqual(f.readline(4), b"zzy\n")
571 self.assertEqual(f.readline(), b"foo\x00bar\n")
Benjamin Petersonbf5ff762009-12-13 19:25:34 +0000572 self.assertEqual(f.readline(None), b"another line")
Benjamin Peterson45cec322009-04-24 23:14:50 +0000573 self.assertRaises(TypeError, f.readline, 5.3)
574 with self.open(support.TESTFN, "r") as f:
575 self.assertRaises(TypeError, f.readline, 5.3)
Guido van Rossum48fc58a2007-06-07 23:45:37 +0000576
Serhiy Storchakabf623ae2017-04-19 20:03:52 +0300577 def test_readline_nonsizeable(self):
578 # Issue #30061
579 # Crash when readline() returns an object without __len__
580 class R(self.IOBase):
581 def readline(self):
582 return None
583 self.assertRaises((TypeError, StopIteration), next, R())
584
585 def test_next_nonsizeable(self):
586 # Issue #30061
587 # Crash when __next__() returns an object without __len__
588 class R(self.IOBase):
589 def __next__(self):
590 return None
591 self.assertRaises(TypeError, R().readlines, 1)
592
Guido van Rossum28524c72007-02-27 05:47:44 +0000593 def test_raw_bytes_io(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000594 f = self.BytesIO()
Guido van Rossum28524c72007-02-27 05:47:44 +0000595 self.write_ops(f)
596 data = f.getvalue()
597 self.assertEqual(data, b"hello world\n")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000598 f = self.BytesIO(data)
Guido van Rossum9b76da62007-04-11 01:09:03 +0000599 self.read_ops(f, True)
Guido van Rossum28524c72007-02-27 05:47:44 +0000600
Guido van Rossum53807da2007-04-10 19:01:47 +0000601 def test_large_file_ops(self):
Mike53f7a7c2017-12-14 14:04:53 +0300602 # On Windows and Mac OSX this test consumes large resources; It takes
Victor Stinner8c663fd2017-11-08 14:44:44 -0800603 # a long time to build the >2 GiB file and takes >2 GiB of disk space
Guido van Rossum34d69e52007-04-10 20:08:41 +0000604 # therefore the resource must be enabled to run this test.
Victor Stinner937ee9e2018-06-26 02:11:06 +0200605 if sys.platform[:3] == 'win' or sys.platform == 'darwin':
Zachary Ware9fe6d862013-12-08 00:20:35 -0600606 support.requires(
607 'largefile',
608 'test requires %s bytes and a long time to run' % self.LARGE)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000609 with self.open(support.TESTFN, "w+b", 0) as f:
610 self.large_file_ops(f)
611 with self.open(support.TESTFN, "w+b") as f:
612 self.large_file_ops(f)
Guido van Rossum87429772007-04-10 21:06:59 +0000613
614 def test_with_open(self):
Alexey Izbysheva2670562018-10-20 03:22:31 +0300615 for bufsize in (0, 100):
Guido van Rossum87429772007-04-10 21:06:59 +0000616 f = None
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000617 with self.open(support.TESTFN, "wb", bufsize) as f:
Guido van Rossum1f2ca562007-08-27 20:44:15 +0000618 f.write(b"xxx")
Guido van Rossum87429772007-04-10 21:06:59 +0000619 self.assertEqual(f.closed, True)
620 f = None
621 try:
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000622 with self.open(support.TESTFN, "wb", bufsize) as f:
Guido van Rossum87429772007-04-10 21:06:59 +0000623 1/0
624 except ZeroDivisionError:
625 self.assertEqual(f.closed, True)
626 else:
627 self.fail("1/0 didn't raise an exception")
628
Antoine Pitrou08838b62009-01-21 00:55:13 +0000629 # issue 5008
630 def test_append_mode_tell(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000631 with self.open(support.TESTFN, "wb") as f:
Antoine Pitrou08838b62009-01-21 00:55:13 +0000632 f.write(b"xxx")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000633 with self.open(support.TESTFN, "ab", buffering=0) as f:
Antoine Pitrou08838b62009-01-21 00:55:13 +0000634 self.assertEqual(f.tell(), 3)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000635 with self.open(support.TESTFN, "ab") as f:
Antoine Pitrou08838b62009-01-21 00:55:13 +0000636 self.assertEqual(f.tell(), 3)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000637 with self.open(support.TESTFN, "a") as f:
Serhiy Storchaka0dcd80a2015-08-02 15:17:49 +0300638 self.assertGreater(f.tell(), 0)
Antoine Pitrou08838b62009-01-21 00:55:13 +0000639
Guido van Rossum87429772007-04-10 21:06:59 +0000640 def test_destructor(self):
641 record = []
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000642 class MyFileIO(self.FileIO):
Guido van Rossum87429772007-04-10 21:06:59 +0000643 def __del__(self):
644 record.append(1)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000645 try:
646 f = super().__del__
647 except AttributeError:
648 pass
649 else:
650 f()
Guido van Rossum87429772007-04-10 21:06:59 +0000651 def close(self):
652 record.append(2)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000653 super().close()
Guido van Rossum87429772007-04-10 21:06:59 +0000654 def flush(self):
655 record.append(3)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000656 super().flush()
Brett Cannon5a9e91b2010-10-29 23:53:03 +0000657 with support.check_warnings(('', ResourceWarning)):
658 f = MyFileIO(support.TESTFN, "wb")
659 f.write(b"xxx")
660 del f
661 support.gc_collect()
662 self.assertEqual(record, [1, 2, 3])
663 with self.open(support.TESTFN, "rb") as f:
664 self.assertEqual(f.read(), b"xxx")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000665
666 def _check_base_destructor(self, base):
667 record = []
668 class MyIO(base):
669 def __init__(self):
670 # This exercises the availability of attributes on object
671 # destruction.
672 # (in the C version, close() is called by the tp_dealloc
673 # function, not by __del__)
674 self.on_del = 1
675 self.on_close = 2
676 self.on_flush = 3
677 def __del__(self):
678 record.append(self.on_del)
679 try:
680 f = super().__del__
681 except AttributeError:
682 pass
683 else:
684 f()
685 def close(self):
686 record.append(self.on_close)
687 super().close()
688 def flush(self):
689 record.append(self.on_flush)
690 super().flush()
691 f = MyIO()
Guido van Rossum87429772007-04-10 21:06:59 +0000692 del f
Benjamin Peterson24fb1d02009-04-24 23:26:21 +0000693 support.gc_collect()
Guido van Rossum87429772007-04-10 21:06:59 +0000694 self.assertEqual(record, [1, 2, 3])
695
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000696 def test_IOBase_destructor(self):
697 self._check_base_destructor(self.IOBase)
698
699 def test_RawIOBase_destructor(self):
700 self._check_base_destructor(self.RawIOBase)
701
702 def test_BufferedIOBase_destructor(self):
703 self._check_base_destructor(self.BufferedIOBase)
704
705 def test_TextIOBase_destructor(self):
706 self._check_base_destructor(self.TextIOBase)
707
Guido van Rossum87429772007-04-10 21:06:59 +0000708 def test_close_flushes(self):
Benjamin Peterson45cec322009-04-24 23:14:50 +0000709 with self.open(support.TESTFN, "wb") as f:
710 f.write(b"xxx")
711 with self.open(support.TESTFN, "rb") as f:
712 self.assertEqual(f.read(), b"xxx")
Guido van Rossuma9e20242007-03-08 00:43:48 +0000713
Guido van Rossumd4103952007-04-12 05:44:49 +0000714 def test_array_writes(self):
715 a = array.array('i', range(10))
Antoine Pitrou1ce3eb52010-09-01 20:29:34 +0000716 n = len(a.tobytes())
Martin Panter6bb91f32016-05-28 00:41:57 +0000717 def check(f):
718 with f:
719 self.assertEqual(f.write(a), n)
720 f.writelines((a,))
721 check(self.BytesIO())
722 check(self.FileIO(support.TESTFN, "w"))
723 check(self.BufferedWriter(self.MockRawIO()))
724 check(self.BufferedRandom(self.MockRawIO()))
725 check(self.BufferedRWPair(self.MockRawIO(), self.MockRawIO()))
Guido van Rossumd4103952007-04-12 05:44:49 +0000726
Guido van Rossum2dced8b2007-10-30 17:27:30 +0000727 def test_closefd(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000728 self.assertRaises(ValueError, self.open, support.TESTFN, 'w',
Guido van Rossum2dced8b2007-10-30 17:27:30 +0000729 closefd=False)
Guido van Rossuma9e20242007-03-08 00:43:48 +0000730
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000731 def test_read_closed(self):
732 with self.open(support.TESTFN, "w") as f:
Christian Heimesecc42a22008-11-05 19:30:32 +0000733 f.write("egg\n")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000734 with self.open(support.TESTFN, "r") as f:
735 file = self.open(f.fileno(), "r", closefd=False)
Christian Heimesecc42a22008-11-05 19:30:32 +0000736 self.assertEqual(file.read(), "egg\n")
737 file.seek(0)
738 file.close()
739 self.assertRaises(ValueError, file.read)
Philipp Gesangcb1c0742020-02-04 22:25:16 +0100740 with self.open(support.TESTFN, "rb") as f:
741 file = self.open(f.fileno(), "rb", closefd=False)
742 self.assertEqual(file.read()[:3], b"egg")
743 file.close()
744 self.assertRaises(ValueError, file.readinto, bytearray(1))
Christian Heimesecc42a22008-11-05 19:30:32 +0000745
746 def test_no_closefd_with_filename(self):
747 # can't use closefd in combination with a file name
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000748 self.assertRaises(ValueError, self.open, support.TESTFN, "r", closefd=False)
Christian Heimesecc42a22008-11-05 19:30:32 +0000749
750 def test_closefd_attr(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000751 with self.open(support.TESTFN, "wb") as f:
Christian Heimesecc42a22008-11-05 19:30:32 +0000752 f.write(b"egg\n")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000753 with self.open(support.TESTFN, "r") as f:
Christian Heimesecc42a22008-11-05 19:30:32 +0000754 self.assertEqual(f.buffer.raw.closefd, True)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000755 file = self.open(f.fileno(), "r", closefd=False)
Christian Heimesecc42a22008-11-05 19:30:32 +0000756 self.assertEqual(file.buffer.raw.closefd, False)
757
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000758 def test_garbage_collection(self):
759 # FileIO objects are collected, and collecting them flushes
760 # all data to disk.
Benjamin Petersonebe5d8a2010-10-31 01:30:11 +0000761 with support.check_warnings(('', ResourceWarning)):
762 f = self.FileIO(support.TESTFN, "wb")
763 f.write(b"abcxxx")
764 f.f = f
765 wr = weakref.ref(f)
766 del f
767 support.gc_collect()
Serhiy Storchaka0dcd80a2015-08-02 15:17:49 +0300768 self.assertIsNone(wr(), wr)
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000769 with self.open(support.TESTFN, "rb") as f:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000770 self.assertEqual(f.read(), b"abcxxx")
Christian Heimesecc42a22008-11-05 19:30:32 +0000771
Antoine Pitrou7d037a72009-03-29 18:55:12 +0000772 def test_unbounded_file(self):
773 # Issue #1174606: reading from an unbounded stream such as /dev/zero.
774 zero = "/dev/zero"
775 if not os.path.exists(zero):
Antoine Pitrouc50cb8e2009-04-19 00:10:36 +0000776 self.skipTest("{0} does not exist".format(zero))
Antoine Pitrou7d037a72009-03-29 18:55:12 +0000777 if sys.maxsize > 0x7FFFFFFF:
Antoine Pitrouc50cb8e2009-04-19 00:10:36 +0000778 self.skipTest("test can only run in a 32-bit address space")
Antoine Pitrou7d037a72009-03-29 18:55:12 +0000779 if support.real_max_memuse < support._2G:
Victor Stinner8c663fd2017-11-08 14:44:44 -0800780 self.skipTest("test requires at least 2 GiB of memory")
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000781 with self.open(zero, "rb", buffering=0) as f:
Antoine Pitrou7d037a72009-03-29 18:55:12 +0000782 self.assertRaises(OverflowError, f.read)
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000783 with self.open(zero, "rb") as f:
Antoine Pitrou7d037a72009-03-29 18:55:12 +0000784 self.assertRaises(OverflowError, f.read)
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000785 with self.open(zero, "r") as f:
Antoine Pitrou7d037a72009-03-29 18:55:12 +0000786 self.assertRaises(OverflowError, f.read)
787
Serhiy Storchakaa3712a92015-02-21 00:35:09 +0200788 def check_flush_error_on_close(self, *args, **kwargs):
789 # Test that the file is closed despite failed flush
790 # and that flush() is called before file closed.
791 f = self.open(*args, **kwargs)
792 closed = []
Antoine Pitrou6be88762010-05-03 16:48:20 +0000793 def bad_flush():
Serhiy Storchakaa3712a92015-02-21 00:35:09 +0200794 closed[:] = [f.closed]
Andrew Svetlovf7a17b42012-12-25 16:47:37 +0200795 raise OSError()
Antoine Pitrou6be88762010-05-03 16:48:20 +0000796 f.flush = bad_flush
Andrew Svetlovf7a17b42012-12-25 16:47:37 +0200797 self.assertRaises(OSError, f.close) # exception not swallowed
Benjamin Peterson68623612012-12-20 11:53:11 -0600798 self.assertTrue(f.closed)
Serhiy Storchakaa3712a92015-02-21 00:35:09 +0200799 self.assertTrue(closed) # flush() called
800 self.assertFalse(closed[0]) # flush() called before file closed
Serhiy Storchakac26a1a42015-02-23 00:28:38 +0200801 f.flush = lambda: None # break reference loop
Serhiy Storchakaa3712a92015-02-21 00:35:09 +0200802
803 def test_flush_error_on_close(self):
804 # raw file
805 # Issue #5700: io.FileIO calls flush() after file closed
806 self.check_flush_error_on_close(support.TESTFN, 'wb', buffering=0)
807 fd = os.open(support.TESTFN, os.O_WRONLY|os.O_CREAT)
808 self.check_flush_error_on_close(fd, 'wb', buffering=0)
809 fd = os.open(support.TESTFN, os.O_WRONLY|os.O_CREAT)
810 self.check_flush_error_on_close(fd, 'wb', buffering=0, closefd=False)
811 os.close(fd)
812 # buffered io
813 self.check_flush_error_on_close(support.TESTFN, 'wb')
814 fd = os.open(support.TESTFN, os.O_WRONLY|os.O_CREAT)
815 self.check_flush_error_on_close(fd, 'wb')
816 fd = os.open(support.TESTFN, os.O_WRONLY|os.O_CREAT)
817 self.check_flush_error_on_close(fd, 'wb', closefd=False)
818 os.close(fd)
819 # text io
820 self.check_flush_error_on_close(support.TESTFN, 'w')
821 fd = os.open(support.TESTFN, os.O_WRONLY|os.O_CREAT)
822 self.check_flush_error_on_close(fd, 'w')
823 fd = os.open(support.TESTFN, os.O_WRONLY|os.O_CREAT)
824 self.check_flush_error_on_close(fd, 'w', closefd=False)
825 os.close(fd)
Antoine Pitrou6be88762010-05-03 16:48:20 +0000826
827 def test_multi_close(self):
828 f = self.open(support.TESTFN, "wb", buffering=0)
829 f.close()
830 f.close()
831 f.close()
832 self.assertRaises(ValueError, f.flush)
833
Antoine Pitrou328ec742010-09-14 18:37:24 +0000834 def test_RawIOBase_read(self):
Sanyam Khurana1b74f9b2017-12-11 19:12:09 +0530835 # Exercise the default limited RawIOBase.read(n) implementation (which
836 # calls readinto() internally).
Antoine Pitrou328ec742010-09-14 18:37:24 +0000837 rawio = self.MockRawIOWithoutRead((b"abc", b"d", None, b"efg", None))
838 self.assertEqual(rawio.read(2), b"ab")
839 self.assertEqual(rawio.read(2), b"c")
840 self.assertEqual(rawio.read(2), b"d")
841 self.assertEqual(rawio.read(2), None)
842 self.assertEqual(rawio.read(2), b"ef")
843 self.assertEqual(rawio.read(2), b"g")
844 self.assertEqual(rawio.read(2), None)
845 self.assertEqual(rawio.read(2), b"")
846
Benjamin Petersonf6f3a352011-09-03 09:26:20 -0400847 def test_types_have_dict(self):
848 test = (
849 self.IOBase(),
850 self.RawIOBase(),
851 self.TextIOBase(),
852 self.StringIO(),
853 self.BytesIO()
854 )
855 for obj in test:
856 self.assertTrue(hasattr(obj, "__dict__"))
857
Ross Lagerwall59142db2011-10-31 20:34:46 +0200858 def test_opener(self):
859 with self.open(support.TESTFN, "w") as f:
860 f.write("egg\n")
861 fd = os.open(support.TESTFN, os.O_RDONLY)
862 def opener(path, flags):
863 return fd
864 with self.open("non-existent", "r", opener=opener) as f:
865 self.assertEqual(f.read(), "egg\n")
866
Barry Warsaw480e2852016-06-08 17:47:26 -0400867 def test_bad_opener_negative_1(self):
868 # Issue #27066.
869 def badopener(fname, flags):
870 return -1
871 with self.assertRaises(ValueError) as cm:
872 open('non-existent', 'r', opener=badopener)
873 self.assertEqual(str(cm.exception), 'opener returned -1')
874
875 def test_bad_opener_other_negative(self):
876 # Issue #27066.
877 def badopener(fname, flags):
878 return -2
879 with self.assertRaises(ValueError) as cm:
880 open('non-existent', 'r', opener=badopener)
881 self.assertEqual(str(cm.exception), 'opener returned -2')
882
Hynek Schlawack2cc71562012-05-25 10:05:53 +0200883 def test_fileio_closefd(self):
884 # Issue #4841
885 with self.open(__file__, 'rb') as f1, \
886 self.open(__file__, 'rb') as f2:
887 fileio = self.FileIO(f1.fileno(), closefd=False)
888 # .__init__() must not close f1
889 fileio.__init__(f2.fileno(), closefd=False)
890 f1.readline()
891 # .close() must not close f2
892 fileio.close()
893 f2.readline()
894
Serhiy Storchakaf10063e2014-06-09 13:32:34 +0300895 def test_nonbuffered_textio(self):
Serhiy Storchaka94a619d2016-02-11 13:11:44 +0200896 with support.check_no_resource_warning(self):
Serhiy Storchakaf10063e2014-06-09 13:32:34 +0300897 with self.assertRaises(ValueError):
898 self.open(support.TESTFN, 'w', buffering=0)
Serhiy Storchakaf10063e2014-06-09 13:32:34 +0300899
900 def test_invalid_newline(self):
Serhiy Storchaka94a619d2016-02-11 13:11:44 +0200901 with support.check_no_resource_warning(self):
Serhiy Storchakaf10063e2014-06-09 13:32:34 +0300902 with self.assertRaises(ValueError):
903 self.open(support.TESTFN, 'w', newline='invalid')
Serhiy Storchakaf10063e2014-06-09 13:32:34 +0300904
Martin Panter6bb91f32016-05-28 00:41:57 +0000905 def test_buffered_readinto_mixin(self):
906 # Test the implementation provided by BufferedIOBase
907 class Stream(self.BufferedIOBase):
908 def read(self, size):
909 return b"12345"
910 read1 = read
911 stream = Stream()
912 for method in ("readinto", "readinto1"):
913 with self.subTest(method):
914 buffer = byteslike(5)
915 self.assertEqual(getattr(stream, method)(buffer), 5)
916 self.assertEqual(bytes(buffer), b"12345")
917
Ethan Furmand62548a2016-06-04 14:38:43 -0700918 def test_fspath_support(self):
Ethan Furmand62548a2016-06-04 14:38:43 -0700919 def check_path_succeeds(path):
920 with self.open(path, "w") as f:
921 f.write("egg\n")
922
923 with self.open(path, "r") as f:
924 self.assertEqual(f.read(), "egg\n")
925
Serhiy Storchakab21d1552018-03-02 11:53:51 +0200926 check_path_succeeds(FakePath(support.TESTFN))
927 check_path_succeeds(FakePath(support.TESTFN.encode('utf-8')))
Ethan Furmand62548a2016-06-04 14:38:43 -0700928
Serhiy Storchakab21d1552018-03-02 11:53:51 +0200929 with self.open(support.TESTFN, "w") as f:
930 bad_path = FakePath(f.fileno())
931 with self.assertRaises(TypeError):
932 self.open(bad_path, 'w')
933
934 bad_path = FakePath(None)
Brett Cannonc78ca1e2016-06-24 12:03:43 -0700935 with self.assertRaises(TypeError):
Ethan Furmand62548a2016-06-04 14:38:43 -0700936 self.open(bad_path, 'w')
937
Serhiy Storchakab21d1552018-03-02 11:53:51 +0200938 bad_path = FakePath(FloatingPointError)
939 with self.assertRaises(FloatingPointError):
940 self.open(bad_path, 'w')
941
Ethan Furmand62548a2016-06-04 14:38:43 -0700942 # ensure that refcounting is correct with some error conditions
943 with self.assertRaisesRegex(ValueError, 'read/write/append mode'):
Serhiy Storchakab21d1552018-03-02 11:53:51 +0200944 self.open(FakePath(support.TESTFN), 'rwxa')
Ethan Furmand62548a2016-06-04 14:38:43 -0700945
Sanyam Khurana1b74f9b2017-12-11 19:12:09 +0530946 def test_RawIOBase_readall(self):
947 # Exercise the default unlimited RawIOBase.read() and readall()
948 # implementations.
949 rawio = self.MockRawIOWithoutRead((b"abc", b"d", b"efg"))
950 self.assertEqual(rawio.read(), b"abcdefg")
951 rawio = self.MockRawIOWithoutRead((b"abc", b"d", b"efg"))
952 self.assertEqual(rawio.readall(), b"abcdefg")
953
954 def test_BufferedIOBase_readinto(self):
955 # Exercise the default BufferedIOBase.readinto() and readinto1()
956 # implementations (which call read() or read1() internally).
957 class Reader(self.BufferedIOBase):
958 def __init__(self, avail):
959 self.avail = avail
960 def read(self, size):
961 result = self.avail[:size]
962 self.avail = self.avail[size:]
963 return result
964 def read1(self, size):
965 """Returns no more than 5 bytes at once"""
966 return self.read(min(size, 5))
967 tests = (
968 # (test method, total data available, read buffer size, expected
969 # read size)
970 ("readinto", 10, 5, 5),
971 ("readinto", 10, 6, 6), # More than read1() can return
972 ("readinto", 5, 6, 5), # Buffer larger than total available
973 ("readinto", 6, 7, 6),
974 ("readinto", 10, 0, 0), # Empty buffer
975 ("readinto1", 10, 5, 5), # Result limited to single read1() call
976 ("readinto1", 10, 6, 5), # Buffer larger than read1() can return
977 ("readinto1", 5, 6, 5), # Buffer larger than total available
978 ("readinto1", 6, 7, 5),
979 ("readinto1", 10, 0, 0), # Empty buffer
980 )
981 UNUSED_BYTE = 0x81
982 for test in tests:
983 with self.subTest(test):
984 method, avail, request, result = test
985 reader = Reader(bytes(range(avail)))
986 buffer = bytearray((UNUSED_BYTE,) * request)
987 method = getattr(reader, method)
988 self.assertEqual(method(buffer), result)
989 self.assertEqual(len(buffer), request)
990 self.assertSequenceEqual(buffer[:result], range(result))
991 unused = (UNUSED_BYTE,) * (request - result)
992 self.assertSequenceEqual(buffer[result:], unused)
993 self.assertEqual(len(reader.avail), avail - result)
994
Zackery Spytz28f07362018-07-17 00:31:44 -0600995 def test_close_assert(self):
996 class R(self.IOBase):
997 def __setattr__(self, name, value):
998 pass
999 def flush(self):
1000 raise OSError()
1001 f = R()
1002 # This would cause an assertion failure.
1003 self.assertRaises(OSError, f.close)
1004
Victor Stinner472f7942019-04-12 21:58:24 +02001005 # Silence destructor error
1006 R.flush = lambda self: None
1007
Hynek Schlawack2cc71562012-05-25 10:05:53 +02001008
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001009class CIOTest(IOTest):
Antoine Pitrou84f1b172011-07-12 21:57:15 +02001010
1011 def test_IOBase_finalize(self):
1012 # Issue #12149: segmentation fault on _PyIOBase_finalize when both a
1013 # class which inherits IOBase and an object of this class are caught
1014 # in a reference cycle and close() is already in the method cache.
1015 class MyIO(self.IOBase):
1016 def close(self):
1017 pass
1018
1019 # create an instance to populate the method cache
1020 MyIO()
1021 obj = MyIO()
1022 obj.obj = obj
1023 wr = weakref.ref(obj)
1024 del MyIO
1025 del obj
1026 support.gc_collect()
Serhiy Storchaka0dcd80a2015-08-02 15:17:49 +03001027 self.assertIsNone(wr(), wr)
Guido van Rossuma9e20242007-03-08 00:43:48 +00001028
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001029class PyIOTest(IOTest):
1030 pass
Guido van Rossum68bbcd22007-02-27 17:19:33 +00001031
Guido van Rossuma9e20242007-03-08 00:43:48 +00001032
Gregory P. Smith1bef9072015-04-14 13:24:34 -07001033@support.cpython_only
1034class APIMismatchTest(unittest.TestCase):
Gregory P. Smith054b0652015-04-14 12:58:05 -07001035
Gregory P. Smith054b0652015-04-14 12:58:05 -07001036 def test_RawIOBase_io_in_pyio_match(self):
1037 """Test that pyio RawIOBase class has all c RawIOBase methods"""
Antoine Pitrou45d61562015-05-20 21:50:59 +02001038 mismatch = support.detect_api_mismatch(pyio.RawIOBase, io.RawIOBase,
1039 ignore=('__weakref__',))
Gregory P. Smith054b0652015-04-14 12:58:05 -07001040 self.assertEqual(mismatch, set(), msg='Python RawIOBase does not have all C RawIOBase methods')
1041
1042 def test_RawIOBase_pyio_in_io_match(self):
1043 """Test that c RawIOBase class has all pyio RawIOBase methods"""
1044 mismatch = support.detect_api_mismatch(io.RawIOBase, pyio.RawIOBase)
1045 self.assertEqual(mismatch, set(), msg='C RawIOBase does not have all Python RawIOBase methods')
1046
1047
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001048class CommonBufferedTests:
1049 # Tests common to BufferedReader, BufferedWriter and BufferedRandom
1050
Benjamin Petersond2e0c792009-05-01 20:40:59 +00001051 def test_detach(self):
1052 raw = self.MockRawIO()
1053 buf = self.tp(raw)
1054 self.assertIs(buf.detach(), raw)
1055 self.assertRaises(ValueError, buf.detach)
1056
Benjamin Peterson10e76b62014-12-21 20:51:50 -06001057 repr(buf) # Should still work
1058
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001059 def test_fileno(self):
1060 rawio = self.MockRawIO()
1061 bufio = self.tp(rawio)
1062
Ezio Melottib3aedd42010-11-20 19:04:17 +00001063 self.assertEqual(42, bufio.fileno())
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001064
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001065 def test_invalid_args(self):
1066 rawio = self.MockRawIO()
1067 bufio = self.tp(rawio)
1068 # Invalid whence
1069 self.assertRaises(ValueError, bufio.seek, 0, -1)
Jesus Cea94363612012-06-22 18:32:07 +02001070 self.assertRaises(ValueError, bufio.seek, 0, 9)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001071
1072 def test_override_destructor(self):
1073 tp = self.tp
1074 record = []
1075 class MyBufferedIO(tp):
1076 def __del__(self):
1077 record.append(1)
1078 try:
1079 f = super().__del__
1080 except AttributeError:
1081 pass
1082 else:
1083 f()
1084 def close(self):
1085 record.append(2)
1086 super().close()
1087 def flush(self):
1088 record.append(3)
1089 super().flush()
1090 rawio = self.MockRawIO()
1091 bufio = MyBufferedIO(rawio)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001092 del bufio
Benjamin Peterson24fb1d02009-04-24 23:26:21 +00001093 support.gc_collect()
Martin Panter754aab22016-03-31 07:21:56 +00001094 self.assertEqual(record, [1, 2, 3])
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001095
1096 def test_context_manager(self):
1097 # Test usability as a context manager
1098 rawio = self.MockRawIO()
1099 bufio = self.tp(rawio)
1100 def _with():
1101 with bufio:
1102 pass
1103 _with()
1104 # bufio should now be closed, and using it a second time should raise
1105 # a ValueError.
1106 self.assertRaises(ValueError, _with)
1107
1108 def test_error_through_destructor(self):
1109 # Test that the exception state is not modified by a destructor,
1110 # even if close() fails.
1111 rawio = self.CloseFailureIO()
Victor Stinner6dbbe742019-05-25 00:09:38 +02001112 with support.catch_unraisable_exception() as cm:
1113 with self.assertRaises(AttributeError):
1114 self.tp(rawio).xyzzy
Victor Stinnerbc2aa812019-05-23 03:45:09 +02001115
1116 if not IOBASE_EMITS_UNRAISABLE:
1117 self.assertIsNone(cm.unraisable)
1118 elif cm.unraisable is not None:
1119 self.assertEqual(cm.unraisable.exc_type, OSError)
Guido van Rossum78892e42007-04-06 17:31:18 +00001120
Antoine Pitrou716c4442009-05-23 19:04:03 +00001121 def test_repr(self):
1122 raw = self.MockRawIO()
1123 b = self.tp(raw)
Max Bernsteinccb7ca72019-05-21 10:09:21 -07001124 clsname = r"(%s\.)?%s" % (self.tp.__module__, self.tp.__qualname__)
1125 self.assertRegex(repr(b), "<%s>" % clsname)
Antoine Pitrou716c4442009-05-23 19:04:03 +00001126 raw.name = "dummy"
Max Bernsteinccb7ca72019-05-21 10:09:21 -07001127 self.assertRegex(repr(b), "<%s name='dummy'>" % clsname)
Antoine Pitrou716c4442009-05-23 19:04:03 +00001128 raw.name = b"dummy"
Max Bernsteinccb7ca72019-05-21 10:09:21 -07001129 self.assertRegex(repr(b), "<%s name=b'dummy'>" % clsname)
Antoine Pitrou716c4442009-05-23 19:04:03 +00001130
Serhiy Storchakaa5af6e12017-03-19 19:25:29 +02001131 def test_recursive_repr(self):
1132 # Issue #25455
1133 raw = self.MockRawIO()
1134 b = self.tp(raw)
1135 with support.swap_attr(raw, 'name', b):
1136 try:
1137 repr(b) # Should not crash
1138 except RuntimeError:
1139 pass
1140
Antoine Pitrou6be88762010-05-03 16:48:20 +00001141 def test_flush_error_on_close(self):
Serhiy Storchakaa3712a92015-02-21 00:35:09 +02001142 # Test that buffered file is closed despite failed flush
1143 # and that flush() is called before file closed.
Antoine Pitrou6be88762010-05-03 16:48:20 +00001144 raw = self.MockRawIO()
Serhiy Storchakaa3712a92015-02-21 00:35:09 +02001145 closed = []
Antoine Pitrou6be88762010-05-03 16:48:20 +00001146 def bad_flush():
Serhiy Storchakaa3712a92015-02-21 00:35:09 +02001147 closed[:] = [b.closed, raw.closed]
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02001148 raise OSError()
Antoine Pitrou6be88762010-05-03 16:48:20 +00001149 raw.flush = bad_flush
1150 b = self.tp(raw)
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02001151 self.assertRaises(OSError, b.close) # exception not swallowed
Benjamin Peterson68623612012-12-20 11:53:11 -06001152 self.assertTrue(b.closed)
Serhiy Storchakaa3712a92015-02-21 00:35:09 +02001153 self.assertTrue(raw.closed)
1154 self.assertTrue(closed) # flush() called
1155 self.assertFalse(closed[0]) # flush() called before file closed
1156 self.assertFalse(closed[1])
Serhiy Storchakac26a1a42015-02-23 00:28:38 +02001157 raw.flush = lambda: None # break reference loop
Benjamin Peterson68623612012-12-20 11:53:11 -06001158
1159 def test_close_error_on_close(self):
1160 raw = self.MockRawIO()
1161 def bad_flush():
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02001162 raise OSError('flush')
Benjamin Peterson68623612012-12-20 11:53:11 -06001163 def bad_close():
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02001164 raise OSError('close')
Benjamin Peterson68623612012-12-20 11:53:11 -06001165 raw.close = bad_close
1166 b = self.tp(raw)
1167 b.flush = bad_flush
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02001168 with self.assertRaises(OSError) as err: # exception not swallowed
Benjamin Peterson68623612012-12-20 11:53:11 -06001169 b.close()
1170 self.assertEqual(err.exception.args, ('close',))
Serhiy Storchaka8a8f7f92014-06-09 09:13:04 +03001171 self.assertIsInstance(err.exception.__context__, OSError)
Benjamin Peterson68623612012-12-20 11:53:11 -06001172 self.assertEqual(err.exception.__context__.args, ('flush',))
1173 self.assertFalse(b.closed)
Antoine Pitrou6be88762010-05-03 16:48:20 +00001174
Victor Stinner472f7942019-04-12 21:58:24 +02001175 # Silence destructor error
1176 raw.close = lambda: None
1177 b.flush = lambda: None
1178
Serhiy Storchaka8a8f7f92014-06-09 09:13:04 +03001179 def test_nonnormalized_close_error_on_close(self):
1180 # Issue #21677
1181 raw = self.MockRawIO()
1182 def bad_flush():
1183 raise non_existing_flush
1184 def bad_close():
1185 raise non_existing_close
1186 raw.close = bad_close
1187 b = self.tp(raw)
1188 b.flush = bad_flush
1189 with self.assertRaises(NameError) as err: # exception not swallowed
1190 b.close()
1191 self.assertIn('non_existing_close', str(err.exception))
1192 self.assertIsInstance(err.exception.__context__, NameError)
1193 self.assertIn('non_existing_flush', str(err.exception.__context__))
1194 self.assertFalse(b.closed)
1195
Victor Stinner472f7942019-04-12 21:58:24 +02001196 # Silence destructor error
1197 b.flush = lambda: None
1198 raw.close = lambda: None
1199
Antoine Pitrou6be88762010-05-03 16:48:20 +00001200 def test_multi_close(self):
1201 raw = self.MockRawIO()
1202 b = self.tp(raw)
1203 b.close()
1204 b.close()
1205 b.close()
1206 self.assertRaises(ValueError, b.flush)
1207
Antoine Pitrou0d739d72010-09-05 23:01:12 +00001208 def test_unseekable(self):
1209 bufio = self.tp(self.MockUnseekableIO(b"A" * 10))
1210 self.assertRaises(self.UnsupportedOperation, bufio.tell)
1211 self.assertRaises(self.UnsupportedOperation, bufio.seek, 0)
1212
Antoine Pitrou7f8f4182010-12-21 21:20:59 +00001213 def test_readonly_attributes(self):
1214 raw = self.MockRawIO()
1215 buf = self.tp(raw)
1216 x = self.MockRawIO()
1217 with self.assertRaises(AttributeError):
1218 buf.raw = x
1219
Guido van Rossum78892e42007-04-06 17:31:18 +00001220
Antoine Pitrou10f0c502012-07-29 19:02:46 +02001221class SizeofTest:
1222
1223 @support.cpython_only
1224 def test_sizeof(self):
1225 bufsize1 = 4096
1226 bufsize2 = 8192
1227 rawio = self.MockRawIO()
1228 bufio = self.tp(rawio, buffer_size=bufsize1)
1229 size = sys.getsizeof(bufio) - bufsize1
1230 rawio = self.MockRawIO()
1231 bufio = self.tp(rawio, buffer_size=bufsize2)
1232 self.assertEqual(sys.getsizeof(bufio), size + bufsize2)
1233
Jesus Ceadc469452012-10-04 12:37:56 +02001234 @support.cpython_only
1235 def test_buffer_freeing(self) :
1236 bufsize = 4096
1237 rawio = self.MockRawIO()
1238 bufio = self.tp(rawio, buffer_size=bufsize)
1239 size = sys.getsizeof(bufio) - bufsize
1240 bufio.close()
1241 self.assertEqual(sys.getsizeof(bufio), size)
Antoine Pitrou10f0c502012-07-29 19:02:46 +02001242
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001243class BufferedReaderTest(unittest.TestCase, CommonBufferedTests):
1244 read_mode = "rb"
Guido van Rossum78892e42007-04-06 17:31:18 +00001245
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001246 def test_constructor(self):
1247 rawio = self.MockRawIO([b"abc"])
1248 bufio = self.tp(rawio)
1249 bufio.__init__(rawio)
1250 bufio.__init__(rawio, buffer_size=1024)
1251 bufio.__init__(rawio, buffer_size=16)
Ezio Melottib3aedd42010-11-20 19:04:17 +00001252 self.assertEqual(b"abc", bufio.read())
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001253 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0)
1254 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16)
1255 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1)
1256 rawio = self.MockRawIO([b"abc"])
1257 bufio.__init__(rawio)
Ezio Melottib3aedd42010-11-20 19:04:17 +00001258 self.assertEqual(b"abc", bufio.read())
Guido van Rossum78892e42007-04-06 17:31:18 +00001259
Serhiy Storchaka61e24932014-02-12 10:52:35 +02001260 def test_uninitialized(self):
1261 bufio = self.tp.__new__(self.tp)
1262 del bufio
1263 bufio = self.tp.__new__(self.tp)
1264 self.assertRaisesRegex((ValueError, AttributeError),
1265 'uninitialized|has no attribute',
1266 bufio.read, 0)
1267 bufio.__init__(self.MockRawIO())
1268 self.assertEqual(bufio.read(0), b'')
1269
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001270 def test_read(self):
Benjamin Petersonbf5ff762009-12-13 19:25:34 +00001271 for arg in (None, 7):
1272 rawio = self.MockRawIO((b"abc", b"d", b"efg"))
1273 bufio = self.tp(rawio)
Ezio Melottib3aedd42010-11-20 19:04:17 +00001274 self.assertEqual(b"abcdefg", bufio.read(arg))
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001275 # Invalid args
1276 self.assertRaises(ValueError, bufio.read, -2)
Guido van Rossum68bbcd22007-02-27 17:19:33 +00001277
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001278 def test_read1(self):
1279 rawio = self.MockRawIO((b"abc", b"d", b"efg"))
1280 bufio = self.tp(rawio)
Ezio Melottib3aedd42010-11-20 19:04:17 +00001281 self.assertEqual(b"a", bufio.read(1))
1282 self.assertEqual(b"b", bufio.read1(1))
1283 self.assertEqual(rawio._reads, 1)
Martin Panterccb2c0e2016-10-20 23:48:14 +00001284 self.assertEqual(b"", bufio.read1(0))
Ezio Melottib3aedd42010-11-20 19:04:17 +00001285 self.assertEqual(b"c", bufio.read1(100))
1286 self.assertEqual(rawio._reads, 1)
1287 self.assertEqual(b"d", bufio.read1(100))
1288 self.assertEqual(rawio._reads, 2)
1289 self.assertEqual(b"efg", bufio.read1(100))
1290 self.assertEqual(rawio._reads, 3)
1291 self.assertEqual(b"", bufio.read1(100))
1292 self.assertEqual(rawio._reads, 4)
Martin Panterccb2c0e2016-10-20 23:48:14 +00001293
1294 def test_read1_arbitrary(self):
1295 rawio = self.MockRawIO((b"abc", b"d", b"efg"))
1296 bufio = self.tp(rawio)
1297 self.assertEqual(b"a", bufio.read(1))
1298 self.assertEqual(b"bc", bufio.read1())
1299 self.assertEqual(b"d", bufio.read1())
1300 self.assertEqual(b"efg", bufio.read1(-1))
1301 self.assertEqual(rawio._reads, 3)
1302 self.assertEqual(b"", bufio.read1())
1303 self.assertEqual(rawio._reads, 4)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001304
1305 def test_readinto(self):
1306 rawio = self.MockRawIO((b"abc", b"d", b"efg"))
1307 bufio = self.tp(rawio)
1308 b = bytearray(2)
Ezio Melottib3aedd42010-11-20 19:04:17 +00001309 self.assertEqual(bufio.readinto(b), 2)
1310 self.assertEqual(b, b"ab")
1311 self.assertEqual(bufio.readinto(b), 2)
1312 self.assertEqual(b, b"cd")
1313 self.assertEqual(bufio.readinto(b), 2)
1314 self.assertEqual(b, b"ef")
1315 self.assertEqual(bufio.readinto(b), 1)
1316 self.assertEqual(b, b"gf")
1317 self.assertEqual(bufio.readinto(b), 0)
1318 self.assertEqual(b, b"gf")
Antoine Pitrou3486a982011-05-12 01:57:53 +02001319 rawio = self.MockRawIO((b"abc", None))
1320 bufio = self.tp(rawio)
1321 self.assertEqual(bufio.readinto(b), 2)
1322 self.assertEqual(b, b"ab")
1323 self.assertEqual(bufio.readinto(b), 1)
1324 self.assertEqual(b, b"cb")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001325
Benjamin Petersona96fea02014-06-22 14:17:44 -07001326 def test_readinto1(self):
1327 buffer_size = 10
1328 rawio = self.MockRawIO((b"abc", b"de", b"fgh", b"jkl"))
1329 bufio = self.tp(rawio, buffer_size=buffer_size)
1330 b = bytearray(2)
1331 self.assertEqual(bufio.peek(3), b'abc')
1332 self.assertEqual(rawio._reads, 1)
1333 self.assertEqual(bufio.readinto1(b), 2)
1334 self.assertEqual(b, b"ab")
1335 self.assertEqual(rawio._reads, 1)
1336 self.assertEqual(bufio.readinto1(b), 1)
1337 self.assertEqual(b[:1], b"c")
1338 self.assertEqual(rawio._reads, 1)
1339 self.assertEqual(bufio.readinto1(b), 2)
1340 self.assertEqual(b, b"de")
1341 self.assertEqual(rawio._reads, 2)
1342 b = bytearray(2*buffer_size)
1343 self.assertEqual(bufio.peek(3), b'fgh')
1344 self.assertEqual(rawio._reads, 3)
1345 self.assertEqual(bufio.readinto1(b), 6)
1346 self.assertEqual(b[:6], b"fghjkl")
1347 self.assertEqual(rawio._reads, 4)
1348
1349 def test_readinto_array(self):
1350 buffer_size = 60
1351 data = b"a" * 26
1352 rawio = self.MockRawIO((data,))
1353 bufio = self.tp(rawio, buffer_size=buffer_size)
1354
1355 # Create an array with element size > 1 byte
1356 b = array.array('i', b'x' * 32)
1357 assert len(b) != 16
1358
1359 # Read into it. We should get as many *bytes* as we can fit into b
1360 # (which is more than the number of elements)
1361 n = bufio.readinto(b)
1362 self.assertGreater(n, len(b))
1363
1364 # Check that old contents of b are preserved
1365 bm = memoryview(b).cast('B')
1366 self.assertLess(n, len(bm))
1367 self.assertEqual(bm[:n], data[:n])
1368 self.assertEqual(bm[n:], b'x' * (len(bm[n:])))
1369
1370 def test_readinto1_array(self):
1371 buffer_size = 60
1372 data = b"a" * 26
1373 rawio = self.MockRawIO((data,))
1374 bufio = self.tp(rawio, buffer_size=buffer_size)
1375
1376 # Create an array with element size > 1 byte
1377 b = array.array('i', b'x' * 32)
1378 assert len(b) != 16
1379
1380 # Read into it. We should get as many *bytes* as we can fit into b
1381 # (which is more than the number of elements)
1382 n = bufio.readinto1(b)
1383 self.assertGreater(n, len(b))
1384
1385 # Check that old contents of b are preserved
1386 bm = memoryview(b).cast('B')
1387 self.assertLess(n, len(bm))
1388 self.assertEqual(bm[:n], data[:n])
1389 self.assertEqual(bm[n:], b'x' * (len(bm[n:])))
1390
Benjamin Petersonbf5ff762009-12-13 19:25:34 +00001391 def test_readlines(self):
1392 def bufio():
1393 rawio = self.MockRawIO((b"abc\n", b"d\n", b"ef"))
1394 return self.tp(rawio)
Ezio Melottib3aedd42010-11-20 19:04:17 +00001395 self.assertEqual(bufio().readlines(), [b"abc\n", b"d\n", b"ef"])
1396 self.assertEqual(bufio().readlines(5), [b"abc\n", b"d\n"])
1397 self.assertEqual(bufio().readlines(None), [b"abc\n", b"d\n", b"ef"])
Benjamin Petersonbf5ff762009-12-13 19:25:34 +00001398
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001399 def test_buffering(self):
Guido van Rossum78892e42007-04-06 17:31:18 +00001400 data = b"abcdefghi"
1401 dlen = len(data)
1402
1403 tests = [
1404 [ 100, [ 3, 1, 4, 8 ], [ dlen, 0 ] ],
1405 [ 100, [ 3, 3, 3], [ dlen ] ],
1406 [ 4, [ 1, 2, 4, 2 ], [ 4, 4, 1 ] ],
1407 ]
1408
1409 for bufsize, buf_read_sizes, raw_read_sizes in tests:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001410 rawio = self.MockFileIO(data)
1411 bufio = self.tp(rawio, buffer_size=bufsize)
Guido van Rossum78892e42007-04-06 17:31:18 +00001412 pos = 0
1413 for nbytes in buf_read_sizes:
Ezio Melottib3aedd42010-11-20 19:04:17 +00001414 self.assertEqual(bufio.read(nbytes), data[pos:pos+nbytes])
Guido van Rossum78892e42007-04-06 17:31:18 +00001415 pos += nbytes
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001416 # this is mildly implementation-dependent
Ezio Melottib3aedd42010-11-20 19:04:17 +00001417 self.assertEqual(rawio.read_history, raw_read_sizes)
Guido van Rossum78892e42007-04-06 17:31:18 +00001418
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001419 def test_read_non_blocking(self):
Guido van Rossum01a27522007-03-07 01:00:12 +00001420 # Inject some None's in there to simulate EWOULDBLOCK
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001421 rawio = self.MockRawIO((b"abc", b"d", None, b"efg", None, None, None))
1422 bufio = self.tp(rawio)
Ezio Melottib3aedd42010-11-20 19:04:17 +00001423 self.assertEqual(b"abcd", bufio.read(6))
1424 self.assertEqual(b"e", bufio.read(1))
1425 self.assertEqual(b"fg", bufio.read())
1426 self.assertEqual(b"", bufio.peek(1))
Victor Stinnera80987f2011-05-25 22:47:16 +02001427 self.assertIsNone(bufio.read())
Ezio Melottib3aedd42010-11-20 19:04:17 +00001428 self.assertEqual(b"", bufio.read())
Guido van Rossum01a27522007-03-07 01:00:12 +00001429
Victor Stinnera80987f2011-05-25 22:47:16 +02001430 rawio = self.MockRawIO((b"a", None, None))
1431 self.assertEqual(b"a", rawio.readall())
1432 self.assertIsNone(rawio.readall())
1433
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001434 def test_read_past_eof(self):
1435 rawio = self.MockRawIO((b"abc", b"d", b"efg"))
1436 bufio = self.tp(rawio)
Guido van Rossum68bbcd22007-02-27 17:19:33 +00001437
Ezio Melottib3aedd42010-11-20 19:04:17 +00001438 self.assertEqual(b"abcdefg", bufio.read(9000))
Guido van Rossum68bbcd22007-02-27 17:19:33 +00001439
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001440 def test_read_all(self):
1441 rawio = self.MockRawIO((b"abc", b"d", b"efg"))
1442 bufio = self.tp(rawio)
Guido van Rossum68bbcd22007-02-27 17:19:33 +00001443
Ezio Melottib3aedd42010-11-20 19:04:17 +00001444 self.assertEqual(b"abcdefg", bufio.read())
Guido van Rossum68bbcd22007-02-27 17:19:33 +00001445
Antoine Pitrou5bc4fa72010-10-14 15:34:31 +00001446 @support.requires_resource('cpu')
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001447 def test_threads(self):
Antoine Pitrou87695762008-08-14 22:44:29 +00001448 try:
1449 # Write out many bytes with exactly the same number of 0's,
1450 # 1's... 255's. This will help us check that concurrent reading
1451 # doesn't duplicate or forget contents.
1452 N = 1000
1453 l = list(range(256)) * N
1454 random.shuffle(l)
1455 s = bytes(bytearray(l))
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +00001456 with self.open(support.TESTFN, "wb") as f:
Antoine Pitrou87695762008-08-14 22:44:29 +00001457 f.write(s)
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +00001458 with self.open(support.TESTFN, self.read_mode, buffering=0) as raw:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001459 bufio = self.tp(raw, 8)
Antoine Pitrou87695762008-08-14 22:44:29 +00001460 errors = []
1461 results = []
1462 def f():
1463 try:
1464 # Intra-buffer read then buffer-flushing read
1465 for n in cycle([1, 19]):
1466 s = bufio.read(n)
1467 if not s:
1468 break
1469 # list.append() is atomic
1470 results.append(s)
1471 except Exception as e:
1472 errors.append(e)
1473 raise
1474 threads = [threading.Thread(target=f) for x in range(20)]
Serhiy Storchaka263dcd22015-04-01 13:01:14 +03001475 with support.start_threads(threads):
1476 time.sleep(0.02) # yield
Antoine Pitrou87695762008-08-14 22:44:29 +00001477 self.assertFalse(errors,
1478 "the following exceptions were caught: %r" % errors)
1479 s = b''.join(results)
1480 for i in range(256):
1481 c = bytes(bytearray([i]))
1482 self.assertEqual(s.count(c), N)
1483 finally:
1484 support.unlink(support.TESTFN)
1485
Antoine Pitrou1e44fec2011-10-04 12:26:20 +02001486 def test_unseekable(self):
1487 bufio = self.tp(self.MockUnseekableIO(b"A" * 10))
1488 self.assertRaises(self.UnsupportedOperation, bufio.tell)
1489 self.assertRaises(self.UnsupportedOperation, bufio.seek, 0)
1490 bufio.read(1)
1491 self.assertRaises(self.UnsupportedOperation, bufio.seek, 0)
1492 self.assertRaises(self.UnsupportedOperation, bufio.tell)
1493
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001494 def test_misbehaved_io(self):
1495 rawio = self.MisbehavedRawIO((b"abc", b"d", b"efg"))
1496 bufio = self.tp(rawio)
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02001497 self.assertRaises(OSError, bufio.seek, 0)
1498 self.assertRaises(OSError, bufio.tell)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001499
Victor Stinnerb589cef2019-06-11 03:10:59 +02001500 # Silence destructor error
1501 bufio.close = lambda: None
1502
Antoine Pitrou32cfede2010-08-11 13:31:33 +00001503 def test_no_extraneous_read(self):
1504 # Issue #9550; when the raw IO object has satisfied the read request,
1505 # we should not issue any additional reads, otherwise it may block
1506 # (e.g. socket).
1507 bufsize = 16
1508 for n in (2, bufsize - 1, bufsize, bufsize + 1, bufsize * 2):
1509 rawio = self.MockRawIO([b"x" * n])
1510 bufio = self.tp(rawio, bufsize)
1511 self.assertEqual(bufio.read(n), b"x" * n)
1512 # Simple case: one raw read is enough to satisfy the request.
1513 self.assertEqual(rawio._extraneous_reads, 0,
1514 "failed for {}: {} != 0".format(n, rawio._extraneous_reads))
1515 # A more complex case where two raw reads are needed to satisfy
1516 # the request.
1517 rawio = self.MockRawIO([b"x" * (n - 1), b"x"])
1518 bufio = self.tp(rawio, bufsize)
1519 self.assertEqual(bufio.read(n), b"x" * n)
1520 self.assertEqual(rawio._extraneous_reads, 0,
1521 "failed for {}: {} != 0".format(n, rawio._extraneous_reads))
1522
Berker Peksagd10d6ae2015-05-12 17:01:05 +03001523 def test_read_on_closed(self):
1524 # Issue #23796
1525 b = io.BufferedReader(io.BytesIO(b"12"))
1526 b.read(1)
1527 b.close()
1528 self.assertRaises(ValueError, b.peek)
1529 self.assertRaises(ValueError, b.read1, 1)
1530
Antoine Pitrou32cfede2010-08-11 13:31:33 +00001531
Antoine Pitrou10f0c502012-07-29 19:02:46 +02001532class CBufferedReaderTest(BufferedReaderTest, SizeofTest):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001533 tp = io.BufferedReader
1534
Gregory P. Smithe5796c42018-12-30 20:17:57 -08001535 @unittest.skipIf(MEMORY_SANITIZER, "MSan defaults to crashing "
1536 "instead of returning NULL for malloc failure.")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001537 def test_constructor(self):
1538 BufferedReaderTest.test_constructor(self)
1539 # The allocation can succeed on 32-bit builds, e.g. with more
Victor Stinner8c663fd2017-11-08 14:44:44 -08001540 # than 2 GiB RAM and a 64-bit kernel.
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001541 if sys.maxsize > 0x7FFFFFFF:
1542 rawio = self.MockRawIO()
1543 bufio = self.tp(rawio)
1544 self.assertRaises((OverflowError, MemoryError, ValueError),
1545 bufio.__init__, rawio, sys.maxsize)
1546
1547 def test_initialization(self):
1548 rawio = self.MockRawIO([b"abc"])
1549 bufio = self.tp(rawio)
1550 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0)
1551 self.assertRaises(ValueError, bufio.read)
1552 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16)
1553 self.assertRaises(ValueError, bufio.read)
1554 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1)
1555 self.assertRaises(ValueError, bufio.read)
1556
1557 def test_misbehaved_io_read(self):
1558 rawio = self.MisbehavedRawIO((b"abc", b"d", b"efg"))
1559 bufio = self.tp(rawio)
1560 # _pyio.BufferedReader seems to implement reading different, so that
1561 # checking this is not so easy.
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02001562 self.assertRaises(OSError, bufio.read, 10)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001563
1564 def test_garbage_collection(self):
1565 # C BufferedReader objects are collected.
1566 # The Python version has __del__, so it ends into gc.garbage instead
Serhiy Storchakae36837c2018-06-04 05:53:56 +03001567 self.addCleanup(support.unlink, support.TESTFN)
Antoine Pitrou796564c2013-07-30 19:59:21 +02001568 with support.check_warnings(('', ResourceWarning)):
1569 rawio = self.FileIO(support.TESTFN, "w+b")
1570 f = self.tp(rawio)
1571 f.f = f
1572 wr = weakref.ref(f)
1573 del f
1574 support.gc_collect()
Serhiy Storchaka0dcd80a2015-08-02 15:17:49 +03001575 self.assertIsNone(wr(), wr)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001576
R David Murray67bfe802013-02-23 21:51:05 -05001577 def test_args_error(self):
1578 # Issue #17275
1579 with self.assertRaisesRegex(TypeError, "BufferedReader"):
1580 self.tp(io.BytesIO(), 1024, 1024, 1024)
1581
1582
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001583class PyBufferedReaderTest(BufferedReaderTest):
1584 tp = pyio.BufferedReader
Antoine Pitrou87695762008-08-14 22:44:29 +00001585
Guido van Rossuma9e20242007-03-08 00:43:48 +00001586
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001587class BufferedWriterTest(unittest.TestCase, CommonBufferedTests):
1588 write_mode = "wb"
Guido van Rossuma9e20242007-03-08 00:43:48 +00001589
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001590 def test_constructor(self):
1591 rawio = self.MockRawIO()
1592 bufio = self.tp(rawio)
1593 bufio.__init__(rawio)
1594 bufio.__init__(rawio, buffer_size=1024)
1595 bufio.__init__(rawio, buffer_size=16)
Ezio Melottib3aedd42010-11-20 19:04:17 +00001596 self.assertEqual(3, bufio.write(b"abc"))
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001597 bufio.flush()
1598 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0)
1599 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16)
1600 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1)
1601 bufio.__init__(rawio)
Ezio Melottib3aedd42010-11-20 19:04:17 +00001602 self.assertEqual(3, bufio.write(b"ghi"))
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001603 bufio.flush()
Ezio Melottib3aedd42010-11-20 19:04:17 +00001604 self.assertEqual(b"".join(rawio._write_stack), b"abcghi")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001605
Serhiy Storchaka61e24932014-02-12 10:52:35 +02001606 def test_uninitialized(self):
1607 bufio = self.tp.__new__(self.tp)
1608 del bufio
1609 bufio = self.tp.__new__(self.tp)
1610 self.assertRaisesRegex((ValueError, AttributeError),
1611 'uninitialized|has no attribute',
1612 bufio.write, b'')
1613 bufio.__init__(self.MockRawIO())
1614 self.assertEqual(bufio.write(b''), 0)
1615
Benjamin Petersond2e0c792009-05-01 20:40:59 +00001616 def test_detach_flush(self):
1617 raw = self.MockRawIO()
1618 buf = self.tp(raw)
1619 buf.write(b"howdy!")
1620 self.assertFalse(raw._write_stack)
1621 buf.detach()
1622 self.assertEqual(raw._write_stack, [b"howdy!"])
1623
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001624 def test_write(self):
Guido van Rossum68bbcd22007-02-27 17:19:33 +00001625 # Write to the buffered IO but don't overflow the buffer.
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001626 writer = self.MockRawIO()
1627 bufio = self.tp(writer, 8)
Guido van Rossum76c5d4d2007-04-06 19:10:29 +00001628 bufio.write(b"abc")
Guido van Rossum76c5d4d2007-04-06 19:10:29 +00001629 self.assertFalse(writer._write_stack)
Martin Panter6bb91f32016-05-28 00:41:57 +00001630 buffer = bytearray(b"def")
1631 bufio.write(buffer)
1632 buffer[:] = b"***" # Overwrite our copy of the data
1633 bufio.flush()
1634 self.assertEqual(b"".join(writer._write_stack), b"abcdef")
Guido van Rossum68bbcd22007-02-27 17:19:33 +00001635
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001636 def test_write_overflow(self):
1637 writer = self.MockRawIO()
1638 bufio = self.tp(writer, 8)
1639 contents = b"abcdefghijklmnop"
1640 for n in range(0, len(contents), 3):
1641 bufio.write(contents[n:n+3])
1642 flushed = b"".join(writer._write_stack)
1643 # At least (total - 8) bytes were implicitly flushed, perhaps more
1644 # depending on the implementation.
Benjamin Petersonc9c0f202009-06-30 23:06:06 +00001645 self.assertTrue(flushed.startswith(contents[:-8]), flushed)
Guido van Rossum68bbcd22007-02-27 17:19:33 +00001646
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001647 def check_writes(self, intermediate_func):
1648 # Lots of writes, test the flushed output is as expected.
1649 contents = bytes(range(256)) * 1000
1650 n = 0
1651 writer = self.MockRawIO()
1652 bufio = self.tp(writer, 13)
1653 # Generator of write sizes: repeat each N 15 times then proceed to N+1
1654 def gen_sizes():
1655 for size in count(1):
1656 for i in range(15):
1657 yield size
1658 sizes = gen_sizes()
1659 while n < len(contents):
1660 size = min(next(sizes), len(contents) - n)
Ezio Melottib3aedd42010-11-20 19:04:17 +00001661 self.assertEqual(bufio.write(contents[n:n+size]), size)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001662 intermediate_func(bufio)
1663 n += size
1664 bufio.flush()
Ezio Melottib3aedd42010-11-20 19:04:17 +00001665 self.assertEqual(contents, b"".join(writer._write_stack))
Guido van Rossum68bbcd22007-02-27 17:19:33 +00001666
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001667 def test_writes(self):
1668 self.check_writes(lambda bufio: None)
Guido van Rossum68bbcd22007-02-27 17:19:33 +00001669
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001670 def test_writes_and_flushes(self):
1671 self.check_writes(lambda bufio: bufio.flush())
Guido van Rossum01a27522007-03-07 01:00:12 +00001672
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001673 def test_writes_and_seeks(self):
1674 def _seekabs(bufio):
1675 pos = bufio.tell()
1676 bufio.seek(pos + 1, 0)
1677 bufio.seek(pos - 1, 0)
1678 bufio.seek(pos, 0)
1679 self.check_writes(_seekabs)
1680 def _seekrel(bufio):
1681 pos = bufio.seek(0, 1)
1682 bufio.seek(+1, 1)
1683 bufio.seek(-1, 1)
1684 bufio.seek(pos, 0)
1685 self.check_writes(_seekrel)
Guido van Rossum01a27522007-03-07 01:00:12 +00001686
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001687 def test_writes_and_truncates(self):
1688 self.check_writes(lambda bufio: bufio.truncate(bufio.tell()))
Guido van Rossum01a27522007-03-07 01:00:12 +00001689
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001690 def test_write_non_blocking(self):
1691 raw = self.MockNonBlockWriterIO()
Benjamin Peterson59406a92009-03-26 17:10:29 +00001692 bufio = self.tp(raw, 8)
Guido van Rossum01a27522007-03-07 01:00:12 +00001693
Ezio Melottib3aedd42010-11-20 19:04:17 +00001694 self.assertEqual(bufio.write(b"abcd"), 4)
1695 self.assertEqual(bufio.write(b"efghi"), 5)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001696 # 1 byte will be written, the rest will be buffered
1697 raw.block_on(b"k")
Ezio Melottib3aedd42010-11-20 19:04:17 +00001698 self.assertEqual(bufio.write(b"jklmn"), 5)
Guido van Rossum01a27522007-03-07 01:00:12 +00001699
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001700 # 8 bytes will be written, 8 will be buffered and the rest will be lost
1701 raw.block_on(b"0")
1702 try:
1703 bufio.write(b"opqrwxyz0123456789")
1704 except self.BlockingIOError as e:
1705 written = e.characters_written
1706 else:
1707 self.fail("BlockingIOError should have been raised")
Ezio Melottib3aedd42010-11-20 19:04:17 +00001708 self.assertEqual(written, 16)
1709 self.assertEqual(raw.pop_written(),
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001710 b"abcdefghijklmnopqrwxyz")
Guido van Rossum01a27522007-03-07 01:00:12 +00001711
Ezio Melottib3aedd42010-11-20 19:04:17 +00001712 self.assertEqual(bufio.write(b"ABCDEFGHI"), 9)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001713 s = raw.pop_written()
1714 # Previously buffered bytes were flushed
1715 self.assertTrue(s.startswith(b"01234567A"), s)
Guido van Rossum01a27522007-03-07 01:00:12 +00001716
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001717 def test_write_and_rewind(self):
1718 raw = io.BytesIO()
1719 bufio = self.tp(raw, 4)
1720 self.assertEqual(bufio.write(b"abcdef"), 6)
1721 self.assertEqual(bufio.tell(), 6)
1722 bufio.seek(0, 0)
1723 self.assertEqual(bufio.write(b"XY"), 2)
1724 bufio.seek(6, 0)
1725 self.assertEqual(raw.getvalue(), b"XYcdef")
1726 self.assertEqual(bufio.write(b"123456"), 6)
1727 bufio.flush()
1728 self.assertEqual(raw.getvalue(), b"XYcdef123456")
Guido van Rossum68bbcd22007-02-27 17:19:33 +00001729
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001730 def test_flush(self):
1731 writer = self.MockRawIO()
1732 bufio = self.tp(writer, 8)
Guido van Rossum76c5d4d2007-04-06 19:10:29 +00001733 bufio.write(b"abc")
1734 bufio.flush()
Ezio Melottib3aedd42010-11-20 19:04:17 +00001735 self.assertEqual(b"abc", writer._write_stack[0])
Guido van Rossum68bbcd22007-02-27 17:19:33 +00001736
Antoine Pitrou131a4892012-10-16 22:57:11 +02001737 def test_writelines(self):
1738 l = [b'ab', b'cd', b'ef']
1739 writer = self.MockRawIO()
1740 bufio = self.tp(writer, 8)
1741 bufio.writelines(l)
1742 bufio.flush()
1743 self.assertEqual(b''.join(writer._write_stack), b'abcdef')
1744
1745 def test_writelines_userlist(self):
1746 l = UserList([b'ab', b'cd', b'ef'])
1747 writer = self.MockRawIO()
1748 bufio = self.tp(writer, 8)
1749 bufio.writelines(l)
1750 bufio.flush()
1751 self.assertEqual(b''.join(writer._write_stack), b'abcdef')
1752
1753 def test_writelines_error(self):
1754 writer = self.MockRawIO()
1755 bufio = self.tp(writer, 8)
1756 self.assertRaises(TypeError, bufio.writelines, [1, 2, 3])
1757 self.assertRaises(TypeError, bufio.writelines, None)
1758 self.assertRaises(TypeError, bufio.writelines, 'abc')
1759
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001760 def test_destructor(self):
1761 writer = self.MockRawIO()
1762 bufio = self.tp(writer, 8)
1763 bufio.write(b"abc")
1764 del bufio
Benjamin Peterson24fb1d02009-04-24 23:26:21 +00001765 support.gc_collect()
Ezio Melottib3aedd42010-11-20 19:04:17 +00001766 self.assertEqual(b"abc", writer._write_stack[0])
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001767
1768 def test_truncate(self):
1769 # Truncate implicitly flushes the buffer.
Serhiy Storchakae36837c2018-06-04 05:53:56 +03001770 self.addCleanup(support.unlink, support.TESTFN)
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +00001771 with self.open(support.TESTFN, self.write_mode, buffering=0) as raw:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001772 bufio = self.tp(raw, 8)
1773 bufio.write(b"abcdef")
1774 self.assertEqual(bufio.truncate(3), 3)
Antoine Pitrou905a2ff2010-01-31 22:47:27 +00001775 self.assertEqual(bufio.tell(), 6)
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +00001776 with self.open(support.TESTFN, "rb", buffering=0) as f:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001777 self.assertEqual(f.read(), b"abc")
1778
Nitish Chandra059f58c2018-01-28 21:30:09 +05301779 def test_truncate_after_write(self):
1780 # Ensure that truncate preserves the file position after
1781 # writes longer than the buffer size.
1782 # Issue: https://bugs.python.org/issue32228
Serhiy Storchakae36837c2018-06-04 05:53:56 +03001783 self.addCleanup(support.unlink, support.TESTFN)
Nitish Chandra059f58c2018-01-28 21:30:09 +05301784 with self.open(support.TESTFN, "wb") as f:
1785 # Fill with some buffer
1786 f.write(b'\x00' * 10000)
1787 buffer_sizes = [8192, 4096, 200]
1788 for buffer_size in buffer_sizes:
1789 with self.open(support.TESTFN, "r+b", buffering=buffer_size) as f:
1790 f.write(b'\x00' * (buffer_size + 1))
1791 # After write write_pos and write_end are set to 0
1792 f.read(1)
1793 # read operation makes sure that pos != raw_pos
1794 f.truncate()
1795 self.assertEqual(f.tell(), buffer_size + 2)
1796
Antoine Pitrou5bc4fa72010-10-14 15:34:31 +00001797 @support.requires_resource('cpu')
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001798 def test_threads(self):
Antoine Pitrou87695762008-08-14 22:44:29 +00001799 try:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001800 # Write out many bytes from many threads and test they were
1801 # all flushed.
1802 N = 1000
1803 contents = bytes(range(256)) * N
1804 sizes = cycle([1, 19])
1805 n = 0
1806 queue = deque()
1807 while n < len(contents):
1808 size = next(sizes)
1809 queue.append(contents[n:n+size])
1810 n += size
1811 del contents
Antoine Pitrou87695762008-08-14 22:44:29 +00001812 # We use a real file object because it allows us to
1813 # exercise situations where the GIL is released before
1814 # writing the buffer to the raw streams. This is in addition
1815 # to concurrency issues due to switching threads in the middle
1816 # of Python code.
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +00001817 with self.open(support.TESTFN, self.write_mode, buffering=0) as raw:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001818 bufio = self.tp(raw, 8)
Antoine Pitrou87695762008-08-14 22:44:29 +00001819 errors = []
1820 def f():
1821 try:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001822 while True:
1823 try:
1824 s = queue.popleft()
1825 except IndexError:
1826 return
Antoine Pitrou87695762008-08-14 22:44:29 +00001827 bufio.write(s)
1828 except Exception as e:
1829 errors.append(e)
1830 raise
1831 threads = [threading.Thread(target=f) for x in range(20)]
Serhiy Storchaka263dcd22015-04-01 13:01:14 +03001832 with support.start_threads(threads):
1833 time.sleep(0.02) # yield
Antoine Pitrou87695762008-08-14 22:44:29 +00001834 self.assertFalse(errors,
1835 "the following exceptions were caught: %r" % errors)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001836 bufio.close()
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +00001837 with self.open(support.TESTFN, "rb") as f:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001838 s = f.read()
1839 for i in range(256):
Ezio Melottib3aedd42010-11-20 19:04:17 +00001840 self.assertEqual(s.count(bytes([i])), N)
Antoine Pitrou87695762008-08-14 22:44:29 +00001841 finally:
1842 support.unlink(support.TESTFN)
1843
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001844 def test_misbehaved_io(self):
1845 rawio = self.MisbehavedRawIO()
1846 bufio = self.tp(rawio, 5)
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02001847 self.assertRaises(OSError, bufio.seek, 0)
1848 self.assertRaises(OSError, bufio.tell)
1849 self.assertRaises(OSError, bufio.write, b"abcdef")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001850
Victor Stinnerb589cef2019-06-11 03:10:59 +02001851 # Silence destructor error
1852 bufio.close = lambda: None
1853
Florent Xicluna109d5732012-07-07 17:03:22 +02001854 def test_max_buffer_size_removal(self):
1855 with self.assertRaises(TypeError):
Benjamin Peterson59406a92009-03-26 17:10:29 +00001856 self.tp(self.MockRawIO(), 8, 12)
Benjamin Peterson59406a92009-03-26 17:10:29 +00001857
Benjamin Peterson68623612012-12-20 11:53:11 -06001858 def test_write_error_on_close(self):
1859 raw = self.MockRawIO()
1860 def bad_write(b):
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02001861 raise OSError()
Benjamin Peterson68623612012-12-20 11:53:11 -06001862 raw.write = bad_write
1863 b = self.tp(raw)
1864 b.write(b'spam')
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02001865 self.assertRaises(OSError, b.close) # exception not swallowed
Benjamin Peterson68623612012-12-20 11:53:11 -06001866 self.assertTrue(b.closed)
1867
benfogle9703f092017-11-10 16:03:40 -05001868 def test_slow_close_from_thread(self):
1869 # Issue #31976
1870 rawio = self.SlowFlushRawIO()
1871 bufio = self.tp(rawio, 8)
1872 t = threading.Thread(target=bufio.close)
1873 t.start()
1874 rawio.in_flush.wait()
1875 self.assertRaises(ValueError, bufio.write, b'spam')
1876 self.assertTrue(bufio.closed)
1877 t.join()
1878
1879
Benjamin Peterson59406a92009-03-26 17:10:29 +00001880
Antoine Pitrou10f0c502012-07-29 19:02:46 +02001881class CBufferedWriterTest(BufferedWriterTest, SizeofTest):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001882 tp = io.BufferedWriter
1883
Gregory P. Smithe5796c42018-12-30 20:17:57 -08001884 @unittest.skipIf(MEMORY_SANITIZER, "MSan defaults to crashing "
1885 "instead of returning NULL for malloc failure.")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001886 def test_constructor(self):
1887 BufferedWriterTest.test_constructor(self)
1888 # The allocation can succeed on 32-bit builds, e.g. with more
Victor Stinner8c663fd2017-11-08 14:44:44 -08001889 # than 2 GiB RAM and a 64-bit kernel.
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001890 if sys.maxsize > 0x7FFFFFFF:
1891 rawio = self.MockRawIO()
1892 bufio = self.tp(rawio)
1893 self.assertRaises((OverflowError, MemoryError, ValueError),
1894 bufio.__init__, rawio, sys.maxsize)
1895
1896 def test_initialization(self):
1897 rawio = self.MockRawIO()
1898 bufio = self.tp(rawio)
1899 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0)
1900 self.assertRaises(ValueError, bufio.write, b"def")
1901 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16)
1902 self.assertRaises(ValueError, bufio.write, b"def")
1903 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1)
1904 self.assertRaises(ValueError, bufio.write, b"def")
1905
1906 def test_garbage_collection(self):
1907 # C BufferedWriter objects are collected, and collecting them flushes
1908 # all data to disk.
1909 # The Python version has __del__, so it ends into gc.garbage instead
Serhiy Storchakae36837c2018-06-04 05:53:56 +03001910 self.addCleanup(support.unlink, support.TESTFN)
Antoine Pitrou796564c2013-07-30 19:59:21 +02001911 with support.check_warnings(('', ResourceWarning)):
1912 rawio = self.FileIO(support.TESTFN, "w+b")
1913 f = self.tp(rawio)
1914 f.write(b"123xxx")
1915 f.x = f
1916 wr = weakref.ref(f)
1917 del f
1918 support.gc_collect()
Serhiy Storchaka0dcd80a2015-08-02 15:17:49 +03001919 self.assertIsNone(wr(), wr)
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +00001920 with self.open(support.TESTFN, "rb") as f:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001921 self.assertEqual(f.read(), b"123xxx")
1922
R David Murray67bfe802013-02-23 21:51:05 -05001923 def test_args_error(self):
1924 # Issue #17275
1925 with self.assertRaisesRegex(TypeError, "BufferedWriter"):
1926 self.tp(io.BytesIO(), 1024, 1024, 1024)
1927
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001928
1929class PyBufferedWriterTest(BufferedWriterTest):
1930 tp = pyio.BufferedWriter
Guido van Rossuma9e20242007-03-08 00:43:48 +00001931
Guido van Rossum01a27522007-03-07 01:00:12 +00001932class BufferedRWPairTest(unittest.TestCase):
Guido van Rossuma9e20242007-03-08 00:43:48 +00001933
Antoine Pitroucf4c7492009-04-19 00:09:36 +00001934 def test_constructor(self):
1935 pair = self.tp(self.MockRawIO(), self.MockRawIO())
Benjamin Peterson92035012008-12-27 16:00:54 +00001936 self.assertFalse(pair.closed)
Guido van Rossum01a27522007-03-07 01:00:12 +00001937
Serhiy Storchaka61e24932014-02-12 10:52:35 +02001938 def test_uninitialized(self):
1939 pair = self.tp.__new__(self.tp)
1940 del pair
1941 pair = self.tp.__new__(self.tp)
1942 self.assertRaisesRegex((ValueError, AttributeError),
1943 'uninitialized|has no attribute',
1944 pair.read, 0)
1945 self.assertRaisesRegex((ValueError, AttributeError),
1946 'uninitialized|has no attribute',
1947 pair.write, b'')
1948 pair.__init__(self.MockRawIO(), self.MockRawIO())
1949 self.assertEqual(pair.read(0), b'')
1950 self.assertEqual(pair.write(b''), 0)
1951
Benjamin Petersond2e0c792009-05-01 20:40:59 +00001952 def test_detach(self):
1953 pair = self.tp(self.MockRawIO(), self.MockRawIO())
1954 self.assertRaises(self.UnsupportedOperation, pair.detach)
1955
Florent Xicluna109d5732012-07-07 17:03:22 +02001956 def test_constructor_max_buffer_size_removal(self):
1957 with self.assertRaises(TypeError):
Benjamin Peterson59406a92009-03-26 17:10:29 +00001958 self.tp(self.MockRawIO(), self.MockRawIO(), 8, 12)
Benjamin Peterson59406a92009-03-26 17:10:29 +00001959
Antoine Pitroucf4c7492009-04-19 00:09:36 +00001960 def test_constructor_with_not_readable(self):
1961 class NotReadable(MockRawIO):
1962 def readable(self):
1963 return False
1964
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02001965 self.assertRaises(OSError, self.tp, NotReadable(), self.MockRawIO())
Antoine Pitroucf4c7492009-04-19 00:09:36 +00001966
1967 def test_constructor_with_not_writeable(self):
1968 class NotWriteable(MockRawIO):
1969 def writable(self):
1970 return False
1971
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02001972 self.assertRaises(OSError, self.tp, self.MockRawIO(), NotWriteable())
Antoine Pitroucf4c7492009-04-19 00:09:36 +00001973
1974 def test_read(self):
1975 pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())
1976
1977 self.assertEqual(pair.read(3), b"abc")
1978 self.assertEqual(pair.read(1), b"d")
1979 self.assertEqual(pair.read(), b"ef")
Benjamin Petersonbf5ff762009-12-13 19:25:34 +00001980 pair = self.tp(self.BytesIO(b"abc"), self.MockRawIO())
1981 self.assertEqual(pair.read(None), b"abc")
1982
1983 def test_readlines(self):
1984 pair = lambda: self.tp(self.BytesIO(b"abc\ndef\nh"), self.MockRawIO())
1985 self.assertEqual(pair().readlines(), [b"abc\n", b"def\n", b"h"])
1986 self.assertEqual(pair().readlines(), [b"abc\n", b"def\n", b"h"])
1987 self.assertEqual(pair().readlines(5), [b"abc\n", b"def\n"])
Antoine Pitroucf4c7492009-04-19 00:09:36 +00001988
1989 def test_read1(self):
1990 # .read1() is delegated to the underlying reader object, so this test
1991 # can be shallow.
1992 pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())
1993
1994 self.assertEqual(pair.read1(3), b"abc")
Martin Panterccb2c0e2016-10-20 23:48:14 +00001995 self.assertEqual(pair.read1(), b"def")
Antoine Pitroucf4c7492009-04-19 00:09:36 +00001996
1997 def test_readinto(self):
Martin Panter6bb91f32016-05-28 00:41:57 +00001998 for method in ("readinto", "readinto1"):
1999 with self.subTest(method):
2000 pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())
Antoine Pitroucf4c7492009-04-19 00:09:36 +00002001
Serhiy Storchaka5f1a5182016-09-11 14:41:02 +03002002 data = byteslike(b'\0' * 5)
Martin Panter6bb91f32016-05-28 00:41:57 +00002003 self.assertEqual(getattr(pair, method)(data), 5)
2004 self.assertEqual(bytes(data), b"abcde")
Antoine Pitroucf4c7492009-04-19 00:09:36 +00002005
2006 def test_write(self):
2007 w = self.MockRawIO()
2008 pair = self.tp(self.MockRawIO(), w)
2009
2010 pair.write(b"abc")
2011 pair.flush()
Martin Panter6bb91f32016-05-28 00:41:57 +00002012 buffer = bytearray(b"def")
2013 pair.write(buffer)
2014 buffer[:] = b"***" # Overwrite our copy of the data
Antoine Pitroucf4c7492009-04-19 00:09:36 +00002015 pair.flush()
2016 self.assertEqual(w._write_stack, [b"abc", b"def"])
2017
2018 def test_peek(self):
2019 pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())
2020
2021 self.assertTrue(pair.peek(3).startswith(b"abc"))
2022 self.assertEqual(pair.read(3), b"abc")
2023
2024 def test_readable(self):
2025 pair = self.tp(self.MockRawIO(), self.MockRawIO())
2026 self.assertTrue(pair.readable())
2027
2028 def test_writeable(self):
2029 pair = self.tp(self.MockRawIO(), self.MockRawIO())
2030 self.assertTrue(pair.writable())
2031
2032 def test_seekable(self):
2033 # BufferedRWPairs are never seekable, even if their readers and writers
2034 # are.
2035 pair = self.tp(self.MockRawIO(), self.MockRawIO())
2036 self.assertFalse(pair.seekable())
2037
2038 # .flush() is delegated to the underlying writer object and has been
2039 # tested in the test_write method.
2040
2041 def test_close_and_closed(self):
2042 pair = self.tp(self.MockRawIO(), self.MockRawIO())
2043 self.assertFalse(pair.closed)
2044 pair.close()
2045 self.assertTrue(pair.closed)
2046
Serhiy Storchaka7665be62015-03-24 23:21:57 +02002047 def test_reader_close_error_on_close(self):
2048 def reader_close():
2049 reader_non_existing
2050 reader = self.MockRawIO()
2051 reader.close = reader_close
2052 writer = self.MockRawIO()
2053 pair = self.tp(reader, writer)
2054 with self.assertRaises(NameError) as err:
2055 pair.close()
2056 self.assertIn('reader_non_existing', str(err.exception))
2057 self.assertTrue(pair.closed)
2058 self.assertFalse(reader.closed)
2059 self.assertTrue(writer.closed)
2060
Victor Stinner472f7942019-04-12 21:58:24 +02002061 # Silence destructor error
2062 reader.close = lambda: None
2063
Serhiy Storchaka7665be62015-03-24 23:21:57 +02002064 def test_writer_close_error_on_close(self):
2065 def writer_close():
2066 writer_non_existing
2067 reader = self.MockRawIO()
2068 writer = self.MockRawIO()
2069 writer.close = writer_close
2070 pair = self.tp(reader, writer)
2071 with self.assertRaises(NameError) as err:
2072 pair.close()
2073 self.assertIn('writer_non_existing', str(err.exception))
2074 self.assertFalse(pair.closed)
2075 self.assertTrue(reader.closed)
2076 self.assertFalse(writer.closed)
2077
Victor Stinner472f7942019-04-12 21:58:24 +02002078 # Silence destructor error
2079 writer.close = lambda: None
Victor Stinner913fa1c2019-06-12 23:57:11 +02002080 writer = None
2081
Victor Stinner212646c2019-06-14 18:03:22 +02002082 # Ignore BufferedWriter (of the BufferedRWPair) unraisable exception
Victor Stinner913fa1c2019-06-12 23:57:11 +02002083 with support.catch_unraisable_exception():
Victor Stinner212646c2019-06-14 18:03:22 +02002084 # Ignore BufferedRWPair unraisable exception
2085 with support.catch_unraisable_exception():
2086 pair = None
2087 support.gc_collect()
Victor Stinner913fa1c2019-06-12 23:57:11 +02002088 support.gc_collect()
Victor Stinner472f7942019-04-12 21:58:24 +02002089
Serhiy Storchaka7665be62015-03-24 23:21:57 +02002090 def test_reader_writer_close_error_on_close(self):
2091 def reader_close():
2092 reader_non_existing
2093 def writer_close():
2094 writer_non_existing
2095 reader = self.MockRawIO()
2096 reader.close = reader_close
2097 writer = self.MockRawIO()
2098 writer.close = writer_close
2099 pair = self.tp(reader, writer)
2100 with self.assertRaises(NameError) as err:
2101 pair.close()
2102 self.assertIn('reader_non_existing', str(err.exception))
2103 self.assertIsInstance(err.exception.__context__, NameError)
2104 self.assertIn('writer_non_existing', str(err.exception.__context__))
2105 self.assertFalse(pair.closed)
2106 self.assertFalse(reader.closed)
2107 self.assertFalse(writer.closed)
2108
Victor Stinner472f7942019-04-12 21:58:24 +02002109 # Silence destructor error
2110 reader.close = lambda: None
2111 writer.close = lambda: None
2112
Antoine Pitroucf4c7492009-04-19 00:09:36 +00002113 def test_isatty(self):
2114 class SelectableIsAtty(MockRawIO):
2115 def __init__(self, isatty):
2116 MockRawIO.__init__(self)
2117 self._isatty = isatty
2118
2119 def isatty(self):
2120 return self._isatty
2121
2122 pair = self.tp(SelectableIsAtty(False), SelectableIsAtty(False))
2123 self.assertFalse(pair.isatty())
2124
2125 pair = self.tp(SelectableIsAtty(True), SelectableIsAtty(False))
2126 self.assertTrue(pair.isatty())
2127
2128 pair = self.tp(SelectableIsAtty(False), SelectableIsAtty(True))
2129 self.assertTrue(pair.isatty())
2130
2131 pair = self.tp(SelectableIsAtty(True), SelectableIsAtty(True))
2132 self.assertTrue(pair.isatty())
Guido van Rossum01a27522007-03-07 01:00:12 +00002133
Benjamin Petersonbbd0a322014-09-29 22:46:57 -04002134 def test_weakref_clearing(self):
2135 brw = self.tp(self.MockRawIO(), self.MockRawIO())
2136 ref = weakref.ref(brw)
2137 brw = None
2138 ref = None # Shouldn't segfault.
2139
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002140class CBufferedRWPairTest(BufferedRWPairTest):
2141 tp = io.BufferedRWPair
Guido van Rossuma9e20242007-03-08 00:43:48 +00002142
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002143class PyBufferedRWPairTest(BufferedRWPairTest):
2144 tp = pyio.BufferedRWPair
Guido van Rossuma9e20242007-03-08 00:43:48 +00002145
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002146
2147class BufferedRandomTest(BufferedReaderTest, BufferedWriterTest):
2148 read_mode = "rb+"
2149 write_mode = "wb+"
2150
2151 def test_constructor(self):
2152 BufferedReaderTest.test_constructor(self)
2153 BufferedWriterTest.test_constructor(self)
2154
Serhiy Storchaka61e24932014-02-12 10:52:35 +02002155 def test_uninitialized(self):
2156 BufferedReaderTest.test_uninitialized(self)
2157 BufferedWriterTest.test_uninitialized(self)
2158
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002159 def test_read_and_write(self):
2160 raw = self.MockRawIO((b"asdf", b"ghjk"))
Benjamin Peterson59406a92009-03-26 17:10:29 +00002161 rw = self.tp(raw, 8)
Guido van Rossum01a27522007-03-07 01:00:12 +00002162
2163 self.assertEqual(b"as", rw.read(2))
2164 rw.write(b"ddd")
2165 rw.write(b"eee")
Guido van Rossum76c5d4d2007-04-06 19:10:29 +00002166 self.assertFalse(raw._write_stack) # Buffer writes
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002167 self.assertEqual(b"ghjk", rw.read())
Ezio Melottib3aedd42010-11-20 19:04:17 +00002168 self.assertEqual(b"dddeee", raw._write_stack[0])
Guido van Rossum01a27522007-03-07 01:00:12 +00002169
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002170 def test_seek_and_tell(self):
2171 raw = self.BytesIO(b"asdfghjkl")
2172 rw = self.tp(raw)
Guido van Rossum01a27522007-03-07 01:00:12 +00002173
Ezio Melottib3aedd42010-11-20 19:04:17 +00002174 self.assertEqual(b"as", rw.read(2))
2175 self.assertEqual(2, rw.tell())
Guido van Rossum01a27522007-03-07 01:00:12 +00002176 rw.seek(0, 0)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002177 self.assertEqual(b"asdf", rw.read(4))
Guido van Rossum01a27522007-03-07 01:00:12 +00002178
Antoine Pitroue05565e2011-08-20 14:39:23 +02002179 rw.write(b"123f")
Guido van Rossum01a27522007-03-07 01:00:12 +00002180 rw.seek(0, 0)
Antoine Pitroue05565e2011-08-20 14:39:23 +02002181 self.assertEqual(b"asdf123fl", rw.read())
Ezio Melottib3aedd42010-11-20 19:04:17 +00002182 self.assertEqual(9, rw.tell())
Guido van Rossum01a27522007-03-07 01:00:12 +00002183 rw.seek(-4, 2)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002184 self.assertEqual(5, rw.tell())
Guido van Rossum01a27522007-03-07 01:00:12 +00002185 rw.seek(2, 1)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002186 self.assertEqual(7, rw.tell())
2187 self.assertEqual(b"fl", rw.read(11))
Antoine Pitroue05565e2011-08-20 14:39:23 +02002188 rw.flush()
2189 self.assertEqual(b"asdf123fl", raw.getvalue())
2190
Christian Heimes8e42a0a2007-11-08 18:04:45 +00002191 self.assertRaises(TypeError, rw.seek, 0.0)
Guido van Rossum01a27522007-03-07 01:00:12 +00002192
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002193 def check_flush_and_read(self, read_func):
2194 raw = self.BytesIO(b"abcdefghi")
2195 bufio = self.tp(raw)
2196
Ezio Melottib3aedd42010-11-20 19:04:17 +00002197 self.assertEqual(b"ab", read_func(bufio, 2))
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002198 bufio.write(b"12")
Ezio Melottib3aedd42010-11-20 19:04:17 +00002199 self.assertEqual(b"ef", read_func(bufio, 2))
2200 self.assertEqual(6, bufio.tell())
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002201 bufio.flush()
Ezio Melottib3aedd42010-11-20 19:04:17 +00002202 self.assertEqual(6, bufio.tell())
2203 self.assertEqual(b"ghi", read_func(bufio))
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002204 raw.seek(0, 0)
2205 raw.write(b"XYZ")
2206 # flush() resets the read buffer
2207 bufio.flush()
2208 bufio.seek(0, 0)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002209 self.assertEqual(b"XYZ", read_func(bufio, 3))
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002210
2211 def test_flush_and_read(self):
2212 self.check_flush_and_read(lambda bufio, *args: bufio.read(*args))
2213
2214 def test_flush_and_readinto(self):
2215 def _readinto(bufio, n=-1):
2216 b = bytearray(n if n >= 0 else 9999)
2217 n = bufio.readinto(b)
2218 return bytes(b[:n])
2219 self.check_flush_and_read(_readinto)
2220
2221 def test_flush_and_peek(self):
2222 def _peek(bufio, n=-1):
2223 # This relies on the fact that the buffer can contain the whole
2224 # raw stream, otherwise peek() can return less.
2225 b = bufio.peek(n)
2226 if n != -1:
2227 b = b[:n]
2228 bufio.seek(len(b), 1)
2229 return b
2230 self.check_flush_and_read(_peek)
2231
2232 def test_flush_and_write(self):
2233 raw = self.BytesIO(b"abcdefghi")
2234 bufio = self.tp(raw)
2235
2236 bufio.write(b"123")
2237 bufio.flush()
2238 bufio.write(b"45")
2239 bufio.flush()
2240 bufio.seek(0, 0)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002241 self.assertEqual(b"12345fghi", raw.getvalue())
2242 self.assertEqual(b"12345fghi", bufio.read())
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002243
2244 def test_threads(self):
2245 BufferedReaderTest.test_threads(self)
2246 BufferedWriterTest.test_threads(self)
2247
2248 def test_writes_and_peek(self):
2249 def _peek(bufio):
2250 bufio.peek(1)
2251 self.check_writes(_peek)
2252 def _peek(bufio):
2253 pos = bufio.tell()
2254 bufio.seek(-1, 1)
2255 bufio.peek(1)
2256 bufio.seek(pos, 0)
2257 self.check_writes(_peek)
2258
2259 def test_writes_and_reads(self):
2260 def _read(bufio):
2261 bufio.seek(-1, 1)
2262 bufio.read(1)
2263 self.check_writes(_read)
2264
2265 def test_writes_and_read1s(self):
2266 def _read1(bufio):
2267 bufio.seek(-1, 1)
2268 bufio.read1(1)
2269 self.check_writes(_read1)
2270
2271 def test_writes_and_readintos(self):
2272 def _read(bufio):
2273 bufio.seek(-1, 1)
2274 bufio.readinto(bytearray(1))
2275 self.check_writes(_read)
2276
Antoine Pitroua0ceb732009-08-06 20:29:56 +00002277 def test_write_after_readahead(self):
2278 # Issue #6629: writing after the buffer was filled by readahead should
2279 # first rewind the raw stream.
2280 for overwrite_size in [1, 5]:
2281 raw = self.BytesIO(b"A" * 10)
2282 bufio = self.tp(raw, 4)
2283 # Trigger readahead
2284 self.assertEqual(bufio.read(1), b"A")
2285 self.assertEqual(bufio.tell(), 1)
2286 # Overwriting should rewind the raw stream if it needs so
2287 bufio.write(b"B" * overwrite_size)
2288 self.assertEqual(bufio.tell(), overwrite_size + 1)
2289 # If the write size was smaller than the buffer size, flush() and
2290 # check that rewind happens.
2291 bufio.flush()
2292 self.assertEqual(bufio.tell(), overwrite_size + 1)
2293 s = raw.getvalue()
2294 self.assertEqual(s,
2295 b"A" + b"B" * overwrite_size + b"A" * (9 - overwrite_size))
2296
Antoine Pitrou7c404892011-05-13 00:13:33 +02002297 def test_write_rewind_write(self):
2298 # Various combinations of reading / writing / seeking backwards / writing again
2299 def mutate(bufio, pos1, pos2):
2300 assert pos2 >= pos1
2301 # Fill the buffer
2302 bufio.seek(pos1)
2303 bufio.read(pos2 - pos1)
2304 bufio.write(b'\x02')
2305 # This writes earlier than the previous write, but still inside
2306 # the buffer.
2307 bufio.seek(pos1)
2308 bufio.write(b'\x01')
2309
2310 b = b"\x80\x81\x82\x83\x84"
2311 for i in range(0, len(b)):
2312 for j in range(i, len(b)):
2313 raw = self.BytesIO(b)
2314 bufio = self.tp(raw, 100)
2315 mutate(bufio, i, j)
2316 bufio.flush()
2317 expected = bytearray(b)
2318 expected[j] = 2
2319 expected[i] = 1
2320 self.assertEqual(raw.getvalue(), expected,
2321 "failed result for i=%d, j=%d" % (i, j))
2322
Antoine Pitrou905a2ff2010-01-31 22:47:27 +00002323 def test_truncate_after_read_or_write(self):
2324 raw = self.BytesIO(b"A" * 10)
2325 bufio = self.tp(raw, 100)
2326 self.assertEqual(bufio.read(2), b"AA") # the read buffer gets filled
2327 self.assertEqual(bufio.truncate(), 2)
2328 self.assertEqual(bufio.write(b"BB"), 2) # the write buffer increases
2329 self.assertEqual(bufio.truncate(), 4)
2330
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002331 def test_misbehaved_io(self):
2332 BufferedReaderTest.test_misbehaved_io(self)
2333 BufferedWriterTest.test_misbehaved_io(self)
2334
Antoine Pitroue05565e2011-08-20 14:39:23 +02002335 def test_interleaved_read_write(self):
2336 # Test for issue #12213
2337 with self.BytesIO(b'abcdefgh') as raw:
2338 with self.tp(raw, 100) as f:
2339 f.write(b"1")
2340 self.assertEqual(f.read(1), b'b')
2341 f.write(b'2')
2342 self.assertEqual(f.read1(1), b'd')
2343 f.write(b'3')
2344 buf = bytearray(1)
2345 f.readinto(buf)
2346 self.assertEqual(buf, b'f')
2347 f.write(b'4')
2348 self.assertEqual(f.peek(1), b'h')
2349 f.flush()
2350 self.assertEqual(raw.getvalue(), b'1b2d3f4h')
2351
2352 with self.BytesIO(b'abc') as raw:
2353 with self.tp(raw, 100) as f:
2354 self.assertEqual(f.read(1), b'a')
2355 f.write(b"2")
2356 self.assertEqual(f.read(1), b'c')
2357 f.flush()
2358 self.assertEqual(raw.getvalue(), b'a2c')
2359
2360 def test_interleaved_readline_write(self):
2361 with self.BytesIO(b'ab\ncdef\ng\n') as raw:
2362 with self.tp(raw) as f:
2363 f.write(b'1')
2364 self.assertEqual(f.readline(), b'b\n')
2365 f.write(b'2')
2366 self.assertEqual(f.readline(), b'def\n')
2367 f.write(b'3')
2368 self.assertEqual(f.readline(), b'\n')
2369 f.flush()
2370 self.assertEqual(raw.getvalue(), b'1b\n2def\n3\n')
2371
Antoine Pitrou0d739d72010-09-05 23:01:12 +00002372 # You can't construct a BufferedRandom over a non-seekable stream.
2373 test_unseekable = None
2374
R David Murray67bfe802013-02-23 21:51:05 -05002375
Antoine Pitrou10f0c502012-07-29 19:02:46 +02002376class CBufferedRandomTest(BufferedRandomTest, SizeofTest):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002377 tp = io.BufferedRandom
2378
Gregory P. Smithe5796c42018-12-30 20:17:57 -08002379 @unittest.skipIf(MEMORY_SANITIZER, "MSan defaults to crashing "
2380 "instead of returning NULL for malloc failure.")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002381 def test_constructor(self):
2382 BufferedRandomTest.test_constructor(self)
2383 # The allocation can succeed on 32-bit builds, e.g. with more
Victor Stinner8c663fd2017-11-08 14:44:44 -08002384 # than 2 GiB RAM and a 64-bit kernel.
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002385 if sys.maxsize > 0x7FFFFFFF:
2386 rawio = self.MockRawIO()
2387 bufio = self.tp(rawio)
2388 self.assertRaises((OverflowError, MemoryError, ValueError),
2389 bufio.__init__, rawio, sys.maxsize)
2390
2391 def test_garbage_collection(self):
2392 CBufferedReaderTest.test_garbage_collection(self)
2393 CBufferedWriterTest.test_garbage_collection(self)
2394
R David Murray67bfe802013-02-23 21:51:05 -05002395 def test_args_error(self):
2396 # Issue #17275
2397 with self.assertRaisesRegex(TypeError, "BufferedRandom"):
2398 self.tp(io.BytesIO(), 1024, 1024, 1024)
2399
2400
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002401class PyBufferedRandomTest(BufferedRandomTest):
2402 tp = pyio.BufferedRandom
2403
2404
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002405# To fully exercise seek/tell, the StatefulIncrementalDecoder has these
2406# properties:
2407# - A single output character can correspond to many bytes of input.
2408# - The number of input bytes to complete the character can be
2409# undetermined until the last input byte is received.
2410# - The number of input bytes can vary depending on previous input.
2411# - A single input byte can correspond to many characters of output.
2412# - The number of output characters can be undetermined until the
2413# last input byte is received.
2414# - The number of output characters can vary depending on previous input.
2415
2416class StatefulIncrementalDecoder(codecs.IncrementalDecoder):
2417 """
2418 For testing seek/tell behavior with a stateful, buffering decoder.
2419
2420 Input is a sequence of words. Words may be fixed-length (length set
2421 by input) or variable-length (period-terminated). In variable-length
2422 mode, extra periods are ignored. Possible words are:
2423 - 'i' followed by a number sets the input length, I (maximum 99).
2424 When I is set to 0, words are space-terminated.
2425 - 'o' followed by a number sets the output length, O (maximum 99).
2426 - Any other word is converted into a word followed by a period on
2427 the output. The output word consists of the input word truncated
2428 or padded out with hyphens to make its length equal to O. If O
2429 is 0, the word is output verbatim without truncating or padding.
2430 I and O are initially set to 1. When I changes, any buffered input is
2431 re-scanned according to the new I. EOF also terminates the last word.
2432 """
2433
2434 def __init__(self, errors='strict'):
Christian Heimesab568872008-03-23 02:11:13 +00002435 codecs.IncrementalDecoder.__init__(self, errors)
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002436 self.reset()
2437
2438 def __repr__(self):
2439 return '<SID %x>' % id(self)
2440
2441 def reset(self):
2442 self.i = 1
2443 self.o = 1
2444 self.buffer = bytearray()
2445
2446 def getstate(self):
2447 i, o = self.i ^ 1, self.o ^ 1 # so that flags = 0 after reset()
2448 return bytes(self.buffer), i*100 + o
2449
2450 def setstate(self, state):
2451 buffer, io = state
2452 self.buffer = bytearray(buffer)
2453 i, o = divmod(io, 100)
2454 self.i, self.o = i ^ 1, o ^ 1
2455
2456 def decode(self, input, final=False):
2457 output = ''
2458 for b in input:
2459 if self.i == 0: # variable-length, terminated with period
2460 if b == ord('.'):
2461 if self.buffer:
2462 output += self.process_word()
2463 else:
2464 self.buffer.append(b)
2465 else: # fixed-length, terminate after self.i bytes
2466 self.buffer.append(b)
2467 if len(self.buffer) == self.i:
2468 output += self.process_word()
2469 if final and self.buffer: # EOF terminates the last word
2470 output += self.process_word()
2471 return output
2472
2473 def process_word(self):
2474 output = ''
2475 if self.buffer[0] == ord('i'):
2476 self.i = min(99, int(self.buffer[1:] or 0)) # set input length
2477 elif self.buffer[0] == ord('o'):
2478 self.o = min(99, int(self.buffer[1:] or 0)) # set output length
2479 else:
2480 output = self.buffer.decode('ascii')
2481 if len(output) < self.o:
2482 output += '-'*self.o # pad out with hyphens
2483 if self.o:
2484 output = output[:self.o] # truncate to output length
2485 output += '.'
2486 self.buffer = bytearray()
2487 return output
2488
Benjamin Petersonad9d48d2008-04-02 21:49:44 +00002489 codecEnabled = False
2490
2491 @classmethod
2492 def lookupTestDecoder(cls, name):
2493 if cls.codecEnabled and name == 'test_decoder':
Antoine Pitrou180a3362008-12-14 16:36:46 +00002494 latin1 = codecs.lookup('latin-1')
Benjamin Petersonad9d48d2008-04-02 21:49:44 +00002495 return codecs.CodecInfo(
Antoine Pitrou180a3362008-12-14 16:36:46 +00002496 name='test_decoder', encode=latin1.encode, decode=None,
Benjamin Petersonad9d48d2008-04-02 21:49:44 +00002497 incrementalencoder=None,
2498 streamreader=None, streamwriter=None,
2499 incrementaldecoder=cls)
2500
2501# Register the previous decoder for testing.
2502# Disabled by default, tests will enable it.
2503codecs.register(StatefulIncrementalDecoder.lookupTestDecoder)
2504
2505
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002506class StatefulIncrementalDecoderTest(unittest.TestCase):
2507 """
2508 Make sure the StatefulIncrementalDecoder actually works.
2509 """
2510
2511 test_cases = [
Ka-Ping Yeed24a5b62008-03-20 10:51:27 +00002512 # I=1, O=1 (fixed-length input == fixed-length output)
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002513 (b'abcd', False, 'a.b.c.d.'),
Ka-Ping Yeed24a5b62008-03-20 10:51:27 +00002514 # I=0, O=0 (variable-length input, variable-length output)
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002515 (b'oiabcd', True, 'abcd.'),
Ka-Ping Yeed24a5b62008-03-20 10:51:27 +00002516 # I=0, O=0 (should ignore extra periods)
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002517 (b'oi...abcd...', True, 'abcd.'),
Ka-Ping Yeed24a5b62008-03-20 10:51:27 +00002518 # I=0, O=6 (variable-length input, fixed-length output)
2519 (b'i.o6.x.xyz.toolongtofit.', False, 'x-----.xyz---.toolon.'),
2520 # I=2, O=6 (fixed-length input < fixed-length output)
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002521 (b'i.i2.o6xyz', True, 'xy----.z-----.'),
Ka-Ping Yeed24a5b62008-03-20 10:51:27 +00002522 # I=6, O=3 (fixed-length input > fixed-length output)
2523 (b'i.o3.i6.abcdefghijklmnop', True, 'abc.ghi.mno.'),
2524 # I=0, then 3; O=29, then 15 (with longer output)
2525 (b'i.o29.a.b.cde.o15.abcdefghijabcdefghij.i3.a.b.c.d.ei00k.l.m', True,
2526 'a----------------------------.' +
2527 'b----------------------------.' +
2528 'cde--------------------------.' +
2529 'abcdefghijabcde.' +
2530 'a.b------------.' +
2531 '.c.------------.' +
2532 'd.e------------.' +
2533 'k--------------.' +
2534 'l--------------.' +
2535 'm--------------.')
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002536 ]
2537
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002538 def test_decoder(self):
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002539 # Try a few one-shot test cases.
2540 for input, eof, output in self.test_cases:
2541 d = StatefulIncrementalDecoder()
Ezio Melottib3aedd42010-11-20 19:04:17 +00002542 self.assertEqual(d.decode(input, eof), output)
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002543
2544 # Also test an unfinished decode, followed by forcing EOF.
2545 d = StatefulIncrementalDecoder()
Ezio Melottib3aedd42010-11-20 19:04:17 +00002546 self.assertEqual(d.decode(b'oiabcd'), '')
2547 self.assertEqual(d.decode(b'', 1), 'abcd.')
Guido van Rossum78892e42007-04-06 17:31:18 +00002548
2549class TextIOWrapperTest(unittest.TestCase):
Guido van Rossum9b76da62007-04-11 01:09:03 +00002550
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002551 def setUp(self):
2552 self.testdata = b"AAA\r\nBBB\rCCC\r\nDDD\nEEE\r\n"
2553 self.normalized = b"AAA\nBBB\nCCC\nDDD\nEEE\n".decode("ascii")
Benjamin Petersonee8712c2008-05-20 21:35:26 +00002554 support.unlink(support.TESTFN)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002555
Guido van Rossumd0712812007-04-11 16:32:43 +00002556 def tearDown(self):
Benjamin Petersonee8712c2008-05-20 21:35:26 +00002557 support.unlink(support.TESTFN)
Guido van Rossum9b76da62007-04-11 01:09:03 +00002558
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002559 def test_constructor(self):
2560 r = self.BytesIO(b"\xc3\xa9\n\n")
2561 b = self.BufferedReader(r, 1000)
2562 t = self.TextIOWrapper(b)
Marc-André Lemburg8f36af72011-02-25 15:42:01 +00002563 t.__init__(b, encoding="latin-1", newline="\r\n")
2564 self.assertEqual(t.encoding, "latin-1")
Ezio Melottib3aedd42010-11-20 19:04:17 +00002565 self.assertEqual(t.line_buffering, False)
Marc-André Lemburg8f36af72011-02-25 15:42:01 +00002566 t.__init__(b, encoding="utf-8", line_buffering=True)
2567 self.assertEqual(t.encoding, "utf-8")
Ezio Melottib3aedd42010-11-20 19:04:17 +00002568 self.assertEqual(t.line_buffering, True)
2569 self.assertEqual("\xe9\n", t.readline())
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002570 self.assertRaises(TypeError, t.__init__, b, newline=42)
2571 self.assertRaises(ValueError, t.__init__, b, newline='xyzzy')
2572
Serhiy Storchakaf24131f2015-04-16 11:19:43 +03002573 def test_uninitialized(self):
2574 t = self.TextIOWrapper.__new__(self.TextIOWrapper)
2575 del t
2576 t = self.TextIOWrapper.__new__(self.TextIOWrapper)
2577 self.assertRaises(Exception, repr, t)
2578 self.assertRaisesRegex((ValueError, AttributeError),
2579 'uninitialized|has no attribute',
2580 t.read, 0)
2581 t.__init__(self.MockRawIO())
2582 self.assertEqual(t.read(0), '')
2583
Nick Coghlana9b15242014-02-04 22:11:18 +10002584 def test_non_text_encoding_codecs_are_rejected(self):
2585 # Ensure the constructor complains if passed a codec that isn't
2586 # marked as a text encoding
2587 # http://bugs.python.org/issue20404
2588 r = self.BytesIO()
2589 b = self.BufferedWriter(r)
2590 with self.assertRaisesRegex(LookupError, "is not a text encoding"):
2591 self.TextIOWrapper(b, encoding="hex")
2592
Benjamin Petersond2e0c792009-05-01 20:40:59 +00002593 def test_detach(self):
2594 r = self.BytesIO()
2595 b = self.BufferedWriter(r)
2596 t = self.TextIOWrapper(b)
2597 self.assertIs(t.detach(), b)
2598
2599 t = self.TextIOWrapper(b, encoding="ascii")
2600 t.write("howdy")
2601 self.assertFalse(r.getvalue())
2602 t.detach()
2603 self.assertEqual(r.getvalue(), b"howdy")
2604 self.assertRaises(ValueError, t.detach)
2605
Benjamin Peterson10e76b62014-12-21 20:51:50 -06002606 # Operations independent of the detached stream should still work
2607 repr(t)
2608 self.assertEqual(t.encoding, "ascii")
2609 self.assertEqual(t.errors, "strict")
2610 self.assertFalse(t.line_buffering)
Antoine Pitrou3c2817b2017-06-03 12:32:28 +02002611 self.assertFalse(t.write_through)
Benjamin Peterson10e76b62014-12-21 20:51:50 -06002612
Benjamin Petersonc4c0eae2009-03-09 00:07:03 +00002613 def test_repr(self):
2614 raw = self.BytesIO("hello".encode("utf-8"))
2615 b = self.BufferedReader(raw)
2616 t = self.TextIOWrapper(b, encoding="utf-8")
Antoine Pitrou716c4442009-05-23 19:04:03 +00002617 modname = self.TextIOWrapper.__module__
Max Bernsteinccb7ca72019-05-21 10:09:21 -07002618 self.assertRegex(repr(t),
2619 r"<(%s\.)?TextIOWrapper encoding='utf-8'>" % modname)
Antoine Pitrou716c4442009-05-23 19:04:03 +00002620 raw.name = "dummy"
Max Bernsteinccb7ca72019-05-21 10:09:21 -07002621 self.assertRegex(repr(t),
2622 r"<(%s\.)?TextIOWrapper name='dummy' encoding='utf-8'>" % modname)
Antoine Pitroua4815ca2011-01-09 20:38:15 +00002623 t.mode = "r"
Max Bernsteinccb7ca72019-05-21 10:09:21 -07002624 self.assertRegex(repr(t),
2625 r"<(%s\.)?TextIOWrapper name='dummy' mode='r' encoding='utf-8'>" % modname)
Antoine Pitrou716c4442009-05-23 19:04:03 +00002626 raw.name = b"dummy"
Max Bernsteinccb7ca72019-05-21 10:09:21 -07002627 self.assertRegex(repr(t),
2628 r"<(%s\.)?TextIOWrapper name=b'dummy' mode='r' encoding='utf-8'>" % modname)
Benjamin Petersonc4c0eae2009-03-09 00:07:03 +00002629
Benjamin Peterson10e76b62014-12-21 20:51:50 -06002630 t.buffer.detach()
2631 repr(t) # Should not raise an exception
2632
Serhiy Storchakaa5af6e12017-03-19 19:25:29 +02002633 def test_recursive_repr(self):
2634 # Issue #25455
2635 raw = self.BytesIO()
2636 t = self.TextIOWrapper(raw)
2637 with support.swap_attr(raw, 'name', t):
2638 try:
2639 repr(t) # Should not crash
2640 except RuntimeError:
2641 pass
2642
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002643 def test_line_buffering(self):
2644 r = self.BytesIO()
2645 b = self.BufferedWriter(r, 1000)
2646 t = self.TextIOWrapper(b, newline="\n", line_buffering=True)
Guido van Rossumf64db9f2007-12-06 01:04:26 +00002647 t.write("X")
Ezio Melottib3aedd42010-11-20 19:04:17 +00002648 self.assertEqual(r.getvalue(), b"") # No flush happened
Guido van Rossumf64db9f2007-12-06 01:04:26 +00002649 t.write("Y\nZ")
Ezio Melottib3aedd42010-11-20 19:04:17 +00002650 self.assertEqual(r.getvalue(), b"XY\nZ") # All got flushed
Guido van Rossumf64db9f2007-12-06 01:04:26 +00002651 t.write("A\rB")
Ezio Melottib3aedd42010-11-20 19:04:17 +00002652 self.assertEqual(r.getvalue(), b"XY\nZA\rB")
Guido van Rossumf64db9f2007-12-06 01:04:26 +00002653
Antoine Pitrou3c2817b2017-06-03 12:32:28 +02002654 def test_reconfigure_line_buffering(self):
2655 r = self.BytesIO()
2656 b = self.BufferedWriter(r, 1000)
2657 t = self.TextIOWrapper(b, newline="\n", line_buffering=False)
2658 t.write("AB\nC")
2659 self.assertEqual(r.getvalue(), b"")
2660
2661 t.reconfigure(line_buffering=True) # implicit flush
2662 self.assertEqual(r.getvalue(), b"AB\nC")
2663 t.write("DEF\nG")
2664 self.assertEqual(r.getvalue(), b"AB\nCDEF\nG")
2665 t.write("H")
2666 self.assertEqual(r.getvalue(), b"AB\nCDEF\nG")
2667 t.reconfigure(line_buffering=False) # implicit flush
2668 self.assertEqual(r.getvalue(), b"AB\nCDEF\nGH")
2669 t.write("IJ")
2670 self.assertEqual(r.getvalue(), b"AB\nCDEF\nGH")
2671
2672 # Keeping default value
2673 t.reconfigure()
2674 t.reconfigure(line_buffering=None)
2675 self.assertEqual(t.line_buffering, False)
2676 t.reconfigure(line_buffering=True)
2677 t.reconfigure()
2678 t.reconfigure(line_buffering=None)
2679 self.assertEqual(t.line_buffering, True)
2680
Victor Stinner91106cd2017-12-13 12:29:09 +01002681 @unittest.skipIf(sys.flags.utf8_mode, "utf-8 mode is enabled")
Victor Stinnerf86a5e82012-06-05 13:43:22 +02002682 def test_default_encoding(self):
2683 old_environ = dict(os.environ)
2684 try:
2685 # try to get a user preferred encoding different than the current
2686 # locale encoding to check that TextIOWrapper() uses the current
2687 # locale encoding and not the user preferred encoding
2688 for key in ('LC_ALL', 'LANG', 'LC_CTYPE'):
2689 if key in os.environ:
2690 del os.environ[key]
2691
2692 current_locale_encoding = locale.getpreferredencoding(False)
2693 b = self.BytesIO()
2694 t = self.TextIOWrapper(b)
2695 self.assertEqual(t.encoding, current_locale_encoding)
2696 finally:
2697 os.environ.clear()
2698 os.environ.update(old_environ)
2699
Serhiy Storchaka5cfc79d2014-02-07 10:06:39 +02002700 @support.cpython_only
Victor Stinner91106cd2017-12-13 12:29:09 +01002701 @unittest.skipIf(sys.flags.utf8_mode, "utf-8 mode is enabled")
Serhiy Storchaka78980432013-01-15 01:12:17 +02002702 def test_device_encoding(self):
Serhiy Storchaka5cfc79d2014-02-07 10:06:39 +02002703 # Issue 15989
2704 import _testcapi
Serhiy Storchaka78980432013-01-15 01:12:17 +02002705 b = self.BytesIO()
2706 b.fileno = lambda: _testcapi.INT_MAX + 1
2707 self.assertRaises(OverflowError, self.TextIOWrapper, b)
2708 b.fileno = lambda: _testcapi.UINT_MAX + 1
2709 self.assertRaises(OverflowError, self.TextIOWrapper, b)
2710
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002711 def test_encoding(self):
2712 # Check the encoding attribute is always set, and valid
2713 b = self.BytesIO()
Marc-André Lemburg8f36af72011-02-25 15:42:01 +00002714 t = self.TextIOWrapper(b, encoding="utf-8")
2715 self.assertEqual(t.encoding, "utf-8")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002716 t = self.TextIOWrapper(b)
Serhiy Storchaka0dcd80a2015-08-02 15:17:49 +03002717 self.assertIsNotNone(t.encoding)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002718 codecs.lookup(t.encoding)
2719
2720 def test_encoding_errors_reading(self):
Guido van Rossume7fc50f2007-12-03 22:54:21 +00002721 # (1) default
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002722 b = self.BytesIO(b"abc\n\xff\n")
2723 t = self.TextIOWrapper(b, encoding="ascii")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00002724 self.assertRaises(UnicodeError, t.read)
2725 # (2) explicit strict
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002726 b = self.BytesIO(b"abc\n\xff\n")
2727 t = self.TextIOWrapper(b, encoding="ascii", errors="strict")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00002728 self.assertRaises(UnicodeError, t.read)
2729 # (3) ignore
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002730 b = self.BytesIO(b"abc\n\xff\n")
2731 t = self.TextIOWrapper(b, encoding="ascii", errors="ignore")
Ezio Melottib3aedd42010-11-20 19:04:17 +00002732 self.assertEqual(t.read(), "abc\n\n")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00002733 # (4) replace
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002734 b = self.BytesIO(b"abc\n\xff\n")
2735 t = self.TextIOWrapper(b, encoding="ascii", errors="replace")
Ezio Melottib3aedd42010-11-20 19:04:17 +00002736 self.assertEqual(t.read(), "abc\n\ufffd\n")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00002737
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002738 def test_encoding_errors_writing(self):
Guido van Rossume7fc50f2007-12-03 22:54:21 +00002739 # (1) default
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002740 b = self.BytesIO()
2741 t = self.TextIOWrapper(b, encoding="ascii")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00002742 self.assertRaises(UnicodeError, t.write, "\xff")
2743 # (2) explicit strict
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002744 b = self.BytesIO()
2745 t = self.TextIOWrapper(b, encoding="ascii", errors="strict")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00002746 self.assertRaises(UnicodeError, t.write, "\xff")
2747 # (3) ignore
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002748 b = self.BytesIO()
2749 t = self.TextIOWrapper(b, encoding="ascii", errors="ignore",
Guido van Rossumf64db9f2007-12-06 01:04:26 +00002750 newline="\n")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00002751 t.write("abc\xffdef\n")
2752 t.flush()
Ezio Melottib3aedd42010-11-20 19:04:17 +00002753 self.assertEqual(b.getvalue(), b"abcdef\n")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00002754 # (4) replace
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002755 b = self.BytesIO()
2756 t = self.TextIOWrapper(b, encoding="ascii", errors="replace",
Guido van Rossumf64db9f2007-12-06 01:04:26 +00002757 newline="\n")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00002758 t.write("abc\xffdef\n")
2759 t.flush()
Ezio Melottib3aedd42010-11-20 19:04:17 +00002760 self.assertEqual(b.getvalue(), b"abc?def\n")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00002761
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002762 def test_newlines(self):
Guido van Rossum78892e42007-04-06 17:31:18 +00002763 input_lines = [ "unix\n", "windows\r\n", "os9\r", "last\n", "nonl" ]
2764
2765 tests = [
2766 [ None, [ 'unix\n', 'windows\n', 'os9\n', 'last\n', 'nonl' ] ],
Guido van Rossum8358db22007-08-18 21:39:55 +00002767 [ '', input_lines ],
2768 [ '\n', [ "unix\n", "windows\r\n", "os9\rlast\n", "nonl" ] ],
2769 [ '\r\n', [ "unix\nwindows\r\n", "os9\rlast\nnonl" ] ],
2770 [ '\r', [ "unix\nwindows\r", "\nos9\r", "last\nnonl" ] ],
Guido van Rossum78892e42007-04-06 17:31:18 +00002771 ]
Antoine Pitrou180a3362008-12-14 16:36:46 +00002772 encodings = (
2773 'utf-8', 'latin-1',
2774 'utf-16', 'utf-16-le', 'utf-16-be',
2775 'utf-32', 'utf-32-le', 'utf-32-be',
2776 )
Guido van Rossum78892e42007-04-06 17:31:18 +00002777
Guido van Rossum8358db22007-08-18 21:39:55 +00002778 # Try a range of buffer sizes to test the case where \r is the last
Guido van Rossum78892e42007-04-06 17:31:18 +00002779 # character in TextIOWrapper._pending_line.
2780 for encoding in encodings:
Guido van Rossum8358db22007-08-18 21:39:55 +00002781 # XXX: str.encode() should return bytes
2782 data = bytes(''.join(input_lines).encode(encoding))
Guido van Rossum78892e42007-04-06 17:31:18 +00002783 for do_reads in (False, True):
Guido van Rossum8358db22007-08-18 21:39:55 +00002784 for bufsize in range(1, 10):
2785 for newline, exp_lines in tests:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002786 bufio = self.BufferedReader(self.BytesIO(data), bufsize)
2787 textio = self.TextIOWrapper(bufio, newline=newline,
Guido van Rossum78892e42007-04-06 17:31:18 +00002788 encoding=encoding)
2789 if do_reads:
2790 got_lines = []
2791 while True:
Guido van Rossum76c5d4d2007-04-06 19:10:29 +00002792 c2 = textio.read(2)
Guido van Rossum78892e42007-04-06 17:31:18 +00002793 if c2 == '':
2794 break
Ezio Melottib3aedd42010-11-20 19:04:17 +00002795 self.assertEqual(len(c2), 2)
Guido van Rossum76c5d4d2007-04-06 19:10:29 +00002796 got_lines.append(c2 + textio.readline())
Guido van Rossum78892e42007-04-06 17:31:18 +00002797 else:
Guido van Rossum76c5d4d2007-04-06 19:10:29 +00002798 got_lines = list(textio)
Guido van Rossum78892e42007-04-06 17:31:18 +00002799
2800 for got_line, exp_line in zip(got_lines, exp_lines):
Ezio Melottib3aedd42010-11-20 19:04:17 +00002801 self.assertEqual(got_line, exp_line)
2802 self.assertEqual(len(got_lines), len(exp_lines))
Guido van Rossum78892e42007-04-06 17:31:18 +00002803
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002804 def test_newlines_input(self):
2805 testdata = b"AAA\nBB\x00B\nCCC\rDDD\rEEE\r\nFFF\r\nGGG"
Guido van Rossum8358db22007-08-18 21:39:55 +00002806 normalized = testdata.replace(b"\r\n", b"\n").replace(b"\r", b"\n")
2807 for newline, expected in [
Ezio Melottid8b509b2011-09-28 17:37:55 +03002808 (None, normalized.decode("ascii").splitlines(keepends=True)),
2809 ("", testdata.decode("ascii").splitlines(keepends=True)),
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002810 ("\n", ["AAA\n", "BB\x00B\n", "CCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]),
2811 ("\r\n", ["AAA\nBB\x00B\nCCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]),
2812 ("\r", ["AAA\nBB\x00B\nCCC\r", "DDD\r", "EEE\r", "\nFFF\r", "\nGGG"]),
Guido van Rossum8358db22007-08-18 21:39:55 +00002813 ]:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002814 buf = self.BytesIO(testdata)
2815 txt = self.TextIOWrapper(buf, encoding="ascii", newline=newline)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002816 self.assertEqual(txt.readlines(), expected)
Guido van Rossum8358db22007-08-18 21:39:55 +00002817 txt.seek(0)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002818 self.assertEqual(txt.read(), "".join(expected))
Guido van Rossum8358db22007-08-18 21:39:55 +00002819
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002820 def test_newlines_output(self):
2821 testdict = {
2822 "": b"AAA\nBBB\nCCC\nX\rY\r\nZ",
2823 "\n": b"AAA\nBBB\nCCC\nX\rY\r\nZ",
2824 "\r": b"AAA\rBBB\rCCC\rX\rY\r\rZ",
2825 "\r\n": b"AAA\r\nBBB\r\nCCC\r\nX\rY\r\r\nZ",
2826 }
2827 tests = [(None, testdict[os.linesep])] + sorted(testdict.items())
2828 for newline, expected in tests:
2829 buf = self.BytesIO()
2830 txt = self.TextIOWrapper(buf, encoding="ascii", newline=newline)
2831 txt.write("AAA\nB")
2832 txt.write("BB\nCCC\n")
2833 txt.write("X\rY\r\nZ")
2834 txt.flush()
Ezio Melottib3aedd42010-11-20 19:04:17 +00002835 self.assertEqual(buf.closed, False)
2836 self.assertEqual(buf.getvalue(), expected)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002837
2838 def test_destructor(self):
2839 l = []
2840 base = self.BytesIO
2841 class MyBytesIO(base):
2842 def close(self):
2843 l.append(self.getvalue())
2844 base.close(self)
2845 b = MyBytesIO()
2846 t = self.TextIOWrapper(b, encoding="ascii")
2847 t.write("abc")
2848 del t
Benjamin Peterson24fb1d02009-04-24 23:26:21 +00002849 support.gc_collect()
Ezio Melottib3aedd42010-11-20 19:04:17 +00002850 self.assertEqual([b"abc"], l)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002851
2852 def test_override_destructor(self):
2853 record = []
2854 class MyTextIO(self.TextIOWrapper):
2855 def __del__(self):
2856 record.append(1)
2857 try:
2858 f = super().__del__
2859 except AttributeError:
2860 pass
2861 else:
2862 f()
2863 def close(self):
2864 record.append(2)
2865 super().close()
2866 def flush(self):
2867 record.append(3)
2868 super().flush()
2869 b = self.BytesIO()
2870 t = MyTextIO(b, encoding="ascii")
2871 del t
Benjamin Peterson24fb1d02009-04-24 23:26:21 +00002872 support.gc_collect()
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002873 self.assertEqual(record, [1, 2, 3])
2874
2875 def test_error_through_destructor(self):
2876 # Test that the exception state is not modified by a destructor,
2877 # even if close() fails.
2878 rawio = self.CloseFailureIO()
Victor Stinner6dbbe742019-05-25 00:09:38 +02002879 with support.catch_unraisable_exception() as cm:
2880 with self.assertRaises(AttributeError):
2881 self.TextIOWrapper(rawio).xyzzy
Victor Stinnerbc2aa812019-05-23 03:45:09 +02002882
2883 if not IOBASE_EMITS_UNRAISABLE:
2884 self.assertIsNone(cm.unraisable)
2885 elif cm.unraisable is not None:
2886 self.assertEqual(cm.unraisable.exc_type, OSError)
Guido van Rossum8358db22007-08-18 21:39:55 +00002887
Guido van Rossum9b76da62007-04-11 01:09:03 +00002888 # Systematic tests of the text I/O API
2889
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002890 def test_basic_io(self):
Guido van Rossum9b76da62007-04-11 01:09:03 +00002891 for chunksize in (1, 2, 3, 4, 5, 15, 16, 17, 31, 32, 33, 63, 64, 65):
Marc-André Lemburg8f36af72011-02-25 15:42:01 +00002892 for enc in "ascii", "latin-1", "utf-8" :# , "utf-16-be", "utf-16-le":
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002893 f = self.open(support.TESTFN, "w+", encoding=enc)
Guido van Rossum9b76da62007-04-11 01:09:03 +00002894 f._CHUNK_SIZE = chunksize
Ezio Melottib3aedd42010-11-20 19:04:17 +00002895 self.assertEqual(f.write("abc"), 3)
Guido van Rossum9b76da62007-04-11 01:09:03 +00002896 f.close()
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002897 f = self.open(support.TESTFN, "r+", encoding=enc)
Guido van Rossum9b76da62007-04-11 01:09:03 +00002898 f._CHUNK_SIZE = chunksize
Ezio Melottib3aedd42010-11-20 19:04:17 +00002899 self.assertEqual(f.tell(), 0)
2900 self.assertEqual(f.read(), "abc")
Guido van Rossum9b76da62007-04-11 01:09:03 +00002901 cookie = f.tell()
Ezio Melottib3aedd42010-11-20 19:04:17 +00002902 self.assertEqual(f.seek(0), 0)
2903 self.assertEqual(f.read(None), "abc")
Benjamin Petersonbf5ff762009-12-13 19:25:34 +00002904 f.seek(0)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002905 self.assertEqual(f.read(2), "ab")
2906 self.assertEqual(f.read(1), "c")
2907 self.assertEqual(f.read(1), "")
2908 self.assertEqual(f.read(), "")
2909 self.assertEqual(f.tell(), cookie)
2910 self.assertEqual(f.seek(0), 0)
2911 self.assertEqual(f.seek(0, 2), cookie)
2912 self.assertEqual(f.write("def"), 3)
2913 self.assertEqual(f.seek(cookie), cookie)
2914 self.assertEqual(f.read(), "def")
Guido van Rossum9b76da62007-04-11 01:09:03 +00002915 if enc.startswith("utf"):
2916 self.multi_line_test(f, enc)
2917 f.close()
2918
2919 def multi_line_test(self, f, enc):
2920 f.seek(0)
2921 f.truncate()
Guido van Rossumef87d6e2007-05-02 19:09:54 +00002922 sample = "s\xff\u0fff\uffff"
Guido van Rossum9b76da62007-04-11 01:09:03 +00002923 wlines = []
Guido van Rossumcba608c2007-04-11 14:19:59 +00002924 for size in (0, 1, 2, 3, 4, 5, 30, 31, 32, 33, 62, 63, 64, 65, 1000):
Guido van Rossum9b76da62007-04-11 01:09:03 +00002925 chars = []
Guido van Rossum805365e2007-05-07 22:24:25 +00002926 for i in range(size):
Guido van Rossum9b76da62007-04-11 01:09:03 +00002927 chars.append(sample[i % len(sample)])
Guido van Rossumef87d6e2007-05-02 19:09:54 +00002928 line = "".join(chars) + "\n"
Guido van Rossum9b76da62007-04-11 01:09:03 +00002929 wlines.append((f.tell(), line))
2930 f.write(line)
Guido van Rossum9b76da62007-04-11 01:09:03 +00002931 f.seek(0)
2932 rlines = []
2933 while True:
2934 pos = f.tell()
2935 line = f.readline()
2936 if not line:
Guido van Rossum9b76da62007-04-11 01:09:03 +00002937 break
2938 rlines.append((pos, line))
Ezio Melottib3aedd42010-11-20 19:04:17 +00002939 self.assertEqual(rlines, wlines)
Guido van Rossum9b76da62007-04-11 01:09:03 +00002940
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002941 def test_telling(self):
Marc-André Lemburg8f36af72011-02-25 15:42:01 +00002942 f = self.open(support.TESTFN, "w+", encoding="utf-8")
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +00002943 p0 = f.tell()
Guido van Rossumef87d6e2007-05-02 19:09:54 +00002944 f.write("\xff\n")
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +00002945 p1 = f.tell()
Guido van Rossumef87d6e2007-05-02 19:09:54 +00002946 f.write("\xff\n")
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +00002947 p2 = f.tell()
2948 f.seek(0)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002949 self.assertEqual(f.tell(), p0)
2950 self.assertEqual(f.readline(), "\xff\n")
2951 self.assertEqual(f.tell(), p1)
2952 self.assertEqual(f.readline(), "\xff\n")
2953 self.assertEqual(f.tell(), p2)
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +00002954 f.seek(0)
2955 for line in f:
Ezio Melottib3aedd42010-11-20 19:04:17 +00002956 self.assertEqual(line, "\xff\n")
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02002957 self.assertRaises(OSError, f.tell)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002958 self.assertEqual(f.tell(), p2)
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +00002959 f.close()
2960
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002961 def test_seeking(self):
2962 chunk_size = _default_chunk_size()
Guido van Rossumd76e7792007-04-17 02:38:04 +00002963 prefix_size = chunk_size - 2
Guido van Rossumef87d6e2007-05-02 19:09:54 +00002964 u_prefix = "a" * prefix_size
Guido van Rossumd76e7792007-04-17 02:38:04 +00002965 prefix = bytes(u_prefix.encode("utf-8"))
Ezio Melottib3aedd42010-11-20 19:04:17 +00002966 self.assertEqual(len(u_prefix), len(prefix))
Guido van Rossumef87d6e2007-05-02 19:09:54 +00002967 u_suffix = "\u8888\n"
Guido van Rossumd76e7792007-04-17 02:38:04 +00002968 suffix = bytes(u_suffix.encode("utf-8"))
2969 line = prefix + suffix
Benjamin Petersonebe5d8a2010-10-31 01:30:11 +00002970 with self.open(support.TESTFN, "wb") as f:
2971 f.write(line*2)
2972 with self.open(support.TESTFN, "r", encoding="utf-8") as f:
2973 s = f.read(prefix_size)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002974 self.assertEqual(s, str(prefix, "ascii"))
2975 self.assertEqual(f.tell(), prefix_size)
2976 self.assertEqual(f.readline(), u_suffix)
Guido van Rossumd76e7792007-04-17 02:38:04 +00002977
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002978 def test_seeking_too(self):
Guido van Rossumd76e7792007-04-17 02:38:04 +00002979 # Regression test for a specific bug
2980 data = b'\xe0\xbf\xbf\n'
Benjamin Petersonebe5d8a2010-10-31 01:30:11 +00002981 with self.open(support.TESTFN, "wb") as f:
2982 f.write(data)
2983 with self.open(support.TESTFN, "r", encoding="utf-8") as f:
2984 f._CHUNK_SIZE # Just test that it exists
2985 f._CHUNK_SIZE = 2
2986 f.readline()
2987 f.tell()
Guido van Rossumd76e7792007-04-17 02:38:04 +00002988
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002989 def test_seek_and_tell(self):
2990 #Test seek/tell using the StatefulIncrementalDecoder.
2991 # Make test faster by doing smaller seeks
2992 CHUNK_SIZE = 128
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002993
Benjamin Peterson5fd871d2009-03-05 00:49:53 +00002994 def test_seek_and_tell_with_data(data, min_pos=0):
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002995 """Tell/seek to various points within a data stream and ensure
2996 that the decoded data returned by read() is consistent."""
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002997 f = self.open(support.TESTFN, 'wb')
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002998 f.write(data)
2999 f.close()
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003000 f = self.open(support.TESTFN, encoding='test_decoder')
3001 f._CHUNK_SIZE = CHUNK_SIZE
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00003002 decoded = f.read()
3003 f.close()
3004
Neal Norwitze2b07052008-03-18 19:52:05 +00003005 for i in range(min_pos, len(decoded) + 1): # seek positions
3006 for j in [1, 5, len(decoded) - i]: # read lengths
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003007 f = self.open(support.TESTFN, encoding='test_decoder')
Ezio Melottib3aedd42010-11-20 19:04:17 +00003008 self.assertEqual(f.read(i), decoded[:i])
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00003009 cookie = f.tell()
Ezio Melottib3aedd42010-11-20 19:04:17 +00003010 self.assertEqual(f.read(j), decoded[i:i + j])
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00003011 f.seek(cookie)
Ezio Melottib3aedd42010-11-20 19:04:17 +00003012 self.assertEqual(f.read(), decoded[i:])
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00003013 f.close()
3014
Benjamin Petersonad9d48d2008-04-02 21:49:44 +00003015 # Enable the test decoder.
3016 StatefulIncrementalDecoder.codecEnabled = 1
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00003017
3018 # Run the tests.
3019 try:
3020 # Try each test case.
3021 for input, _, _ in StatefulIncrementalDecoderTest.test_cases:
Benjamin Peterson5fd871d2009-03-05 00:49:53 +00003022 test_seek_and_tell_with_data(input)
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00003023
3024 # Position each test case so that it crosses a chunk boundary.
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00003025 for input, _, _ in StatefulIncrementalDecoderTest.test_cases:
3026 offset = CHUNK_SIZE - len(input)//2
3027 prefix = b'.'*offset
3028 # Don't bother seeking into the prefix (takes too long).
3029 min_pos = offset*2
Benjamin Peterson5fd871d2009-03-05 00:49:53 +00003030 test_seek_and_tell_with_data(prefix + input, min_pos)
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00003031
3032 # Ensure our test decoder won't interfere with subsequent tests.
3033 finally:
Benjamin Petersonad9d48d2008-04-02 21:49:44 +00003034 StatefulIncrementalDecoder.codecEnabled = 0
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00003035
Christopher Thorneac22f6a2018-11-01 10:48:49 +00003036 def test_multibyte_seek_and_tell(self):
3037 f = self.open(support.TESTFN, "w", encoding="euc_jp")
3038 f.write("AB\n\u3046\u3048\n")
3039 f.close()
3040
3041 f = self.open(support.TESTFN, "r", encoding="euc_jp")
3042 self.assertEqual(f.readline(), "AB\n")
3043 p0 = f.tell()
3044 self.assertEqual(f.readline(), "\u3046\u3048\n")
3045 p1 = f.tell()
3046 f.seek(p0)
3047 self.assertEqual(f.readline(), "\u3046\u3048\n")
3048 self.assertEqual(f.tell(), p1)
3049 f.close()
3050
3051 def test_seek_with_encoder_state(self):
3052 f = self.open(support.TESTFN, "w", encoding="euc_jis_2004")
3053 f.write("\u00e6\u0300")
3054 p0 = f.tell()
3055 f.write("\u00e6")
3056 f.seek(p0)
3057 f.write("\u0300")
3058 f.close()
3059
3060 f = self.open(support.TESTFN, "r", encoding="euc_jis_2004")
3061 self.assertEqual(f.readline(), "\u00e6\u0300\u0300")
3062 f.close()
3063
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003064 def test_encoded_writes(self):
Alexandre Vassalottia38f73b2008-01-07 18:30:48 +00003065 data = "1234567890"
3066 tests = ("utf-16",
3067 "utf-16-le",
3068 "utf-16-be",
3069 "utf-32",
3070 "utf-32-le",
3071 "utf-32-be")
3072 for encoding in tests:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003073 buf = self.BytesIO()
3074 f = self.TextIOWrapper(buf, encoding=encoding)
Alexandre Vassalottia38f73b2008-01-07 18:30:48 +00003075 # Check if the BOM is written only once (see issue1753).
3076 f.write(data)
3077 f.write(data)
3078 f.seek(0)
Ezio Melottib3aedd42010-11-20 19:04:17 +00003079 self.assertEqual(f.read(), data * 2)
Benjamin Peterson9363a652009-03-05 00:42:09 +00003080 f.seek(0)
Ezio Melottib3aedd42010-11-20 19:04:17 +00003081 self.assertEqual(f.read(), data * 2)
3082 self.assertEqual(buf.getvalue(), (data * 2).encode(encoding))
Alexandre Vassalottia38f73b2008-01-07 18:30:48 +00003083
Benjamin Petersona1b49012009-03-31 23:11:32 +00003084 def test_unreadable(self):
3085 class UnReadable(self.BytesIO):
3086 def readable(self):
3087 return False
3088 txt = self.TextIOWrapper(UnReadable())
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02003089 self.assertRaises(OSError, txt.read)
Benjamin Petersona1b49012009-03-31 23:11:32 +00003090
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003091 def test_read_one_by_one(self):
3092 txt = self.TextIOWrapper(self.BytesIO(b"AA\r\nBB"))
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00003093 reads = ""
3094 while True:
3095 c = txt.read(1)
3096 if not c:
3097 break
3098 reads += c
Ezio Melottib3aedd42010-11-20 19:04:17 +00003099 self.assertEqual(reads, "AA\nBB")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00003100
Benjamin Petersonbf5ff762009-12-13 19:25:34 +00003101 def test_readlines(self):
3102 txt = self.TextIOWrapper(self.BytesIO(b"AA\nBB\nCC"))
3103 self.assertEqual(txt.readlines(), ["AA\n", "BB\n", "CC"])
3104 txt.seek(0)
3105 self.assertEqual(txt.readlines(None), ["AA\n", "BB\n", "CC"])
3106 txt.seek(0)
3107 self.assertEqual(txt.readlines(5), ["AA\n", "BB\n"])
3108
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00003109 # read in amounts equal to TextIOWrapper._CHUNK_SIZE which is 128.
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003110 def test_read_by_chunk(self):
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00003111 # make sure "\r\n" straddles 128 char boundary.
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003112 txt = self.TextIOWrapper(self.BytesIO(b"A" * 127 + b"\r\nB"))
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00003113 reads = ""
3114 while True:
3115 c = txt.read(128)
3116 if not c:
3117 break
3118 reads += c
Ezio Melottib3aedd42010-11-20 19:04:17 +00003119 self.assertEqual(reads, "A"*127+"\nB")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00003120
Antoine Pitrou3ed2cb52012-10-16 23:02:27 +02003121 def test_writelines(self):
3122 l = ['ab', 'cd', 'ef']
3123 buf = self.BytesIO()
3124 txt = self.TextIOWrapper(buf)
3125 txt.writelines(l)
3126 txt.flush()
3127 self.assertEqual(buf.getvalue(), b'abcdef')
3128
3129 def test_writelines_userlist(self):
3130 l = UserList(['ab', 'cd', 'ef'])
3131 buf = self.BytesIO()
3132 txt = self.TextIOWrapper(buf)
3133 txt.writelines(l)
3134 txt.flush()
3135 self.assertEqual(buf.getvalue(), b'abcdef')
3136
3137 def test_writelines_error(self):
3138 txt = self.TextIOWrapper(self.BytesIO())
3139 self.assertRaises(TypeError, txt.writelines, [1, 2, 3])
3140 self.assertRaises(TypeError, txt.writelines, None)
3141 self.assertRaises(TypeError, txt.writelines, b'abc')
3142
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00003143 def test_issue1395_1(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003144 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00003145
3146 # read one char at a time
3147 reads = ""
3148 while True:
3149 c = txt.read(1)
3150 if not c:
3151 break
3152 reads += c
Ezio Melottib3aedd42010-11-20 19:04:17 +00003153 self.assertEqual(reads, self.normalized)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00003154
3155 def test_issue1395_2(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003156 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00003157 txt._CHUNK_SIZE = 4
3158
3159 reads = ""
3160 while True:
3161 c = txt.read(4)
3162 if not c:
3163 break
3164 reads += c
Ezio Melottib3aedd42010-11-20 19:04:17 +00003165 self.assertEqual(reads, self.normalized)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00003166
3167 def test_issue1395_3(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003168 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00003169 txt._CHUNK_SIZE = 4
3170
3171 reads = txt.read(4)
3172 reads += txt.read(4)
3173 reads += txt.readline()
3174 reads += txt.readline()
3175 reads += txt.readline()
Ezio Melottib3aedd42010-11-20 19:04:17 +00003176 self.assertEqual(reads, self.normalized)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00003177
3178 def test_issue1395_4(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003179 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00003180 txt._CHUNK_SIZE = 4
3181
3182 reads = txt.read(4)
3183 reads += txt.read()
Ezio Melottib3aedd42010-11-20 19:04:17 +00003184 self.assertEqual(reads, self.normalized)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00003185
3186 def test_issue1395_5(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003187 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00003188 txt._CHUNK_SIZE = 4
3189
3190 reads = txt.read(4)
3191 pos = txt.tell()
3192 txt.seek(0)
3193 txt.seek(pos)
Ezio Melottib3aedd42010-11-20 19:04:17 +00003194 self.assertEqual(txt.read(4), "BBB\n")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00003195
Ka-Ping Yeeddaa7062008-03-17 20:35:15 +00003196 def test_issue2282(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003197 buffer = self.BytesIO(self.testdata)
3198 txt = self.TextIOWrapper(buffer, encoding="ascii")
Ka-Ping Yeeddaa7062008-03-17 20:35:15 +00003199
3200 self.assertEqual(buffer.seekable(), txt.seekable())
3201
Antoine Pitroue4501852009-05-14 18:55:55 +00003202 def test_append_bom(self):
3203 # The BOM is not written again when appending to a non-empty file
3204 filename = support.TESTFN
3205 for charset in ('utf-8-sig', 'utf-16', 'utf-32'):
3206 with self.open(filename, 'w', encoding=charset) as f:
3207 f.write('aaa')
3208 pos = f.tell()
3209 with self.open(filename, 'rb') as f:
Ezio Melottib3aedd42010-11-20 19:04:17 +00003210 self.assertEqual(f.read(), 'aaa'.encode(charset))
Antoine Pitroue4501852009-05-14 18:55:55 +00003211
3212 with self.open(filename, 'a', encoding=charset) as f:
3213 f.write('xxx')
3214 with self.open(filename, 'rb') as f:
Ezio Melottib3aedd42010-11-20 19:04:17 +00003215 self.assertEqual(f.read(), 'aaaxxx'.encode(charset))
Antoine Pitroue4501852009-05-14 18:55:55 +00003216
3217 def test_seek_bom(self):
3218 # Same test, but when seeking manually
3219 filename = support.TESTFN
3220 for charset in ('utf-8-sig', 'utf-16', 'utf-32'):
3221 with self.open(filename, 'w', encoding=charset) as f:
3222 f.write('aaa')
3223 pos = f.tell()
3224 with self.open(filename, 'r+', encoding=charset) as f:
3225 f.seek(pos)
3226 f.write('zzz')
3227 f.seek(0)
3228 f.write('bbb')
3229 with self.open(filename, 'rb') as f:
Ezio Melottib3aedd42010-11-20 19:04:17 +00003230 self.assertEqual(f.read(), 'bbbzzz'.encode(charset))
Antoine Pitroue4501852009-05-14 18:55:55 +00003231
Antoine Pitrou85e3ee72015-04-13 20:01:21 +02003232 def test_seek_append_bom(self):
3233 # Same test, but first seek to the start and then to the end
3234 filename = support.TESTFN
3235 for charset in ('utf-8-sig', 'utf-16', 'utf-32'):
3236 with self.open(filename, 'w', encoding=charset) as f:
3237 f.write('aaa')
3238 with self.open(filename, 'a', encoding=charset) as f:
3239 f.seek(0)
3240 f.seek(0, self.SEEK_END)
3241 f.write('xxx')
3242 with self.open(filename, 'rb') as f:
3243 self.assertEqual(f.read(), 'aaaxxx'.encode(charset))
3244
Benjamin Peterson0926ad12009-06-06 18:02:12 +00003245 def test_errors_property(self):
3246 with self.open(support.TESTFN, "w") as f:
3247 self.assertEqual(f.errors, "strict")
3248 with self.open(support.TESTFN, "w", errors="replace") as f:
3249 self.assertEqual(f.errors, "replace")
3250
Brett Cannon31f59292011-02-21 19:29:56 +00003251 @support.no_tracing
Amaury Forgeot d'Arcccd686a2009-08-29 23:00:38 +00003252 def test_threads_write(self):
3253 # Issue6750: concurrent writes could duplicate data
3254 event = threading.Event()
3255 with self.open(support.TESTFN, "w", buffering=1) as f:
3256 def run(n):
3257 text = "Thread%03d\n" % n
3258 event.wait()
3259 f.write(text)
Serhiy Storchaka263dcd22015-04-01 13:01:14 +03003260 threads = [threading.Thread(target=run, args=(x,))
Amaury Forgeot d'Arcccd686a2009-08-29 23:00:38 +00003261 for x in range(20)]
Serhiy Storchaka263dcd22015-04-01 13:01:14 +03003262 with support.start_threads(threads, event.set):
3263 time.sleep(0.02)
Amaury Forgeot d'Arcccd686a2009-08-29 23:00:38 +00003264 with self.open(support.TESTFN) as f:
3265 content = f.read()
3266 for n in range(20):
Ezio Melottib3aedd42010-11-20 19:04:17 +00003267 self.assertEqual(content.count("Thread%03d\n" % n), 1)
Amaury Forgeot d'Arcccd686a2009-08-29 23:00:38 +00003268
Antoine Pitrou6be88762010-05-03 16:48:20 +00003269 def test_flush_error_on_close(self):
Serhiy Storchakaa3712a92015-02-21 00:35:09 +02003270 # Test that text file is closed despite failed flush
3271 # and that flush() is called before file closed.
Antoine Pitrou6be88762010-05-03 16:48:20 +00003272 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Serhiy Storchakaa3712a92015-02-21 00:35:09 +02003273 closed = []
Antoine Pitrou6be88762010-05-03 16:48:20 +00003274 def bad_flush():
Serhiy Storchakaa3712a92015-02-21 00:35:09 +02003275 closed[:] = [txt.closed, txt.buffer.closed]
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02003276 raise OSError()
Antoine Pitrou6be88762010-05-03 16:48:20 +00003277 txt.flush = bad_flush
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02003278 self.assertRaises(OSError, txt.close) # exception not swallowed
Benjamin Peterson68623612012-12-20 11:53:11 -06003279 self.assertTrue(txt.closed)
Serhiy Storchakaa3712a92015-02-21 00:35:09 +02003280 self.assertTrue(txt.buffer.closed)
3281 self.assertTrue(closed) # flush() called
3282 self.assertFalse(closed[0]) # flush() called before file closed
3283 self.assertFalse(closed[1])
Serhiy Storchakac26a1a42015-02-23 00:28:38 +02003284 txt.flush = lambda: None # break reference loop
Antoine Pitrou6be88762010-05-03 16:48:20 +00003285
Serhiy Storchaka8a8f7f92014-06-09 09:13:04 +03003286 def test_close_error_on_close(self):
3287 buffer = self.BytesIO(self.testdata)
3288 def bad_flush():
3289 raise OSError('flush')
3290 def bad_close():
3291 raise OSError('close')
3292 buffer.close = bad_close
3293 txt = self.TextIOWrapper(buffer, encoding="ascii")
3294 txt.flush = bad_flush
3295 with self.assertRaises(OSError) as err: # exception not swallowed
3296 txt.close()
3297 self.assertEqual(err.exception.args, ('close',))
3298 self.assertIsInstance(err.exception.__context__, OSError)
3299 self.assertEqual(err.exception.__context__.args, ('flush',))
3300 self.assertFalse(txt.closed)
3301
Victor Stinner472f7942019-04-12 21:58:24 +02003302 # Silence destructor error
3303 buffer.close = lambda: None
3304 txt.flush = lambda: None
3305
Serhiy Storchaka8a8f7f92014-06-09 09:13:04 +03003306 def test_nonnormalized_close_error_on_close(self):
3307 # Issue #21677
3308 buffer = self.BytesIO(self.testdata)
3309 def bad_flush():
3310 raise non_existing_flush
3311 def bad_close():
3312 raise non_existing_close
3313 buffer.close = bad_close
3314 txt = self.TextIOWrapper(buffer, encoding="ascii")
3315 txt.flush = bad_flush
3316 with self.assertRaises(NameError) as err: # exception not swallowed
3317 txt.close()
3318 self.assertIn('non_existing_close', str(err.exception))
3319 self.assertIsInstance(err.exception.__context__, NameError)
3320 self.assertIn('non_existing_flush', str(err.exception.__context__))
3321 self.assertFalse(txt.closed)
3322
Victor Stinner472f7942019-04-12 21:58:24 +02003323 # Silence destructor error
3324 buffer.close = lambda: None
3325 txt.flush = lambda: None
3326
Antoine Pitrou6be88762010-05-03 16:48:20 +00003327 def test_multi_close(self):
3328 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
3329 txt.close()
3330 txt.close()
3331 txt.close()
3332 self.assertRaises(ValueError, txt.flush)
3333
Antoine Pitrou0d739d72010-09-05 23:01:12 +00003334 def test_unseekable(self):
3335 txt = self.TextIOWrapper(self.MockUnseekableIO(self.testdata))
3336 self.assertRaises(self.UnsupportedOperation, txt.tell)
3337 self.assertRaises(self.UnsupportedOperation, txt.seek, 0)
3338
Antoine Pitrou7f8f4182010-12-21 21:20:59 +00003339 def test_readonly_attributes(self):
3340 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
3341 buf = self.BytesIO(self.testdata)
3342 with self.assertRaises(AttributeError):
3343 txt.buffer = buf
3344
Antoine Pitroue96ec682011-07-23 21:46:35 +02003345 def test_rawio(self):
3346 # Issue #12591: TextIOWrapper must work with raw I/O objects, so
3347 # that subprocess.Popen() can have the required unbuffered
3348 # semantics with universal_newlines=True.
3349 raw = self.MockRawIO([b'abc', b'def', b'ghi\njkl\nopq\n'])
3350 txt = self.TextIOWrapper(raw, encoding='ascii', newline='\n')
3351 # Reads
3352 self.assertEqual(txt.read(4), 'abcd')
3353 self.assertEqual(txt.readline(), 'efghi\n')
3354 self.assertEqual(list(txt), ['jkl\n', 'opq\n'])
3355
3356 def test_rawio_write_through(self):
3357 # Issue #12591: with write_through=True, writes don't need a flush
3358 raw = self.MockRawIO([b'abc', b'def', b'ghi\njkl\nopq\n'])
3359 txt = self.TextIOWrapper(raw, encoding='ascii', newline='\n',
3360 write_through=True)
3361 txt.write('1')
3362 txt.write('23\n4')
3363 txt.write('5')
3364 self.assertEqual(b''.join(raw._write_stack), b'123\n45')
3365
Antoine Pitrouc644e7c2014-05-09 00:24:50 +02003366 def test_bufio_write_through(self):
3367 # Issue #21396: write_through=True doesn't force a flush()
3368 # on the underlying binary buffered object.
3369 flush_called, write_called = [], []
3370 class BufferedWriter(self.BufferedWriter):
3371 def flush(self, *args, **kwargs):
3372 flush_called.append(True)
3373 return super().flush(*args, **kwargs)
3374 def write(self, *args, **kwargs):
3375 write_called.append(True)
3376 return super().write(*args, **kwargs)
3377
3378 rawio = self.BytesIO()
3379 data = b"a"
3380 bufio = BufferedWriter(rawio, len(data)*2)
3381 textio = self.TextIOWrapper(bufio, encoding='ascii',
3382 write_through=True)
3383 # write to the buffered io but don't overflow the buffer
3384 text = data.decode('ascii')
3385 textio.write(text)
3386
3387 # buffer.flush is not called with write_through=True
3388 self.assertFalse(flush_called)
3389 # buffer.write *is* called with write_through=True
3390 self.assertTrue(write_called)
3391 self.assertEqual(rawio.getvalue(), b"") # no flush
3392
3393 write_called = [] # reset
3394 textio.write(text * 10) # total content is larger than bufio buffer
3395 self.assertTrue(write_called)
3396 self.assertEqual(rawio.getvalue(), data * 11) # all flushed
3397
Antoine Pitrou3c2817b2017-06-03 12:32:28 +02003398 def test_reconfigure_write_through(self):
3399 raw = self.MockRawIO([])
3400 t = self.TextIOWrapper(raw, encoding='ascii', newline='\n')
3401 t.write('1')
3402 t.reconfigure(write_through=True) # implied flush
3403 self.assertEqual(t.write_through, True)
3404 self.assertEqual(b''.join(raw._write_stack), b'1')
3405 t.write('23')
3406 self.assertEqual(b''.join(raw._write_stack), b'123')
3407 t.reconfigure(write_through=False)
3408 self.assertEqual(t.write_through, False)
3409 t.write('45')
3410 t.flush()
3411 self.assertEqual(b''.join(raw._write_stack), b'12345')
3412 # Keeping default value
3413 t.reconfigure()
3414 t.reconfigure(write_through=None)
3415 self.assertEqual(t.write_through, False)
3416 t.reconfigure(write_through=True)
3417 t.reconfigure()
3418 t.reconfigure(write_through=None)
3419 self.assertEqual(t.write_through, True)
3420
Serhiy Storchaka94dc6732013-02-03 17:03:31 +02003421 def test_read_nonbytes(self):
3422 # Issue #17106
3423 # Crash when underlying read() returns non-bytes
3424 t = self.TextIOWrapper(self.StringIO('a'))
3425 self.assertRaises(TypeError, t.read, 1)
3426 t = self.TextIOWrapper(self.StringIO('a'))
3427 self.assertRaises(TypeError, t.readline)
3428 t = self.TextIOWrapper(self.StringIO('a'))
3429 self.assertRaises(TypeError, t.read)
3430
Oren Milmana5b4ea12017-08-25 21:14:54 +03003431 def test_illegal_encoder(self):
3432 # Issue 31271: Calling write() while the return value of encoder's
3433 # encode() is invalid shouldn't cause an assertion failure.
3434 rot13 = codecs.lookup("rot13")
3435 with support.swap_attr(rot13, '_is_text_encoding', True):
3436 t = io.TextIOWrapper(io.BytesIO(b'foo'), encoding="rot13")
3437 self.assertRaises(TypeError, t.write, 'bar')
3438
Serhiy Storchaka94dc6732013-02-03 17:03:31 +02003439 def test_illegal_decoder(self):
3440 # Issue #17106
Nick Coghlana9b15242014-02-04 22:11:18 +10003441 # Bypass the early encoding check added in issue 20404
3442 def _make_illegal_wrapper():
3443 quopri = codecs.lookup("quopri")
3444 quopri._is_text_encoding = True
3445 try:
3446 t = self.TextIOWrapper(self.BytesIO(b'aaaaaa'),
3447 newline='\n', encoding="quopri")
3448 finally:
3449 quopri._is_text_encoding = False
3450 return t
Serhiy Storchaka94dc6732013-02-03 17:03:31 +02003451 # Crash when decoder returns non-string
Nick Coghlana9b15242014-02-04 22:11:18 +10003452 t = _make_illegal_wrapper()
Serhiy Storchaka94dc6732013-02-03 17:03:31 +02003453 self.assertRaises(TypeError, t.read, 1)
Nick Coghlana9b15242014-02-04 22:11:18 +10003454 t = _make_illegal_wrapper()
Serhiy Storchaka94dc6732013-02-03 17:03:31 +02003455 self.assertRaises(TypeError, t.readline)
Nick Coghlana9b15242014-02-04 22:11:18 +10003456 t = _make_illegal_wrapper()
Serhiy Storchaka94dc6732013-02-03 17:03:31 +02003457 self.assertRaises(TypeError, t.read)
3458
Oren Milmanba7d7362017-08-29 11:58:27 +03003459 # Issue 31243: calling read() while the return value of decoder's
3460 # getstate() is invalid should neither crash the interpreter nor
3461 # raise a SystemError.
3462 def _make_very_illegal_wrapper(getstate_ret_val):
3463 class BadDecoder:
3464 def getstate(self):
3465 return getstate_ret_val
3466 def _get_bad_decoder(dummy):
3467 return BadDecoder()
3468 quopri = codecs.lookup("quopri")
3469 with support.swap_attr(quopri, 'incrementaldecoder',
3470 _get_bad_decoder):
3471 return _make_illegal_wrapper()
3472 t = _make_very_illegal_wrapper(42)
3473 self.assertRaises(TypeError, t.read, 42)
3474 t = _make_very_illegal_wrapper(())
3475 self.assertRaises(TypeError, t.read, 42)
3476 t = _make_very_illegal_wrapper((1, 2))
3477 self.assertRaises(TypeError, t.read, 42)
3478
Antoine Pitrou712cb732013-12-21 15:51:54 +01003479 def _check_create_at_shutdown(self, **kwargs):
3480 # Issue #20037: creating a TextIOWrapper at shutdown
3481 # shouldn't crash the interpreter.
3482 iomod = self.io.__name__
3483 code = """if 1:
3484 import codecs
3485 import {iomod} as io
3486
3487 # Avoid looking up codecs at shutdown
3488 codecs.lookup('utf-8')
3489
3490 class C:
3491 def __init__(self):
3492 self.buf = io.BytesIO()
3493 def __del__(self):
3494 io.TextIOWrapper(self.buf, **{kwargs})
3495 print("ok")
3496 c = C()
3497 """.format(iomod=iomod, kwargs=kwargs)
3498 return assert_python_ok("-c", code)
3499
3500 def test_create_at_shutdown_without_encoding(self):
3501 rc, out, err = self._check_create_at_shutdown()
3502 if err:
3503 # Can error out with a RuntimeError if the module state
3504 # isn't found.
Nick Coghlana9b15242014-02-04 22:11:18 +10003505 self.assertIn(self.shutdown_error, err.decode())
Antoine Pitrou712cb732013-12-21 15:51:54 +01003506 else:
3507 self.assertEqual("ok", out.decode().strip())
3508
3509 def test_create_at_shutdown_with_encoding(self):
3510 rc, out, err = self._check_create_at_shutdown(encoding='utf-8',
3511 errors='strict')
3512 self.assertFalse(err)
3513 self.assertEqual("ok", out.decode().strip())
3514
Antoine Pitroub8503892014-04-29 10:14:02 +02003515 def test_read_byteslike(self):
3516 r = MemviewBytesIO(b'Just some random string\n')
3517 t = self.TextIOWrapper(r, 'utf-8')
3518
3519 # TextIOwrapper will not read the full string, because
3520 # we truncate it to a multiple of the native int size
3521 # so that we can construct a more complex memoryview.
3522 bytes_val = _to_memoryview(r.getvalue()).tobytes()
3523
3524 self.assertEqual(t.read(200), bytes_val.decode('utf-8'))
3525
Benjamin Peterson6c14f232014-11-12 10:19:46 -05003526 def test_issue22849(self):
3527 class F(object):
3528 def readable(self): return True
3529 def writable(self): return True
3530 def seekable(self): return True
3531
3532 for i in range(10):
3533 try:
3534 self.TextIOWrapper(F(), encoding='utf-8')
3535 except Exception:
3536 pass
3537
3538 F.tell = lambda x: 0
3539 t = self.TextIOWrapper(F(), encoding='utf-8')
3540
INADA Naoki507434f2017-12-21 09:59:53 +09003541 def test_reconfigure_encoding_read(self):
3542 # latin1 -> utf8
3543 # (latin1 can decode utf-8 encoded string)
3544 data = 'abc\xe9\n'.encode('latin1') + 'd\xe9f\n'.encode('utf8')
3545 raw = self.BytesIO(data)
3546 txt = self.TextIOWrapper(raw, encoding='latin1', newline='\n')
3547 self.assertEqual(txt.readline(), 'abc\xe9\n')
3548 with self.assertRaises(self.UnsupportedOperation):
3549 txt.reconfigure(encoding='utf-8')
3550 with self.assertRaises(self.UnsupportedOperation):
3551 txt.reconfigure(newline=None)
3552
3553 def test_reconfigure_write_fromascii(self):
3554 # ascii has a specific encodefunc in the C implementation,
3555 # but utf-8-sig has not. Make sure that we get rid of the
3556 # cached encodefunc when we switch encoders.
3557 raw = self.BytesIO()
3558 txt = self.TextIOWrapper(raw, encoding='ascii', newline='\n')
3559 txt.write('foo\n')
3560 txt.reconfigure(encoding='utf-8-sig')
3561 txt.write('\xe9\n')
3562 txt.flush()
3563 self.assertEqual(raw.getvalue(), b'foo\n\xc3\xa9\n')
3564
3565 def test_reconfigure_write(self):
3566 # latin -> utf8
3567 raw = self.BytesIO()
3568 txt = self.TextIOWrapper(raw, encoding='latin1', newline='\n')
3569 txt.write('abc\xe9\n')
3570 txt.reconfigure(encoding='utf-8')
3571 self.assertEqual(raw.getvalue(), b'abc\xe9\n')
3572 txt.write('d\xe9f\n')
3573 txt.flush()
3574 self.assertEqual(raw.getvalue(), b'abc\xe9\nd\xc3\xa9f\n')
3575
3576 # ascii -> utf-8-sig: ensure that no BOM is written in the middle of
3577 # the file
3578 raw = self.BytesIO()
3579 txt = self.TextIOWrapper(raw, encoding='ascii', newline='\n')
3580 txt.write('abc\n')
3581 txt.reconfigure(encoding='utf-8-sig')
3582 txt.write('d\xe9f\n')
3583 txt.flush()
3584 self.assertEqual(raw.getvalue(), b'abc\nd\xc3\xa9f\n')
3585
3586 def test_reconfigure_write_non_seekable(self):
3587 raw = self.BytesIO()
3588 raw.seekable = lambda: False
3589 raw.seek = None
3590 txt = self.TextIOWrapper(raw, encoding='ascii', newline='\n')
3591 txt.write('abc\n')
3592 txt.reconfigure(encoding='utf-8-sig')
3593 txt.write('d\xe9f\n')
3594 txt.flush()
3595
3596 # If the raw stream is not seekable, there'll be a BOM
3597 self.assertEqual(raw.getvalue(), b'abc\n\xef\xbb\xbfd\xc3\xa9f\n')
3598
3599 def test_reconfigure_defaults(self):
3600 txt = self.TextIOWrapper(self.BytesIO(), 'ascii', 'replace', '\n')
3601 txt.reconfigure(encoding=None)
3602 self.assertEqual(txt.encoding, 'ascii')
3603 self.assertEqual(txt.errors, 'replace')
3604 txt.write('LF\n')
3605
3606 txt.reconfigure(newline='\r\n')
3607 self.assertEqual(txt.encoding, 'ascii')
3608 self.assertEqual(txt.errors, 'replace')
3609
3610 txt.reconfigure(errors='ignore')
3611 self.assertEqual(txt.encoding, 'ascii')
3612 self.assertEqual(txt.errors, 'ignore')
3613 txt.write('CRLF\n')
3614
3615 txt.reconfigure(encoding='utf-8', newline=None)
3616 self.assertEqual(txt.errors, 'strict')
3617 txt.seek(0)
3618 self.assertEqual(txt.read(), 'LF\nCRLF\n')
3619
3620 self.assertEqual(txt.detach().getvalue(), b'LF\nCRLF\r\n')
3621
3622 def test_reconfigure_newline(self):
3623 raw = self.BytesIO(b'CR\rEOF')
3624 txt = self.TextIOWrapper(raw, 'ascii', newline='\n')
3625 txt.reconfigure(newline=None)
3626 self.assertEqual(txt.readline(), 'CR\n')
3627 raw = self.BytesIO(b'CR\rEOF')
3628 txt = self.TextIOWrapper(raw, 'ascii', newline='\n')
3629 txt.reconfigure(newline='')
3630 self.assertEqual(txt.readline(), 'CR\r')
3631 raw = self.BytesIO(b'CR\rLF\nEOF')
3632 txt = self.TextIOWrapper(raw, 'ascii', newline='\r')
3633 txt.reconfigure(newline='\n')
3634 self.assertEqual(txt.readline(), 'CR\rLF\n')
3635 raw = self.BytesIO(b'LF\nCR\rEOF')
3636 txt = self.TextIOWrapper(raw, 'ascii', newline='\n')
3637 txt.reconfigure(newline='\r')
3638 self.assertEqual(txt.readline(), 'LF\nCR\r')
3639 raw = self.BytesIO(b'CR\rCRLF\r\nEOF')
3640 txt = self.TextIOWrapper(raw, 'ascii', newline='\r')
3641 txt.reconfigure(newline='\r\n')
3642 self.assertEqual(txt.readline(), 'CR\rCRLF\r\n')
3643
3644 txt = self.TextIOWrapper(self.BytesIO(), 'ascii', newline='\r')
3645 txt.reconfigure(newline=None)
3646 txt.write('linesep\n')
3647 txt.reconfigure(newline='')
3648 txt.write('LF\n')
3649 txt.reconfigure(newline='\n')
3650 txt.write('LF\n')
3651 txt.reconfigure(newline='\r')
3652 txt.write('CR\n')
3653 txt.reconfigure(newline='\r\n')
3654 txt.write('CRLF\n')
3655 expected = 'linesep' + os.linesep + 'LF\nLF\nCR\rCRLF\r\n'
3656 self.assertEqual(txt.detach().getvalue().decode('ascii'), expected)
3657
Zackery Spytz23db9352018-06-29 04:14:58 -06003658 def test_issue25862(self):
3659 # Assertion failures occurred in tell() after read() and write().
3660 t = self.TextIOWrapper(self.BytesIO(b'test'), encoding='ascii')
3661 t.read(1)
3662 t.read()
3663 t.tell()
3664 t = self.TextIOWrapper(self.BytesIO(b'test'), encoding='ascii')
3665 t.read(1)
3666 t.write('x')
3667 t.tell()
3668
Serhiy Storchaka94dc6732013-02-03 17:03:31 +02003669
Antoine Pitroub8503892014-04-29 10:14:02 +02003670class MemviewBytesIO(io.BytesIO):
3671 '''A BytesIO object whose read method returns memoryviews
3672 rather than bytes'''
3673
3674 def read1(self, len_):
3675 return _to_memoryview(super().read1(len_))
3676
3677 def read(self, len_):
3678 return _to_memoryview(super().read(len_))
3679
3680def _to_memoryview(buf):
3681 '''Convert bytes-object *buf* to a non-trivial memoryview'''
3682
3683 arr = array.array('i')
3684 idx = len(buf) - len(buf) % arr.itemsize
3685 arr.frombytes(buf[:idx])
3686 return memoryview(arr)
Benjamin Petersonc9c0f202009-06-30 23:06:06 +00003687
Benjamin Peterson6fd113c2014-11-12 10:23:44 -05003688
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003689class CTextIOWrapperTest(TextIOWrapperTest):
Antoine Pitrou712cb732013-12-21 15:51:54 +01003690 io = io
Eddie Elizondo4590f722020-02-04 02:29:25 -08003691 shutdown_error = "LookupError: unknown encoding: ascii"
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003692
3693 def test_initialization(self):
3694 r = self.BytesIO(b"\xc3\xa9\n\n")
3695 b = self.BufferedReader(r, 1000)
3696 t = self.TextIOWrapper(b)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003697 self.assertRaises(ValueError, t.__init__, b, newline='xyzzy')
3698 self.assertRaises(ValueError, t.read)
3699
Benjamin Peterson10e76b62014-12-21 20:51:50 -06003700 t = self.TextIOWrapper.__new__(self.TextIOWrapper)
3701 self.assertRaises(Exception, repr, t)
3702
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003703 def test_garbage_collection(self):
3704 # C TextIOWrapper objects are collected, and collecting them flushes
3705 # all data to disk.
3706 # The Python version has __del__, so it ends in gc.garbage instead.
Antoine Pitrou796564c2013-07-30 19:59:21 +02003707 with support.check_warnings(('', ResourceWarning)):
3708 rawio = io.FileIO(support.TESTFN, "wb")
3709 b = self.BufferedWriter(rawio)
3710 t = self.TextIOWrapper(b, encoding="ascii")
3711 t.write("456def")
3712 t.x = t
3713 wr = weakref.ref(t)
3714 del t
3715 support.gc_collect()
Serhiy Storchaka0dcd80a2015-08-02 15:17:49 +03003716 self.assertIsNone(wr(), wr)
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +00003717 with self.open(support.TESTFN, "rb") as f:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003718 self.assertEqual(f.read(), b"456def")
3719
Charles-François Natali42c28cd2011-10-05 19:53:43 +02003720 def test_rwpair_cleared_before_textio(self):
3721 # Issue 13070: TextIOWrapper's finalization would crash when called
3722 # after the reference to the underlying BufferedRWPair's writer got
3723 # cleared by the GC.
3724 for i in range(1000):
3725 b1 = self.BufferedRWPair(self.MockRawIO(), self.MockRawIO())
3726 t1 = self.TextIOWrapper(b1, encoding="ascii")
3727 b2 = self.BufferedRWPair(self.MockRawIO(), self.MockRawIO())
3728 t2 = self.TextIOWrapper(b2, encoding="ascii")
3729 # circular references
3730 t1.buddy = t2
3731 t2.buddy = t1
3732 support.gc_collect()
3733
Zackery Spytz842acaa2018-12-17 07:52:45 -07003734 def test_del__CHUNK_SIZE_SystemError(self):
3735 t = self.TextIOWrapper(self.BytesIO(), encoding='ascii')
3736 with self.assertRaises(AttributeError):
3737 del t._CHUNK_SIZE
3738
Charles-François Natali42c28cd2011-10-05 19:53:43 +02003739
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003740class PyTextIOWrapperTest(TextIOWrapperTest):
Antoine Pitrou712cb732013-12-21 15:51:54 +01003741 io = pyio
Steve Dower6d5cf8a2016-09-09 09:21:01 -07003742 shutdown_error = "LookupError: unknown encoding: ascii"
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003743
3744
3745class IncrementalNewlineDecoderTest(unittest.TestCase):
3746
3747 def check_newline_decoding_utf8(self, decoder):
Antoine Pitrou180a3362008-12-14 16:36:46 +00003748 # UTF-8 specific tests for a newline decoder
3749 def _check_decode(b, s, **kwargs):
3750 # We exercise getstate() / setstate() as well as decode()
3751 state = decoder.getstate()
Ezio Melottib3aedd42010-11-20 19:04:17 +00003752 self.assertEqual(decoder.decode(b, **kwargs), s)
Antoine Pitrou180a3362008-12-14 16:36:46 +00003753 decoder.setstate(state)
Ezio Melottib3aedd42010-11-20 19:04:17 +00003754 self.assertEqual(decoder.decode(b, **kwargs), s)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00003755
Antoine Pitrou180a3362008-12-14 16:36:46 +00003756 _check_decode(b'\xe8\xa2\x88', "\u8888")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00003757
Antoine Pitrou180a3362008-12-14 16:36:46 +00003758 _check_decode(b'\xe8', "")
3759 _check_decode(b'\xa2', "")
3760 _check_decode(b'\x88', "\u8888")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00003761
Antoine Pitrou180a3362008-12-14 16:36:46 +00003762 _check_decode(b'\xe8', "")
3763 _check_decode(b'\xa2', "")
3764 _check_decode(b'\x88', "\u8888")
3765
3766 _check_decode(b'\xe8', "")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00003767 self.assertRaises(UnicodeDecodeError, decoder.decode, b'', final=True)
3768
Antoine Pitrou180a3362008-12-14 16:36:46 +00003769 decoder.reset()
3770 _check_decode(b'\n', "\n")
3771 _check_decode(b'\r', "")
3772 _check_decode(b'', "\n", final=True)
3773 _check_decode(b'\r', "\n", final=True)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00003774
Antoine Pitrou180a3362008-12-14 16:36:46 +00003775 _check_decode(b'\r', "")
3776 _check_decode(b'a', "\na")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00003777
Antoine Pitrou180a3362008-12-14 16:36:46 +00003778 _check_decode(b'\r\r\n', "\n\n")
3779 _check_decode(b'\r', "")
3780 _check_decode(b'\r', "\n")
3781 _check_decode(b'\na', "\na")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00003782
Antoine Pitrou180a3362008-12-14 16:36:46 +00003783 _check_decode(b'\xe8\xa2\x88\r\n', "\u8888\n")
3784 _check_decode(b'\xe8\xa2\x88', "\u8888")
3785 _check_decode(b'\n', "\n")
3786 _check_decode(b'\xe8\xa2\x88\r', "\u8888")
3787 _check_decode(b'\n', "\n")
Guido van Rossum9b76da62007-04-11 01:09:03 +00003788
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003789 def check_newline_decoding(self, decoder, encoding):
Antoine Pitrou180a3362008-12-14 16:36:46 +00003790 result = []
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003791 if encoding is not None:
3792 encoder = codecs.getincrementalencoder(encoding)()
3793 def _decode_bytewise(s):
3794 # Decode one byte at a time
3795 for b in encoder.encode(s):
3796 result.append(decoder.decode(bytes([b])))
3797 else:
3798 encoder = None
3799 def _decode_bytewise(s):
3800 # Decode one char at a time
3801 for c in s:
3802 result.append(decoder.decode(c))
Ezio Melottib3aedd42010-11-20 19:04:17 +00003803 self.assertEqual(decoder.newlines, None)
Antoine Pitrou180a3362008-12-14 16:36:46 +00003804 _decode_bytewise("abc\n\r")
Ezio Melottib3aedd42010-11-20 19:04:17 +00003805 self.assertEqual(decoder.newlines, '\n')
Antoine Pitrou180a3362008-12-14 16:36:46 +00003806 _decode_bytewise("\nabc")
Ezio Melottib3aedd42010-11-20 19:04:17 +00003807 self.assertEqual(decoder.newlines, ('\n', '\r\n'))
Antoine Pitrou180a3362008-12-14 16:36:46 +00003808 _decode_bytewise("abc\r")
Ezio Melottib3aedd42010-11-20 19:04:17 +00003809 self.assertEqual(decoder.newlines, ('\n', '\r\n'))
Antoine Pitrou180a3362008-12-14 16:36:46 +00003810 _decode_bytewise("abc")
Ezio Melottib3aedd42010-11-20 19:04:17 +00003811 self.assertEqual(decoder.newlines, ('\r', '\n', '\r\n'))
Antoine Pitrou180a3362008-12-14 16:36:46 +00003812 _decode_bytewise("abc\r")
Ezio Melottib3aedd42010-11-20 19:04:17 +00003813 self.assertEqual("".join(result), "abc\n\nabcabc\nabcabc")
Antoine Pitrou180a3362008-12-14 16:36:46 +00003814 decoder.reset()
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003815 input = "abc"
3816 if encoder is not None:
3817 encoder.reset()
3818 input = encoder.encode(input)
Ezio Melottib3aedd42010-11-20 19:04:17 +00003819 self.assertEqual(decoder.decode(input), "abc")
3820 self.assertEqual(decoder.newlines, None)
Antoine Pitrou180a3362008-12-14 16:36:46 +00003821
3822 def test_newline_decoder(self):
3823 encodings = (
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003824 # None meaning the IncrementalNewlineDecoder takes unicode input
3825 # rather than bytes input
3826 None, 'utf-8', 'latin-1',
Antoine Pitrou180a3362008-12-14 16:36:46 +00003827 'utf-16', 'utf-16-le', 'utf-16-be',
3828 'utf-32', 'utf-32-le', 'utf-32-be',
3829 )
3830 for enc in encodings:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003831 decoder = enc and codecs.getincrementaldecoder(enc)()
3832 decoder = self.IncrementalNewlineDecoder(decoder, translate=True)
3833 self.check_newline_decoding(decoder, enc)
Alexandre Vassalotti472f07d2008-01-06 00:34:32 +00003834 decoder = codecs.getincrementaldecoder("utf-8")()
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003835 decoder = self.IncrementalNewlineDecoder(decoder, translate=True)
3836 self.check_newline_decoding_utf8(decoder)
Oren Milman1d1d3e92017-08-20 18:35:36 +03003837 self.assertRaises(TypeError, decoder.setstate, 42)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003838
Antoine Pitrou66913e22009-03-06 23:40:56 +00003839 def test_newline_bytes(self):
3840 # Issue 5433: Excessive optimization in IncrementalNewlineDecoder
3841 def _check(dec):
Ezio Melottib3aedd42010-11-20 19:04:17 +00003842 self.assertEqual(dec.newlines, None)
3843 self.assertEqual(dec.decode("\u0D00"), "\u0D00")
3844 self.assertEqual(dec.newlines, None)
3845 self.assertEqual(dec.decode("\u0A00"), "\u0A00")
3846 self.assertEqual(dec.newlines, None)
Antoine Pitrou66913e22009-03-06 23:40:56 +00003847 dec = self.IncrementalNewlineDecoder(None, translate=False)
3848 _check(dec)
3849 dec = self.IncrementalNewlineDecoder(None, translate=True)
3850 _check(dec)
3851
Xiang Zhangb08746b2018-10-31 19:49:16 +08003852 def test_translate(self):
3853 # issue 35062
3854 for translate in (-2, -1, 1, 2):
3855 decoder = codecs.getincrementaldecoder("utf-8")()
3856 decoder = self.IncrementalNewlineDecoder(decoder, translate)
3857 self.check_newline_decoding_utf8(decoder)
3858 decoder = codecs.getincrementaldecoder("utf-8")()
3859 decoder = self.IncrementalNewlineDecoder(decoder, translate=0)
3860 self.assertEqual(decoder.decode(b"\r\r\n"), "\r\r\n")
3861
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003862class CIncrementalNewlineDecoderTest(IncrementalNewlineDecoderTest):
3863 pass
3864
3865class PyIncrementalNewlineDecoderTest(IncrementalNewlineDecoderTest):
3866 pass
Antoine Pitrou180a3362008-12-14 16:36:46 +00003867
Alexandre Vassalotti472f07d2008-01-06 00:34:32 +00003868
Guido van Rossum01a27522007-03-07 01:00:12 +00003869# XXX Tests for open()
Guido van Rossum68bbcd22007-02-27 17:19:33 +00003870
Guido van Rossum5abbf752007-08-27 17:39:33 +00003871class MiscIOTest(unittest.TestCase):
3872
Barry Warsaw40e82462008-11-20 20:14:50 +00003873 def tearDown(self):
3874 support.unlink(support.TESTFN)
3875
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003876 def test___all__(self):
3877 for name in self.io.__all__:
3878 obj = getattr(self.io, name, None)
Serhiy Storchaka0dcd80a2015-08-02 15:17:49 +03003879 self.assertIsNotNone(obj, name)
Steve Dowerb82e17e2019-05-23 08:45:22 -07003880 if name in ("open", "open_code"):
Guido van Rossum5abbf752007-08-27 17:39:33 +00003881 continue
Benjamin Peterson6a52a9c2009-04-29 22:00:44 +00003882 elif "error" in name.lower() or name == "UnsupportedOperation":
Benjamin Petersonbfb95942009-04-02 01:13:40 +00003883 self.assertTrue(issubclass(obj, Exception), name)
3884 elif not name.startswith("SEEK_"):
3885 self.assertTrue(issubclass(obj, self.IOBase))
Benjamin Peterson65676e42008-11-05 21:42:45 +00003886
Barry Warsaw40e82462008-11-20 20:14:50 +00003887 def test_attributes(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003888 f = self.open(support.TESTFN, "wb", buffering=0)
Ezio Melottib3aedd42010-11-20 19:04:17 +00003889 self.assertEqual(f.mode, "wb")
Barry Warsaw40e82462008-11-20 20:14:50 +00003890 f.close()
3891
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003892 f = self.open(support.TESTFN, "w+")
Ezio Melottib3aedd42010-11-20 19:04:17 +00003893 self.assertEqual(f.mode, "w+")
3894 self.assertEqual(f.buffer.mode, "rb+") # Does it really matter?
3895 self.assertEqual(f.buffer.raw.mode, "rb+")
Barry Warsaw40e82462008-11-20 20:14:50 +00003896
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003897 g = self.open(f.fileno(), "wb", closefd=False)
Ezio Melottib3aedd42010-11-20 19:04:17 +00003898 self.assertEqual(g.mode, "wb")
3899 self.assertEqual(g.raw.mode, "wb")
3900 self.assertEqual(g.name, f.fileno())
3901 self.assertEqual(g.raw.name, f.fileno())
Barry Warsaw40e82462008-11-20 20:14:50 +00003902 f.close()
3903 g.close()
3904
Victor Stinnere471e722019-10-28 15:40:08 +01003905 def test_removed_u_mode(self):
3906 # "U" mode has been removed in Python 3.9
3907 for mode in ("U", "rU", "r+U"):
3908 with self.assertRaises(ValueError) as cm:
3909 self.open(support.TESTFN, mode)
3910 self.assertIn('invalid mode', str(cm.exception))
3911
Benjamin Peterson74fa9f72019-11-12 14:51:34 -08003912 def test_open_pipe_with_append(self):
3913 # bpo-27805: Ignore ESPIPE from lseek() in open().
3914 r, w = os.pipe()
3915 self.addCleanup(os.close, r)
3916 f = self.open(w, 'a')
3917 self.addCleanup(f.close)
3918 # Check that the file is marked non-seekable. On Windows, however, lseek
3919 # somehow succeeds on pipes.
3920 if sys.platform != 'win32':
3921 self.assertFalse(f.seekable())
3922
Antoine Pitrou8043cf82009-01-09 19:54:29 +00003923 def test_io_after_close(self):
3924 for kwargs in [
3925 {"mode": "w"},
3926 {"mode": "wb"},
3927 {"mode": "w", "buffering": 1},
3928 {"mode": "w", "buffering": 2},
3929 {"mode": "wb", "buffering": 0},
3930 {"mode": "r"},
3931 {"mode": "rb"},
3932 {"mode": "r", "buffering": 1},
3933 {"mode": "r", "buffering": 2},
3934 {"mode": "rb", "buffering": 0},
3935 {"mode": "w+"},
3936 {"mode": "w+b"},
3937 {"mode": "w+", "buffering": 1},
3938 {"mode": "w+", "buffering": 2},
3939 {"mode": "w+b", "buffering": 0},
3940 ]:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003941 f = self.open(support.TESTFN, **kwargs)
Antoine Pitrou8043cf82009-01-09 19:54:29 +00003942 f.close()
3943 self.assertRaises(ValueError, f.flush)
3944 self.assertRaises(ValueError, f.fileno)
3945 self.assertRaises(ValueError, f.isatty)
3946 self.assertRaises(ValueError, f.__iter__)
3947 if hasattr(f, "peek"):
3948 self.assertRaises(ValueError, f.peek, 1)
3949 self.assertRaises(ValueError, f.read)
3950 if hasattr(f, "read1"):
3951 self.assertRaises(ValueError, f.read1, 1024)
Martin Panterccb2c0e2016-10-20 23:48:14 +00003952 self.assertRaises(ValueError, f.read1)
Victor Stinnerb79f28c2011-05-25 22:09:03 +02003953 if hasattr(f, "readall"):
3954 self.assertRaises(ValueError, f.readall)
Antoine Pitrou8043cf82009-01-09 19:54:29 +00003955 if hasattr(f, "readinto"):
3956 self.assertRaises(ValueError, f.readinto, bytearray(1024))
Benjamin Petersona96fea02014-06-22 14:17:44 -07003957 if hasattr(f, "readinto1"):
3958 self.assertRaises(ValueError, f.readinto1, bytearray(1024))
Antoine Pitrou8043cf82009-01-09 19:54:29 +00003959 self.assertRaises(ValueError, f.readline)
3960 self.assertRaises(ValueError, f.readlines)
Xiang Zhang026435c2017-04-15 12:47:28 +08003961 self.assertRaises(ValueError, f.readlines, 1)
Antoine Pitrou8043cf82009-01-09 19:54:29 +00003962 self.assertRaises(ValueError, f.seek, 0)
3963 self.assertRaises(ValueError, f.tell)
3964 self.assertRaises(ValueError, f.truncate)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003965 self.assertRaises(ValueError, f.write,
3966 b"" if "b" in kwargs['mode'] else "")
Antoine Pitrou8043cf82009-01-09 19:54:29 +00003967 self.assertRaises(ValueError, f.writelines, [])
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003968 self.assertRaises(ValueError, next, f)
Antoine Pitrou8043cf82009-01-09 19:54:29 +00003969
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003970 def test_blockingioerror(self):
3971 # Various BlockingIOError issues
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003972 class C(str):
3973 pass
3974 c = C("")
3975 b = self.BlockingIOError(1, c)
3976 c.b = b
3977 b.c = c
3978 wr = weakref.ref(c)
3979 del c, b
Benjamin Peterson24fb1d02009-04-24 23:26:21 +00003980 support.gc_collect()
Serhiy Storchaka0dcd80a2015-08-02 15:17:49 +03003981 self.assertIsNone(wr(), wr)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003982
3983 def test_abcs(self):
3984 # Test the visible base classes are ABCs.
Ezio Melottie9615932010-01-24 19:26:24 +00003985 self.assertIsInstance(self.IOBase, abc.ABCMeta)
3986 self.assertIsInstance(self.RawIOBase, abc.ABCMeta)
3987 self.assertIsInstance(self.BufferedIOBase, abc.ABCMeta)
3988 self.assertIsInstance(self.TextIOBase, abc.ABCMeta)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003989
3990 def _check_abc_inheritance(self, abcmodule):
3991 with self.open(support.TESTFN, "wb", buffering=0) as f:
Ezio Melottie9615932010-01-24 19:26:24 +00003992 self.assertIsInstance(f, abcmodule.IOBase)
3993 self.assertIsInstance(f, abcmodule.RawIOBase)
3994 self.assertNotIsInstance(f, abcmodule.BufferedIOBase)
3995 self.assertNotIsInstance(f, abcmodule.TextIOBase)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003996 with self.open(support.TESTFN, "wb") as f:
Ezio Melottie9615932010-01-24 19:26:24 +00003997 self.assertIsInstance(f, abcmodule.IOBase)
3998 self.assertNotIsInstance(f, abcmodule.RawIOBase)
3999 self.assertIsInstance(f, abcmodule.BufferedIOBase)
4000 self.assertNotIsInstance(f, abcmodule.TextIOBase)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00004001 with self.open(support.TESTFN, "w") as f:
Ezio Melottie9615932010-01-24 19:26:24 +00004002 self.assertIsInstance(f, abcmodule.IOBase)
4003 self.assertNotIsInstance(f, abcmodule.RawIOBase)
4004 self.assertNotIsInstance(f, abcmodule.BufferedIOBase)
4005 self.assertIsInstance(f, abcmodule.TextIOBase)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00004006
4007 def test_abc_inheritance(self):
4008 # Test implementations inherit from their respective ABCs
4009 self._check_abc_inheritance(self)
4010
4011 def test_abc_inheritance_official(self):
4012 # Test implementations inherit from the official ABCs of the
4013 # baseline "io" module.
4014 self._check_abc_inheritance(io)
4015
Antoine Pitroue033e062010-10-29 10:38:18 +00004016 def _check_warn_on_dealloc(self, *args, **kwargs):
4017 f = open(*args, **kwargs)
4018 r = repr(f)
4019 with self.assertWarns(ResourceWarning) as cm:
4020 f = None
4021 support.gc_collect()
4022 self.assertIn(r, str(cm.warning.args[0]))
4023
4024 def test_warn_on_dealloc(self):
4025 self._check_warn_on_dealloc(support.TESTFN, "wb", buffering=0)
4026 self._check_warn_on_dealloc(support.TESTFN, "wb")
4027 self._check_warn_on_dealloc(support.TESTFN, "w")
4028
4029 def _check_warn_on_dealloc_fd(self, *args, **kwargs):
4030 fds = []
Benjamin Peterson556c7352010-10-31 01:35:43 +00004031 def cleanup_fds():
Antoine Pitroue033e062010-10-29 10:38:18 +00004032 for fd in fds:
4033 try:
4034 os.close(fd)
Andrew Svetlov3438fa42012-12-17 23:35:18 +02004035 except OSError as e:
Antoine Pitroue033e062010-10-29 10:38:18 +00004036 if e.errno != errno.EBADF:
4037 raise
Benjamin Peterson556c7352010-10-31 01:35:43 +00004038 self.addCleanup(cleanup_fds)
4039 r, w = os.pipe()
4040 fds += r, w
4041 self._check_warn_on_dealloc(r, *args, **kwargs)
4042 # When using closefd=False, there's no warning
4043 r, w = os.pipe()
4044 fds += r, w
Serhiy Storchaka94a619d2016-02-11 13:11:44 +02004045 with support.check_no_resource_warning(self):
Benjamin Peterson556c7352010-10-31 01:35:43 +00004046 open(r, *args, closefd=False, **kwargs)
Antoine Pitroue033e062010-10-29 10:38:18 +00004047
4048 def test_warn_on_dealloc_fd(self):
4049 self._check_warn_on_dealloc_fd("rb", buffering=0)
4050 self._check_warn_on_dealloc_fd("rb")
4051 self._check_warn_on_dealloc_fd("r")
4052
4053
Antoine Pitrou243757e2010-11-05 21:15:39 +00004054 def test_pickling(self):
4055 # Pickling file objects is forbidden
4056 for kwargs in [
4057 {"mode": "w"},
4058 {"mode": "wb"},
4059 {"mode": "wb", "buffering": 0},
4060 {"mode": "r"},
4061 {"mode": "rb"},
4062 {"mode": "rb", "buffering": 0},
4063 {"mode": "w+"},
4064 {"mode": "w+b"},
4065 {"mode": "w+b", "buffering": 0},
4066 ]:
4067 for protocol in range(pickle.HIGHEST_PROTOCOL + 1):
4068 with self.open(support.TESTFN, **kwargs) as f:
4069 self.assertRaises(TypeError, pickle.dumps, f, protocol)
4070
Antoine Pitrou58fcf9f2011-11-21 20:16:44 +01004071 def test_nonblock_pipe_write_bigbuf(self):
4072 self._test_nonblock_pipe_write(16*1024)
4073
Antoine Pitrou58fcf9f2011-11-21 20:16:44 +01004074 def test_nonblock_pipe_write_smallbuf(self):
4075 self._test_nonblock_pipe_write(1024)
4076
Victor Stinner1db9e7b2014-07-29 22:32:47 +02004077 @unittest.skipUnless(hasattr(os, 'set_blocking'),
4078 'os.set_blocking() required for this test')
Antoine Pitrou58fcf9f2011-11-21 20:16:44 +01004079 def _test_nonblock_pipe_write(self, bufsize):
4080 sent = []
4081 received = []
4082 r, w = os.pipe()
Victor Stinner1db9e7b2014-07-29 22:32:47 +02004083 os.set_blocking(r, False)
4084 os.set_blocking(w, False)
Antoine Pitrou58fcf9f2011-11-21 20:16:44 +01004085
4086 # To exercise all code paths in the C implementation we need
4087 # to play with buffer sizes. For instance, if we choose a
4088 # buffer size less than or equal to _PIPE_BUF (4096 on Linux)
4089 # then we will never get a partial write of the buffer.
4090 rf = self.open(r, mode='rb', closefd=True, buffering=bufsize)
4091 wf = self.open(w, mode='wb', closefd=True, buffering=bufsize)
4092
4093 with rf, wf:
4094 for N in 9999, 73, 7574:
4095 try:
4096 i = 0
4097 while True:
4098 msg = bytes([i % 26 + 97]) * N
4099 sent.append(msg)
4100 wf.write(msg)
4101 i += 1
4102
4103 except self.BlockingIOError as e:
4104 self.assertEqual(e.args[0], errno.EAGAIN)
Antoine Pitrou7fe601c2011-11-21 20:22:01 +01004105 self.assertEqual(e.args[2], e.characters_written)
Antoine Pitrou58fcf9f2011-11-21 20:16:44 +01004106 sent[-1] = sent[-1][:e.characters_written]
4107 received.append(rf.read())
4108 msg = b'BLOCKED'
4109 wf.write(msg)
4110 sent.append(msg)
4111
4112 while True:
4113 try:
4114 wf.flush()
4115 break
4116 except self.BlockingIOError as e:
4117 self.assertEqual(e.args[0], errno.EAGAIN)
Antoine Pitrou7fe601c2011-11-21 20:22:01 +01004118 self.assertEqual(e.args[2], e.characters_written)
Antoine Pitrou58fcf9f2011-11-21 20:16:44 +01004119 self.assertEqual(e.characters_written, 0)
4120 received.append(rf.read())
4121
4122 received += iter(rf.read, None)
4123
4124 sent, received = b''.join(sent), b''.join(received)
Serhiy Storchaka0dcd80a2015-08-02 15:17:49 +03004125 self.assertEqual(sent, received)
Antoine Pitrou58fcf9f2011-11-21 20:16:44 +01004126 self.assertTrue(wf.closed)
4127 self.assertTrue(rf.closed)
4128
Charles-François Natalidc3044c2012-01-09 22:40:02 +01004129 def test_create_fail(self):
4130 # 'x' mode fails if file is existing
4131 with self.open(support.TESTFN, 'w'):
4132 pass
4133 self.assertRaises(FileExistsError, self.open, support.TESTFN, 'x')
4134
4135 def test_create_writes(self):
4136 # 'x' mode opens for writing
4137 with self.open(support.TESTFN, 'xb') as f:
4138 f.write(b"spam")
4139 with self.open(support.TESTFN, 'rb') as f:
4140 self.assertEqual(b"spam", f.read())
4141
Christian Heimes7b648752012-09-10 14:48:43 +02004142 def test_open_allargs(self):
4143 # there used to be a buffer overflow in the parser for rawmode
4144 self.assertRaises(ValueError, self.open, support.TESTFN, 'rwax+')
4145
Victor Stinner22eb6892019-06-26 00:51:05 +02004146 def test_check_encoding_errors(self):
4147 # bpo-37388: open() and TextIOWrapper must check encoding and errors
4148 # arguments in dev mode
4149 mod = self.io.__name__
4150 filename = __file__
4151 invalid = 'Boom, Shaka Laka, Boom!'
4152 code = textwrap.dedent(f'''
4153 import sys
4154 from {mod} import open, TextIOWrapper
4155
4156 try:
4157 open({filename!r}, encoding={invalid!r})
4158 except LookupError:
4159 pass
4160 else:
4161 sys.exit(21)
4162
4163 try:
4164 open({filename!r}, errors={invalid!r})
4165 except LookupError:
4166 pass
4167 else:
4168 sys.exit(22)
4169
4170 fp = open({filename!r}, "rb")
4171 with fp:
4172 try:
4173 TextIOWrapper(fp, encoding={invalid!r})
4174 except LookupError:
4175 pass
4176 else:
4177 sys.exit(23)
4178
4179 try:
4180 TextIOWrapper(fp, errors={invalid!r})
4181 except LookupError:
4182 pass
4183 else:
4184 sys.exit(24)
4185
4186 sys.exit(10)
4187 ''')
4188 proc = assert_python_failure('-X', 'dev', '-c', code)
4189 self.assertEqual(proc.rc, 10, proc)
4190
Christian Heimes7b648752012-09-10 14:48:43 +02004191
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00004192class CMiscIOTest(MiscIOTest):
4193 io = io
4194
Serhiy Storchaka37a79a12013-05-28 16:24:45 +03004195 def test_readinto_buffer_overflow(self):
4196 # Issue #18025
4197 class BadReader(self.io.BufferedIOBase):
4198 def read(self, n=-1):
4199 return b'x' * 10**6
4200 bufio = BadReader()
4201 b = bytearray(2)
4202 self.assertRaises(ValueError, bufio.readinto, b)
4203
Antoine Pitrou25f85d42015-04-13 19:41:47 +02004204 def check_daemon_threads_shutdown_deadlock(self, stream_name):
4205 # Issue #23309: deadlocks at shutdown should be avoided when a
4206 # daemon thread and the main thread both write to a file.
4207 code = """if 1:
4208 import sys
4209 import time
4210 import threading
Victor Stinner2a1aed02017-04-21 17:59:23 +02004211 from test.support import SuppressCrashReport
Antoine Pitrou25f85d42015-04-13 19:41:47 +02004212
4213 file = sys.{stream_name}
4214
4215 def run():
4216 while True:
4217 file.write('.')
4218 file.flush()
4219
Victor Stinner2a1aed02017-04-21 17:59:23 +02004220 crash = SuppressCrashReport()
4221 crash.__enter__()
4222 # don't call __exit__(): the crash occurs at Python shutdown
4223
Antoine Pitrou25f85d42015-04-13 19:41:47 +02004224 thread = threading.Thread(target=run)
4225 thread.daemon = True
4226 thread.start()
4227
4228 time.sleep(0.5)
4229 file.write('!')
4230 file.flush()
4231 """.format_map(locals())
4232 res, _ = run_python_until_end("-c", code)
4233 err = res.err.decode()
4234 if res.rc != 0:
4235 # Failure: should be a fatal error
Max Bernsteinccb7ca72019-05-21 10:09:21 -07004236 pattern = (r"Fatal Python error: could not acquire lock "
4237 r"for <(_io\.)?BufferedWriter name='<{stream_name}>'> "
4238 r"at interpreter shutdown, possibly due to "
4239 r"daemon threads".format_map(locals()))
4240 self.assertRegex(err, pattern)
Antoine Pitrou25f85d42015-04-13 19:41:47 +02004241 else:
4242 self.assertFalse(err.strip('.!'))
4243
4244 def test_daemon_threads_shutdown_stdout_deadlock(self):
4245 self.check_daemon_threads_shutdown_deadlock('stdout')
4246
4247 def test_daemon_threads_shutdown_stderr_deadlock(self):
4248 self.check_daemon_threads_shutdown_deadlock('stderr')
4249
4250
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00004251class PyMiscIOTest(MiscIOTest):
4252 io = pyio
Barry Warsaw40e82462008-11-20 20:14:50 +00004253
Antoine Pitroub46b9d52010-08-21 19:09:32 +00004254
4255@unittest.skipIf(os.name == 'nt', 'POSIX signals required for this test.')
4256class SignalsTest(unittest.TestCase):
4257
4258 def setUp(self):
4259 self.oldalrm = signal.signal(signal.SIGALRM, self.alarm_interrupt)
4260
4261 def tearDown(self):
4262 signal.signal(signal.SIGALRM, self.oldalrm)
4263
4264 def alarm_interrupt(self, sig, frame):
4265 1/0
4266
Antoine Pitroub46b9d52010-08-21 19:09:32 +00004267 def check_interrupted_write(self, item, bytes, **fdopen_kwargs):
4268 """Check that a partial write, when it gets interrupted, properly
Antoine Pitrou707ce822011-02-25 21:24:11 +00004269 invokes the signal handler, and bubbles up the exception raised
4270 in the latter."""
Antoine Pitroub46b9d52010-08-21 19:09:32 +00004271 read_results = []
4272 def _read():
4273 s = os.read(r, 1)
4274 read_results.append(s)
Victor Stinner05c9d312018-12-18 23:52:39 +01004275
Antoine Pitroub46b9d52010-08-21 19:09:32 +00004276 t = threading.Thread(target=_read)
4277 t.daemon = True
4278 r, w = os.pipe()
Benjamin Petersond8fc2e12010-10-31 01:19:53 +00004279 fdopen_kwargs["closefd"] = False
Victor Stinner9c182a12015-03-16 18:03:06 +01004280 large_data = item * (support.PIPE_MAX_SIZE // len(item) + 1)
Antoine Pitroub46b9d52010-08-21 19:09:32 +00004281 try:
4282 wio = self.io.open(w, **fdopen_kwargs)
Victor Stinner05c9d312018-12-18 23:52:39 +01004283 if hasattr(signal, 'pthread_sigmask'):
4284 # create the thread with SIGALRM signal blocked
4285 signal.pthread_sigmask(signal.SIG_BLOCK, [signal.SIGALRM])
4286 t.start()
4287 signal.pthread_sigmask(signal.SIG_UNBLOCK, [signal.SIGALRM])
4288 else:
4289 t.start()
4290
Antoine Pitroub46b9d52010-08-21 19:09:32 +00004291 # Fill the pipe enough that the write will be blocking.
4292 # It will be interrupted by the timer armed above. Since the
4293 # other thread has read one byte, the low-level write will
4294 # return with a successful (partial) result rather than an EINTR.
4295 # The buffered IO layer must check for pending signal
4296 # handlers, which in this case will invoke alarm_interrupt().
Victor Stinner775b2dd2013-07-15 19:53:13 +02004297 signal.alarm(1)
4298 try:
Victor Stinner9c182a12015-03-16 18:03:06 +01004299 self.assertRaises(ZeroDivisionError, wio.write, large_data)
Victor Stinner775b2dd2013-07-15 19:53:13 +02004300 finally:
4301 signal.alarm(0)
Serhiy Storchaka263dcd22015-04-01 13:01:14 +03004302 t.join()
Antoine Pitroub46b9d52010-08-21 19:09:32 +00004303 # We got one byte, get another one and check that it isn't a
4304 # repeat of the first one.
4305 read_results.append(os.read(r, 1))
4306 self.assertEqual(read_results, [bytes[0:1], bytes[1:2]])
4307 finally:
4308 os.close(w)
4309 os.close(r)
4310 # This is deliberate. If we didn't close the file descriptor
4311 # before closing wio, wio would try to flush its internal
4312 # buffer, and block again.
4313 try:
4314 wio.close()
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02004315 except OSError as e:
Antoine Pitroub46b9d52010-08-21 19:09:32 +00004316 if e.errno != errno.EBADF:
4317 raise
4318
4319 def test_interrupted_write_unbuffered(self):
4320 self.check_interrupted_write(b"xy", b"xy", mode="wb", buffering=0)
4321
4322 def test_interrupted_write_buffered(self):
4323 self.check_interrupted_write(b"xy", b"xy", mode="wb")
4324
4325 def test_interrupted_write_text(self):
4326 self.check_interrupted_write("xy", b"xy", mode="w", encoding="ascii")
4327
Brett Cannon31f59292011-02-21 19:29:56 +00004328 @support.no_tracing
Antoine Pitrouf3b68b32010-12-03 18:41:39 +00004329 def check_reentrant_write(self, data, **fdopen_kwargs):
4330 def on_alarm(*args):
4331 # Will be called reentrantly from the same thread
4332 wio.write(data)
4333 1/0
4334 signal.signal(signal.SIGALRM, on_alarm)
4335 r, w = os.pipe()
4336 wio = self.io.open(w, **fdopen_kwargs)
4337 try:
Gregory P. Smithb5ba2032012-06-25 01:13:32 -07004338 signal.alarm(1)
Antoine Pitrouf3b68b32010-12-03 18:41:39 +00004339 # Either the reentrant call to wio.write() fails with RuntimeError,
4340 # or the signal handler raises ZeroDivisionError.
4341 with self.assertRaises((ZeroDivisionError, RuntimeError)) as cm:
4342 while 1:
4343 for i in range(100):
4344 wio.write(data)
4345 wio.flush()
4346 # Make sure the buffer doesn't fill up and block further writes
4347 os.read(r, len(data) * 100)
4348 exc = cm.exception
4349 if isinstance(exc, RuntimeError):
4350 self.assertTrue(str(exc).startswith("reentrant call"), str(exc))
4351 finally:
Victor Stinner9abee722017-09-19 09:36:54 -07004352 signal.alarm(0)
Antoine Pitrouf3b68b32010-12-03 18:41:39 +00004353 wio.close()
4354 os.close(r)
4355
4356 def test_reentrant_write_buffered(self):
4357 self.check_reentrant_write(b"xy", mode="wb")
4358
4359 def test_reentrant_write_text(self):
4360 self.check_reentrant_write("xy", mode="w", encoding="ascii")
4361
Antoine Pitrou707ce822011-02-25 21:24:11 +00004362 def check_interrupted_read_retry(self, decode, **fdopen_kwargs):
4363 """Check that a buffered read, when it gets interrupted (either
4364 returning a partial result or EINTR), properly invokes the signal
4365 handler and retries if the latter returned successfully."""
4366 r, w = os.pipe()
4367 fdopen_kwargs["closefd"] = False
4368 def alarm_handler(sig, frame):
4369 os.write(w, b"bar")
4370 signal.signal(signal.SIGALRM, alarm_handler)
4371 try:
4372 rio = self.io.open(r, **fdopen_kwargs)
4373 os.write(w, b"foo")
Gregory P. Smithb5ba2032012-06-25 01:13:32 -07004374 signal.alarm(1)
Antoine Pitrou707ce822011-02-25 21:24:11 +00004375 # Expected behaviour:
4376 # - first raw read() returns partial b"foo"
4377 # - second raw read() returns EINTR
4378 # - third raw read() returns b"bar"
4379 self.assertEqual(decode(rio.read(6)), "foobar")
4380 finally:
Victor Stinner9abee722017-09-19 09:36:54 -07004381 signal.alarm(0)
Antoine Pitrou707ce822011-02-25 21:24:11 +00004382 rio.close()
4383 os.close(w)
4384 os.close(r)
4385
Antoine Pitrou20db5112011-08-19 20:32:34 +02004386 def test_interrupted_read_retry_buffered(self):
Antoine Pitrou707ce822011-02-25 21:24:11 +00004387 self.check_interrupted_read_retry(lambda x: x.decode('latin1'),
4388 mode="rb")
4389
Antoine Pitrou20db5112011-08-19 20:32:34 +02004390 def test_interrupted_read_retry_text(self):
Antoine Pitrou707ce822011-02-25 21:24:11 +00004391 self.check_interrupted_read_retry(lambda x: x,
4392 mode="r")
4393
Antoine Pitrou707ce822011-02-25 21:24:11 +00004394 def check_interrupted_write_retry(self, item, **fdopen_kwargs):
4395 """Check that a buffered write, when it gets interrupted (either
4396 returning a partial result or EINTR), properly invokes the signal
4397 handler and retries if the latter returned successfully."""
4398 select = support.import_module("select")
Victor Stinner6d3c09f2015-03-16 17:57:49 +01004399
Antoine Pitrou707ce822011-02-25 21:24:11 +00004400 # A quantity that exceeds the buffer size of an anonymous pipe's
4401 # write end.
Antoine Pitroue1a16742013-04-24 23:31:38 +02004402 N = support.PIPE_MAX_SIZE
Antoine Pitrou707ce822011-02-25 21:24:11 +00004403 r, w = os.pipe()
4404 fdopen_kwargs["closefd"] = False
Victor Stinner6d3c09f2015-03-16 17:57:49 +01004405
Antoine Pitrou707ce822011-02-25 21:24:11 +00004406 # We need a separate thread to read from the pipe and allow the
4407 # write() to finish. This thread is started after the SIGALRM is
4408 # received (forcing a first EINTR in write()).
4409 read_results = []
4410 write_finished = False
Serhiy Storchaka9db55002015-03-28 20:38:37 +02004411 error = None
Antoine Pitrou707ce822011-02-25 21:24:11 +00004412 def _read():
Serhiy Storchaka9db55002015-03-28 20:38:37 +02004413 try:
4414 while not write_finished:
4415 while r in select.select([r], [], [], 1.0)[0]:
4416 s = os.read(r, 1024)
4417 read_results.append(s)
4418 except BaseException as exc:
4419 nonlocal error
4420 error = exc
Antoine Pitrou707ce822011-02-25 21:24:11 +00004421 t = threading.Thread(target=_read)
4422 t.daemon = True
4423 def alarm1(sig, frame):
4424 signal.signal(signal.SIGALRM, alarm2)
Gregory P. Smithb5ba2032012-06-25 01:13:32 -07004425 signal.alarm(1)
Antoine Pitrou707ce822011-02-25 21:24:11 +00004426 def alarm2(sig, frame):
4427 t.start()
Victor Stinner6d3c09f2015-03-16 17:57:49 +01004428
4429 large_data = item * N
Antoine Pitrou707ce822011-02-25 21:24:11 +00004430 signal.signal(signal.SIGALRM, alarm1)
4431 try:
4432 wio = self.io.open(w, **fdopen_kwargs)
Gregory P. Smithb5ba2032012-06-25 01:13:32 -07004433 signal.alarm(1)
Antoine Pitrou707ce822011-02-25 21:24:11 +00004434 # Expected behaviour:
4435 # - first raw write() is partial (because of the limited pipe buffer
4436 # and the first alarm)
4437 # - second raw write() returns EINTR (because of the second alarm)
4438 # - subsequent write()s are successful (either partial or complete)
Victor Stinner6d3c09f2015-03-16 17:57:49 +01004439 written = wio.write(large_data)
4440 self.assertEqual(N, written)
4441
Antoine Pitrou707ce822011-02-25 21:24:11 +00004442 wio.flush()
4443 write_finished = True
4444 t.join()
Victor Stinner6d3c09f2015-03-16 17:57:49 +01004445
Serhiy Storchaka9db55002015-03-28 20:38:37 +02004446 self.assertIsNone(error)
Antoine Pitrou707ce822011-02-25 21:24:11 +00004447 self.assertEqual(N, sum(len(x) for x in read_results))
4448 finally:
Victor Stinner9abee722017-09-19 09:36:54 -07004449 signal.alarm(0)
Antoine Pitrou707ce822011-02-25 21:24:11 +00004450 write_finished = True
4451 os.close(w)
4452 os.close(r)
4453 # This is deliberate. If we didn't close the file descriptor
4454 # before closing wio, wio would try to flush its internal
4455 # buffer, and could block (in case of failure).
4456 try:
4457 wio.close()
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02004458 except OSError as e:
Antoine Pitrou707ce822011-02-25 21:24:11 +00004459 if e.errno != errno.EBADF:
4460 raise
4461
Antoine Pitrou20db5112011-08-19 20:32:34 +02004462 def test_interrupted_write_retry_buffered(self):
Antoine Pitrou707ce822011-02-25 21:24:11 +00004463 self.check_interrupted_write_retry(b"x", mode="wb")
4464
Antoine Pitrou20db5112011-08-19 20:32:34 +02004465 def test_interrupted_write_retry_text(self):
Antoine Pitrou707ce822011-02-25 21:24:11 +00004466 self.check_interrupted_write_retry("x", mode="w", encoding="latin1")
4467
Antoine Pitrouf3b68b32010-12-03 18:41:39 +00004468
Antoine Pitroub46b9d52010-08-21 19:09:32 +00004469class CSignalsTest(SignalsTest):
4470 io = io
4471
4472class PySignalsTest(SignalsTest):
4473 io = pyio
4474
Antoine Pitrouf3b68b32010-12-03 18:41:39 +00004475 # Handling reentrancy issues would slow down _pyio even more, so the
4476 # tests are disabled.
4477 test_reentrant_write_buffered = None
4478 test_reentrant_write_text = None
4479
Antoine Pitroub46b9d52010-08-21 19:09:32 +00004480
Ezio Melottidaa42c72013-03-23 16:30:16 +02004481def load_tests(*args):
Gregory P. Smith1bef9072015-04-14 13:24:34 -07004482 tests = (CIOTest, PyIOTest, APIMismatchTest,
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00004483 CBufferedReaderTest, PyBufferedReaderTest,
4484 CBufferedWriterTest, PyBufferedWriterTest,
4485 CBufferedRWPairTest, PyBufferedRWPairTest,
4486 CBufferedRandomTest, PyBufferedRandomTest,
4487 StatefulIncrementalDecoderTest,
4488 CIncrementalNewlineDecoderTest, PyIncrementalNewlineDecoderTest,
4489 CTextIOWrapperTest, PyTextIOWrapperTest,
Antoine Pitroub46b9d52010-08-21 19:09:32 +00004490 CMiscIOTest, PyMiscIOTest,
4491 CSignalsTest, PySignalsTest,
4492 )
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00004493
4494 # Put the namespaces of the IO module we are testing and some useful mock
4495 # classes in the __dict__ of each test.
4496 mocks = (MockRawIO, MisbehavedRawIO, MockFileIO, CloseFailureIO,
benfogle9703f092017-11-10 16:03:40 -05004497 MockNonBlockWriterIO, MockUnseekableIO, MockRawIOWithoutRead,
4498 SlowFlushRawIO)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00004499 all_members = io.__all__ + ["IncrementalNewlineDecoder"]
4500 c_io_ns = {name : getattr(io, name) for name in all_members}
4501 py_io_ns = {name : getattr(pyio, name) for name in all_members}
4502 globs = globals()
4503 c_io_ns.update((x.__name__, globs["C" + x.__name__]) for x in mocks)
4504 py_io_ns.update((x.__name__, globs["Py" + x.__name__]) for x in mocks)
4505 # Avoid turning open into a bound method.
4506 py_io_ns["open"] = pyio.OpenWrapper
4507 for test in tests:
4508 if test.__name__.startswith("C"):
4509 for name, obj in c_io_ns.items():
4510 setattr(test, name, obj)
4511 elif test.__name__.startswith("Py"):
4512 for name, obj in py_io_ns.items():
4513 setattr(test, name, obj)
4514
Ezio Melottidaa42c72013-03-23 16:30:16 +02004515 suite = unittest.TestSuite([unittest.makeSuite(test) for test in tests])
4516 return suite
Guido van Rossum28524c72007-02-27 05:47:44 +00004517
4518if __name__ == "__main__":
Ezio Melottidaa42c72013-03-23 16:30:16 +02004519 unittest.main()