blob: c27dfd96bc00dd3aff086c65f044d7703a6c29ab [file] [log] [blame]
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001"""Unit tests for the io module."""
2
3# Tests of io are scattered over the test suite:
4# * test_bufio - tests file buffering
5# * test_memoryio - tests BytesIO and StringIO
6# * test_fileio - tests FileIO
7# * test_file - tests the file interface
8# * test_io - tests everything else in the io module
9# * test_univnewlines - tests universal newline support
10# * test_largefile - tests operations on a file greater than 2**32 bytes
11# (only enabled with -ulargefile)
12
13################################################################################
14# ATTENTION TEST WRITERS!!!
15################################################################################
16# When writing tests for io, it's important to test both the C and Python
17# implementations. This is usually done by writing a base test that refers to
Serhiy Storchakad65c9492015-11-02 14:10:23 +020018# the type it is testing as an attribute. Then it provides custom subclasses to
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000019# test both implementations. This file has lots of examples.
20################################################################################
Guido van Rossum68bbcd22007-02-27 17:19:33 +000021
Victor Stinnerf86a5e82012-06-05 13:43:22 +020022import abc
23import array
24import errno
25import locale
Guido van Rossum8358db22007-08-18 21:39:55 +000026import os
Victor Stinnerf86a5e82012-06-05 13:43:22 +020027import pickle
28import random
29import signal
Guido van Rossum34d69e52007-04-10 20:08:41 +000030import sys
Gregory P. Smithe5796c42018-12-30 20:17:57 -080031import sysconfig
Victor Stinner22eb6892019-06-26 00:51:05 +020032import textwrap
Antoine Pitroua6a4dc82017-09-07 18:56:24 +020033import threading
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +000034import time
Guido van Rossum28524c72007-02-27 05:47:44 +000035import unittest
Antoine Pitroue033e062010-10-29 10:38:18 +000036import warnings
Victor Stinnerf86a5e82012-06-05 13:43:22 +020037import weakref
Antoine Pitrou131a4892012-10-16 22:57:11 +020038from collections import deque, UserList
Victor Stinnerf86a5e82012-06-05 13:43:22 +020039from itertools import cycle, count
Benjamin Petersonee8712c2008-05-20 21:35:26 +000040from test import support
Victor Stinner22eb6892019-06-26 00:51:05 +020041from test.support.script_helper import (
42 assert_python_ok, assert_python_failure, run_python_until_end)
Serhiy Storchakab21d1552018-03-02 11:53:51 +020043from test.support import FakePath
Guido van Rossum76c5d4d2007-04-06 19:10:29 +000044
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +000045import codecs
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000046import io # C implementation of io
47import _pyio as pyio # Python implementation of io
Guido van Rossuma9e20242007-03-08 00:43:48 +000048
try:
    import ctypes
except ImportError:
    # Without ctypes, hand back an array of signed chars built from the
    # same arguments bytes() would accept.
    def byteslike(*pos, **kw):
        """Create a bytes-like object (signed-char array) from bytes() arguments"""
        payload = bytes(*pos, **kw)
        return array.array("b", payload)
else:
    class EmptyStruct(ctypes.Structure):
        """Field-less ctypes structure, resized to hold the payload."""

    def byteslike(*pos, **kw):
        """Create a bytes-like object having no string or sequence methods"""
        payload = bytes(*pos, **kw)
        holder = EmptyStruct()
        # Grow the structure's buffer to the payload size, then copy the
        # payload in through a byte-oriented view of that buffer.
        ctypes.resize(holder, len(payload))
        view = memoryview(holder).cast("B")
        view[:] = payload
        return holder
64
# Build-time settings; either may be unset, hence the '' fallback.
_cflags = sysconfig.get_config_var('CFLAGS') or ''
_config_args = sysconfig.get_config_var('CONFIG_ARGS') or ''
# True when either the compiler flags or the configure arguments mention
# the memory sanitizer.
MEMORY_SANITIZER = any(
    marker in setting
    for marker, setting in (
        ('-fsanitize=memory', _cflags),
        ('--with-memory-sanitizer', _config_args),
    )
)
71
# Does the io.IOBase finalizer log the exception when close() fails?
# By default a release build ignores that exception silently; it is
# reported when sys.gettotalrefcount exists or in dev mode.
IOBASE_EMITS_UNRAISABLE = (
    True if hasattr(sys, "gettotalrefcount") else sys.flags.dev_mode
)
Victor Stinner44235042019-04-12 17:06:47 +020075
76
def _default_chunk_size():
    """Get the default TextIOWrapper chunk size"""
    # Any text-mode file will do; this module's own source is always there.
    stream = open(__file__, "r", encoding="latin-1")
    try:
        return stream._CHUNK_SIZE
    finally:
        stream.close()
81
82
class MockRawIOWithoutRead:
    """Raw stream mock that deliberately lacks read(), so that the default
    RawIO.read() (which is built on readinto()) gets exercised."""

    def __init__(self, read_stack=()):
        # Chunks still to be served by readinto(); a None entry is
        # returned to the caller as None.
        self._read_stack = list(read_stack)
        # Every payload handed to write(), stored as bytes.
        self._write_stack = []
        # Number of readinto() calls, and how many of them found the
        # read stack already empty.
        self._reads = 0
        self._extraneous_reads = 0

    def fileno(self):
        return 42

    def readable(self):
        return True

    def writable(self):
        return True

    def seekable(self):
        return True

    def seek(self, pos, whence):
        return 0   # wrong but we gotta return something

    def tell(self):
        return 0   # same comment as above

    def truncate(self, pos=None):
        return pos

    def write(self, b):
        # Keep an immutable copy so later changes to b are not reflected.
        snapshot = bytes(b)
        self._write_stack.append(snapshot)
        return len(b)

    def readinto(self, buf):
        self._reads += 1
        capacity = len(buf)
        pending = self._read_stack
        if not pending:
            # Nothing left to serve: count the call and report 0 bytes.
            self._extraneous_reads += 1
            return 0
        chunk = pending[0]
        if chunk is None:
            pending.pop(0)
            return None
        size = len(chunk)
        if size > capacity:
            # Fill the whole buffer and keep the remainder for later calls.
            buf[:] = chunk[:capacity]
            pending[0] = chunk[capacity:]
            return capacity
        pending.pop(0)
        buf[:size] = chunk
        return size
138
class CMockRawIOWithoutRead(MockRawIOWithoutRead, io.RawIOBase):
    """MockRawIOWithoutRead on top of the C implementation's RawIOBase."""


class PyMockRawIOWithoutRead(MockRawIOWithoutRead, pyio.RawIOBase):
    """MockRawIOWithoutRead on top of the Python implementation's RawIOBase."""
144
145
class MockRawIO(MockRawIOWithoutRead):
    """MockRawIOWithoutRead extended with an explicit read() method."""

    def read(self, n=None):
        """Return the next queued chunk; the size argument *n* is ignored.

        Once the read stack is empty, b"" is returned and the call is
        counted in _extraneous_reads.
        """
        self._reads += 1
        try:
            return self._read_stack.pop(0)
        except IndexError:
            # Only an exhausted stack is expected here; anything else
            # (KeyboardInterrupt included) must propagate.
            self._extraneous_reads += 1
            return b""
155
class CMockRawIO(MockRawIO, io.RawIOBase):
    """MockRawIO on top of the C implementation's RawIOBase."""


class PyMockRawIO(MockRawIO, pyio.RawIOBase):
    """MockRawIO on top of the Python implementation's RawIOBase."""
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000161
Guido van Rossuma9e20242007-03-08 00:43:48 +0000162
class MisbehavedRawIO(MockRawIO):
    """MockRawIO variant whose methods return deliberately wrong results."""

    def seek(self, pos, whence):
        return -123

    def tell(self):
        return -456

    def write(self, b):
        # Report twice the count the parent implementation returned.
        reported = super().write(b)
        return reported * 2

    def read(self, n=None):
        # Hand back the parent's result repeated twice.
        chunk = super().read(n)
        return chunk * 2

    def readinto(self, buf):
        # Let the parent fill the buffer, then report five times its length.
        super().readinto(buf)
        return 5 * len(buf)
179
class CMisbehavedRawIO(MisbehavedRawIO, io.RawIOBase):
    """MisbehavedRawIO on top of the C implementation's RawIOBase."""


class PyMisbehavedRawIO(MisbehavedRawIO, pyio.RawIOBase):
    """MisbehavedRawIO on top of the Python implementation's RawIOBase."""
185
186
class SlowFlushRawIO(MockRawIO):
    """MockRawIO whose flush() signals an event and then sleeps."""

    def __init__(self):
        super().__init__()
        # Set as soon as flush() has been entered.
        self.in_flush = threading.Event()

    def flush(self):
        entered = self.in_flush
        entered.set()
        time.sleep(0.25)
195
class CSlowFlushRawIO(SlowFlushRawIO, io.RawIOBase):
    """SlowFlushRawIO on top of the C implementation's RawIOBase."""


class PySlowFlushRawIO(SlowFlushRawIO, pyio.RawIOBase):
    """SlowFlushRawIO on top of the Python implementation's RawIOBase."""
201
202
class CloseFailureIO(MockRawIO):
    """MockRawIO whose first close() raises OSError; later calls do nothing."""

    closed = 0

    def close(self):
        if self.closed:
            return
        # Mark as closed before raising so the failure happens only once.
        self.closed = 1
        raise OSError
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000210
class CCloseFailureIO(CloseFailureIO, io.RawIOBase):
    """CloseFailureIO on top of the C implementation's RawIOBase."""


class PyCloseFailureIO(CloseFailureIO, pyio.RawIOBase):
    """CloseFailureIO on top of the Python implementation's RawIOBase."""
216
217
class MockFileIO:
    """Mixin recording the outcome of every read() and readinto() call.

    Meant to be combined with a concrete stream class that supplies the
    actual __init__(data), read() and readinto().
    """

    def __init__(self, data):
        # One entry per call: bytes returned by read() (None if it
        # returned None), or the value returned by readinto().
        self.read_history = []
        super().__init__(data)

    def read(self, n=None):
        chunk = super().read(n)
        if chunk is None:
            size = None
        else:
            size = len(chunk)
        self.read_history.append(size)
        return chunk

    def readinto(self, b):
        count = super().readinto(b)
        self.read_history.append(count)
        return count
Guido van Rossum78892e42007-04-06 17:31:18 +0000233
class CMockFileIO(MockFileIO, io.BytesIO):
    """MockFileIO recording reads of the C implementation's BytesIO."""


class PyMockFileIO(MockFileIO, pyio.BytesIO):
    """MockFileIO recording reads of the Python implementation's BytesIO."""
239
240
class MockUnseekableIO:
    """Mixin making a stream report itself, and behave, as not seekable.

    The concrete subclass must provide an UnsupportedOperation attribute
    holding the exception class to raise.
    """

    def seekable(self):
        return False

    def seek(self, *args):
        error = self.UnsupportedOperation
        raise error("not seekable")

    def tell(self, *args):
        error = self.UnsupportedOperation
        raise error("not seekable")

    def truncate(self, *args):
        error = self.UnsupportedOperation
        raise error("not seekable")
253
class CMockUnseekableIO(MockUnseekableIO, io.BytesIO):
    """Unseekable BytesIO for the C implementation."""

    UnsupportedOperation = io.UnsupportedOperation


class PyMockUnseekableIO(MockUnseekableIO, pyio.BytesIO):
    """Unseekable BytesIO for the Python implementation."""

    UnsupportedOperation = pyio.UnsupportedOperation
259
260
class MockNonBlockWriterIO:
    """Raw writer mock that can pretend a write would block.

    After block_on(char), a write containing that char is cut short just
    before it; a write that starts with it returns None and disarms the
    blocker.
    """

    def __init__(self):
        # Chunks accepted so far, drained by pop_written().
        self._write_stack = []
        # Value at which writes stop, or None when not armed.
        self._blocker_char = None

    def block_on(self, char):
        """Block when a given char is encountered."""
        self._blocker_char = char

    def pop_written(self):
        # Return everything written since the last call and forget it.
        joined = b"".join(self._write_stack)
        self._write_stack.clear()
        return joined

    def readable(self):
        return True

    def writable(self):
        return True

    def seekable(self):
        return True

    def seek(self, pos, whence=0):
        # naive implementation, enough for tests
        return 0

    def write(self, b):
        payload = bytes(b)
        if self._blocker_char:
            cut = payload.find(self._blocker_char)
            if cut > 0:
                # write data up to the first blocker
                self._write_stack.append(payload[:cut])
                return cut
            if cut == 0:
                # cancel blocker and indicate would block
                self._blocker_char = None
                return None
            # Blocker absent from this payload: write it in full below.
        self._write_stack.append(payload)
        return len(payload)
308
class CMockNonBlockWriterIO(MockNonBlockWriterIO, io.RawIOBase):
    """MockNonBlockWriterIO for the C implementation."""

    BlockingIOError = io.BlockingIOError


class PyMockNonBlockWriterIO(MockNonBlockWriterIO, pyio.RawIOBase):
    """MockNonBlockWriterIO for the Python implementation."""

    BlockingIOError = pyio.BlockingIOError
314
Guido van Rossuma9e20242007-03-08 00:43:48 +0000315
Guido van Rossum28524c72007-02-27 05:47:44 +0000316class IOTest(unittest.TestCase):
317
def setUp(self):
    # Remove the scratch file before each test runs.
    scratch = support.TESTFN
    support.unlink(scratch)
Neal Norwitze7789b12008-03-24 06:18:09 +0000320
def tearDown(self):
    # Remove the scratch file after each test has run.
    scratch = support.TESTFN
    support.unlink(scratch)
Guido van Rossum4d0f5a42007-03-07 22:59:39 +0000323
def write_ops(self, f):
    # Run a fixed write/seek/tell/truncate sequence on a writable binary
    # stream, checking each return value along the way.
    expect = self.assertEqual

    # truncate() must leave the stream position where it was.
    expect(f.write(b"blah."), 5)
    f.truncate(0)
    expect(f.tell(), 5)
    f.seek(0)

    expect(f.write(b"blah."), 5)
    expect(f.seek(0), 0)
    expect(f.write(b"Hello."), 6)
    expect(f.tell(), 6)
    expect(f.seek(-1, 1), 5)
    expect(f.tell(), 5)
    payload = bytearray(b" world\n\n\n")
    expect(f.write(payload), 9)
    payload[:] = b"*" * 9  # Overwrite our copy of the data
    expect(f.seek(0), 0)
    expect(f.write(b"h"), 1)
    expect(f.seek(-1, 2), 13)
    expect(f.tell(), 13)

    expect(f.truncate(12), 12)
    expect(f.tell(), 13)
    # A float offset is rejected.
    self.assertRaises(TypeError, f.seek, 0.0)
Guido van Rossum28524c72007-02-27 05:47:44 +0000347
def read_ops(self, f, buffered=False):
    # Run a fixed read/readinto/seek/tell sequence on a readable binary
    # stream holding b"hello world\n".  With buffered=True the read-all
    # and readinto1() checks run as well.
    expect = self.assertEqual

    chunk = f.read(5)
    expect(chunk, b"hello")
    target = byteslike(chunk)
    expect(f.readinto(target), 5)
    expect(bytes(target), b" worl")
    target = bytearray(5)
    # Only two bytes are left; the buffer keeps its size.
    expect(f.readinto(target), 2)
    expect(len(target), 5)
    expect(target[:2], b"d\n")
    expect(f.seek(0), 0)
    expect(f.read(20), b"hello world\n")
    expect(f.read(1), b"")
    expect(f.readinto(byteslike(b"x")), 0)
    expect(f.seek(-6, 2), 6)
    expect(f.read(5), b"world")
    expect(f.read(0), b"")
    expect(f.readinto(byteslike()), 0)
    expect(f.seek(-6, 1), 5)
    expect(f.read(5), b" worl")
    expect(f.tell(), 10)
    self.assertRaises(TypeError, f.seek, 0.0)
    if not buffered:
        return
    f.seek(0)
    expect(f.read(), b"hello world\n")
    f.seek(6)
    expect(f.read(), b"world\n")
    expect(f.read(), b"")
    f.seek(0)
    target = byteslike(5)
    expect(f.readinto1(target), 5)
    expect(bytes(target), b"hello")
Guido van Rossum9b76da62007-04-11 01:09:03 +0000380
# Offset used by the large-file checks.
LARGE = 2**31

def large_file_ops(self, f):
    # Seek, write, truncate and read around offset self.LARGE on a stream
    # that must be both readable and writable.  The test is skipped when
    # seeking that far raises OverflowError or ValueError.
    #
    # The preconditions use assertTrue rather than a bare assert so they
    # are still checked when Python runs with -O.
    self.assertTrue(f.readable())
    self.assertTrue(f.writable())
    try:
        self.assertEqual(f.seek(self.LARGE), self.LARGE)
    except (OverflowError, ValueError):
        self.skipTest("no largefile support")
    self.assertEqual(f.tell(), self.LARGE)
    self.assertEqual(f.write(b"xxx"), 3)
    self.assertEqual(f.tell(), self.LARGE + 3)
    self.assertEqual(f.seek(-1, 1), self.LARGE + 2)
    # truncate() without argument cuts at the current position.
    self.assertEqual(f.truncate(), self.LARGE + 2)
    self.assertEqual(f.tell(), self.LARGE + 2)
    self.assertEqual(f.seek(0, 2), self.LARGE + 2)
    # An explicit truncate() leaves the position untouched.
    self.assertEqual(f.truncate(self.LARGE + 1), self.LARGE + 1)
    self.assertEqual(f.tell(), self.LARGE + 2)
    self.assertEqual(f.seek(0, 2), self.LARGE + 1)
    self.assertEqual(f.seek(-1, 2), self.LARGE)
    # Only one byte remains past this point.
    self.assertEqual(f.read(2), b"x")
402
def test_invalid_operations(self):
    # Try writing on a file opened in read mode and vice-versa.
    unsupported = self.UnsupportedOperation
    path = support.TESTFN
    refuse = self.assertRaises

    # Write-only streams (text and buffered binary) refuse reads.
    for mode in ("w", "wb"):
        with self.open(path, mode) as stream:
            refuse(unsupported, stream.read)
            refuse(unsupported, stream.readline)
    # The same holds for an unbuffered write-only stream.
    with self.open(path, "wb", buffering=0) as stream:
        refuse(unsupported, stream.read)
        refuse(unsupported, stream.readline)
    # Read-only binary streams, unbuffered then buffered, refuse writes.
    for extra in ({"buffering": 0}, {}):
        with self.open(path, "rb", **extra) as stream:
            refuse(unsupported, stream.write, b"blah")
            refuse(unsupported, stream.writelines, [b"blah\n"])
    with self.open(path, "r") as stream:
        refuse(unsupported, stream.write, "blah")
        refuse(unsupported, stream.writelines, ["blah\n"])
        # Non-zero seeking from current or end pos
        refuse(unsupported, stream.seek, 1, self.SEEK_CUR)
        refuse(unsupported, stream.seek, -1, self.SEEK_END)
Benjamin Peterson81971ea2009-05-14 22:01:31 +0000425
def test_optional_abilities(self):
    # Test for OSError when optional APIs are not supported
    # The purpose of this test is to try fileno(), reading, writing and
    # seeking operations with various objects that indicate they do not
    # support these operations.
    #
    # Each factory below builds one stream object.  It is paired with an
    # "abilities" string in which "f" means fileno() must work, "r" that
    # the object is readable, "w" writable and "s" seekable.  Operations
    # outside the listed abilities are expected to raise OSError.

    def pipe_reader():
        # Read end of a pipe whose write end is already closed.
        [r, w] = os.pipe()
        os.close(w)  # So that read() is harmless
        return self.FileIO(r, "r")

    def pipe_writer():
        # Write end of a pipe; the read end is closed at test cleanup.
        [r, w] = os.pipe()
        self.addCleanup(os.close, r)
        # Guarantee that we can write into the pipe without blocking
        thread = threading.Thread(target=os.read, args=(r, 100))
        thread.start()
        self.addCleanup(thread.join)
        return self.FileIO(w, "w")

    def buffered_reader():
        return self.BufferedReader(self.MockUnseekableIO())

    def buffered_writer():
        return self.BufferedWriter(self.MockUnseekableIO())

    def buffered_random():
        return self.BufferedRandom(self.BytesIO())

    def buffered_rw_pair():
        return self.BufferedRWPair(self.MockUnseekableIO(),
            self.MockUnseekableIO())

    def text_reader():
        # Unseekable stream whose writable()/write() come straight from
        # BufferedIOBase instead of the mock's concrete base class.
        class UnseekableReader(self.MockUnseekableIO):
            writable = self.BufferedIOBase.writable
            write = self.BufferedIOBase.write
        return self.TextIOWrapper(UnseekableReader(), "ascii")

    def text_writer():
        # Unseekable stream whose readable()/read() come straight from
        # BufferedIOBase instead of the mock's concrete base class.
        class UnseekableWriter(self.MockUnseekableIO):
            readable = self.BufferedIOBase.readable
            read = self.BufferedIOBase.read
        return self.TextIOWrapper(UnseekableWriter(), "ascii")

    tests = (
        (pipe_reader, "fr"), (pipe_writer, "fw"),
        (buffered_reader, "r"), (buffered_writer, "w"),
        (buffered_random, "rws"), (buffered_rw_pair, "rw"),
        (text_reader, "r"), (text_writer, "w"),
        (self.BytesIO, "rws"), (self.StringIO, "rws"),
    )
    for [test, abilities] in tests:
        # The object is used as a context manager, so it is closed when
        # its sub-test ends.
        with self.subTest(test), test() as obj:
            readable = "r" in abilities
            self.assertEqual(obj.readable(), readable)
            writable = "w" in abilities
            self.assertEqual(obj.writable(), writable)

            # Pick str or bytes test data to match the kind of stream.
            if isinstance(obj, self.TextIOBase):
                data = "3"
            elif isinstance(obj, (self.BufferedIOBase, self.RawIOBase)):
                data = b"3"
            else:
                self.fail("Unknown base class")

            if "f" in abilities:
                obj.fileno()
            else:
                self.assertRaises(OSError, obj.fileno)

            if readable:
                obj.read(1)
                obj.read()
            else:
                self.assertRaises(OSError, obj.read, 1)
                self.assertRaises(OSError, obj.read)

            if writable:
                obj.write(data)
            else:
                self.assertRaises(OSError, obj.write, data)

            if sys.platform.startswith("win") and test in (
                    pipe_reader, pipe_writer):
                # Pipes seem to appear as seekable on Windows
                continue
            seekable = "s" in abilities
            self.assertEqual(obj.seekable(), seekable)

            if seekable:
                obj.tell()
                obj.seek(0)
            else:
                self.assertRaises(OSError, obj.tell)
                self.assertRaises(OSError, obj.seek, 0)

            # truncate() is only expected to work on objects that are
            # both writable and seekable.
            if writable and seekable:
                obj.truncate()
                obj.truncate(0)
            else:
                self.assertRaises(OSError, obj.truncate)
                self.assertRaises(OSError, obj.truncate, 0)
529
Antoine Pitrou13348842012-01-29 18:36:34 +0100530 def test_open_handles_NUL_chars(self):
531 fn_with_NUL = 'foo\0bar'
Serhiy Storchakad8a14472014-09-06 20:07:17 +0300532 self.assertRaises(ValueError, self.open, fn_with_NUL, 'w')
Victor Stinner47b45572016-03-25 09:07:07 +0100533
534 bytes_fn = bytes(fn_with_NUL, 'ascii')
535 with warnings.catch_warnings():
536 warnings.simplefilter("ignore", DeprecationWarning)
537 self.assertRaises(ValueError, self.open, bytes_fn, 'w')
Antoine Pitrou13348842012-01-29 18:36:34 +0100538
def test_raw_file_io(self):
    # Unbuffered file: check the capability flags, then run the shared
    # write and read sequences.
    path = support.TESTFN
    with self.open(path, "wb", buffering=0) as raw:
        for probe, expected in ((raw.readable, False),
                                (raw.writable, True),
                                (raw.seekable, True)):
            self.assertEqual(probe(), expected)
        self.write_ops(raw)
    with self.open(path, "rb", buffering=0) as raw:
        for probe, expected in ((raw.readable, True),
                                (raw.writable, False),
                                (raw.seekable, True)):
            self.assertEqual(probe(), expected)
        self.read_ops(raw)
Guido van Rossum28524c72007-02-27 05:47:44 +0000550
def test_buffered_file_io(self):
    # Buffered file: check the capability flags, then run the shared
    # write and read sequences (the latter with the buffered extras).
    path = support.TESTFN
    with self.open(path, "wb") as stream:
        for probe, expected in ((stream.readable, False),
                                (stream.writable, True),
                                (stream.seekable, True)):
            self.assertEqual(probe(), expected)
        self.write_ops(stream)
    with self.open(path, "rb") as stream:
        for probe, expected in ((stream.readable, True),
                                (stream.writable, False),
                                (stream.seekable, True)):
            self.assertEqual(probe(), expected)
        self.read_ops(stream, True)
Guido van Rossum87429772007-04-10 21:06:59 +0000562
def test_readline(self):
    path = support.TESTFN
    with self.open(path, "wb") as out:
        out.write(b"abc\ndef\nxyzzy\nfoo\x00bar\nanother line")
    with self.open(path, "rb") as src:
        # Each entry: readline() arguments and the line expected back.
        cases = (
            ((), b"abc\n"),
            ((10,), b"def\n"),
            ((2,), b"xy"),
            ((4,), b"zzy\n"),
            ((), b"foo\x00bar\n"),
            ((None,), b"another line"),
        )
        for args, expected in cases:
            self.assertEqual(src.readline(*args), expected)
        # A float size is rejected, in binary mode ...
        self.assertRaises(TypeError, src.readline, 5.3)
    with self.open(path, "r") as src:
        # ... and in text mode.
        self.assertRaises(TypeError, src.readline, 5.3)
