blob: 1fe1cba5167fc6e630c3ee7dd4c0a97b19eabfb1 [file] [log] [blame]
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001"""Unit tests for the io module."""
2
3# Tests of io are scattered over the test suite:
4# * test_bufio - tests file buffering
5# * test_memoryio - tests BytesIO and StringIO
6# * test_fileio - tests FileIO
7# * test_file - tests the file interface
8# * test_io - tests everything else in the io module
9# * test_univnewlines - tests universal newline support
10# * test_largefile - tests operations on a file greater than 2**32 bytes
11# (only enabled with -ulargefile)
12
13################################################################################
14# ATTENTION TEST WRITERS!!!
15################################################################################
16# When writing tests for io, it's important to test both the C and Python
17# implementations. This is usually done by writing a base test that refers to
Serhiy Storchakad65c9492015-11-02 14:10:23 +020018# the type it is testing as an attribute. Then it provides custom subclasses to
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000019# test both implementations. This file has lots of examples.
20################################################################################
Guido van Rossum68bbcd22007-02-27 17:19:33 +000021
Victor Stinnerf86a5e82012-06-05 13:43:22 +020022import abc
23import array
24import errno
25import locale
Guido van Rossum8358db22007-08-18 21:39:55 +000026import os
Victor Stinnerf86a5e82012-06-05 13:43:22 +020027import pickle
28import random
29import signal
Guido van Rossum34d69e52007-04-10 20:08:41 +000030import sys
Gregory P. Smithe5796c42018-12-30 20:17:57 -080031import sysconfig
Victor Stinner22eb6892019-06-26 00:51:05 +020032import textwrap
Antoine Pitroua6a4dc82017-09-07 18:56:24 +020033import threading
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +000034import time
Guido van Rossum28524c72007-02-27 05:47:44 +000035import unittest
Antoine Pitroue033e062010-10-29 10:38:18 +000036import warnings
Victor Stinnerf86a5e82012-06-05 13:43:22 +020037import weakref
Antoine Pitrou131a4892012-10-16 22:57:11 +020038from collections import deque, UserList
Victor Stinnerf86a5e82012-06-05 13:43:22 +020039from itertools import cycle, count
Benjamin Petersonee8712c2008-05-20 21:35:26 +000040from test import support
Victor Stinner22eb6892019-06-26 00:51:05 +020041from test.support.script_helper import (
42 assert_python_ok, assert_python_failure, run_python_until_end)
Serhiy Storchakab21d1552018-03-02 11:53:51 +020043from test.support import FakePath
Guido van Rossum76c5d4d2007-04-06 19:10:29 +000044
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +000045import codecs
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000046import io # C implementation of io
47import _pyio as pyio # Python implementation of io
Guido van Rossuma9e20242007-03-08 00:43:48 +000048
try:
    import ctypes
except ImportError:
    def byteslike(*pos, **kw):
        # Without ctypes, an array of signed chars built from the same
        # bytes() arguments stands in for the bytes-like object.
        return array.array("b", bytes(*pos, **kw))
else:
    class EmptyStruct(ctypes.Structure):
        pass

    def byteslike(*pos, **kw):
        """Build a bytes-like object that has no string or sequence methods.

        The arguments are forwarded unchanged to bytes(); the resulting
        payload is copied into a ctypes structure resized to hold it.
        """
        payload = bytes(*pos, **kw)
        holder = EmptyStruct()
        ctypes.resize(holder, len(payload))
        # Copy through a flat unsigned-byte view of the structure's memory.
        memoryview(holder).cast("B")[:] = payload
        return holder
64
_cflags = sysconfig.get_config_var('CFLAGS') or ''
_config_args = sysconfig.get_config_var('CONFIG_ARGS') or ''
# True when either the compiler flags or the configure arguments recorded by
# sysconfig mention the memory sanitizer.
MEMORY_SANITIZER = any((
    '-fsanitize=memory' in _cflags,
    '--with-memory-sanitizer' in _config_args,
))

# Does io.IOBase finalizer log the exception if the close() method fails?
# The exception is ignored silently by default in release build.
IOBASE_EMITS_UNRAISABLE = (
    True if hasattr(sys, "gettotalrefcount") else sys.flags.dev_mode
)
Victor Stinner44235042019-04-12 17:06:47 +020075
76
def _default_chunk_size():
    """Return the _CHUNK_SIZE of a text file freshly opened with open().

    This very file is opened (as latin-1 text) purely to obtain a wrapper
    object to read the attribute from.
    """
    probe = open(__file__, "r", encoding="latin-1")
    try:
        return probe._CHUNK_SIZE
    finally:
        probe.close()
81
82
class MockRawIOWithoutRead:
    """Raw stream stub that deliberately has no read() method.

    Input is served by readinto() from a queue of canned chunks, so that the
    default RawIO.read() (which calls readinto()) gets exercised.  Writes are
    recorded, and every read attempt is counted.
    """

    def __init__(self, read_stack=()):
        self._read_stack = list(read_stack)
        self._write_stack = []
        self._reads = 0
        self._extraneous_reads = 0

    # -- capabilities ------------------------------------------------------

    def readable(self):
        return True

    def writable(self):
        return True

    def seekable(self):
        return True

    def fileno(self):
        return 42

    # -- positioning (placeholders) ------------------------------------------

    def seek(self, pos, whence):
        return 0   # wrong but we gotta return something

    def tell(self):
        return 0   # same comment as above

    def truncate(self, pos=None):
        return pos

    # -- data transfer ---------------------------------------------------------

    def write(self, b):
        # Keep an immutable snapshot of the data and claim it was all written.
        self._write_stack.append(bytes(b))
        return len(b)

    def readinto(self, buf):
        """Copy the next queued chunk (or as much as fits) into *buf*.

        Returns the number of bytes copied, None when the queued item is
        None, or 0 (counting an extraneous read) when the queue is empty.
        """
        self._reads += 1
        capacity = len(buf)
        if not self._read_stack:
            self._extraneous_reads += 1
            return 0
        chunk = self._read_stack[0]
        if chunk is None:
            self._read_stack.pop(0)
            return None
        size = len(chunk)
        if size > capacity:
            # Fill the whole buffer and leave the remainder queued.
            buf[:] = chunk[:capacity]
            self._read_stack[0] = chunk[capacity:]
            return capacity
        self._read_stack.pop(0)
        buf[:size] = chunk
        return size
138
# The mock combined with RawIOBase from the io module.
class CMockRawIOWithoutRead(MockRawIOWithoutRead, io.RawIOBase):
    pass

# The same mock combined with RawIOBase from the pyio (_pyio) module.
class PyMockRawIOWithoutRead(MockRawIOWithoutRead, pyio.RawIOBase):
    pass
144
145
class MockRawIO(MockRawIOWithoutRead):
    """MockRawIOWithoutRead extended with a read() that pops queued chunks."""

    def read(self, n=None):
        """Return the next queued chunk as-is; *n* is ignored.

        When the queue is exhausted the call is counted as an extraneous
        read and b"" is returned.
        """
        self._reads += 1
        try:
            return self._read_stack.pop(0)
        except IndexError:
            # Only an empty queue is expected here.  A bare ``except:`` would
            # also swallow KeyboardInterrupt and genuine bugs.
            self._extraneous_reads += 1
            return b""
155
# MockRawIO combined with RawIOBase from the io module.
class CMockRawIO(MockRawIO, io.RawIOBase):
    pass

# MockRawIO combined with RawIOBase from the pyio (_pyio) module.
class PyMockRawIO(MockRawIO, pyio.RawIOBase):
    pass
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000161
Guido van Rossuma9e20242007-03-08 00:43:48 +0000162
class MisbehavedRawIO(MockRawIO):
    """MockRawIO variant whose return values do not match what happened.

    write() and read() double the parent's result, seek() and tell() return
    negative positions, and readinto() reports five times the buffer length.
    """

    def write(self, b):
        reported = super().write(b)
        return reported * 2

    def read(self, n=None):
        chunk = super().read(n)
        return chunk * 2

    def seek(self, pos, whence):
        return -123

    def tell(self):
        return -456

    def readinto(self, buf):
        # Perform the real transfer, then lie about how much was copied.
        super().readinto(buf)
        return 5 * len(buf)
179
# MisbehavedRawIO combined with RawIOBase from the io module.
class CMisbehavedRawIO(MisbehavedRawIO, io.RawIOBase):
    pass

# MisbehavedRawIO combined with RawIOBase from the pyio (_pyio) module.
class PyMisbehavedRawIO(MisbehavedRawIO, pyio.RawIOBase):
    pass
185
186
class SlowFlushRawIO(MockRawIO):
    """MockRawIO whose flush() signals that it started, then sleeps."""

    def __init__(self):
        super().__init__()
        # Set as soon as flush() has been entered.
        self.in_flush = threading.Event()

    def flush(self):
        started = self.in_flush
        started.set()
        time.sleep(0.25)
195
# SlowFlushRawIO combined with RawIOBase from the io module.
class CSlowFlushRawIO(SlowFlushRawIO, io.RawIOBase):
    pass

# SlowFlushRawIO combined with RawIOBase from the pyio (_pyio) module.
class PySlowFlushRawIO(SlowFlushRawIO, pyio.RawIOBase):
    pass
201
202
class CloseFailureIO(MockRawIO):
    """MockRawIO whose first close() marks it closed and raises OSError."""

    closed = 0

    def close(self):
        # Later calls are silent no-ops; only the first one fails.
        if self.closed:
            return
        self.closed = 1
        raise OSError
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000210
# CloseFailureIO combined with RawIOBase from the io module.
class CCloseFailureIO(CloseFailureIO, io.RawIOBase):
    pass

# CloseFailureIO combined with RawIOBase from the pyio (_pyio) module.
class PyCloseFailureIO(CloseFailureIO, pyio.RawIOBase):
    pass
216
217
class MockFileIO:
    """Mixin that logs the outcome of every read()/readinto() call.

    It delegates to the next class in the MRO and appends to
    ``read_history`` the number of bytes each call produced (or None when
    the underlying call returned None).
    """

    def __init__(self, data):
        # The history must exist before the parent initializer runs.
        self.read_history = []
        super().__init__(data)

    def read(self, n=None):
        chunk = super().read(n)
        if chunk is None:
            self.read_history.append(None)
        else:
            self.read_history.append(len(chunk))
        return chunk

    def readinto(self, b):
        count = super().readinto(b)
        self.read_history.append(count)
        return count
Guido van Rossum78892e42007-04-06 17:31:18 +0000233
# MockFileIO layered over BytesIO from the io module.
class CMockFileIO(MockFileIO, io.BytesIO):
    pass

# MockFileIO layered over BytesIO from the pyio (_pyio) module.
class PyMockFileIO(MockFileIO, pyio.BytesIO):
    pass
239
240
class MockUnseekableIO:
    """Mixin making a stream refuse every positioning operation.

    seekable() is False, and seek(), tell() and truncate() raise the
    ``UnsupportedOperation`` class that the concrete subclass provides.
    """

    def seekable(self):
        return False

    def seek(self, *_ignored):
        error = self.UnsupportedOperation("not seekable")
        raise error

    def tell(self, *_ignored):
        error = self.UnsupportedOperation("not seekable")
        raise error

    def truncate(self, *_ignored):
        error = self.UnsupportedOperation("not seekable")
        raise error
253
# MockUnseekableIO over io.BytesIO; raises io.UnsupportedOperation.
class CMockUnseekableIO(MockUnseekableIO, io.BytesIO):
    UnsupportedOperation = io.UnsupportedOperation

# MockUnseekableIO over pyio.BytesIO; raises pyio.UnsupportedOperation.
class PyMockUnseekableIO(MockUnseekableIO, pyio.BytesIO):
    UnsupportedOperation = pyio.UnsupportedOperation
259
260
class MockNonBlockWriterIO:
    """Writer stub that can simulate a write that would block.

    Written data is accumulated; after block_on(char), a write stops short
    at the first occurrence of *char*, and a write that starts with it
    returns None once and clears the blocker.
    """

    def __init__(self):
        self._write_stack = []
        self._blocker_char = None

    def pop_written(self):
        """Return everything written so far and empty the record in place."""
        written = b"".join(self._write_stack)
        del self._write_stack[:]
        return written

    def block_on(self, char):
        """Block when a given char is encountered."""
        self._blocker_char = char

    def readable(self):
        return True

    def writable(self):
        return True

    def seekable(self):
        return True

    def seek(self, pos, whence=0):
        # naive implementation, enough for tests
        return 0

    def write(self, b):
        payload = bytes(b)
        if self._blocker_char:
            try:
                cut = payload.index(self._blocker_char)
            except ValueError:
                # No blocker in this payload: fall through to a full write.
                cut = None
            if cut is not None:
                if cut > 0:
                    # write data up to the first blocker
                    self._write_stack.append(payload[:cut])
                    return cut
                # cancel blocker and indicate would block
                self._blocker_char = None
                return None
        self._write_stack.append(payload)
        return len(payload)
308
# MockNonBlockWriterIO over io.RawIOBase, exposing io.BlockingIOError.
class CMockNonBlockWriterIO(MockNonBlockWriterIO, io.RawIOBase):
    BlockingIOError = io.BlockingIOError

# MockNonBlockWriterIO over pyio.RawIOBase, exposing pyio.BlockingIOError.
class PyMockNonBlockWriterIO(MockNonBlockWriterIO, pyio.RawIOBase):
    BlockingIOError = pyio.BlockingIOError
314
Guido van Rossuma9e20242007-03-08 00:43:48 +0000315
Guido van Rossum28524c72007-02-27 05:47:44 +0000316class IOTest(unittest.TestCase):
317
    def setUp(self):
        # Remove the shared scratch file before the test body runs.
        support.unlink(support.TESTFN)
Neal Norwitze7789b12008-03-24 06:18:09 +0000320
    def tearDown(self):
        # Remove the shared scratch file after the test body has run.
        support.unlink(support.TESTFN)
Guido van Rossum4d0f5a42007-03-07 22:59:39 +0000323
    def write_ops(self, f):
        """Run a fixed write/seek/truncate scenario against *f*.

        *f* is a writable, seekable binary stream.  The asserted offsets
        imply that it ends up holding b"hello world\\n".
        """
        self.assertEqual(f.write(b"blah."), 5)
        f.truncate(0)
        # truncate() is expected to leave the stream position untouched.
        self.assertEqual(f.tell(), 5)
        f.seek(0)

        self.assertEqual(f.write(b"blah."), 5)
        self.assertEqual(f.seek(0), 0)
        self.assertEqual(f.write(b"Hello."), 6)
        self.assertEqual(f.tell(), 6)
        # Step back one byte so the next write overwrites the trailing ".".
        self.assertEqual(f.seek(-1, 1), 5)
        self.assertEqual(f.tell(), 5)
        buffer = bytearray(b" world\n\n\n")
        self.assertEqual(f.write(buffer), 9)
        buffer[:] = b"*" * 9 # Overwrite our copy of the data
        self.assertEqual(f.seek(0), 0)
        self.assertEqual(f.write(b"h"), 1)
        # 14 bytes are expected at this point; one before the end is 13.
        self.assertEqual(f.seek(-1, 2), 13)
        self.assertEqual(f.tell(), 13)

        self.assertEqual(f.truncate(12), 12)
        # Again the position must not follow the truncation.
        self.assertEqual(f.tell(), 13)
        # A float offset must be rejected.
        self.assertRaises(TypeError, f.seek, 0.0)
Guido van Rossum28524c72007-02-27 05:47:44 +0000347
    def read_ops(self, f, buffered=False):
        """Run a fixed read/readinto/seek scenario against *f*.

        *f* is a readable, seekable binary stream expected to contain
        b"hello world\\n".  With *buffered* true, read() without a size and
        readinto1() are exercised as well.
        """
        data = f.read(5)
        self.assertEqual(data, b"hello")
        # Reuse the five bytes just read as the target buffer for readinto().
        data = byteslike(data)
        self.assertEqual(f.readinto(data), 5)
        self.assertEqual(bytes(data), b" worl")
        data = bytearray(5)
        # Only two bytes are left, so the buffer is filled partially.
        self.assertEqual(f.readinto(data), 2)
        self.assertEqual(len(data), 5)
        self.assertEqual(data[:2], b"d\n")
        self.assertEqual(f.seek(0), 0)
        self.assertEqual(f.read(20), b"hello world\n")
        # At end of file both read() and readinto() report nothing.
        self.assertEqual(f.read(1), b"")
        self.assertEqual(f.readinto(byteslike(b"x")), 0)
        self.assertEqual(f.seek(-6, 2), 6)
        self.assertEqual(f.read(5), b"world")
        # Zero-sized requests succeed and transfer nothing.
        self.assertEqual(f.read(0), b"")
        self.assertEqual(f.readinto(byteslike()), 0)
        self.assertEqual(f.seek(-6, 1), 5)
        self.assertEqual(f.read(5), b" worl")
        self.assertEqual(f.tell(), 10)
        # A float offset must be rejected.
        self.assertRaises(TypeError, f.seek, 0.0)
        if buffered:
            f.seek(0)
            self.assertEqual(f.read(), b"hello world\n")
            f.seek(6)
            self.assertEqual(f.read(), b"world\n")
            self.assertEqual(f.read(), b"")
            f.seek(0)
            data = byteslike(5)
            self.assertEqual(f.readinto1(data), 5)
            self.assertEqual(bytes(data), b"hello")
Guido van Rossum9b76da62007-04-11 01:09:03 +0000380
    # Offset used by the large-file tests: one past the largest value that
    # fits in a signed 32-bit integer.
    LARGE = 2**31
382
Guido van Rossum53807da2007-04-10 19:01:47 +0000383 def large_file_ops(self, f):
384 assert f.readable()
385 assert f.writable()
Xavier de Gaye877f0362016-11-17 09:20:28 +0100386 try:
387 self.assertEqual(f.seek(self.LARGE), self.LARGE)
388 except (OverflowError, ValueError):
389 self.skipTest("no largefile support")
Guido van Rossum34d69e52007-04-10 20:08:41 +0000390 self.assertEqual(f.tell(), self.LARGE)
Guido van Rossum53807da2007-04-10 19:01:47 +0000391 self.assertEqual(f.write(b"xxx"), 3)
Guido van Rossum34d69e52007-04-10 20:08:41 +0000392 self.assertEqual(f.tell(), self.LARGE + 3)
393 self.assertEqual(f.seek(-1, 1), self.LARGE + 2)
Guido van Rossum87429772007-04-10 21:06:59 +0000394 self.assertEqual(f.truncate(), self.LARGE + 2)
Guido van Rossum34d69e52007-04-10 20:08:41 +0000395 self.assertEqual(f.tell(), self.LARGE + 2)
396 self.assertEqual(f.seek(0, 2), self.LARGE + 2)
Guido van Rossum87429772007-04-10 21:06:59 +0000397 self.assertEqual(f.truncate(self.LARGE + 1), self.LARGE + 1)
Antoine Pitrou905a2ff2010-01-31 22:47:27 +0000398 self.assertEqual(f.tell(), self.LARGE + 2)
Guido van Rossum34d69e52007-04-10 20:08:41 +0000399 self.assertEqual(f.seek(0, 2), self.LARGE + 1)
400 self.assertEqual(f.seek(-1, 2), self.LARGE)
Guido van Rossum53807da2007-04-10 19:01:47 +0000401 self.assertEqual(f.read(2), b"x")
402
Benjamin Peterson81971ea2009-05-14 22:01:31 +0000403 def test_invalid_operations(self):
404 # Try writing on a file opened in read mode and vice-versa.
Antoine Pitrou0d739d72010-09-05 23:01:12 +0000405 exc = self.UnsupportedOperation
Benjamin Peterson81971ea2009-05-14 22:01:31 +0000406 for mode in ("w", "wb"):
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000407 with self.open(support.TESTFN, mode) as fp:
Antoine Pitrou0d739d72010-09-05 23:01:12 +0000408 self.assertRaises(exc, fp.read)
409 self.assertRaises(exc, fp.readline)
410 with self.open(support.TESTFN, "wb", buffering=0) as fp:
411 self.assertRaises(exc, fp.read)
412 self.assertRaises(exc, fp.readline)
413 with self.open(support.TESTFN, "rb", buffering=0) as fp:
414 self.assertRaises(exc, fp.write, b"blah")
415 self.assertRaises(exc, fp.writelines, [b"blah\n"])
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000416 with self.open(support.TESTFN, "rb") as fp:
Antoine Pitrou0d739d72010-09-05 23:01:12 +0000417 self.assertRaises(exc, fp.write, b"blah")
418 self.assertRaises(exc, fp.writelines, [b"blah\n"])
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000419 with self.open(support.TESTFN, "r") as fp:
Antoine Pitrou0d739d72010-09-05 23:01:12 +0000420 self.assertRaises(exc, fp.write, "blah")
421 self.assertRaises(exc, fp.writelines, ["blah\n"])
422 # Non-zero seeking from current or end pos
423 self.assertRaises(exc, fp.seek, 1, self.SEEK_CUR)
424 self.assertRaises(exc, fp.seek, -1, self.SEEK_END)
Benjamin Peterson81971ea2009-05-14 22:01:31 +0000425
    def test_optional_abilities(self):
        # Test for OSError when optional APIs are not supported
        # The purpose of this test is to try fileno(), reading, writing and
        # seeking operations with various objects that indicate they do not
        # support these operations.

        # Each factory below builds one stream object to probe.

        def pipe_reader():
            [r, w] = os.pipe()
            os.close(w)  # So that read() is harmless
            return self.FileIO(r, "r")

        def pipe_writer():
            [r, w] = os.pipe()
            self.addCleanup(os.close, r)
            # Guarantee that we can write into the pipe without blocking
            thread = threading.Thread(target=os.read, args=(r, 100))
            thread.start()
            self.addCleanup(thread.join)
            return self.FileIO(w, "w")

        def buffered_reader():
            return self.BufferedReader(self.MockUnseekableIO())

        def buffered_writer():
            return self.BufferedWriter(self.MockUnseekableIO())

        def buffered_random():
            return self.BufferedRandom(self.BytesIO())

        def buffered_rw_pair():
            return self.BufferedRWPair(self.MockUnseekableIO(),
                self.MockUnseekableIO())

        def text_reader():
            # Swap in BufferedIOBase's writable()/write() for the mock's own.
            class UnseekableReader(self.MockUnseekableIO):
                writable = self.BufferedIOBase.writable
                write = self.BufferedIOBase.write
            return self.TextIOWrapper(UnseekableReader(), "ascii")

        def text_writer():
            # Swap in BufferedIOBase's readable()/read() for the mock's own.
            class UnseekableWriter(self.MockUnseekableIO):
                readable = self.BufferedIOBase.readable
                read = self.BufferedIOBase.read
            return self.TextIOWrapper(UnseekableWriter(), "ascii")

        # Ability codes: "f" = fileno() works, "r" = readable,
        # "w" = writable, "s" = seekable.
        tests = (
            (pipe_reader, "fr"), (pipe_writer, "fw"),
            (buffered_reader, "r"), (buffered_writer, "w"),
            (buffered_random, "rws"), (buffered_rw_pair, "rw"),
            (text_reader, "r"), (text_writer, "w"),
            (self.BytesIO, "rws"), (self.StringIO, "rws"),
        )
        for [test, abilities] in tests:
            with self.subTest(test), test() as obj:
                readable = "r" in abilities
                self.assertEqual(obj.readable(), readable)
                writable = "w" in abilities
                self.assertEqual(obj.writable(), writable)

                # Pick a payload of the type the stream accepts.
                if isinstance(obj, self.TextIOBase):
                    data = "3"
                elif isinstance(obj, (self.BufferedIOBase, self.RawIOBase)):
                    data = b"3"
                else:
                    self.fail("Unknown base class")

                if "f" in abilities:
                    obj.fileno()
                else:
                    self.assertRaises(OSError, obj.fileno)

                if readable:
                    obj.read(1)
                    obj.read()
                else:
                    self.assertRaises(OSError, obj.read, 1)
                    self.assertRaises(OSError, obj.read)

                if writable:
                    obj.write(data)
                else:
                    self.assertRaises(OSError, obj.write, data)

                if sys.platform.startswith("win") and test in (
                        pipe_reader, pipe_writer):
                    # Pipes seem to appear as seekable on Windows
                    continue
                seekable = "s" in abilities
                self.assertEqual(obj.seekable(), seekable)

                if seekable:
                    obj.tell()
                    obj.seek(0)
                else:
                    self.assertRaises(OSError, obj.tell)
                    self.assertRaises(OSError, obj.seek, 0)

                # truncate() needs a stream that is both writable and seekable.
                if writable and seekable:
                    obj.truncate()
                    obj.truncate(0)
                else:
                    self.assertRaises(OSError, obj.truncate)
                    self.assertRaises(OSError, obj.truncate, 0)