Guido van Rossum48fc58a2007-06-07 23:45:37 +0000576
Serhiy Storchakabf623ae2017-04-19 20:03:52 +0300577 def test_readline_nonsizeable(self):
578 # Issue #30061
579 # Crash when readline() returns an object without __len__
580 class R(self.IOBase):
581 def readline(self):
582 return None
583 self.assertRaises((TypeError, StopIteration), next, R())
584
585 def test_next_nonsizeable(self):
586 # Issue #30061
587 # Crash when __next__() returns an object without __len__
588 class R(self.IOBase):
589 def __next__(self):
590 return None
591 self.assertRaises(TypeError, R().readlines, 1)
592
Guido van Rossum28524c72007-02-27 05:47:44 +0000593 def test_raw_bytes_io(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000594 f = self.BytesIO()
Guido van Rossum28524c72007-02-27 05:47:44 +0000595 self.write_ops(f)
596 data = f.getvalue()
597 self.assertEqual(data, b"hello world\n")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000598 f = self.BytesIO(data)
Guido van Rossum9b76da62007-04-11 01:09:03 +0000599 self.read_ops(f, True)
Guido van Rossum28524c72007-02-27 05:47:44 +0000600
def test_large_file_ops(self):
    # On Windows and Mac OSX this test consumes large resources: it takes
    # a long time to build the >2 GiB file and takes >2 GiB of disk
    # space, therefore the resource must be enabled to run this test.
    on_windows = sys.platform.startswith('win')
    if on_windows or sys.platform == 'darwin':
        support.requires(
            'largefile',
            'test requires %s bytes and a long time to run' % self.LARGE)
    # Run once unbuffered (buffering 0), then with default buffering.
    for extra in ((0,), ()):
        with self.open(support.TESTFN, "w+b", *extra) as stream:
            self.large_file_ops(stream)
Guido van Rossum87429772007-04-10 21:06:59 +0000613
def test_with_open(self):
    path = support.TESTFN
    for bufsize in (0, 100):
        # Leaving the with-block normally closes the file.
        stream = None
        with self.open(path, "wb", bufsize) as stream:
            stream.write(b"xxx")
        self.assertEqual(stream.closed, True)
        # Leaving it through an exception closes the file as well.
        stream = None
        try:
            with self.open(path, "wb", bufsize) as stream:
                1/0
        except ZeroDivisionError:
            self.assertEqual(stream.closed, True)
        else:
            self.fail("1/0 didn't raise an exception")
628
Antoine Pitrou08838b62009-01-21 00:55:13 +0000629 # issue 5008
def test_append_mode_tell(self):
    # A file opened for appending must report a position past the three
    # bytes already in it.
    path = support.TESTFN
    with self.open(path, "wb") as out:
        out.write(b"xxx")
    # Binary append mode, unbuffered then buffered: exact position.
    for extra in ({"buffering": 0}, {}):
        with self.open(path, "ab", **extra) as appender:
            self.assertEqual(appender.tell(), 3)
    # Text append mode: only require a position greater than zero.
    with self.open(path, "a") as appender:
        self.assertGreater(appender.tell(), 0)
Antoine Pitrou08838b62009-01-21 00:55:13 +0000639
def test_destructor(self):
    # Dropping the last reference to an open FileIO subclass must run
    # __del__, close() and flush() in that order, and the written data
    # must be in the file afterwards.
    events = []
    class MyFileIO(self.FileIO):
        def __del__(self):
            events.append(1)
            # Chain to the parent's __del__ only if it defines one.
            parent_del = getattr(super(), "__del__", None)
            if parent_del is not None:
                parent_del()
        def close(self):
            events.append(2)
            super().close()
        def flush(self):
            events.append(3)
            super().flush()
    with support.check_warnings(('', ResourceWarning)):
        stream = MyFileIO(support.TESTFN, "wb")
        stream.write(b"xxx")
        del stream
        support.gc_collect()
        self.assertEqual(events, [1, 2, 3])
        with self.open(support.TESTFN, "rb") as reader:
            self.assertEqual(reader.read(), b"xxx")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000665
def _check_base_destructor(self, base):
    # Subclass *base*, drop the only instance and check that __del__,
    # close() and flush() ran in that order.
    events = []
    class MyIO(base):
        def __init__(self):
            # This exercises the availability of attributes on object
            # destruction.
            # (in the C version, close() is called by the tp_dealloc
            # function, not by __del__)
            self.on_del = 1
            self.on_close = 2
            self.on_flush = 3
        def __del__(self):
            events.append(self.on_del)
            # Chain to the parent's __del__ only if it defines one.
            parent_del = getattr(super(), "__del__", None)
            if parent_del is not None:
                parent_del()
        def close(self):
            events.append(self.on_close)
            super().close()
        def flush(self):
            events.append(self.on_flush)
            super().flush()
    instance = MyIO()
    del instance
    support.gc_collect()
    self.assertEqual(events, [1, 2, 3])
695
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000696 def test_IOBase_destructor(self):
697 self._check_base_destructor(self.IOBase)
698
699 def test_RawIOBase_destructor(self):
700 self._check_base_destructor(self.RawIOBase)
701
702 def test_BufferedIOBase_destructor(self):
703 self._check_base_destructor(self.BufferedIOBase)
704
705 def test_TextIOBase_destructor(self):
706 self._check_base_destructor(self.TextIOBase)
707
def test_close_flushes(self):
    # Data written through a buffered file must be in the file once the
    # with-block has closed it.
    path = support.TESTFN
    with self.open(path, "wb") as out:
        out.write(b"xxx")
    with self.open(path, "rb") as src:
        stored = src.read()
        self.assertEqual(stored, b"xxx")
Guido van Rossuma9e20242007-03-08 00:43:48 +0000713
def test_array_writes(self):
    # write() and writelines() must accept an array.array and report the
    # array's size in bytes.
    values = array.array('i', range(10))
    byte_count = len(values.tobytes())

    def check(f):
        with f:
            self.assertEqual(f.write(values), byte_count)
            f.writelines((values,))

    # Factories, so that each stream is created right before its check.
    factories = (
        lambda: self.BytesIO(),
        lambda: self.FileIO(support.TESTFN, "w"),
        lambda: self.BufferedWriter(self.MockRawIO()),
        lambda: self.BufferedRandom(self.MockRawIO()),
        lambda: self.BufferedRWPair(self.MockRawIO(), self.MockRawIO()),
    )
    for make_stream in factories:
        check(make_stream())
Guido van Rossumd4103952007-04-12 05:44:49 +0000726
def test_closefd(self):
    # Opening by file name for writing with closefd=False is rejected.
    path = support.TESTFN
    self.assertRaises(ValueError, self.open, path, 'w', closefd=False)
Guido van Rossuma9e20242007-03-08 00:43:48 +0000730
def test_read_closed(self):
    # Reading from a closed file object must raise ValueError, for text
    # read() as well as binary readinto().
    path = support.TESTFN
    with self.open(path, "w") as out:
        out.write("egg\n")
    with self.open(path, "r") as owner:
        # Second file object over the same descriptor; closing it must
        # not close the descriptor (closefd=False).
        text_view = self.open(owner.fileno(), "r", closefd=False)
        self.assertEqual(text_view.read(), "egg\n")
        text_view.seek(0)
        text_view.close()
        self.assertRaises(ValueError, text_view.read)
    with self.open(path, "rb") as owner:
        binary_view = self.open(owner.fileno(), "rb", closefd=False)
        self.assertEqual(binary_view.read()[:3], b"egg")
        binary_view.close()
        self.assertRaises(ValueError, binary_view.readinto, bytearray(1))
Christian Heimesecc42a22008-11-05 19:30:32 +0000745
def test_no_closefd_with_filename(self):
    # can't use closefd in combination with a file name
    with self.assertRaises(ValueError):
        self.open(support.TESTFN, "r", closefd=False)
Christian Heimesecc42a22008-11-05 19:30:32 +0000749
def test_closefd_attr(self):
    # The raw layer's closefd attribute must be True for a file opened by
    # name and False for a wrapper opened on a descriptor with
    # closefd=False.
    with self.open(support.TESTFN, "wb") as f:
        f.write(b"egg\n")
    with self.open(support.TESTFN, "r") as f:
        self.assertEqual(f.buffer.raw.closefd, True)
        # Close the borrowing wrapper deterministically instead of leaving
        # it to the garbage collector.  It was opened with closefd=False,
        # so closing it leaves f's descriptor open.
        with self.open(f.fileno(), "r", closefd=False) as file:
            self.assertEqual(file.buffer.raw.closefd, False)
757
def test_garbage_collection(self):
    # FileIO objects are collected, and collecting them flushes
    # all data to disk.
    # NOTE(review): check_warnings is presumably here for the
    # ResourceWarning raised when the unclosed file is collected -- confirm
    # against test.support.
    with support.check_warnings(('', ResourceWarning)):
        f = self.FileIO(support.TESTFN, "wb")
        f.write(b"abcxxx")
        # Make the object refer to itself so that dropping the local name
        # alone does not free it; the explicit collection below has to.
        f.f = f
        wr = weakref.ref(f)
        del f
        support.gc_collect()
    # The weak reference is dead only if the object was really collected.
    self.assertIsNone(wr(), wr)
    with self.open(support.TESTFN, "rb") as f:
        self.assertEqual(f.read(), b"abcxxx")
Christian Heimesecc42a22008-11-05 19:30:32 +0000771
def test_unbounded_file(self):
    # Issue #1174606: reading from an unbounded stream such as /dev/zero.
    zero = "/dev/zero"

    # Preconditions: the device exists, the address space is 32-bit and
    # the configured memory limit is at least 2 GiB.
    if not os.path.exists(zero):
        self.skipTest("{0} does not exist".format(zero))
    if sys.maxsize > 0x7FFFFFFF:
        self.skipTest("test can only run in a 32-bit address space")
    if support.real_max_memuse < support._2G:
        self.skipTest("test requires at least 2 GiB of memory")

    # An unlimited read() must overflow at the raw, buffered and text
    # layers alike.
    open_variants = (
        ("rb", {"buffering": 0}),
        ("rb", {}),
        ("r", {}),
    )
    for mode, extra in open_variants:
        with self.open(zero, mode, **extra) as stream:
            with self.assertRaises(OverflowError):
                stream.read()
787
def check_flush_error_on_close(self, *args, **kwargs):
    # Open a file with the given arguments, make its flush() fail, and
    # check that close() propagates the error, still closes the file, and
    # called flush() while the file was still open.
    stream = self.open(*args, **kwargs)
    closed_at_flush = []

    def failing_flush():
        # Record whether the stream was already closed when flush() ran.
        closed_at_flush[:] = [stream.closed]
        raise OSError()

    stream.flush = failing_flush
    with self.assertRaises(OSError):  # exception not swallowed
        stream.close()
    self.assertTrue(stream.closed)
    self.assertTrue(closed_at_flush)  # flush() called
    self.assertFalse(closed_at_flush[0])  # flush() called before file closed
    stream.flush = lambda: None  # break reference loop
Serhiy Storchakaa3712a92015-02-21 00:35:09 +0200802
def test_flush_error_on_close(self):
    # Run check_flush_error_on_close() for the raw, buffered and text
    # layers, each opened by name, by descriptor, and by descriptor with
    # closefd=False.
    # Issue #5700: io.FileIO calls flush() after file closed
    open_flags = os.O_WRONLY | os.O_CREAT
    layers = (
        ('wb', {'buffering': 0}),  # raw file
        ('wb', {}),                # buffered io
        ('w', {}),                 # text io
    )
    for mode, extra in layers:
        self.check_flush_error_on_close(support.TESTFN, mode, **extra)

        fd = os.open(support.TESTFN, open_flags)
        self.check_flush_error_on_close(fd, mode, **extra)

        # With closefd=False the descriptor stays ours to close; use
        # try/finally so it is released even when the check fails.
        fd = os.open(support.TESTFN, open_flags)
        try:
            self.check_flush_error_on_close(fd, mode, closefd=False,
                                            **extra)
        finally:
            os.close(fd)
Antoine Pitrou6be88762010-05-03 16:48:20 +0000826
def test_multi_close(self):
    # close() may be called repeatedly; flush() on the closed file must
    # raise ValueError.
    stream = self.open(support.TESTFN, "wb", buffering=0)
    for _ in range(3):
        stream.close()
    with self.assertRaises(ValueError):
        stream.flush()
833
def test_RawIOBase_read(self):
    # Exercise the default limited RawIOBase.read(n) implementation (which
    # calls readinto() internally).
    source = self.MockRawIOWithoutRead((b"abc", b"d", None, b"efg", None))
    # Successive read(2) results, in order.
    expected_results = (b"ab", b"c", b"d", None, b"ef", b"g", None, b"")
    for expected in expected_results:
        self.assertEqual(source.read(2), expected)
846
def test_types_have_dict(self):
    # Instances of the base classes and of the in-memory streams must
    # all have a __dict__.
    factories = (
        self.IOBase,
        self.RawIOBase,
        self.TextIOBase,
        self.StringIO,
        self.BytesIO,
    )
    instances = tuple(make() for make in factories)
    for instance in instances:
        has_dict = hasattr(instance, "__dict__")
        self.assertTrue(has_dict)
857
def test_opener(self):
    # A custom opener supplies the descriptor: the file name passed to
    # open() is never opened, yet the real file's contents are read.
    with self.open(support.TESTFN, "w") as writer:
        writer.write("egg\n")
    real_fd = os.open(support.TESTFN, os.O_RDONLY)

    def opener(path, flags):
        return real_fd

    with self.open("non-existent", "r", opener=opener) as reader:
        contents = reader.read()
        self.assertEqual(contents, "egg\n")
866
def test_bad_opener_negative_1(self):
    # Issue #27066.
    # NOTE(review): this calls the builtin open(), not self.open --
    # presumably deliberate; confirm before changing.
    def badopener(fname, flags):
        return -1

    with self.assertRaises(ValueError) as caught:
        open('non-existent', 'r', opener=badopener)
    message = str(caught.exception)
    self.assertEqual(message, 'opener returned -1')
874
def test_bad_opener_other_negative(self):
    # Issue #27066.
    # NOTE(review): this calls the builtin open(), not self.open --
    # presumably deliberate; confirm before changing.
    def badopener(fname, flags):
        return -2

    with self.assertRaises(ValueError) as caught:
        open('non-existent', 'r', opener=badopener)
    message = str(caught.exception)
    self.assertEqual(message, 'opener returned -2')
882
def test_fileio_closefd(self):
    # Issue #4841
    with self.open(__file__, 'rb') as first, \
         self.open(__file__, 'rb') as second:
        borrower = self.FileIO(first.fileno(), closefd=False)
        # .__init__() must not close first
        borrower.__init__(second.fileno(), closefd=False)
        first.readline()
        # .close() must not close second
        borrower.close()
        second.readline()
894
def test_nonbuffered_textio(self):
    # Text mode with buffering=0 must raise ValueError, and (per
    # check_no_resource_warning) without a ResourceWarning.
    with support.check_no_resource_warning(self), \
         self.assertRaises(ValueError):
        self.open(support.TESTFN, 'w', buffering=0)
Serhiy Storchakaf10063e2014-06-09 13:32:34 +0300899
def test_invalid_newline(self):
    # An unsupported newline argument must raise ValueError, and (per
    # check_no_resource_warning) without a ResourceWarning.
    with support.check_no_resource_warning(self), \
         self.assertRaises(ValueError):
        self.open(support.TESTFN, 'w', newline='invalid')
Serhiy Storchakaf10063e2014-06-09 13:32:34 +0300904
def test_buffered_readinto_mixin(self):
    # Test the implementation provided by BufferedIOBase
    class Stream(self.BufferedIOBase):
        # Only read()/read1() are supplied; readinto()/readinto1() come
        # from the base class.
        def read(self, size):
            return b"12345"
        read1 = read

    source = Stream()
    for method_name in ("readinto", "readinto1"):
        with self.subTest(method_name):
            target = byteslike(5)
            fill = getattr(source, method_name)
            self.assertEqual(fill(target), 5)
            self.assertEqual(bytes(target), b"12345")
917
def test_fspath_support(self):
    # open() must accept path-like objects wrapping str or bytes, and
    # must reject or propagate errors from other wrapped values.
    def roundtrip(pathlike):
        with self.open(pathlike, "w") as writer:
            writer.write("egg\n")

        with self.open(pathlike, "r") as reader:
            self.assertEqual(reader.read(), "egg\n")

    roundtrip(FakePath(support.TESTFN))
    roundtrip(FakePath(support.TESTFN.encode('utf-8')))

    # A path-like object wrapping an integer descriptor is not accepted.
    with self.open(support.TESTFN, "w") as holder:
        fd_path = FakePath(holder.fileno())
        with self.assertRaises(TypeError):
            self.open(fd_path, 'w')

    # Remaining bad payloads, each paired with the exception expected
    # from open().
    bad_cases = (
        (None, TypeError),
        (FloatingPointError, FloatingPointError),
    )
    for payload, expected_exc in bad_cases:
        bad_path = FakePath(payload)
        with self.assertRaises(expected_exc):
            self.open(bad_path, 'w')

    # ensure that refcounting is correct with some error conditions
    with self.assertRaisesRegex(ValueError, 'read/write/append mode'):
        self.open(FakePath(support.TESTFN), 'rwxa')
Ethan Furmand62548a2016-06-04 14:38:43 -0700945
def test_RawIOBase_readall(self):
    # Exercise the default unlimited RawIOBase.read() and readall()
    # implementations.
    chunks = (b"abc", b"d", b"efg")
    via_read = self.MockRawIOWithoutRead(chunks)
    self.assertEqual(via_read.read(), b"abcdefg")
    via_readall = self.MockRawIOWithoutRead(chunks)
    self.assertEqual(via_readall.readall(), b"abcdefg")
953
def test_BufferedIOBase_readinto(self):
    # Exercise the default BufferedIOBase.readinto() and readinto1()
    # implementations (which call read() or read1() internally).
    class Reader(self.BufferedIOBase):
        def __init__(self, avail):
            self.avail = avail

        def read(self, size):
            # Hand out the next `size` bytes and keep the remainder.
            chunk, self.avail = self.avail[:size], self.avail[size:]
            return chunk

        def read1(self, size):
            """Returns no more than 5 bytes at once"""
            return self.read(min(size, 5))

    cases = (
        # (test method, total data available, read buffer size, expected
        # read size)
        ("readinto", 10, 5, 5),
        ("readinto", 10, 6, 6),  # More than read1() can return
        ("readinto", 5, 6, 5),  # Buffer larger than total available
        ("readinto", 6, 7, 6),
        ("readinto", 10, 0, 0),  # Empty buffer
        ("readinto1", 10, 5, 5),  # Result limited to single read1() call
        ("readinto1", 10, 6, 5),  # Buffer larger than read1() can return
        ("readinto1", 5, 6, 5),  # Buffer larger than total available
        ("readinto1", 6, 7, 5),
        ("readinto1", 10, 0, 0),  # Empty buffer
    )
    UNUSED_BYTE = 0x81
    for case in cases:
        with self.subTest(case):
            method_name, total, buf_len, expected_n = case
            reader = Reader(bytes(range(total)))
            # Pre-fill with a sentinel so untouched bytes can be detected.
            target = bytearray((UNUSED_BYTE,) * buf_len)
            fill = getattr(reader, method_name)
            self.assertEqual(fill(target), expected_n)
            self.assertEqual(len(target), buf_len)
            self.assertSequenceEqual(target[:expected_n], range(expected_n))
            untouched = (UNUSED_BYTE,) * (buf_len - expected_n)
            self.assertSequenceEqual(target[expected_n:], untouched)
            self.assertEqual(len(reader.avail), total - expected_n)
994
def test_close_assert(self):
    # close() on an IOBase subclass whose __setattr__ discards every
    # assignment and whose flush() fails must raise OSError.
    class R(self.IOBase):
        def __setattr__(self, name, value):
            pass

        def flush(self):
            raise OSError()

    stream = R()
    # This would cause an assertion failure.
    with self.assertRaises(OSError):
        stream.close()

    # Silence destructor error
    R.flush = lambda self: None
1007
Hynek Schlawack2cc71562012-05-25 10:05:53 +02001008
class CIOTest(IOTest):
    # IOTest subclass carrying a test that is only defined here and not on
    # IOTest or PyIOTest.

    def test_IOBase_finalize(self):
        # Issue #12149: segmentation fault on _PyIOBase_finalize when both a
        # class which inherits IOBase and an object of this class are caught
        # in a reference cycle and close() is already in the method cache.
        class MyIO(self.IOBase):
            def close(self):
                pass

        # create an instance to populate the method cache
        MyIO()
        obj = MyIO()
        # Self-reference: the instance can only be freed by the collector.
        obj.obj = obj
        wr = weakref.ref(obj)
        # Drop both the class and the instance before collecting.
        del MyIO
        del obj
        support.gc_collect()
        # A dead weak reference shows the instance was collected.
        self.assertIsNone(wr(), wr)
Guido van Rossuma9e20242007-03-08 00:43:48 +00001028
class PyIOTest(IOTest):
    # Inherits every IOTest test unchanged and adds none of its own.
    pass
Guido van Rossum68bbcd22007-02-27 17:19:33 +00001031
Guido van Rossuma9e20242007-03-08 00:43:48 +00001032
@support.cpython_only
class APIMismatchTest(unittest.TestCase):
    # Compare io.RawIOBase and pyio.RawIOBase in both directions using
    # support.detect_api_mismatch().

    def test_RawIOBase_io_in_pyio_match(self):
        """Test that pyio RawIOBase class has all c RawIOBase methods"""
        missing = support.detect_api_mismatch(
            pyio.RawIOBase, io.RawIOBase, ignore=('__weakref__',))
        self.assertEqual(
            missing, set(),
            msg='Python RawIOBase does not have all C RawIOBase methods')

    def test_RawIOBase_pyio_in_io_match(self):
        """Test that c RawIOBase class has all pyio RawIOBase methods"""
        missing = support.detect_api_mismatch(io.RawIOBase, pyio.RawIOBase)
        self.assertEqual(
            missing, set(),
            msg='C RawIOBase does not have all Python RawIOBase methods')
1046
1047
class CommonBufferedTests:
    # Tests common to BufferedReader, BufferedWriter and BufferedRandom
    #
    # Mixin: every test builds the object under test with self.tp(raw) and
    # takes its raw streams from self.MockRawIO and friends, so the
    # concrete test class has to supply those attributes.

    def test_detach(self):
        # detach() returns the raw stream once; a second call must fail.
        raw = self.MockRawIO()
        buf = self.tp(raw)
        self.assertIs(buf.detach(), raw)
        self.assertRaises(ValueError, buf.detach)

        repr(buf)  # Should still work

    def test_fileno(self):
        # NOTE(review): 42 is presumably the value MockRawIO.fileno()
        # returns -- confirm against the mock's definition.
        rawio = self.MockRawIO()
        bufio = self.tp(rawio)

        self.assertEqual(42, bufio.fileno())

    def test_invalid_args(self):
        rawio = self.MockRawIO()
        bufio = self.tp(rawio)
        # Invalid whence
        self.assertRaises(ValueError, bufio.seek, 0, -1)
        self.assertRaises(ValueError, bufio.seek, 0, 9)

    def test_override_destructor(self):
        # Destroying an instance of a subclass must call the overridden
        # __del__, close and flush in that order (recorded as 1, 2, 3).
        tp = self.tp
        record = []
        class MyBufferedIO(tp):
            def __del__(self):
                record.append(1)
                # The base type may not define __del__ at all.
                try:
                    f = super().__del__
                except AttributeError:
                    pass
                else:
                    f()
            def close(self):
                record.append(2)
                super().close()
            def flush(self):
                record.append(3)
                super().flush()
        rawio = self.MockRawIO()
        bufio = MyBufferedIO(rawio)
        del bufio
        support.gc_collect()
        self.assertEqual(record, [1, 2, 3])

    def test_context_manager(self):
        # Test usability as a context manager
        rawio = self.MockRawIO()
        bufio = self.tp(rawio)
        def _with():
            with bufio:
                pass
        _with()
        # bufio should now be closed, and using it a second time should raise
        # a ValueError.
        self.assertRaises(ValueError, _with)

    def test_error_through_destructor(self):
        # Test that the exception state is not modified by a destructor,
        # even if close() fails.
        rawio = self.CloseFailureIO()
        with support.catch_unraisable_exception() as cm:
            # The temporary buffered object is discarded while the
            # AttributeError from the missing attribute propagates.
            with self.assertRaises(AttributeError):
                self.tp(rawio).xyzzy

            if not IOBASE_EMITS_UNRAISABLE:
                self.assertIsNone(cm.unraisable)
            elif cm.unraisable is not None:
                self.assertEqual(cm.unraisable.exc_type, OSError)

    def test_repr(self):
        # repr() shows the (optionally module-qualified) class name and,
        # once the raw stream has a name, that name as well.
        raw = self.MockRawIO()
        b = self.tp(raw)
        clsname = r"(%s\.)?%s" % (self.tp.__module__, self.tp.__qualname__)
        self.assertRegex(repr(b), "<%s>" % clsname)
        raw.name = "dummy"
        self.assertRegex(repr(b), "<%s name='dummy'>" % clsname)
        raw.name = b"dummy"
        self.assertRegex(repr(b), "<%s name=b'dummy'>" % clsname)

    def test_recursive_repr(self):
        # Issue #25455
        raw = self.MockRawIO()
        b = self.tp(raw)
        # The raw stream's name is the buffered object itself, so repr()
        # recurses; a RuntimeError is an acceptable outcome.
        with support.swap_attr(raw, 'name', b):
            try:
                repr(b)  # Should not crash
            except RuntimeError:
                pass

    def test_flush_error_on_close(self):
        # Test that buffered file is closed despite failed flush
        # and that flush() is called before file closed.
        raw = self.MockRawIO()
        closed = []
        def bad_flush():
            # Snapshot both closed flags at the moment flush() runs.
            closed[:] = [b.closed, raw.closed]
            raise OSError()
        raw.flush = bad_flush
        b = self.tp(raw)
        self.assertRaises(OSError, b.close) # exception not swallowed
        self.assertTrue(b.closed)
        self.assertTrue(raw.closed)
        self.assertTrue(closed)      # flush() called
        self.assertFalse(closed[0])  # flush() called before file closed
        self.assertFalse(closed[1])
        raw.flush = lambda: None  # break reference loop

    def test_close_error_on_close(self):
        # When flush() and the raw close() both fail, the close error is
        # raised with the flush error as its __context__.
        raw = self.MockRawIO()
        def bad_flush():
            raise OSError('flush')
        def bad_close():
            raise OSError('close')
        raw.close = bad_close
        b = self.tp(raw)
        b.flush = bad_flush
        with self.assertRaises(OSError) as err: # exception not swallowed
            b.close()
        self.assertEqual(err.exception.args, ('close',))
        self.assertIsInstance(err.exception.__context__, OSError)
        self.assertEqual(err.exception.__context__.args, ('flush',))
        self.assertFalse(b.closed)

        # Silence destructor error
        raw.close = lambda: None
        b.flush = lambda: None

    def test_nonnormalized_close_error_on_close(self):
        # Issue #21677
        raw = self.MockRawIO()
        # Both helpers reference an undefined name on purpose, so each
        # raises NameError when called.
        def bad_flush():
            raise non_existing_flush
        def bad_close():
            raise non_existing_close
        raw.close = bad_close
        b = self.tp(raw)
        b.flush = bad_flush
        with self.assertRaises(NameError) as err: # exception not swallowed
            b.close()
        self.assertIn('non_existing_close', str(err.exception))
        self.assertIsInstance(err.exception.__context__, NameError)
        self.assertIn('non_existing_flush', str(err.exception.__context__))
        self.assertFalse(b.closed)

        # Silence destructor error
        b.flush = lambda: None
        raw.close = lambda: None

    def test_multi_close(self):
        # close() may be called repeatedly; flush() afterwards must fail.
        raw = self.MockRawIO()
        b = self.tp(raw)
        b.close()
        b.close()
        b.close()
        self.assertRaises(ValueError, b.flush)

    def test_unseekable(self):
        # tell() and seek() over an unseekable raw stream must raise
        # UnsupportedOperation.
        bufio = self.tp(self.MockUnseekableIO(b"A" * 10))
        self.assertRaises(self.UnsupportedOperation, bufio.tell)
        self.assertRaises(self.UnsupportedOperation, bufio.seek, 0)

    def test_readonly_attributes(self):
        # The raw attribute cannot be reassigned.
        raw = self.MockRawIO()
        buf = self.tp(raw)
        x = self.MockRawIO()
        with self.assertRaises(AttributeError):
            buf.raw = x
1219
Guido van Rossum78892e42007-04-06 17:31:18 +00001220
class SizeofTest:
    # Mixin checking that sys.getsizeof() of a buffered object tracks its
    # buffer; the concrete class supplies self.tp and self.MockRawIO.

    @support.cpython_only
    def test_sizeof(self):
        # The reported size must differ by exactly the difference in
        # buffer_size between two otherwise identical objects.
        small, large = 4096, 8192
        small_buffered = self.tp(self.MockRawIO(), buffer_size=small)
        overhead = sys.getsizeof(small_buffered) - small
        large_buffered = self.tp(self.MockRawIO(), buffer_size=large)
        self.assertEqual(sys.getsizeof(large_buffered), overhead + large)

    @support.cpython_only
    def test_buffer_freeing(self):
        # After close(), the reported size must no longer include the
        # buffer.
        bufsize = 4096
        buffered = self.tp(self.MockRawIO(), buffer_size=bufsize)
        size_without_buffer = sys.getsizeof(buffered) - bufsize
        buffered.close()
        self.assertEqual(sys.getsizeof(buffered), size_without_buffer)
Antoine Pitrou10f0c502012-07-29 19:02:46 +02001242
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001243class BufferedReaderTest(unittest.TestCase, CommonBufferedTests):
1244 read_mode = "rb"
Guido van Rossum78892e42007-04-06 17:31:18 +00001245
def test_constructor(self):
    # __init__ may be re-run on an existing object with valid buffer
    # sizes; non-positive sizes must raise ValueError, after which a
    # valid re-initialisation makes the object usable again.
    raw = self.MockRawIO([b"abc"])
    reader = self.tp(raw)
    for init_kwargs in ({}, {"buffer_size": 1024}, {"buffer_size": 16}):
        reader.__init__(raw, **init_kwargs)
    self.assertEqual(b"abc", reader.read())

    for bad_size in (0, -16, -1):
        with self.assertRaises(ValueError):
            reader.__init__(raw, buffer_size=bad_size)

    raw = self.MockRawIO([b"abc"])
    reader.__init__(raw)
    self.assertEqual(b"abc", reader.read())
Guido van Rossum78892e42007-04-06 17:31:18 +00001259
def test_uninitialized(self):
    # An object created with __new__ only can be discarded safely, must
    # fail cleanly on read(), and works once __init__ has been run.
    cls = self.tp
    discarded = cls.__new__(cls)
    del discarded

    reader = cls.__new__(cls)
    read = reader.read
    with self.assertRaisesRegex((ValueError, AttributeError),
                                'uninitialized|has no attribute'):
        read(0)

    reader.__init__(self.MockRawIO())
    self.assertEqual(reader.read(0), b'')
1269
def test_read(self):
    # read(None) and read(7) must both return the seven bytes supplied
    # by the raw stream.
    for size in (None, 7):
        raw = self.MockRawIO((b"abc", b"d", b"efg"))
        reader = self.tp(raw)
        self.assertEqual(b"abcdefg", reader.read(size))
    # Invalid args (checked on the reader from the last iteration)
    with self.assertRaises(ValueError):
        reader.read(-2)
Guido van Rossum68bbcd22007-02-27 17:19:33 +00001277
def test_read1(self):
    # Check the data returned by successive read1() calls together with
    # the raw stream's read counter after each one.
    raw = self.MockRawIO((b"abc", b"d", b"efg"))
    reader = self.tp(raw)
    self.assertEqual(b"a", reader.read(1))

    steps = (
        # (read1 size, expected data, expected raw._reads or None to skip)
        (1, b"b", 1),
        (0, b"", None),
        (100, b"c", 1),
        (100, b"d", 2),
        (100, b"efg", 3),
        (100, b"", 4),
    )
    for size, expected, raw_reads in steps:
        self.assertEqual(expected, reader.read1(size))
        if raw_reads is not None:
            self.assertEqual(raw._reads, raw_reads)
Martin Panterccb2c0e2016-10-20 23:48:14 +00001293
def test_read1_arbitrary(self):
    # read1() called with no size or with -1.
    raw = self.MockRawIO((b"abc", b"d", b"efg"))
    reader = self.tp(raw)
    self.assertEqual(b"a", reader.read(1))

    # Remainder of the first chunk, then one raw chunk per call.
    self.assertEqual(b"bc", reader.read1())
    self.assertEqual(b"d", reader.read1())
    self.assertEqual(b"efg", reader.read1(-1))
    self.assertEqual(raw._reads, 3)

    # The next call returns no data and bumps the raw read counter again.
    self.assertEqual(b"", reader.read1())
    self.assertEqual(raw._reads, 4)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001304
def test_readinto(self):
    # readinto() fills a two-byte buffer across raw chunk boundaries; a
    # short read leaves the trailing byte of the buffer untouched.
    target = bytearray(2)
    scenarios = (
        # (raw chunks, ((expected count, expected buffer contents), ...))
        ((b"abc", b"d", b"efg"),
         ((2, b"ab"), (2, b"cd"), (2, b"ef"), (1, b"gf"), (0, b"gf"))),
        # The same buffer is reused for the second raw stream.
        ((b"abc", None),
         ((2, b"ab"), (1, b"cb"))),
    )
    for chunks, steps in scenarios:
        raw = self.MockRawIO(chunks)
        reader = self.tp(raw)
        for count, contents in steps:
            self.assertEqual(reader.readinto(target), count)
            self.assertEqual(target, contents)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001325
def test_readinto1(self):
    # Check readinto1() results against the raw stream's read counter.
    buffer_size = 10
    raw = self.MockRawIO((b"abc", b"de", b"fgh", b"jkl"))
    reader = self.tp(raw, buffer_size=buffer_size)

    def expect_fill(target, count, data, raw_reads):
        # One readinto1() call: check the count, the filled prefix and
        # the raw read counter.
        self.assertEqual(reader.readinto1(target), count)
        self.assertEqual(target[:count], data)
        self.assertEqual(raw._reads, raw_reads)

    small = bytearray(2)
    self.assertEqual(reader.peek(3), b'abc')
    self.assertEqual(raw._reads, 1)
    expect_fill(small, 2, b"ab", 1)
    expect_fill(small, 1, b"c", 1)
    expect_fill(small, 2, b"de", 2)

    large = bytearray(2*buffer_size)
    self.assertEqual(reader.peek(3), b'fgh')
    self.assertEqual(raw._reads, 3)
    expect_fill(large, 6, b"fghjkl", 4)
1348
def test_readinto_array(self):
    # readinto() must fill a multi-byte-element array by bytes, not by
    # elements, and leave the bytes beyond the data untouched.
    buffer_size = 60
    data = b"a" * 26
    rawio = self.MockRawIO((data,))
    bufio = self.tp(rawio, buffer_size=buffer_size)

    # Create an array with element size > 1 byte
    b = array.array('i', b'x' * 32)
    # Checked with a unittest assertion rather than a bare `assert`,
    # which is stripped when Python runs with -O.
    self.assertNotEqual(len(b), 16)

    # Read into it. We should get as many *bytes* as we can fit into b
    # (which is more than the number of elements)
    n = bufio.readinto(b)
    self.assertGreater(n, len(b))

    # Check that old contents of b are preserved
    bm = memoryview(b).cast('B')
    self.assertLess(n, len(bm))
    self.assertEqual(bm[:n], data[:n])
    self.assertEqual(bm[n:], b'x' * (len(bm[n:])))
1369
def test_readinto1_array(self):
    # readinto1() must fill a multi-byte-element array by bytes, not by
    # elements, and leave the bytes beyond the data untouched.
    buffer_size = 60
    data = b"a" * 26
    rawio = self.MockRawIO((data,))
    bufio = self.tp(rawio, buffer_size=buffer_size)

    # Create an array with element size > 1 byte
    b = array.array('i', b'x' * 32)
    # Checked with a unittest assertion rather than a bare `assert`,
    # which is stripped when Python runs with -O.
    self.assertNotEqual(len(b), 16)

    # Read into it. We should get as many *bytes* as we can fit into b
    # (which is more than the number of elements)
    n = bufio.readinto1(b)
    self.assertGreater(n, len(b))

    # Check that old contents of b are preserved
    bm = memoryview(b).cast('B')
    self.assertLess(n, len(bm))
    self.assertEqual(bm[:n], data[:n])
    self.assertEqual(bm[n:], b'x' * (len(bm[n:])))
1390
def test_readlines(self):
    # readlines() with no hint, a hint of 5 and a hint of None, each on
    # a fresh reader over the same three raw chunks.
    def make_reader():
        raw = self.MockRawIO((b"abc\n", b"d\n", b"ef"))
        return self.tp(raw)

    every_line = [b"abc\n", b"d\n", b"ef"]
    cases = (
        ((), every_line),
        ((5,), [b"abc\n", b"d\n"]),
        ((None,), every_line),
    )
    for hint_args, expected in cases:
        self.assertEqual(make_reader().readlines(*hint_args), expected)
Benjamin Petersonbf5ff762009-12-13 19:25:34 +00001398
def test_buffering(self):
    # For several buffer sizes, check the data returned by a series of
    # buffered reads and the sizes of the raw reads recorded for them.
    data = b"abcdefghi"
    dlen = len(data)

    scenarios = [
        # [buffer size, buffered read sizes, expected raw read sizes]
        [ 100, [ 3, 1, 4, 8 ], [ dlen, 0 ] ],
        [ 100, [ 3, 3, 3],     [ dlen ]    ],
        [   4, [ 1, 2, 4, 2 ], [ 4, 4, 1 ] ],
    ]

    for bufsize, requested_sizes, expected_raw_sizes in scenarios:
        raw = self.MockFileIO(data)
        reader = self.tp(raw, buffer_size=bufsize)
        offset = 0
        for nbytes in requested_sizes:
            expected = data[offset:offset+nbytes]
            self.assertEqual(reader.read(nbytes), expected)
            offset += nbytes
        # this is mildly implementation-dependent
        self.assertEqual(raw.read_history, expected_raw_sizes)
Guido van Rossum78892e42007-04-06 17:31:18 +00001418
def test_read_non_blocking(self):
    # Inject some None's in there to simulate EWOULDBLOCK
    chunks = (b"abc", b"d", None, b"efg", None, None, None)
    reader = self.tp(self.MockRawIO(chunks))
    self.assertEqual(b"abcd", reader.read(6))
    self.assertEqual(b"e", reader.read(1))
    self.assertEqual(b"fg", reader.read())
    self.assertEqual(b"", reader.peek(1))
    self.assertIsNone(reader.read())
    self.assertEqual(b"", reader.read())

    # The same behaviour checked on the raw stream's readall().
    raw = self.MockRawIO((b"a", None, None))
    self.assertEqual(b"a", raw.readall())
    self.assertIsNone(raw.readall())
1433
def test_read_past_eof(self):
    # Asking for more bytes than are available returns everything there is.
    rawio = self.MockRawIO((b"abc", b"d", b"efg"))
    bufio = self.tp(rawio)

    self.assertEqual(b"abcdefg", bufio.read(9000))
Guido van Rossum68bbcd22007-02-27 17:19:33 +00001439
def test_read_all(self):
    # read() without a size returns the concatenation of all raw chunks.
    rawio = self.MockRawIO((b"abc", b"d", b"efg"))
    bufio = self.tp(rawio)

    self.assertEqual(b"abcdefg", bufio.read())
Guido van Rossum68bbcd22007-02-27 17:19:33 +00001445
@support.requires_resource('cpu')
def test_threads(self):
    try:
        # Write out many bytes with exactly the same number of 0's,
        # 1's... 255's. This will help us check that concurrent reading
        # doesn't duplicate or forget contents.
        N = 1000
        values = list(range(256)) * N
        random.shuffle(values)
        payload = bytes(values)
        with self.open(support.TESTFN, "wb") as f:
            f.write(payload)
        with self.open(support.TESTFN, self.read_mode, buffering=0) as raw:
            bufio = self.tp(raw, 8)
            errors = []
            results = []

            def reader():
                try:
                    # Intra-buffer read then buffer-flushing read
                    for n in cycle([1, 19]):
                        chunk = bufio.read(n)
                        if not chunk:
                            break
                        # list.append() is atomic
                        results.append(chunk)
                except Exception as e:
                    # Record the failure for the main thread, then let
                    # it propagate in the worker as before.
                    errors.append(e)
                    raise

            threads = [threading.Thread(target=reader) for x in range(20)]
            with support.start_threads(threads):
                time.sleep(0.02)  # yield
            self.assertFalse(errors,
                "the following exceptions were caught: %r" % errors)
            combined = b''.join(results)
            # Every byte value must have been read exactly N times overall.
            for i in range(256):
                self.assertEqual(combined.count(bytes([i])), N)
    finally:
        support.unlink(support.TESTFN)
1485
def test_unseekable(self):
    # tell() and seek() must raise UnsupportedOperation both before and
    # after some data has been read.
    bufio = self.tp(self.MockUnseekableIO(b"A" * 10))
    self.assertRaises(self.UnsupportedOperation, bufio.tell)
    self.assertRaises(self.UnsupportedOperation, bufio.seek, 0)
    bufio.read(1)
    self.assertRaises(self.UnsupportedOperation, bufio.seek, 0)
    self.assertRaises(self.UnsupportedOperation, bufio.tell)
1493
def test_misbehaved_io(self):
    # seek() and tell() must raise OSError when layered over a
    # MisbehavedRawIO object.
    rawio = self.MisbehavedRawIO((b"abc", b"d", b"efg"))
    bufio = self.tp(rawio)
    self.assertRaises(OSError, bufio.seek, 0)
    self.assertRaises(OSError, bufio.tell)

    # Silence destructor error
    bufio.close = lambda: None
1502
def test_no_extraneous_read(self):
    # Issue #9550; when the raw IO object has satisfied the read request,
    # we should not issue any additional reads, otherwise it may block
    # (e.g. socket).
    bufsize = 16
    # Request sizes below, at, and above the buffer size.
    for n in (2, bufsize - 1, bufsize, bufsize + 1, bufsize * 2):
        rawio = self.MockRawIO([b"x" * n])
        bufio = self.tp(rawio, bufsize)
        self.assertEqual(bufio.read(n), b"x" * n)
        # Simple case: one raw read is enough to satisfy the request.
        self.assertEqual(rawio._extraneous_reads, 0,
                         "failed for {}: {} != 0".format(n, rawio._extraneous_reads))
        # A more complex case where two raw reads are needed to satisfy
        # the request.
        rawio = self.MockRawIO([b"x" * (n - 1), b"x"])
        bufio = self.tp(rawio, bufsize)
        self.assertEqual(bufio.read(n), b"x" * n)
        self.assertEqual(rawio._extraneous_reads, 0,
                         "failed for {}: {} != 0".format(n, rawio._extraneous_reads))
1522
def test_read_on_closed(self):
    # Issue #23796
    # peek() and read1() on a closed reader must raise ValueError.
    # NOTE(review): this builds io.BufferedReader directly instead of
    # self.tp -- presumably deliberate; confirm before generalizing.
    b = io.BufferedReader(io.BytesIO(b"12"))
    b.read(1)
    b.close()
    self.assertRaises(ValueError, b.peek)
    self.assertRaises(ValueError, b.read1, 1)
1530
def test_truncate_on_read_only(self):
    # A reader that reports itself as not writable must refuse
    # truncate(), with or without an explicit size.
    rawio = self.MockFileIO(b"abc")
    bufio = self.tp(rawio)
    self.assertFalse(bufio.writable())
    self.assertRaises(self.UnsupportedOperation, bufio.truncate)
    self.assertRaises(self.UnsupportedOperation, bufio.truncate, 0)
1537
Antoine Pitrou32cfede2010-08-11 13:31:33 +00001538
class CBufferedReaderTest(BufferedReaderTest, SizeofTest):
    # Runs the shared BufferedReader tests against io.BufferedReader and
    # adds checks specific to that implementation.
    tp = io.BufferedReader

    @unittest.skipIf(MEMORY_SANITIZER, "MSan defaults to crashing "
                     "instead of returning NULL for malloc failure.")
    def test_constructor(self):
        super().test_constructor()
        # The allocation can succeed on 32-bit builds, e.g. with more
        # than 2 GiB RAM and a 64-bit kernel.
        if sys.maxsize > 0x7FFFFFFF:
            rawio = self.MockRawIO()
            bufio = self.tp(rawio)
            self.assertRaises((OverflowError, MemoryError, ValueError),
                bufio.__init__, rawio, sys.maxsize)

    def test_initialization(self):
        # After each failed re-initialization, read() must raise
        # ValueError as well.
        rawio = self.MockRawIO([b"abc"])
        bufio = self.tp(rawio)
        self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0)
        self.assertRaises(ValueError, bufio.read)
        self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16)
        self.assertRaises(ValueError, bufio.read)
        self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1)
        self.assertRaises(ValueError, bufio.read)

    def test_misbehaved_io_read(self):
        rawio = self.MisbehavedRawIO((b"abc", b"d", b"efg"))
        bufio = self.tp(rawio)
        # _pyio.BufferedReader seems to implement reading differently, so
        # that checking this is not so easy.
        self.assertRaises(OSError, bufio.read, 10)

    def test_garbage_collection(self):
        # C BufferedReader objects are collected.
        # The Python version has __del__, so it ends up in gc.garbage instead
        self.addCleanup(support.unlink, support.TESTFN)
        with support.check_warnings(('', ResourceWarning)):
            rawio = self.FileIO(support.TESTFN, "w+b")
            f = self.tp(rawio)
            # Create a reference cycle through the object itself.
            f.f = f
            wr = weakref.ref(f)
            del f
            support.gc_collect()
        self.assertIsNone(wr(), wr)

    def test_args_error(self):
        # Issue #17275
        with self.assertRaisesRegex(TypeError, "BufferedReader"):
            self.tp(io.BytesIO(), 1024, 1024, 1024)
1588
1589
class PyBufferedReaderTest(BufferedReaderTest):
    # Runs the shared BufferedReader tests against pyio.BufferedReader.
    tp = pyio.BufferedReader
Antoine Pitrou87695762008-08-14 22:44:29 +00001592
Guido van Rossuma9e20242007-03-08 00:43:48 +00001593
class BufferedWriterTest(unittest.TestCase, CommonBufferedTests):
    # Tests shared by the BufferedWriter implementations; subclasses set
    # `tp` to the concrete class under test.
    write_mode = "wb"

    def test_constructor(self):
        # __init__ may be called repeatedly; invalid buffer sizes raise
        # ValueError and the object is usable again after a valid call.
        rawio = self.MockRawIO()
        bufio = self.tp(rawio)
        bufio.__init__(rawio)
        bufio.__init__(rawio, buffer_size=1024)
        bufio.__init__(rawio, buffer_size=16)
        self.assertEqual(3, bufio.write(b"abc"))
        bufio.flush()
        self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0)
        self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16)
        self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1)
        bufio.__init__(rawio)
        self.assertEqual(3, bufio.write(b"ghi"))
        bufio.flush()
        self.assertEqual(b"".join(rawio._write_stack), b"abcghi")

    def test_uninitialized(self):
        # An object created with __new__ only can be deleted, rejects
        # write() until __init__ has run, and works afterwards.
        bufio = self.tp.__new__(self.tp)
        del bufio
        bufio = self.tp.__new__(self.tp)
        self.assertRaisesRegex((ValueError, AttributeError),
                               'uninitialized|has no attribute',
                               bufio.write, b'')
        bufio.__init__(self.MockRawIO())
        self.assertEqual(bufio.write(b''), 0)

    def test_detach_flush(self):
        # detach() pushes buffered data down to the raw object.
        raw = self.MockRawIO()
        buf = self.tp(raw)
        buf.write(b"howdy!")
        self.assertFalse(raw._write_stack)
        buf.detach()
        self.assertEqual(raw._write_stack, [b"howdy!"])

    def test_write(self):
        # Write to the buffered IO but don't overflow the buffer.
        writer = self.MockRawIO()
        bufio = self.tp(writer, 8)
        bufio.write(b"abc")
        self.assertFalse(writer._write_stack)
        buffer = bytearray(b"def")
        bufio.write(buffer)
        buffer[:] = b"***"  # Overwrite our copy of the data
        bufio.flush()
        self.assertEqual(b"".join(writer._write_stack), b"abcdef")

    def test_write_overflow(self):
        writer = self.MockRawIO()
        bufio = self.tp(writer, 8)
        contents = b"abcdefghijklmnop"
        for n in range(0, len(contents), 3):
            bufio.write(contents[n:n+3])
        flushed = b"".join(writer._write_stack)
        # At least (total - 8) bytes were implicitly flushed, perhaps more
        # depending on the implementation.
        self.assertTrue(flushed.startswith(contents[:-8]), flushed)

    def check_writes(self, intermediate_func):
        # Lots of writes, test the flushed output is as expected.
        # `intermediate_func` is called with the writer after every write.
        contents = bytes(range(256)) * 1000
        n = 0
        writer = self.MockRawIO()
        bufio = self.tp(writer, 13)
        # Generator of write sizes: repeat each N 15 times then proceed to N+1
        def gen_sizes():
            for size in count(1):
                for i in range(15):
                    yield size
        sizes = gen_sizes()
        while n < len(contents):
            # Clamp the final write to the bytes that are left.
            size = min(next(sizes), len(contents) - n)
            self.assertEqual(bufio.write(contents[n:n+size]), size)
            intermediate_func(bufio)
            n += size
        bufio.flush()
        self.assertEqual(contents, b"".join(writer._write_stack))

    def test_writes(self):
        self.check_writes(lambda bufio: None)

    def test_writes_and_flushes(self):
        self.check_writes(lambda bufio: bufio.flush())

    def test_writes_and_seeks(self):
        # Seek away and back between writes, first with absolute and then
        # with relative offsets; the final position is always restored.
        def _seekabs(bufio):
            pos = bufio.tell()
            bufio.seek(pos + 1, 0)
            bufio.seek(pos - 1, 0)
            bufio.seek(pos, 0)
        self.check_writes(_seekabs)
        def _seekrel(bufio):
            pos = bufio.seek(0, 1)
            bufio.seek(+1, 1)
            bufio.seek(-1, 1)
            bufio.seek(pos, 0)
        self.check_writes(_seekrel)

    def test_writes_and_truncates(self):
        self.check_writes(lambda bufio: bufio.truncate(bufio.tell()))

    def test_write_non_blocking(self):
        raw = self.MockNonBlockWriterIO()
        bufio = self.tp(raw, 8)

        self.assertEqual(bufio.write(b"abcd"), 4)
        self.assertEqual(bufio.write(b"efghi"), 5)
        # 1 byte will be written, the rest will be buffered
        raw.block_on(b"k")
        self.assertEqual(bufio.write(b"jklmn"), 5)

        # 8 bytes will be written, 8 will be buffered and the rest will be lost
        raw.block_on(b"0")
        with self.assertRaises(self.BlockingIOError) as cm:
            bufio.write(b"opqrwxyz0123456789")
        written = cm.exception.characters_written
        self.assertEqual(written, 16)
        self.assertEqual(raw.pop_written(),
            b"abcdefghijklmnopqrwxyz")

        self.assertEqual(bufio.write(b"ABCDEFGHI"), 9)
        s = raw.pop_written()
        # Previously buffered bytes were flushed
        self.assertTrue(s.startswith(b"01234567A"), s)

    def test_write_and_rewind(self):
        # Overwrite the start of already-written data after seeking back.
        raw = io.BytesIO()
        bufio = self.tp(raw, 4)
        self.assertEqual(bufio.write(b"abcdef"), 6)
        self.assertEqual(bufio.tell(), 6)
        bufio.seek(0, 0)
        self.assertEqual(bufio.write(b"XY"), 2)
        bufio.seek(6, 0)
        self.assertEqual(raw.getvalue(), b"XYcdef")
        self.assertEqual(bufio.write(b"123456"), 6)
        bufio.flush()
        self.assertEqual(raw.getvalue(), b"XYcdef123456")

    def test_flush(self):
        writer = self.MockRawIO()
        bufio = self.tp(writer, 8)
        bufio.write(b"abc")
        bufio.flush()
        self.assertEqual(b"abc", writer._write_stack[0])

    def test_writelines(self):
        l = [b'ab', b'cd', b'ef']
        writer = self.MockRawIO()
        bufio = self.tp(writer, 8)
        bufio.writelines(l)
        bufio.flush()
        self.assertEqual(b''.join(writer._write_stack), b'abcdef')

    def test_writelines_userlist(self):
        # Same as test_writelines, but with a UserList argument.
        l = UserList([b'ab', b'cd', b'ef'])
        writer = self.MockRawIO()
        bufio = self.tp(writer, 8)
        bufio.writelines(l)
        bufio.flush()
        self.assertEqual(b''.join(writer._write_stack), b'abcdef')

    def test_writelines_error(self):
        # Non-bytes items, None, and a str argument all raise TypeError.
        writer = self.MockRawIO()
        bufio = self.tp(writer, 8)
        self.assertRaises(TypeError, bufio.writelines, [1, 2, 3])
        self.assertRaises(TypeError, bufio.writelines, None)
        self.assertRaises(TypeError, bufio.writelines, 'abc')

    def test_destructor(self):
        # Dropping the last reference flushes pending data to the raw object.
        writer = self.MockRawIO()
        bufio = self.tp(writer, 8)
        bufio.write(b"abc")
        del bufio
        support.gc_collect()
        self.assertEqual(b"abc", writer._write_stack[0])

    def test_truncate(self):
        # Truncate implicitly flushes the buffer.
        self.addCleanup(support.unlink, support.TESTFN)
        with self.open(support.TESTFN, self.write_mode, buffering=0) as raw:
            bufio = self.tp(raw, 8)
            bufio.write(b"abcdef")
            self.assertEqual(bufio.truncate(3), 3)
            self.assertEqual(bufio.tell(), 6)
        with self.open(support.TESTFN, "rb", buffering=0) as f:
            self.assertEqual(f.read(), b"abc")

    def test_truncate_after_write(self):
        # Ensure that truncate preserves the file position after
        # writes longer than the buffer size.
        # Issue: https://bugs.python.org/issue32228
        self.addCleanup(support.unlink, support.TESTFN)
        with self.open(support.TESTFN, "wb") as f:
            # Fill with some buffer
            f.write(b'\x00' * 10000)
        buffer_sizes = [8192, 4096, 200]
        for buffer_size in buffer_sizes:
            with self.open(support.TESTFN, "r+b", buffering=buffer_size) as f:
                f.write(b'\x00' * (buffer_size + 1))
                # After write write_pos and write_end are set to 0
                f.read(1)
                # read operation makes sure that pos != raw_pos
                f.truncate()
                self.assertEqual(f.tell(), buffer_size + 2)

    @support.requires_resource('cpu')
    def test_threads(self):
        try:
            # Write out many bytes from many threads and test they were
            # all flushed.
            N = 1000
            contents = bytes(range(256)) * N
            sizes = cycle([1, 19])
            n = 0
            queue = deque()
            while n < len(contents):
                size = next(sizes)
                queue.append(contents[n:n+size])
                n += size
            del contents
            # We use a real file object because it allows us to
            # exercise situations where the GIL is released before
            # writing the buffer to the raw streams. This is in addition
            # to concurrency issues due to switching threads in the middle
            # of Python code.
            with self.open(support.TESTFN, self.write_mode, buffering=0) as raw:
                bufio = self.tp(raw, 8)
                errors = []
                def f():
                    try:
                        while True:
                            try:
                                s = queue.popleft()
                            except IndexError:
                                # Queue drained: this worker is done.
                                return
                            bufio.write(s)
                    except Exception as e:
                        errors.append(e)
                        raise
                threads = [threading.Thread(target=f) for x in range(20)]
                with support.start_threads(threads):
                    time.sleep(0.02)  # yield
                self.assertFalse(errors,
                    "the following exceptions were caught: %r" % errors)
                bufio.close()
            with self.open(support.TESTFN, "rb") as f:
                s = f.read()
            for i in range(256):
                self.assertEqual(s.count(bytes([i])), N)
        finally:
            support.unlink(support.TESTFN)

    def test_misbehaved_io(self):
        rawio = self.MisbehavedRawIO()
        bufio = self.tp(rawio, 5)
        self.assertRaises(OSError, bufio.seek, 0)
        self.assertRaises(OSError, bufio.tell)
        self.assertRaises(OSError, bufio.write, b"abcdef")

        # Silence destructor error
        bufio.close = lambda: None

    def test_max_buffer_size_removal(self):
        # A third positional argument is rejected with TypeError.
        with self.assertRaises(TypeError):
            self.tp(self.MockRawIO(), 8, 12)

    def test_write_error_on_close(self):
        raw = self.MockRawIO()
        def bad_write(b):
            raise OSError()
        raw.write = bad_write
        b = self.tp(raw)
        b.write(b'spam')
        self.assertRaises(OSError, b.close)  # exception not swallowed
        self.assertTrue(b.closed)

    def test_slow_close_from_thread(self):
        # Issue #31976
        rawio = self.SlowFlushRawIO()
        bufio = self.tp(rawio, 8)
        t = threading.Thread(target=bufio.close)
        t.start()
        try:
            rawio.in_flush.wait()
            self.assertRaises(ValueError, bufio.write, b'spam')
            self.assertTrue(bufio.closed)
        finally:
            # Always reap the closing thread, even if an assertion failed.
            t.join()