529
Antoine Pitrou13348842012-01-29 18:36:34 +0100530 def test_open_handles_NUL_chars(self):
531 fn_with_NUL = 'foo\0bar'
Serhiy Storchakad8a14472014-09-06 20:07:17 +0300532 self.assertRaises(ValueError, self.open, fn_with_NUL, 'w')
Victor Stinner47b45572016-03-25 09:07:07 +0100533
534 bytes_fn = bytes(fn_with_NUL, 'ascii')
535 with warnings.catch_warnings():
536 warnings.simplefilter("ignore", DeprecationWarning)
537 self.assertRaises(ValueError, self.open, bytes_fn, 'w')
Antoine Pitrou13348842012-01-29 18:36:34 +0100538
Guido van Rossum28524c72007-02-27 05:47:44 +0000539 def test_raw_file_io(self):
Benjamin Peterson45cec322009-04-24 23:14:50 +0000540 with self.open(support.TESTFN, "wb", buffering=0) as f:
541 self.assertEqual(f.readable(), False)
542 self.assertEqual(f.writable(), True)
543 self.assertEqual(f.seekable(), True)
544 self.write_ops(f)
545 with self.open(support.TESTFN, "rb", buffering=0) as f:
546 self.assertEqual(f.readable(), True)
547 self.assertEqual(f.writable(), False)
548 self.assertEqual(f.seekable(), True)
549 self.read_ops(f)
Guido van Rossum28524c72007-02-27 05:47:44 +0000550
Guido van Rossum87429772007-04-10 21:06:59 +0000551 def test_buffered_file_io(self):
Benjamin Peterson45cec322009-04-24 23:14:50 +0000552 with self.open(support.TESTFN, "wb") as f:
553 self.assertEqual(f.readable(), False)
554 self.assertEqual(f.writable(), True)
555 self.assertEqual(f.seekable(), True)
556 self.write_ops(f)
557 with self.open(support.TESTFN, "rb") as f:
558 self.assertEqual(f.readable(), True)
559 self.assertEqual(f.writable(), False)
560 self.assertEqual(f.seekable(), True)
561 self.read_ops(f, True)
Guido van Rossum87429772007-04-10 21:06:59 +0000562
Guido van Rossum48fc58a2007-06-07 23:45:37 +0000563 def test_readline(self):
Benjamin Peterson45cec322009-04-24 23:14:50 +0000564 with self.open(support.TESTFN, "wb") as f:
565 f.write(b"abc\ndef\nxyzzy\nfoo\x00bar\nanother line")
566 with self.open(support.TESTFN, "rb") as f:
567 self.assertEqual(f.readline(), b"abc\n")
568 self.assertEqual(f.readline(10), b"def\n")
569 self.assertEqual(f.readline(2), b"xy")
570 self.assertEqual(f.readline(4), b"zzy\n")
571 self.assertEqual(f.readline(), b"foo\x00bar\n")
Benjamin Petersonbf5ff762009-12-13 19:25:34 +0000572 self.assertEqual(f.readline(None), b"another line")
Benjamin Peterson45cec322009-04-24 23:14:50 +0000573 self.assertRaises(TypeError, f.readline, 5.3)
574 with self.open(support.TESTFN, "r") as f:
575 self.assertRaises(TypeError, f.readline, 5.3)
Guido van Rossum48fc58a2007-06-07 23:45:37 +0000576
Serhiy Storchakabf623ae2017-04-19 20:03:52 +0300577 def test_readline_nonsizeable(self):
578 # Issue #30061
579 # Crash when readline() returns an object without __len__
580 class R(self.IOBase):
581 def readline(self):
582 return None
583 self.assertRaises((TypeError, StopIteration), next, R())
584
585 def test_next_nonsizeable(self):
586 # Issue #30061
587 # Crash when __next__() returns an object without __len__
588 class R(self.IOBase):
589 def __next__(self):
590 return None
591 self.assertRaises(TypeError, R().readlines, 1)
592
Guido van Rossum28524c72007-02-27 05:47:44 +0000593 def test_raw_bytes_io(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000594 f = self.BytesIO()
Guido van Rossum28524c72007-02-27 05:47:44 +0000595 self.write_ops(f)
596 data = f.getvalue()
597 self.assertEqual(data, b"hello world\n")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000598 f = self.BytesIO(data)
Guido van Rossum9b76da62007-04-11 01:09:03 +0000599 self.read_ops(f, True)
Guido van Rossum28524c72007-02-27 05:47:44 +0000600
Guido van Rossum53807da2007-04-10 19:01:47 +0000601 def test_large_file_ops(self):
Mike53f7a7c2017-12-14 14:04:53 +0300602 # On Windows and Mac OSX this test consumes large resources; It takes
Victor Stinner8c663fd2017-11-08 14:44:44 -0800603 # a long time to build the >2 GiB file and takes >2 GiB of disk space
Guido van Rossum34d69e52007-04-10 20:08:41 +0000604 # therefore the resource must be enabled to run this test.
Victor Stinner937ee9e2018-06-26 02:11:06 +0200605 if sys.platform[:3] == 'win' or sys.platform == 'darwin':
Zachary Ware9fe6d862013-12-08 00:20:35 -0600606 support.requires(
607 'largefile',
608 'test requires %s bytes and a long time to run' % self.LARGE)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000609 with self.open(support.TESTFN, "w+b", 0) as f:
610 self.large_file_ops(f)
611 with self.open(support.TESTFN, "w+b") as f:
612 self.large_file_ops(f)
Guido van Rossum87429772007-04-10 21:06:59 +0000613
614 def test_with_open(self):
Alexey Izbysheva2670562018-10-20 03:22:31 +0300615 for bufsize in (0, 100):
Guido van Rossum87429772007-04-10 21:06:59 +0000616 f = None
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000617 with self.open(support.TESTFN, "wb", bufsize) as f:
Guido van Rossum1f2ca562007-08-27 20:44:15 +0000618 f.write(b"xxx")
Guido van Rossum87429772007-04-10 21:06:59 +0000619 self.assertEqual(f.closed, True)
620 f = None
621 try:
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000622 with self.open(support.TESTFN, "wb", bufsize) as f:
Guido van Rossum87429772007-04-10 21:06:59 +0000623 1/0
624 except ZeroDivisionError:
625 self.assertEqual(f.closed, True)
626 else:
627 self.fail("1/0 didn't raise an exception")
628
Antoine Pitrou08838b62009-01-21 00:55:13 +0000629 # issue 5008
630 def test_append_mode_tell(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000631 with self.open(support.TESTFN, "wb") as f:
Antoine Pitrou08838b62009-01-21 00:55:13 +0000632 f.write(b"xxx")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000633 with self.open(support.TESTFN, "ab", buffering=0) as f:
Antoine Pitrou08838b62009-01-21 00:55:13 +0000634 self.assertEqual(f.tell(), 3)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000635 with self.open(support.TESTFN, "ab") as f:
Antoine Pitrou08838b62009-01-21 00:55:13 +0000636 self.assertEqual(f.tell(), 3)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000637 with self.open(support.TESTFN, "a") as f:
Serhiy Storchaka0dcd80a2015-08-02 15:17:49 +0300638 self.assertGreater(f.tell(), 0)
Antoine Pitrou08838b62009-01-21 00:55:13 +0000639
Guido van Rossum87429772007-04-10 21:06:59 +0000640 def test_destructor(self):
641 record = []
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000642 class MyFileIO(self.FileIO):
Guido van Rossum87429772007-04-10 21:06:59 +0000643 def __del__(self):
644 record.append(1)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000645 try:
646 f = super().__del__
647 except AttributeError:
648 pass
649 else:
650 f()
Guido van Rossum87429772007-04-10 21:06:59 +0000651 def close(self):
652 record.append(2)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000653 super().close()
Guido van Rossum87429772007-04-10 21:06:59 +0000654 def flush(self):
655 record.append(3)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000656 super().flush()
Brett Cannon5a9e91b2010-10-29 23:53:03 +0000657 with support.check_warnings(('', ResourceWarning)):
658 f = MyFileIO(support.TESTFN, "wb")
659 f.write(b"xxx")
660 del f
661 support.gc_collect()
662 self.assertEqual(record, [1, 2, 3])
663 with self.open(support.TESTFN, "rb") as f:
664 self.assertEqual(f.read(), b"xxx")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000665
666 def _check_base_destructor(self, base):
667 record = []
668 class MyIO(base):
669 def __init__(self):
670 # This exercises the availability of attributes on object
671 # destruction.
672 # (in the C version, close() is called by the tp_dealloc
673 # function, not by __del__)
674 self.on_del = 1
675 self.on_close = 2
676 self.on_flush = 3
677 def __del__(self):
678 record.append(self.on_del)
679 try:
680 f = super().__del__
681 except AttributeError:
682 pass
683 else:
684 f()
685 def close(self):
686 record.append(self.on_close)
687 super().close()
688 def flush(self):
689 record.append(self.on_flush)
690 super().flush()
691 f = MyIO()
Guido van Rossum87429772007-04-10 21:06:59 +0000692 del f
Benjamin Peterson24fb1d02009-04-24 23:26:21 +0000693 support.gc_collect()
Guido van Rossum87429772007-04-10 21:06:59 +0000694 self.assertEqual(record, [1, 2, 3])
695
    def test_IOBase_destructor(self):
        # Run the shared destructor check with IOBase as the base class.
        self._check_base_destructor(self.IOBase)
698
    def test_RawIOBase_destructor(self):
        # Run the shared destructor check with RawIOBase as the base class.
        self._check_base_destructor(self.RawIOBase)
701
    def test_BufferedIOBase_destructor(self):
        # Run the shared destructor check with BufferedIOBase as the base class.
        self._check_base_destructor(self.BufferedIOBase)
704
    def test_TextIOBase_destructor(self):
        # Run the shared destructor check with TextIOBase as the base class.
        self._check_base_destructor(self.TextIOBase)
707
Guido van Rossum87429772007-04-10 21:06:59 +0000708 def test_close_flushes(self):
Benjamin Peterson45cec322009-04-24 23:14:50 +0000709 with self.open(support.TESTFN, "wb") as f:
710 f.write(b"xxx")
711 with self.open(support.TESTFN, "rb") as f:
712 self.assertEqual(f.read(), b"xxx")
Guido van Rossuma9e20242007-03-08 00:43:48 +0000713
Guido van Rossumd4103952007-04-12 05:44:49 +0000714 def test_array_writes(self):
715 a = array.array('i', range(10))
Antoine Pitrou1ce3eb52010-09-01 20:29:34 +0000716 n = len(a.tobytes())
Martin Panter6bb91f32016-05-28 00:41:57 +0000717 def check(f):
718 with f:
719 self.assertEqual(f.write(a), n)
720 f.writelines((a,))
721 check(self.BytesIO())
722 check(self.FileIO(support.TESTFN, "w"))
723 check(self.BufferedWriter(self.MockRawIO()))
724 check(self.BufferedRandom(self.MockRawIO()))
725 check(self.BufferedRWPair(self.MockRawIO(), self.MockRawIO()))
Guido van Rossumd4103952007-04-12 05:44:49 +0000726
Guido van Rossum2dced8b2007-10-30 17:27:30 +0000727 def test_closefd(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000728 self.assertRaises(ValueError, self.open, support.TESTFN, 'w',
Guido van Rossum2dced8b2007-10-30 17:27:30 +0000729 closefd=False)
Guido van Rossuma9e20242007-03-08 00:43:48 +0000730
def test_read_closed(self):
    # A second file object opened on a borrowed descriptor
    # (closefd=False) can be read, but raises ValueError once it has
    # been closed.
    with self.open(support.TESTFN, "w") as out:
        out.write("egg\n")
    with self.open(support.TESTFN, "r") as owner:
        borrowed = self.open(owner.fileno(), "r", closefd=False)
        self.assertEqual(borrowed.read(), "egg\n")
        borrowed.seek(0)
        borrowed.close()
        with self.assertRaises(ValueError):
            borrowed.read()
740
def test_no_closefd_with_filename(self):
    # can't use closefd in combination with a file name
    with self.assertRaises(ValueError):
        self.open(support.TESTFN, "r", closefd=False)
Christian Heimesecc42a22008-11-05 19:30:32 +0000744
def test_closefd_attr(self):
    # The raw layer's closefd attribute must reflect how the file was
    # opened: True when opened by name, False when opened on a borrowed
    # descriptor with closefd=False.
    with self.open(support.TESTFN, "wb") as out:
        out.write(b"egg\n")
    with self.open(support.TESTFN, "r") as owner:
        owner_flag = owner.buffer.raw.closefd
        self.assertEqual(owner_flag, True)
        borrowed = self.open(owner.fileno(), "r", closefd=False)
        borrowed_flag = borrowed.buffer.raw.closefd
        self.assertEqual(borrowed_flag, False)
752
def test_garbage_collection(self):
    # FileIO objects are collected, and collecting them flushes
    # all data to disk.
    with support.check_warnings(('', ResourceWarning)):
        raw = self.FileIO(support.TESTFN, "wb")
        raw.write(b"abcxxx")
        # Self-reference: the object now sits in a reference cycle.
        raw.f = raw
        ref = weakref.ref(raw)
        del raw
        support.gc_collect()
    self.assertIsNone(ref(), ref)
    with self.open(support.TESTFN, "rb") as reader:
        self.assertEqual(reader.read(), b"abcxxx")
Christian Heimesecc42a22008-11-05 19:30:32 +0000766
def test_unbounded_file(self):
    # Issue #1174606: reading from an unbounded stream such as /dev/zero.
    # An unlimited read() must fail with OverflowError at every layer
    # (raw, buffered, text).  The test is skipped unless the device
    # exists, the address space is 32-bit and enough memory is allowed.
    zero = "/dev/zero"
    if not os.path.exists(zero):
        self.skipTest("{0} does not exist".format(zero))
    if sys.maxsize > 0x7FFFFFFF:
        self.skipTest("test can only run in a 32-bit address space")
    if support.real_max_memuse < support._2G:
        self.skipTest("test requires at least 2 GiB of memory")
    open_variants = (
        ("rb", {"buffering": 0}),
        ("rb", {}),
        ("r", {}),
    )
    for mode, extra in open_variants:
        with self.open(zero, mode, **extra) as stream:
            with self.assertRaises(OverflowError):
                stream.read()
782
def check_flush_error_on_close(self, *args, **kwargs):
    # Test that the file is closed despite failed flush
    # and that flush() is called before file closed.
    # *args / **kwargs are forwarded unchanged to self.open().
    stream = self.open(*args, **kwargs)
    # Filled by the failing flush with the stream's `closed` state at the
    # time of the call; stays empty if flush() is never invoked.
    state_at_flush = []

    def failing_flush():
        state_at_flush[:] = [stream.closed]
        raise OSError()

    stream.flush = failing_flush
    with self.assertRaises(OSError):  # exception not swallowed
        stream.close()
    self.assertTrue(stream.closed)
    self.assertTrue(state_at_flush)  # flush() called
    self.assertFalse(state_at_flush[0])  # flush() called before file closed
    stream.flush = lambda: None  # break reference loop
Serhiy Storchakaa3712a92015-02-21 00:35:09 +0200797
def test_flush_error_on_close(self):
    # Run check_flush_error_on_close() for every layer of the I/O stack,
    # each opened three ways: by file name, by an owned file descriptor,
    # and by a borrowed file descriptor (closefd=False).
    # Issue #5700: io.FileIO calls flush() after file closed
    flags = os.O_WRONLY | os.O_CREAT
    layers = (
        ('wb', {'buffering': 0}),  # raw file
        ('wb', {}),                # buffered io
        ('w', {}),                 # text io
    )
    for mode, extra in layers:
        self.check_flush_error_on_close(support.TESTFN, mode, **extra)

        fd = os.open(support.TESTFN, flags)
        self.check_flush_error_on_close(fd, mode, **extra)

        fd = os.open(support.TESTFN, flags)
        try:
            self.check_flush_error_on_close(fd, mode, closefd=False,
                                            **extra)
        finally:
            # The stream does not own this descriptor, so it must be
            # released here even when the check above fails; otherwise
            # a failing test would leak it.
            os.close(fd)
Antoine Pitrou6be88762010-05-03 16:48:20 +0000821
def test_multi_close(self):
    # close() may be called repeatedly without error; flush() on the
    # closed file raises ValueError.
    raw = self.open(support.TESTFN, "wb", buffering=0)
    for _ in range(3):
        raw.close()
    with self.assertRaises(ValueError):
        raw.flush()
828
def test_RawIOBase_read(self):
    # Exercise the default limited RawIOBase.read(n) implementation (which
    # calls readinto() internally).
    rawio = self.MockRawIOWithoutRead((b"abc", b"d", None, b"efg", None))
    # Successive read(2) results, in order.
    expected_results = (b"ab", b"c", b"d", None, b"ef", b"g", None, b"")
    for expected in expected_results:
        self.assertEqual(rawio.read(2), expected)
841
Benjamin Petersonf6f3a352011-09-03 09:26:20 -0400842 def test_types_have_dict(self):
843 test = (
844 self.IOBase(),
845 self.RawIOBase(),
846 self.TextIOBase(),
847 self.StringIO(),
848 self.BytesIO()
849 )
850 for obj in test:
851 self.assertTrue(hasattr(obj, "__dict__"))
852
def test_opener(self):
    # When an opener is supplied, open() must use the descriptor it
    # returns: the path given here does not name the file being read.
    with self.open(support.TESTFN, "w") as out:
        out.write("egg\n")
    descriptor = os.open(support.TESTFN, os.O_RDONLY)

    def fixed_opener(path, flags):
        return descriptor

    with self.open("non-existent", "r", opener=fixed_opener) as reader:
        contents = reader.read()
        self.assertEqual(contents, "egg\n")
861
Barry Warsaw480e2852016-06-08 17:47:26 -0400862 def test_bad_opener_negative_1(self):
863 # Issue #27066.
864 def badopener(fname, flags):
865 return -1
866 with self.assertRaises(ValueError) as cm:
867 open('non-existent', 'r', opener=badopener)
868 self.assertEqual(str(cm.exception), 'opener returned -1')
869
870 def test_bad_opener_other_negative(self):
871 # Issue #27066.
872 def badopener(fname, flags):
873 return -2
874 with self.assertRaises(ValueError) as cm:
875 open('non-existent', 'r', opener=badopener)
876 self.assertEqual(str(cm.exception), 'opener returned -2')
877
def test_fileio_closefd(self):
    # Issue #4841
    # A FileIO created with closefd=False must never close a descriptor
    # it borrowed, neither when re-initialised onto another descriptor
    # nor when closed.
    with self.open(__file__, 'rb') as first, \
         self.open(__file__, 'rb') as second:
        borrower = self.FileIO(first.fileno(), closefd=False)
        # .__init__() must not close f1
        borrower.__init__(second.fileno(), closefd=False)
        first.readline()
        # .close() must not close f2
        borrower.close()
        second.readline()
889
def test_nonbuffered_textio(self):
    # Text mode with buffering=0 is rejected with ValueError, and the
    # failed open() must not emit a ResourceWarning.
    no_warning = support.check_no_resource_warning(self)
    with no_warning, self.assertRaises(ValueError):
        self.open(support.TESTFN, 'w', buffering=0)
Serhiy Storchakaf10063e2014-06-09 13:32:34 +0300894
def test_invalid_newline(self):
    # An unsupported newline argument is rejected with ValueError, and the
    # failed open() must not emit a ResourceWarning.
    no_warning = support.check_no_resource_warning(self)
    with no_warning, self.assertRaises(ValueError):
        self.open(support.TESTFN, 'w', newline='invalid')
Serhiy Storchakaf10063e2014-06-09 13:32:34 +0300899
def test_buffered_readinto_mixin(self):
    # Test the implementation provided by BufferedIOBase
    # The subclass defines only read()/read1(); readinto() and
    # readinto1() are inherited.
    class Stream(self.BufferedIOBase):
        def read(self, size):
            return b"12345"
        read1 = read

    source = Stream()
    for name in ("readinto", "readinto1"):
        with self.subTest(name):
            target = byteslike(5)
            fill = getattr(source, name)
            self.assertEqual(fill(target), 5)
            self.assertEqual(bytes(target), b"12345")
912
def test_fspath_support(self):
    # open() must accept FakePath objects that wrap a str or bytes path,
    # reject ones that wrap anything else, and propagate an exception
    # raised while resolving the path.
    def roundtrip(path):
        with self.open(path, "w") as out:
            out.write("egg\n")

        with self.open(path, "r") as src:
            self.assertEqual(src.read(), "egg\n")

    for plain_path in (support.TESTFN, support.TESTFN.encode('utf-8')):
        roundtrip(FakePath(plain_path))

    # A path object wrapping an integer file descriptor is not a path.
    with self.open(support.TESTFN, "w") as holder:
        bad_path = FakePath(holder.fileno())
        with self.assertRaises(TypeError):
            self.open(bad_path, 'w')

    failing_cases = (
        (None, TypeError),
        (FloatingPointError, FloatingPointError),
    )
    for wrapped, expected_error in failing_cases:
        bad_path = FakePath(wrapped)
        with self.assertRaises(expected_error):
            self.open(bad_path, 'w')

    # ensure that refcounting is correct with some error conditions
    with self.assertRaisesRegex(ValueError, 'read/write/append mode'):
        self.open(FakePath(support.TESTFN), 'rwxa')
Ethan Furmand62548a2016-06-04 14:38:43 -0700940
def test_RawIOBase_readall(self):
    # Exercise the default unlimited RawIOBase.read() and readall()
    # implementations.
    # A fresh raw stream is built for each of the two calls.
    chunks = (b"abc", b"d", b"efg")
    via_read = self.MockRawIOWithoutRead(chunks)
    self.assertEqual(via_read.read(), b"abcdefg")
    via_readall = self.MockRawIOWithoutRead(chunks)
    self.assertEqual(via_readall.readall(), b"abcdefg")
948
949 def test_BufferedIOBase_readinto(self):
950 # Exercise the default BufferedIOBase.readinto() and readinto1()
951 # implementations (which call read() or read1() internally).
952 class Reader(self.BufferedIOBase):
953 def __init__(self, avail):
954 self.avail = avail
955 def read(self, size):
956 result = self.avail[:size]
957 self.avail = self.avail[size:]
958 return result
959 def read1(self, size):
960 """Returns no more than 5 bytes at once"""
961 return self.read(min(size, 5))
962 tests = (
963 # (test method, total data available, read buffer size, expected
964 # read size)
965 ("readinto", 10, 5, 5),
966 ("readinto", 10, 6, 6), # More than read1() can return
967 ("readinto", 5, 6, 5), # Buffer larger than total available
968 ("readinto", 6, 7, 6),
969 ("readinto", 10, 0, 0), # Empty buffer
970 ("readinto1", 10, 5, 5), # Result limited to single read1() call
971 ("readinto1", 10, 6, 5), # Buffer larger than read1() can return
972 ("readinto1", 5, 6, 5), # Buffer larger than total available
973 ("readinto1", 6, 7, 5),
974 ("readinto1", 10, 0, 0), # Empty buffer
975 )
976 UNUSED_BYTE = 0x81
977 for test in tests:
978 with self.subTest(test):
979 method, avail, request, result = test
980 reader = Reader(bytes(range(avail)))
981 buffer = bytearray((UNUSED_BYTE,) * request)
982 method = getattr(reader, method)
983 self.assertEqual(method(buffer), result)
984 self.assertEqual(len(buffer), request)
985 self.assertSequenceEqual(buffer[:result], range(result))
986 unused = (UNUSED_BYTE,) * (request - result)
987 self.assertSequenceEqual(buffer[result:], unused)
988 self.assertEqual(len(reader.avail), avail - result)
989
Zackery Spytz28f07362018-07-17 00:31:44 -0600990 def test_close_assert(self):
991 class R(self.IOBase):
992 def __setattr__(self, name, value):
993 pass
994 def flush(self):
995 raise OSError()
996 f = R()
997 # This would cause an assertion failure.
998 self.assertRaises(OSError, f.close)
999
Victor Stinner472f7942019-04-12 21:58:24 +02001000 # Silence destructor error
1001 R.flush = lambda self: None
1002
Hynek Schlawack2cc71562012-05-25 10:05:53 +02001003
class CIOTest(IOTest):

    def test_IOBase_finalize(self):
        # Issue #12149: segmentation fault on _PyIOBase_finalize when both a
        # class which inherits IOBase and an object of this class are caught
        # in a reference cycle and close() is already in the method cache.
        class CyclicIO(self.IOBase):
            def close(self):
                pass

        # create an instance to populate the method cache
        CyclicIO()
        instance = CyclicIO()
        instance.obj = instance
        ref = weakref.ref(instance)
        # Drop the class first, then the instance, before collecting.
        del CyclicIO
        del instance
        support.gc_collect()
        self.assertIsNone(ref(), ref)
Guido van Rossuma9e20242007-03-08 00:43:48 +00001023
class PyIOTest(IOTest):
    # Adds no tests of its own; everything is inherited from IOTest.
    pass
Guido van Rossum68bbcd22007-02-27 17:19:33 +00001026
Guido van Rossuma9e20242007-03-08 00:43:48 +00001027
@support.cpython_only
class APIMismatchTest(unittest.TestCase):
    # Compares the RawIOBase classes of pyio and io in both directions
    # using support.detect_api_mismatch().

    def test_RawIOBase_io_in_pyio_match(self):
        """Test that pyio RawIOBase class has all c RawIOBase methods"""
        ignored = ('__weakref__',)
        mismatch = support.detect_api_mismatch(pyio.RawIOBase, io.RawIOBase,
                                               ignore=ignored)
        failure = 'Python RawIOBase does not have all C RawIOBase methods'
        self.assertEqual(mismatch, set(), msg=failure)

    def test_RawIOBase_pyio_in_io_match(self):
        """Test that c RawIOBase class has all pyio RawIOBase methods"""
        mismatch = support.detect_api_mismatch(io.RawIOBase, pyio.RawIOBase)
        failure = 'C RawIOBase does not have all Python RawIOBase methods'
        self.assertEqual(mismatch, set(), msg=failure)
1041
1042
class CommonBufferedTests:
    # Tests common to BufferedReader, BufferedWriter and BufferedRandom
    #
    # The methods read `self.tp` (the buffered class under test) and the
    # mock raw stream classes as attributes of the test instance.

    def test_detach(self):
        # detach() hands back the raw stream once; a second call fails.
        raw = self.MockRawIO()
        buffered = self.tp(raw)
        self.assertIs(buffered.detach(), raw)
        with self.assertRaises(ValueError):
            buffered.detach()

        repr(buffered)  # Should still work

    def test_fileno(self):
        # fileno() is expected to report 42 for a stream built on the
        # mock raw object.
        buffered = self.tp(self.MockRawIO())

        self.assertEqual(42, buffered.fileno())

    def test_invalid_args(self):
        buffered = self.tp(self.MockRawIO())
        # Invalid whence
        for bad_whence in (-1, 9):
            with self.assertRaises(ValueError):
                buffered.seek(0, bad_whence)

    def test_override_destructor(self):
        # When a subclass overrides __del__, close and flush, destroying
        # the object must run them in that order.
        calls = []

        class MyBufferedIO(self.tp):
            def __del__(self):
                calls.append(1)
                try:
                    parent_del = super().__del__
                except AttributeError:
                    pass
                else:
                    parent_del()
            def close(self):
                calls.append(2)
                super().close()
            def flush(self):
                calls.append(3)
                super().flush()

        rawio = self.MockRawIO()
        bufio = MyBufferedIO(rawio)
        del bufio
        support.gc_collect()
        self.assertEqual(calls, [1, 2, 3])

    def test_context_manager(self):
        # Test usability as a context manager
        buffered = self.tp(self.MockRawIO())

        def enter_and_exit():
            with buffered:
                pass

        enter_and_exit()
        # bufio should now be closed, and using it a second time should raise
        # a ValueError.
        with self.assertRaises(ValueError):
            enter_and_exit()

    def test_error_through_destructor(self):
        # Test that the exception state is not modified by a destructor,
        # even if close() fails.
        rawio = self.CloseFailureIO()
        with support.catch_unraisable_exception() as cm:
            with self.assertRaises(AttributeError):
                self.tp(rawio).xyzzy

            # Checked while still inside the catch_unraisable_exception
            # block, where cm.unraisable is available.
            if not IOBASE_EMITS_UNRAISABLE:
                self.assertIsNone(cm.unraisable)
            elif cm.unraisable is not None:
                self.assertEqual(cm.unraisable.exc_type, OSError)

    def test_repr(self):
        # repr() shows the class name (optionally module-qualified) and,
        # once the raw stream has one, its name.
        raw = self.MockRawIO()
        buffered = self.tp(raw)
        clsname = r"(%s\.)?%s" % (self.tp.__module__, self.tp.__qualname__)
        self.assertRegex(repr(buffered), "<%s>" % clsname)
        raw.name = "dummy"
        self.assertRegex(repr(buffered), "<%s name='dummy'>" % clsname)
        raw.name = b"dummy"
        self.assertRegex(repr(buffered), "<%s name=b'dummy'>" % clsname)

    def test_recursive_repr(self):
        # Issue #25455
        raw = self.MockRawIO()
        buffered = self.tp(raw)
        # Make the raw stream's name the buffered object itself, so that
        # repr() recurses.
        with support.swap_attr(raw, 'name', buffered):
            try:
                repr(buffered)  # Should not crash
            except RuntimeError:
                pass

    def test_flush_error_on_close(self):
        # Test that buffered file is closed despite failed flush
        # and that flush() is called before file closed.
        raw = self.MockRawIO()
        # Filled by the failing flush with the `closed` state of both
        # layers at the time of the call.
        state_at_flush = []

        def failing_flush():
            state_at_flush[:] = [buffered.closed, raw.closed]
            raise OSError()

        raw.flush = failing_flush
        buffered = self.tp(raw)
        with self.assertRaises(OSError):  # exception not swallowed
            buffered.close()
        self.assertTrue(buffered.closed)
        self.assertTrue(raw.closed)
        self.assertTrue(state_at_flush)  # flush() called
        self.assertFalse(state_at_flush[0])  # flush() called before file closed
        self.assertFalse(state_at_flush[1])
        raw.flush = lambda: None  # break reference loop

    def test_close_error_on_close(self):
        # When both flush() and the raw close() fail, the close error is
        # raised with the flush error chained as its __context__, and the
        # buffered object is not marked closed.
        raw = self.MockRawIO()

        def failing_flush():
            raise OSError('flush')

        def failing_close():
            raise OSError('close')

        raw.close = failing_close
        buffered = self.tp(raw)
        buffered.flush = failing_flush
        with self.assertRaises(OSError) as err:  # exception not swallowed
            buffered.close()
        raised = err.exception
        self.assertEqual(raised.args, ('close',))
        self.assertIsInstance(raised.__context__, OSError)
        self.assertEqual(raised.__context__.args, ('flush',))
        self.assertFalse(buffered.closed)

        # Silence destructor error
        raw.close = lambda: None
        buffered.flush = lambda: None

    def test_nonnormalized_close_error_on_close(self):
        # Issue #21677
        # Same as test_close_error_on_close, but both failures are
        # NameErrors caused by referencing undefined names.
        raw = self.MockRawIO()

        def failing_flush():
            raise non_existing_flush

        def failing_close():
            raise non_existing_close

        raw.close = failing_close
        buffered = self.tp(raw)
        buffered.flush = failing_flush
        with self.assertRaises(NameError) as err:  # exception not swallowed
            buffered.close()
        raised = err.exception
        self.assertIn('non_existing_close', str(raised))
        self.assertIsInstance(raised.__context__, NameError)
        self.assertIn('non_existing_flush', str(raised.__context__))
        self.assertFalse(buffered.closed)

        # Silence destructor error
        buffered.flush = lambda: None
        raw.close = lambda: None

    def test_multi_close(self):
        # close() may be called repeatedly; flush() afterwards raises
        # ValueError.
        buffered = self.tp(self.MockRawIO())
        for _ in range(3):
            buffered.close()
        with self.assertRaises(ValueError):
            buffered.flush()

    def test_unseekable(self):
        # tell() and seek() on top of an unseekable raw stream raise
        # UnsupportedOperation.
        bufio = self.tp(self.MockUnseekableIO(b"A" * 10))
        with self.assertRaises(self.UnsupportedOperation):
            bufio.tell()
        with self.assertRaises(self.UnsupportedOperation):
            bufio.seek(0)

    def test_readonly_attributes(self):
        # The `raw` attribute cannot be reassigned.
        buffered = self.tp(self.MockRawIO())
        replacement = self.MockRawIO()
        with self.assertRaises(AttributeError):
            buffered.raw = replacement
1214
Guido van Rossum78892e42007-04-06 17:31:18 +00001215
class SizeofTest:
    # Checks on how sys.getsizeof() accounts for the internal buffer of
    # the buffered class `self.tp`.

    @support.cpython_only
    def test_sizeof(self):
        # The reported size must grow by exactly the difference in
        # buffer_size between two otherwise identical objects.
        small, large = 4096, 8192
        rawio = self.MockRawIO()
        bufio = self.tp(rawio, buffer_size=small)
        overhead = sys.getsizeof(bufio) - small
        rawio = self.MockRawIO()
        bufio = self.tp(rawio, buffer_size=large)
        self.assertEqual(sys.getsizeof(bufio), overhead + large)

    @support.cpython_only
    def test_buffer_freeing(self):
        # After close() the reported size must no longer include the
        # buffer.
        bufsize = 4096
        rawio = self.MockRawIO()
        bufio = self.tp(rawio, buffer_size=bufsize)
        overhead = sys.getsizeof(bufio) - bufsize
        bufio.close()
        self.assertEqual(sys.getsizeof(bufio), overhead)
Antoine Pitrou10f0c502012-07-29 19:02:46 +02001237
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001238class BufferedReaderTest(unittest.TestCase, CommonBufferedTests):
1239 read_mode = "rb"
Guido van Rossum78892e42007-04-06 17:31:18 +00001240
def test_constructor(self):
    # __init__() may be called again on an existing object, with or
    # without an explicit buffer size; non-positive sizes are rejected
    # and the object can still be re-initialised afterwards.
    raw = self.MockRawIO([b"abc"])
    bufio = self.tp(raw)
    bufio.__init__(raw)
    for size in (1024, 16):
        bufio.__init__(raw, buffer_size=size)
    self.assertEqual(b"abc", bufio.read())
    for bad_size in (0, -16, -1):
        with self.assertRaises(ValueError):
            bufio.__init__(raw, buffer_size=bad_size)
    raw = self.MockRawIO([b"abc"])
    bufio.__init__(raw)
    self.assertEqual(b"abc", bufio.read())
Guido van Rossum78892e42007-04-06 17:31:18 +00001254
Serhiy Storchaka61e24932014-02-12 10:52:35 +02001255 def test_uninitialized(self):
1256 bufio = self.tp.__new__(self.tp)
1257 del bufio
1258 bufio = self.tp.__new__(self.tp)
1259 self.assertRaisesRegex((ValueError, AttributeError),
1260 'uninitialized|has no attribute',
1261 bufio.read, 0)
1262 bufio.__init__(self.MockRawIO())
1263 self.assertEqual(bufio.read(0), b'')
1264
def test_read(self):
    # read(None) and read(7) must both return the whole seven-byte
    # stream, assembled from several raw chunks.
    for size in (None, 7):
        raw = self.MockRawIO((b"abc", b"d", b"efg"))
        bufio = self.tp(raw)
        self.assertEqual(b"abcdefg", bufio.read(size))
    # Invalid args
    # (checked on the buffered object left over from the last iteration)
    with self.assertRaises(ValueError):
        bufio.read(-2)
Guido van Rossum68bbcd22007-02-27 17:19:33 +00001272
def test_read1(self):
    # Interleave read1() calls with checks of the raw stream's `_reads`
    # counter after each step.
    raw = self.MockRawIO((b"abc", b"d", b"efg"))
    bufio = self.tp(raw)
    self.assertEqual(b"a", bufio.read(1))
    self.assertEqual(b"b", bufio.read1(1))
    self.assertEqual(raw._reads, 1)
    self.assertEqual(b"", bufio.read1(0))
    self.assertEqual(b"c", bufio.read1(100))
    self.assertEqual(raw._reads, 1)
    # (read1 result, raw._reads counter value afterwards)
    remaining_steps = (
        (b"d", 2),
        (b"efg", 3),
        (b"", 4),
    )
    for expected_data, expected_reads in remaining_steps:
        self.assertEqual(expected_data, bufio.read1(100))
        self.assertEqual(raw._reads, expected_reads)
Martin Panterccb2c0e2016-10-20 23:48:14 +00001288
def test_read1_arbitrary(self):
    # read1() called without a size, or with -1, returns one chunk per
    # call until the stream is exhausted.
    raw = self.MockRawIO((b"abc", b"d", b"efg"))
    bufio = self.tp(raw)
    self.assertEqual(b"a", bufio.read(1))
    self.assertEqual(b"bc", bufio.read1())
    self.assertEqual(b"d", bufio.read1())
    self.assertEqual(b"efg", bufio.read1(-1))
    self.assertEqual(raw._reads, 3)
    self.assertEqual(b"", bufio.read1())
    self.assertEqual(raw._reads, 4)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001299
def test_readinto(self):
    # readinto() fills a two-byte buffer across raw chunk boundaries; a
    # short read leaves the tail of the buffer untouched.
    target = bytearray(2)

    def check_steps(bufio, steps):
        # steps: (expected byte count, expected buffer contents) pairs
        for count, contents in steps:
            self.assertEqual(bufio.readinto(target), count)
            self.assertEqual(target, contents)

    raw = self.MockRawIO((b"abc", b"d", b"efg"))
    check_steps(self.tp(raw), (
        (2, b"ab"),
        (2, b"cd"),
        (2, b"ef"),
        (1, b"gf"),
        (0, b"gf"),
    ))
    # Second scenario: the raw stream yields None after its only chunk.
    raw = self.MockRawIO((b"abc", None))
    check_steps(self.tp(raw), (
        (2, b"ab"),
        (1, b"cb"),
    ))
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001320
def test_readinto1(self):
    # Interleave peek() and readinto1() calls with checks of the raw
    # stream's `_reads` counter after each step.
    buffer_size = 10
    raw = self.MockRawIO((b"abc", b"de", b"fgh", b"jkl"))
    bufio = self.tp(raw, buffer_size=buffer_size)
    small = bytearray(2)
    self.assertEqual(bufio.peek(3), b'abc')
    self.assertEqual(raw._reads, 1)
    self.assertEqual(bufio.readinto1(small), 2)
    self.assertEqual(small, b"ab")
    self.assertEqual(raw._reads, 1)
    self.assertEqual(bufio.readinto1(small), 1)
    self.assertEqual(small[:1], b"c")
    self.assertEqual(raw._reads, 1)
    self.assertEqual(bufio.readinto1(small), 2)
    self.assertEqual(small, b"de")
    self.assertEqual(raw._reads, 2)
    # Now use a target twice as large as the internal buffer.
    large = bytearray(2*buffer_size)
    self.assertEqual(bufio.peek(3), b'fgh')
    self.assertEqual(raw._reads, 3)
    self.assertEqual(bufio.readinto1(large), 6)
    self.assertEqual(large[:6], b"fghjkl")
    self.assertEqual(raw._reads, 4)
1343
def test_readinto_array(self):
    # readinto() must size its read by the target's length in bytes,
    # not by its element count.
    buffer_size = 60
    data = b"a" * 26
    raw = self.MockRawIO((data,))
    bufio = self.tp(raw, buffer_size=buffer_size)

    # Create an array with element size > 1 byte
    target = array.array('i', b'x' * 32)
    assert len(target) != 16

    # Read into it. We should get as many *bytes* as we can fit into b
    # (which is more than the number of elements)
    count = bufio.readinto(target)
    self.assertGreater(count, len(target))

    # Check that old contents of b are preserved
    as_bytes = memoryview(target).cast('B')
    self.assertLess(count, len(as_bytes))
    self.assertEqual(as_bytes[:count], data[:count])
    tail = as_bytes[count:]
    self.assertEqual(tail, b'x' * (len(as_bytes[count:])))
1364
def test_readinto1_array(self):
    # readinto1() must size its read by the target's length in bytes,
    # not by its element count.
    buffer_size = 60
    data = b"a" * 26
    raw = self.MockRawIO((data,))
    bufio = self.tp(raw, buffer_size=buffer_size)

    # Create an array with element size > 1 byte
    target = array.array('i', b'x' * 32)
    assert len(target) != 16

    # Read into it. We should get as many *bytes* as we can fit into b
    # (which is more than the number of elements)
    count = bufio.readinto1(target)
    self.assertGreater(count, len(target))

    # Check that old contents of b are preserved
    as_bytes = memoryview(target).cast('B')
    self.assertLess(count, len(as_bytes))
    self.assertEqual(as_bytes[:count], data[:count])
    tail = as_bytes[count:]
    self.assertEqual(tail, b'x' * (len(as_bytes[count:])))
1385
def test_readlines(self):
    # readlines() with no hint, a hint of 5, and a hint of None; each
    # call runs against a freshly built stream.
    def fresh_stream():
        raw = self.MockRawIO((b"abc\n", b"d\n", b"ef"))
        return self.tp(raw)

    every_line = [b"abc\n", b"d\n", b"ef"]
    self.assertEqual(fresh_stream().readlines(), every_line)
    self.assertEqual(fresh_stream().readlines(5), [b"abc\n", b"d\n"])
    self.assertEqual(fresh_stream().readlines(None), every_line)
Benjamin Petersonbf5ff762009-12-13 19:25:34 +00001393
def test_buffering(self):
    # For several buffer sizes, issue a series of buffered reads and
    # check both the data returned and the read sizes recorded by the
    # raw stream.
    data = b"abcdefghi"
    dlen = len(data)

    # (buffer size, sizes passed to bufio.read(), expected raw read sizes)
    scenarios = [
        [ 100, [ 3, 1, 4, 8 ], [ dlen, 0 ] ],
        [ 100, [ 3, 3, 3],     [ dlen ]    ],
        [   4, [ 1, 2, 4, 2 ], [ 4, 4, 1 ] ],
    ]

    for bufsize, requested_sizes, raw_read_sizes in scenarios:
        raw = self.MockFileIO(data)
        bufio = self.tp(raw, buffer_size=bufsize)
        offset = 0
        for nbytes in requested_sizes:
            expected = data[offset:offset+nbytes]
            self.assertEqual(bufio.read(nbytes), expected)
            offset += nbytes
        # this is mildly implementation-dependent
        self.assertEqual(raw.read_history, raw_read_sizes)
Guido van Rossum78892e42007-04-06 17:31:18 +00001413
def test_read_non_blocking(self):
    # Inject some None's in there to simulate EWOULDBLOCK
    chunks = (b"abc", b"d", None, b"efg", None, None, None)
    bufio = self.tp(self.MockRawIO(chunks))
    self.assertEqual(b"abcd", bufio.read(6))
    self.assertEqual(b"e", bufio.read(1))
    self.assertEqual(b"fg", bufio.read())
    self.assertEqual(b"", bufio.peek(1))
    self.assertIsNone(bufio.read())
    self.assertEqual(b"", bufio.read())

    # The same simulation, driven directly through the raw readall().
    raw = self.MockRawIO((b"a", None, None))
    self.assertEqual(b"a", raw.readall())
    self.assertIsNone(raw.readall())
1428
def test_read_past_eof(self):
    # Asking for far more bytes than are available returns just the data
    # the raw mock was loaded with.
    reader = self.tp(self.MockRawIO((b"abc", b"d", b"efg")))
    data = reader.read(9000)
    self.assertEqual(b"abcdefg", data)
Guido van Rossum68bbcd22007-02-27 17:19:33 +00001434
def test_read_all(self):
    # read() with no size returns every chunk of the raw mock, joined.
    reader = self.tp(self.MockRawIO((b"abc", b"d", b"efg")))
    data = reader.read()
    self.assertEqual(b"abcdefg", data)
Guido van Rossum68bbcd22007-02-27 17:19:33 +00001440
@support.requires_resource('cpu')
def test_threads(self):
    # Twenty threads read concurrently from one buffered reader; together
    # they must see every byte of the file exactly once.
    try:
        # Write out many bytes with exactly the same number of 0's,
        # 1's... 255's. This will help us check that concurrent reading
        # doesn't duplicate or forget contents.
        N = 1000
        values = list(range(256)) * N
        random.shuffle(values)
        payload = bytes(values)
        with self.open(support.TESTFN, "wb") as out:
            out.write(payload)
        with self.open(support.TESTFN, self.read_mode, buffering=0) as raw:
            bufio = self.tp(raw, 8)
            errors = []
            results = []

            def read_until_eof():
                try:
                    # Intra-buffer read then buffer-flushing read
                    sizes = cycle([1, 19])
                    while True:
                        chunk = bufio.read(next(sizes))
                        if not chunk:
                            break
                        # list.append() is atomic
                        results.append(chunk)
                except Exception as exc:
                    errors.append(exc)
                    raise

            threads = [threading.Thread(target=read_until_eof)
                       for _ in range(20)]
            with support.start_threads(threads):
                time.sleep(0.02)  # yield
            self.assertFalse(errors,
                "the following exceptions were caught: %r" % errors)
            combined = b''.join(results)
            for value in range(256):
                self.assertEqual(combined.count(bytes([value])), N)
    finally:
        support.unlink(support.TESTFN)
1480
def test_unseekable(self):
    # tell() and seek() on a reader over an unseekable raw object raise
    # UnsupportedOperation, both before and after some data was read.
    bufio = self.tp(self.MockUnseekableIO(b"A" * 10))
    with self.assertRaises(self.UnsupportedOperation):
        bufio.tell()
    with self.assertRaises(self.UnsupportedOperation):
        bufio.seek(0)
    bufio.read(1)
    with self.assertRaises(self.UnsupportedOperation):
        bufio.seek(0)
    with self.assertRaises(self.UnsupportedOperation):
        bufio.tell()
1488
def test_misbehaved_io(self):
    # seek() and tell() on a reader wrapping the misbehaving raw mock
    # must raise OSError.
    bufio = self.tp(self.MisbehavedRawIO((b"abc", b"d", b"efg")))
    with self.assertRaises(OSError):
        bufio.seek(0)
    with self.assertRaises(OSError):
        bufio.tell()

    # Silence destructor error
    bufio.close = lambda: None
1497
def test_no_extraneous_read(self):
    # Issue #9550; when the raw IO object has satisfied the read request,
    # we should not issue any additional reads, otherwise it may block
    # (e.g. socket).
    bufsize = 16
    request_sizes = (2, bufsize - 1, bufsize, bufsize + 1, bufsize * 2)
    for n in request_sizes:
        expected = b"x" * n
        raw_layouts = (
            # Simple case: one raw read is enough to satisfy the request.
            [expected],
            # A more complex case where two raw reads are needed to
            # satisfy the request.
            [b"x" * (n - 1), b"x"],
        )
        for pieces in raw_layouts:
            rawio = self.MockRawIO(pieces)
            bufio = self.tp(rawio, bufsize)
            self.assertEqual(bufio.read(n), expected)
            self.assertEqual(rawio._extraneous_reads, 0,
                "failed for {}: {} != 0".format(n, rawio._extraneous_reads))
1517
def test_read_on_closed(self):
    # Issue #23796
    # peek() and read1() on a closed reader must raise ValueError even
    # though one byte was read (and more was available) before close().
    reader = io.BufferedReader(io.BytesIO(b"12"))
    reader.read(1)
    reader.close()
    with self.assertRaises(ValueError):
        reader.peek()
    with self.assertRaises(ValueError):
        reader.read1(1)
1525
Antoine Pitrou32cfede2010-08-11 13:31:33 +00001526
class CBufferedReaderTest(BufferedReaderTest, SizeofTest):
    # Runs the shared BufferedReaderTest cases against io.BufferedReader
    # and adds checks that only apply to that implementation.
    tp = io.BufferedReader

    @unittest.skipIf(MEMORY_SANITIZER, "MSan defaults to crashing "
                     "instead of returning NULL for malloc failure.")
    def test_constructor(self):
        BufferedReaderTest.test_constructor(self)
        # The allocation can succeed on 32-bit builds, e.g. with more
        # than 2 GiB RAM and a 64-bit kernel.
        if sys.maxsize <= 0x7FFFFFFF:
            return
        rawio = self.MockRawIO()
        bufio = self.tp(rawio)
        with self.assertRaises((OverflowError, MemoryError, ValueError)):
            bufio.__init__(rawio, sys.maxsize)

    def test_initialization(self):
        # Re-initializing with an invalid buffer size raises ValueError,
        # and a read() attempted right after each failed __init__ raises
        # ValueError as well.
        rawio = self.MockRawIO([b"abc"])
        bufio = self.tp(rawio)
        for bad_size in (0, -16, -1):
            with self.assertRaises(ValueError):
                bufio.__init__(rawio, buffer_size=bad_size)
            with self.assertRaises(ValueError):
                bufio.read()

    def test_misbehaved_io_read(self):
        bufio = self.tp(self.MisbehavedRawIO((b"abc", b"d", b"efg")))
        # _pyio.BufferedReader seems to implement reading different, so that
        # checking this is not so easy.
        with self.assertRaises(OSError):
            bufio.read(10)

    def test_garbage_collection(self):
        # C BufferedReader objects are collected.
        # The Python version has __del__, so it ends into gc.garbage instead
        self.addCleanup(support.unlink, support.TESTFN)
        with support.check_warnings(('', ResourceWarning)):
            rawio = self.FileIO(support.TESTFN, "w+b")
            reader = self.tp(rawio)
            # Self-reference: the object can only be reclaimed by the
            # collection triggered below, not by plain refcounting.
            reader.f = reader
            wr = weakref.ref(reader)
            del reader
            support.gc_collect()
        self.assertIsNone(wr(), wr)

    def test_args_error(self):
        # Issue #17275
        # Too many positional arguments: the TypeError message must
        # mention "BufferedReader".
        with self.assertRaisesRegex(TypeError, "BufferedReader"):
            self.tp(io.BytesIO(), 1024, 1024, 1024)
1576
1577
class PyBufferedReaderTest(BufferedReaderTest):
    # Runs the shared BufferedReaderTest cases against pyio.BufferedReader
    # (presumably the pure-Python implementation; the pyio import is not
    # visible in this part of the file).
    tp = pyio.BufferedReader
Antoine Pitrou87695762008-08-14 22:44:29 +00001580
Guido van Rossuma9e20242007-03-08 00:43:48 +00001581
class BufferedWriterTest(unittest.TestCase, CommonBufferedTests):
    # Implementation-independent tests for a buffered writer.  Concrete
    # subclasses set ``tp`` to the class under test; the mock raw classes
    # and ``open`` are looked up on ``self`` as well.
    write_mode = "wb"

    def test_constructor(self):
        # __init__ may be called again on a live object with valid sizes;
        # invalid sizes raise ValueError.  Both writes must reach the raw
        # object in order.
        rawio = self.MockRawIO()
        bufio = self.tp(rawio)
        bufio.__init__(rawio)
        bufio.__init__(rawio, buffer_size=1024)
        bufio.__init__(rawio, buffer_size=16)
        self.assertEqual(3, bufio.write(b"abc"))
        bufio.flush()
        self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0)
        self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16)
        self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1)
        bufio.__init__(rawio)
        self.assertEqual(3, bufio.write(b"ghi"))
        bufio.flush()
        self.assertEqual(b"".join(rawio._write_stack), b"abcghi")

    def test_uninitialized(self):
        # An object created with __new__ only can be deleted safely, fails
        # cleanly on write(), and works once __init__ has been called.
        bufio = self.tp.__new__(self.tp)
        del bufio
        bufio = self.tp.__new__(self.tp)
        self.assertRaisesRegex((ValueError, AttributeError),
                               'uninitialized|has no attribute',
                               bufio.write, b'')
        bufio.__init__(self.MockRawIO())
        self.assertEqual(bufio.write(b''), 0)

    def test_detach_flush(self):
        # detach() must push pending buffered data to the raw object.
        raw = self.MockRawIO()
        buf = self.tp(raw)
        buf.write(b"howdy!")
        self.assertFalse(raw._write_stack)
        buf.detach()
        self.assertEqual(raw._write_stack, [b"howdy!"])

    def test_write(self):
        # Write to the buffered IO but don't overflow the buffer.
        writer = self.MockRawIO()
        bufio = self.tp(writer, 8)
        bufio.write(b"abc")
        self.assertFalse(writer._write_stack)
        buffer = bytearray(b"def")
        bufio.write(buffer)
        buffer[:] = b"***"  # Overwrite our copy of the data
        bufio.flush()
        self.assertEqual(b"".join(writer._write_stack), b"abcdef")

    def test_write_overflow(self):
        # Writing more than the buffer holds forces implicit flushes.
        writer = self.MockRawIO()
        bufio = self.tp(writer, 8)
        contents = b"abcdefghijklmnop"
        for n in range(0, len(contents), 3):
            bufio.write(contents[n:n+3])
        flushed = b"".join(writer._write_stack)
        # At least (total - 8) bytes were implicitly flushed, perhaps more
        # depending on the implementation.
        self.assertTrue(flushed.startswith(contents[:-8]), flushed)

    def check_writes(self, intermediate_func):
        # Lots of writes, test the flushed output is as expected.
        # ``intermediate_func`` is called with the buffered writer after
        # every write.
        contents = bytes(range(256)) * 1000
        n = 0
        writer = self.MockRawIO()
        bufio = self.tp(writer, 13)
        # Generator of write sizes: repeat each N 15 times then proceed to N+1
        def gen_sizes():
            for size in count(1):
                for i in range(15):
                    yield size
        sizes = gen_sizes()
        while n < len(contents):
            size = min(next(sizes), len(contents) - n)
            self.assertEqual(bufio.write(contents[n:n+size]), size)
            intermediate_func(bufio)
            n += size
        bufio.flush()
        self.assertEqual(contents, b"".join(writer._write_stack))

    def test_writes(self):
        self.check_writes(lambda bufio: None)

    def test_writes_and_flushes(self):
        self.check_writes(lambda bufio: bufio.flush())

    def test_writes_and_seeks(self):
        # Seeking away and back between writes (absolute, then relative)
        # must not disturb the written output.
        def _seekabs(bufio):
            pos = bufio.tell()
            bufio.seek(pos + 1, 0)
            bufio.seek(pos - 1, 0)
            bufio.seek(pos, 0)
        self.check_writes(_seekabs)
        def _seekrel(bufio):
            pos = bufio.seek(0, 1)
            bufio.seek(+1, 1)
            bufio.seek(-1, 1)
            bufio.seek(pos, 0)
        self.check_writes(_seekrel)

    def test_writes_and_truncates(self):
        self.check_writes(lambda bufio: bufio.truncate(bufio.tell()))

    def test_write_non_blocking(self):
        raw = self.MockNonBlockWriterIO()
        bufio = self.tp(raw, 8)

        self.assertEqual(bufio.write(b"abcd"), 4)
        self.assertEqual(bufio.write(b"efghi"), 5)
        # 1 byte will be written, the rest will be buffered
        raw.block_on(b"k")
        self.assertEqual(bufio.write(b"jklmn"), 5)

        # 8 bytes will be written, 8 will be buffered and the rest will be lost
        raw.block_on(b"0")
        try:
            bufio.write(b"opqrwxyz0123456789")
        except self.BlockingIOError as e:
            written = e.characters_written
        else:
            self.fail("BlockingIOError should have been raised")
        self.assertEqual(written, 16)
        self.assertEqual(raw.pop_written(),
            b"abcdefghijklmnopqrwxyz")

        self.assertEqual(bufio.write(b"ABCDEFGHI"), 9)
        s = raw.pop_written()
        # Previously buffered bytes were flushed
        self.assertTrue(s.startswith(b"01234567A"), s)

    def test_write_and_rewind(self):
        # Writes after seeking back overwrite earlier data in place.
        raw = io.BytesIO()
        bufio = self.tp(raw, 4)
        self.assertEqual(bufio.write(b"abcdef"), 6)
        self.assertEqual(bufio.tell(), 6)
        bufio.seek(0, 0)
        self.assertEqual(bufio.write(b"XY"), 2)
        bufio.seek(6, 0)
        self.assertEqual(raw.getvalue(), b"XYcdef")
        self.assertEqual(bufio.write(b"123456"), 6)
        bufio.flush()
        self.assertEqual(raw.getvalue(), b"XYcdef123456")

    def test_flush(self):
        writer = self.MockRawIO()
        bufio = self.tp(writer, 8)
        bufio.write(b"abc")
        bufio.flush()
        self.assertEqual(b"abc", writer._write_stack[0])

    def test_writelines(self):
        l = [b'ab', b'cd', b'ef']
        writer = self.MockRawIO()
        bufio = self.tp(writer, 8)
        bufio.writelines(l)
        bufio.flush()
        self.assertEqual(b''.join(writer._write_stack), b'abcdef')

    def test_writelines_userlist(self):
        # writelines() also accepts a UserList, not only a plain list.
        l = UserList([b'ab', b'cd', b'ef'])
        writer = self.MockRawIO()
        bufio = self.tp(writer, 8)
        bufio.writelines(l)
        bufio.flush()
        self.assertEqual(b''.join(writer._write_stack), b'abcdef')

    def test_writelines_error(self):
        # Non-bytes items, a non-iterable and a str all raise TypeError.
        writer = self.MockRawIO()
        bufio = self.tp(writer, 8)
        self.assertRaises(TypeError, bufio.writelines, [1, 2, 3])
        self.assertRaises(TypeError, bufio.writelines, None)
        self.assertRaises(TypeError, bufio.writelines, 'abc')

    def test_destructor(self):
        # Destroying the writer must flush buffered data to the raw object.
        writer = self.MockRawIO()
        bufio = self.tp(writer, 8)
        bufio.write(b"abc")
        del bufio
        support.gc_collect()
        self.assertEqual(b"abc", writer._write_stack[0])

    def test_truncate(self):
        # Truncate implicitly flushes the buffer.
        self.addCleanup(support.unlink, support.TESTFN)
        with self.open(support.TESTFN, self.write_mode, buffering=0) as raw:
            bufio = self.tp(raw, 8)
            bufio.write(b"abcdef")
            self.assertEqual(bufio.truncate(3), 3)
            self.assertEqual(bufio.tell(), 6)
        with self.open(support.TESTFN, "rb", buffering=0) as f:
            self.assertEqual(f.read(), b"abc")

    def test_truncate_after_write(self):
        # Ensure that truncate preserves the file position after
        # writes longer than the buffer size.
        # Issue: https://bugs.python.org/issue32228
        self.addCleanup(support.unlink, support.TESTFN)
        with self.open(support.TESTFN, "wb") as f:
            # Fill with some buffer
            f.write(b'\x00' * 10000)
        buffer_sizes = [8192, 4096, 200]
        for buffer_size in buffer_sizes:
            with self.open(support.TESTFN, "r+b", buffering=buffer_size) as f:
                f.write(b'\x00' * (buffer_size + 1))
                # After write write_pos and write_end are set to 0
                f.read(1)
                # read operation makes sure that pos != raw_pos
                f.truncate()
                self.assertEqual(f.tell(), buffer_size + 2)

    @support.requires_resource('cpu')
    def test_threads(self):
        try:
            # Write out many bytes from many threads and test they were
            # all flushed.
            N = 1000
            contents = bytes(range(256)) * N
            sizes = cycle([1, 19])
            n = 0
            queue = deque()
            while n < len(contents):
                size = next(sizes)
                queue.append(contents[n:n+size])
                n += size
            del contents
            # We use a real file object because it allows us to
            # exercise situations where the GIL is released before
            # writing the buffer to the raw streams. This is in addition
            # to concurrency issues due to switching threads in the middle
            # of Python code.
            with self.open(support.TESTFN, self.write_mode, buffering=0) as raw:
                bufio = self.tp(raw, 8)
                errors = []
                def f():
                    try:
                        while True:
                            try:
                                s = queue.popleft()
                            except IndexError:
                                return
                            bufio.write(s)
                    except Exception as e:
                        errors.append(e)
                        raise
                threads = [threading.Thread(target=f) for x in range(20)]
                with support.start_threads(threads):
                    time.sleep(0.02)  # yield
                self.assertFalse(errors,
                    "the following exceptions were caught: %r" % errors)
                bufio.close()
            with self.open(support.TESTFN, "rb") as f:
                s = f.read()
            # The threads interleave arbitrarily, so only per-value byte
            # counts can be checked, not ordering.
            for i in range(256):
                self.assertEqual(s.count(bytes([i])), N)
        finally:
            support.unlink(support.TESTFN)

    def test_misbehaved_io(self):
        rawio = self.MisbehavedRawIO()
        bufio = self.tp(rawio, 5)
        self.assertRaises(OSError, bufio.seek, 0)
        self.assertRaises(OSError, bufio.tell)
        self.assertRaises(OSError, bufio.write, b"abcdef")

        # Silence destructor error
        bufio.close = lambda: None

    def test_max_buffer_size_removal(self):
        # A third positional argument must be rejected with TypeError.
        with self.assertRaises(TypeError):
            self.tp(self.MockRawIO(), 8, 12)

    def test_write_error_on_close(self):
        raw = self.MockRawIO()
        def bad_write(b):
            raise OSError()
        raw.write = bad_write
        b = self.tp(raw)
        b.write(b'spam')
        self.assertRaises(OSError, b.close)  # exception not swallowed
        self.assertTrue(b.closed)

    def test_slow_close_from_thread(self):
        # Issue #31976
        # While another thread is still inside close() (signalled by the
        # raw mock's in_flush event), a write must raise ValueError and
        # the object must already report itself closed.
        rawio = self.SlowFlushRawIO()
        bufio = self.tp(rawio, 8)
        t = threading.Thread(target=bufio.close)
        t.start()
        try:
            rawio.in_flush.wait()
            self.assertRaises(ValueError, bufio.write, b'spam')
            self.assertTrue(bufio.closed)
        finally:
            # Always reap the helper thread, even when an assertion above
            # fails, so a failing run does not leave a dangling thread.
            t.join()
1873
1874
Benjamin Peterson59406a92009-03-26 17:10:29 +00001875
class CBufferedWriterTest(BufferedWriterTest, SizeofTest):
    # Runs the shared BufferedWriterTest cases against io.BufferedWriter
    # and adds checks that only apply to that implementation.
    tp = io.BufferedWriter

    @unittest.skipIf(MEMORY_SANITIZER, "MSan defaults to crashing "
                     "instead of returning NULL for malloc failure.")
    def test_constructor(self):
        BufferedWriterTest.test_constructor(self)
        # The allocation can succeed on 32-bit builds, e.g. with more
        # than 2 GiB RAM and a 64-bit kernel.
        if sys.maxsize <= 0x7FFFFFFF:
            return
        rawio = self.MockRawIO()
        bufio = self.tp(rawio)
        with self.assertRaises((OverflowError, MemoryError, ValueError)):
            bufio.__init__(rawio, sys.maxsize)

    def test_initialization(self):
        # Re-initializing with an invalid buffer size raises ValueError,
        # and a write() attempted right after each failed __init__ raises
        # ValueError as well.
        rawio = self.MockRawIO()
        bufio = self.tp(rawio)
        for bad_size in (0, -16, -1):
            with self.assertRaises(ValueError):
                bufio.__init__(rawio, buffer_size=bad_size)
            with self.assertRaises(ValueError):
                bufio.write(b"def")

    def test_garbage_collection(self):
        # C BufferedWriter objects are collected, and collecting them flushes
        # all data to disk.
        # The Python version has __del__, so it ends into gc.garbage instead
        self.addCleanup(support.unlink, support.TESTFN)
        with support.check_warnings(('', ResourceWarning)):
            rawio = self.FileIO(support.TESTFN, "w+b")
            writer = self.tp(rawio)
            writer.write(b"123xxx")
            # Self-reference: the object can only be reclaimed by the
            # collection triggered below, not by plain refcounting.
            writer.x = writer
            wr = weakref.ref(writer)
            del writer
            support.gc_collect()
        self.assertIsNone(wr(), wr)
        with self.open(support.TESTFN, "rb") as fp:
            self.assertEqual(fp.read(), b"123xxx")

    def test_args_error(self):
        # Issue #17275
        # Too many positional arguments: the TypeError message must
        # mention "BufferedWriter".
        with self.assertRaisesRegex(TypeError, "BufferedWriter"):
            self.tp(io.BytesIO(), 1024, 1024, 1024)