1885
1886
Benjamin Peterson59406a92009-03-26 17:10:29 +00001887
class CBufferedWriterTest(BufferedWriterTest, SizeofTest):
    # Runs the shared BufferedWriter tests against io.BufferedWriter and
    # adds checks specific to that implementation.
    tp = io.BufferedWriter

    @unittest.skipIf(MEMORY_SANITIZER, "MSan defaults to crashing "
                     "instead of returning NULL for malloc failure.")
    def test_constructor(self):
        super().test_constructor()
        # The allocation can succeed on 32-bit builds, e.g. with more
        # than 2 GiB RAM and a 64-bit kernel.
        if sys.maxsize > 0x7FFFFFFF:
            rawio = self.MockRawIO()
            bufio = self.tp(rawio)
            self.assertRaises((OverflowError, MemoryError, ValueError),
                bufio.__init__, rawio, sys.maxsize)

    def test_initialization(self):
        # After each failed re-initialization, write() must raise
        # ValueError as well.
        rawio = self.MockRawIO()
        bufio = self.tp(rawio)
        self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0)
        self.assertRaises(ValueError, bufio.write, b"def")
        self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16)
        self.assertRaises(ValueError, bufio.write, b"def")
        self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1)
        self.assertRaises(ValueError, bufio.write, b"def")

    def test_garbage_collection(self):
        # C BufferedWriter objects are collected, and collecting them flushes
        # all data to disk.
        # The Python version has __del__, so it ends up in gc.garbage instead
        self.addCleanup(support.unlink, support.TESTFN)
        with support.check_warnings(('', ResourceWarning)):
            rawio = self.FileIO(support.TESTFN, "w+b")
            f = self.tp(rawio)
            f.write(b"123xxx")
            # Create a reference cycle through the object itself.
            f.x = f
            wr = weakref.ref(f)
            del f
            support.gc_collect()
        self.assertIsNone(wr(), wr)
        with self.open(support.TESTFN, "rb") as f:
            self.assertEqual(f.read(), b"123xxx")

    def test_args_error(self):
        # Issue #17275
        with self.assertRaisesRegex(TypeError, "BufferedWriter"):
            self.tp(io.BytesIO(), 1024, 1024, 1024)
1934
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001935
class PyBufferedWriterTest(BufferedWriterTest):
    # Runs the shared BufferedWriter tests against pyio.BufferedWriter.
    tp = pyio.BufferedWriter
Guido van Rossuma9e20242007-03-08 00:43:48 +00001938
class BufferedRWPairTest(unittest.TestCase):
    # Tests for BufferedRWPair.  The class under test is read from
    # `self.tp`, which is not assigned in this class body.

    def test_constructor(self):
        pair = self.tp(self.MockRawIO(), self.MockRawIO())
        self.assertFalse(pair.closed)

    def test_uninitialized(self):
        # An object created with __new__ only can be deleted, rejects
        # read()/write() until __init__ has run, and works afterwards.
        pair = self.tp.__new__(self.tp)
        del pair
        pair = self.tp.__new__(self.tp)
        self.assertRaisesRegex((ValueError, AttributeError),
                               'uninitialized|has no attribute',
                               pair.read, 0)
        self.assertRaisesRegex((ValueError, AttributeError),
                               'uninitialized|has no attribute',
                               pair.write, b'')
        pair.__init__(self.MockRawIO(), self.MockRawIO())
        self.assertEqual(pair.read(0), b'')
        self.assertEqual(pair.write(b''), 0)

    def test_detach(self):
        pair = self.tp(self.MockRawIO(), self.MockRawIO())
        self.assertRaises(self.UnsupportedOperation, pair.detach)

    def test_constructor_max_buffer_size_removal(self):
        # A fourth positional argument is rejected with TypeError.
        with self.assertRaises(TypeError):
            self.tp(self.MockRawIO(), self.MockRawIO(), 8, 12)

    def test_constructor_with_not_readable(self):
        class NotReadable(MockRawIO):
            def readable(self):
                return False

        self.assertRaises(OSError, self.tp, NotReadable(), self.MockRawIO())

    def test_constructor_with_not_writeable(self):
        class NotWriteable(MockRawIO):
            def writable(self):
                return False

        self.assertRaises(OSError, self.tp, self.MockRawIO(), NotWriteable())

    def test_read(self):
        pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())

        self.assertEqual(pair.read(3), b"abc")
        self.assertEqual(pair.read(1), b"d")
        self.assertEqual(pair.read(), b"ef")
        pair = self.tp(self.BytesIO(b"abc"), self.MockRawIO())
        self.assertEqual(pair.read(None), b"abc")

    def test_readlines(self):
        # readlines() with no hint, an explicit None hint, and a
        # byte-count hint; a new pair is built for each assertion.
        def pair():
            return self.tp(self.BytesIO(b"abc\ndef\nh"), self.MockRawIO())
        self.assertEqual(pair().readlines(), [b"abc\n", b"def\n", b"h"])
        self.assertEqual(pair().readlines(None), [b"abc\n", b"def\n", b"h"])
        self.assertEqual(pair().readlines(5), [b"abc\n", b"def\n"])

    def test_read1(self):
        # .read1() is delegated to the underlying reader object, so this test
        # can be shallow.
        pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())

        self.assertEqual(pair.read1(3), b"abc")
        self.assertEqual(pair.read1(), b"def")

    def test_readinto(self):
        for method in ("readinto", "readinto1"):
            with self.subTest(method):
                pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())

                data = byteslike(b'\0' * 5)
                self.assertEqual(getattr(pair, method)(data), 5)
                self.assertEqual(bytes(data), b"abcde")

    def test_write(self):
        w = self.MockRawIO()
        pair = self.tp(self.MockRawIO(), w)

        pair.write(b"abc")
        pair.flush()
        buffer = bytearray(b"def")
        pair.write(buffer)
        buffer[:] = b"***"  # Overwrite our copy of the data
        pair.flush()
        self.assertEqual(w._write_stack, [b"abc", b"def"])

    def test_peek(self):
        # peek() must not consume: the following read() sees the same bytes.
        pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())

        self.assertTrue(pair.peek(3).startswith(b"abc"))
        self.assertEqual(pair.read(3), b"abc")

    def test_readable(self):
        pair = self.tp(self.MockRawIO(), self.MockRawIO())
        self.assertTrue(pair.readable())

    def test_writeable(self):
        pair = self.tp(self.MockRawIO(), self.MockRawIO())
        self.assertTrue(pair.writable())

    def test_seekable(self):
        # BufferedRWPairs are never seekable, even if their readers and writers
        # are.
        pair = self.tp(self.MockRawIO(), self.MockRawIO())
        self.assertFalse(pair.seekable())

    # .flush() is delegated to the underlying writer object and has been
    # tested in the test_write method.

    def test_close_and_closed(self):
        pair = self.tp(self.MockRawIO(), self.MockRawIO())
        self.assertFalse(pair.closed)
        pair.close()
        self.assertTrue(pair.closed)

    def test_reader_close_error_on_close(self):
        def reader_close():
            # Referencing an undefined name raises NameError on purpose.
            reader_non_existing
        reader = self.MockRawIO()
        reader.close = reader_close
        writer = self.MockRawIO()
        pair = self.tp(reader, writer)
        with self.assertRaises(NameError) as err:
            pair.close()
        self.assertIn('reader_non_existing', str(err.exception))
        self.assertTrue(pair.closed)
        self.assertFalse(reader.closed)
        self.assertTrue(writer.closed)

        # Silence destructor error
        reader.close = lambda: None

    def test_writer_close_error_on_close(self):
        def writer_close():
            # Referencing an undefined name raises NameError on purpose.
            writer_non_existing
        reader = self.MockRawIO()
        writer = self.MockRawIO()
        writer.close = writer_close
        pair = self.tp(reader, writer)
        with self.assertRaises(NameError) as err:
            pair.close()
        self.assertIn('writer_non_existing', str(err.exception))
        self.assertFalse(pair.closed)
        self.assertTrue(reader.closed)
        self.assertFalse(writer.closed)

        # Silence destructor error
        writer.close = lambda: None
        writer = None

        # Ignore BufferedWriter (of the BufferedRWPair) unraisable exception
        with support.catch_unraisable_exception():
            # Ignore BufferedRWPair unraisable exception
            with support.catch_unraisable_exception():
                pair = None
                support.gc_collect()
            support.gc_collect()

    def test_reader_writer_close_error_on_close(self):
        def reader_close():
            # Referencing an undefined name raises NameError on purpose.
            reader_non_existing
        def writer_close():
            # Referencing an undefined name raises NameError on purpose.
            writer_non_existing
        reader = self.MockRawIO()
        reader.close = reader_close
        writer = self.MockRawIO()
        writer.close = writer_close
        pair = self.tp(reader, writer)
        with self.assertRaises(NameError) as err:
            pair.close()
        # The reader's error is the one raised; the writer's error is
        # attached as its __context__.
        self.assertIn('reader_non_existing', str(err.exception))
        self.assertIsInstance(err.exception.__context__, NameError)
        self.assertIn('writer_non_existing', str(err.exception.__context__))
        self.assertFalse(pair.closed)
        self.assertFalse(reader.closed)
        self.assertFalse(writer.closed)

        # Silence destructor error
        reader.close = lambda: None
        writer.close = lambda: None

    def test_isatty(self):
        # The pair reports a tty if either the reader or the writer does.
        class SelectableIsAtty(MockRawIO):
            def __init__(self, isatty):
                MockRawIO.__init__(self)
                self._isatty = isatty

            def isatty(self):
                return self._isatty

        pair = self.tp(SelectableIsAtty(False), SelectableIsAtty(False))
        self.assertFalse(pair.isatty())

        pair = self.tp(SelectableIsAtty(True), SelectableIsAtty(False))
        self.assertTrue(pair.isatty())

        pair = self.tp(SelectableIsAtty(False), SelectableIsAtty(True))
        self.assertTrue(pair.isatty())

        pair = self.tp(SelectableIsAtty(True), SelectableIsAtty(True))
        self.assertTrue(pair.isatty())
Guido van Rossum01a27522007-03-07 01:00:12 +00002140
Benjamin Petersonbbd0a322014-09-29 22:46:57 -04002141 def test_weakref_clearing(self):
2142 brw = self.tp(self.MockRawIO(), self.MockRawIO())
2143 ref = weakref.ref(brw)
2144 brw = None
2145 ref = None # Shouldn't segfault.
2146
class CBufferedRWPairTest(BufferedRWPairTest):
    # Runs the shared BufferedRWPairTest cases with io.BufferedRWPair bound
    # as the type under test (``tp``).
    tp = io.BufferedRWPair
Guido van Rossuma9e20242007-03-08 00:43:48 +00002149
class PyBufferedRWPairTest(BufferedRWPairTest):
    # Runs the shared BufferedRWPairTest cases with pyio.BufferedRWPair bound
    # as the type under test (``tp``).
    tp = pyio.BufferedRWPair
Guido van Rossuma9e20242007-03-08 00:43:48 +00002152
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002153
class BufferedRandomTest(BufferedReaderTest, BufferedWriterTest):
    # Shared tests for a buffered object that is both readable and writable.
    # Concrete subclasses bind the type under test as ``tp``.  Tests that
    # exist in both base classes are overridden here so that both versions
    # are run explicitly.

    # NOTE(review): presumably consumed by inherited tests when opening
    # files; their use is not visible in this class.
    read_mode = "rb+"
    write_mode = "wb+"

    def test_constructor(self):
        BufferedReaderTest.test_constructor(self)
        BufferedWriterTest.test_constructor(self)

    def test_uninitialized(self):
        BufferedReaderTest.test_uninitialized(self)
        BufferedWriterTest.test_uninitialized(self)

    def test_read_and_write(self):
        # Writes are held back in the buffer until a later read pushes them
        # to the raw stream as one combined chunk.
        raw = self.MockRawIO((b"asdf", b"ghjk"))
        rw = self.tp(raw, 8)

        self.assertEqual(b"as", rw.read(2))
        rw.write(b"ddd")
        rw.write(b"eee")
        self.assertFalse(raw._write_stack)  # Buffer writes
        self.assertEqual(b"ghjk", rw.read())
        self.assertEqual(b"dddeee", raw._write_stack[0])

    def test_seek_and_tell(self):
        # Interleaves read/write/seek with all three whence values and
        # checks that tell() and the final raw contents agree.
        raw = self.BytesIO(b"asdfghjkl")
        rw = self.tp(raw)

        self.assertEqual(b"as", rw.read(2))
        self.assertEqual(2, rw.tell())
        rw.seek(0, 0)
        self.assertEqual(b"asdf", rw.read(4))

        # Overwrites bytes 4..7 ("ghjk") in place.
        rw.write(b"123f")
        rw.seek(0, 0)
        self.assertEqual(b"asdf123fl", rw.read())
        self.assertEqual(9, rw.tell())
        # whence=2: relative to the end (9 - 4 == 5).
        rw.seek(-4, 2)
        self.assertEqual(5, rw.tell())
        # whence=1: relative to the current position (5 + 2 == 7).
        rw.seek(2, 1)
        self.assertEqual(7, rw.tell())
        self.assertEqual(b"fl", rw.read(11))
        rw.flush()
        self.assertEqual(b"asdf123fl", raw.getvalue())

        # A float offset is rejected.
        self.assertRaises(TypeError, rw.seek, 0.0)

    def check_flush_and_read(self, read_func):
        # Shared scenario for the test_flush_and_* tests.  ``read_func`` is
        # called as read_func(bufio[, n]) and must return the bytes read
        # while advancing the position by that many bytes.
        raw = self.BytesIO(b"abcdefghi")
        bufio = self.tp(raw)

        self.assertEqual(b"ab", read_func(bufio, 2))
        bufio.write(b"12")
        self.assertEqual(b"ef", read_func(bufio, 2))
        self.assertEqual(6, bufio.tell())
        bufio.flush()
        self.assertEqual(6, bufio.tell())
        self.assertEqual(b"ghi", read_func(bufio))
        # Modify the raw stream behind the buffered object's back.
        raw.seek(0, 0)
        raw.write(b"XYZ")
        # flush() resets the read buffer
        bufio.flush()
        bufio.seek(0, 0)
        self.assertEqual(b"XYZ", read_func(bufio, 3))

    def test_flush_and_read(self):
        self.check_flush_and_read(lambda bufio, *args: bufio.read(*args))

    def test_flush_and_readinto(self):
        def _readinto(bufio, n=-1):
            # With no explicit size, use a buffer far larger than the data.
            b = bytearray(n if n >= 0 else 9999)
            n = bufio.readinto(b)
            return bytes(b[:n])
        self.check_flush_and_read(_readinto)

    def test_flush_and_peek(self):
        def _peek(bufio, n=-1):
            # This relies on the fact that the buffer can contain the whole
            # raw stream, otherwise peek() can return less.
            b = bufio.peek(n)
            if n != -1:
                b = b[:n]
            # peek() does not advance the position, so do it by hand.
            bufio.seek(len(b), 1)
            return b
        self.check_flush_and_read(_peek)

    def test_flush_and_write(self):
        # Two flushed writes land back to back at the start of the stream.
        raw = self.BytesIO(b"abcdefghi")
        bufio = self.tp(raw)

        bufio.write(b"123")
        bufio.flush()
        bufio.write(b"45")
        bufio.flush()
        bufio.seek(0, 0)
        self.assertEqual(b"12345fghi", raw.getvalue())
        self.assertEqual(b"12345fghi", bufio.read())

    def test_threads(self):
        BufferedReaderTest.test_threads(self)
        BufferedWriterTest.test_threads(self)

    # The test_writes_and_* tests pass a callback to the inherited
    # check_writes() helper (defined outside this class); each callback
    # performs some kind of read on the buffered object.

    def test_writes_and_peek(self):
        def _peek(bufio):
            bufio.peek(1)
        self.check_writes(_peek)
        # Second variant: step back one byte, peek, then restore the position.
        def _peek(bufio):
            pos = bufio.tell()
            bufio.seek(-1, 1)
            bufio.peek(1)
            bufio.seek(pos, 0)
        self.check_writes(_peek)

    def test_writes_and_reads(self):
        def _read(bufio):
            bufio.seek(-1, 1)
            bufio.read(1)
        self.check_writes(_read)

    def test_writes_and_read1s(self):
        def _read1(bufio):
            bufio.seek(-1, 1)
            bufio.read1(1)
        self.check_writes(_read1)

    def test_writes_and_readintos(self):
        def _read(bufio):
            bufio.seek(-1, 1)
            bufio.readinto(bytearray(1))
        self.check_writes(_read)

    def test_write_after_readahead(self):
        # Issue #6629: writing after the buffer was filled by readahead should
        # first rewind the raw stream.
        # Sizes 1 and 5 are respectively below and above the buffer size (4).
        for overwrite_size in [1, 5]:
            raw = self.BytesIO(b"A" * 10)
            bufio = self.tp(raw, 4)
            # Trigger readahead
            self.assertEqual(bufio.read(1), b"A")
            self.assertEqual(bufio.tell(), 1)
            # Overwriting should rewind the raw stream if it needs so
            bufio.write(b"B" * overwrite_size)
            self.assertEqual(bufio.tell(), overwrite_size + 1)
            # If the write size was smaller than the buffer size, flush() and
            # check that rewind happens.
            bufio.flush()
            self.assertEqual(bufio.tell(), overwrite_size + 1)
            s = raw.getvalue()
            self.assertEqual(s,
                b"A" + b"B" * overwrite_size + b"A" * (9 - overwrite_size))

    def test_write_rewind_write(self):
        # Various combinations of reading / writing / seeking backwards / writing again
        def mutate(bufio, pos1, pos2):
            assert pos2 >= pos1
            # Fill the buffer
            bufio.seek(pos1)
            bufio.read(pos2 - pos1)
            bufio.write(b'\x02')
            # This writes earlier than the previous write, but still inside
            # the buffer.
            bufio.seek(pos1)
            bufio.write(b'\x01')

        b = b"\x80\x81\x82\x83\x84"
        for i in range(0, len(b)):
            for j in range(i, len(b)):
                raw = self.BytesIO(b)
                bufio = self.tp(raw, 100)
                mutate(bufio, i, j)
                bufio.flush()
                # Byte j is set first and byte i second, so when i == j the
                # later write (1) wins, matching the order used in mutate().
                expected = bytearray(b)
                expected[j] = 2
                expected[i] = 1
                self.assertEqual(raw.getvalue(), expected,
                                 "failed result for i=%d, j=%d" % (i, j))

    def test_truncate_after_read_or_write(self):
        # truncate() with no argument must use the logical position, not
        # the raw stream position, after both a read and a write.
        raw = self.BytesIO(b"A" * 10)
        bufio = self.tp(raw, 100)
        self.assertEqual(bufio.read(2), b"AA")  # the read buffer gets filled
        self.assertEqual(bufio.truncate(), 2)
        self.assertEqual(bufio.write(b"BB"), 2)  # the write buffer increases
        self.assertEqual(bufio.truncate(), 4)

    def test_misbehaved_io(self):
        BufferedReaderTest.test_misbehaved_io(self)
        BufferedWriterTest.test_misbehaved_io(self)

    def test_interleaved_read_write(self):
        # Test for issue #12213
        # Each write overwrites one byte and each read consumes the next,
        # so alternate bytes of the original data are replaced.
        with self.BytesIO(b'abcdefgh') as raw:
            with self.tp(raw, 100) as f:
                f.write(b"1")
                self.assertEqual(f.read(1), b'b')
                f.write(b'2')
                self.assertEqual(f.read1(1), b'd')
                f.write(b'3')
                buf = bytearray(1)
                f.readinto(buf)
                self.assertEqual(buf, b'f')
                f.write(b'4')
                self.assertEqual(f.peek(1), b'h')
                f.flush()
                self.assertEqual(raw.getvalue(), b'1b2d3f4h')

        with self.BytesIO(b'abc') as raw:
            with self.tp(raw, 100) as f:
                self.assertEqual(f.read(1), b'a')
                f.write(b"2")
                self.assertEqual(f.read(1), b'c')
                f.flush()
                self.assertEqual(raw.getvalue(), b'a2c')

    def test_interleaved_readline_write(self):
        # Same idea as above, with readline() as the reading operation.
        with self.BytesIO(b'ab\ncdef\ng\n') as raw:
            with self.tp(raw) as f:
                f.write(b'1')
                self.assertEqual(f.readline(), b'b\n')
                f.write(b'2')
                self.assertEqual(f.readline(), b'def\n')
                f.write(b'3')
                self.assertEqual(f.readline(), b'\n')
                f.flush()
                self.assertEqual(raw.getvalue(), b'1b\n2def\n3\n')

    # Rebinding an inherited test name to None removes that test from
    # this class.

    # You can't construct a BufferedRandom over a non-seekable stream.
    test_unseekable = None

    # writable() returns True, so there's no point to test it over
    # a writable stream.
    test_truncate_on_read_only = None
2385
R David Murray67bfe802013-02-23 21:51:05 -05002386
class CBufferedRandomTest(BufferedRandomTest, SizeofTest):
    # Runs the shared BufferedRandomTest cases against io.BufferedRandom and
    # adds checks that apply only to this implementation.
    tp = io.BufferedRandom

    @unittest.skipIf(MEMORY_SANITIZER, "MSan defaults to crashing "
                     "instead of returning NULL for malloc failure.")
    def test_constructor(self):
        BufferedRandomTest.test_constructor(self)
        # The allocation can succeed on 32-bit builds, e.g. with more
        # than 2 GiB RAM and a 64-bit kernel.
        if sys.maxsize <= 0x7FFFFFFF:
            return
        raw_stream = self.MockRawIO()
        buffered = self.tp(raw_stream)
        # Re-initialising with an absurd buffer size must fail cleanly.
        with self.assertRaises((OverflowError, MemoryError, ValueError)):
            buffered.__init__(raw_stream, sys.maxsize)

    def test_garbage_collection(self):
        # Run the reader flavour first, then the writer flavour.
        for base in (CBufferedReaderTest, CBufferedWriterTest):
            base.test_garbage_collection(self)

    def test_args_error(self):
        # Issue #17275
        too_many_args = (io.BytesIO(), 1024, 1024, 1024)
        with self.assertRaisesRegex(TypeError, "BufferedRandom"):
            self.tp(*too_many_args)
2410
2411
class PyBufferedRandomTest(BufferedRandomTest):
    # Runs the shared BufferedRandomTest cases with pyio.BufferedRandom bound
    # as the type under test (``tp``).
    tp = pyio.BufferedRandom