1922
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001923
class PyBufferedWriterTest(BufferedWriterTest):
    # Runs the shared BufferedWriterTest cases against pyio.BufferedWriter
    # (presumably the pure-Python implementation; the pyio import is not
    # visible in this part of the file).
    tp = pyio.BufferedWriter
Guido van Rossuma9e20242007-03-08 00:43:48 +00001926
Guido van Rossum01a27522007-03-07 01:00:12 +00001927class BufferedRWPairTest(unittest.TestCase):
Guido van Rossuma9e20242007-03-08 00:43:48 +00001928
Antoine Pitroucf4c7492009-04-19 00:09:36 +00001929 def test_constructor(self):
1930 pair = self.tp(self.MockRawIO(), self.MockRawIO())
Benjamin Peterson92035012008-12-27 16:00:54 +00001931 self.assertFalse(pair.closed)
Guido van Rossum01a27522007-03-07 01:00:12 +00001932
Serhiy Storchaka61e24932014-02-12 10:52:35 +02001933 def test_uninitialized(self):
1934 pair = self.tp.__new__(self.tp)
1935 del pair
1936 pair = self.tp.__new__(self.tp)
1937 self.assertRaisesRegex((ValueError, AttributeError),
1938 'uninitialized|has no attribute',
1939 pair.read, 0)
1940 self.assertRaisesRegex((ValueError, AttributeError),
1941 'uninitialized|has no attribute',
1942 pair.write, b'')
1943 pair.__init__(self.MockRawIO(), self.MockRawIO())
1944 self.assertEqual(pair.read(0), b'')
1945 self.assertEqual(pair.write(b''), 0)
1946
Benjamin Petersond2e0c792009-05-01 20:40:59 +00001947 def test_detach(self):
1948 pair = self.tp(self.MockRawIO(), self.MockRawIO())
1949 self.assertRaises(self.UnsupportedOperation, pair.detach)
1950
Florent Xicluna109d5732012-07-07 17:03:22 +02001951 def test_constructor_max_buffer_size_removal(self):
1952 with self.assertRaises(TypeError):
Benjamin Peterson59406a92009-03-26 17:10:29 +00001953 self.tp(self.MockRawIO(), self.MockRawIO(), 8, 12)
Benjamin Peterson59406a92009-03-26 17:10:29 +00001954
Antoine Pitroucf4c7492009-04-19 00:09:36 +00001955 def test_constructor_with_not_readable(self):
1956 class NotReadable(MockRawIO):
1957 def readable(self):
1958 return False
1959
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02001960 self.assertRaises(OSError, self.tp, NotReadable(), self.MockRawIO())
Antoine Pitroucf4c7492009-04-19 00:09:36 +00001961
1962 def test_constructor_with_not_writeable(self):
1963 class NotWriteable(MockRawIO):
1964 def writable(self):
1965 return False
1966
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02001967 self.assertRaises(OSError, self.tp, self.MockRawIO(), NotWriteable())
Antoine Pitroucf4c7492009-04-19 00:09:36 +00001968
1969 def test_read(self):
1970 pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())
1971
1972 self.assertEqual(pair.read(3), b"abc")
1973 self.assertEqual(pair.read(1), b"d")
1974 self.assertEqual(pair.read(), b"ef")
Benjamin Petersonbf5ff762009-12-13 19:25:34 +00001975 pair = self.tp(self.BytesIO(b"abc"), self.MockRawIO())
1976 self.assertEqual(pair.read(None), b"abc")
1977
1978 def test_readlines(self):
1979 pair = lambda: self.tp(self.BytesIO(b"abc\ndef\nh"), self.MockRawIO())
1980 self.assertEqual(pair().readlines(), [b"abc\n", b"def\n", b"h"])
1981 self.assertEqual(pair().readlines(), [b"abc\n", b"def\n", b"h"])
1982 self.assertEqual(pair().readlines(5), [b"abc\n", b"def\n"])
Antoine Pitroucf4c7492009-04-19 00:09:36 +00001983
1984 def test_read1(self):
1985 # .read1() is delegated to the underlying reader object, so this test
1986 # can be shallow.
1987 pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())
1988
1989 self.assertEqual(pair.read1(3), b"abc")
Martin Panterccb2c0e2016-10-20 23:48:14 +00001990 self.assertEqual(pair.read1(), b"def")
Antoine Pitroucf4c7492009-04-19 00:09:36 +00001991
1992 def test_readinto(self):
Martin Panter6bb91f32016-05-28 00:41:57 +00001993 for method in ("readinto", "readinto1"):
1994 with self.subTest(method):
1995 pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())
Antoine Pitroucf4c7492009-04-19 00:09:36 +00001996
Serhiy Storchaka5f1a5182016-09-11 14:41:02 +03001997 data = byteslike(b'\0' * 5)
Martin Panter6bb91f32016-05-28 00:41:57 +00001998 self.assertEqual(getattr(pair, method)(data), 5)
1999 self.assertEqual(bytes(data), b"abcde")
Antoine Pitroucf4c7492009-04-19 00:09:36 +00002000
2001 def test_write(self):
2002 w = self.MockRawIO()
2003 pair = self.tp(self.MockRawIO(), w)
2004
2005 pair.write(b"abc")
2006 pair.flush()
Martin Panter6bb91f32016-05-28 00:41:57 +00002007 buffer = bytearray(b"def")
2008 pair.write(buffer)
2009 buffer[:] = b"***" # Overwrite our copy of the data
Antoine Pitroucf4c7492009-04-19 00:09:36 +00002010 pair.flush()
2011 self.assertEqual(w._write_stack, [b"abc", b"def"])
2012
2013 def test_peek(self):
2014 pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())
2015
2016 self.assertTrue(pair.peek(3).startswith(b"abc"))
2017 self.assertEqual(pair.read(3), b"abc")
2018
2019 def test_readable(self):
2020 pair = self.tp(self.MockRawIO(), self.MockRawIO())
2021 self.assertTrue(pair.readable())
2022
2023 def test_writeable(self):
2024 pair = self.tp(self.MockRawIO(), self.MockRawIO())
2025 self.assertTrue(pair.writable())
2026
2027 def test_seekable(self):
2028 # BufferedRWPairs are never seekable, even if their readers and writers
2029 # are.
2030 pair = self.tp(self.MockRawIO(), self.MockRawIO())
2031 self.assertFalse(pair.seekable())
2032
2033 # .flush() is delegated to the underlying writer object and has been
2034 # tested in the test_write method.
2035
2036 def test_close_and_closed(self):
2037 pair = self.tp(self.MockRawIO(), self.MockRawIO())
2038 self.assertFalse(pair.closed)
2039 pair.close()
2040 self.assertTrue(pair.closed)
2041
Serhiy Storchaka7665be62015-03-24 23:21:57 +02002042 def test_reader_close_error_on_close(self):
2043 def reader_close():
2044 reader_non_existing
2045 reader = self.MockRawIO()
2046 reader.close = reader_close
2047 writer = self.MockRawIO()
2048 pair = self.tp(reader, writer)
2049 with self.assertRaises(NameError) as err:
2050 pair.close()
2051 self.assertIn('reader_non_existing', str(err.exception))
2052 self.assertTrue(pair.closed)
2053 self.assertFalse(reader.closed)
2054 self.assertTrue(writer.closed)
2055
Victor Stinner472f7942019-04-12 21:58:24 +02002056 # Silence destructor error
2057 reader.close = lambda: None
2058
Serhiy Storchaka7665be62015-03-24 23:21:57 +02002059 def test_writer_close_error_on_close(self):
2060 def writer_close():
2061 writer_non_existing
2062 reader = self.MockRawIO()
2063 writer = self.MockRawIO()
2064 writer.close = writer_close
2065 pair = self.tp(reader, writer)
2066 with self.assertRaises(NameError) as err:
2067 pair.close()
2068 self.assertIn('writer_non_existing', str(err.exception))
2069 self.assertFalse(pair.closed)
2070 self.assertTrue(reader.closed)
2071 self.assertFalse(writer.closed)
2072
Victor Stinner472f7942019-04-12 21:58:24 +02002073 # Silence destructor error
2074 writer.close = lambda: None
Victor Stinner913fa1c2019-06-12 23:57:11 +02002075 writer = None
2076
Victor Stinner212646c2019-06-14 18:03:22 +02002077 # Ignore BufferedWriter (of the BufferedRWPair) unraisable exception
Victor Stinner913fa1c2019-06-12 23:57:11 +02002078 with support.catch_unraisable_exception():
Victor Stinner212646c2019-06-14 18:03:22 +02002079 # Ignore BufferedRWPair unraisable exception
2080 with support.catch_unraisable_exception():
2081 pair = None
2082 support.gc_collect()
Victor Stinner913fa1c2019-06-12 23:57:11 +02002083 support.gc_collect()
Victor Stinner472f7942019-04-12 21:58:24 +02002084
def test_reader_writer_close_error_on_close(self):
    # Both raw streams fail to close.  The reader's NameError is the one
    # raised, and the writer's NameError is chained as its __context__.
    def failing_reader_close():
        reader_non_existing  # undefined on purpose -> NameError

    def failing_writer_close():
        writer_non_existing  # undefined on purpose -> NameError

    reader = self.MockRawIO()
    writer = self.MockRawIO()
    reader.close = failing_reader_close
    writer.close = failing_writer_close
    pair = self.tp(reader, writer)
    with self.assertRaises(NameError) as caught:
        pair.close()
    self.assertIn('reader_non_existing', str(caught.exception))
    self.assertIsInstance(caught.exception.__context__, NameError)
    self.assertIn('writer_non_existing',
                  str(caught.exception.__context__))
    self.assertFalse(pair.closed)
    self.assertFalse(reader.closed)
    self.assertFalse(writer.closed)

    # Silence destructor error
    reader.close = lambda: None
    writer.close = lambda: None
2107
def test_isatty(self):
    # The pair is a tty as soon as either of its two raw streams is one.
    class FixedIsAtty(MockRawIO):
        # Raw stream whose isatty() answer is chosen at construction time.
        def __init__(self, isatty):
            MockRawIO.__init__(self)
            self._isatty = isatty

        def isatty(self):
            return self._isatty

    # (reader is a tty, writer is a tty, assertion for pair.isatty())
    cases = [
        (False, False, self.assertFalse),
        (True, False, self.assertTrue),
        (False, True, self.assertTrue),
        (True, True, self.assertTrue),
    ]
    for reader_tty, writer_tty, check in cases:
        pair = self.tp(FixedIsAtty(reader_tty), FixedIsAtty(writer_tty))
        check(pair.isatty())
Guido van Rossum01a27522007-03-07 01:00:12 +00002128
def test_weakref_clearing(self):
    # Drop the pair first and its weak reference second; neither step
    # may crash the interpreter.
    pair = self.tp(self.MockRawIO(), self.MockRawIO())
    pair_ref = weakref.ref(pair)
    pair = None
    pair_ref = None  # Shouldn't segfault.
2134
class CBufferedRWPairTest(BufferedRWPairTest):
    # Run the shared BufferedRWPair tests against io.BufferedRWPair.
    tp = io.BufferedRWPair
Guido van Rossuma9e20242007-03-08 00:43:48 +00002137
class PyBufferedRWPairTest(BufferedRWPairTest):
    # Run the shared BufferedRWPair tests against pyio.BufferedRWPair.
    tp = pyio.BufferedRWPair
Guido van Rossuma9e20242007-03-08 00:43:48 +00002140
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002141
class BufferedRandomTest(BufferedReaderTest, BufferedWriterTest):
    # Shared tests for BufferedRandom.  Subclasses bind ``tp`` to the
    # implementation under test.
    read_mode = "rb+"
    write_mode = "wb+"

    def test_constructor(self):
        # Run the constructor checks of both base test classes.
        for base in (BufferedReaderTest, BufferedWriterTest):
            base.test_constructor(self)

    def test_uninitialized(self):
        # Run the uninitialized-object checks of both base test classes.
        for base in (BufferedReaderTest, BufferedWriterTest):
            base.test_uninitialized(self)

    def test_read_and_write(self):
        raw = self.MockRawIO((b"asdf", b"ghjk"))
        stream = self.tp(raw, 8)

        self.assertEqual(b"as", stream.read(2))
        for payload in (b"ddd", b"eee"):
            stream.write(payload)
        self.assertFalse(raw._write_stack)  # Buffer writes
        self.assertEqual(b"ghjk", stream.read())
        self.assertEqual(b"dddeee", raw._write_stack[0])

    def test_seek_and_tell(self):
        raw = self.BytesIO(b"asdfghjkl")
        stream = self.tp(raw)

        # Read, then rewind and read again from the start.
        self.assertEqual(b"as", stream.read(2))
        self.assertEqual(2, stream.tell())
        stream.seek(0, 0)
        self.assertEqual(b"asdf", stream.read(4))

        # Overwrite in the middle, then check absolute, end-relative and
        # current-relative seeks.
        stream.write(b"123f")
        stream.seek(0, 0)
        self.assertEqual(b"asdf123fl", stream.read())
        self.assertEqual(9, stream.tell())
        stream.seek(-4, 2)
        self.assertEqual(5, stream.tell())
        stream.seek(2, 1)
        self.assertEqual(7, stream.tell())
        self.assertEqual(b"fl", stream.read(11))
        stream.flush()
        self.assertEqual(b"asdf123fl", raw.getvalue())

        # A float offset is rejected.
        self.assertRaises(TypeError, stream.seek, 0.0)

    def check_flush_and_read(self, read_func):
        # Shared scenario: read_func(bufio[, n]) must behave like a read
        # that consumes the bytes it returns.
        raw = self.BytesIO(b"abcdefghi")
        buffered = self.tp(raw)

        self.assertEqual(b"ab", read_func(buffered, 2))
        buffered.write(b"12")
        self.assertEqual(b"ef", read_func(buffered, 2))
        self.assertEqual(6, buffered.tell())
        buffered.flush()
        self.assertEqual(6, buffered.tell())
        self.assertEqual(b"ghi", read_func(buffered))
        raw.seek(0, 0)
        raw.write(b"XYZ")
        # flush() resets the read buffer
        buffered.flush()
        buffered.seek(0, 0)
        self.assertEqual(b"XYZ", read_func(buffered, 3))

    def test_flush_and_read(self):
        def read_via_read(bufio, *args):
            return bufio.read(*args)
        self.check_flush_and_read(read_via_read)

    def test_flush_and_readinto(self):
        def read_via_readinto(bufio, n=-1):
            # With no explicit size, use a scratch buffer larger than the
            # nine bytes of test data.
            scratch = bytearray(9999 if n < 0 else n)
            filled = bufio.readinto(scratch)
            return bytes(scratch[:filled])
        self.check_flush_and_read(read_via_readinto)

    def test_flush_and_peek(self):
        def read_via_peek(bufio, n=-1):
            # This relies on the fact that the buffer can contain the whole
            # raw stream, otherwise peek() can return less.
            data = bufio.peek(n)
            if n != -1:
                data = data[:n]
            # peek() does not advance the position, so do it by hand.
            bufio.seek(len(data), 1)
            return data
        self.check_flush_and_read(read_via_peek)

    def test_flush_and_write(self):
        raw = self.BytesIO(b"abcdefghi")
        buffered = self.tp(raw)

        for payload in (b"123", b"45"):
            buffered.write(payload)
            buffered.flush()
        buffered.seek(0, 0)
        self.assertEqual(b"12345fghi", raw.getvalue())
        self.assertEqual(b"12345fghi", buffered.read())

    def test_threads(self):
        # Run the threading checks of both base test classes.
        for base in (BufferedReaderTest, BufferedWriterTest):
            base.test_threads(self)

    def test_writes_and_peek(self):
        def peek_only(bufio):
            bufio.peek(1)

        def rewind_peek_restore(bufio):
            position = bufio.tell()
            bufio.seek(-1, 1)
            bufio.peek(1)
            bufio.seek(position, 0)

        for intermediate in (peek_only, rewind_peek_restore):
            self.check_writes(intermediate)

    def test_writes_and_reads(self):
        def step_back_and_read(bufio):
            bufio.seek(-1, 1)
            bufio.read(1)
        self.check_writes(step_back_and_read)

    def test_writes_and_read1s(self):
        def step_back_and_read1(bufio):
            bufio.seek(-1, 1)
            bufio.read1(1)
        self.check_writes(step_back_and_read1)

    def test_writes_and_readintos(self):
        def step_back_and_readinto(bufio):
            bufio.seek(-1, 1)
            bufio.readinto(bytearray(1))
        self.check_writes(step_back_and_readinto)

    def test_write_after_readahead(self):
        # Issue #6629: writing after the buffer was filled by readahead should
        # first rewind the raw stream.
        for overwrite_size in [1, 5]:
            raw = self.BytesIO(b"A" * 10)
            buffered = self.tp(raw, 4)
            end_position = overwrite_size + 1
            # Trigger readahead
            self.assertEqual(buffered.read(1), b"A")
            self.assertEqual(buffered.tell(), 1)
            # Overwriting should rewind the raw stream if it needs so
            buffered.write(b"B" * overwrite_size)
            self.assertEqual(buffered.tell(), end_position)
            # If the write size was smaller than the buffer size, flush() and
            # check that rewind happens.
            buffered.flush()
            self.assertEqual(buffered.tell(), end_position)
            contents = raw.getvalue()
            self.assertEqual(
                contents,
                b"A" + b"B" * overwrite_size + b"A" * (9 - overwrite_size))

    def test_write_rewind_write(self):
        # Various combinations of reading / writing / seeking backwards / writing again
        def mutate(bufio, pos1, pos2):
            assert pos2 >= pos1
            # Fill the buffer
            bufio.seek(pos1)
            bufio.read(pos2 - pos1)
            bufio.write(b'\x02')
            # This writes earlier than the previous write, but still inside
            # the buffer.
            bufio.seek(pos1)
            bufio.write(b'\x01')

        original = b"\x80\x81\x82\x83\x84"
        size = len(original)
        for i in range(size):
            for j in range(i, size):
                raw = self.BytesIO(original)
                buffered = self.tp(raw, 100)
                mutate(buffered, i, j)
                buffered.flush()
                # The byte at i is assigned last, so it wins when i == j.
                expected = bytearray(original)
                expected[j] = 2
                expected[i] = 1
                self.assertEqual(raw.getvalue(), expected,
                                 "failed result for i=%d, j=%d" % (i, j))

    def test_truncate_after_read_or_write(self):
        raw = self.BytesIO(b"A" * 10)
        buffered = self.tp(raw, 100)
        self.assertEqual(buffered.read(2), b"AA")  # the read buffer gets filled
        self.assertEqual(buffered.truncate(), 2)
        self.assertEqual(buffered.write(b"BB"), 2)  # the write buffer increases
        self.assertEqual(buffered.truncate(), 4)

    def test_misbehaved_io(self):
        # Run the misbehaved-raw-stream checks of both base test classes.
        for base in (BufferedReaderTest, BufferedWriterTest):
            base.test_misbehaved_io(self)

    def test_interleaved_read_write(self):
        # Test for issue #12213
        with self.BytesIO(b'abcdefgh') as raw, self.tp(raw, 100) as f:
            f.write(b"1")
            self.assertEqual(f.read(1), b'b')
            f.write(b'2')
            self.assertEqual(f.read1(1), b'd')
            f.write(b'3')
            one_byte = bytearray(1)
            f.readinto(one_byte)
            self.assertEqual(one_byte, b'f')
            f.write(b'4')
            self.assertEqual(f.peek(1), b'h')
            f.flush()
            self.assertEqual(raw.getvalue(), b'1b2d3f4h')

        with self.BytesIO(b'abc') as raw, self.tp(raw, 100) as f:
            self.assertEqual(f.read(1), b'a')
            f.write(b"2")
            self.assertEqual(f.read(1), b'c')
            f.flush()
            self.assertEqual(raw.getvalue(), b'a2c')

    def test_interleaved_readline_write(self):
        # Alternate single-byte writes with readline() calls.
        with self.BytesIO(b'ab\ncdef\ng\n') as raw, self.tp(raw) as f:
            f.write(b'1')
            self.assertEqual(f.readline(), b'b\n')
            f.write(b'2')
            self.assertEqual(f.readline(), b'def\n')
            f.write(b'3')
            self.assertEqual(f.readline(), b'\n')
            f.flush()
            self.assertEqual(raw.getvalue(), b'1b\n2def\n3\n')

    # You can't construct a BufferedRandom over a non-seekable stream.
    test_unseekable = None
2369
R David Murray67bfe802013-02-23 21:51:05 -05002370
class CBufferedRandomTest(BufferedRandomTest, SizeofTest):
    # Run the shared BufferedRandom tests against io.BufferedRandom.
    tp = io.BufferedRandom

    @unittest.skipIf(MEMORY_SANITIZER, "MSan defaults to crashing "
                     "instead of returning NULL for malloc failure.")
    def test_constructor(self):
        BufferedRandomTest.test_constructor(self)
        # The allocation can succeed on 32-bit builds, e.g. with more
        # than 2 GiB RAM and a 64-bit kernel.
        if sys.maxsize <= 0x7FFFFFFF:
            return
        rawio = self.MockRawIO()
        bufio = self.tp(rawio)
        with self.assertRaises((OverflowError, MemoryError, ValueError)):
            bufio.__init__(rawio, sys.maxsize)

    def test_garbage_collection(self):
        # Run the garbage-collection checks of both C test classes.
        for base in (CBufferedReaderTest, CBufferedWriterTest):
            base.test_garbage_collection(self)

    def test_args_error(self):
        # Issue #17275
        # Too many positional arguments: the TypeError message must name
        # BufferedRandom.
        with self.assertRaisesRegex(TypeError, "BufferedRandom"):
            self.tp(io.BytesIO(), 1024, 1024, 1024)
2394
2395
class PyBufferedRandomTest(BufferedRandomTest):
    # Run the shared BufferedRandom tests against pyio.BufferedRandom.
    tp = pyio.BufferedRandom
2398
2399
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002400# To fully exercise seek/tell, the StatefulIncrementalDecoder has these
2401# properties:
2402# - A single output character can correspond to many bytes of input.
2403# - The number of input bytes to complete the character can be
2404# undetermined until the last input byte is received.
2405# - The number of input bytes can vary depending on previous input.
2406# - A single input byte can correspond to many characters of output.
2407# - The number of output characters can be undetermined until the
2408# last input byte is received.
2409# - The number of output characters can vary depending on previous input.
2410
class StatefulIncrementalDecoder(codecs.IncrementalDecoder):
    """
    For testing seek/tell behavior with a stateful, buffering decoder.

    Input is a sequence of words.  Words may be fixed-length (length set
    by input) or variable-length (period-terminated).  In variable-length
    mode, extra periods are ignored.  Possible words are:
      - 'i' followed by a number sets the input length, I (maximum 99).
        When I is set to 0, words are period-terminated.
      - 'o' followed by a number sets the output length, O (maximum 99).
      - Any other word is converted into a word followed by a period on
        the output.  The output word consists of the input word truncated
        or padded out with hyphens to make its length equal to O.  If O
        is 0, the word is output verbatim without truncating or padding.
    I and O are initially set to 1.  A new I or O takes effect for the
    input that follows the word which set it.  EOF also terminates the
    last word.
    """

    def __init__(self, errors='strict'):
        """Create a decoder in its initial state (I = O = 1, empty buffer)."""
        super().__init__(errors)
        self.reset()

    def __repr__(self):
        return '<SID %x>' % id(self)

    def reset(self):
        """Restore the initial state: I = 1, O = 1 and no buffered bytes."""
        self.i = 1
        self.o = 1
        self.buffer = bytearray()

    def getstate(self):
        """Return ``(buffered_bytes, flags)`` describing the current state.

        I and O are each XORed with 1 and packed as ``i*100 + o``.
        """
        i, o = self.i ^ 1, self.o ^ 1  # so that flags = 0 after reset()
        return bytes(self.buffer), i*100 + o

    def setstate(self, state):
        """Restore a state previously produced by getstate()."""
        buffer, io = state
        self.buffer = bytearray(buffer)
        i, o = divmod(io, 100)
        self.i, self.o = i ^ 1, o ^ 1

    def decode(self, input, final=False):
        """Feed the bytes of *input* and return the text completed so far.

        If *final* is true, a partially buffered word is completed as well.
        """
        pieces = []
        for b in input:
            # self.i is re-read for every byte because process_word() may
            # change it in the middle of the input.
            if self.i == 0:  # variable-length, terminated with period
                if b != ord('.'):
                    self.buffer.append(b)
                elif self.buffer:  # a period on an empty buffer is ignored
                    pieces.append(self.process_word())
            else:  # fixed-length, terminate after self.i bytes
                self.buffer.append(b)
                if len(self.buffer) == self.i:
                    pieces.append(self.process_word())
        if final and self.buffer:  # EOF terminates the last word
            pieces.append(self.process_word())
        return ''.join(pieces)

    def process_word(self):
        """Consume the buffered word and return the text it produces.

        'i...' and 'o...' words update I and O and produce no text.  Any
        other word is returned padded/truncated to O characters (verbatim
        when O is 0) and followed by a period.  The buffer is emptied.
        """
        output = ''
        marker = self.buffer[0]
        if marker == ord('i'):
            self.i = min(99, int(self.buffer[1:] or 0))  # set input length
        elif marker == ord('o'):
            self.o = min(99, int(self.buffer[1:] or 0))  # set output length
        else:
            # Pad short words with hyphens up to O characters.
            output = self.buffer.decode('ascii').ljust(self.o, '-')
            if self.o:
                output = output[:self.o]  # truncate to output length
            output += '.'
        self.buffer = bytearray()
        return output

    # The codec lookup below only answers while this flag is true.
    codecEnabled = False

    @classmethod
    def lookupTestDecoder(cls, name):
        """Codec search function for the 'test_decoder' codec.

        Return a CodecInfo that encodes with latin-1 and decodes
        incrementally with this class, or None when the codec is disabled
        or *name* is another codec.
        """
        if cls.codecEnabled and name == 'test_decoder':
            latin1 = codecs.lookup('latin-1')
            return codecs.CodecInfo(
                name='test_decoder', encode=latin1.encode, decode=None,
                incrementalencoder=None,
                streamreader=None, streamwriter=None,
                incrementaldecoder=cls)
        return None
2495
# Register the previous decoder for testing.
# Disabled by default, tests will enable it: the lookup function only
# answers while StatefulIncrementalDecoder.codecEnabled is true.
codecs.register(StatefulIncrementalDecoder.lookupTestDecoder)
2499
2500
class StatefulIncrementalDecoderTest(unittest.TestCase):
    """
    Make sure the StatefulIncrementalDecoder actually works.
    """

    # Each entry is (input bytes, final flag, expected decoded text).
    test_cases = [
        # I=1, O=1 (fixed-length input == fixed-length output)
        (b'abcd', False, 'a.b.c.d.'),
        # I=0, O=0 (variable-length input, variable-length output)
        (b'oiabcd', True, 'abcd.'),
        # I=0, O=0 (should ignore extra periods)
        (b'oi...abcd...', True, 'abcd.'),
        # I=0, O=6 (variable-length input, fixed-length output)
        (b'i.o6.x.xyz.toolongtofit.', False, 'x-----.xyz---.toolon.'),
        # I=2, O=6 (fixed-length input < fixed-length output)
        (b'i.i2.o6xyz', True, 'xy----.z-----.'),
        # I=6, O=3 (fixed-length input > fixed-length output)
        (b'i.o3.i6.abcdefghijklmnop', True, 'abc.ghi.mno.'),
        # I=0, then 3; O=29, then 15 (with longer output)
        (b'i.o29.a.b.cde.o15.abcdefghijabcdefghij.i3.a.b.c.d.ei00k.l.m', True,
         'a----------------------------.' +
         'b----------------------------.' +
         'cde--------------------------.' +
         'abcdefghijabcde.' +
         'a.b------------.' +
         '.c.------------.' +
         'd.e------------.' +
         'k--------------.' +
         'l--------------.' +
         'm--------------.')
    ]

    def test_decoder(self):
        # Try a few one-shot test cases.
        for data, final, expected in self.test_cases:
            decoder = StatefulIncrementalDecoder()
            self.assertEqual(decoder.decode(data, final), expected)

        # Also test an unfinished decode, followed by forcing EOF.
        decoder = StatefulIncrementalDecoder()
        self.assertEqual(decoder.decode(b'oiabcd'), '')
        self.assertEqual(decoder.decode(b'', 1), 'abcd.')
Guido van Rossum78892e42007-04-06 17:31:18 +00002543
2544class TextIOWrapperTest(unittest.TestCase):
Guido van Rossum9b76da62007-04-11 01:09:03 +00002545
def setUp(self):
    # Start without a stale test file, then set up the newline fixtures:
    # raw bytes with mixed line endings and the text they normalize to.
    support.unlink(support.TESTFN)
    self.testdata = b"AAA\r\nBBB\rCCC\r\nDDD\nEEE\r\n"
    self.normalized = b"AAA\nBBB\nCCC\nDDD\nEEE\n".decode("ascii")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002550
def tearDown(self):
    # Remove the test file if a test created it.
    support.unlink(support.TESTFN)
Guido van Rossum9b76da62007-04-11 01:09:03 +00002553
def test_constructor(self):
    # Re-running __init__ on an existing wrapper must apply the new
    # settings, and invalid newline arguments must be rejected.
    raw = self.BytesIO(b"\xc3\xa9\n\n")
    buffered = self.BufferedReader(raw, 1000)
    wrapper = self.TextIOWrapper(buffered)

    wrapper.__init__(buffered, encoding="latin-1", newline="\r\n")
    self.assertEqual(wrapper.encoding, "latin-1")
    self.assertEqual(wrapper.line_buffering, False)

    wrapper.__init__(buffered, encoding="utf-8", line_buffering=True)
    self.assertEqual(wrapper.encoding, "utf-8")
    self.assertEqual(wrapper.line_buffering, True)
    self.assertEqual("\xe9\n", wrapper.readline())

    with self.assertRaises(TypeError):
        wrapper.__init__(buffered, newline=42)
    with self.assertRaises(ValueError):
        wrapper.__init__(buffered, newline='xyzzy')
2567
def test_uninitialized(self):
    # Deleting an object that never ran __init__ must be harmless.
    wrapper = self.TextIOWrapper.__new__(self.TextIOWrapper)
    del wrapper

    # Using one must fail cleanly until __init__ has been called.
    wrapper = self.TextIOWrapper.__new__(self.TextIOWrapper)
    with self.assertRaises(Exception):
        repr(wrapper)
    with self.assertRaisesRegex((ValueError, AttributeError),
                                'uninitialized|has no attribute'):
        wrapper.read(0)
    wrapper.__init__(self.MockRawIO())
    self.assertEqual(wrapper.read(0), '')
2578
def test_non_text_encoding_codecs_are_rejected(self):
    # Ensure the constructor complains if passed a codec that isn't
    # marked as a text encoding
    # http://bugs.python.org/issue20404
    buffered = self.BufferedWriter(self.BytesIO())
    with self.assertRaisesRegex(LookupError, "is not a text encoding"):
        self.TextIOWrapper(buffered, encoding="hex")
2587
def test_detach(self):
    raw = self.BytesIO()
    buffered = self.BufferedWriter(raw)

    # detach() hands back the underlying buffer object.
    wrapper = self.TextIOWrapper(buffered)
    self.assertIs(wrapper.detach(), buffered)

    # Pending text reaches the raw stream on detach(); a second detach()
    # is an error.
    wrapper = self.TextIOWrapper(buffered, encoding="ascii")
    wrapper.write("howdy")
    self.assertFalse(raw.getvalue())
    wrapper.detach()
    self.assertEqual(raw.getvalue(), b"howdy")
    with self.assertRaises(ValueError):
        wrapper.detach()

    # Operations independent of the detached stream should still work
    repr(wrapper)
    self.assertEqual(wrapper.encoding, "ascii")
    self.assertEqual(wrapper.errors, "strict")
    self.assertFalse(wrapper.line_buffering)
    self.assertFalse(wrapper.write_through)
Benjamin Peterson10e76b62014-12-21 20:51:50 -06002607
def test_repr(self):
    raw = self.BytesIO("hello".encode("utf-8"))
    buffered = self.BufferedReader(raw)
    wrapper = self.TextIOWrapper(buffered, encoding="utf-8")
    modname = self.TextIOWrapper.__module__

    def check_repr(template):
        # *template* has one %s slot for the (optional) module prefix.
        self.assertRegex(repr(wrapper), template % modname)

    # The repr gains a field each time a name or mode becomes available.
    check_repr(r"<(%s\.)?TextIOWrapper encoding='utf-8'>")
    raw.name = "dummy"
    check_repr(r"<(%s\.)?TextIOWrapper name='dummy' encoding='utf-8'>")
    wrapper.mode = "r"
    check_repr(
        r"<(%s\.)?TextIOWrapper name='dummy' mode='r' encoding='utf-8'>")
    raw.name = b"dummy"
    check_repr(
        r"<(%s\.)?TextIOWrapper name=b'dummy' mode='r' encoding='utf-8'>")

    wrapper.buffer.detach()
    repr(wrapper)  # Should not raise an exception
2627
def test_recursive_repr(self):
    # Issue #25455
    # The raw stream's name is the wrapper itself, so repr() recurses.
    raw = self.BytesIO()
    wrapper = self.TextIOWrapper(raw)
    with support.swap_attr(raw, 'name', wrapper):
        try:
            repr(wrapper)  # Should not crash
        except RuntimeError:
            # A RuntimeError is an accepted outcome of the recursion.
            pass
2637
def test_line_buffering(self):
    raw = self.BytesIO()
    buffered = self.BufferedWriter(raw, 1000)
    wrapper = self.TextIOWrapper(buffered, newline="\n",
                                 line_buffering=True)
    wrapper.write("X")
    self.assertEqual(raw.getvalue(), b"")  # No flush happened
    wrapper.write("Y\nZ")
    self.assertEqual(raw.getvalue(), b"XY\nZ")  # All got flushed
    # A write containing "\r" is flushed as well.
    wrapper.write("A\rB")
    self.assertEqual(raw.getvalue(), b"XY\nZA\rB")
Guido van Rossumf64db9f2007-12-06 01:04:26 +00002648
def test_reconfigure_line_buffering(self):
    raw = self.BytesIO()
    buffered = self.BufferedWriter(raw, 1000)
    wrapper = self.TextIOWrapper(buffered, newline="\n",
                                 line_buffering=False)
    wrapper.write("AB\nC")
    self.assertEqual(raw.getvalue(), b"")

    wrapper.reconfigure(line_buffering=True)   # implicit flush
    self.assertEqual(raw.getvalue(), b"AB\nC")
    wrapper.write("DEF\nG")
    self.assertEqual(raw.getvalue(), b"AB\nCDEF\nG")
    # No newline in this write, so nothing new reaches the raw stream.
    wrapper.write("H")
    self.assertEqual(raw.getvalue(), b"AB\nCDEF\nG")
    wrapper.reconfigure(line_buffering=False)   # implicit flush
    self.assertEqual(raw.getvalue(), b"AB\nCDEF\nGH")
    wrapper.write("IJ")
    self.assertEqual(raw.getvalue(), b"AB\nCDEF\nGH")

    # Keeping default value
    for current in (False, True):
        if current:
            wrapper.reconfigure(line_buffering=True)
        wrapper.reconfigure()
        wrapper.reconfigure(line_buffering=None)
        self.assertEqual(wrapper.line_buffering, current)
2675
@unittest.skipIf(sys.flags.utf8_mode, "utf-8 mode is enabled")
def test_default_encoding(self):
    saved_environ = dict(os.environ)
    try:
        # try to get a user preferred encoding different than the current
        # locale encoding to check that TextIOWrapper() uses the current
        # locale encoding and not the user preferred encoding
        for name in ('LC_ALL', 'LANG', 'LC_CTYPE'):
            os.environ.pop(name, None)

        expected_encoding = locale.getpreferredencoding(False)
        wrapper = self.TextIOWrapper(self.BytesIO())
        self.assertEqual(wrapper.encoding, expected_encoding)
    finally:
        # Put the environment back exactly as it was.
        os.environ.clear()
        os.environ.update(saved_environ)
2694
@support.cpython_only
@unittest.skipIf(sys.flags.utf8_mode, "utf-8 mode is enabled")
def test_device_encoding(self):
    # Issue 15989
    # A fileno() just above INT_MAX or UINT_MAX must raise OverflowError.
    import _testcapi
    raw = self.BytesIO()
    for limit in (_testcapi.INT_MAX, _testcapi.UINT_MAX):
        raw.fileno = lambda value=limit + 1: value
        with self.assertRaises(OverflowError):
            self.TextIOWrapper(raw)
2705
def test_encoding(self):
    # Check the encoding attribute is always set, and valid
    raw = self.BytesIO()
    explicit = self.TextIOWrapper(raw, encoding="utf-8")
    self.assertEqual(explicit.encoding, "utf-8")
    implicit = self.TextIOWrapper(raw)
    self.assertIsNotNone(implicit.encoding)
    # lookup() raises LookupError for an unknown encoding name.
    codecs.lookup(implicit.encoding)
2714
def test_encoding_errors_reading(self):
    payload = b"abc\n\xff\n"  # \xff is not valid ASCII

    def open_ascii(**options):
        # Fresh ASCII wrapper over a fresh copy of the payload.
        return self.TextIOWrapper(self.BytesIO(payload),
                                  encoding="ascii", **options)

    # (1) default, (2) explicit strict
    for options in ({}, {"errors": "strict"}):
        wrapper = open_ascii(**options)
        self.assertRaises(UnicodeError, wrapper.read)

    # (3) ignore, (4) replace
    for handler, expected in (("ignore", "abc\n\n"),
                              ("replace", "abc\n\ufffd\n")):
        wrapper = open_ascii(errors=handler)
        self.assertEqual(wrapper.read(), expected)
Guido van Rossume7fc50f2007-12-03 22:54:21 +00002732
def test_encoding_errors_writing(self):
    # (1) default, (2) explicit strict
    for options in ({}, {"errors": "strict"}):
        raw = self.BytesIO()
        wrapper = self.TextIOWrapper(raw, encoding="ascii", **options)
        self.assertRaises(UnicodeError, wrapper.write, "\xff")

    # (3) ignore, (4) replace
    for handler, expected in (("ignore", b"abcdef\n"),
                              ("replace", b"abc?def\n")):
        raw = self.BytesIO()
        wrapper = self.TextIOWrapper(raw, encoding="ascii",
                                     errors=handler, newline="\n")
        wrapper.write("abc\xffdef\n")
        wrapper.flush()
        self.assertEqual(raw.getvalue(), expected)
Guido van Rossume7fc50f2007-12-03 22:54:21 +00002756
def test_newlines(self):
    input_lines = [ "unix\n", "windows\r\n", "os9\r", "last\n", "nonl" ]

    # Each entry is [newline argument, lines expected when reading].
    tests = [
        [ None, [ 'unix\n', 'windows\n', 'os9\n', 'last\n', 'nonl' ] ],
        [ '', input_lines ],
        [ '\n', [ "unix\n", "windows\r\n", "os9\rlast\n", "nonl" ] ],
        [ '\r\n', [ "unix\nwindows\r\n", "os9\rlast\nnonl" ] ],
        [ '\r', [ "unix\nwindows\r", "\nos9\r", "last\nnonl" ] ],
    ]
    encodings = (
        'utf-8', 'latin-1',
        'utf-16', 'utf-16-le', 'utf-16-be',
        'utf-32', 'utf-32-le', 'utf-32-be',
    )

    def collect_lines(textio, do_reads):
        # Either iterate the stream directly, or mix read(2) with
        # readline() so each line starts with a two-character read.
        if not do_reads:
            return list(textio)
        lines = []
        for head in iter(lambda: textio.read(2), ''):
            self.assertEqual(len(head), 2)
            lines.append(head + textio.readline())
        return lines

    # Try a range of buffer sizes to test the case where \r is the last
    # character in TextIOWrapper._pending_line.
    for encoding in encodings:
        # XXX: str.encode() should return bytes
        data = bytes(''.join(input_lines).encode(encoding))
        for do_reads in (False, True):
            for bufsize in range(1, 10):
                for newline, exp_lines in tests:
                    bufio = self.BufferedReader(self.BytesIO(data), bufsize)
                    textio = self.TextIOWrapper(bufio, newline=newline,
                                                encoding=encoding)
                    got_lines = collect_lines(textio, do_reads)

                    for got_line, exp_line in zip(got_lines, exp_lines):
                        self.assertEqual(got_line, exp_line)
                    self.assertEqual(len(got_lines), len(exp_lines))