2414
2415
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002416# To fully exercise seek/tell, the StatefulIncrementalDecoder has these
2417# properties:
2418# - A single output character can correspond to many bytes of input.
2419# - The number of input bytes to complete the character can be
2420# undetermined until the last input byte is received.
2421# - The number of input bytes can vary depending on previous input.
2422# - A single input byte can correspond to many characters of output.
2423# - The number of output characters can be undetermined until the
2424# last input byte is received.
2425# - The number of output characters can vary depending on previous input.
2426
class StatefulIncrementalDecoder(codecs.IncrementalDecoder):
    """
    For testing seek/tell behavior with a stateful, buffering decoder.

    Input is a sequence of words.  Words may be fixed-length (length set
    by input) or variable-length (period-terminated).  In variable-length
    mode, extra periods are ignored.  Possible words are:
      - 'i' followed by a number sets the input length, I (maximum 99).
        When I is set to 0, words are period-terminated.
      - 'o' followed by a number sets the output length, O (maximum 99).
      - Any other word is converted into a word followed by a period on
        the output.  The output word consists of the input word truncated
        or padded out with hyphens to make its length equal to O.  If O
        is 0, the word is output verbatim without truncating or padding.
    I and O are initially set to 1.  When I changes, any buffered input is
    re-scanned according to the new I.  EOF also terminates the last word.
    """

    def __init__(self, errors='strict'):
        codecs.IncrementalDecoder.__init__(self, errors)
        self.reset()

    def __repr__(self):
        return '<SID %x>' % id(self)

    def reset(self):
        """Return to the initial state: I = O = 1 and nothing buffered."""
        self.i = 1
        self.o = 1
        self.buffer = bytearray()

    def getstate(self):
        """Return (buffered input bytes, integer flags encoding I and O)."""
        i, o = self.i ^ 1, self.o ^ 1  # so that flags = 0 after reset()
        return bytes(self.buffer), i*100 + o

    def setstate(self, state):
        """Restore a state previously produced by getstate()."""
        # The second item is named ``flags`` rather than ``io`` so that it
        # does not shadow the ``io`` module name.
        buffer, flags = state
        self.buffer = bytearray(buffer)
        i, o = divmod(flags, 100)
        self.i, self.o = i ^ 1, o ^ 1

    def decode(self, input, final=False):
        """Consume the bytes of *input* and return the decoded text.

        Bytes of an unfinished word stay buffered until a later call
        completes the word, or until a call with a true *final* flushes
        them as the last word.
        """
        period = ord('.')
        pieces = []  # completed words, joined once at the end
        for b in input:
            if self.i == 0:  # variable-length, terminated with period
                if b == period:
                    if self.buffer:
                        pieces.append(self.process_word())
                else:
                    self.buffer.append(b)
            else:  # fixed-length, terminate after self.i bytes
                self.buffer.append(b)
                if len(self.buffer) == self.i:
                    pieces.append(self.process_word())
        if final and self.buffer:  # EOF terminates the last word
            pieces.append(self.process_word())
        return ''.join(pieces)

    def process_word(self):
        """Interpret the buffered word, clear the buffer, return its output.

        Control words ('i...' and 'o...') update I or O and produce no
        output; any other word is emitted followed by a period.
        """
        output = ''
        if self.buffer[0] == ord('i'):
            self.i = min(99, int(self.buffer[1:] or 0))  # set input length
        elif self.buffer[0] == ord('o'):
            self.o = min(99, int(self.buffer[1:] or 0))  # set output length
        else:
            output = self.buffer.decode('ascii')
            if self.o:
                # Pad with hyphens or truncate so the word is exactly
                # self.o characters long; O == 0 leaves it untouched.
                output = output.ljust(self.o, '-')[:self.o]
            output += '.'
        self.buffer = bytearray()
        return output

    # Switch consulted by lookupTestDecoder(); off unless a test enables it.
    codecEnabled = False

    @classmethod
    def lookupTestDecoder(cls, name):
        """Codec search function for the 'test_decoder' codec.

        Returns a CodecInfo that uses this class as incremental decoder
        (encoding is borrowed from latin-1) when the codec is enabled and
        *name* is 'test_decoder'; otherwise returns None.
        """
        if cls.codecEnabled and name == 'test_decoder':
            latin1 = codecs.lookup('latin-1')
            return codecs.CodecInfo(
                name='test_decoder', encode=latin1.encode, decode=None,
                incrementalencoder=None,
                streamreader=None, streamwriter=None,
                incrementaldecoder=cls)
        return None
2511
# Register the previous decoder for testing.
# Disabled by default, tests will enable it.
# The search function only answers for the name 'test_decoder', and only
# while StatefulIncrementalDecoder.codecEnabled is true.
codecs.register(StatefulIncrementalDecoder.lookupTestDecoder)
2515
2516
class StatefulIncrementalDecoderTest(unittest.TestCase):
    """
    Make sure the StatefulIncrementalDecoder actually works.
    """

    # Each entry is (input bytes, final flag, expected decoded text).
    test_cases = [
        # I=1, O=1 (fixed-length input == fixed-length output)
        (b'abcd', False, 'a.b.c.d.'),
        # I=0, O=0 (variable-length input, variable-length output)
        (b'oiabcd', True, 'abcd.'),
        # I=0, O=0 (should ignore extra periods)
        (b'oi...abcd...', True, 'abcd.'),
        # I=0, O=6 (variable-length input, fixed-length output)
        (b'i.o6.x.xyz.toolongtofit.', False, 'x-----.xyz---.toolon.'),
        # I=2, O=6 (fixed-length input < fixed-length output)
        (b'i.i2.o6xyz', True, 'xy----.z-----.'),
        # I=6, O=3 (fixed-length input > fixed-length output)
        (b'i.o3.i6.abcdefghijklmnop', True, 'abc.ghi.mno.'),
        # I=0, then 3; O=29, then 15 (with longer output)
        (b'i.o29.a.b.cde.o15.abcdefghijabcdefghij.i3.a.b.c.d.ei00k.l.m', True,
         ('a----------------------------.'
          'b----------------------------.'
          'cde--------------------------.'
          'abcdefghijabcde.'
          'a.b------------.'
          '.c.------------.'
          'd.e------------.'
          'k--------------.'
          'l--------------.'
          'm--------------.'))
    ]

    def test_decoder(self):
        # Try a few one-shot test cases.
        for raw_bytes, at_eof, expected_text in self.test_cases:
            decoder = StatefulIncrementalDecoder()
            decoded = decoder.decode(raw_bytes, at_eof)
            self.assertEqual(decoded, expected_text)

        # Also test an unfinished decode, followed by forcing EOF.
        decoder = StatefulIncrementalDecoder()
        self.assertEqual(decoder.decode(b'oiabcd'), '')
        self.assertEqual(decoder.decode(b'', 1), 'abcd.')
Guido van Rossum78892e42007-04-06 17:31:18 +00002559
2560class TextIOWrapperTest(unittest.TestCase):
Guido van Rossum9b76da62007-04-11 01:09:03 +00002561
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002562 def setUp(self):
2563 self.testdata = b"AAA\r\nBBB\rCCC\r\nDDD\nEEE\r\n"
2564 self.normalized = b"AAA\nBBB\nCCC\nDDD\nEEE\n".decode("ascii")
Benjamin Petersonee8712c2008-05-20 21:35:26 +00002565 support.unlink(support.TESTFN)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002566
    def tearDown(self):
        # Remove the scratch file a test may have created.
        support.unlink(support.TESTFN)
Guido van Rossum9b76da62007-04-11 01:09:03 +00002569
    def test_constructor(self):
        # Calling __init__ again on an existing wrapper must apply the new
        # encoding and line_buffering settings; invalid ``newline`` values
        # must be rejected.
        r = self.BytesIO(b"\xc3\xa9\n\n")
        b = self.BufferedReader(r, 1000)
        t = self.TextIOWrapper(b)
        t.__init__(b, encoding="latin-1", newline="\r\n")
        self.assertEqual(t.encoding, "latin-1")
        self.assertEqual(t.line_buffering, False)
        t.__init__(b, encoding="utf-8", line_buffering=True)
        self.assertEqual(t.encoding, "utf-8")
        self.assertEqual(t.line_buffering, True)
        # b"\xc3\xa9" is the UTF-8 encoding of "\xe9".
        self.assertEqual("\xe9\n", t.readline())
        # Wrong type for newline, then an unsupported newline string.
        self.assertRaises(TypeError, t.__init__, b, newline=42)
        self.assertRaises(ValueError, t.__init__, b, newline='xyzzy')
2583
    def test_uninitialized(self):
        # An object created with __new__ but never initialised must be
        # safe to destroy and must fail cleanly when used.
        t = self.TextIOWrapper.__new__(self.TextIOWrapper)
        del t
        t = self.TextIOWrapper.__new__(self.TextIOWrapper)
        self.assertRaises(Exception, repr, t)
        # Two exception types / messages are accepted here.
        # NOTE(review): presumably one per implementation under test --
        # confirm which raises which.
        self.assertRaisesRegex((ValueError, AttributeError),
                               'uninitialized|has no attribute',
                               t.read, 0)
        # After a proper __init__ the same object becomes usable.
        t.__init__(self.MockRawIO())
        self.assertEqual(t.read(0), '')