Guido van Rossum78892e42007-04-06 17:31:18 +00002798
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002799 def test_newlines_input(self):
2800 testdata = b"AAA\nBB\x00B\nCCC\rDDD\rEEE\r\nFFF\r\nGGG"
Guido van Rossum8358db22007-08-18 21:39:55 +00002801 normalized = testdata.replace(b"\r\n", b"\n").replace(b"\r", b"\n")
2802 for newline, expected in [
Ezio Melottid8b509b2011-09-28 17:37:55 +03002803 (None, normalized.decode("ascii").splitlines(keepends=True)),
2804 ("", testdata.decode("ascii").splitlines(keepends=True)),
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002805 ("\n", ["AAA\n", "BB\x00B\n", "CCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]),
2806 ("\r\n", ["AAA\nBB\x00B\nCCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]),
2807 ("\r", ["AAA\nBB\x00B\nCCC\r", "DDD\r", "EEE\r", "\nFFF\r", "\nGGG"]),
Guido van Rossum8358db22007-08-18 21:39:55 +00002808 ]:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002809 buf = self.BytesIO(testdata)
2810 txt = self.TextIOWrapper(buf, encoding="ascii", newline=newline)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002811 self.assertEqual(txt.readlines(), expected)
Guido van Rossum8358db22007-08-18 21:39:55 +00002812 txt.seek(0)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002813 self.assertEqual(txt.read(), "".join(expected))
Guido van Rossum8358db22007-08-18 21:39:55 +00002814
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002815 def test_newlines_output(self):
2816 testdict = {
2817 "": b"AAA\nBBB\nCCC\nX\rY\r\nZ",
2818 "\n": b"AAA\nBBB\nCCC\nX\rY\r\nZ",
2819 "\r": b"AAA\rBBB\rCCC\rX\rY\r\rZ",
2820 "\r\n": b"AAA\r\nBBB\r\nCCC\r\nX\rY\r\r\nZ",
2821 }
2822 tests = [(None, testdict[os.linesep])] + sorted(testdict.items())
2823 for newline, expected in tests:
2824 buf = self.BytesIO()
2825 txt = self.TextIOWrapper(buf, encoding="ascii", newline=newline)
2826 txt.write("AAA\nB")
2827 txt.write("BB\nCCC\n")
2828 txt.write("X\rY\r\nZ")
2829 txt.flush()
Ezio Melottib3aedd42010-11-20 19:04:17 +00002830 self.assertEqual(buf.closed, False)
2831 self.assertEqual(buf.getvalue(), expected)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002832
def test_destructor(self):
    # After the wrapper is deleted and collected, the buffer's close()
    # must have been called exactly once, with the written text already
    # in the buffer.
    seen_at_close = []
    bytes_io = self.BytesIO

    class RecordingBytesIO(bytes_io):
        def close(self):
            # Capture the buffer contents at the moment of closing.
            seen_at_close.append(self.getvalue())
            bytes_io.close(self)

    raw = RecordingBytesIO()
    wrapper = self.TextIOWrapper(raw, encoding="ascii")
    wrapper.write("abc")
    del wrapper
    support.gc_collect()
    self.assertEqual([b"abc"], seen_at_close)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002846
def test_override_destructor(self):
    # Overridden __del__, close() and flush() are recorded as 1, 2 and 3;
    # finalizing the object must produce exactly that sequence.
    calls = []

    class RecordingTextIO(self.TextIOWrapper):
        def __del__(self):
            calls.append(1)
            # The base class is not required to define __del__.
            parent_del = getattr(super(), "__del__", None)
            if parent_del is not None:
                parent_del()

        def close(self):
            calls.append(2)
            super().close()

        def flush(self):
            calls.append(3)
            super().flush()

    raw = self.BytesIO()
    wrapper = RecordingTextIO(raw, encoding="ascii")
    del wrapper
    support.gc_collect()
    self.assertEqual(calls, [1, 2, 3])
2869
def test_error_through_destructor(self):
    # Test that the exception state is not modified by a destructor,
    # even if close() fails.
    raw = self.CloseFailureIO()
    with support.catch_unraisable_exception() as cm:
        with self.assertRaises(AttributeError):
            self.TextIOWrapper(raw).xyzzy

        # NOTE(review): the checks are kept inside the
        # catch_unraisable_exception() block on the assumption that
        # cm.unraisable is only valid there -- confirm against upstream.
        if IOBASE_EMITS_UNRAISABLE:
            if cm.unraisable is not None:
                self.assertEqual(cm.unraisable.exc_type, OSError)
        else:
            self.assertIsNone(cm.unraisable)
Guido van Rossum8358db22007-08-18 21:39:55 +00002882
Guido van Rossum9b76da62007-04-11 01:09:03 +00002883 # Systematic tests of the text I/O API
2884
def test_basic_io(self):
    # Write, read back, seek and tell on a real file for a range of
    # chunk sizes and encodings.  ("utf-16-be" and "utf-16-le" are not
    # exercised here.)
    for chunksize in (1, 2, 3, 4, 5, 15, 16, 17, 31, 32, 33, 63, 64, 65):
        for enc in "ascii", "latin-1", "utf-8":
            # The context managers close the file even when an
            # assertion fails part-way through.
            with self.open(support.TESTFN, "w+", encoding=enc) as f:
                f._CHUNK_SIZE = chunksize
                self.assertEqual(f.write("abc"), 3)
            with self.open(support.TESTFN, "r+", encoding=enc) as f:
                f._CHUNK_SIZE = chunksize
                self.assertEqual(f.tell(), 0)
                self.assertEqual(f.read(), "abc")
                # Opaque position of end-of-file, reused below.
                cookie = f.tell()
                self.assertEqual(f.seek(0), 0)
                self.assertEqual(f.read(None), "abc")
                f.seek(0)
                self.assertEqual(f.read(2), "ab")
                self.assertEqual(f.read(1), "c")
                self.assertEqual(f.read(1), "")
                self.assertEqual(f.read(), "")
                self.assertEqual(f.tell(), cookie)
                self.assertEqual(f.seek(0), 0)
                self.assertEqual(f.seek(0, 2), cookie)
                self.assertEqual(f.write("def"), 3)
                self.assertEqual(f.seek(cookie), cookie)
                self.assertEqual(f.read(), "def")
                if enc.startswith("utf"):
                    self.multi_line_test(f, enc)
2913
def multi_line_test(self, f, enc):
    # Helper: empty the file, write lines of various lengths while
    # recording tell() before each one, then read them back and check
    # that both the positions and the contents match.
    # ``enc`` is accepted but not used here.
    f.seek(0)
    f.truncate()
    alphabet = "s\xff\u0fff\uffff"
    written = []
    for length in (0, 1, 2, 3, 4, 5, 30, 31, 32, 33, 62, 63, 64, 65, 1000):
        text = "".join(alphabet[k % len(alphabet)]
                       for k in range(length)) + "\n"
        written.append((f.tell(), text))
        f.write(text)
    f.seek(0)
    read_back = []
    while True:
        where = f.tell()
        text = f.readline()
        if not text:
            break
        read_back.append((where, text))
    self.assertEqual(read_back, written)
Guido van Rossum9b76da62007-04-11 01:09:03 +00002935
def test_telling(self):
    # tell() must report the same positions while reading as it did
    # while writing, and must raise OSError while the file is being
    # iterated over.
    # The context manager closes the file even if an assertion fails.
    with self.open(support.TESTFN, "w+", encoding="utf-8") as f:
        p0 = f.tell()
        f.write("\xff\n")
        p1 = f.tell()
        f.write("\xff\n")
        p2 = f.tell()
        f.seek(0)
        self.assertEqual(f.tell(), p0)
        self.assertEqual(f.readline(), "\xff\n")
        self.assertEqual(f.tell(), p1)
        self.assertEqual(f.readline(), "\xff\n")
        self.assertEqual(f.tell(), p2)
        f.seek(0)
        for line in f:
            self.assertEqual(line, "\xff\n")
            self.assertRaises(OSError, f.tell)
        # Once iteration has finished, tell() works again.
        self.assertEqual(f.tell(), p2)
2955
def test_seeking(self):
    # Build a line whose multi-byte suffix starts two characters before
    # the default chunk size, write it twice, then read the prefix and
    # the suffix separately.
    chunk = _default_chunk_size()
    prefix_len = chunk - 2
    text_prefix = "a" * prefix_len
    raw_prefix = text_prefix.encode("utf-8")
    self.assertEqual(len(text_prefix), len(raw_prefix))
    text_suffix = "\u8888\n"
    raw_line = raw_prefix + text_suffix.encode("utf-8")
    with self.open(support.TESTFN, "wb") as f:
        f.write(raw_line * 2)
    with self.open(support.TESTFN, "r", encoding="utf-8") as f:
        head = f.read(prefix_len)
        self.assertEqual(head, raw_prefix.decode("ascii"))
        self.assertEqual(f.tell(), prefix_len)
        self.assertEqual(f.readline(), text_suffix)
Guido van Rossumd76e7792007-04-17 02:38:04 +00002972
def test_seeking_too(self):
    # Regression test for a specific bug
    payload = b'\xe0\xbf\xbf\n'
    with self.open(support.TESTFN, "wb") as out:
        out.write(payload)
    with self.open(support.TESTFN, "r", encoding="utf-8") as f:
        f._CHUNK_SIZE  # Just test that it exists
        f._CHUNK_SIZE = 2
        # No value is checked; readline() followed by tell() must
        # simply complete without raising.
        f.readline()
        f.tell()
Guido van Rossumd76e7792007-04-17 02:38:04 +00002983
def test_seek_and_tell(self):
    # Test seek/tell using the StatefulIncrementalDecoder.
    # Make test faster by doing smaller seeks
    CHUNK_SIZE = 128

    def test_seek_and_tell_with_data(data, min_pos=0):
        """Tell/seek to various points within a data stream and ensure
        that the decoded data returned by read() is consistent."""
        # Every file is opened in a ``with`` block so that it is closed
        # even when an assertion fails.
        with self.open(support.TESTFN, 'wb') as f:
            f.write(data)
        with self.open(support.TESTFN, encoding='test_decoder') as f:
            f._CHUNK_SIZE = CHUNK_SIZE
            decoded = f.read()

        for i in range(min_pos, len(decoded) + 1):  # seek positions
            for j in [1, 5, len(decoded) - i]:  # read lengths
                with self.open(support.TESTFN,
                               encoding='test_decoder') as f:
                    self.assertEqual(f.read(i), decoded[:i])
                    cookie = f.tell()
                    self.assertEqual(f.read(j), decoded[i:i + j])
                    f.seek(cookie)
                    self.assertEqual(f.read(), decoded[i:])

    # Enable the test decoder.
    StatefulIncrementalDecoder.codecEnabled = 1

    # Run the tests.
    try:
        # Try each test case.
        for raw, _, _ in StatefulIncrementalDecoderTest.test_cases:
            test_seek_and_tell_with_data(raw)

        # Position each test case so that it crosses a chunk boundary.
        for raw, _, _ in StatefulIncrementalDecoderTest.test_cases:
            offset = CHUNK_SIZE - len(raw)//2
            prefix = b'.'*offset
            # Don't bother seeking into the prefix (takes too long).
            min_pos = offset*2
            test_seek_and_tell_with_data(prefix + raw, min_pos)

    # Ensure our test decoder won't interfere with subsequent tests.
    finally:
        StatefulIncrementalDecoder.codecEnabled = 0
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00003030
def test_multibyte_seek_and_tell(self):
    # Seeking back to a tell() position taken between two lines of an
    # euc_jp file must reproduce the second line and its end position.
    # Files are opened with ``with`` so they are closed on failure too.
    with self.open(support.TESTFN, "w", encoding="euc_jp") as f:
        f.write("AB\n\u3046\u3048\n")

    with self.open(support.TESTFN, "r", encoding="euc_jp") as f:
        self.assertEqual(f.readline(), "AB\n")
        p0 = f.tell()
        self.assertEqual(f.readline(), "\u3046\u3048\n")
        p1 = f.tell()
        f.seek(p0)
        self.assertEqual(f.readline(), "\u3046\u3048\n")
        self.assertEqual(f.tell(), p1)
3045
def test_seek_with_encoder_state(self):
    # Write text, remember the position, write more, seek back to the
    # remembered position and overwrite; the file must then decode to
    # the expected string.
    # Files are opened with ``with`` so they are closed on failure too.
    with self.open(support.TESTFN, "w", encoding="euc_jis_2004") as f:
        f.write("\u00e6\u0300")
        p0 = f.tell()
        f.write("\u00e6")
        f.seek(p0)
        f.write("\u0300")

    with self.open(support.TESTFN, "r", encoding="euc_jis_2004") as f:
        self.assertEqual(f.readline(), "\u00e6\u0300\u0300")
3058
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003059 def test_encoded_writes(self):
Alexandre Vassalottia38f73b2008-01-07 18:30:48 +00003060 data = "1234567890"
3061 tests = ("utf-16",
3062 "utf-16-le",
3063 "utf-16-be",
3064 "utf-32",
3065 "utf-32-le",
3066 "utf-32-be")
3067 for encoding in tests:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003068 buf = self.BytesIO()
3069 f = self.TextIOWrapper(buf, encoding=encoding)
Alexandre Vassalottia38f73b2008-01-07 18:30:48 +00003070 # Check if the BOM is written only once (see issue1753).
3071 f.write(data)
3072 f.write(data)
3073 f.seek(0)
Ezio Melottib3aedd42010-11-20 19:04:17 +00003074 self.assertEqual(f.read(), data * 2)
Benjamin Peterson9363a652009-03-05 00:42:09 +00003075 f.seek(0)
Ezio Melottib3aedd42010-11-20 19:04:17 +00003076 self.assertEqual(f.read(), data * 2)
3077 self.assertEqual(buf.getvalue(), (data * 2).encode(encoding))
Alexandre Vassalottia38f73b2008-01-07 18:30:48 +00003078
Benjamin Petersona1b49012009-03-31 23:11:32 +00003079 def test_unreadable(self):
3080 class UnReadable(self.BytesIO):
3081 def readable(self):
3082 return False
3083 txt = self.TextIOWrapper(UnReadable())
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02003084 self.assertRaises(OSError, txt.read)
Benjamin Petersona1b49012009-03-31 23:11:32 +00003085
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003086 def test_read_one_by_one(self):
3087 txt = self.TextIOWrapper(self.BytesIO(b"AA\r\nBB"))
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00003088 reads = ""
3089 while True:
3090 c = txt.read(1)
3091 if not c:
3092 break
3093 reads += c
Ezio Melottib3aedd42010-11-20 19:04:17 +00003094 self.assertEqual(reads, "AA\nBB")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00003095
Benjamin Petersonbf5ff762009-12-13 19:25:34 +00003096 def test_readlines(self):
3097 txt = self.TextIOWrapper(self.BytesIO(b"AA\nBB\nCC"))
3098 self.assertEqual(txt.readlines(), ["AA\n", "BB\n", "CC"])
3099 txt.seek(0)
3100 self.assertEqual(txt.readlines(None), ["AA\n", "BB\n", "CC"])
3101 txt.seek(0)
3102 self.assertEqual(txt.readlines(5), ["AA\n", "BB\n"])
3103
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00003104 # read in amounts equal to TextIOWrapper._CHUNK_SIZE which is 128.
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003105 def test_read_by_chunk(self):
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00003106 # make sure "\r\n" straddles 128 char boundary.
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003107 txt = self.TextIOWrapper(self.BytesIO(b"A" * 127 + b"\r\nB"))
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00003108 reads = ""
3109 while True:
3110 c = txt.read(128)
3111 if not c:
3112 break
3113 reads += c
Ezio Melottib3aedd42010-11-20 19:04:17 +00003114 self.assertEqual(reads, "A"*127+"\nB")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00003115
Antoine Pitrou3ed2cb52012-10-16 23:02:27 +02003116 def test_writelines(self):
3117 l = ['ab', 'cd', 'ef']
3118 buf = self.BytesIO()
3119 txt = self.TextIOWrapper(buf)
3120 txt.writelines(l)
3121 txt.flush()
3122 self.assertEqual(buf.getvalue(), b'abcdef')
3123
3124 def test_writelines_userlist(self):
3125 l = UserList(['ab', 'cd', 'ef'])
3126 buf = self.BytesIO()
3127 txt = self.TextIOWrapper(buf)
3128 txt.writelines(l)
3129 txt.flush()
3130 self.assertEqual(buf.getvalue(), b'abcdef')
3131
3132 def test_writelines_error(self):
3133 txt = self.TextIOWrapper(self.BytesIO())
3134 self.assertRaises(TypeError, txt.writelines, [1, 2, 3])
3135 self.assertRaises(TypeError, txt.writelines, None)
3136 self.assertRaises(TypeError, txt.writelines, b'abc')
3137
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00003138 def test_issue1395_1(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003139 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00003140
3141 # read one char at a time
3142 reads = ""
3143 while True:
3144 c = txt.read(1)
3145 if not c:
3146 break
3147 reads += c
Ezio Melottib3aedd42010-11-20 19:04:17 +00003148 self.assertEqual(reads, self.normalized)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00003149
3150 def test_issue1395_2(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003151 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00003152 txt._CHUNK_SIZE = 4
3153
3154 reads = ""
3155 while True:
3156 c = txt.read(4)
3157 if not c:
3158 break
3159 reads += c
Ezio Melottib3aedd42010-11-20 19:04:17 +00003160 self.assertEqual(reads, self.normalized)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00003161
3162 def test_issue1395_3(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003163 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00003164 txt._CHUNK_SIZE = 4
3165
3166 reads = txt.read(4)
3167 reads += txt.read(4)
3168 reads += txt.readline()
3169 reads += txt.readline()
3170 reads += txt.readline()
Ezio Melottib3aedd42010-11-20 19:04:17 +00003171 self.assertEqual(reads, self.normalized)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00003172
3173 def test_issue1395_4(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003174 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00003175 txt._CHUNK_SIZE = 4
3176
3177 reads = txt.read(4)
3178 reads += txt.read()
Ezio Melottib3aedd42010-11-20 19:04:17 +00003179 self.assertEqual(reads, self.normalized)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00003180
3181 def test_issue1395_5(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003182 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00003183 txt._CHUNK_SIZE = 4
3184
3185 reads = txt.read(4)
3186 pos = txt.tell()
3187 txt.seek(0)
3188 txt.seek(pos)
Ezio Melottib3aedd42010-11-20 19:04:17 +00003189 self.assertEqual(txt.read(4), "BBB\n")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00003190
Ka-Ping Yeeddaa7062008-03-17 20:35:15 +00003191 def test_issue2282(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003192 buffer = self.BytesIO(self.testdata)
3193 txt = self.TextIOWrapper(buffer, encoding="ascii")
Ka-Ping Yeeddaa7062008-03-17 20:35:15 +00003194
3195 self.assertEqual(buffer.seekable(), txt.seekable())
3196
def test_append_bom(self):
    # The BOM is not written again when appending to a non-empty file
    path = support.TESTFN
    for codec in ('utf-8-sig', 'utf-16', 'utf-32'):
        with self.open(path, 'w', encoding=codec) as f:
            f.write('aaa')
            # The returned position is not used in this test; the call
            # itself is retained.
            f.tell()
        with self.open(path, 'rb') as f:
            self.assertEqual(f.read(), 'aaa'.encode(codec))

        with self.open(path, 'a', encoding=codec) as f:
            f.write('xxx')
        with self.open(path, 'rb') as f:
            self.assertEqual(f.read(), 'aaaxxx'.encode(codec))
Antoine Pitroue4501852009-05-14 18:55:55 +00003211
def test_seek_bom(self):
    # Same idea as appending, but the write position is reached by
    # seeking manually: first to the end, then back to the start.
    path = support.TESTFN
    for codec in ('utf-8-sig', 'utf-16', 'utf-32'):
        with self.open(path, 'w', encoding=codec) as f:
            f.write('aaa')
            end_of_text = f.tell()
        with self.open(path, 'r+', encoding=codec) as f:
            f.seek(end_of_text)
            f.write('zzz')
            f.seek(0)
            f.write('bbb')
        with self.open(path, 'rb') as f:
            self.assertEqual(f.read(), 'bbbzzz'.encode(codec))
Antoine Pitroue4501852009-05-14 18:55:55 +00003226
def test_seek_append_bom(self):
    # In append mode, seek to the start and then to the end before
    # writing; the result must match a plain append.
    path = support.TESTFN
    for codec in ('utf-8-sig', 'utf-16', 'utf-32'):
        with self.open(path, 'w', encoding=codec) as f:
            f.write('aaa')
        with self.open(path, 'a', encoding=codec) as f:
            f.seek(0)
            f.seek(0, self.SEEK_END)
            f.write('xxx')
        with self.open(path, 'rb') as f:
            self.assertEqual(f.read(), 'aaaxxx'.encode(codec))
3239
def test_errors_property(self):
    # The ``errors`` attribute reflects the argument given to open(),
    # and is "strict" when none is given.
    cases = (
        ({}, "strict"),
        ({"errors": "replace"}, "replace"),
    )
    for extra_kwargs, expected in cases:
        with self.open(support.TESTFN, "w", **extra_kwargs) as f:
            self.assertEqual(f.errors, expected)
3245
@support.no_tracing
def test_threads_write(self):
    # Issue6750: concurrent writes could duplicate data
    go = threading.Event()
    thread_numbers = range(20)
    with self.open(support.TESTFN, "w", buffering=1) as f:
        def writer(n):
            line = "Thread%03d\n" % n
            # Every thread waits on the same event before writing.
            go.wait()
            f.write(line)

        workers = [threading.Thread(target=writer, args=(n,))
                   for n in thread_numbers]
        with support.start_threads(workers, go.set):
            time.sleep(0.02)
    with self.open(support.TESTFN) as f:
        content = f.read()
        # Each thread's line must appear exactly once.
        for n in thread_numbers:
            self.assertEqual(content.count("Thread%03d\n" % n), 1)
Amaury Forgeot d'Arcccd686a2009-08-29 23:00:38 +00003263
Antoine Pitrou6be88762010-05-03 16:48:20 +00003264 def test_flush_error_on_close(self):
Serhiy Storchakaa3712a92015-02-21 00:35:09 +02003265 # Test that text file is closed despite failed flush
3266 # and that flush() is called before file closed.
Antoine Pitrou6be88762010-05-03 16:48:20 +00003267 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Serhiy Storchakaa3712a92015-02-21 00:35:09 +02003268 closed = []
Antoine Pitrou6be88762010-05-03 16:48:20 +00003269 def bad_flush():
Serhiy Storchakaa3712a92015-02-21 00:35:09 +02003270 closed[:] = [txt.closed, txt.buffer.closed]
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02003271 raise OSError()
Antoine Pitrou6be88762010-05-03 16:48:20 +00003272 txt.flush = bad_flush
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02003273 self.assertRaises(OSError, txt.close) # exception not swallowed
Benjamin Peterson68623612012-12-20 11:53:11 -06003274 self.assertTrue(txt.closed)
Serhiy Storchakaa3712a92015-02-21 00:35:09 +02003275 self.assertTrue(txt.buffer.closed)
3276 self.assertTrue(closed) # flush() called
3277 self.assertFalse(closed[0]) # flush() called before file closed
3278 self.assertFalse(closed[1])
Serhiy Storchakac26a1a42015-02-23 00:28:38 +02003279 txt.flush = lambda: None # break reference loop
Antoine Pitrou6be88762010-05-03 16:48:20 +00003280
Serhiy Storchaka8a8f7f92014-06-09 09:13:04 +03003281 def test_close_error_on_close(self):
3282 buffer = self.BytesIO(self.testdata)
3283 def bad_flush():
3284 raise OSError('flush')
3285 def bad_close():
3286 raise OSError('close')
3287 buffer.close = bad_close
3288 txt = self.TextIOWrapper(buffer, encoding="ascii")
3289 txt.flush = bad_flush
3290 with self.assertRaises(OSError) as err: # exception not swallowed
3291 txt.close()
3292 self.assertEqual(err.exception.args, ('close',))
3293 self.assertIsInstance(err.exception.__context__, OSError)
3294 self.assertEqual(err.exception.__context__.args, ('flush',))
3295 self.assertFalse(txt.closed)
3296
Victor Stinner472f7942019-04-12 21:58:24 +02003297 # Silence destructor error
3298 buffer.close = lambda: None
3299 txt.flush = lambda: None
3300
Serhiy Storchaka8a8f7f92014-06-09 09:13:04 +03003301 def test_nonnormalized_close_error_on_close(self):
3302 # Issue #21677
3303 buffer = self.BytesIO(self.testdata)
3304 def bad_flush():
3305 raise non_existing_flush
3306 def bad_close():
3307 raise non_existing_close
3308 buffer.close = bad_close
3309 txt = self.TextIOWrapper(buffer, encoding="ascii")
3310 txt.flush = bad_flush
3311 with self.assertRaises(NameError) as err: # exception not swallowed
3312 txt.close()
3313 self.assertIn('non_existing_close', str(err.exception))
3314 self.assertIsInstance(err.exception.__context__, NameError)
3315 self.assertIn('non_existing_flush', str(err.exception.__context__))
3316 self.assertFalse(txt.closed)
3317
Victor Stinner472f7942019-04-12 21:58:24 +02003318 # Silence destructor error
3319 buffer.close = lambda: None
3320 txt.flush = lambda: None
3321
Antoine Pitrou6be88762010-05-03 16:48:20 +00003322 def test_multi_close(self):
3323 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
3324 txt.close()
3325 txt.close()
3326 txt.close()
3327 self.assertRaises(ValueError, txt.flush)
3328
def test_unseekable(self):
    # tell() and seek() on a wrapper around MockUnseekableIO must raise
    # UnsupportedOperation.
    txt = self.TextIOWrapper(self.MockUnseekableIO(self.testdata))
    with self.assertRaises(self.UnsupportedOperation):
        txt.tell()
    with self.assertRaises(self.UnsupportedOperation):
        txt.seek(0)
3333
Antoine Pitrou7f8f4182010-12-21 21:20:59 +00003334 def test_readonly_attributes(self):
3335 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
3336 buf = self.BytesIO(self.testdata)
3337 with self.assertRaises(AttributeError):
3338 txt.buffer = buf
3339
def test_rawio(self):
    # Issue #12591: TextIOWrapper must work with raw I/O objects, so
    # that subprocess.Popen() can have the required unbuffered
    # semantics with universal_newlines=True.
    chunks = [b'abc', b'def', b'ghi\njkl\nopq\n']
    wrapper = self.TextIOWrapper(self.MockRawIO(chunks),
                                 encoding='ascii', newline='\n')
    # Reads
    self.assertEqual(wrapper.read(4), 'abcd')
    self.assertEqual(wrapper.readline(), 'efghi\n')
    self.assertEqual(list(wrapper), ['jkl\n', 'opq\n'])
3350
def test_rawio_write_through(self):
    # Issue #12591: with write_through=True, writes don't need a flush
    raw = self.MockRawIO([b'abc', b'def', b'ghi\njkl\nopq\n'])
    wrapper = self.TextIOWrapper(raw, encoding='ascii', newline='\n',
                                 write_through=True)
    for piece in ('1', '23\n4', '5'):
        wrapper.write(piece)
    # No flush() was issued; the raw object must already hold the data.
    self.assertEqual(b''.join(raw._write_stack), b'123\n45')
3360
Antoine Pitrouc644e7c2014-05-09 00:24:50 +02003361 def test_bufio_write_through(self):
3362 # Issue #21396: write_through=True doesn't force a flush()
3363 # on the underlying binary buffered object.
3364 flush_called, write_called = [], []
3365 class BufferedWriter(self.BufferedWriter):
3366 def flush(self, *args, **kwargs):
3367 flush_called.append(True)
3368 return super().flush(*args, **kwargs)
3369 def write(self, *args, **kwargs):
3370 write_called.append(True)
3371 return super().write(*args, **kwargs)
3372
3373 rawio = self.BytesIO()
3374 data = b"a"
3375 bufio = BufferedWriter(rawio, len(data)*2)
3376 textio = self.TextIOWrapper(bufio, encoding='ascii',
3377 write_through=True)
3378 # write to the buffered io but don't overflow the buffer
3379 text = data.decode('ascii')
3380 textio.write(text)
3381
3382 # buffer.flush is not called with write_through=True
3383 self.assertFalse(flush_called)
3384 # buffer.write *is* called with write_through=True
3385 self.assertTrue(write_called)
3386 self.assertEqual(rawio.getvalue(), b"") # no flush
3387
3388 write_called = [] # reset
3389 textio.write(text * 10) # total content is larger than bufio buffer
3390 self.assertTrue(write_called)
3391 self.assertEqual(rawio.getvalue(), data * 11) # all flushed
3392
def test_reconfigure_write_through(self):
    # Toggle write_through with reconfigure() and check both the
    # attribute and what has reached the raw object at each step.
    raw = self.MockRawIO([])
    txt = self.TextIOWrapper(raw, encoding='ascii', newline='\n')

    def written():
        # Everything the raw object has received so far.
        return b''.join(raw._write_stack)

    txt.write('1')
    txt.reconfigure(write_through=True)  # implied flush
    self.assertEqual(txt.write_through, True)
    self.assertEqual(written(), b'1')
    txt.write('23')
    self.assertEqual(written(), b'123')
    txt.reconfigure(write_through=False)
    self.assertEqual(txt.write_through, False)
    txt.write('45')
    txt.flush()
    self.assertEqual(written(), b'12345')
    # Keeping default value
    txt.reconfigure()
    txt.reconfigure(write_through=None)
    self.assertEqual(txt.write_through, False)
    txt.reconfigure(write_through=True)
    txt.reconfigure()
    txt.reconfigure(write_through=None)
    self.assertEqual(txt.write_through, True)
3415
Serhiy Storchaka94dc6732013-02-03 17:03:31 +02003416 def test_read_nonbytes(self):
3417 # Issue #17106
3418 # Crash when underlying read() returns non-bytes
3419 t = self.TextIOWrapper(self.StringIO('a'))
3420 self.assertRaises(TypeError, t.read, 1)
3421 t = self.TextIOWrapper(self.StringIO('a'))
3422 self.assertRaises(TypeError, t.readline)
3423 t = self.TextIOWrapper(self.StringIO('a'))
3424 self.assertRaises(TypeError, t.read)
3425
def test_illegal_encoder(self):
    # Issue 31271: Calling write() while the return value of encoder's
    # encode() is invalid shouldn't cause an assertion failure.
    rot13 = codecs.lookup("rot13")
    # Temporarily mark rot13 as a text encoding so that the wrapper can
    # be constructed with it.
    with support.swap_attr(rot13, '_is_text_encoding', True):
        # Use the implementation selected through ``self`` rather than
        # the ``io`` module directly, like the other tests here.
        t = self.TextIOWrapper(self.BytesIO(b'foo'), encoding="rot13")
    self.assertRaises(TypeError, t.write, 'bar')
3433
def test_illegal_decoder(self):
    # Issue #17106
    # Bypass the early encoding check added in issue 20404
    def _make_illegal_wrapper():
        quopri = codecs.lookup("quopri")
        # swap_attr() puts back whatever value the attribute had before,
        # instead of unconditionally resetting it to False.
        with support.swap_attr(quopri, '_is_text_encoding', True):
            return self.TextIOWrapper(self.BytesIO(b'aaaaaa'),
                                      newline='\n', encoding="quopri")

    # Crash when decoder returns non-string
    t = _make_illegal_wrapper()
    self.assertRaises(TypeError, t.read, 1)
    t = _make_illegal_wrapper()
    self.assertRaises(TypeError, t.readline)
    t = _make_illegal_wrapper()
    self.assertRaises(TypeError, t.read)

    # Issue 31243: calling read() while the return value of decoder's
    # getstate() is invalid should neither crash the interpreter nor
    # raise a SystemError.
    def _make_very_illegal_wrapper(getstate_ret_val):
        class BadDecoder:
            def getstate(self):
                return getstate_ret_val

        def _get_bad_decoder(dummy):
            return BadDecoder()

        quopri = codecs.lookup("quopri")
        with support.swap_attr(quopri, 'incrementaldecoder',
                               _get_bad_decoder):
            return _make_illegal_wrapper()

    for bad_state in (42, (), (1, 2)):
        t = _make_very_illegal_wrapper(bad_state)
        self.assertRaises(TypeError, t.read, 42)
3473
def _check_create_at_shutdown(self, **kwargs):
    # Issue #20037: creating a TextIOWrapper at shutdown
    # shouldn't crash the interpreter.
    #
    # Runs a child interpreter whose only live object creates a
    # TextIOWrapper(**kwargs) from __del__, and returns the result of
    # assert_python_ok() for the caller to inspect.
    module_name = self.io.__name__
    template = """if 1:
            import codecs
            import {iomod} as io

            # Avoid looking up codecs at shutdown
            codecs.lookup('utf-8')

            class C:
                def __init__(self):
                    self.buf = io.BytesIO()
                def __del__(self):
                    io.TextIOWrapper(self.buf, **{kwargs})
                    print("ok")
            c = C()
            """
    script = template.format(iomod=module_name, kwargs=kwargs)
    return assert_python_ok("-c", script)
3494
@support.requires_type_collecting
def test_create_at_shutdown_without_encoding(self):
    # With no explicit encoding the child may either succeed or fail
    # with the implementation-specific shutdown error.
    _, stdout, stderr = self._check_create_at_shutdown()
    if not stderr:
        self.assertEqual("ok", stdout.decode().strip())
        return
    # Can error out with a RuntimeError if the module state
    # isn't found.
    self.assertIn(self.shutdown_error, stderr.decode())
3504
@support.requires_type_collecting
def test_create_at_shutdown_with_encoding(self):
    # With encoding and errors given explicitly the child must print
    # "ok" and write nothing to stderr.
    options = {'encoding': 'utf-8', 'errors': 'strict'}
    _, stdout, stderr = self._check_create_at_shutdown(**options)
    self.assertFalse(stderr)
    self.assertEqual("ok", stdout.decode().strip())
3511
def test_read_byteslike(self):
    # The underlying stream hands out memoryviews instead of bytes.
    source = MemviewBytesIO(b'Just some random string\n')
    wrapper = self.TextIOWrapper(source, 'utf-8')

    # TextIOwrapper will not read the full string, because
    # we truncate it to a multiple of the native int size
    # so that we can construct a more complex memoryview.
    truncated = _to_memoryview(source.getvalue()).tobytes()
    expected_text = truncated.decode('utf-8')

    self.assertEqual(wrapper.read(200), expected_text)
3522
def test_issue22849(self):
    # Repeatedly construct a wrapper around an incomplete stream object;
    # whatever exception the constructor raises is deliberately ignored.
    # Afterwards, construction with a tell() method added is attempted
    # once more.
    class FakeStream(object):
        def readable(self):
            return True

        def writable(self):
            return True

        def seekable(self):
            return True

    for _ in range(10):
        try:
            self.TextIOWrapper(FakeStream(), encoding='utf-8')
        except Exception:
            pass

    def _tell(stream):
        return 0

    FakeStream.tell = _tell
    t = self.TextIOWrapper(FakeStream(), encoding='utf-8')
3537
def test_reconfigure_encoding_read(self):
    # latin1 -> utf8
    # (latin1 can decode utf-8 encoded string)
    latin1_line = 'abc\xe9\n'.encode('latin1')
    utf8_line = 'd\xe9f\n'.encode('utf8')
    stream = self.BytesIO(latin1_line + utf8_line)
    reader = self.TextIOWrapper(stream, encoding='latin1', newline='\n')
    self.assertEqual(reader.readline(), 'abc\xe9\n')

    # Once data has been read, neither the encoding nor the newline
    # mode may be changed.
    for change in ({'encoding': 'utf-8'}, {'newline': None}):
        with self.assertRaises(self.UnsupportedOperation):
            reader.reconfigure(**change)
3549
def test_reconfigure_write_fromascii(self):
    # ascii has a specific encodefunc in the C implementation,
    # but utf-8-sig has not. Make sure that we get rid of the
    # cached encodefunc when we switch encoders.
    sink = self.BytesIO()
    writer = self.TextIOWrapper(sink, encoding='ascii', newline='\n')
    writer.write('foo\n')

    writer.reconfigure(encoding='utf-8-sig')
    writer.write('\xe9\n')
    writer.flush()

    expected = b'foo\n\xc3\xa9\n'
    self.assertEqual(sink.getvalue(), expected)
3561
def test_reconfigure_write(self):
    # latin -> utf8
    sink = self.BytesIO()
    writer = self.TextIOWrapper(sink, encoding='latin1', newline='\n')
    writer.write('abc\xe9\n')
    writer.reconfigure(encoding='utf-8')
    # Text written before the switch is already in the buffer, encoded
    # with the old codec.
    self.assertEqual(sink.getvalue(), b'abc\xe9\n')
    writer.write('d\xe9f\n')
    writer.flush()
    self.assertEqual(sink.getvalue(), b'abc\xe9\nd\xc3\xa9f\n')

    # ascii -> utf-8-sig: ensure that no BOM is written in the middle of
    # the file
    sink = self.BytesIO()
    writer = self.TextIOWrapper(sink, encoding='ascii', newline='\n')
    writer.write('abc\n')
    writer.reconfigure(encoding='utf-8-sig')
    writer.write('d\xe9f\n')
    writer.flush()
    self.assertEqual(sink.getvalue(), b'abc\nd\xc3\xa9f\n')
3582
def test_reconfigure_write_non_seekable(self):
    # Make the in-memory stream report itself as non-seekable.
    def _not_seekable():
        return False

    sink = self.BytesIO()
    sink.seekable = _not_seekable
    sink.seek = None

    writer = self.TextIOWrapper(sink, encoding='ascii', newline='\n')
    writer.write('abc\n')
    writer.reconfigure(encoding='utf-8-sig')
    writer.write('d\xe9f\n')
    writer.flush()

    # If the raw stream is not seekable, there'll be a BOM
    with_bom = b'abc\n\xef\xbb\xbfd\xc3\xa9f\n'
    self.assertEqual(sink.getvalue(), with_bom)
3595
def test_reconfigure_defaults(self):
    # Check which settings reconfigure() keeps and which it resets when
    # only some of its arguments are supplied.
    txt = self.TextIOWrapper(self.BytesIO(), encoding='ascii',
                             errors='replace', newline='\n')

    def check_codec(encoding, errors):
        self.assertEqual(txt.encoding, encoding)
        self.assertEqual(txt.errors, errors)

    txt.reconfigure(encoding=None)
    check_codec('ascii', 'replace')
    txt.write('LF\n')

    txt.reconfigure(newline='\r\n')
    check_codec('ascii', 'replace')

    txt.reconfigure(errors='ignore')
    check_codec('ascii', 'ignore')
    txt.write('CRLF\n')

    # A new encoding without an explicit error handler gives 'strict'.
    txt.reconfigure(encoding='utf-8', newline=None)
    self.assertEqual(txt.errors, 'strict')
    txt.seek(0)
    self.assertEqual(txt.read(), 'LF\nCRLF\n')

    raw_bytes = txt.detach().getvalue()
    self.assertEqual(raw_bytes, b'LF\nCRLF\r\n')
3618
def test_reconfigure_newline(self):
    # Reading: (raw bytes, initial newline, new newline, first line read
    # after reconfiguring).
    read_cases = (
        (b'CR\rEOF', '\n', None, 'CR\n'),
        (b'CR\rEOF', '\n', '', 'CR\r'),
        (b'CR\rLF\nEOF', '\r', '\n', 'CR\rLF\n'),
        (b'LF\nCR\rEOF', '\n', '\r', 'LF\nCR\r'),
        (b'CR\rCRLF\r\nEOF', '\r', '\r\n', 'CR\rCRLF\r\n'),
    )
    for payload, initial, replacement, first_line in read_cases:
        raw = self.BytesIO(payload)
        txt = self.TextIOWrapper(raw, 'ascii', newline=initial)
        txt.reconfigure(newline=replacement)
        self.assertEqual(txt.readline(), first_line)

    # Writing: switch the newline mode before each write and check how
    # the trailing '\n' was translated.
    write_steps = (
        (None, 'linesep\n'),
        ('', 'LF\n'),
        ('\n', 'LF\n'),
        ('\r', 'CR\n'),
        ('\r\n', 'CRLF\n'),
    )
    txt = self.TextIOWrapper(self.BytesIO(), 'ascii', newline='\r')
    for mode, text in write_steps:
        txt.reconfigure(newline=mode)
        txt.write(text)
    expected = 'linesep' + os.linesep + 'LF\nLF\nCR\rCRLF\r\n'
    written = txt.detach().getvalue().decode('ascii')
    self.assertEqual(written, expected)
3654
def test_issue25862(self):
    # Assertion failures occurred in tell() after read() and write().
    def fresh_wrapper():
        return self.TextIOWrapper(self.BytesIO(b'test'), encoding='ascii')

    # tell() after a partial read followed by a full read.
    wrapper = fresh_wrapper()
    wrapper.read(1)
    wrapper.read()
    wrapper.tell()

    # tell() after a partial read followed by a write.
    wrapper = fresh_wrapper()
    wrapper.read(1)
    wrapper.write('x')
    wrapper.tell()
3665
Serhiy Storchaka94dc6732013-02-03 17:03:31 +02003666
class MemviewBytesIO(io.BytesIO):
    '''BytesIO variant whose read() and read1() hand back memoryview
       objects (built by _to_memoryview) instead of bytes'''

    def read(self, len_):
        chunk = super().read(len_)
        return _to_memoryview(chunk)

    def read1(self, len_):
        chunk = super().read1(len_)
        return _to_memoryview(chunk)
3676
def _to_memoryview(buf):
    '''Return a memoryview over an array of C ints holding *buf*.

    Trailing bytes that do not fill a whole int are dropped.
    '''

    ints = array.array('i')
    whole_items, _ = divmod(len(buf), ints.itemsize)
    ints.frombytes(buf[:whole_items * ints.itemsize])
    return memoryview(ints)
Benjamin Petersonc9c0f202009-06-30 23:06:06 +00003684
Benjamin Peterson6fd113c2014-11-12 10:23:44 -05003685
class CTextIOWrapperTest(TextIOWrapperTest):
    # Runs the shared TextIOWrapper tests against the "io" module and
    # adds a few tests that are specific to this subclass.
    shutdown_error = "RuntimeError: could not find io module state"
    io = io

    def test_initialization(self):
        # A failed re-initialisation must leave the object unusable
        # rather than half-configured.
        raw = self.BytesIO(b"\xc3\xa9\n\n")
        buffered = self.BufferedReader(raw, 1000)
        wrapper = self.TextIOWrapper(buffered)
        with self.assertRaises(ValueError):
            wrapper.__init__(buffered, newline='xyzzy')
        with self.assertRaises(ValueError):
            wrapper.read()

        # repr() of an object that never went through __init__ must
        # raise instead of crashing.
        wrapper = self.TextIOWrapper.__new__(self.TextIOWrapper)
        with self.assertRaises(Exception):
            repr(wrapper)

    def test_garbage_collection(self):
        # C TextIOWrapper objects are collected, and collecting them flushes
        # all data to disk.
        # The Python version has __del__, so it ends in gc.garbage instead.
        with support.check_warnings(('', ResourceWarning)):
            raw_file = io.FileIO(support.TESTFN, "wb")
            buffered = self.BufferedWriter(raw_file)
            wrapper = self.TextIOWrapper(buffered, encoding="ascii")
            wrapper.write("456def")
            # Put the wrapper in a reference cycle so that only the
            # cyclic collector can reclaim it.
            wrapper.x = wrapper
            ref = weakref.ref(wrapper)
            del wrapper
            support.gc_collect()
        self.assertIsNone(ref(), ref)
        with self.open(support.TESTFN, "rb") as stored:
            self.assertEqual(stored.read(), b"456def")

    def test_rwpair_cleared_before_textio(self):
        # Issue 13070: TextIOWrapper's finalization would crash when called
        # after the reference to the underlying BufferedRWPair's writer got
        # cleared by the GC.
        for _ in range(1000):
            pair_a = self.BufferedRWPair(self.MockRawIO(), self.MockRawIO())
            text_a = self.TextIOWrapper(pair_a, encoding="ascii")
            pair_b = self.BufferedRWPair(self.MockRawIO(), self.MockRawIO())
            text_b = self.TextIOWrapper(pair_b, encoding="ascii")
            # circular references
            text_a.buddy = text_b
            text_b.buddy = text_a
        support.gc_collect()

    def test_del__CHUNK_SIZE_SystemError(self):
        # Deleting _CHUNK_SIZE must be refused with AttributeError.
        wrapper = self.TextIOWrapper(self.BytesIO(), encoding='ascii')
        with self.assertRaises(AttributeError):
            del wrapper._CHUNK_SIZE
3735
Charles-François Natali42c28cd2011-10-05 19:53:43 +02003736
class PyTextIOWrapperTest(TextIOWrapperTest):
    # Runs the shared TextIOWrapper tests against the "pyio" module.
    # shutdown_error is the message that
    # test_create_at_shutdown_without_encoding accepts on stderr.
    shutdown_error = "LookupError: unknown encoding: ascii"
    io = pyio
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003740
3741
class IncrementalNewlineDecoderTest(unittest.TestCase):
    # Shared tests for IncrementalNewlineDecoder; the class under test is
    # read from self.IncrementalNewlineDecoder.

    def check_newline_decoding_utf8(self, decoder):
        # UTF-8 specific tests for a newline decoder
        def _check_decode(data, expected, **kwargs):
            # We exercise getstate() / setstate() as well as decode()
            saved = decoder.getstate()
            self.assertEqual(decoder.decode(data, **kwargs), expected)
            decoder.setstate(saved)
            self.assertEqual(decoder.decode(data, **kwargs), expected)

        # Multi-byte character delivered whole, then twice byte by byte,
        # then left incomplete.
        multibyte_steps = (
            (b'\xe8\xa2\x88', "\u8888"),

            (b'\xe8', ""),
            (b'\xa2', ""),
            (b'\x88', "\u8888"),

            (b'\xe8', ""),
            (b'\xa2', ""),
            (b'\x88', "\u8888"),

            (b'\xe8', ""),
        )
        for data, expected in multibyte_steps:
            _check_decode(data, expected)
        # The dangling lead byte must be reported at end of input.
        with self.assertRaises(UnicodeDecodeError):
            decoder.decode(b'', final=True)

        decoder.reset()
        # Newline translation, including a '\r' held back until the next
        # chunk (or until final=True) shows what follows it.
        newline_steps = (
            (b'\n', "\n", {}),
            (b'\r', "", {}),
            (b'', "\n", {'final': True}),
            (b'\r', "\n", {'final': True}),

            (b'\r', "", {}),
            (b'a', "\na", {}),

            (b'\r\r\n', "\n\n", {}),
            (b'\r', "", {}),
            (b'\r', "\n", {}),
            (b'\na', "\na", {}),

            (b'\xe8\xa2\x88\r\n', "\u8888\n", {}),
            (b'\xe8\xa2\x88', "\u8888", {}),
            (b'\n', "\n", {}),
            (b'\xe8\xa2\x88\r', "\u8888", {}),
            (b'\n', "\n", {}),
        )
        for data, expected, options in newline_steps:
            _check_decode(data, expected, **options)

    def check_newline_decoding(self, decoder, encoding):
        # Feed text to *decoder* one unit at a time and track the
        # newlines attribute.  With an encoding the units are single
        # encoded bytes; with encoding=None they are single characters.
        pieces = []
        if encoding is None:
            encoder = None
        else:
            encoder = codecs.getincrementalencoder(encoding)()

        def _decode_bytewise(text):
            if encoder is None:
                # Decode one char at a time
                units = text
            else:
                # Decode one byte at a time
                units = (bytes([byte]) for byte in encoder.encode(text))
            pieces.extend(decoder.decode(unit) for unit in units)

        self.assertEqual(decoder.newlines, None)
        progression = (
            ("abc\n\r", '\n'),
            ("\nabc", ('\n', '\r\n')),
            ("abc\r", ('\n', '\r\n')),
            ("abc", ('\r', '\n', '\r\n')),
        )
        for text, seen_newlines in progression:
            _decode_bytewise(text)
            self.assertEqual(decoder.newlines, seen_newlines)
        _decode_bytewise("abc\r")
        self.assertEqual("".join(pieces), "abc\n\nabcabc\nabcabc")

        # After reset() the decoder must have forgotten earlier newlines.
        decoder.reset()
        payload = "abc"
        if encoder is not None:
            encoder.reset()
            payload = encoder.encode(payload)
        self.assertEqual(decoder.decode(payload), "abc")
        self.assertEqual(decoder.newlines, None)

    def test_newline_decoder(self):
        encodings = (
            # None meaning the IncrementalNewlineDecoder takes unicode input
            # rather than bytes input
            None, 'utf-8', 'latin-1',
            'utf-16', 'utf-16-le', 'utf-16-be',
            'utf-32', 'utf-32-le', 'utf-32-be',
        )
        for enc in encodings:
            inner = codecs.getincrementaldecoder(enc)() if enc else enc
            wrapped = self.IncrementalNewlineDecoder(inner, translate=True)
            self.check_newline_decoding(wrapped, enc)

        inner = codecs.getincrementaldecoder("utf-8")()
        wrapped = self.IncrementalNewlineDecoder(inner, translate=True)
        self.check_newline_decoding_utf8(wrapped)
        with self.assertRaises(TypeError):
            wrapped.setstate(42)

    def test_newline_bytes(self):
        # Issue 5433: Excessive optimization in IncrementalNewlineDecoder
        def _check(dec):
            # U+0D00 and U+0A00 are not newlines and must pass through
            # without being recorded.
            self.assertEqual(dec.newlines, None)
            for char in ("\u0D00", "\u0A00"):
                self.assertEqual(dec.decode(char), char)
                self.assertEqual(dec.newlines, None)

        for translate in (False, True):
            _check(self.IncrementalNewlineDecoder(None, translate=translate))

    def test_translate(self):
        # issue 35062
        # Any non-zero translate value turns translation on.
        for translate in (-2, -1, 1, 2):
            inner = codecs.getincrementaldecoder("utf-8")()
            wrapped = self.IncrementalNewlineDecoder(inner, translate)
            self.check_newline_decoding_utf8(wrapped)

        # translate=0 leaves the newlines as they are.
        inner = codecs.getincrementaldecoder("utf-8")()
        wrapped = self.IncrementalNewlineDecoder(inner, translate=0)
        self.assertEqual(wrapped.decode(b"\r\r\n"), "\r\r\n")
3858
class CIncrementalNewlineDecoderTest(IncrementalNewlineDecoderTest):
    # Adds nothing of its own; all tests are inherited from
    # IncrementalNewlineDecoderTest.
    pass
3861
class PyIncrementalNewlineDecoderTest(IncrementalNewlineDecoderTest):
    # Adds nothing of its own; all tests are inherited from
    # IncrementalNewlineDecoderTest.
    pass
Antoine Pitrou180a3362008-12-14 16:36:46 +00003864
Alexandre Vassalotti472f07d2008-01-06 00:34:32 +00003865
Guido van Rossum01a27522007-03-07 01:00:12 +00003866# XXX Tests for open()
Guido van Rossum68bbcd22007-02-27 17:19:33 +00003867
Guido van Rossum5abbf752007-08-27 17:39:33 +00003868class MiscIOTest(unittest.TestCase):
3869
Barry Warsaw40e82462008-11-20 20:14:50 +00003870 def tearDown(self):
3871 support.unlink(support.TESTFN)
3872
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003873 def test___all__(self):
3874 for name in self.io.__all__:
3875 obj = getattr(self.io, name, None)
Serhiy Storchaka0dcd80a2015-08-02 15:17:49 +03003876 self.assertIsNotNone(obj, name)
Steve Dowerb82e17e2019-05-23 08:45:22 -07003877 if name in ("open", "open_code"):
Guido van Rossum5abbf752007-08-27 17:39:33 +00003878 continue
Benjamin Peterson6a52a9c2009-04-29 22:00:44 +00003879 elif "error" in name.lower() or name == "UnsupportedOperation":
Benjamin Petersonbfb95942009-04-02 01:13:40 +00003880 self.assertTrue(issubclass(obj, Exception), name)
3881 elif not name.startswith("SEEK_"):
3882 self.assertTrue(issubclass(obj, self.IOBase))
Benjamin Peterson65676e42008-11-05 21:42:45 +00003883
Barry Warsaw40e82462008-11-20 20:14:50 +00003884 def test_attributes(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003885 f = self.open(support.TESTFN, "wb", buffering=0)
Ezio Melottib3aedd42010-11-20 19:04:17 +00003886 self.assertEqual(f.mode, "wb")
Barry Warsaw40e82462008-11-20 20:14:50 +00003887 f.close()
3888
Serhiy Storchaka2480c2e2013-11-24 23:13:26 +02003889 with support.check_warnings(('', DeprecationWarning)):
3890 f = self.open(support.TESTFN, "U")
Ezio Melottib3aedd42010-11-20 19:04:17 +00003891 self.assertEqual(f.name, support.TESTFN)
3892 self.assertEqual(f.buffer.name, support.TESTFN)
3893 self.assertEqual(f.buffer.raw.name, support.TESTFN)
3894 self.assertEqual(f.mode, "U")
3895 self.assertEqual(f.buffer.mode, "rb")
3896 self.assertEqual(f.buffer.raw.mode, "rb")
Barry Warsaw40e82462008-11-20 20:14:50 +00003897 f.close()
3898
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003899 f = self.open(support.TESTFN, "w+")
Ezio Melottib3aedd42010-11-20 19:04:17 +00003900 self.assertEqual(f.mode, "w+")
3901 self.assertEqual(f.buffer.mode, "rb+") # Does it really matter?
3902 self.assertEqual(f.buffer.raw.mode, "rb+")
Barry Warsaw40e82462008-11-20 20:14:50 +00003903
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003904 g = self.open(f.fileno(), "wb", closefd=False)
Ezio Melottib3aedd42010-11-20 19:04:17 +00003905 self.assertEqual(g.mode, "wb")
3906 self.assertEqual(g.raw.mode, "wb")
3907 self.assertEqual(g.name, f.fileno())
3908 self.assertEqual(g.raw.name, f.fileno())
Barry Warsaw40e82462008-11-20 20:14:50 +00003909 f.close()
3910 g.close()
3911
Antoine Pitrou8043cf82009-01-09 19:54:29 +00003912 def test_io_after_close(self):
3913 for kwargs in [
3914 {"mode": "w"},
3915 {"mode": "wb"},
3916 {"mode": "w", "buffering": 1},
3917 {"mode": "w", "buffering": 2},
3918 {"mode": "wb", "buffering": 0},
3919 {"mode": "r"},
3920 {"mode": "rb"},
3921 {"mode": "r", "buffering": 1},
3922 {"mode": "r", "buffering": 2},
3923 {"mode": "rb", "buffering": 0},
3924 {"mode": "w+"},
3925 {"mode": "w+b"},
3926 {"mode": "w+", "buffering": 1},
3927 {"mode": "w+", "buffering": 2},
3928 {"mode": "w+b", "buffering": 0},
3929 ]:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003930 f = self.open(support.TESTFN, **kwargs)
Antoine Pitrou8043cf82009-01-09 19:54:29 +00003931 f.close()
3932 self.assertRaises(ValueError, f.flush)
3933 self.assertRaises(ValueError, f.fileno)
3934 self.assertRaises(ValueError, f.isatty)
3935 self.assertRaises(ValueError, f.__iter__)
3936 if hasattr(f, "peek"):
3937 self.assertRaises(ValueError, f.peek, 1)
3938 self.assertRaises(ValueError, f.read)
3939 if hasattr(f, "read1"):
3940 self.assertRaises(ValueError, f.read1, 1024)
Martin Panterccb2c0e2016-10-20 23:48:14 +00003941 self.assertRaises(ValueError, f.read1)
Victor Stinnerb79f28c2011-05-25 22:09:03 +02003942 if hasattr(f, "readall"):
3943 self.assertRaises(ValueError, f.readall)
Antoine Pitrou8043cf82009-01-09 19:54:29 +00003944 if hasattr(f, "readinto"):
3945 self.assertRaises(ValueError, f.readinto, bytearray(1024))
Benjamin Petersona96fea02014-06-22 14:17:44 -07003946 if hasattr(f, "readinto1"):
3947 self.assertRaises(ValueError, f.readinto1, bytearray(1024))
Antoine Pitrou8043cf82009-01-09 19:54:29 +00003948 self.assertRaises(ValueError, f.readline)
3949 self.assertRaises(ValueError, f.readlines)
Xiang Zhang026435c2017-04-15 12:47:28 +08003950 self.assertRaises(ValueError, f.readlines, 1)
Antoine Pitrou8043cf82009-01-09 19:54:29 +00003951 self.assertRaises(ValueError, f.seek, 0)
3952 self.assertRaises(ValueError, f.tell)
3953 self.assertRaises(ValueError, f.truncate)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003954 self.assertRaises(ValueError, f.write,
3955 b"" if "b" in kwargs['mode'] else "")
Antoine Pitrou8043cf82009-01-09 19:54:29 +00003956 self.assertRaises(ValueError, f.writelines, [])
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003957 self.assertRaises(ValueError, next, f)
Antoine Pitrou8043cf82009-01-09 19:54:29 +00003958
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003959 def test_blockingioerror(self):
3960 # Various BlockingIOError issues
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003961 class C(str):
3962 pass
3963 c = C("")
3964 b = self.BlockingIOError(1, c)
3965 c.b = b
3966 b.c = c
3967 wr = weakref.ref(c)
3968 del c, b
Benjamin Peterson24fb1d02009-04-24 23:26:21 +00003969 support.gc_collect()
Serhiy Storchaka0dcd80a2015-08-02 15:17:49 +03003970 self.assertIsNone(wr(), wr)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003971
3972 def test_abcs(self):
3973 # Test the visible base classes are ABCs.
Ezio Melottie9615932010-01-24 19:26:24 +00003974 self.assertIsInstance(self.IOBase, abc.ABCMeta)
3975 self.assertIsInstance(self.RawIOBase, abc.ABCMeta)
3976 self.assertIsInstance(self.BufferedIOBase, abc.ABCMeta)
3977 self.assertIsInstance(self.TextIOBase, abc.ABCMeta)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003978
3979 def _check_abc_inheritance(self, abcmodule):
3980 with self.open(support.TESTFN, "wb", buffering=0) as f:
Ezio Melottie9615932010-01-24 19:26:24 +00003981 self.assertIsInstance(f, abcmodule.IOBase)
3982 self.assertIsInstance(f, abcmodule.RawIOBase)
3983 self.assertNotIsInstance(f, abcmodule.BufferedIOBase)
3984 self.assertNotIsInstance(f, abcmodule.TextIOBase)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003985 with self.open(support.TESTFN, "wb") as f:
Ezio Melottie9615932010-01-24 19:26:24 +00003986 self.assertIsInstance(f, abcmodule.IOBase)
3987 self.assertNotIsInstance(f, abcmodule.RawIOBase)
3988 self.assertIsInstance(f, abcmodule.BufferedIOBase)
3989 self.assertNotIsInstance(f, abcmodule.TextIOBase)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003990 with self.open(support.TESTFN, "w") as f:
Ezio Melottie9615932010-01-24 19:26:24 +00003991 self.assertIsInstance(f, abcmodule.IOBase)
3992 self.assertNotIsInstance(f, abcmodule.RawIOBase)
3993 self.assertNotIsInstance(f, abcmodule.BufferedIOBase)
3994 self.assertIsInstance(f, abcmodule.TextIOBase)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003995
3996 def test_abc_inheritance(self):
3997 # Test implementations inherit from their respective ABCs
3998 self._check_abc_inheritance(self)
3999
4000 def test_abc_inheritance_official(self):
4001 # Test implementations inherit from the official ABCs of the
4002 # baseline "io" module.
4003 self._check_abc_inheritance(io)
4004
Antoine Pitroue033e062010-10-29 10:38:18 +00004005 def _check_warn_on_dealloc(self, *args, **kwargs):
4006 f = open(*args, **kwargs)
4007 r = repr(f)
4008 with self.assertWarns(ResourceWarning) as cm:
4009 f = None
4010 support.gc_collect()
4011 self.assertIn(r, str(cm.warning.args[0]))
4012
4013 def test_warn_on_dealloc(self):
4014 self._check_warn_on_dealloc(support.TESTFN, "wb", buffering=0)
4015 self._check_warn_on_dealloc(support.TESTFN, "wb")
4016 self._check_warn_on_dealloc(support.TESTFN, "w")
4017
4018 def _check_warn_on_dealloc_fd(self, *args, **kwargs):
4019 fds = []
Benjamin Peterson556c7352010-10-31 01:35:43 +00004020 def cleanup_fds():
Antoine Pitroue033e062010-10-29 10:38:18 +00004021 for fd in fds:
4022 try:
4023 os.close(fd)
Andrew Svetlov3438fa42012-12-17 23:35:18 +02004024 except OSError as e:
Antoine Pitroue033e062010-10-29 10:38:18 +00004025 if e.errno != errno.EBADF:
4026 raise
Benjamin Peterson556c7352010-10-31 01:35:43 +00004027 self.addCleanup(cleanup_fds)
4028 r, w = os.pipe()
4029 fds += r, w
4030 self._check_warn_on_dealloc(r, *args, **kwargs)
4031 # When using closefd=False, there's no warning
4032 r, w = os.pipe()
4033 fds += r, w
Serhiy Storchaka94a619d2016-02-11 13:11:44 +02004034 with support.check_no_resource_warning(self):
Benjamin Peterson556c7352010-10-31 01:35:43 +00004035 open(r, *args, closefd=False, **kwargs)
Antoine Pitroue033e062010-10-29 10:38:18 +00004036
4037 def test_warn_on_dealloc_fd(self):
4038 self._check_warn_on_dealloc_fd("rb", buffering=0)
4039 self._check_warn_on_dealloc_fd("rb")
4040 self._check_warn_on_dealloc_fd("r")
4041
4042
Antoine Pitrou243757e2010-11-05 21:15:39 +00004043 def test_pickling(self):
4044 # Pickling file objects is forbidden
4045 for kwargs in [
4046 {"mode": "w"},
4047 {"mode": "wb"},
4048 {"mode": "wb", "buffering": 0},
4049 {"mode": "r"},
4050 {"mode": "rb"},
4051 {"mode": "rb", "buffering": 0},
4052 {"mode": "w+"},
4053 {"mode": "w+b"},
4054 {"mode": "w+b", "buffering": 0},
4055 ]:
4056 for protocol in range(pickle.HIGHEST_PROTOCOL + 1):
4057 with self.open(support.TESTFN, **kwargs) as f:
4058 self.assertRaises(TypeError, pickle.dumps, f, protocol)
4059
Antoine Pitrou58fcf9f2011-11-21 20:16:44 +01004060 def test_nonblock_pipe_write_bigbuf(self):
4061 self._test_nonblock_pipe_write(16*1024)
4062
Antoine Pitrou58fcf9f2011-11-21 20:16:44 +01004063 def test_nonblock_pipe_write_smallbuf(self):
4064 self._test_nonblock_pipe_write(1024)
4065
Victor Stinner1db9e7b2014-07-29 22:32:47 +02004066 @unittest.skipUnless(hasattr(os, 'set_blocking'),
4067 'os.set_blocking() required for this test')
Antoine Pitrou58fcf9f2011-11-21 20:16:44 +01004068 def _test_nonblock_pipe_write(self, bufsize):
4069 sent = []
4070 received = []
4071 r, w = os.pipe()
Victor Stinner1db9e7b2014-07-29 22:32:47 +02004072 os.set_blocking(r, False)
4073 os.set_blocking(w, False)
Antoine Pitrou58fcf9f2011-11-21 20:16:44 +01004074
4075 # To exercise all code paths in the C implementation we need
4076 # to play with buffer sizes. For instance, if we choose a
4077 # buffer size less than or equal to _PIPE_BUF (4096 on Linux)
4078 # then we will never get a partial write of the buffer.
4079 rf = self.open(r, mode='rb', closefd=True, buffering=bufsize)
4080 wf = self.open(w, mode='wb', closefd=True, buffering=bufsize)
4081
4082 with rf, wf:
4083 for N in 9999, 73, 7574:
4084 try:
4085 i = 0
4086 while True:
4087 msg = bytes([i % 26 + 97]) * N
4088 sent.append(msg)
4089 wf.write(msg)
4090 i += 1
4091
4092 except self.BlockingIOError as e:
4093 self.assertEqual(e.args[0], errno.EAGAIN)
Antoine Pitrou7fe601c2011-11-21 20:22:01 +01004094 self.assertEqual(e.args[2], e.characters_written)
Antoine Pitrou58fcf9f2011-11-21 20:16:44 +01004095 sent[-1] = sent[-1][:e.characters_written]
4096 received.append(rf.read())
4097 msg = b'BLOCKED'
4098 wf.write(msg)
4099 sent.append(msg)
4100
4101 while True:
4102 try:
4103 wf.flush()
4104 break
4105 except self.BlockingIOError as e:
4106 self.assertEqual(e.args[0], errno.EAGAIN)
Antoine Pitrou7fe601c2011-11-21 20:22:01 +01004107 self.assertEqual(e.args[2], e.characters_written)
Antoine Pitrou58fcf9f2011-11-21 20:16:44 +01004108 self.assertEqual(e.characters_written, 0)
4109 received.append(rf.read())
4110
4111 received += iter(rf.read, None)
4112
4113 sent, received = b''.join(sent), b''.join(received)
Serhiy Storchaka0dcd80a2015-08-02 15:17:49 +03004114 self.assertEqual(sent, received)
Antoine Pitrou58fcf9f2011-11-21 20:16:44 +01004115 self.assertTrue(wf.closed)
4116 self.assertTrue(rf.closed)
4117
Charles-François Natalidc3044c2012-01-09 22:40:02 +01004118 def test_create_fail(self):
4119 # 'x' mode fails if file is existing
4120 with self.open(support.TESTFN, 'w'):
4121 pass
4122 self.assertRaises(FileExistsError, self.open, support.TESTFN, 'x')
4123
4124 def test_create_writes(self):
4125 # 'x' mode opens for writing
4126 with self.open(support.TESTFN, 'xb') as f:
4127 f.write(b"spam")
4128 with self.open(support.TESTFN, 'rb') as f:
4129 self.assertEqual(b"spam", f.read())
4130
def test_open_allargs(self):
    # there used to be a buffer overflow in the parser for rawmode;
    # a mode string combining every flag must be rejected cleanly.
    with self.assertRaises(ValueError):
        self.open(support.TESTFN, 'rwax+')
4134
def test_check_encoding_errors(self):
    # bpo-37388: open() and TextIOWrapper must check encoding and errors
    # arguments in dev mode.
    #
    # The child script exits with a distinct status for each check that
    # fails to raise LookupError (21-24), and with 10 when all four
    # checks behaved as expected.
    module_name = self.io.__name__
    target = __file__
    bogus = 'Boom, Shaka Laka, Boom!'
    script = textwrap.dedent(f'''
        import sys
        from {module_name} import open, TextIOWrapper

        try:
            open({target!r}, encoding={bogus!r})
        except LookupError:
            pass
        else:
            sys.exit(21)

        try:
            open({target!r}, errors={bogus!r})
        except LookupError:
            pass
        else:
            sys.exit(22)

        fp = open({target!r}, "rb")
        with fp:
            try:
                TextIOWrapper(fp, encoding={bogus!r})
            except LookupError:
                pass
            else:
                sys.exit(23)

            try:
                TextIOWrapper(fp, errors={bogus!r})
            except LookupError:
                pass
            else:
                sys.exit(24)

        sys.exit(10)
    ''')
    # Run the script in a child interpreter with "-X dev".
    outcome = assert_python_failure('-X', 'dev', '-c', script)
    self.assertEqual(outcome.rc, 10, outcome)
4179
Christian Heimes7b648752012-09-10 14:48:43 +02004180
class CMiscIOTest(MiscIOTest):
    # Runs the MiscIOTest suite against the `io` module and adds a few
    # checks of its own.
    io = io

    def test_readinto_buffer_overflow(self):
        # Issue #18025: readinto() has to raise ValueError when read()
        # returns far more data than the destination buffer can hold.
        class OversizedReader(self.io.BufferedIOBase):
            def read(self, n=-1):
                return b'x' * 10**6
        target = bytearray(2)
        reader = OversizedReader()
        with self.assertRaises(ValueError):
            reader.readinto(target)

    def check_daemon_threads_shutdown_deadlock(self, stream_name):
        # Issue #23309: deadlocks at shutdown should be avoided when a
        # daemon thread and the main thread both write to a file.
        script = """if 1:
            import sys
            import time
            import threading
            from test.support import SuppressCrashReport

            file = sys.{stream_name}

            def run():
                while True:
                    file.write('.')
                    file.flush()

            crash = SuppressCrashReport()
            crash.__enter__()
            # don't call __exit__(): the crash occurs at Python shutdown

            thread = threading.Thread(target=run)
            thread.daemon = True
            thread.start()

            time.sleep(0.5)
            file.write('!')
            file.flush()
            """.format(stream_name=stream_name)
        result, _ = run_python_until_end("-c", script)
        stderr_text = result.err.decode()
        if result.rc == 0:
            # Clean exit: stderr may only contain the '.' and '!' markers
            # written by the script itself.
            self.assertFalse(stderr_text.strip('.!'))
            return
        # Failure: should be a fatal error
        expected = (r"Fatal Python error: could not acquire lock "
                    r"for <(_io\.)?BufferedWriter name='<{stream_name}>'> "
                    r"at interpreter shutdown, possibly due to "
                    r"daemon threads".format(stream_name=stream_name))
        self.assertRegex(stderr_text, expected)

    def test_daemon_threads_shutdown_stdout_deadlock(self):
        self.check_daemon_threads_shutdown_deadlock('stdout')

    def test_daemon_threads_shutdown_stderr_deadlock(self):
        self.check_daemon_threads_shutdown_deadlock('stderr')
4238
4239
class PyMiscIOTest(MiscIOTest):
    # Runs the MiscIOTest suite against the `pyio` module.
    io = pyio
Barry Warsaw40e82462008-11-20 20:14:50 +00004242
Antoine Pitroub46b9d52010-08-21 19:09:32 +00004243
@unittest.skipIf(os.name == 'nt', 'POSIX signals required for this test.')
class SignalsTest(unittest.TestCase):
    """Checks of how the io layers behave when SIGALRM interrupts I/O.

    Subclasses select the implementation under test through the ``io``
    class attribute.
    """

    def setUp(self):
        # Remember the previous SIGALRM handler so tearDown() can restore it.
        self.oldalrm = signal.signal(signal.SIGALRM, self.alarm_interrupt)

    def tearDown(self):
        signal.signal(signal.SIGALRM, self.oldalrm)

    def alarm_interrupt(self, sig, frame):
        # Default handler: deliberately raises ZeroDivisionError.
        1/0

    def check_interrupted_write(self, item, bytes, **fdopen_kwargs):
        """Check that a partial write, when it gets interrupted, properly
        invokes the signal handler, and bubbles up the exception raised
        in the latter.

        *item* is the unit of data repeated to build the payload, *bytes*
        is its expected encoded form as read from the pipe (the name
        shadows the builtin but is kept for interface compatibility), and
        *fdopen_kwargs* are forwarded to ``self.io.open()``.
        """
        read_results = []
        def _read():
            s = os.read(r, 1)
            read_results.append(s)

        t = threading.Thread(target=_read)
        t.daemon = True
        r, w = os.pipe()
        fdopen_kwargs["closefd"] = False
        large_data = item * (support.PIPE_MAX_SIZE // len(item) + 1)
        # Bound before the try block so that the cleanup code never hits an
        # unbound name (which would mask the real error) if open() raises.
        wio = None
        try:
            wio = self.io.open(w, **fdopen_kwargs)
            if hasattr(signal, 'pthread_sigmask'):
                # create the thread with SIGALRM signal blocked
                signal.pthread_sigmask(signal.SIG_BLOCK, [signal.SIGALRM])
                t.start()
                signal.pthread_sigmask(signal.SIG_UNBLOCK, [signal.SIGALRM])
            else:
                t.start()

            # Fill the pipe enough that the write will be blocking.
            # It will be interrupted by the timer armed below.  Since the
            # other thread has read one byte, the low-level write will
            # return with a successful (partial) result rather than an EINTR.
            # The buffered IO layer must check for pending signal
            # handlers, which in this case will invoke alarm_interrupt().
            signal.alarm(1)
            try:
                self.assertRaises(ZeroDivisionError, wio.write, large_data)
            finally:
                signal.alarm(0)
                t.join()
            # We got one byte, get another one and check that it isn't a
            # repeat of the first one.
            read_results.append(os.read(r, 1))
            self.assertEqual(read_results, [bytes[0:1], bytes[1:2]])
        finally:
            os.close(w)
            os.close(r)
            # This is deliberate. If we didn't close the file descriptor
            # before closing wio, wio would try to flush its internal
            # buffer, and block again.
            if wio is not None:
                try:
                    wio.close()
                except OSError as e:
                    if e.errno != errno.EBADF:
                        raise

    def test_interrupted_write_unbuffered(self):
        self.check_interrupted_write(b"xy", b"xy", mode="wb", buffering=0)

    def test_interrupted_write_buffered(self):
        self.check_interrupted_write(b"xy", b"xy", mode="wb")

    def test_interrupted_write_text(self):
        self.check_interrupted_write("xy", b"xy", mode="w", encoding="ascii")

    @support.no_tracing
    def check_reentrant_write(self, data, **fdopen_kwargs):
        # Write *data* in a loop while a SIGALRM handler writes to the same
        # object from the same thread.
        def on_alarm(*args):
            # Will be called reentrantly from the same thread
            wio.write(data)
            1/0
        signal.signal(signal.SIGALRM, on_alarm)
        r, w = os.pipe()
        wio = self.io.open(w, **fdopen_kwargs)
        try:
            signal.alarm(1)
            # Either the reentrant call to wio.write() fails with RuntimeError,
            # or the signal handler raises ZeroDivisionError.
            with self.assertRaises((ZeroDivisionError, RuntimeError)) as cm:
                while 1:
                    for i in range(100):
                        wio.write(data)
                        wio.flush()
                    # Make sure the buffer doesn't fill up and block further writes
                    os.read(r, len(data) * 100)
            exc = cm.exception
            if isinstance(exc, RuntimeError):
                self.assertTrue(str(exc).startswith("reentrant call"), str(exc))
        finally:
            signal.alarm(0)
            wio.close()
            os.close(r)

    def test_reentrant_write_buffered(self):
        self.check_reentrant_write(b"xy", mode="wb")

    def test_reentrant_write_text(self):
        self.check_reentrant_write("xy", mode="w", encoding="ascii")

    def check_interrupted_read_retry(self, decode, **fdopen_kwargs):
        """Check that a buffered read, when it gets interrupted (either
        returning a partial result or EINTR), properly invokes the signal
        handler and retries if the latter returned successfully.

        *decode* converts the value returned by ``read()`` to ``str`` for
        comparison; *fdopen_kwargs* are forwarded to ``self.io.open()``.
        """
        r, w = os.pipe()
        fdopen_kwargs["closefd"] = False
        def alarm_handler(sig, frame):
            os.write(w, b"bar")
        signal.signal(signal.SIGALRM, alarm_handler)
        # Bound before the try block so that cleanup cannot fail on an
        # unbound name if open() raises.
        rio = None
        try:
            rio = self.io.open(r, **fdopen_kwargs)
            os.write(w, b"foo")
            signal.alarm(1)
            # Expected behaviour:
            # - first raw read() returns partial b"foo"
            # - second raw read() returns EINTR
            # - third raw read() returns b"bar"
            self.assertEqual(decode(rio.read(6)), "foobar")
        finally:
            signal.alarm(0)
            if rio is not None:
                rio.close()
            os.close(w)
            os.close(r)

    def test_interrupted_read_retry_buffered(self):
        self.check_interrupted_read_retry(lambda x: x.decode('latin1'),
                                          mode="rb")

    def test_interrupted_read_retry_text(self):
        # Pass an explicit encoding, like the other text-mode tests of this
        # class, so the result does not depend on the locale.
        self.check_interrupted_read_retry(lambda x: x,
                                          mode="r", encoding="latin1")

    def check_interrupted_write_retry(self, item, **fdopen_kwargs):
        """Check that a buffered write, when it gets interrupted (either
        returning a partial result or EINTR), properly invokes the signal
        handler and retries if the latter returned successfully.

        *item* is a single unit of data, repeated ``support.PIPE_MAX_SIZE``
        times to build the payload; *fdopen_kwargs* are forwarded to
        ``self.io.open()``.
        """
        select = support.import_module("select")

        # A quantity that exceeds the buffer size of an anonymous pipe's
        # write end.
        N = support.PIPE_MAX_SIZE
        r, w = os.pipe()
        fdopen_kwargs["closefd"] = False

        # We need a separate thread to read from the pipe and allow the
        # write() to finish.  This thread is started after the SIGALRM is
        # received (forcing a first EINTR in write()).
        read_results = []
        write_finished = False
        error = None
        def _read():
            try:
                while not write_finished:
                    while r in select.select([r], [], [], 1.0)[0]:
                        s = os.read(r, 1024)
                        read_results.append(s)
            except BaseException as exc:
                # Hand the failure over to the main thread, which asserts
                # that no error was recorded.
                nonlocal error
                error = exc
        t = threading.Thread(target=_read)
        t.daemon = True
        def alarm1(sig, frame):
            # First alarm: re-arm with alarm2 as the handler.
            signal.signal(signal.SIGALRM, alarm2)
            signal.alarm(1)
        def alarm2(sig, frame):
            # Second alarm: start draining the pipe.
            t.start()

        large_data = item * N
        signal.signal(signal.SIGALRM, alarm1)
        # Bound before the try block so that cleanup cannot fail on an
        # unbound name if open() raises.
        wio = None
        try:
            wio = self.io.open(w, **fdopen_kwargs)
            signal.alarm(1)
            # Expected behaviour:
            # - first raw write() is partial (because of the limited pipe buffer
            #   and the first alarm)
            # - second raw write() returns EINTR (because of the second alarm)
            # - subsequent write()s are successful (either partial or complete)
            written = wio.write(large_data)
            self.assertEqual(N, written)

            wio.flush()
            write_finished = True
            t.join()

            self.assertIsNone(error)
            self.assertEqual(N, sum(len(x) for x in read_results))
        finally:
            signal.alarm(0)
            write_finished = True
            os.close(w)
            os.close(r)
            # This is deliberate. If we didn't close the file descriptor
            # before closing wio, wio would try to flush its internal
            # buffer, and could block (in case of failure).
            if wio is not None:
                try:
                    wio.close()
                except OSError as e:
                    if e.errno != errno.EBADF:
                        raise

    def test_interrupted_write_retry_buffered(self):
        self.check_interrupted_write_retry(b"x", mode="wb")

    def test_interrupted_write_retry_text(self):
        self.check_interrupted_write_retry("x", mode="w", encoding="latin1")
4456
Antoine Pitrouf3b68b32010-12-03 18:41:39 +00004457
class CSignalsTest(SignalsTest):
    # Runs the SignalsTest suite against the `io` module.
    io = io
4460
class PySignalsTest(SignalsTest):
    # Runs the SignalsTest suite against the `pyio` module.
    io = pyio

    # Handling reentrancy issues would slow down _pyio even more, so the
    # tests are disabled: overriding the inherited test methods with None
    # makes them non-callable attributes.
    test_reentrant_write_text = None
    test_reentrant_write_buffered = None
4468
Antoine Pitroub46b9d52010-08-21 19:09:32 +00004469
def load_tests(*args):
    """Build the test suite for this module.

    Before the suite is assembled, every test class whose name starts with
    "C" or "Py" gets the public names of the matching io implementation
    (``io`` or ``pyio``) and the matching "C"/"Py" variants of the mock
    classes set as class attributes.

    The positional arguments are accepted and ignored.
    """
    tests = (CIOTest, PyIOTest, APIMismatchTest,
             CBufferedReaderTest, PyBufferedReaderTest,
             CBufferedWriterTest, PyBufferedWriterTest,
             CBufferedRWPairTest, PyBufferedRWPairTest,
             CBufferedRandomTest, PyBufferedRandomTest,
             StatefulIncrementalDecoderTest,
             CIncrementalNewlineDecoderTest, PyIncrementalNewlineDecoderTest,
             CTextIOWrapperTest, PyTextIOWrapperTest,
             CMiscIOTest, PyMiscIOTest,
             CSignalsTest, PySignalsTest,
             )

    # Put the namespaces of the IO module we are testing and some useful mock
    # classes in the __dict__ of each test.
    mocks = (MockRawIO, MisbehavedRawIO, MockFileIO, CloseFailureIO,
             MockNonBlockWriterIO, MockUnseekableIO, MockRawIOWithoutRead,
             SlowFlushRawIO)
    all_members = io.__all__ + ["IncrementalNewlineDecoder"]
    c_io_ns = {name : getattr(io, name) for name in all_members}
    py_io_ns = {name : getattr(pyio, name) for name in all_members}
    globs = globals()
    # Each mock is registered under its base name but resolved to the
    # implementation-specific class ("C" + name / "Py" + name).
    c_io_ns.update((x.__name__, globs["C" + x.__name__]) for x in mocks)
    py_io_ns.update((x.__name__, globs["Py" + x.__name__]) for x in mocks)
    # Avoid turning open into a bound method.
    py_io_ns["open"] = pyio.OpenWrapper
    for test in tests:
        if test.__name__.startswith("C"):
            namespace = c_io_ns
        elif test.__name__.startswith("Py"):
            namespace = py_io_ns
        else:
            # Implementation-independent test classes get nothing injected.
            continue
        for name, obj in namespace.items():
            setattr(test, name, obj)

    # unittest.makeSuite() is deprecated; the default loader collects the
    # same "test*" methods from each class.
    loader = unittest.defaultTestLoader
    suite = unittest.TestSuite(
        [loader.loadTestsFromTestCase(test) for test in tests])
    return suite
Guido van Rossum28524c72007-02-27 05:47:44 +00004506
if __name__ == "__main__":
    # Allow running this test file directly as a script.
    unittest.main()