2594
Nick Coghlana9b15242014-02-04 22:11:18 +10002595 def test_non_text_encoding_codecs_are_rejected(self):
2596 # Ensure the constructor complains if passed a codec that isn't
2597 # marked as a text encoding
2598 # http://bugs.python.org/issue20404
2599 r = self.BytesIO()
2600 b = self.BufferedWriter(r)
2601 with self.assertRaisesRegex(LookupError, "is not a text encoding"):
2602 self.TextIOWrapper(b, encoding="hex")
2603
Benjamin Petersond2e0c792009-05-01 20:40:59 +00002604 def test_detach(self):
2605 r = self.BytesIO()
2606 b = self.BufferedWriter(r)
2607 t = self.TextIOWrapper(b)
2608 self.assertIs(t.detach(), b)
2609
2610 t = self.TextIOWrapper(b, encoding="ascii")
2611 t.write("howdy")
2612 self.assertFalse(r.getvalue())
2613 t.detach()
2614 self.assertEqual(r.getvalue(), b"howdy")
2615 self.assertRaises(ValueError, t.detach)
2616
Benjamin Peterson10e76b62014-12-21 20:51:50 -06002617 # Operations independent of the detached stream should still work
2618 repr(t)
2619 self.assertEqual(t.encoding, "ascii")
2620 self.assertEqual(t.errors, "strict")
2621 self.assertFalse(t.line_buffering)
Antoine Pitrou3c2817b2017-06-03 12:32:28 +02002622 self.assertFalse(t.write_through)
Benjamin Peterson10e76b62014-12-21 20:51:50 -06002623
Benjamin Petersonc4c0eae2009-03-09 00:07:03 +00002624 def test_repr(self):
2625 raw = self.BytesIO("hello".encode("utf-8"))
2626 b = self.BufferedReader(raw)
2627 t = self.TextIOWrapper(b, encoding="utf-8")
Antoine Pitrou716c4442009-05-23 19:04:03 +00002628 modname = self.TextIOWrapper.__module__
Max Bernsteinccb7ca72019-05-21 10:09:21 -07002629 self.assertRegex(repr(t),
2630 r"<(%s\.)?TextIOWrapper encoding='utf-8'>" % modname)
Antoine Pitrou716c4442009-05-23 19:04:03 +00002631 raw.name = "dummy"
Max Bernsteinccb7ca72019-05-21 10:09:21 -07002632 self.assertRegex(repr(t),
2633 r"<(%s\.)?TextIOWrapper name='dummy' encoding='utf-8'>" % modname)
Antoine Pitroua4815ca2011-01-09 20:38:15 +00002634 t.mode = "r"
Max Bernsteinccb7ca72019-05-21 10:09:21 -07002635 self.assertRegex(repr(t),
2636 r"<(%s\.)?TextIOWrapper name='dummy' mode='r' encoding='utf-8'>" % modname)
Antoine Pitrou716c4442009-05-23 19:04:03 +00002637 raw.name = b"dummy"
Max Bernsteinccb7ca72019-05-21 10:09:21 -07002638 self.assertRegex(repr(t),
2639 r"<(%s\.)?TextIOWrapper name=b'dummy' mode='r' encoding='utf-8'>" % modname)
Benjamin Petersonc4c0eae2009-03-09 00:07:03 +00002640
Benjamin Peterson10e76b62014-12-21 20:51:50 -06002641 t.buffer.detach()
2642 repr(t) # Should not raise an exception
2643
Serhiy Storchakaa5af6e12017-03-19 19:25:29 +02002644 def test_recursive_repr(self):
2645 # Issue #25455
2646 raw = self.BytesIO()
2647 t = self.TextIOWrapper(raw)
2648 with support.swap_attr(raw, 'name', t):
2649 try:
2650 repr(t) # Should not crash
2651 except RuntimeError:
2652 pass
2653
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002654 def test_line_buffering(self):
2655 r = self.BytesIO()
2656 b = self.BufferedWriter(r, 1000)
2657 t = self.TextIOWrapper(b, newline="\n", line_buffering=True)
Guido van Rossumf64db9f2007-12-06 01:04:26 +00002658 t.write("X")
Ezio Melottib3aedd42010-11-20 19:04:17 +00002659 self.assertEqual(r.getvalue(), b"") # No flush happened
Guido van Rossumf64db9f2007-12-06 01:04:26 +00002660 t.write("Y\nZ")
Ezio Melottib3aedd42010-11-20 19:04:17 +00002661 self.assertEqual(r.getvalue(), b"XY\nZ") # All got flushed
Guido van Rossumf64db9f2007-12-06 01:04:26 +00002662 t.write("A\rB")
Ezio Melottib3aedd42010-11-20 19:04:17 +00002663 self.assertEqual(r.getvalue(), b"XY\nZA\rB")
Guido van Rossumf64db9f2007-12-06 01:04:26 +00002664
Antoine Pitrou3c2817b2017-06-03 12:32:28 +02002665 def test_reconfigure_line_buffering(self):
2666 r = self.BytesIO()
2667 b = self.BufferedWriter(r, 1000)
2668 t = self.TextIOWrapper(b, newline="\n", line_buffering=False)
2669 t.write("AB\nC")
2670 self.assertEqual(r.getvalue(), b"")
2671
2672 t.reconfigure(line_buffering=True) # implicit flush
2673 self.assertEqual(r.getvalue(), b"AB\nC")
2674 t.write("DEF\nG")
2675 self.assertEqual(r.getvalue(), b"AB\nCDEF\nG")
2676 t.write("H")
2677 self.assertEqual(r.getvalue(), b"AB\nCDEF\nG")
2678 t.reconfigure(line_buffering=False) # implicit flush
2679 self.assertEqual(r.getvalue(), b"AB\nCDEF\nGH")
2680 t.write("IJ")
2681 self.assertEqual(r.getvalue(), b"AB\nCDEF\nGH")
2682
2683 # Keeping default value
2684 t.reconfigure()
2685 t.reconfigure(line_buffering=None)
2686 self.assertEqual(t.line_buffering, False)
2687 t.reconfigure(line_buffering=True)
2688 t.reconfigure()
2689 t.reconfigure(line_buffering=None)
2690 self.assertEqual(t.line_buffering, True)
2691
Victor Stinner91106cd2017-12-13 12:29:09 +01002692 @unittest.skipIf(sys.flags.utf8_mode, "utf-8 mode is enabled")
Victor Stinnerf86a5e82012-06-05 13:43:22 +02002693 def test_default_encoding(self):
2694 old_environ = dict(os.environ)
2695 try:
2696 # try to get a user preferred encoding different than the current
2697 # locale encoding to check that TextIOWrapper() uses the current
2698 # locale encoding and not the user preferred encoding
2699 for key in ('LC_ALL', 'LANG', 'LC_CTYPE'):
2700 if key in os.environ:
2701 del os.environ[key]
2702
2703 current_locale_encoding = locale.getpreferredencoding(False)
2704 b = self.BytesIO()
2705 t = self.TextIOWrapper(b)
2706 self.assertEqual(t.encoding, current_locale_encoding)
2707 finally:
2708 os.environ.clear()
2709 os.environ.update(old_environ)
2710
Serhiy Storchaka5cfc79d2014-02-07 10:06:39 +02002711 @support.cpython_only
Victor Stinner91106cd2017-12-13 12:29:09 +01002712 @unittest.skipIf(sys.flags.utf8_mode, "utf-8 mode is enabled")
Serhiy Storchaka78980432013-01-15 01:12:17 +02002713 def test_device_encoding(self):
Serhiy Storchaka5cfc79d2014-02-07 10:06:39 +02002714 # Issue 15989
2715 import _testcapi
Serhiy Storchaka78980432013-01-15 01:12:17 +02002716 b = self.BytesIO()
2717 b.fileno = lambda: _testcapi.INT_MAX + 1
2718 self.assertRaises(OverflowError, self.TextIOWrapper, b)
2719 b.fileno = lambda: _testcapi.UINT_MAX + 1
2720 self.assertRaises(OverflowError, self.TextIOWrapper, b)
2721
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002722 def test_encoding(self):
2723 # Check the encoding attribute is always set, and valid
2724 b = self.BytesIO()
Marc-André Lemburg8f36af72011-02-25 15:42:01 +00002725 t = self.TextIOWrapper(b, encoding="utf-8")
2726 self.assertEqual(t.encoding, "utf-8")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002727 t = self.TextIOWrapper(b)
Serhiy Storchaka0dcd80a2015-08-02 15:17:49 +03002728 self.assertIsNotNone(t.encoding)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002729 codecs.lookup(t.encoding)
2730
2731 def test_encoding_errors_reading(self):
Guido van Rossume7fc50f2007-12-03 22:54:21 +00002732 # (1) default
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002733 b = self.BytesIO(b"abc\n\xff\n")
2734 t = self.TextIOWrapper(b, encoding="ascii")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00002735 self.assertRaises(UnicodeError, t.read)
2736 # (2) explicit strict
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002737 b = self.BytesIO(b"abc\n\xff\n")
2738 t = self.TextIOWrapper(b, encoding="ascii", errors="strict")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00002739 self.assertRaises(UnicodeError, t.read)
2740 # (3) ignore
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002741 b = self.BytesIO(b"abc\n\xff\n")
2742 t = self.TextIOWrapper(b, encoding="ascii", errors="ignore")
Ezio Melottib3aedd42010-11-20 19:04:17 +00002743 self.assertEqual(t.read(), "abc\n\n")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00002744 # (4) replace
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002745 b = self.BytesIO(b"abc\n\xff\n")
2746 t = self.TextIOWrapper(b, encoding="ascii", errors="replace")
Ezio Melottib3aedd42010-11-20 19:04:17 +00002747 self.assertEqual(t.read(), "abc\n\ufffd\n")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00002748
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002749 def test_encoding_errors_writing(self):
Guido van Rossume7fc50f2007-12-03 22:54:21 +00002750 # (1) default
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002751 b = self.BytesIO()
2752 t = self.TextIOWrapper(b, encoding="ascii")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00002753 self.assertRaises(UnicodeError, t.write, "\xff")
2754 # (2) explicit strict
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002755 b = self.BytesIO()
2756 t = self.TextIOWrapper(b, encoding="ascii", errors="strict")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00002757 self.assertRaises(UnicodeError, t.write, "\xff")
2758 # (3) ignore
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002759 b = self.BytesIO()
2760 t = self.TextIOWrapper(b, encoding="ascii", errors="ignore",
Guido van Rossumf64db9f2007-12-06 01:04:26 +00002761 newline="\n")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00002762 t.write("abc\xffdef\n")
2763 t.flush()
Ezio Melottib3aedd42010-11-20 19:04:17 +00002764 self.assertEqual(b.getvalue(), b"abcdef\n")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00002765 # (4) replace
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002766 b = self.BytesIO()
2767 t = self.TextIOWrapper(b, encoding="ascii", errors="replace",
Guido van Rossumf64db9f2007-12-06 01:04:26 +00002768 newline="\n")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00002769 t.write("abc\xffdef\n")
2770 t.flush()
Ezio Melottib3aedd42010-11-20 19:04:17 +00002771 self.assertEqual(b.getvalue(), b"abc?def\n")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00002772
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002773 def test_newlines(self):
Guido van Rossum78892e42007-04-06 17:31:18 +00002774 input_lines = [ "unix\n", "windows\r\n", "os9\r", "last\n", "nonl" ]
2775
2776 tests = [
2777 [ None, [ 'unix\n', 'windows\n', 'os9\n', 'last\n', 'nonl' ] ],
Guido van Rossum8358db22007-08-18 21:39:55 +00002778 [ '', input_lines ],
2779 [ '\n', [ "unix\n", "windows\r\n", "os9\rlast\n", "nonl" ] ],
2780 [ '\r\n', [ "unix\nwindows\r\n", "os9\rlast\nnonl" ] ],
2781 [ '\r', [ "unix\nwindows\r", "\nos9\r", "last\nnonl" ] ],
Guido van Rossum78892e42007-04-06 17:31:18 +00002782 ]
Antoine Pitrou180a3362008-12-14 16:36:46 +00002783 encodings = (
2784 'utf-8', 'latin-1',
2785 'utf-16', 'utf-16-le', 'utf-16-be',
2786 'utf-32', 'utf-32-le', 'utf-32-be',
2787 )
Guido van Rossum78892e42007-04-06 17:31:18 +00002788
Guido van Rossum8358db22007-08-18 21:39:55 +00002789 # Try a range of buffer sizes to test the case where \r is the last
Guido van Rossum78892e42007-04-06 17:31:18 +00002790 # character in TextIOWrapper._pending_line.
2791 for encoding in encodings:
Guido van Rossum8358db22007-08-18 21:39:55 +00002792 # XXX: str.encode() should return bytes
2793 data = bytes(''.join(input_lines).encode(encoding))
Guido van Rossum78892e42007-04-06 17:31:18 +00002794 for do_reads in (False, True):
Guido van Rossum8358db22007-08-18 21:39:55 +00002795 for bufsize in range(1, 10):
2796 for newline, exp_lines in tests:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002797 bufio = self.BufferedReader(self.BytesIO(data), bufsize)
2798 textio = self.TextIOWrapper(bufio, newline=newline,
Guido van Rossum78892e42007-04-06 17:31:18 +00002799 encoding=encoding)
2800 if do_reads:
2801 got_lines = []
2802 while True:
Guido van Rossum76c5d4d2007-04-06 19:10:29 +00002803 c2 = textio.read(2)
Guido van Rossum78892e42007-04-06 17:31:18 +00002804 if c2 == '':
2805 break
Ezio Melottib3aedd42010-11-20 19:04:17 +00002806 self.assertEqual(len(c2), 2)
Guido van Rossum76c5d4d2007-04-06 19:10:29 +00002807 got_lines.append(c2 + textio.readline())
Guido van Rossum78892e42007-04-06 17:31:18 +00002808 else:
Guido van Rossum76c5d4d2007-04-06 19:10:29 +00002809 got_lines = list(textio)
Guido van Rossum78892e42007-04-06 17:31:18 +00002810
2811 for got_line, exp_line in zip(got_lines, exp_lines):
Ezio Melottib3aedd42010-11-20 19:04:17 +00002812 self.assertEqual(got_line, exp_line)
2813 self.assertEqual(len(got_lines), len(exp_lines))
Guido van Rossum78892e42007-04-06 17:31:18 +00002814
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002815 def test_newlines_input(self):
2816 testdata = b"AAA\nBB\x00B\nCCC\rDDD\rEEE\r\nFFF\r\nGGG"
Guido van Rossum8358db22007-08-18 21:39:55 +00002817 normalized = testdata.replace(b"\r\n", b"\n").replace(b"\r", b"\n")
2818 for newline, expected in [
Ezio Melottid8b509b2011-09-28 17:37:55 +03002819 (None, normalized.decode("ascii").splitlines(keepends=True)),
2820 ("", testdata.decode("ascii").splitlines(keepends=True)),
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002821 ("\n", ["AAA\n", "BB\x00B\n", "CCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]),
2822 ("\r\n", ["AAA\nBB\x00B\nCCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]),
2823 ("\r", ["AAA\nBB\x00B\nCCC\r", "DDD\r", "EEE\r", "\nFFF\r", "\nGGG"]),
Guido van Rossum8358db22007-08-18 21:39:55 +00002824 ]:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002825 buf = self.BytesIO(testdata)
2826 txt = self.TextIOWrapper(buf, encoding="ascii", newline=newline)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002827 self.assertEqual(txt.readlines(), expected)
Guido van Rossum8358db22007-08-18 21:39:55 +00002828 txt.seek(0)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002829 self.assertEqual(txt.read(), "".join(expected))
Guido van Rossum8358db22007-08-18 21:39:55 +00002830
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002831 def test_newlines_output(self):
2832 testdict = {
2833 "": b"AAA\nBBB\nCCC\nX\rY\r\nZ",
2834 "\n": b"AAA\nBBB\nCCC\nX\rY\r\nZ",
2835 "\r": b"AAA\rBBB\rCCC\rX\rY\r\rZ",
2836 "\r\n": b"AAA\r\nBBB\r\nCCC\r\nX\rY\r\r\nZ",
2837 }
2838 tests = [(None, testdict[os.linesep])] + sorted(testdict.items())
2839 for newline, expected in tests:
2840 buf = self.BytesIO()
2841 txt = self.TextIOWrapper(buf, encoding="ascii", newline=newline)
2842 txt.write("AAA\nB")
2843 txt.write("BB\nCCC\n")
2844 txt.write("X\rY\r\nZ")
2845 txt.flush()
Ezio Melottib3aedd42010-11-20 19:04:17 +00002846 self.assertEqual(buf.closed, False)
2847 self.assertEqual(buf.getvalue(), expected)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002848
def test_destructor(self):
    # Dropping the last reference to the wrapper must close the underlying
    # buffer, and the written text must already be in it at that point.
    seen_at_close = []
    parent = self.BytesIO

    class RecordingBytesIO(parent):
        def close(self):
            # Capture the buffer content before it is really closed.
            seen_at_close.append(self.getvalue())
            parent.close(self)

    raw = RecordingBytesIO()
    wrapper = self.TextIOWrapper(raw, encoding="ascii")
    wrapper.write("abc")
    del wrapper
    support.gc_collect()
    self.assertEqual([b"abc"], seen_at_close)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002862
def test_override_destructor(self):
    # A subclass overriding __del__, close() and flush() must see them
    # invoked in that order (1, 2, 3) when the object is collected.
    calls = []

    class MyTextIO(self.TextIOWrapper):
        def __del__(self):
            calls.append(1)
            # The base class may or may not define __del__.
            parent_del = getattr(super(), "__del__", None)
            if parent_del is not None:
                parent_del()

        def close(self):
            calls.append(2)
            super().close()

        def flush(self):
            calls.append(3)
            super().flush()

    raw = self.BytesIO()
    wrapper = MyTextIO(raw, encoding="ascii")
    del wrapper
    support.gc_collect()
    self.assertEqual(calls, [1, 2, 3])
2885
def test_error_through_destructor(self):
    # Test that the exception state is not modified by a destructor,
    # even if close() fails.
    failing_raw = self.CloseFailureIO()
    with support.catch_unraisable_exception() as catcher:
        with self.assertRaises(AttributeError):
            # The wrapper is deliberately left as a temporary object.
            self.TextIOWrapper(failing_raw).xyzzy

        if IOBASE_EMITS_UNRAISABLE:
            if catcher.unraisable is not None:
                self.assertEqual(catcher.unraisable.exc_type, OSError)
        else:
            self.assertIsNone(catcher.unraisable)
Guido van Rossum8358db22007-08-18 21:39:55 +00002898
Guido van Rossum9b76da62007-04-11 01:09:03 +00002899 # Systematic tests of the text I/O API
2900
def test_basic_io(self):
    # Round-trip a short string through a text file for a range of
    # _CHUNK_SIZE values and encodings, checking write/read/seek/tell.
    # The files are opened with "with" so that a failing assertion does
    # not leave the handle open for the following tests.
    for chunksize in (1, 2, 3, 4, 5, 15, 16, 17, 31, 32, 33, 63, 64, 65):
        # "utf-16-be" and "utf-16-le" are currently not exercised here.
        for enc in ("ascii", "latin-1", "utf-8"):
            with self.open(support.TESTFN, "w+", encoding=enc) as f:
                f._CHUNK_SIZE = chunksize
                self.assertEqual(f.write("abc"), 3)
            with self.open(support.TESTFN, "r+", encoding=enc) as f:
                f._CHUNK_SIZE = chunksize
                self.assertEqual(f.tell(), 0)
                self.assertEqual(f.read(), "abc")
                # Position at end of file, reused for the checks below.
                cookie = f.tell()
                self.assertEqual(f.seek(0), 0)
                self.assertEqual(f.read(None), "abc")
                f.seek(0)
                self.assertEqual(f.read(2), "ab")
                self.assertEqual(f.read(1), "c")
                self.assertEqual(f.read(1), "")
                self.assertEqual(f.read(), "")
                self.assertEqual(f.tell(), cookie)
                self.assertEqual(f.seek(0), 0)
                self.assertEqual(f.seek(0, 2), cookie)
                self.assertEqual(f.write("def"), 3)
                self.assertEqual(f.seek(cookie), cookie)
                self.assertEqual(f.read(), "def")
                if enc.startswith("utf"):
                    self.multi_line_test(f, enc)
2929
def multi_line_test(self, f, enc):
    # Helper: empty the file, write lines of assorted lengths while
    # recording tell() before each one, then read them back and check that
    # both the positions and the lines match.  `enc` is not used here.
    f.seek(0)
    f.truncate()
    alphabet = "s\xff\u0fff\uffff"
    written = []
    for length in (0, 1, 2, 3, 4, 5, 30, 31, 32, 33, 62, 63, 64, 65, 1000):
        body = "".join(alphabet[k % len(alphabet)] for k in range(length))
        text = body + "\n"
        written.append((f.tell(), text))
        f.write(text)
    f.seek(0)
    read_back = []
    where = f.tell()
    text = f.readline()
    while text:
        read_back.append((where, text))
        where = f.tell()
        text = f.readline()
    self.assertEqual(read_back, written)
Guido van Rossum9b76da62007-04-11 01:09:03 +00002951
def test_telling(self):
    # tell() must report the same positions while reading back as it did
    # while writing, and must raise OSError while the file is iterated.
    # The file is opened with "with" so a failing assertion cannot leak
    # the handle.
    with self.open(support.TESTFN, "w+", encoding="utf-8") as f:
        p0 = f.tell()
        f.write("\xff\n")
        p1 = f.tell()
        f.write("\xff\n")
        p2 = f.tell()
        f.seek(0)
        self.assertEqual(f.tell(), p0)
        self.assertEqual(f.readline(), "\xff\n")
        self.assertEqual(f.tell(), p1)
        self.assertEqual(f.readline(), "\xff\n")
        self.assertEqual(f.tell(), p2)
        f.seek(0)
        for line in f:
            self.assertEqual(line, "\xff\n")
            self.assertRaises(OSError, f.tell)
        # Once iteration is exhausted tell() works again.
        self.assertEqual(f.tell(), p2)
2971
def test_seeking(self):
    # Read a prefix two characters shorter than the default chunk size,
    # then check tell() and that the multi-byte suffix is read intact.
    head_len = _default_chunk_size() - 2
    head_text = "a" * head_len
    head_bytes = head_text.encode("utf-8")
    self.assertEqual(len(head_text), len(head_bytes))
    tail_text = "\u8888\n"
    record = head_bytes + tail_text.encode("utf-8")
    with self.open(support.TESTFN, "wb") as out:
        out.write(record * 2)
    with self.open(support.TESTFN, "r", encoding="utf-8") as src:
        self.assertEqual(src.read(head_len), str(head_bytes, "ascii"))
        self.assertEqual(src.tell(), head_len)
        self.assertEqual(src.readline(), tail_text)
Guido van Rossumd76e7792007-04-17 02:38:04 +00002988
def test_seeking_too(self):
    # Regression test for a specific bug: readline() followed by tell()
    # on a three-byte UTF-8 character with a chunk size of 2.
    payload = b'\xe0\xbf\xbf\n'
    with self.open(support.TESTFN, "wb") as out:
        out.write(payload)
    with self.open(support.TESTFN, "r", encoding="utf-8") as src:
        src._CHUNK_SIZE  # Just test that it exists
        src._CHUNK_SIZE = 2
        src.readline()
        src.tell()
Guido van Rossumd76e7792007-04-17 02:38:04 +00002999
def test_seek_and_tell(self):
    # Test seek/tell using the StatefulIncrementalDecoder.
    # Every file is opened with "with" so a failing assertion cannot leak
    # a handle, and the loop variable no longer shadows the input() builtin.

    # Make test faster by doing smaller seeks
    CHUNK_SIZE = 128

    def test_seek_and_tell_with_data(data, min_pos=0):
        """Tell/seek to various points within a data stream and ensure
        that the decoded data returned by read() is consistent."""
        with self.open(support.TESTFN, 'wb') as f:
            f.write(data)
        with self.open(support.TESTFN, encoding='test_decoder') as f:
            f._CHUNK_SIZE = CHUNK_SIZE
            decoded = f.read()

        for i in range(min_pos, len(decoded) + 1):  # seek positions
            for j in [1, 5, len(decoded) - i]:  # read lengths
                with self.open(support.TESTFN, encoding='test_decoder') as f:
                    self.assertEqual(f.read(i), decoded[:i])
                    cookie = f.tell()
                    self.assertEqual(f.read(j), decoded[i:i + j])
                    f.seek(cookie)
                    self.assertEqual(f.read(), decoded[i:])

    # Enable the test decoder.
    StatefulIncrementalDecoder.codecEnabled = 1

    # Run the tests.
    try:
        # Try each test case.
        for case_input, _, _ in StatefulIncrementalDecoderTest.test_cases:
            test_seek_and_tell_with_data(case_input)

        # Position each test case so that it crosses a chunk boundary.
        for case_input, _, _ in StatefulIncrementalDecoderTest.test_cases:
            offset = CHUNK_SIZE - len(case_input)//2
            prefix = b'.'*offset
            # Don't bother seeking into the prefix (takes too long).
            min_pos = offset*2
            test_seek_and_tell_with_data(prefix + case_input, min_pos)

    # Ensure our test decoder won't interfere with subsequent tests.
    finally:
        StatefulIncrementalDecoder.codecEnabled = 0
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00003046
def test_multibyte_seek_and_tell(self):
    # A position obtained from tell() between two lines of euc_jp text
    # must bring readline() back to the same line after seek().
    # "with" is used so a failing assertion cannot leak the file handle.
    with self.open(support.TESTFN, "w", encoding="euc_jp") as f:
        f.write("AB\n\u3046\u3048\n")

    with self.open(support.TESTFN, "r", encoding="euc_jp") as f:
        self.assertEqual(f.readline(), "AB\n")
        p0 = f.tell()
        self.assertEqual(f.readline(), "\u3046\u3048\n")
        p1 = f.tell()
        f.seek(p0)
        self.assertEqual(f.readline(), "\u3046\u3048\n")
        self.assertEqual(f.tell(), p1)
3061
def test_seek_with_encoder_state(self):
    # Write, remember the position, write more, seek back to the
    # remembered position and overwrite; the file must then read back as
    # the first write followed by the last one.
    # "with" is used so a failing assertion cannot leak the file handle.
    with self.open(support.TESTFN, "w", encoding="euc_jis_2004") as f:
        f.write("\u00e6\u0300")
        p0 = f.tell()
        f.write("\u00e6")
        f.seek(p0)
        f.write("\u0300")

    with self.open(support.TESTFN, "r", encoding="euc_jis_2004") as f:
        self.assertEqual(f.readline(), "\u00e6\u0300\u0300")
3074
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003075 def test_encoded_writes(self):
Alexandre Vassalottia38f73b2008-01-07 18:30:48 +00003076 data = "1234567890"
3077 tests = ("utf-16",
3078 "utf-16-le",
3079 "utf-16-be",
3080 "utf-32",
3081 "utf-32-le",
3082 "utf-32-be")
3083 for encoding in tests:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003084 buf = self.BytesIO()
3085 f = self.TextIOWrapper(buf, encoding=encoding)
Alexandre Vassalottia38f73b2008-01-07 18:30:48 +00003086 # Check if the BOM is written only once (see issue1753).
3087 f.write(data)
3088 f.write(data)
3089 f.seek(0)
Ezio Melottib3aedd42010-11-20 19:04:17 +00003090 self.assertEqual(f.read(), data * 2)
Benjamin Peterson9363a652009-03-05 00:42:09 +00003091 f.seek(0)
Ezio Melottib3aedd42010-11-20 19:04:17 +00003092 self.assertEqual(f.read(), data * 2)
3093 self.assertEqual(buf.getvalue(), (data * 2).encode(encoding))
Alexandre Vassalottia38f73b2008-01-07 18:30:48 +00003094
Benjamin Petersona1b49012009-03-31 23:11:32 +00003095 def test_unreadable(self):
3096 class UnReadable(self.BytesIO):
3097 def readable(self):
3098 return False
3099 txt = self.TextIOWrapper(UnReadable())
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02003100 self.assertRaises(OSError, txt.read)
Benjamin Petersona1b49012009-03-31 23:11:32 +00003101
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003102 def test_read_one_by_one(self):
3103 txt = self.TextIOWrapper(self.BytesIO(b"AA\r\nBB"))
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00003104 reads = ""
3105 while True:
3106 c = txt.read(1)
3107 if not c:
3108 break
3109 reads += c
Ezio Melottib3aedd42010-11-20 19:04:17 +00003110 self.assertEqual(reads, "AA\nBB")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00003111
Benjamin Petersonbf5ff762009-12-13 19:25:34 +00003112 def test_readlines(self):
3113 txt = self.TextIOWrapper(self.BytesIO(b"AA\nBB\nCC"))
3114 self.assertEqual(txt.readlines(), ["AA\n", "BB\n", "CC"])
3115 txt.seek(0)
3116 self.assertEqual(txt.readlines(None), ["AA\n", "BB\n", "CC"])
3117 txt.seek(0)
3118 self.assertEqual(txt.readlines(5), ["AA\n", "BB\n"])
3119
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00003120 # read in amounts equal to TextIOWrapper._CHUNK_SIZE which is 128.
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003121 def test_read_by_chunk(self):
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00003122 # make sure "\r\n" straddles 128 char boundary.
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003123 txt = self.TextIOWrapper(self.BytesIO(b"A" * 127 + b"\r\nB"))
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00003124 reads = ""
3125 while True:
3126 c = txt.read(128)
3127 if not c:
3128 break
3129 reads += c
Ezio Melottib3aedd42010-11-20 19:04:17 +00003130 self.assertEqual(reads, "A"*127+"\nB")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00003131
Antoine Pitrou3ed2cb52012-10-16 23:02:27 +02003132 def test_writelines(self):
3133 l = ['ab', 'cd', 'ef']
3134 buf = self.BytesIO()
3135 txt = self.TextIOWrapper(buf)
3136 txt.writelines(l)
3137 txt.flush()
3138 self.assertEqual(buf.getvalue(), b'abcdef')
3139
3140 def test_writelines_userlist(self):
3141 l = UserList(['ab', 'cd', 'ef'])
3142 buf = self.BytesIO()
3143 txt = self.TextIOWrapper(buf)
3144 txt.writelines(l)
3145 txt.flush()
3146 self.assertEqual(buf.getvalue(), b'abcdef')
3147
3148 def test_writelines_error(self):
3149 txt = self.TextIOWrapper(self.BytesIO())
3150 self.assertRaises(TypeError, txt.writelines, [1, 2, 3])
3151 self.assertRaises(TypeError, txt.writelines, None)
3152 self.assertRaises(TypeError, txt.writelines, b'abc')
3153
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00003154 def test_issue1395_1(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003155 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00003156
3157 # read one char at a time
3158 reads = ""
3159 while True:
3160 c = txt.read(1)
3161 if not c:
3162 break
3163 reads += c
Ezio Melottib3aedd42010-11-20 19:04:17 +00003164 self.assertEqual(reads, self.normalized)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00003165
3166 def test_issue1395_2(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003167 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00003168 txt._CHUNK_SIZE = 4
3169
3170 reads = ""
3171 while True:
3172 c = txt.read(4)
3173 if not c:
3174 break
3175 reads += c
Ezio Melottib3aedd42010-11-20 19:04:17 +00003176 self.assertEqual(reads, self.normalized)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00003177
3178 def test_issue1395_3(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003179 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00003180 txt._CHUNK_SIZE = 4
3181
3182 reads = txt.read(4)
3183 reads += txt.read(4)
3184 reads += txt.readline()
3185 reads += txt.readline()
3186 reads += txt.readline()
Ezio Melottib3aedd42010-11-20 19:04:17 +00003187 self.assertEqual(reads, self.normalized)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00003188
3189 def test_issue1395_4(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003190 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00003191 txt._CHUNK_SIZE = 4
3192
3193 reads = txt.read(4)
3194 reads += txt.read()
Ezio Melottib3aedd42010-11-20 19:04:17 +00003195 self.assertEqual(reads, self.normalized)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00003196
3197 def test_issue1395_5(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003198 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00003199 txt._CHUNK_SIZE = 4
3200
3201 reads = txt.read(4)
3202 pos = txt.tell()
3203 txt.seek(0)
3204 txt.seek(pos)
Ezio Melottib3aedd42010-11-20 19:04:17 +00003205 self.assertEqual(txt.read(4), "BBB\n")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00003206
Ka-Ping Yeeddaa7062008-03-17 20:35:15 +00003207 def test_issue2282(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003208 buffer = self.BytesIO(self.testdata)
3209 txt = self.TextIOWrapper(buffer, encoding="ascii")
Ka-Ping Yeeddaa7062008-03-17 20:35:15 +00003210
3211 self.assertEqual(buffer.seekable(), txt.seekable())
3212
def test_append_bom(self):
    # The BOM is not written again when appending to a non-empty file
    path = support.TESTFN
    for enc in ('utf-8-sig', 'utf-16', 'utf-32'):
        with self.open(path, 'w', encoding=enc) as out:
            out.write('aaa')
            # Result unused; the call is kept from the original test.
            out.tell()
        with self.open(path, 'rb') as raw:
            self.assertEqual(raw.read(), 'aaa'.encode(enc))

        with self.open(path, 'a', encoding=enc) as out:
            out.write('xxx')
        with self.open(path, 'rb') as raw:
            self.assertEqual(raw.read(), 'aaaxxx'.encode(enc))
Antoine Pitroue4501852009-05-14 18:55:55 +00003227
def test_seek_bom(self):
    # Same check as for appending, but when seeking manually: writing
    # after seek(pos) and after seek(0) must leave a single leading BOM.
    path = support.TESTFN
    for enc in ('utf-8-sig', 'utf-16', 'utf-32'):
        with self.open(path, 'w', encoding=enc) as out:
            out.write('aaa')
            end_of_text = out.tell()
        with self.open(path, 'r+', encoding=enc) as both:
            both.seek(end_of_text)
            both.write('zzz')
            both.seek(0)
            both.write('bbb')
        with self.open(path, 'rb') as raw:
            self.assertEqual(raw.read(), 'bbbzzz'.encode(enc))
Antoine Pitroue4501852009-05-14 18:55:55 +00003242
def test_seek_append_bom(self):
    # Same check as for appending, but first seek to the start and then
    # to the end before writing.
    path = support.TESTFN
    for enc in ('utf-8-sig', 'utf-16', 'utf-32'):
        with self.open(path, 'w', encoding=enc) as out:
            out.write('aaa')
        with self.open(path, 'a', encoding=enc) as out:
            out.seek(0)
            out.seek(0, self.SEEK_END)
            out.write('xxx')
        with self.open(path, 'rb') as raw:
            self.assertEqual(raw.read(), 'aaaxxx'.encode(enc))
3255
def test_errors_property(self):
    # The errors attribute is "strict" when no errors argument is given
    # and otherwise reflects the value passed to open().
    for extra_kwargs, wanted in (({}, "strict"),
                                 ({"errors": "replace"}, "replace")):
        with self.open(support.TESTFN, "w", **extra_kwargs) as stream:
            self.assertEqual(stream.errors, wanted)
3261
@support.no_tracing
def test_threads_write(self):
    # Issue6750: concurrent writes could duplicate data
    go = threading.Event()
    with self.open(support.TESTFN, "w", buffering=1) as out:
        def worker(index):
            # Build the line first, then wait for the start signal so
            # the writes happen as close together as possible.
            line = "Thread%03d\n" % index
            go.wait()
            out.write(line)

        workers = []
        for index in range(20):
            workers.append(threading.Thread(target=worker, args=(index,)))
        with support.start_threads(workers, go.set):
            time.sleep(0.02)
    with self.open(support.TESTFN) as src:
        everything = src.read()
        # Every thread's line must appear exactly once.
        for index in range(20):
            self.assertEqual(everything.count("Thread%03d\n" % index), 1)
Amaury Forgeot d'Arcccd686a2009-08-29 23:00:38 +00003279
Antoine Pitrou6be88762010-05-03 16:48:20 +00003280 def test_flush_error_on_close(self):
Serhiy Storchakaa3712a92015-02-21 00:35:09 +02003281 # Test that text file is closed despite failed flush
3282 # and that flush() is called before file closed.
Antoine Pitrou6be88762010-05-03 16:48:20 +00003283 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Serhiy Storchakaa3712a92015-02-21 00:35:09 +02003284 closed = []
Antoine Pitrou6be88762010-05-03 16:48:20 +00003285 def bad_flush():
Serhiy Storchakaa3712a92015-02-21 00:35:09 +02003286 closed[:] = [txt.closed, txt.buffer.closed]
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02003287 raise OSError()
Antoine Pitrou6be88762010-05-03 16:48:20 +00003288 txt.flush = bad_flush
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02003289 self.assertRaises(OSError, txt.close) # exception not swallowed
Benjamin Peterson68623612012-12-20 11:53:11 -06003290 self.assertTrue(txt.closed)
Serhiy Storchakaa3712a92015-02-21 00:35:09 +02003291 self.assertTrue(txt.buffer.closed)
3292 self.assertTrue(closed) # flush() called
3293 self.assertFalse(closed[0]) # flush() called before file closed
3294 self.assertFalse(closed[1])
Serhiy Storchakac26a1a42015-02-23 00:28:38 +02003295 txt.flush = lambda: None # break reference loop
Antoine Pitrou6be88762010-05-03 16:48:20 +00003296
Serhiy Storchaka8a8f7f92014-06-09 09:13:04 +03003297 def test_close_error_on_close(self):
3298 buffer = self.BytesIO(self.testdata)
3299 def bad_flush():
3300 raise OSError('flush')
3301 def bad_close():
3302 raise OSError('close')
3303 buffer.close = bad_close
3304 txt = self.TextIOWrapper(buffer, encoding="ascii")
3305 txt.flush = bad_flush
3306 with self.assertRaises(OSError) as err: # exception not swallowed
3307 txt.close()
3308 self.assertEqual(err.exception.args, ('close',))
3309 self.assertIsInstance(err.exception.__context__, OSError)
3310 self.assertEqual(err.exception.__context__.args, ('flush',))
3311 self.assertFalse(txt.closed)
3312
Victor Stinner472f7942019-04-12 21:58:24 +02003313 # Silence destructor error
3314 buffer.close = lambda: None
3315 txt.flush = lambda: None
3316
Serhiy Storchaka8a8f7f92014-06-09 09:13:04 +03003317 def test_nonnormalized_close_error_on_close(self):
3318 # Issue #21677
3319 buffer = self.BytesIO(self.testdata)
3320 def bad_flush():
3321 raise non_existing_flush
3322 def bad_close():
3323 raise non_existing_close
3324 buffer.close = bad_close
3325 txt = self.TextIOWrapper(buffer, encoding="ascii")
3326 txt.flush = bad_flush
3327 with self.assertRaises(NameError) as err: # exception not swallowed
3328 txt.close()
3329 self.assertIn('non_existing_close', str(err.exception))
3330 self.assertIsInstance(err.exception.__context__, NameError)
3331 self.assertIn('non_existing_flush', str(err.exception.__context__))
3332 self.assertFalse(txt.closed)
3333
Victor Stinner472f7942019-04-12 21:58:24 +02003334 # Silence destructor error
3335 buffer.close = lambda: None
3336 txt.flush = lambda: None
3337
Antoine Pitrou6be88762010-05-03 16:48:20 +00003338 def test_multi_close(self):
3339 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
3340 txt.close()
3341 txt.close()
3342 txt.close()
3343 self.assertRaises(ValueError, txt.flush)
3344
def test_unseekable(self):
    # tell() and seek() on a wrapper around MockUnseekableIO must raise
    # UnsupportedOperation.
    wrapper = self.TextIOWrapper(self.MockUnseekableIO(self.testdata))
    with self.assertRaises(self.UnsupportedOperation):
        wrapper.tell()
    with self.assertRaises(self.UnsupportedOperation):
        wrapper.seek(0)
3349
Antoine Pitrou7f8f4182010-12-21 21:20:59 +00003350 def test_readonly_attributes(self):
3351 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
3352 buf = self.BytesIO(self.testdata)
3353 with self.assertRaises(AttributeError):
3354 txt.buffer = buf
3355
def test_rawio(self):
    # Issue #12591: TextIOWrapper must work with raw I/O objects, so
    # that subprocess.Popen() can have the required unbuffered
    # semantics with universal_newlines=True.
    chunks = [b'abc', b'def', b'ghi\njkl\nopq\n']
    wrapper = self.TextIOWrapper(self.MockRawIO(chunks), encoding='ascii',
                                 newline='\n')
    # Reads
    self.assertEqual(wrapper.read(4), 'abcd')
    self.assertEqual(wrapper.readline(), 'efghi\n')
    remaining = list(wrapper)
    self.assertEqual(remaining, ['jkl\n', 'opq\n'])
3366
def test_rawio_write_through(self):
    # Issue #12591: with write_through=True, writes don't need a flush
    mock_raw = self.MockRawIO([b'abc', b'def', b'ghi\njkl\nopq\n'])
    wrapper = self.TextIOWrapper(mock_raw, encoding='ascii', newline='\n',
                                 write_through=True)
    for piece in ('1', '23\n4', '5'):
        wrapper.write(piece)
    # Everything must already be on the raw object's write stack.
    self.assertEqual(b''.join(mock_raw._write_stack), b'123\n45')
3376
Antoine Pitrouc644e7c2014-05-09 00:24:50 +02003377 def test_bufio_write_through(self):
3378 # Issue #21396: write_through=True doesn't force a flush()
3379 # on the underlying binary buffered object.
3380 flush_called, write_called = [], []
3381 class BufferedWriter(self.BufferedWriter):
3382 def flush(self, *args, **kwargs):
3383 flush_called.append(True)
3384 return super().flush(*args, **kwargs)
3385 def write(self, *args, **kwargs):
3386 write_called.append(True)
3387 return super().write(*args, **kwargs)
3388
3389 rawio = self.BytesIO()
3390 data = b"a"
3391 bufio = BufferedWriter(rawio, len(data)*2)
3392 textio = self.TextIOWrapper(bufio, encoding='ascii',
3393 write_through=True)
3394 # write to the buffered io but don't overflow the buffer
3395 text = data.decode('ascii')
3396 textio.write(text)
3397
3398 # buffer.flush is not called with write_through=True
3399 self.assertFalse(flush_called)
3400 # buffer.write *is* called with write_through=True
3401 self.assertTrue(write_called)
3402 self.assertEqual(rawio.getvalue(), b"") # no flush
3403
3404 write_called = [] # reset
3405 textio.write(text * 10) # total content is larger than bufio buffer
3406 self.assertTrue(write_called)
3407 self.assertEqual(rawio.getvalue(), data * 11) # all flushed
3408
def test_reconfigure_write_through(self):
    # reconfigure(write_through=...) must switch the mode, and calling
    # reconfigure() with no value or with None must keep the current one.
    mock_raw = self.MockRawIO([])
    wrapper = self.TextIOWrapper(mock_raw, encoding='ascii', newline='\n')

    def written():
        # Everything the raw object has received so far.
        return b''.join(mock_raw._write_stack)

    wrapper.write('1')
    wrapper.reconfigure(write_through=True)  # implied flush
    self.assertEqual(wrapper.write_through, True)
    self.assertEqual(written(), b'1')
    wrapper.write('23')
    self.assertEqual(written(), b'123')
    wrapper.reconfigure(write_through=False)
    self.assertEqual(wrapper.write_through, False)
    wrapper.write('45')
    wrapper.flush()
    self.assertEqual(written(), b'12345')
    # Keeping default value
    wrapper.reconfigure()
    wrapper.reconfigure(write_through=None)
    self.assertEqual(wrapper.write_through, False)
    wrapper.reconfigure(write_through=True)
    wrapper.reconfigure()
    wrapper.reconfigure(write_through=None)
    self.assertEqual(wrapper.write_through, True)
3431
Serhiy Storchaka94dc6732013-02-03 17:03:31 +02003432 def test_read_nonbytes(self):
3433 # Issue #17106
3434 # Crash when underlying read() returns non-bytes
3435 t = self.TextIOWrapper(self.StringIO('a'))
3436 self.assertRaises(TypeError, t.read, 1)
3437 t = self.TextIOWrapper(self.StringIO('a'))
3438 self.assertRaises(TypeError, t.readline)
3439 t = self.TextIOWrapper(self.StringIO('a'))
3440 self.assertRaises(TypeError, t.read)
3441
Oren Milmana5b4ea12017-08-25 21:14:54 +03003442 def test_illegal_encoder(self):
3443 # Issue 31271: Calling write() while the return value of encoder's
3444 # encode() is invalid shouldn't cause an assertion failure.
3445 rot13 = codecs.lookup("rot13")
3446 with support.swap_attr(rot13, '_is_text_encoding', True):
3447 t = io.TextIOWrapper(io.BytesIO(b'foo'), encoding="rot13")
3448 self.assertRaises(TypeError, t.write, 'bar')
3449
Serhiy Storchaka94dc6732013-02-03 17:03:31 +02003450 def test_illegal_decoder(self):
3451 # Issue #17106
Nick Coghlana9b15242014-02-04 22:11:18 +10003452 # Bypass the early encoding check added in issue 20404
3453 def _make_illegal_wrapper():
3454 quopri = codecs.lookup("quopri")
3455 quopri._is_text_encoding = True
3456 try:
3457 t = self.TextIOWrapper(self.BytesIO(b'aaaaaa'),
3458 newline='\n', encoding="quopri")
3459 finally:
3460 quopri._is_text_encoding = False
3461 return t
Serhiy Storchaka94dc6732013-02-03 17:03:31 +02003462 # Crash when decoder returns non-string
Nick Coghlana9b15242014-02-04 22:11:18 +10003463 t = _make_illegal_wrapper()
Serhiy Storchaka94dc6732013-02-03 17:03:31 +02003464 self.assertRaises(TypeError, t.read, 1)
Nick Coghlana9b15242014-02-04 22:11:18 +10003465 t = _make_illegal_wrapper()
Serhiy Storchaka94dc6732013-02-03 17:03:31 +02003466 self.assertRaises(TypeError, t.readline)
Nick Coghlana9b15242014-02-04 22:11:18 +10003467 t = _make_illegal_wrapper()
Serhiy Storchaka94dc6732013-02-03 17:03:31 +02003468 self.assertRaises(TypeError, t.read)
3469
Oren Milmanba7d7362017-08-29 11:58:27 +03003470 # Issue 31243: calling read() while the return value of decoder's
3471 # getstate() is invalid should neither crash the interpreter nor
3472 # raise a SystemError.
3473 def _make_very_illegal_wrapper(getstate_ret_val):
3474 class BadDecoder:
3475 def getstate(self):
3476 return getstate_ret_val
3477 def _get_bad_decoder(dummy):
3478 return BadDecoder()
3479 quopri = codecs.lookup("quopri")
3480 with support.swap_attr(quopri, 'incrementaldecoder',
3481 _get_bad_decoder):
3482 return _make_illegal_wrapper()
3483 t = _make_very_illegal_wrapper(42)
3484 self.assertRaises(TypeError, t.read, 42)
3485 t = _make_very_illegal_wrapper(())
3486 self.assertRaises(TypeError, t.read, 42)
3487 t = _make_very_illegal_wrapper((1, 2))
3488 self.assertRaises(TypeError, t.read, 42)
3489
Antoine Pitrou712cb732013-12-21 15:51:54 +01003490 def _check_create_at_shutdown(self, **kwargs):
3491 # Issue #20037: creating a TextIOWrapper at shutdown
3492 # shouldn't crash the interpreter.
3493 iomod = self.io.__name__
3494 code = """if 1:
3495 import codecs
3496 import {iomod} as io
3497
3498 # Avoid looking up codecs at shutdown
3499 codecs.lookup('utf-8')
3500
3501 class C:
3502 def __init__(self):
3503 self.buf = io.BytesIO()
3504 def __del__(self):
3505 io.TextIOWrapper(self.buf, **{kwargs})
3506 print("ok")
3507 c = C()
3508 """.format(iomod=iomod, kwargs=kwargs)
3509 return assert_python_ok("-c", code)
3510
3511 def test_create_at_shutdown_without_encoding(self):
3512 rc, out, err = self._check_create_at_shutdown()
3513 if err:
3514 # Can error out with a RuntimeError if the module state
3515 # isn't found.
Nick Coghlana9b15242014-02-04 22:11:18 +10003516 self.assertIn(self.shutdown_error, err.decode())
Antoine Pitrou712cb732013-12-21 15:51:54 +01003517 else:
3518 self.assertEqual("ok", out.decode().strip())
3519
3520 def test_create_at_shutdown_with_encoding(self):
3521 rc, out, err = self._check_create_at_shutdown(encoding='utf-8',
3522 errors='strict')
3523 self.assertFalse(err)
3524 self.assertEqual("ok", out.decode().strip())
3525
Antoine Pitroub8503892014-04-29 10:14:02 +02003526 def test_read_byteslike(self):
3527 r = MemviewBytesIO(b'Just some random string\n')
3528 t = self.TextIOWrapper(r, 'utf-8')
3529
3530 # TextIOwrapper will not read the full string, because
3531 # we truncate it to a multiple of the native int size
3532 # so that we can construct a more complex memoryview.
3533 bytes_val = _to_memoryview(r.getvalue()).tobytes()
3534
3535 self.assertEqual(t.read(200), bytes_val.decode('utf-8'))
3536
Benjamin Peterson6c14f232014-11-12 10:19:46 -05003537 def test_issue22849(self):
3538 class F(object):
3539 def readable(self): return True
3540 def writable(self): return True
3541 def seekable(self): return True
3542
3543 for i in range(10):
3544 try:
3545 self.TextIOWrapper(F(), encoding='utf-8')
3546 except Exception:
3547 pass
3548
3549 F.tell = lambda x: 0
3550 t = self.TextIOWrapper(F(), encoding='utf-8')
3551
INADA Naoki507434f2017-12-21 09:59:53 +09003552 def test_reconfigure_encoding_read(self):
3553 # latin1 -> utf8
3554 # (latin1 can decode utf-8 encoded string)
3555 data = 'abc\xe9\n'.encode('latin1') + 'd\xe9f\n'.encode('utf8')
3556 raw = self.BytesIO(data)
3557 txt = self.TextIOWrapper(raw, encoding='latin1', newline='\n')
3558 self.assertEqual(txt.readline(), 'abc\xe9\n')
3559 with self.assertRaises(self.UnsupportedOperation):
3560 txt.reconfigure(encoding='utf-8')
3561 with self.assertRaises(self.UnsupportedOperation):
3562 txt.reconfigure(newline=None)
3563
3564 def test_reconfigure_write_fromascii(self):
3565 # ascii has a specific encodefunc in the C implementation,
3566 # but utf-8-sig has not. Make sure that we get rid of the
3567 # cached encodefunc when we switch encoders.
3568 raw = self.BytesIO()
3569 txt = self.TextIOWrapper(raw, encoding='ascii', newline='\n')
3570 txt.write('foo\n')
3571 txt.reconfigure(encoding='utf-8-sig')
3572 txt.write('\xe9\n')
3573 txt.flush()
3574 self.assertEqual(raw.getvalue(), b'foo\n\xc3\xa9\n')
3575
3576 def test_reconfigure_write(self):
3577 # latin -> utf8
3578 raw = self.BytesIO()
3579 txt = self.TextIOWrapper(raw, encoding='latin1', newline='\n')
3580 txt.write('abc\xe9\n')
3581 txt.reconfigure(encoding='utf-8')
3582 self.assertEqual(raw.getvalue(), b'abc\xe9\n')
3583 txt.write('d\xe9f\n')
3584 txt.flush()
3585 self.assertEqual(raw.getvalue(), b'abc\xe9\nd\xc3\xa9f\n')
3586
3587 # ascii -> utf-8-sig: ensure that no BOM is written in the middle of
3588 # the file
3589 raw = self.BytesIO()
3590 txt = self.TextIOWrapper(raw, encoding='ascii', newline='\n')
3591 txt.write('abc\n')
3592 txt.reconfigure(encoding='utf-8-sig')
3593 txt.write('d\xe9f\n')
3594 txt.flush()
3595 self.assertEqual(raw.getvalue(), b'abc\nd\xc3\xa9f\n')
3596
3597 def test_reconfigure_write_non_seekable(self):
3598 raw = self.BytesIO()
3599 raw.seekable = lambda: False
3600 raw.seek = None
3601 txt = self.TextIOWrapper(raw, encoding='ascii', newline='\n')
3602 txt.write('abc\n')
3603 txt.reconfigure(encoding='utf-8-sig')
3604 txt.write('d\xe9f\n')
3605 txt.flush()
3606
3607 # If the raw stream is not seekable, there'll be a BOM
3608 self.assertEqual(raw.getvalue(), b'abc\n\xef\xbb\xbfd\xc3\xa9f\n')
3609
3610 def test_reconfigure_defaults(self):
3611 txt = self.TextIOWrapper(self.BytesIO(), 'ascii', 'replace', '\n')
3612 txt.reconfigure(encoding=None)
3613 self.assertEqual(txt.encoding, 'ascii')
3614 self.assertEqual(txt.errors, 'replace')
3615 txt.write('LF\n')
3616
3617 txt.reconfigure(newline='\r\n')
3618 self.assertEqual(txt.encoding, 'ascii')
3619 self.assertEqual(txt.errors, 'replace')
3620
3621 txt.reconfigure(errors='ignore')
3622 self.assertEqual(txt.encoding, 'ascii')
3623 self.assertEqual(txt.errors, 'ignore')
3624 txt.write('CRLF\n')
3625
3626 txt.reconfigure(encoding='utf-8', newline=None)
3627 self.assertEqual(txt.errors, 'strict')
3628 txt.seek(0)
3629 self.assertEqual(txt.read(), 'LF\nCRLF\n')
3630
3631 self.assertEqual(txt.detach().getvalue(), b'LF\nCRLF\r\n')
3632
3633 def test_reconfigure_newline(self):
3634 raw = self.BytesIO(b'CR\rEOF')
3635 txt = self.TextIOWrapper(raw, 'ascii', newline='\n')
3636 txt.reconfigure(newline=None)
3637 self.assertEqual(txt.readline(), 'CR\n')
3638 raw = self.BytesIO(b'CR\rEOF')
3639 txt = self.TextIOWrapper(raw, 'ascii', newline='\n')
3640 txt.reconfigure(newline='')
3641 self.assertEqual(txt.readline(), 'CR\r')
3642 raw = self.BytesIO(b'CR\rLF\nEOF')
3643 txt = self.TextIOWrapper(raw, 'ascii', newline='\r')
3644 txt.reconfigure(newline='\n')
3645 self.assertEqual(txt.readline(), 'CR\rLF\n')
3646 raw = self.BytesIO(b'LF\nCR\rEOF')
3647 txt = self.TextIOWrapper(raw, 'ascii', newline='\n')
3648 txt.reconfigure(newline='\r')
3649 self.assertEqual(txt.readline(), 'LF\nCR\r')
3650 raw = self.BytesIO(b'CR\rCRLF\r\nEOF')
3651 txt = self.TextIOWrapper(raw, 'ascii', newline='\r')
3652 txt.reconfigure(newline='\r\n')
3653 self.assertEqual(txt.readline(), 'CR\rCRLF\r\n')
3654
3655 txt = self.TextIOWrapper(self.BytesIO(), 'ascii', newline='\r')
3656 txt.reconfigure(newline=None)
3657 txt.write('linesep\n')
3658 txt.reconfigure(newline='')
3659 txt.write('LF\n')
3660 txt.reconfigure(newline='\n')
3661 txt.write('LF\n')
3662 txt.reconfigure(newline='\r')
3663 txt.write('CR\n')
3664 txt.reconfigure(newline='\r\n')
3665 txt.write('CRLF\n')
3666 expected = 'linesep' + os.linesep + 'LF\nLF\nCR\rCRLF\r\n'
3667 self.assertEqual(txt.detach().getvalue().decode('ascii'), expected)
3668
Zackery Spytz23db9352018-06-29 04:14:58 -06003669 def test_issue25862(self):
3670 # Assertion failures occurred in tell() after read() and write().
3671 t = self.TextIOWrapper(self.BytesIO(b'test'), encoding='ascii')
3672 t.read(1)
3673 t.read()
3674 t.tell()
3675 t = self.TextIOWrapper(self.BytesIO(b'test'), encoding='ascii')
3676 t.read(1)
3677 t.write('x')
3678 t.tell()
3679
Serhiy Storchaka94dc6732013-02-03 17:03:31 +02003680
class MemviewBytesIO(io.BytesIO):
    '''BytesIO variant whose read() and read1() hand back memoryview
    objects (built by _to_memoryview) instead of bytes.'''

    def read1(self, len_):
        chunk = super().read1(len_)
        return _to_memoryview(chunk)

    def read(self, len_):
        chunk = super().read(len_)
        return _to_memoryview(chunk)
3690
def _to_memoryview(buf):
    '''Return a memoryview over an array of C ints built from *buf*.

    Only the leading part of *buf* whose length is a whole multiple of
    the int item size is used; any trailing bytes are dropped.
    '''
    ints = array.array('i')
    usable = (len(buf) // ints.itemsize) * ints.itemsize
    ints.frombytes(buf[:usable])
    return memoryview(ints)
Benjamin Petersonc9c0f202009-06-30 23:06:06 +00003698
Benjamin Peterson6fd113c2014-11-12 10:23:44 -05003699
class CTextIOWrapperTest(TextIOWrapperTest):
    # Module handed to the child interpreter by _check_create_at_shutdown().
    io = io
    # Text expected on stderr when wrapper creation at shutdown fails.
    shutdown_error = "LookupError: unknown encoding: ascii"

    def test_initialization(self):
        raw = self.BytesIO(b"\xc3\xa9\n\n")
        buffered = self.BufferedReader(raw, 1000)
        wrapper = self.TextIOWrapper(buffered)
        # A failed re-initialisation must leave the object unusable rather
        # than half-initialised.
        with self.assertRaises(ValueError):
            wrapper.__init__(buffered, newline='xyzzy')
        with self.assertRaises(ValueError):
            wrapper.read()

        # repr() of an object that never went through __init__ must raise.
        wrapper = self.TextIOWrapper.__new__(self.TextIOWrapper)
        with self.assertRaises(Exception):
            repr(wrapper)

    def test_garbage_collection(self):
        # C TextIOWrapper objects are collected, and collecting them flushes
        # all data to disk.
        # The Python version has __del__, so it ends in gc.garbage instead.
        with support.check_warnings(('', ResourceWarning)):
            raw_file = io.FileIO(support.TESTFN, "wb")
            buffered = self.BufferedWriter(raw_file)
            wrapper = self.TextIOWrapper(buffered, encoding="ascii")
            wrapper.write("456def")
            # Self-reference, so only the cycle collector can free it.
            wrapper.x = wrapper
            ref = weakref.ref(wrapper)
            del wrapper
            support.gc_collect()
        self.assertIsNone(ref(), ref)
        with self.open(support.TESTFN, "rb") as f:
            contents = f.read()
            self.assertEqual(contents, b"456def")

    def test_rwpair_cleared_before_textio(self):
        # Issue 13070: TextIOWrapper's finalization would crash when called
        # after the reference to the underlying BufferedRWPair's writer got
        # cleared by the GC.
        for _ in range(1000):
            pair_a = self.BufferedRWPair(self.MockRawIO(), self.MockRawIO())
            text_a = self.TextIOWrapper(pair_a, encoding="ascii")
            pair_b = self.BufferedRWPair(self.MockRawIO(), self.MockRawIO())
            text_b = self.TextIOWrapper(pair_b, encoding="ascii")
            # circular references
            text_a.buddy = text_b
            text_b.buddy = text_a
        support.gc_collect()

    def test_del__CHUNK_SIZE_SystemError(self):
        # Deleting _CHUNK_SIZE must fail with AttributeError.
        wrapper = self.TextIOWrapper(self.BytesIO(), encoding='ascii')
        with self.assertRaises(AttributeError):
            del wrapper._CHUNK_SIZE
3749
Charles-François Natali42c28cd2011-10-05 19:53:43 +02003750
class PyTextIOWrapperTest(TextIOWrapperTest):
    # Runs the shared TextIOWrapper tests with `pyio` as the io module.
    io = pyio
    # Text expected on stderr when wrapper creation at shutdown fails.
    shutdown_error = "LookupError: unknown encoding: ascii"
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003754
3755
class IncrementalNewlineDecoderTest(unittest.TestCase):
    # Shared tests; they read the decoder class under test from
    # self.IncrementalNewlineDecoder.

    def check_newline_decoding_utf8(self, decoder):
        # UTF-8 specific tests for a newline decoder
        def _check_decode(b, s, **kwargs):
            # We exercise getstate() / setstate() as well as decode():
            # decode once, rewind to the saved state, and decode again.
            saved = decoder.getstate()
            self.assertEqual(decoder.decode(b, **kwargs), s)
            decoder.setstate(saved)
            self.assertEqual(decoder.decode(b, **kwargs), s)

        # One three-byte character: first whole, then twice byte by byte,
        # then a lone lead byte that is left dangling.
        for chunk, text in (
            (b'\xe8\xa2\x88', "\u8888"),
            (b'\xe8', ""),
            (b'\xa2', ""),
            (b'\x88', "\u8888"),
            (b'\xe8', ""),
            (b'\xa2', ""),
            (b'\x88', "\u8888"),
            (b'\xe8', ""),
        ):
            _check_decode(chunk, text)
        self.assertRaises(UnicodeDecodeError, decoder.decode, b'', final=True)

        decoder.reset()
        # Newline handling; the order matters because each step starts from
        # the state the previous one left behind.
        for chunk, text, options in (
            (b'\n', "\n", {}),
            (b'\r', "", {}),
            (b'', "\n", {'final': True}),
            (b'\r', "\n", {'final': True}),
            (b'\r', "", {}),
            (b'a', "\na", {}),
            (b'\r\r\n', "\n\n", {}),
            (b'\r', "", {}),
            (b'\r', "\n", {}),
            (b'\na', "\na", {}),
            (b'\xe8\xa2\x88\r\n', "\u8888\n", {}),
            (b'\xe8\xa2\x88', "\u8888", {}),
            (b'\n', "\n", {}),
            (b'\xe8\xa2\x88\r', "\u8888", {}),
            (b'\n', "\n", {}),
        ):
            _check_decode(chunk, text, **options)

    def check_newline_decoding(self, decoder, encoding):
        # Feed the decoder one unit at a time and check both the translated
        # output and the newline kinds recorded in decoder.newlines.
        pieces = []
        if encoding is None:
            encoder = None
        else:
            encoder = codecs.getincrementalencoder(encoding)()

        def _decode_bytewise(s):
            if encoder is None:
                # Decode one char at a time
                units = s
            else:
                # Decode one byte at a time
                units = [bytes([b]) for b in encoder.encode(s)]
            for unit in units:
                pieces.append(decoder.decode(unit))

        self.assertEqual(decoder.newlines, None)
        for text, seen in (
            ("abc\n\r", '\n'),
            ("\nabc", ('\n', '\r\n')),
            ("abc\r", ('\n', '\r\n')),
            ("abc", ('\r', '\n', '\r\n')),
        ):
            _decode_bytewise(text)
            self.assertEqual(decoder.newlines, seen)
        _decode_bytewise("abc\r")
        self.assertEqual("".join(pieces), "abc\n\nabcabc\nabcabc")

        # After reset() the recorded newline kinds are gone.
        decoder.reset()
        payload = "abc"
        if encoder is not None:
            encoder.reset()
            payload = encoder.encode(payload)
        self.assertEqual(decoder.decode(payload), "abc")
        self.assertEqual(decoder.newlines, None)

    def test_newline_decoder(self):
        encodings = (
            # None meaning the IncrementalNewlineDecoder takes unicode input
            # rather than bytes input
            None, 'utf-8', 'latin-1',
            'utf-16', 'utf-16-le', 'utf-16-be',
            'utf-32', 'utf-32-le', 'utf-32-be',
        )
        for enc in encodings:
            if enc is None:
                inner = None
            else:
                inner = codecs.getincrementaldecoder(enc)()
            wrapped = self.IncrementalNewlineDecoder(inner, translate=True)
            self.check_newline_decoding(wrapped, enc)

        inner = codecs.getincrementaldecoder("utf-8")()
        wrapped = self.IncrementalNewlineDecoder(inner, translate=True)
        self.check_newline_decoding_utf8(wrapped)
        self.assertRaises(TypeError, wrapped.setstate, 42)

    def test_newline_bytes(self):
        # Issue 5433: Excessive optimization in IncrementalNewlineDecoder
        def _check(dec):
            # Neither character may be treated as a newline.
            self.assertEqual(dec.newlines, None)
            for char in ("\u0D00", "\u0A00"):
                self.assertEqual(dec.decode(char), char)
                self.assertEqual(dec.newlines, None)

        for translate in (False, True):
            _check(self.IncrementalNewlineDecoder(None, translate=translate))

    def test_translate(self):
        # issue 35062
        # Non-zero values of translate, passed positionally, must make the
        # decoder pass the translating UTF-8 checks.
        for translate in (-2, -1, 1, 2):
            inner = codecs.getincrementaldecoder("utf-8")()
            wrapped = self.IncrementalNewlineDecoder(inner, translate)
            self.check_newline_decoding_utf8(wrapped)

        # translate=0 leaves the input newlines untouched.
        inner = codecs.getincrementaldecoder("utf-8")()
        wrapped = self.IncrementalNewlineDecoder(inner, translate=0)
        self.assertEqual(wrapped.decode(b"\r\r\n"), "\r\r\n")
3872
class CIncrementalNewlineDecoderTest(IncrementalNewlineDecoderTest):
    """Concrete subclass of the shared newline-decoder tests; adds nothing."""
3875
class PyIncrementalNewlineDecoderTest(IncrementalNewlineDecoderTest):
    """Concrete subclass of the shared newline-decoder tests; adds nothing."""
Antoine Pitrou180a3362008-12-14 16:36:46 +00003878
Alexandre Vassalotti472f07d2008-01-06 00:34:32 +00003879
Guido van Rossum01a27522007-03-07 01:00:12 +00003880# XXX Tests for open()
Guido van Rossum68bbcd22007-02-27 17:19:33 +00003881
Guido van Rossum5abbf752007-08-27 17:39:33 +00003882class MiscIOTest(unittest.TestCase):
3883
Barry Warsaw40e82462008-11-20 20:14:50 +00003884 def tearDown(self):
3885 support.unlink(support.TESTFN)
3886
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003887 def test___all__(self):
3888 for name in self.io.__all__:
3889 obj = getattr(self.io, name, None)
Serhiy Storchaka0dcd80a2015-08-02 15:17:49 +03003890 self.assertIsNotNone(obj, name)
Steve Dowerb82e17e2019-05-23 08:45:22 -07003891 if name in ("open", "open_code"):
Guido van Rossum5abbf752007-08-27 17:39:33 +00003892 continue
Benjamin Peterson6a52a9c2009-04-29 22:00:44 +00003893 elif "error" in name.lower() or name == "UnsupportedOperation":
Benjamin Petersonbfb95942009-04-02 01:13:40 +00003894 self.assertTrue(issubclass(obj, Exception), name)
3895 elif not name.startswith("SEEK_"):
3896 self.assertTrue(issubclass(obj, self.IOBase))
Benjamin Peterson65676e42008-11-05 21:42:45 +00003897
Barry Warsaw40e82462008-11-20 20:14:50 +00003898 def test_attributes(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003899 f = self.open(support.TESTFN, "wb", buffering=0)
Ezio Melottib3aedd42010-11-20 19:04:17 +00003900 self.assertEqual(f.mode, "wb")
Barry Warsaw40e82462008-11-20 20:14:50 +00003901 f.close()
3902
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003903 f = self.open(support.TESTFN, "w+")
Ezio Melottib3aedd42010-11-20 19:04:17 +00003904 self.assertEqual(f.mode, "w+")
3905 self.assertEqual(f.buffer.mode, "rb+") # Does it really matter?
3906 self.assertEqual(f.buffer.raw.mode, "rb+")
Barry Warsaw40e82462008-11-20 20:14:50 +00003907
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003908 g = self.open(f.fileno(), "wb", closefd=False)
Ezio Melottib3aedd42010-11-20 19:04:17 +00003909 self.assertEqual(g.mode, "wb")
3910 self.assertEqual(g.raw.mode, "wb")
3911 self.assertEqual(g.name, f.fileno())
3912 self.assertEqual(g.raw.name, f.fileno())
Barry Warsaw40e82462008-11-20 20:14:50 +00003913 f.close()
3914 g.close()
3915
Victor Stinnere471e722019-10-28 15:40:08 +01003916 def test_removed_u_mode(self):
3917 # "U" mode has been removed in Python 3.9
3918 for mode in ("U", "rU", "r+U"):
3919 with self.assertRaises(ValueError) as cm:
3920 self.open(support.TESTFN, mode)
3921 self.assertIn('invalid mode', str(cm.exception))
3922
Benjamin Peterson74fa9f72019-11-12 14:51:34 -08003923 def test_open_pipe_with_append(self):
3924 # bpo-27805: Ignore ESPIPE from lseek() in open().
3925 r, w = os.pipe()
3926 self.addCleanup(os.close, r)
3927 f = self.open(w, 'a')
3928 self.addCleanup(f.close)
3929 # Check that the file is marked non-seekable. On Windows, however, lseek
3930 # somehow succeeds on pipes.
3931 if sys.platform != 'win32':
3932 self.assertFalse(f.seekable())
3933
Antoine Pitrou8043cf82009-01-09 19:54:29 +00003934 def test_io_after_close(self):
3935 for kwargs in [
3936 {"mode": "w"},
3937 {"mode": "wb"},
3938 {"mode": "w", "buffering": 1},
3939 {"mode": "w", "buffering": 2},
3940 {"mode": "wb", "buffering": 0},
3941 {"mode": "r"},
3942 {"mode": "rb"},
3943 {"mode": "r", "buffering": 1},
3944 {"mode": "r", "buffering": 2},
3945 {"mode": "rb", "buffering": 0},
3946 {"mode": "w+"},
3947 {"mode": "w+b"},
3948 {"mode": "w+", "buffering": 1},
3949 {"mode": "w+", "buffering": 2},
3950 {"mode": "w+b", "buffering": 0},
3951 ]:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003952 f = self.open(support.TESTFN, **kwargs)
Antoine Pitrou8043cf82009-01-09 19:54:29 +00003953 f.close()
3954 self.assertRaises(ValueError, f.flush)
3955 self.assertRaises(ValueError, f.fileno)
3956 self.assertRaises(ValueError, f.isatty)
3957 self.assertRaises(ValueError, f.__iter__)
3958 if hasattr(f, "peek"):
3959 self.assertRaises(ValueError, f.peek, 1)
3960 self.assertRaises(ValueError, f.read)
3961 if hasattr(f, "read1"):
3962 self.assertRaises(ValueError, f.read1, 1024)
Martin Panterccb2c0e2016-10-20 23:48:14 +00003963 self.assertRaises(ValueError, f.read1)
Victor Stinnerb79f28c2011-05-25 22:09:03 +02003964 if hasattr(f, "readall"):
3965 self.assertRaises(ValueError, f.readall)
Antoine Pitrou8043cf82009-01-09 19:54:29 +00003966 if hasattr(f, "readinto"):
3967 self.assertRaises(ValueError, f.readinto, bytearray(1024))
Benjamin Petersona96fea02014-06-22 14:17:44 -07003968 if hasattr(f, "readinto1"):
3969 self.assertRaises(ValueError, f.readinto1, bytearray(1024))
Antoine Pitrou8043cf82009-01-09 19:54:29 +00003970 self.assertRaises(ValueError, f.readline)
3971 self.assertRaises(ValueError, f.readlines)
Xiang Zhang026435c2017-04-15 12:47:28 +08003972 self.assertRaises(ValueError, f.readlines, 1)
Antoine Pitrou8043cf82009-01-09 19:54:29 +00003973 self.assertRaises(ValueError, f.seek, 0)
3974 self.assertRaises(ValueError, f.tell)
3975 self.assertRaises(ValueError, f.truncate)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003976 self.assertRaises(ValueError, f.write,
3977 b"" if "b" in kwargs['mode'] else "")
Antoine Pitrou8043cf82009-01-09 19:54:29 +00003978 self.assertRaises(ValueError, f.writelines, [])
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003979 self.assertRaises(ValueError, next, f)
Antoine Pitrou8043cf82009-01-09 19:54:29 +00003980
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003981 def test_blockingioerror(self):
3982 # Various BlockingIOError issues
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003983 class C(str):
3984 pass
3985 c = C("")
3986 b = self.BlockingIOError(1, c)
3987 c.b = b
3988 b.c = c
3989 wr = weakref.ref(c)
3990 del c, b
Benjamin Peterson24fb1d02009-04-24 23:26:21 +00003991 support.gc_collect()
Serhiy Storchaka0dcd80a2015-08-02 15:17:49 +03003992 self.assertIsNone(wr(), wr)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003993
3994 def test_abcs(self):
3995 # Test the visible base classes are ABCs.
Ezio Melottie9615932010-01-24 19:26:24 +00003996 self.assertIsInstance(self.IOBase, abc.ABCMeta)
3997 self.assertIsInstance(self.RawIOBase, abc.ABCMeta)
3998 self.assertIsInstance(self.BufferedIOBase, abc.ABCMeta)
3999 self.assertIsInstance(self.TextIOBase, abc.ABCMeta)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00004000
4001 def _check_abc_inheritance(self, abcmodule):
4002 with self.open(support.TESTFN, "wb", buffering=0) as f:
Ezio Melottie9615932010-01-24 19:26:24 +00004003 self.assertIsInstance(f, abcmodule.IOBase)
4004 self.assertIsInstance(f, abcmodule.RawIOBase)
4005 self.assertNotIsInstance(f, abcmodule.BufferedIOBase)
4006 self.assertNotIsInstance(f, abcmodule.TextIOBase)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00004007 with self.open(support.TESTFN, "wb") as f:
Ezio Melottie9615932010-01-24 19:26:24 +00004008 self.assertIsInstance(f, abcmodule.IOBase)
4009 self.assertNotIsInstance(f, abcmodule.RawIOBase)
4010 self.assertIsInstance(f, abcmodule.BufferedIOBase)
4011 self.assertNotIsInstance(f, abcmodule.TextIOBase)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00004012 with self.open(support.TESTFN, "w") as f:
Ezio Melottie9615932010-01-24 19:26:24 +00004013 self.assertIsInstance(f, abcmodule.IOBase)
4014 self.assertNotIsInstance(f, abcmodule.RawIOBase)
4015 self.assertNotIsInstance(f, abcmodule.BufferedIOBase)
4016 self.assertIsInstance(f, abcmodule.TextIOBase)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00004017
4018 def test_abc_inheritance(self):
4019 # Test implementations inherit from their respective ABCs
4020 self._check_abc_inheritance(self)
4021
4022 def test_abc_inheritance_official(self):
4023 # Test implementations inherit from the official ABCs of the
4024 # baseline "io" module.
4025 self._check_abc_inheritance(io)
4026
Antoine Pitroue033e062010-10-29 10:38:18 +00004027 def _check_warn_on_dealloc(self, *args, **kwargs):
4028 f = open(*args, **kwargs)
4029 r = repr(f)
4030 with self.assertWarns(ResourceWarning) as cm:
4031 f = None
4032 support.gc_collect()
4033 self.assertIn(r, str(cm.warning.args[0]))
4034
4035 def test_warn_on_dealloc(self):
4036 self._check_warn_on_dealloc(support.TESTFN, "wb", buffering=0)
4037 self._check_warn_on_dealloc(support.TESTFN, "wb")
4038 self._check_warn_on_dealloc(support.TESTFN, "w")
4039
4040 def _check_warn_on_dealloc_fd(self, *args, **kwargs):
4041 fds = []
Benjamin Peterson556c7352010-10-31 01:35:43 +00004042 def cleanup_fds():
Antoine Pitroue033e062010-10-29 10:38:18 +00004043 for fd in fds:
4044 try:
4045 os.close(fd)
Andrew Svetlov3438fa42012-12-17 23:35:18 +02004046 except OSError as e:
Antoine Pitroue033e062010-10-29 10:38:18 +00004047 if e.errno != errno.EBADF:
4048 raise
Benjamin Peterson556c7352010-10-31 01:35:43 +00004049 self.addCleanup(cleanup_fds)
4050 r, w = os.pipe()
4051 fds += r, w
4052 self._check_warn_on_dealloc(r, *args, **kwargs)
4053 # When using closefd=False, there's no warning
4054 r, w = os.pipe()
4055 fds += r, w
Serhiy Storchaka94a619d2016-02-11 13:11:44 +02004056 with support.check_no_resource_warning(self):
Benjamin Peterson556c7352010-10-31 01:35:43 +00004057 open(r, *args, closefd=False, **kwargs)
Antoine Pitroue033e062010-10-29 10:38:18 +00004058
4059 def test_warn_on_dealloc_fd(self):
4060 self._check_warn_on_dealloc_fd("rb", buffering=0)
4061 self._check_warn_on_dealloc_fd("rb")
4062 self._check_warn_on_dealloc_fd("r")
4063
4064
Antoine Pitrou243757e2010-11-05 21:15:39 +00004065 def test_pickling(self):
4066 # Pickling file objects is forbidden
4067 for kwargs in [
4068 {"mode": "w"},
4069 {"mode": "wb"},
4070 {"mode": "wb", "buffering": 0},
4071 {"mode": "r"},
4072 {"mode": "rb"},
4073 {"mode": "rb", "buffering": 0},
4074 {"mode": "w+"},
4075 {"mode": "w+b"},
4076 {"mode": "w+b", "buffering": 0},
4077 ]:
4078 for protocol in range(pickle.HIGHEST_PROTOCOL + 1):
4079 with self.open(support.TESTFN, **kwargs) as f:
4080 self.assertRaises(TypeError, pickle.dumps, f, protocol)
4081
Antoine Pitrou58fcf9f2011-11-21 20:16:44 +01004082 def test_nonblock_pipe_write_bigbuf(self):
4083 self._test_nonblock_pipe_write(16*1024)
4084
Antoine Pitrou58fcf9f2011-11-21 20:16:44 +01004085 def test_nonblock_pipe_write_smallbuf(self):
4086 self._test_nonblock_pipe_write(1024)
4087
    @unittest.skipUnless(hasattr(os, 'set_blocking'),
                         'os.set_blocking() required for this test')
    def _test_nonblock_pipe_write(self, bufsize):
        # Write through a non-blocking pipe until the writer raises
        # BlockingIOError, drain the reader, and finally check that every
        # byte recorded as sent was received, in order.  `bufsize` is the
        # buffering argument used for both ends of the pipe.
        sent = []
        received = []
        r, w = os.pipe()
        os.set_blocking(r, False)
        os.set_blocking(w, False)

        # To exercise all code paths in the C implementation we need
        # to play with buffer sizes.  For instance, if we choose a
        # buffer size less than or equal to _PIPE_BUF (4096 on Linux)
        # then we will never get a partial write of the buffer.
        rf = self.open(r, mode='rb', closefd=True, buffering=bufsize)
        wf = self.open(w, mode='wb', closefd=True, buffering=bufsize)

        with rf, wf:
            for N in 9999, 73, 7574:
                try:
                    i = 0
                    # Keep writing N-byte messages (a single lowercase
                    # letter repeated, cycling a..z) until write() raises.
                    while True:
                        msg = bytes([i % 26 + 97]) * N
                        sent.append(msg)
                        wf.write(msg)
                        i += 1

                except self.BlockingIOError as e:
                    self.assertEqual(e.args[0], errno.EAGAIN)
                    self.assertEqual(e.args[2], e.characters_written)
                    # Only part of the last message was accepted; record
                    # just that part as sent.
                    sent[-1] = sent[-1][:e.characters_written]
                    received.append(rf.read())
                    msg = b'BLOCKED'
                    wf.write(msg)
                    sent.append(msg)

            # Retry flush() until it succeeds, reading from the other end
            # each time it reports that it would block.
            while True:
                try:
                    wf.flush()
                    break
                except self.BlockingIOError as e:
                    self.assertEqual(e.args[0], errno.EAGAIN)
                    self.assertEqual(e.args[2], e.characters_written)
                    self.assertEqual(e.characters_written, 0)
                    received.append(rf.read())

            # Collect whatever is left: call read() until it returns None.
            received += iter(rf.read, None)

        sent, received = b''.join(sent), b''.join(received)
        self.assertEqual(sent, received)
        # Leaving the `with` block must have closed both file objects.
        self.assertTrue(wf.closed)
        self.assertTrue(rf.closed)
4139
def test_create_fail(self):
    # Exclusive-creation mode ('x') must refuse a file that already exists.
    with self.open(support.TESTFN, 'w'):
        pass  # only needed so that the file exists
    with self.assertRaises(FileExistsError):
        self.open(support.TESTFN, 'x')
4145
def test_create_writes(self):
    # A file opened in 'x' mode is writable; read the data back to check.
    payload = b"spam"
    with self.open(support.TESTFN, 'xb') as created:
        created.write(payload)
    with self.open(support.TESTFN, 'rb') as reopened:
        contents = reopened.read()
    self.assertEqual(payload, contents)
4152
def test_open_allargs(self):
    # There used to be a buffer overflow in the parser for rawmode;
    # a mode string combining every flag must be rejected cleanly.
    with self.assertRaises(ValueError):
        self.open(support.TESTFN, 'rwax+')
4156
def test_check_encoding_errors(self):
    # bpo-37388: open() and TextIOWrapper must check encoding and errors
    # arguments in dev mode.  The checks run in a child interpreter
    # started with -X dev; each failed check exits with its own status
    # (21..24) and the script exits with 10 when all four checks pass.
    mod = self.io.__name__
    filename = __file__
    invalid = 'Boom, Shaka Laka, Boom!'
    script = textwrap.dedent(f'''
        import sys
        from {mod} import open, TextIOWrapper

        try:
            open({filename!r}, encoding={invalid!r})
        except LookupError:
            pass
        else:
            sys.exit(21)

        try:
            open({filename!r}, errors={invalid!r})
        except LookupError:
            pass
        else:
            sys.exit(22)

        fp = open({filename!r}, "rb")
        with fp:
            try:
                TextIOWrapper(fp, encoding={invalid!r})
            except LookupError:
                pass
            else:
                sys.exit(23)

            try:
                TextIOWrapper(fp, errors={invalid!r})
            except LookupError:
                pass
            else:
                sys.exit(24)

        sys.exit(10)
    ''')
    child_args = ('-X', 'dev', '-c', script)
    result = assert_python_failure(*child_args)
    self.assertEqual(result.rc, 10, result)
4201
Christian Heimes7b648752012-09-10 14:48:43 +02004202
class CMiscIOTest(MiscIOTest):
    """MiscIOTest bound to the ``io`` module, plus tests specific to it."""

    io = io

    def test_readinto_buffer_overflow(self):
        # Issue #18025: readinto() must raise ValueError when read()
        # hands back more data than the destination buffer can hold.
        class OversizedReader(self.io.BufferedIOBase):
            def read(self, n=-1):
                return b'x' * 10**6

        reader = OversizedReader()
        target = bytearray(2)
        with self.assertRaises(ValueError):
            reader.readinto(target)

    def check_daemon_threads_shutdown_deadlock(self, stream_name):
        """Run a child interpreter that writes to sys.<stream_name> from
        both a daemon thread and the main thread, then exits.

        Issue #23309: deadlocks at shutdown should be avoided when a
        daemon thread and the main thread both write to a file.  The
        child must either exit cleanly having printed only '.' and '!'
        on stderr, or die with the expected fatal error message.
        """
        code = """if 1:
            import sys
            import time
            import threading
            from test.support import SuppressCrashReport

            file = sys.{stream_name}

            def run():
                while True:
                    file.write('.')
                    file.flush()

            crash = SuppressCrashReport()
            crash.__enter__()
            # don't call __exit__(): the crash occurs at Python shutdown

            thread = threading.Thread(target=run)
            thread.daemon = True
            thread.start()

            time.sleep(0.5)
            file.write('!')
            file.flush()
            """.format(stream_name=stream_name)
        res, _ = run_python_until_end("-c", code)
        stderr_text = res.err.decode()
        if res.rc == 0:
            # Clean exit: nothing but the written markers may remain.
            self.assertFalse(stderr_text.strip('.!'))
            return

        # Failure: should be a fatal error
        expected = (r"Fatal Python error: could not acquire lock "
                    r"for <(_io\.)?BufferedWriter name='<{stream_name}>'> "
                    r"at interpreter shutdown, possibly due to "
                    r"daemon threads".format(stream_name=stream_name))
        self.assertRegex(stderr_text, expected)

    def test_daemon_threads_shutdown_stdout_deadlock(self):
        self.check_daemon_threads_shutdown_deadlock('stdout')

    def test_daemon_threads_shutdown_stderr_deadlock(self):
        self.check_daemon_threads_shutdown_deadlock('stderr')
4260
4261
class PyMiscIOTest(MiscIOTest):
    """MiscIOTest bound to the ``pyio`` module."""

    io = pyio
Barry Warsaw40e82462008-11-20 20:14:50 +00004264
Antoine Pitroub46b9d52010-08-21 19:09:32 +00004265
@unittest.skipIf(os.name == 'nt', 'POSIX signals required for this test.')
class SignalsTest(unittest.TestCase):
    """Check how open()ed streams behave when SIGALRM interrupts them.

    The module under test is read from ``self.io``; this base class does
    not define it, so concrete subclasses are expected to bind it.
    """

    def setUp(self):
        # Remember the previous handler so that tearDown() can restore it.
        self.oldalrm = signal.signal(signal.SIGALRM, self.alarm_interrupt)

    def tearDown(self):
        signal.signal(signal.SIGALRM, self.oldalrm)

    def alarm_interrupt(self, sig, frame):
        # Default SIGALRM handler for these tests: raise ZeroDivisionError.
        1/0

    def check_interrupted_write(self, item, bytes, **fdopen_kwargs):
        """Check that a partial write, when it gets interrupted, properly
        invokes the signal handler, and bubbles up the exception raised
        in the latter.

        *item* is the unit of data repeated to build the large write,
        *bytes* is the byte string whose first two bytes are expected on
        the read end, and *fdopen_kwargs* are passed to self.io.open().
        """
        read_results = []
        def _read():
            s = os.read(r, 1)
            read_results.append(s)

        t = threading.Thread(target=_read)
        t.daemon = True
        r, w = os.pipe()
        fdopen_kwargs["closefd"] = False
        large_data = item * (support.PIPE_MAX_SIZE // len(item) + 1)
        # Bound before the try block: if open() raises, the cleanup code
        # must not fail on an unbound name and hide the real error.
        wio = None
        try:
            wio = self.io.open(w, **fdopen_kwargs)
            if hasattr(signal, 'pthread_sigmask'):
                # create the thread with SIGALRM signal blocked
                signal.pthread_sigmask(signal.SIG_BLOCK, [signal.SIGALRM])
                t.start()
                signal.pthread_sigmask(signal.SIG_UNBLOCK, [signal.SIGALRM])
            else:
                t.start()

            # Fill the pipe enough that the write will be blocking.
            # It will be interrupted by the timer armed just below.  Since
            # the other thread has read one byte, the low-level write will
            # return with a successful (partial) result rather than an EINTR.
            # The buffered IO layer must check for pending signal
            # handlers, which in this case will invoke alarm_interrupt().
            signal.alarm(1)
            try:
                self.assertRaises(ZeroDivisionError, wio.write, large_data)
            finally:
                signal.alarm(0)
                t.join()
            # We got one byte, get another one and check that it isn't a
            # repeat of the first one.
            read_results.append(os.read(r, 1))
            self.assertEqual(read_results, [bytes[0:1], bytes[1:2]])
        finally:
            os.close(w)
            os.close(r)
            # This is deliberate. If we didn't close the file descriptor
            # before closing wio, wio would try to flush its internal
            # buffer, and block again.
            if wio is not None:
                try:
                    wio.close()
                except OSError as e:
                    if e.errno != errno.EBADF:
                        raise

    def test_interrupted_write_unbuffered(self):
        self.check_interrupted_write(b"xy", b"xy", mode="wb", buffering=0)

    def test_interrupted_write_buffered(self):
        self.check_interrupted_write(b"xy", b"xy", mode="wb")

    def test_interrupted_write_text(self):
        self.check_interrupted_write("xy", b"xy", mode="w", encoding="ascii")

    @support.no_tracing
    def check_reentrant_write(self, data, **fdopen_kwargs):
        """Write *data* in a loop while a SIGALRM handler writes to the
        same stream, and check the outcome.

        The loop must end with either ZeroDivisionError (raised by the
        handler) or a RuntimeError whose message starts with
        "reentrant call".
        """
        def on_alarm(*args):
            # Will be called reentrantly from the same thread
            wio.write(data)
            1/0
        signal.signal(signal.SIGALRM, on_alarm)
        r, w = os.pipe()
        wio = self.io.open(w, **fdopen_kwargs)
        try:
            signal.alarm(1)
            # Either the reentrant call to wio.write() fails with RuntimeError,
            # or the signal handler raises ZeroDivisionError.
            with self.assertRaises((ZeroDivisionError, RuntimeError)) as cm:
                while 1:
                    for i in range(100):
                        wio.write(data)
                        wio.flush()
                    # Make sure the buffer doesn't fill up and block further writes
                    os.read(r, len(data) * 100)
            exc = cm.exception
            if isinstance(exc, RuntimeError):
                self.assertTrue(str(exc).startswith("reentrant call"), str(exc))
        finally:
            signal.alarm(0)
            wio.close()
            os.close(r)

    def test_reentrant_write_buffered(self):
        self.check_reentrant_write(b"xy", mode="wb")

    def test_reentrant_write_text(self):
        self.check_reentrant_write("xy", mode="w", encoding="ascii")

    def check_interrupted_read_retry(self, decode, **fdopen_kwargs):
        """Check that a buffered read, when it gets interrupted (either
        returning a partial result or EINTR), properly invokes the signal
        handler and retries if the latter returned successfully.

        *decode* converts the value returned by read() to str so that it
        can be compared with "foobar".
        """
        r, w = os.pipe()
        fdopen_kwargs["closefd"] = False
        def alarm_handler(sig, frame):
            os.write(w, b"bar")
        signal.signal(signal.SIGALRM, alarm_handler)
        # Bound before the try block so that cleanup cannot fail on an
        # unbound name when open() itself raises.
        rio = None
        try:
            rio = self.io.open(r, **fdopen_kwargs)
            os.write(w, b"foo")
            signal.alarm(1)
            # Expected behaviour:
            # - first raw read() returns partial b"foo"
            # - second raw read() returns EINTR
            # - third raw read() returns b"bar"
            self.assertEqual(decode(rio.read(6)), "foobar")
        finally:
            signal.alarm(0)
            if rio is not None:
                rio.close()
            os.close(w)
            os.close(r)

    def test_interrupted_read_retry_buffered(self):
        self.check_interrupted_read_retry(lambda x: x.decode('latin1'),
                                          mode="rb")

    def test_interrupted_read_retry_text(self):
        self.check_interrupted_read_retry(lambda x: x,
                                          mode="r")

    def check_interrupted_write_retry(self, item, **fdopen_kwargs):
        """Check that a buffered write, when it gets interrupted (either
        returning a partial result or EINTR), properly invokes the signal
        handler and retries if the latter returned successfully.

        *item* is a one-element str or bytes value; it is repeated
        support.PIPE_MAX_SIZE times to build the data that is written.
        """
        select = support.import_module("select")

        # A quantity that exceeds the buffer size of an anonymous pipe's
        # write end.
        N = support.PIPE_MAX_SIZE
        r, w = os.pipe()
        fdopen_kwargs["closefd"] = False

        # We need a separate thread to read from the pipe and allow the
        # write() to finish.  This thread is started after the SIGALRM is
        # received (forcing a first EINTR in write()).
        read_results = []
        write_finished = False
        error = None
        def _read():
            try:
                while not write_finished:
                    while r in select.select([r], [], [], 1.0)[0]:
                        s = os.read(r, 1024)
                        read_results.append(s)
            except BaseException as exc:
                # Hand the failure over to the main thread, which asserts
                # that no error was recorded.
                nonlocal error
                error = exc
        t = threading.Thread(target=_read)
        t.daemon = True
        def alarm1(sig, frame):
            # First alarm: install the second handler and re-arm the timer.
            signal.signal(signal.SIGALRM, alarm2)
            signal.alarm(1)
        def alarm2(sig, frame):
            # Second alarm: start draining the pipe.
            t.start()

        large_data = item * N
        signal.signal(signal.SIGALRM, alarm1)
        # Bound before the try block so that cleanup cannot fail on an
        # unbound name when open() itself raises.
        wio = None
        try:
            wio = self.io.open(w, **fdopen_kwargs)
            signal.alarm(1)
            # Expected behaviour:
            # - first raw write() is partial (because of the limited pipe buffer
            #   and the first alarm)
            # - second raw write() returns EINTR (because of the second alarm)
            # - subsequent write()s are successful (either partial or complete)
            written = wio.write(large_data)
            self.assertEqual(N, written)

            wio.flush()
            write_finished = True
            t.join()

            self.assertIsNone(error)
            self.assertEqual(N, sum(len(x) for x in read_results))
        finally:
            signal.alarm(0)
            write_finished = True
            os.close(w)
            os.close(r)
            # This is deliberate. If we didn't close the file descriptor
            # before closing wio, wio would try to flush its internal
            # buffer, and could block (in case of failure).
            if wio is not None:
                try:
                    wio.close()
                except OSError as e:
                    if e.errno != errno.EBADF:
                        raise

    def test_interrupted_write_retry_buffered(self):
        self.check_interrupted_write_retry(b"x", mode="wb")

    def test_interrupted_write_retry_text(self):
        self.check_interrupted_write_retry("x", mode="w", encoding="latin1")
4478
Antoine Pitrouf3b68b32010-12-03 18:41:39 +00004479
class CSignalsTest(SignalsTest):
    """SignalsTest bound to the ``io`` module."""

    io = io
4482
class PySignalsTest(SignalsTest):
    """SignalsTest bound to the ``pyio`` module."""

    io = pyio

    # Handling reentrancy issues would slow down _pyio even more, so the
    # tests are disabled.
    test_reentrant_write_buffered = test_reentrant_write_text = None
4490
Antoine Pitroub46b9d52010-08-21 19:09:32 +00004491
def load_tests(*args):
    """Build the test suite for this module (unittest load_tests protocol).

    Before the suite is assembled, every test class whose name starts
    with "C" receives the public names of ``io`` and the C* mock classes
    as class attributes; classes whose name starts with "Py" receive the
    ``pyio`` names and the Py* mocks.  Other classes are left untouched.

    When unittest calls this function the first positional argument is
    the active TestLoader and it is used to load the tests; otherwise
    unittest.defaultTestLoader is used.  Returns a unittest.TestSuite.
    """
    tests = (CIOTest, PyIOTest, APIMismatchTest,
             CBufferedReaderTest, PyBufferedReaderTest,
             CBufferedWriterTest, PyBufferedWriterTest,
             CBufferedRWPairTest, PyBufferedRWPairTest,
             CBufferedRandomTest, PyBufferedRandomTest,
             StatefulIncrementalDecoderTest,
             CIncrementalNewlineDecoderTest, PyIncrementalNewlineDecoderTest,
             CTextIOWrapperTest, PyTextIOWrapperTest,
             CMiscIOTest, PyMiscIOTest,
             CSignalsTest, PySignalsTest,
             )

    # Put the namespaces of the IO module we are testing and some useful mock
    # classes in the __dict__ of each test.
    mocks = (MockRawIO, MisbehavedRawIO, MockFileIO, CloseFailureIO,
             MockNonBlockWriterIO, MockUnseekableIO, MockRawIOWithoutRead,
             SlowFlushRawIO)
    all_members = io.__all__ + ["IncrementalNewlineDecoder"]
    c_io_ns = {name : getattr(io, name) for name in all_members}
    py_io_ns = {name : getattr(pyio, name) for name in all_members}
    globs = globals()
    c_io_ns.update((x.__name__, globs["C" + x.__name__]) for x in mocks)
    py_io_ns.update((x.__name__, globs["Py" + x.__name__]) for x in mocks)
    # Avoid turning open into a bound method.
    py_io_ns["open"] = pyio.OpenWrapper
    for test in tests:
        if test.__name__.startswith("C"):
            for name, obj in c_io_ns.items():
                setattr(test, name, obj)
        elif test.__name__.startswith("Py"):
            for name, obj in py_io_ns.items():
                setattr(test, name, obj)

    # unittest.makeSuite() is deprecated and has been removed from recent
    # Python versions; load each class through a TestLoader instead.
    if args and isinstance(args[0], unittest.TestLoader):
        loader = args[0]
    else:
        loader = unittest.defaultTestLoader
    suite = unittest.TestSuite(
        [loader.loadTestsFromTestCase(test) for test in tests])
    return suite
Guido van Rossum28524c72007-02-27 05:47:44 +00004528
# Run the tests when this file is executed as a script.
if __name__ == "__main__":
    unittest.main()