blob: fc474c99053df2356cd273a3b28e8016663b21c0 [file] [log] [blame]
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001"""Unit tests for the io module."""
2
3# Tests of io are scattered over the test suite:
4# * test_bufio - tests file buffering
5# * test_memoryio - tests BytesIO and StringIO
6# * test_fileio - tests FileIO
7# * test_file - tests the file interface
8# * test_io - tests everything else in the io module
9# * test_univnewlines - tests universal newline support
10# * test_largefile - tests operations on a file greater than 2**32 bytes
11# (only enabled with -ulargefile)
12
13################################################################################
14# ATTENTION TEST WRITERS!!!
15################################################################################
16# When writing tests for io, it's important to test both the C and Python
17# implementations. This is usually done by writing a base test that refers to
Serhiy Storchakad65c9492015-11-02 14:10:23 +020018# the type it is testing as an attribute. Then it provides custom subclasses to
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000019# test both implementations. This file has lots of examples.
20################################################################################
Guido van Rossum68bbcd22007-02-27 17:19:33 +000021
Victor Stinnerf86a5e82012-06-05 13:43:22 +020022import abc
23import array
24import errno
25import locale
Guido van Rossum8358db22007-08-18 21:39:55 +000026import os
Victor Stinnerf86a5e82012-06-05 13:43:22 +020027import pickle
28import random
29import signal
Guido van Rossum34d69e52007-04-10 20:08:41 +000030import sys
Gregory P. Smithe5796c42018-12-30 20:17:57 -080031import sysconfig
Antoine Pitroua6a4dc82017-09-07 18:56:24 +020032import threading
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +000033import time
Guido van Rossum28524c72007-02-27 05:47:44 +000034import unittest
Antoine Pitroue033e062010-10-29 10:38:18 +000035import warnings
Victor Stinnerf86a5e82012-06-05 13:43:22 +020036import weakref
Antoine Pitrou131a4892012-10-16 22:57:11 +020037from collections import deque, UserList
Victor Stinnerf86a5e82012-06-05 13:43:22 +020038from itertools import cycle, count
Benjamin Petersonee8712c2008-05-20 21:35:26 +000039from test import support
Berker Peksagce643912015-05-06 06:33:17 +030040from test.support.script_helper import assert_python_ok, run_python_until_end
Serhiy Storchakab21d1552018-03-02 11:53:51 +020041from test.support import FakePath
Guido van Rossum76c5d4d2007-04-06 19:10:29 +000042
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +000043import codecs
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000044import io # C implementation of io
45import _pyio as pyio # Python implementation of io
Guido van Rossuma9e20242007-03-08 00:43:48 +000046
try:
    import ctypes
except ImportError:
    def byteslike(*pos, **kw):
        """Fallback used when ctypes cannot be imported.

        Builds ``bytes(*pos, **kw)`` and returns it wrapped in a
        signed-char ``array.array``.
        """
        return array.array("b", bytes(*pos, **kw))
else:
    class EmptyStruct(ctypes.Structure):
        """Field-less ctypes structure; only used as a resizable buffer."""

    def byteslike(*pos, **kw):
        """Create a bytes-like object having no string or sequence methods"""
        payload = bytes(*pos, **kw)
        holder = EmptyStruct()
        # Grow the (empty) structure so that it can hold the payload, then
        # copy the payload in through a flat unsigned-byte view.
        ctypes.resize(holder, len(payload))
        memoryview(holder).cast("B")[:] = payload
        return holder
62
_cflags = sysconfig.get_config_var('CFLAGS') or ''
_config_args = sysconfig.get_config_var('CONFIG_ARGS') or ''
# True when either the compiler flags or the configure arguments recorded by
# sysconfig mention the memory sanitizer.
MEMORY_SANITIZER = any((
    '-fsanitize=memory' in _cflags,
    '--with-memory-sanitizer' in _config_args,
))

# Does io.IOBase finalizer log the exception if the close() method fails?
# The exception is ignored silently by default in release build.
IOBASE_EMITS_UNRAISABLE = (
    sys.flags.dev_mode or hasattr(sys, "gettotalrefcount")
)
Victor Stinner44235042019-04-12 17:06:47 +020073
74
def _default_chunk_size():
    """Return the ``_CHUNK_SIZE`` attribute of a freshly opened text file.

    This source file is opened in text mode (latin-1) purely to obtain a
    text-file object to read the attribute from; nothing is read from it.
    """
    probe = open(__file__, "r", encoding="latin-1")
    try:
        chunk_size = probe._CHUNK_SIZE
    finally:
        probe.close()
    return chunk_size
79
80
class MockRawIOWithoutRead:
    """A RawIO implementation without read(), so as to exercise the default
    RawIO.read() which calls readinto().

    *read_stack* is an iterable of items served in order by readinto():
    bytes-like chunks, or None (which makes that readinto() call return
    None).  Everything passed to write() is recorded in ``_write_stack``.
    """

    def __init__(self, read_stack=()):
        self._read_stack = list(read_stack)
        self._write_stack = []
        # Total number of read calls, and how many of them happened after
        # the read stack was already exhausted.
        self._reads = 0
        self._extraneous_reads = 0

    def write(self, b):
        """Record a bytes copy of *b* and report all of it as written."""
        self._write_stack.append(bytes(b))
        return len(b)

    def writable(self):
        return True

    def fileno(self):
        return 42

    def readable(self):
        return True

    def seekable(self):
        return True

    def seek(self, pos, whence=0):
        # *whence* defaults to 0 so that the signature matches
        # RawIOBase.seek() and a one-argument seek(pos) is accepted.
        return 0   # wrong but we gotta return something

    def tell(self):
        return 0   # same comment as above

    def readinto(self, buf):
        """Copy the next queued chunk into *buf*.

        Returns the number of bytes copied, 0 once the queue is empty, or
        None when the next queued item is None.  A chunk larger than *buf*
        is split: the remainder stays at the head of the queue.
        """
        self._reads += 1
        max_len = len(buf)
        try:
            data = self._read_stack[0]
        except IndexError:
            self._extraneous_reads += 1
            return 0
        if data is None:
            del self._read_stack[0]
            return None
        n = len(data)
        if n <= max_len:
            del self._read_stack[0]
            buf[:n] = data
            return n
        buf[:] = data[:max_len]
        self._read_stack[0] = data[max_len:]
        return max_len

    def truncate(self, pos=None):
        """Pretend to truncate: simply echo *pos* back."""
        return pos
136
class CMockRawIOWithoutRead(MockRawIOWithoutRead, io.RawIOBase):
    """MockRawIOWithoutRead on top of the C implementation's RawIOBase."""


class PyMockRawIOWithoutRead(MockRawIOWithoutRead, pyio.RawIOBase):
    """MockRawIOWithoutRead on top of the Python implementation's RawIOBase."""
142
143
class MockRawIO(MockRawIOWithoutRead):
    """MockRawIOWithoutRead extended with an explicit read() method."""

    def read(self, n=None):
        """Pop and return the next queued item whole; *n* is ignored.

        Once the queue is empty, returns b"" and counts the call as an
        extraneous read.
        """
        self._reads += 1
        try:
            return self._read_stack.pop(0)
        except IndexError:
            # Only an exhausted queue is expected here; a bare ``except:``
            # would also swallow KeyboardInterrupt and real bugs.
            self._extraneous_reads += 1
            return b""
153
class CMockRawIO(MockRawIO, io.RawIOBase):
    """MockRawIO on top of the C implementation's RawIOBase."""


class PyMockRawIO(MockRawIO, pyio.RawIOBase):
    """MockRawIO on top of the Python implementation's RawIOBase."""
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000159
Guido van Rossuma9e20242007-03-08 00:43:48 +0000160
class MisbehavedRawIO(MockRawIO):
    """A MockRawIO whose methods deliberately report bogus results."""

    def write(self, b):
        # Claim twice as much was written as the parent reports.
        written = super().write(b)
        return written * 2

    def read(self, n=None):
        # Hand back the parent's result repeated twice.
        chunk = super().read(n)
        return chunk * 2

    def seek(self, pos, whence):
        # Negative position, whatever was asked for.
        return -123

    def tell(self):
        # Negative position.
        return -456

    def readinto(self, buf):
        # Let the parent fill the buffer, then report a count five times
        # the buffer's length.
        super().readinto(buf)
        bogus_count = len(buf) * 5
        return bogus_count
177
class CMisbehavedRawIO(MisbehavedRawIO, io.RawIOBase):
    """MisbehavedRawIO on top of the C implementation's RawIOBase."""


class PyMisbehavedRawIO(MisbehavedRawIO, pyio.RawIOBase):
    """MisbehavedRawIO on top of the Python implementation's RawIOBase."""
183
184
class SlowFlushRawIO(MockRawIO):
    """A MockRawIO whose flush() signals an event and then sleeps."""

    def __init__(self):
        super().__init__()
        # Set as soon as flush() has been entered.
        self.in_flush = threading.Event()

    def flush(self):
        self.in_flush.set()
        delay = 0.25
        time.sleep(delay)
193
class CSlowFlushRawIO(SlowFlushRawIO, io.RawIOBase):
    """SlowFlushRawIO on top of the C implementation's RawIOBase."""


class PySlowFlushRawIO(SlowFlushRawIO, pyio.RawIOBase):
    """SlowFlushRawIO on top of the Python implementation's RawIOBase."""
199
200
class CloseFailureIO(MockRawIO):
    """A MockRawIO whose first close() call fails with OSError."""

    closed = 0

    def close(self):
        # Later calls are silent no-ops: the flag is set before raising.
        if self.closed:
            return
        self.closed = 1
        raise OSError
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000208
class CCloseFailureIO(CloseFailureIO, io.RawIOBase):
    """CloseFailureIO on top of the C implementation's RawIOBase."""


class PyCloseFailureIO(CloseFailureIO, pyio.RawIOBase):
    """CloseFailureIO on top of the Python implementation's RawIOBase."""
214
215
class MockFileIO:
    """Mixin recording the size of every read()/readinto() result.

    Meant to be combined with a class providing ``__init__(data)``,
    ``read()`` and ``readinto()``; results are logged in ``read_history``.
    """

    def __init__(self, data):
        self.read_history = []
        super().__init__(data)

    def read(self, n=None):
        chunk = super().read(n)
        # Log the length of what came back, or None for a None result.
        if chunk is None:
            size = None
        else:
            size = len(chunk)
        self.read_history.append(size)
        return chunk

    def readinto(self, b):
        count = super().readinto(b)
        self.read_history.append(count)
        return count
Guido van Rossum78892e42007-04-06 17:31:18 +0000231
class CMockFileIO(MockFileIO, io.BytesIO):
    """MockFileIO backed by the C implementation's BytesIO."""


class PyMockFileIO(MockFileIO, pyio.BytesIO):
    """MockFileIO backed by the Python implementation's BytesIO."""
237
238
class MockUnseekableIO:
    """Mixin that makes a stream report itself as not seekable.

    seek(), tell() and truncate() all raise ``self.UnsupportedOperation``,
    which the concrete subclass is expected to provide.
    """

    def seekable(self):
        return False

    def seek(self, *args):
        error = self.UnsupportedOperation("not seekable")
        raise error

    def tell(self, *args):
        error = self.UnsupportedOperation("not seekable")
        raise error

    def truncate(self, *args):
        error = self.UnsupportedOperation("not seekable")
        raise error
251
class CMockUnseekableIO(MockUnseekableIO, io.BytesIO):
    """MockUnseekableIO backed by the C implementation's BytesIO."""

    UnsupportedOperation = io.UnsupportedOperation


class PyMockUnseekableIO(MockUnseekableIO, pyio.BytesIO):
    """MockUnseekableIO backed by the Python implementation's BytesIO."""

    UnsupportedOperation = pyio.UnsupportedOperation
257
258
class MockNonBlockWriterIO:
    """Raw writer mock that can simulate a write that would block.

    Written data accumulates in ``_write_stack``.  After block_on(char),
    a write containing *char* is cut short just before it; a write that
    starts with *char* returns None and clears the blocker.
    """

    def __init__(self):
        self._write_stack = []
        self._blocker_char = None

    def pop_written(self):
        """Return everything written so far and forget it."""
        written = b"".join(self._write_stack)
        self._write_stack[:] = []
        return written

    def block_on(self, char):
        """Block when a given char is encountered."""
        self._blocker_char = char

    def readable(self):
        return True

    def seekable(self):
        return True

    def seek(self, pos, whence=0):
        # naive implementation, enough for tests
        return 0

    def writable(self):
        return True

    def write(self, b):
        chunk = bytes(b)
        if self._blocker_char:
            try:
                cut = chunk.index(self._blocker_char)
            except ValueError:
                # No blocker in this chunk: accept all of it below.
                pass
            else:
                if cut <= 0:
                    # cancel blocker and indicate would block
                    self._blocker_char = None
                    return None
                # write data up to the first blocker
                self._write_stack.append(chunk[:cut])
                return cut
        self._write_stack.append(chunk)
        return len(chunk)
306
class CMockNonBlockWriterIO(MockNonBlockWriterIO, io.RawIOBase):
    """MockNonBlockWriterIO on top of the C implementation's RawIOBase."""

    BlockingIOError = io.BlockingIOError


class PyMockNonBlockWriterIO(MockNonBlockWriterIO, pyio.RawIOBase):
    """MockNonBlockWriterIO on top of the Python implementation's RawIOBase."""

    BlockingIOError = pyio.BlockingIOError
312
Guido van Rossuma9e20242007-03-08 00:43:48 +0000313
Guido van Rossum28524c72007-02-27 05:47:44 +0000314class IOTest(unittest.TestCase):
315
Neal Norwitze7789b12008-03-24 06:18:09 +0000316 def setUp(self):
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000317 support.unlink(support.TESTFN)
Neal Norwitze7789b12008-03-24 06:18:09 +0000318
Guido van Rossum4d0f5a42007-03-07 22:59:39 +0000319 def tearDown(self):
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000320 support.unlink(support.TESTFN)
Guido van Rossum4d0f5a42007-03-07 22:59:39 +0000321
Guido van Rossum28524c72007-02-27 05:47:44 +0000322 def write_ops(self, f):
Guido van Rossum87429772007-04-10 21:06:59 +0000323 self.assertEqual(f.write(b"blah."), 5)
Antoine Pitrou905a2ff2010-01-31 22:47:27 +0000324 f.truncate(0)
325 self.assertEqual(f.tell(), 5)
326 f.seek(0)
327
328 self.assertEqual(f.write(b"blah."), 5)
Guido van Rossum87429772007-04-10 21:06:59 +0000329 self.assertEqual(f.seek(0), 0)
330 self.assertEqual(f.write(b"Hello."), 6)
Guido van Rossum28524c72007-02-27 05:47:44 +0000331 self.assertEqual(f.tell(), 6)
Guido van Rossum87429772007-04-10 21:06:59 +0000332 self.assertEqual(f.seek(-1, 1), 5)
Guido van Rossum28524c72007-02-27 05:47:44 +0000333 self.assertEqual(f.tell(), 5)
Martin Panter6bb91f32016-05-28 00:41:57 +0000334 buffer = bytearray(b" world\n\n\n")
335 self.assertEqual(f.write(buffer), 9)
336 buffer[:] = b"*" * 9 # Overwrite our copy of the data
Guido van Rossum87429772007-04-10 21:06:59 +0000337 self.assertEqual(f.seek(0), 0)
Guido van Rossum2b08b382007-05-08 20:18:39 +0000338 self.assertEqual(f.write(b"h"), 1)
Guido van Rossum87429772007-04-10 21:06:59 +0000339 self.assertEqual(f.seek(-1, 2), 13)
340 self.assertEqual(f.tell(), 13)
Antoine Pitrou905a2ff2010-01-31 22:47:27 +0000341
Guido van Rossum87429772007-04-10 21:06:59 +0000342 self.assertEqual(f.truncate(12), 12)
Antoine Pitrou905a2ff2010-01-31 22:47:27 +0000343 self.assertEqual(f.tell(), 13)
Christian Heimes8e42a0a2007-11-08 18:04:45 +0000344 self.assertRaises(TypeError, f.seek, 0.0)
Guido van Rossum28524c72007-02-27 05:47:44 +0000345
Guido van Rossum9b76da62007-04-11 01:09:03 +0000346 def read_ops(self, f, buffered=False):
347 data = f.read(5)
348 self.assertEqual(data, b"hello")
Martin Panter6bb91f32016-05-28 00:41:57 +0000349 data = byteslike(data)
Guido van Rossum9b76da62007-04-11 01:09:03 +0000350 self.assertEqual(f.readinto(data), 5)
Martin Panter6bb91f32016-05-28 00:41:57 +0000351 self.assertEqual(bytes(data), b" worl")
352 data = bytearray(5)
Guido van Rossum9b76da62007-04-11 01:09:03 +0000353 self.assertEqual(f.readinto(data), 2)
354 self.assertEqual(len(data), 5)
355 self.assertEqual(data[:2], b"d\n")
356 self.assertEqual(f.seek(0), 0)
357 self.assertEqual(f.read(20), b"hello world\n")
358 self.assertEqual(f.read(1), b"")
Martin Panter6bb91f32016-05-28 00:41:57 +0000359 self.assertEqual(f.readinto(byteslike(b"x")), 0)
Guido van Rossum9b76da62007-04-11 01:09:03 +0000360 self.assertEqual(f.seek(-6, 2), 6)
361 self.assertEqual(f.read(5), b"world")
362 self.assertEqual(f.read(0), b"")
Martin Panter6bb91f32016-05-28 00:41:57 +0000363 self.assertEqual(f.readinto(byteslike()), 0)
Guido van Rossum9b76da62007-04-11 01:09:03 +0000364 self.assertEqual(f.seek(-6, 1), 5)
365 self.assertEqual(f.read(5), b" worl")
366 self.assertEqual(f.tell(), 10)
Christian Heimes8e42a0a2007-11-08 18:04:45 +0000367 self.assertRaises(TypeError, f.seek, 0.0)
Guido van Rossum9b76da62007-04-11 01:09:03 +0000368 if buffered:
369 f.seek(0)
370 self.assertEqual(f.read(), b"hello world\n")
371 f.seek(6)
372 self.assertEqual(f.read(), b"world\n")
373 self.assertEqual(f.read(), b"")
Martin Panter6bb91f32016-05-28 00:41:57 +0000374 f.seek(0)
375 data = byteslike(5)
376 self.assertEqual(f.readinto1(data), 5)
377 self.assertEqual(bytes(data), b"hello")
Guido van Rossum9b76da62007-04-11 01:09:03 +0000378
Guido van Rossum34d69e52007-04-10 20:08:41 +0000379 LARGE = 2**31
380
Guido van Rossum53807da2007-04-10 19:01:47 +0000381 def large_file_ops(self, f):
382 assert f.readable()
383 assert f.writable()
Xavier de Gaye877f0362016-11-17 09:20:28 +0100384 try:
385 self.assertEqual(f.seek(self.LARGE), self.LARGE)
386 except (OverflowError, ValueError):
387 self.skipTest("no largefile support")
Guido van Rossum34d69e52007-04-10 20:08:41 +0000388 self.assertEqual(f.tell(), self.LARGE)
Guido van Rossum53807da2007-04-10 19:01:47 +0000389 self.assertEqual(f.write(b"xxx"), 3)
Guido van Rossum34d69e52007-04-10 20:08:41 +0000390 self.assertEqual(f.tell(), self.LARGE + 3)
391 self.assertEqual(f.seek(-1, 1), self.LARGE + 2)
Guido van Rossum87429772007-04-10 21:06:59 +0000392 self.assertEqual(f.truncate(), self.LARGE + 2)
Guido van Rossum34d69e52007-04-10 20:08:41 +0000393 self.assertEqual(f.tell(), self.LARGE + 2)
394 self.assertEqual(f.seek(0, 2), self.LARGE + 2)
Guido van Rossum87429772007-04-10 21:06:59 +0000395 self.assertEqual(f.truncate(self.LARGE + 1), self.LARGE + 1)
Antoine Pitrou905a2ff2010-01-31 22:47:27 +0000396 self.assertEqual(f.tell(), self.LARGE + 2)
Guido van Rossum34d69e52007-04-10 20:08:41 +0000397 self.assertEqual(f.seek(0, 2), self.LARGE + 1)
398 self.assertEqual(f.seek(-1, 2), self.LARGE)
Guido van Rossum53807da2007-04-10 19:01:47 +0000399 self.assertEqual(f.read(2), b"x")
400
Benjamin Peterson81971ea2009-05-14 22:01:31 +0000401 def test_invalid_operations(self):
402 # Try writing on a file opened in read mode and vice-versa.
Antoine Pitrou0d739d72010-09-05 23:01:12 +0000403 exc = self.UnsupportedOperation
Benjamin Peterson81971ea2009-05-14 22:01:31 +0000404 for mode in ("w", "wb"):
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000405 with self.open(support.TESTFN, mode) as fp:
Antoine Pitrou0d739d72010-09-05 23:01:12 +0000406 self.assertRaises(exc, fp.read)
407 self.assertRaises(exc, fp.readline)
408 with self.open(support.TESTFN, "wb", buffering=0) as fp:
409 self.assertRaises(exc, fp.read)
410 self.assertRaises(exc, fp.readline)
411 with self.open(support.TESTFN, "rb", buffering=0) as fp:
412 self.assertRaises(exc, fp.write, b"blah")
413 self.assertRaises(exc, fp.writelines, [b"blah\n"])
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000414 with self.open(support.TESTFN, "rb") as fp:
Antoine Pitrou0d739d72010-09-05 23:01:12 +0000415 self.assertRaises(exc, fp.write, b"blah")
416 self.assertRaises(exc, fp.writelines, [b"blah\n"])
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000417 with self.open(support.TESTFN, "r") as fp:
Antoine Pitrou0d739d72010-09-05 23:01:12 +0000418 self.assertRaises(exc, fp.write, "blah")
419 self.assertRaises(exc, fp.writelines, ["blah\n"])
420 # Non-zero seeking from current or end pos
421 self.assertRaises(exc, fp.seek, 1, self.SEEK_CUR)
422 self.assertRaises(exc, fp.seek, -1, self.SEEK_END)
Benjamin Peterson81971ea2009-05-14 22:01:31 +0000423
Martin Panter754aab22016-03-31 07:21:56 +0000424 def test_optional_abilities(self):
425 # Test for OSError when optional APIs are not supported
426 # The purpose of this test is to try fileno(), reading, writing and
427 # seeking operations with various objects that indicate they do not
428 # support these operations.
429
430 def pipe_reader():
431 [r, w] = os.pipe()
432 os.close(w) # So that read() is harmless
433 return self.FileIO(r, "r")
434
435 def pipe_writer():
436 [r, w] = os.pipe()
437 self.addCleanup(os.close, r)
438 # Guarantee that we can write into the pipe without blocking
439 thread = threading.Thread(target=os.read, args=(r, 100))
440 thread.start()
441 self.addCleanup(thread.join)
442 return self.FileIO(w, "w")
443
444 def buffered_reader():
445 return self.BufferedReader(self.MockUnseekableIO())
446
447 def buffered_writer():
448 return self.BufferedWriter(self.MockUnseekableIO())
449
450 def buffered_random():
451 return self.BufferedRandom(self.BytesIO())
452
453 def buffered_rw_pair():
454 return self.BufferedRWPair(self.MockUnseekableIO(),
455 self.MockUnseekableIO())
456
457 def text_reader():
458 class UnseekableReader(self.MockUnseekableIO):
459 writable = self.BufferedIOBase.writable
460 write = self.BufferedIOBase.write
461 return self.TextIOWrapper(UnseekableReader(), "ascii")
462
463 def text_writer():
464 class UnseekableWriter(self.MockUnseekableIO):
465 readable = self.BufferedIOBase.readable
466 read = self.BufferedIOBase.read
467 return self.TextIOWrapper(UnseekableWriter(), "ascii")
468
469 tests = (
470 (pipe_reader, "fr"), (pipe_writer, "fw"),
471 (buffered_reader, "r"), (buffered_writer, "w"),
472 (buffered_random, "rws"), (buffered_rw_pair, "rw"),
473 (text_reader, "r"), (text_writer, "w"),
474 (self.BytesIO, "rws"), (self.StringIO, "rws"),
475 )
476 for [test, abilities] in tests:
Martin Panter754aab22016-03-31 07:21:56 +0000477 with self.subTest(test), test() as obj:
478 readable = "r" in abilities
479 self.assertEqual(obj.readable(), readable)
480 writable = "w" in abilities
481 self.assertEqual(obj.writable(), writable)
Martin Panter754aab22016-03-31 07:21:56 +0000482
483 if isinstance(obj, self.TextIOBase):
484 data = "3"
485 elif isinstance(obj, (self.BufferedIOBase, self.RawIOBase)):
486 data = b"3"
487 else:
488 self.fail("Unknown base class")
489
490 if "f" in abilities:
491 obj.fileno()
492 else:
493 self.assertRaises(OSError, obj.fileno)
494
495 if readable:
496 obj.read(1)
497 obj.read()
498 else:
499 self.assertRaises(OSError, obj.read, 1)
500 self.assertRaises(OSError, obj.read)
501
502 if writable:
503 obj.write(data)
504 else:
505 self.assertRaises(OSError, obj.write, data)
506
Martin Panter3ee147f2016-03-31 21:05:31 +0000507 if sys.platform.startswith("win") and test in (
Martin Panter0950e6a2016-03-31 10:31:30 +0000508 pipe_reader, pipe_writer):
509 # Pipes seem to appear as seekable on Windows
510 continue
511 seekable = "s" in abilities
512 self.assertEqual(obj.seekable(), seekable)
513
Martin Panter754aab22016-03-31 07:21:56 +0000514 if seekable:
515 obj.tell()
516 obj.seek(0)
517 else:
518 self.assertRaises(OSError, obj.tell)
519 self.assertRaises(OSError, obj.seek, 0)
520
521 if writable and seekable:
522 obj.truncate()
523 obj.truncate(0)
524 else:
525 self.assertRaises(OSError, obj.truncate)
526 self.assertRaises(OSError, obj.truncate, 0)
527
Antoine Pitrou13348842012-01-29 18:36:34 +0100528 def test_open_handles_NUL_chars(self):
529 fn_with_NUL = 'foo\0bar'
Serhiy Storchakad8a14472014-09-06 20:07:17 +0300530 self.assertRaises(ValueError, self.open, fn_with_NUL, 'w')
Victor Stinner47b45572016-03-25 09:07:07 +0100531
532 bytes_fn = bytes(fn_with_NUL, 'ascii')
533 with warnings.catch_warnings():
534 warnings.simplefilter("ignore", DeprecationWarning)
535 self.assertRaises(ValueError, self.open, bytes_fn, 'w')
Antoine Pitrou13348842012-01-29 18:36:34 +0100536
Guido van Rossum28524c72007-02-27 05:47:44 +0000537 def test_raw_file_io(self):
Benjamin Peterson45cec322009-04-24 23:14:50 +0000538 with self.open(support.TESTFN, "wb", buffering=0) as f:
539 self.assertEqual(f.readable(), False)
540 self.assertEqual(f.writable(), True)
541 self.assertEqual(f.seekable(), True)
542 self.write_ops(f)
543 with self.open(support.TESTFN, "rb", buffering=0) as f:
544 self.assertEqual(f.readable(), True)
545 self.assertEqual(f.writable(), False)
546 self.assertEqual(f.seekable(), True)
547 self.read_ops(f)
Guido van Rossum28524c72007-02-27 05:47:44 +0000548
Guido van Rossum87429772007-04-10 21:06:59 +0000549 def test_buffered_file_io(self):
Benjamin Peterson45cec322009-04-24 23:14:50 +0000550 with self.open(support.TESTFN, "wb") as f:
551 self.assertEqual(f.readable(), False)
552 self.assertEqual(f.writable(), True)
553 self.assertEqual(f.seekable(), True)
554 self.write_ops(f)
555 with self.open(support.TESTFN, "rb") as f:
556 self.assertEqual(f.readable(), True)
557 self.assertEqual(f.writable(), False)
558 self.assertEqual(f.seekable(), True)
559 self.read_ops(f, True)
Guido van Rossum87429772007-04-10 21:06:59 +0000560
Guido van Rossum48fc58a2007-06-07 23:45:37 +0000561 def test_readline(self):
Benjamin Peterson45cec322009-04-24 23:14:50 +0000562 with self.open(support.TESTFN, "wb") as f:
563 f.write(b"abc\ndef\nxyzzy\nfoo\x00bar\nanother line")
564 with self.open(support.TESTFN, "rb") as f:
565 self.assertEqual(f.readline(), b"abc\n")
566 self.assertEqual(f.readline(10), b"def\n")
567 self.assertEqual(f.readline(2), b"xy")
568 self.assertEqual(f.readline(4), b"zzy\n")
569 self.assertEqual(f.readline(), b"foo\x00bar\n")
Benjamin Petersonbf5ff762009-12-13 19:25:34 +0000570 self.assertEqual(f.readline(None), b"another line")
Benjamin Peterson45cec322009-04-24 23:14:50 +0000571 self.assertRaises(TypeError, f.readline, 5.3)
572 with self.open(support.TESTFN, "r") as f:
573 self.assertRaises(TypeError, f.readline, 5.3)
Guido van Rossum48fc58a2007-06-07 23:45:37 +0000574
Serhiy Storchakabf623ae2017-04-19 20:03:52 +0300575 def test_readline_nonsizeable(self):
576 # Issue #30061
577 # Crash when readline() returns an object without __len__
578 class R(self.IOBase):
579 def readline(self):
580 return None
581 self.assertRaises((TypeError, StopIteration), next, R())
582
583 def test_next_nonsizeable(self):
584 # Issue #30061
585 # Crash when __next__() returns an object without __len__
586 class R(self.IOBase):
587 def __next__(self):
588 return None
589 self.assertRaises(TypeError, R().readlines, 1)
590
Guido van Rossum28524c72007-02-27 05:47:44 +0000591 def test_raw_bytes_io(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000592 f = self.BytesIO()
Guido van Rossum28524c72007-02-27 05:47:44 +0000593 self.write_ops(f)
594 data = f.getvalue()
595 self.assertEqual(data, b"hello world\n")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000596 f = self.BytesIO(data)
Guido van Rossum9b76da62007-04-11 01:09:03 +0000597 self.read_ops(f, True)
Guido van Rossum28524c72007-02-27 05:47:44 +0000598
Guido van Rossum53807da2007-04-10 19:01:47 +0000599 def test_large_file_ops(self):
Mike53f7a7c2017-12-14 14:04:53 +0300600 # On Windows and Mac OSX this test consumes large resources; It takes
Victor Stinner8c663fd2017-11-08 14:44:44 -0800601 # a long time to build the >2 GiB file and takes >2 GiB of disk space
Guido van Rossum34d69e52007-04-10 20:08:41 +0000602 # therefore the resource must be enabled to run this test.
Victor Stinner937ee9e2018-06-26 02:11:06 +0200603 if sys.platform[:3] == 'win' or sys.platform == 'darwin':
Zachary Ware9fe6d862013-12-08 00:20:35 -0600604 support.requires(
605 'largefile',
606 'test requires %s bytes and a long time to run' % self.LARGE)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000607 with self.open(support.TESTFN, "w+b", 0) as f:
608 self.large_file_ops(f)
609 with self.open(support.TESTFN, "w+b") as f:
610 self.large_file_ops(f)
Guido van Rossum87429772007-04-10 21:06:59 +0000611
612 def test_with_open(self):
Alexey Izbysheva2670562018-10-20 03:22:31 +0300613 for bufsize in (0, 100):
Guido van Rossum87429772007-04-10 21:06:59 +0000614 f = None
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000615 with self.open(support.TESTFN, "wb", bufsize) as f:
Guido van Rossum1f2ca562007-08-27 20:44:15 +0000616 f.write(b"xxx")
Guido van Rossum87429772007-04-10 21:06:59 +0000617 self.assertEqual(f.closed, True)
618 f = None
619 try:
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000620 with self.open(support.TESTFN, "wb", bufsize) as f:
Guido van Rossum87429772007-04-10 21:06:59 +0000621 1/0
622 except ZeroDivisionError:
623 self.assertEqual(f.closed, True)
624 else:
625 self.fail("1/0 didn't raise an exception")
626
Antoine Pitrou08838b62009-01-21 00:55:13 +0000627 # issue 5008
628 def test_append_mode_tell(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000629 with self.open(support.TESTFN, "wb") as f:
Antoine Pitrou08838b62009-01-21 00:55:13 +0000630 f.write(b"xxx")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000631 with self.open(support.TESTFN, "ab", buffering=0) as f:
Antoine Pitrou08838b62009-01-21 00:55:13 +0000632 self.assertEqual(f.tell(), 3)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000633 with self.open(support.TESTFN, "ab") as f:
Antoine Pitrou08838b62009-01-21 00:55:13 +0000634 self.assertEqual(f.tell(), 3)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000635 with self.open(support.TESTFN, "a") as f:
Serhiy Storchaka0dcd80a2015-08-02 15:17:49 +0300636 self.assertGreater(f.tell(), 0)
Antoine Pitrou08838b62009-01-21 00:55:13 +0000637
Guido van Rossum87429772007-04-10 21:06:59 +0000638 def test_destructor(self):
639 record = []
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000640 class MyFileIO(self.FileIO):
Guido van Rossum87429772007-04-10 21:06:59 +0000641 def __del__(self):
642 record.append(1)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000643 try:
644 f = super().__del__
645 except AttributeError:
646 pass
647 else:
648 f()
Guido van Rossum87429772007-04-10 21:06:59 +0000649 def close(self):
650 record.append(2)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000651 super().close()
Guido van Rossum87429772007-04-10 21:06:59 +0000652 def flush(self):
653 record.append(3)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000654 super().flush()
Brett Cannon5a9e91b2010-10-29 23:53:03 +0000655 with support.check_warnings(('', ResourceWarning)):
656 f = MyFileIO(support.TESTFN, "wb")
657 f.write(b"xxx")
658 del f
659 support.gc_collect()
660 self.assertEqual(record, [1, 2, 3])
661 with self.open(support.TESTFN, "rb") as f:
662 self.assertEqual(f.read(), b"xxx")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000663
664 def _check_base_destructor(self, base):
665 record = []
666 class MyIO(base):
667 def __init__(self):
668 # This exercises the availability of attributes on object
669 # destruction.
670 # (in the C version, close() is called by the tp_dealloc
671 # function, not by __del__)
672 self.on_del = 1
673 self.on_close = 2
674 self.on_flush = 3
675 def __del__(self):
676 record.append(self.on_del)
677 try:
678 f = super().__del__
679 except AttributeError:
680 pass
681 else:
682 f()
683 def close(self):
684 record.append(self.on_close)
685 super().close()
686 def flush(self):
687 record.append(self.on_flush)
688 super().flush()
689 f = MyIO()
Guido van Rossum87429772007-04-10 21:06:59 +0000690 del f
Benjamin Peterson24fb1d02009-04-24 23:26:21 +0000691 support.gc_collect()
Guido van Rossum87429772007-04-10 21:06:59 +0000692 self.assertEqual(record, [1, 2, 3])
693
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000694 def test_IOBase_destructor(self):
695 self._check_base_destructor(self.IOBase)
696
697 def test_RawIOBase_destructor(self):
698 self._check_base_destructor(self.RawIOBase)
699
700 def test_BufferedIOBase_destructor(self):
701 self._check_base_destructor(self.BufferedIOBase)
702
703 def test_TextIOBase_destructor(self):
704 self._check_base_destructor(self.TextIOBase)
705
Guido van Rossum87429772007-04-10 21:06:59 +0000706 def test_close_flushes(self):
Benjamin Peterson45cec322009-04-24 23:14:50 +0000707 with self.open(support.TESTFN, "wb") as f:
708 f.write(b"xxx")
709 with self.open(support.TESTFN, "rb") as f:
710 self.assertEqual(f.read(), b"xxx")
Guido van Rossuma9e20242007-03-08 00:43:48 +0000711
Guido van Rossumd4103952007-04-12 05:44:49 +0000712 def test_array_writes(self):
713 a = array.array('i', range(10))
Antoine Pitrou1ce3eb52010-09-01 20:29:34 +0000714 n = len(a.tobytes())
Martin Panter6bb91f32016-05-28 00:41:57 +0000715 def check(f):
716 with f:
717 self.assertEqual(f.write(a), n)
718 f.writelines((a,))
719 check(self.BytesIO())
720 check(self.FileIO(support.TESTFN, "w"))
721 check(self.BufferedWriter(self.MockRawIO()))
722 check(self.BufferedRandom(self.MockRawIO()))
723 check(self.BufferedRWPair(self.MockRawIO(), self.MockRawIO()))
Guido van Rossumd4103952007-04-12 05:44:49 +0000724
Guido van Rossum2dced8b2007-10-30 17:27:30 +0000725 def test_closefd(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000726 self.assertRaises(ValueError, self.open, support.TESTFN, 'w',
Guido van Rossum2dced8b2007-10-30 17:27:30 +0000727 closefd=False)
Guido van Rossuma9e20242007-03-08 00:43:48 +0000728
def test_read_closed(self):
    # A second file object sharing the descriptor (closefd=False) can read
    # the data, and refuses further reads once it has been closed.
    text = "egg\n"
    with self.open(support.TESTFN, "w") as writer:
        writer.write(text)
    with self.open(support.TESTFN, "r") as owner:
        borrowed = self.open(owner.fileno(), "r", closefd=False)
        self.assertEqual(borrowed.read(), text)
        borrowed.seek(0)
        borrowed.close()
        with self.assertRaises(ValueError):
            borrowed.read()
738
def test_no_closefd_with_filename(self):
    # can't use closefd in combination with a file name
    with self.assertRaises(ValueError):
        self.open(support.TESTFN, "r", closefd=False)
Christian Heimesecc42a22008-11-05 19:30:32 +0000742
def test_closefd_attr(self):
    # The raw layer's ``closefd`` attribute mirrors the flag passed to open().
    with self.open(support.TESTFN, "wb") as writer:
        writer.write(b"egg\n")
    with self.open(support.TESTFN, "r") as owner:
        owner_raw = owner.buffer.raw
        self.assertEqual(owner_raw.closefd, True)
        borrowed = self.open(owner.fileno(), "r", closefd=False)
        borrowed_raw = borrowed.buffer.raw
        self.assertEqual(borrowed_raw.closefd, False)
750
def test_garbage_collection(self):
    # FileIO objects are collected, and collecting them flushes
    # all data to disk.
    payload = b"abcxxx"
    with support.check_warnings(('', ResourceWarning)):
        raw = self.FileIO(support.TESTFN, "wb")
        raw.write(payload)
        # Make the object reference itself so that dropping the local name
        # alone does not free it; support.gc_collect() has to do the work.
        raw.f = raw
        ref = weakref.ref(raw)
        del raw
        support.gc_collect()
    self.assertIsNone(ref(), ref)
    with self.open(support.TESTFN, "rb") as reader:
        self.assertEqual(reader.read(), payload)
Christian Heimesecc42a22008-11-05 19:30:32 +0000764
def test_unbounded_file(self):
    # Issue #1174606: reading from an unbounded stream such as /dev/zero.
    zero = "/dev/zero"
    if not os.path.exists(zero):
        self.skipTest("{0} does not exist".format(zero))
    if sys.maxsize > 0x7FFFFFFF:
        self.skipTest("test can only run in a 32-bit address space")
    if support.real_max_memuse < support._2G:
        self.skipTest("test requires at least 2 GiB of memory")
    # An unlimited read() must overflow for the unbuffered, buffered and
    # text variants alike.
    variants = (
        ("rb", {"buffering": 0}),
        ("rb", {}),
        ("r", {}),
    )
    for mode, options in variants:
        with self.open(zero, mode, **options) as stream:
            with self.assertRaises(OverflowError):
                stream.read()
780
def check_flush_error_on_close(self, *args, **kwargs):
    # Test that the file is closed despite failed flush
    # and that flush() is called before file closed.
    stream = self.open(*args, **kwargs)
    closed_at_flush = []

    def failing_flush():
        # Record whether the stream was already closed when flush() ran;
        # only the most recent call is kept.
        closed_at_flush[:] = [stream.closed]
        raise OSError()

    stream.flush = failing_flush
    with self.assertRaises(OSError):  # exception not swallowed
        stream.close()
    self.assertTrue(stream.closed)
    self.assertTrue(closed_at_flush)  # flush() called
    self.assertFalse(closed_at_flush[0])  # flush() called before file closed
    stream.flush = lambda: None  # break reference loop
Serhiy Storchakaa3712a92015-02-21 00:35:09 +0200795
def test_flush_error_on_close(self):
    # Issue #5700: io.FileIO calls flush() after file closed
    # Each layer is exercised three ways: opened by name, by an owned
    # descriptor, and by a borrowed descriptor (closefd=False) that this
    # test closes itself afterwards.
    flags = os.O_WRONLY | os.O_CREAT
    layers = (
        ('wb', {'buffering': 0}),   # raw file
        ('wb', {}),                 # buffered io
        ('w', {}),                  # text io
    )
    for mode, options in layers:
        self.check_flush_error_on_close(support.TESTFN, mode, **options)
        fd = os.open(support.TESTFN, flags)
        self.check_flush_error_on_close(fd, mode, **options)
        fd = os.open(support.TESTFN, flags)
        self.check_flush_error_on_close(fd, mode, closefd=False, **options)
        os.close(fd)
Antoine Pitrou6be88762010-05-03 16:48:20 +0000819
def test_multi_close(self):
    # close() may be called repeatedly; flush() afterwards must fail.
    stream = self.open(support.TESTFN, "wb", buffering=0)
    for _ in range(3):
        stream.close()
    with self.assertRaises(ValueError):
        stream.flush()
826
Antoine Pitrou328ec742010-09-14 18:37:24 +0000827 def test_RawIOBase_read(self):
Sanyam Khurana1b74f9b2017-12-11 19:12:09 +0530828 # Exercise the default limited RawIOBase.read(n) implementation (which
829 # calls readinto() internally).
Antoine Pitrou328ec742010-09-14 18:37:24 +0000830 rawio = self.MockRawIOWithoutRead((b"abc", b"d", None, b"efg", None))
831 self.assertEqual(rawio.read(2), b"ab")
832 self.assertEqual(rawio.read(2), b"c")
833 self.assertEqual(rawio.read(2), b"d")
834 self.assertEqual(rawio.read(2), None)
835 self.assertEqual(rawio.read(2), b"ef")
836 self.assertEqual(rawio.read(2), b"g")
837 self.assertEqual(rawio.read(2), None)
838 self.assertEqual(rawio.read(2), b"")
839
Benjamin Petersonf6f3a352011-09-03 09:26:20 -0400840 def test_types_have_dict(self):
841 test = (
842 self.IOBase(),
843 self.RawIOBase(),
844 self.TextIOBase(),
845 self.StringIO(),
846 self.BytesIO()
847 )
848 for obj in test:
849 self.assertTrue(hasattr(obj, "__dict__"))
850
def test_opener(self):
    # A custom opener decides which descriptor backs the file: the bogus
    # path is ignored and the descriptor opened here is read instead.
    text = "egg\n"
    with self.open(support.TESTFN, "w") as writer:
        writer.write(text)
    fd = os.open(support.TESTFN, os.O_RDONLY)
    with self.open("non-existent", "r",
                   opener=lambda path, flags: fd) as reader:
        self.assertEqual(reader.read(), text)
859
Barry Warsaw480e2852016-06-08 17:47:26 -0400860 def test_bad_opener_negative_1(self):
861 # Issue #27066.
862 def badopener(fname, flags):
863 return -1
864 with self.assertRaises(ValueError) as cm:
865 open('non-existent', 'r', opener=badopener)
866 self.assertEqual(str(cm.exception), 'opener returned -1')
867
868 def test_bad_opener_other_negative(self):
869 # Issue #27066.
870 def badopener(fname, flags):
871 return -2
872 with self.assertRaises(ValueError) as cm:
873 open('non-existent', 'r', opener=badopener)
874 self.assertEqual(str(cm.exception), 'opener returned -2')
875
Hynek Schlawack2cc71562012-05-25 10:05:53 +0200876 def test_fileio_closefd(self):
877 # Issue #4841
878 with self.open(__file__, 'rb') as f1, \
879 self.open(__file__, 'rb') as f2:
880 fileio = self.FileIO(f1.fileno(), closefd=False)
881 # .__init__() must not close f1
882 fileio.__init__(f2.fileno(), closefd=False)
883 f1.readline()
884 # .close() must not close f2
885 fileio.close()
886 f2.readline()
887
def test_nonbuffered_textio(self):
    # Text mode with buffering=0 is rejected, and the failed open must not
    # emit a resource warning.
    with support.check_no_resource_warning(self), \
            self.assertRaises(ValueError):
        self.open(support.TESTFN, 'w', buffering=0)
Serhiy Storchakaf10063e2014-06-09 13:32:34 +0300892
def test_invalid_newline(self):
    # An unknown newline value is rejected, and the failed open must not
    # emit a resource warning.
    with support.check_no_resource_warning(self), \
            self.assertRaises(ValueError):
        self.open(support.TESTFN, 'w', newline='invalid')
Serhiy Storchakaf10063e2014-06-09 13:32:34 +0300897
def test_buffered_readinto_mixin(self):
    # Test the implementation provided by BufferedIOBase
    class Stream(self.BufferedIOBase):
        def read(self, size):
            return b"12345"
        read1 = read

    stream = Stream()
    for method_name in ("readinto", "readinto1"):
        with self.subTest(method_name):
            target = byteslike(5)
            fill = getattr(stream, method_name)
            count = fill(target)
            self.assertEqual(count, 5)
            self.assertEqual(bytes(target), b"12345")
910
def test_fspath_support(self):
    # open() accepts FakePath wrappers around str and bytes paths, and
    # rejects wrappers around anything else.
    def check_path_succeeds(path):
        with self.open(path, "w") as writer:
            writer.write("egg\n")

        with self.open(path, "r") as reader:
            self.assertEqual(reader.read(), "egg\n")

    for good_path in (support.TESTFN, support.TESTFN.encode('utf-8')):
        check_path_succeeds(FakePath(good_path))

    # A wrapper around a file descriptor is not a valid path.
    with self.open(support.TESTFN, "w") as holder:
        bad_path = FakePath(holder.fileno())
        with self.assertRaises(TypeError):
            self.open(bad_path, 'w')

    failing_cases = (
        (None, TypeError),
        (FloatingPointError, FloatingPointError),
    )
    for wrapped, expected_error in failing_cases:
        bad_path = FakePath(wrapped)
        with self.assertRaises(expected_error):
            self.open(bad_path, 'w')

    # ensure that refcounting is correct with some error conditions
    with self.assertRaisesRegex(ValueError, 'read/write/append mode'):
        self.open(FakePath(support.TESTFN), 'rwxa')
Ethan Furmand62548a2016-06-04 14:38:43 -0700938
Sanyam Khurana1b74f9b2017-12-11 19:12:09 +0530939 def test_RawIOBase_readall(self):
940 # Exercise the default unlimited RawIOBase.read() and readall()
941 # implementations.
942 rawio = self.MockRawIOWithoutRead((b"abc", b"d", b"efg"))
943 self.assertEqual(rawio.read(), b"abcdefg")
944 rawio = self.MockRawIOWithoutRead((b"abc", b"d", b"efg"))
945 self.assertEqual(rawio.readall(), b"abcdefg")
946
947 def test_BufferedIOBase_readinto(self):
948 # Exercise the default BufferedIOBase.readinto() and readinto1()
949 # implementations (which call read() or read1() internally).
950 class Reader(self.BufferedIOBase):
951 def __init__(self, avail):
952 self.avail = avail
953 def read(self, size):
954 result = self.avail[:size]
955 self.avail = self.avail[size:]
956 return result
957 def read1(self, size):
958 """Returns no more than 5 bytes at once"""
959 return self.read(min(size, 5))
960 tests = (
961 # (test method, total data available, read buffer size, expected
962 # read size)
963 ("readinto", 10, 5, 5),
964 ("readinto", 10, 6, 6), # More than read1() can return
965 ("readinto", 5, 6, 5), # Buffer larger than total available
966 ("readinto", 6, 7, 6),
967 ("readinto", 10, 0, 0), # Empty buffer
968 ("readinto1", 10, 5, 5), # Result limited to single read1() call
969 ("readinto1", 10, 6, 5), # Buffer larger than read1() can return
970 ("readinto1", 5, 6, 5), # Buffer larger than total available
971 ("readinto1", 6, 7, 5),
972 ("readinto1", 10, 0, 0), # Empty buffer
973 )
974 UNUSED_BYTE = 0x81
975 for test in tests:
976 with self.subTest(test):
977 method, avail, request, result = test
978 reader = Reader(bytes(range(avail)))
979 buffer = bytearray((UNUSED_BYTE,) * request)
980 method = getattr(reader, method)
981 self.assertEqual(method(buffer), result)
982 self.assertEqual(len(buffer), request)
983 self.assertSequenceEqual(buffer[:result], range(result))
984 unused = (UNUSED_BYTE,) * (request - result)
985 self.assertSequenceEqual(buffer[result:], unused)
986 self.assertEqual(len(reader.avail), avail - result)
987
Zackery Spytz28f07362018-07-17 00:31:44 -0600988 def test_close_assert(self):
989 class R(self.IOBase):
990 def __setattr__(self, name, value):
991 pass
992 def flush(self):
993 raise OSError()
994 f = R()
995 # This would cause an assertion failure.
996 self.assertRaises(OSError, f.close)
997
Victor Stinner472f7942019-04-12 21:58:24 +0200998 # Silence destructor error
999 R.flush = lambda self: None
1000
Hynek Schlawack2cc71562012-05-25 10:05:53 +02001001
class CIOTest(IOTest):

    def test_IOBase_finalize(self):
        # Issue #12149: segmentation fault on _PyIOBase_finalize when both a
        # class which inherits IOBase and an object of this class are caught
        # in a reference cycle and close() is already in the method cache.
        class MyIO(self.IOBase):
            def close(self):
                pass

        # create an instance to populate the method cache
        MyIO()
        instance = MyIO()
        # Self-reference: the instance now takes part in a reference cycle.
        instance.obj = instance
        ref = weakref.ref(instance)
        del MyIO
        del instance
        support.gc_collect()
        self.assertIsNone(ref(), ref)
Guido van Rossuma9e20242007-03-08 00:43:48 +00001021
class PyIOTest(IOTest):
    # Adds nothing of its own: every test is inherited from IOTest.
    pass
Guido van Rossum68bbcd22007-02-27 17:19:33 +00001024
Guido van Rossuma9e20242007-03-08 00:43:48 +00001025
@support.cpython_only
class APIMismatchTest(unittest.TestCase):
    # Compares the RawIOBase classes of the pyio and io modules in both
    # directions using support.detect_api_mismatch().

    def test_RawIOBase_io_in_pyio_match(self):
        """Test that pyio RawIOBase class has all c RawIOBase methods"""
        missing = support.detect_api_mismatch(
            pyio.RawIOBase, io.RawIOBase, ignore=('__weakref__',))
        failure_text = 'Python RawIOBase does not have all C RawIOBase methods'
        self.assertEqual(missing, set(), msg=failure_text)

    def test_RawIOBase_pyio_in_io_match(self):
        """Test that c RawIOBase class has all pyio RawIOBase methods"""
        missing = support.detect_api_mismatch(io.RawIOBase, pyio.RawIOBase)
        failure_text = 'C RawIOBase does not have all Python RawIOBase methods'
        self.assertEqual(missing, set(), msg=failure_text)
1039
1040
class CommonBufferedTests:
    # Tests common to BufferedReader, BufferedWriter and BufferedRandom
    # The tests read ``self.tp`` (the buffered type under test) and the mock
    # raw-stream classes from the instance.

    def test_detach(self):
        # detach() hands back the raw stream and cannot be repeated.
        raw_stream = self.MockRawIO()
        buffered = self.tp(raw_stream)
        detached = buffered.detach()
        self.assertIs(detached, raw_stream)
        with self.assertRaises(ValueError):
            buffered.detach()

        repr(buffered)  # Should still work

    def test_fileno(self):
        # fileno() must return 42 for a buffered object over a MockRawIO.
        buffered = self.tp(self.MockRawIO())

        self.assertEqual(42, buffered.fileno())

    def test_invalid_args(self):
        buffered = self.tp(self.MockRawIO())
        # Invalid whence
        for bad_whence in (-1, 9):
            with self.assertRaises(ValueError):
                buffered.seek(0, bad_whence)

    def test_override_destructor(self):
        # Destroying the object must run __del__, then close(), then flush().
        base_type = self.tp
        events = []

        class MyBufferedIO(base_type):
            def __del__(self):
                events.append(1)
                # Chain to the parent's __del__ only when it has one.
                parent_del = getattr(super(), "__del__", None)
                if parent_del is not None:
                    parent_del()

            def close(self):
                events.append(2)
                super().close()

            def flush(self):
                events.append(3)
                super().flush()

        raw_stream = self.MockRawIO()
        buffered = MyBufferedIO(raw_stream)
        del buffered
        support.gc_collect()
        self.assertEqual(events, [1, 2, 3])

    def test_context_manager(self):
        # Test usability as a context manager
        buffered = self.tp(self.MockRawIO())
        with buffered:
            pass
        # bufio should now be closed, and using it a second time should raise
        # a ValueError.
        with self.assertRaises(ValueError):
            with buffered:
                pass

    def test_error_through_destructor(self):
        # Test that the exception state is not modified by a destructor,
        # even if close() fails.
        failing_raw = self.CloseFailureIO()
        with support.catch_unraisable_exception() as cm:
            with self.assertRaises(AttributeError):
                self.tp(failing_raw).xyzzy

            if IOBASE_EMITS_UNRAISABLE:
                if cm.unraisable is not None:
                    self.assertEqual(cm.unraisable.exc_type, OSError)
            else:
                self.assertIsNone(cm.unraisable)

    def test_repr(self):
        # repr() names the class (module prefix optional) and, once the raw
        # stream has a name, that name too.
        raw_stream = self.MockRawIO()
        buffered = self.tp(raw_stream)
        clsname = r"(%s\.)?%s" % (self.tp.__module__, self.tp.__qualname__)
        self.assertRegex(repr(buffered), "<%s>" % clsname)
        named_cases = (
            ("dummy", "<%s name='dummy'>"),
            (b"dummy", "<%s name=b'dummy'>"),
        )
        for raw_name, template in named_cases:
            raw_stream.name = raw_name
            self.assertRegex(repr(buffered), template % clsname)

    def test_recursive_repr(self):
        # Issue #25455
        raw_stream = self.MockRawIO()
        buffered = self.tp(raw_stream)
        # The raw stream's name is the buffered object itself.
        with support.swap_attr(raw_stream, 'name', buffered):
            try:
                repr(buffered)  # Should not crash
            except RuntimeError:
                pass

    def test_flush_error_on_close(self):
        # Test that buffered file is closed despite failed flush
        # and that flush() is called before file closed.
        raw_stream = self.MockRawIO()
        closed_at_flush = []

        def failing_flush():
            # ``buffered`` is bound further down, before close() runs.
            closed_at_flush[:] = [buffered.closed, raw_stream.closed]
            raise OSError()

        raw_stream.flush = failing_flush
        buffered = self.tp(raw_stream)
        with self.assertRaises(OSError):  # exception not swallowed
            buffered.close()
        self.assertTrue(buffered.closed)
        self.assertTrue(raw_stream.closed)
        self.assertTrue(closed_at_flush)  # flush() called
        self.assertFalse(closed_at_flush[0])  # flush() called before file closed
        self.assertFalse(closed_at_flush[1])
        raw_stream.flush = lambda: None  # break reference loop

    def test_close_error_on_close(self):
        # When both flush() and close() fail, the close() error is raised
        # with the flush() error attached as its __context__.
        raw_stream = self.MockRawIO()

        def failing_flush():
            raise OSError('flush')

        def failing_close():
            raise OSError('close')

        raw_stream.close = failing_close
        buffered = self.tp(raw_stream)
        buffered.flush = failing_flush
        with self.assertRaises(OSError) as err:  # exception not swallowed
            buffered.close()
        raised = err.exception
        self.assertEqual(raised.args, ('close',))
        self.assertIsInstance(raised.__context__, OSError)
        self.assertEqual(raised.__context__.args, ('flush',))
        self.assertFalse(buffered.closed)

        # Silence destructor error
        raw_stream.close = lambda: None
        buffered.flush = lambda: None

    def test_nonnormalized_close_error_on_close(self):
        # Issue #21677
        raw_stream = self.MockRawIO()

        # Both helpers fail with NameError: the names they raise are
        # deliberately undefined.
        def failing_flush():
            raise non_existing_flush

        def failing_close():
            raise non_existing_close

        raw_stream.close = failing_close
        buffered = self.tp(raw_stream)
        buffered.flush = failing_flush
        with self.assertRaises(NameError) as err:  # exception not swallowed
            buffered.close()
        raised = err.exception
        self.assertIn('non_existing_close', str(raised))
        self.assertIsInstance(raised.__context__, NameError)
        self.assertIn('non_existing_flush', str(raised.__context__))
        self.assertFalse(buffered.closed)

        # Silence destructor error
        buffered.flush = lambda: None
        raw_stream.close = lambda: None

    def test_multi_close(self):
        # close() may be called repeatedly; flush() afterwards must fail.
        buffered = self.tp(self.MockRawIO())
        for _ in range(3):
            buffered.close()
        with self.assertRaises(ValueError):
            buffered.flush()

    def test_unseekable(self):
        # tell() and seek() are unsupported over an unseekable raw stream.
        buffered = self.tp(self.MockUnseekableIO(b"A" * 10))
        with self.assertRaises(self.UnsupportedOperation):
            buffered.tell()
        with self.assertRaises(self.UnsupportedOperation):
            buffered.seek(0)

    def test_readonly_attributes(self):
        # The ``raw`` attribute cannot be reassigned.
        buffered = self.tp(self.MockRawIO())
        replacement = self.MockRawIO()
        with self.assertRaises(AttributeError):
            buffered.raw = replacement
1212
Guido van Rossum78892e42007-04-06 17:31:18 +00001213
class SizeofTest:
    # sys.getsizeof() checks for buffered objects; ``self.tp`` and
    # ``self.MockRawIO`` are read from the instance.

    @support.cpython_only
    def test_sizeof(self):
        # The reported size must grow by exactly the extra buffer bytes.
        small_buffer = 4096
        large_buffer = 8192
        raw_stream = self.MockRawIO()
        buffered = self.tp(raw_stream, buffer_size=small_buffer)
        overhead = sys.getsizeof(buffered) - small_buffer
        raw_stream = self.MockRawIO()
        buffered = self.tp(raw_stream, buffer_size=large_buffer)
        self.assertEqual(sys.getsizeof(buffered), overhead + large_buffer)

    @support.cpython_only
    def test_buffer_freeing(self):
        # After close() the reported size must shrink by the buffer size.
        buffer_bytes = 4096
        raw_stream = self.MockRawIO()
        buffered = self.tp(raw_stream, buffer_size=buffer_bytes)
        overhead = sys.getsizeof(buffered) - buffer_bytes
        buffered.close()
        self.assertEqual(sys.getsizeof(buffered), overhead)
Antoine Pitrou10f0c502012-07-29 19:02:46 +02001235
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001236class BufferedReaderTest(unittest.TestCase, CommonBufferedTests):
1237 read_mode = "rb"
Guido van Rossum78892e42007-04-06 17:31:18 +00001238
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001239 def test_constructor(self):
1240 rawio = self.MockRawIO([b"abc"])
1241 bufio = self.tp(rawio)
1242 bufio.__init__(rawio)
1243 bufio.__init__(rawio, buffer_size=1024)
1244 bufio.__init__(rawio, buffer_size=16)
Ezio Melottib3aedd42010-11-20 19:04:17 +00001245 self.assertEqual(b"abc", bufio.read())
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001246 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0)
1247 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16)
1248 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1)
1249 rawio = self.MockRawIO([b"abc"])
1250 bufio.__init__(rawio)
Ezio Melottib3aedd42010-11-20 19:04:17 +00001251 self.assertEqual(b"abc", bufio.read())
Guido van Rossum78892e42007-04-06 17:31:18 +00001252
Serhiy Storchaka61e24932014-02-12 10:52:35 +02001253 def test_uninitialized(self):
1254 bufio = self.tp.__new__(self.tp)
1255 del bufio
1256 bufio = self.tp.__new__(self.tp)
1257 self.assertRaisesRegex((ValueError, AttributeError),
1258 'uninitialized|has no attribute',
1259 bufio.read, 0)
1260 bufio.__init__(self.MockRawIO())
1261 self.assertEqual(bufio.read(0), b'')
1262
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001263 def test_read(self):
Benjamin Petersonbf5ff762009-12-13 19:25:34 +00001264 for arg in (None, 7):
1265 rawio = self.MockRawIO((b"abc", b"d", b"efg"))
1266 bufio = self.tp(rawio)
Ezio Melottib3aedd42010-11-20 19:04:17 +00001267 self.assertEqual(b"abcdefg", bufio.read(arg))
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001268 # Invalid args
1269 self.assertRaises(ValueError, bufio.read, -2)
Guido van Rossum68bbcd22007-02-27 17:19:33 +00001270
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001271 def test_read1(self):
1272 rawio = self.MockRawIO((b"abc", b"d", b"efg"))
1273 bufio = self.tp(rawio)
Ezio Melottib3aedd42010-11-20 19:04:17 +00001274 self.assertEqual(b"a", bufio.read(1))
1275 self.assertEqual(b"b", bufio.read1(1))
1276 self.assertEqual(rawio._reads, 1)
Martin Panterccb2c0e2016-10-20 23:48:14 +00001277 self.assertEqual(b"", bufio.read1(0))
Ezio Melottib3aedd42010-11-20 19:04:17 +00001278 self.assertEqual(b"c", bufio.read1(100))
1279 self.assertEqual(rawio._reads, 1)
1280 self.assertEqual(b"d", bufio.read1(100))
1281 self.assertEqual(rawio._reads, 2)
1282 self.assertEqual(b"efg", bufio.read1(100))
1283 self.assertEqual(rawio._reads, 3)
1284 self.assertEqual(b"", bufio.read1(100))
1285 self.assertEqual(rawio._reads, 4)
Martin Panterccb2c0e2016-10-20 23:48:14 +00001286
1287 def test_read1_arbitrary(self):
1288 rawio = self.MockRawIO((b"abc", b"d", b"efg"))
1289 bufio = self.tp(rawio)
1290 self.assertEqual(b"a", bufio.read(1))
1291 self.assertEqual(b"bc", bufio.read1())
1292 self.assertEqual(b"d", bufio.read1())
1293 self.assertEqual(b"efg", bufio.read1(-1))
1294 self.assertEqual(rawio._reads, 3)
1295 self.assertEqual(b"", bufio.read1())
1296 self.assertEqual(rawio._reads, 4)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001297
1298 def test_readinto(self):
1299 rawio = self.MockRawIO((b"abc", b"d", b"efg"))
1300 bufio = self.tp(rawio)
1301 b = bytearray(2)
Ezio Melottib3aedd42010-11-20 19:04:17 +00001302 self.assertEqual(bufio.readinto(b), 2)
1303 self.assertEqual(b, b"ab")
1304 self.assertEqual(bufio.readinto(b), 2)
1305 self.assertEqual(b, b"cd")
1306 self.assertEqual(bufio.readinto(b), 2)
1307 self.assertEqual(b, b"ef")
1308 self.assertEqual(bufio.readinto(b), 1)
1309 self.assertEqual(b, b"gf")
1310 self.assertEqual(bufio.readinto(b), 0)
1311 self.assertEqual(b, b"gf")
Antoine Pitrou3486a982011-05-12 01:57:53 +02001312 rawio = self.MockRawIO((b"abc", None))
1313 bufio = self.tp(rawio)
1314 self.assertEqual(bufio.readinto(b), 2)
1315 self.assertEqual(b, b"ab")
1316 self.assertEqual(bufio.readinto(b), 1)
1317 self.assertEqual(b, b"cb")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001318
Benjamin Petersona96fea02014-06-22 14:17:44 -07001319 def test_readinto1(self):
1320 buffer_size = 10
1321 rawio = self.MockRawIO((b"abc", b"de", b"fgh", b"jkl"))
1322 bufio = self.tp(rawio, buffer_size=buffer_size)
1323 b = bytearray(2)
1324 self.assertEqual(bufio.peek(3), b'abc')
1325 self.assertEqual(rawio._reads, 1)
1326 self.assertEqual(bufio.readinto1(b), 2)
1327 self.assertEqual(b, b"ab")
1328 self.assertEqual(rawio._reads, 1)
1329 self.assertEqual(bufio.readinto1(b), 1)
1330 self.assertEqual(b[:1], b"c")
1331 self.assertEqual(rawio._reads, 1)
1332 self.assertEqual(bufio.readinto1(b), 2)
1333 self.assertEqual(b, b"de")
1334 self.assertEqual(rawio._reads, 2)
1335 b = bytearray(2*buffer_size)
1336 self.assertEqual(bufio.peek(3), b'fgh')
1337 self.assertEqual(rawio._reads, 3)
1338 self.assertEqual(bufio.readinto1(b), 6)
1339 self.assertEqual(b[:6], b"fghjkl")
1340 self.assertEqual(rawio._reads, 4)
1341
1342 def test_readinto_array(self):
1343 buffer_size = 60
1344 data = b"a" * 26
1345 rawio = self.MockRawIO((data,))
1346 bufio = self.tp(rawio, buffer_size=buffer_size)
1347
1348 # Create an array with element size > 1 byte
1349 b = array.array('i', b'x' * 32)
1350 assert len(b) != 16
1351
1352 # Read into it. We should get as many *bytes* as we can fit into b
1353 # (which is more than the number of elements)
1354 n = bufio.readinto(b)
1355 self.assertGreater(n, len(b))
1356
1357 # Check that old contents of b are preserved
1358 bm = memoryview(b).cast('B')
1359 self.assertLess(n, len(bm))
1360 self.assertEqual(bm[:n], data[:n])
1361 self.assertEqual(bm[n:], b'x' * (len(bm[n:])))
1362
1363 def test_readinto1_array(self):
1364 buffer_size = 60
1365 data = b"a" * 26
1366 rawio = self.MockRawIO((data,))
1367 bufio = self.tp(rawio, buffer_size=buffer_size)
1368
1369 # Create an array with element size > 1 byte
1370 b = array.array('i', b'x' * 32)
1371 assert len(b) != 16
1372
1373 # Read into it. We should get as many *bytes* as we can fit into b
1374 # (which is more than the number of elements)
1375 n = bufio.readinto1(b)
1376 self.assertGreater(n, len(b))
1377
1378 # Check that old contents of b are preserved
1379 bm = memoryview(b).cast('B')
1380 self.assertLess(n, len(bm))
1381 self.assertEqual(bm[:n], data[:n])
1382 self.assertEqual(bm[n:], b'x' * (len(bm[n:])))
1383
Benjamin Petersonbf5ff762009-12-13 19:25:34 +00001384 def test_readlines(self):
1385 def bufio():
1386 rawio = self.MockRawIO((b"abc\n", b"d\n", b"ef"))
1387 return self.tp(rawio)
Ezio Melottib3aedd42010-11-20 19:04:17 +00001388 self.assertEqual(bufio().readlines(), [b"abc\n", b"d\n", b"ef"])
1389 self.assertEqual(bufio().readlines(5), [b"abc\n", b"d\n"])
1390 self.assertEqual(bufio().readlines(None), [b"abc\n", b"d\n", b"ef"])
Benjamin Petersonbf5ff762009-12-13 19:25:34 +00001391
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001392 def test_buffering(self):
Guido van Rossum78892e42007-04-06 17:31:18 +00001393 data = b"abcdefghi"
1394 dlen = len(data)
1395
1396 tests = [
1397 [ 100, [ 3, 1, 4, 8 ], [ dlen, 0 ] ],
1398 [ 100, [ 3, 3, 3], [ dlen ] ],
1399 [ 4, [ 1, 2, 4, 2 ], [ 4, 4, 1 ] ],
1400 ]
1401
1402 for bufsize, buf_read_sizes, raw_read_sizes in tests:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001403 rawio = self.MockFileIO(data)
1404 bufio = self.tp(rawio, buffer_size=bufsize)
Guido van Rossum78892e42007-04-06 17:31:18 +00001405 pos = 0
1406 for nbytes in buf_read_sizes:
Ezio Melottib3aedd42010-11-20 19:04:17 +00001407 self.assertEqual(bufio.read(nbytes), data[pos:pos+nbytes])
Guido van Rossum78892e42007-04-06 17:31:18 +00001408 pos += nbytes
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001409 # this is mildly implementation-dependent
Ezio Melottib3aedd42010-11-20 19:04:17 +00001410 self.assertEqual(rawio.read_history, raw_read_sizes)
Guido van Rossum78892e42007-04-06 17:31:18 +00001411
def test_read_non_blocking(self):
    # Inject some None's in there to simulate EWOULDBLOCK
    chunks = (b"abc", b"d", None, b"efg", None, None, None)
    bufio = self.tp(self.MockRawIO(chunks))

    # The calls below are order-sensitive: each one consumes part of the
    # scripted chunk sequence above.
    first = bufio.read(6)
    self.assertEqual(b"abcd", first)
    second = bufio.read(1)
    self.assertEqual(b"e", second)
    rest = bufio.read()
    self.assertEqual(b"fg", rest)
    peeked = bufio.peek(1)
    self.assertEqual(b"", peeked)
    self.assertIsNone(bufio.read())
    self.assertEqual(b"", bufio.read())

    # The raw object's own readall() is checked against the same kind of
    # scripted input: data first, then None.
    rawio = self.MockRawIO((b"a", None, None))
    self.assertEqual(b"a", rawio.readall())
    self.assertIsNone(rawio.readall())
1426
def test_read_past_eof(self):
    # Asking for far more bytes than exist returns everything available.
    chunks = (b"abc", b"d", b"efg")
    bufio = self.tp(self.MockRawIO(chunks))

    everything = bufio.read(9000)
    self.assertEqual(b"abcdefg", everything)
Guido van Rossum68bbcd22007-02-27 17:19:33 +00001432
def test_read_all(self):
    # read() without a size returns the concatenation of all raw chunks.
    chunks = (b"abc", b"d", b"efg")
    bufio = self.tp(self.MockRawIO(chunks))

    everything = bufio.read()
    self.assertEqual(b"abcdefg", everything)
Guido van Rossum68bbcd22007-02-27 17:19:33 +00001438
@support.requires_resource('cpu')
def test_threads(self):
    # Read one file concurrently from 20 threads through a single buffered
    # reader and check that every byte value is seen exactly N times.
    try:
        # Write out many bytes with exactly the same number of 0's,
        # 1's... 255's. This will help us check that concurrent reading
        # doesn't duplicate or forget contents.
        N = 1000
        l = list(range(256)) * N
        random.shuffle(l)
        s = bytes(bytearray(l))
        with self.open(support.TESTFN, "wb") as f:
            f.write(s)
        with self.open(support.TESTFN, self.read_mode, buffering=0) as raw:
            # A buffer size of 8 makes the 1-byte and 19-byte reads below
            # alternate between fitting in and exceeding the buffer.
            bufio = self.tp(raw, 8)
            errors = []
            results = []
            # NOTE: this rebinds ``f``, which was the (already closed)
            # file object used to write the test data above.
            def f():
                try:
                    # Intra-buffer read then buffer-flushing read
                    for n in cycle([1, 19]):
                        s = bufio.read(n)
                        if not s:
                            break
                        # list.append() is atomic
                        results.append(s)
                except Exception as e:
                    # Record the failure for the main thread's assertion,
                    # then re-raise it in the worker.
                    errors.append(e)
                    raise
            threads = [threading.Thread(target=f) for x in range(20)]
            with support.start_threads(threads):
                time.sleep(0.02) # yield
            self.assertFalse(errors,
                "the following exceptions were caught: %r" % errors)
            # Chunk order across threads is arbitrary, so only per-value
            # byte counts are compared.
            s = b''.join(results)
            for i in range(256):
                c = bytes(bytearray([i]))
                self.assertEqual(s.count(c), N)
    finally:
        support.unlink(support.TESTFN)
1478
def test_unseekable(self):
    # tell() and seek() must raise UnsupportedOperation on top of
    # MockUnseekableIO, both before and after data has been read.
    bufio = self.tp(self.MockUnseekableIO(b"A" * 10))
    unsupported = self.UnsupportedOperation

    with self.assertRaises(unsupported):
        bufio.tell()
    with self.assertRaises(unsupported):
        bufio.seek(0)

    bufio.read(1)

    with self.assertRaises(unsupported):
        bufio.seek(0)
    with self.assertRaises(unsupported):
        bufio.tell()
1486
def test_misbehaved_io(self):
    # seek() and tell() on top of MisbehavedRawIO are expected to fail
    # with OSError.
    chunks = (b"abc", b"d", b"efg")
    bufio = self.tp(self.MisbehavedRawIO(chunks))

    with self.assertRaises(OSError):
        bufio.seek(0)
    with self.assertRaises(OSError):
        bufio.tell()

    # Silence destructor error
    bufio.close = lambda: None
1495
def test_no_extraneous_read(self):
    # Issue #9550; when the raw IO object has satisfied the read request,
    # we should not issue any additional reads, otherwise it may block
    # (e.g. socket).
    bufsize = 16

    def check(n, raw_chunks):
        # Read n bytes through a fresh buffered reader and verify that the
        # raw object recorded no extraneous reads.
        rawio = self.MockRawIO(raw_chunks)
        bufio = self.tp(rawio, bufsize)
        self.assertEqual(bufio.read(n), b"x" * n)
        self.assertEqual(rawio._extraneous_reads, 0,
            "failed for {}: {} != 0".format(n, rawio._extraneous_reads))

    for n in (2, bufsize - 1, bufsize, bufsize + 1, bufsize * 2):
        # Simple case: one raw read is enough to satisfy the request.
        check(n, [b"x" * n])
        # A more complex case where two raw reads are needed to satisfy
        # the request.
        check(n, [b"x" * (n - 1), b"x"])
1515
Berker Peksagd10d6ae2015-05-12 17:01:05 +03001516 def test_read_on_closed(self):
1517 # Issue #23796
1518 b = io.BufferedReader(io.BytesIO(b"12"))
1519 b.read(1)
1520 b.close()
1521 self.assertRaises(ValueError, b.peek)
1522 self.assertRaises(ValueError, b.read1, 1)
1523
Antoine Pitrou32cfede2010-08-11 13:31:33 +00001524
class CBufferedReaderTest(BufferedReaderTest, SizeofTest):
    # Runs the shared BufferedReaderTest cases against io.BufferedReader and
    # adds checks that only apply to that implementation.
    tp = io.BufferedReader

    @unittest.skipIf(MEMORY_SANITIZER, "MSan defaults to crashing "
                     "instead of returning NULL for malloc failure.")
    def test_constructor(self):
        BufferedReaderTest.test_constructor(self)
        # The allocation can succeed on 32-bit builds, e.g. with more
        # than 2 GiB RAM and a 64-bit kernel.
        if sys.maxsize <= 0x7FFFFFFF:
            return
        rawio = self.MockRawIO()
        bufio = self.tp(rawio)
        acceptable = (OverflowError, MemoryError, ValueError)
        with self.assertRaises(acceptable):
            bufio.__init__(rawio, sys.maxsize)

    def test_initialization(self):
        # After a failed re-initialisation the object must refuse to read.
        rawio = self.MockRawIO([b"abc"])
        bufio = self.tp(rawio)
        for bad_size in (0, -16, -1):
            with self.assertRaises(ValueError):
                bufio.__init__(rawio, buffer_size=bad_size)
            with self.assertRaises(ValueError):
                bufio.read()

    def test_misbehaved_io_read(self):
        chunks = (b"abc", b"d", b"efg")
        bufio = self.tp(self.MisbehavedRawIO(chunks))
        # _pyio.BufferedReader seems to implement reading different, so that
        # checking this is not so easy.
        with self.assertRaises(OSError):
            bufio.read(10)

    def test_garbage_collection(self):
        # C BufferedReader objects are collected.
        # The Python version has __del__, so it ends into gc.garbage instead
        self.addCleanup(support.unlink, support.TESTFN)
        with support.check_warnings(('', ResourceWarning)):
            rawio = self.FileIO(support.TESTFN, "w+b")
            bufio = self.tp(rawio)
            # Self-reference: the object is only reclaimable by the
            # cycle collector.
            bufio.f = bufio
            wr = weakref.ref(bufio)
            del bufio
            support.gc_collect()
        self.assertIsNone(wr(), wr)

    def test_args_error(self):
        # Issue #17275
        # The TypeError raised for too many arguments names the type.
        with self.assertRaisesRegex(TypeError, "BufferedReader"):
            self.tp(io.BytesIO(), 1024, 1024, 1024)
1574
1575
class PyBufferedReaderTest(BufferedReaderTest):
    # Runs the shared BufferedReaderTest cases against pyio.BufferedReader.
    tp = pyio.BufferedReader
Antoine Pitrou87695762008-08-14 22:44:29 +00001578
Guido van Rossuma9e20242007-03-08 00:43:48 +00001579
class BufferedWriterTest(unittest.TestCase, CommonBufferedTests):
    # Shared tests for buffered writers.  Subclasses set ``tp`` to the
    # writer class under test; the other helpers read from ``self``
    # (MockRawIO, open, ...) are not defined in this class.
    write_mode = "wb"

    def test_constructor(self):
        # __init__ can be called again on a live object; invalid buffer
        # sizes raise ValueError and the object is usable again after a
        # subsequent valid __init__.
        rawio = self.MockRawIO()
        bufio = self.tp(rawio)
        bufio.__init__(rawio)
        bufio.__init__(rawio, buffer_size=1024)
        bufio.__init__(rawio, buffer_size=16)
        self.assertEqual(3, bufio.write(b"abc"))
        bufio.flush()
        self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0)
        self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16)
        self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1)
        bufio.__init__(rawio)
        self.assertEqual(3, bufio.write(b"ghi"))
        bufio.flush()
        self.assertEqual(b"".join(rawio._write_stack), b"abcghi")

    def test_uninitialized(self):
        # An object created with __new__ only can be deleted safely and
        # rejects write() until __init__ has run.
        bufio = self.tp.__new__(self.tp)
        del bufio
        bufio = self.tp.__new__(self.tp)
        self.assertRaisesRegex((ValueError, AttributeError),
                               'uninitialized|has no attribute',
                               bufio.write, b'')
        bufio.__init__(self.MockRawIO())
        self.assertEqual(bufio.write(b''), 0)

    def test_detach_flush(self):
        # detach() pushes the pending data to the raw object.
        raw = self.MockRawIO()
        buf = self.tp(raw)
        buf.write(b"howdy!")
        self.assertFalse(raw._write_stack)
        buf.detach()
        self.assertEqual(raw._write_stack, [b"howdy!"])

    def test_write(self):
        # Write to the buffered IO but don't overflow the buffer.
        writer = self.MockRawIO()
        bufio = self.tp(writer, 8)
        bufio.write(b"abc")
        self.assertFalse(writer._write_stack)
        buffer = bytearray(b"def")
        bufio.write(buffer)
        # Mutating the caller's buffer after write() must not change what
        # is eventually flushed.
        buffer[:] = b"***"  # Overwrite our copy of the data
        bufio.flush()
        self.assertEqual(b"".join(writer._write_stack), b"abcdef")

    def test_write_overflow(self):
        writer = self.MockRawIO()
        bufio = self.tp(writer, 8)
        contents = b"abcdefghijklmnop"
        for n in range(0, len(contents), 3):
            bufio.write(contents[n:n+3])
        flushed = b"".join(writer._write_stack)
        # At least (total - 8) bytes were implicitly flushed, perhaps more
        # depending on the implementation.
        self.assertTrue(flushed.startswith(contents[:-8]), flushed)

    def check_writes(self, intermediate_func):
        # Lots of writes, test the flushed output is as expected.
        # intermediate_func(bufio) is called after every write so callers
        # can interleave flushes, seeks or truncates.
        contents = bytes(range(256)) * 1000
        n = 0
        writer = self.MockRawIO()
        bufio = self.tp(writer, 13)
        # Generator of write sizes: repeat each N 15 times then proceed to N+1
        def gen_sizes():
            for size in count(1):
                for i in range(15):
                    yield size
        sizes = gen_sizes()
        while n < len(contents):
            # Clamp the final write so it does not run past the payload.
            size = min(next(sizes), len(contents) - n)
            self.assertEqual(bufio.write(contents[n:n+size]), size)
            intermediate_func(bufio)
            n += size
        bufio.flush()
        self.assertEqual(contents, b"".join(writer._write_stack))

    def test_writes(self):
        self.check_writes(lambda bufio: None)

    def test_writes_and_flushes(self):
        self.check_writes(lambda bufio: bufio.flush())

    def test_writes_and_seeks(self):
        # Both helpers move the position around and then restore it, so the
        # written output must be unaffected.
        def _seekabs(bufio):
            pos = bufio.tell()
            bufio.seek(pos + 1, 0)
            bufio.seek(pos - 1, 0)
            bufio.seek(pos, 0)
        self.check_writes(_seekabs)
        def _seekrel(bufio):
            pos = bufio.seek(0, 1)
            bufio.seek(+1, 1)
            bufio.seek(-1, 1)
            bufio.seek(pos, 0)
        self.check_writes(_seekrel)

    def test_writes_and_truncates(self):
        self.check_writes(lambda bufio: bufio.truncate(bufio.tell()))

    def test_write_non_blocking(self):
        raw = self.MockNonBlockWriterIO()
        bufio = self.tp(raw, 8)

        self.assertEqual(bufio.write(b"abcd"), 4)
        self.assertEqual(bufio.write(b"efghi"), 5)
        # 1 byte will be written, the rest will be buffered
        raw.block_on(b"k")
        self.assertEqual(bufio.write(b"jklmn"), 5)

        # 8 bytes will be written, 8 will be buffered and the rest will be lost
        raw.block_on(b"0")
        try:
            bufio.write(b"opqrwxyz0123456789")
        except self.BlockingIOError as e:
            written = e.characters_written
        else:
            self.fail("BlockingIOError should have been raised")
        self.assertEqual(written, 16)
        self.assertEqual(raw.pop_written(),
            b"abcdefghijklmnopqrwxyz")

        self.assertEqual(bufio.write(b"ABCDEFGHI"), 9)
        s = raw.pop_written()
        # Previously buffered bytes were flushed
        self.assertTrue(s.startswith(b"01234567A"), s)

    def test_write_and_rewind(self):
        # Seeking back and overwriting must leave the tail of the earlier
        # write intact.
        raw = io.BytesIO()
        bufio = self.tp(raw, 4)
        self.assertEqual(bufio.write(b"abcdef"), 6)
        self.assertEqual(bufio.tell(), 6)
        bufio.seek(0, 0)
        self.assertEqual(bufio.write(b"XY"), 2)
        bufio.seek(6, 0)
        self.assertEqual(raw.getvalue(), b"XYcdef")
        self.assertEqual(bufio.write(b"123456"), 6)
        bufio.flush()
        self.assertEqual(raw.getvalue(), b"XYcdef123456")

    def test_flush(self):
        writer = self.MockRawIO()
        bufio = self.tp(writer, 8)
        bufio.write(b"abc")
        bufio.flush()
        self.assertEqual(b"abc", writer._write_stack[0])

    def test_writelines(self):
        l = [b'ab', b'cd', b'ef']
        writer = self.MockRawIO()
        bufio = self.tp(writer, 8)
        bufio.writelines(l)
        bufio.flush()
        self.assertEqual(b''.join(writer._write_stack), b'abcdef')

    def test_writelines_userlist(self):
        # Same as test_writelines, but with a UserList instead of a list.
        l = UserList([b'ab', b'cd', b'ef'])
        writer = self.MockRawIO()
        bufio = self.tp(writer, 8)
        bufio.writelines(l)
        bufio.flush()
        self.assertEqual(b''.join(writer._write_stack), b'abcdef')

    def test_writelines_error(self):
        # Non-bytes items, None and a str are all rejected with TypeError.
        writer = self.MockRawIO()
        bufio = self.tp(writer, 8)
        self.assertRaises(TypeError, bufio.writelines, [1, 2, 3])
        self.assertRaises(TypeError, bufio.writelines, None)
        self.assertRaises(TypeError, bufio.writelines, 'abc')

    def test_destructor(self):
        # Dropping the last reference flushes buffered data to the raw
        # object.
        writer = self.MockRawIO()
        bufio = self.tp(writer, 8)
        bufio.write(b"abc")
        del bufio
        support.gc_collect()
        self.assertEqual(b"abc", writer._write_stack[0])

    def test_truncate(self):
        # Truncate implicitly flushes the buffer.
        self.addCleanup(support.unlink, support.TESTFN)
        with self.open(support.TESTFN, self.write_mode, buffering=0) as raw:
            bufio = self.tp(raw, 8)
            bufio.write(b"abcdef")
            self.assertEqual(bufio.truncate(3), 3)
            # The position is left where the write ended, not moved to the
            # new end of file.
            self.assertEqual(bufio.tell(), 6)
        with self.open(support.TESTFN, "rb", buffering=0) as f:
            self.assertEqual(f.read(), b"abc")

    def test_truncate_after_write(self):
        # Ensure that truncate preserves the file position after
        # writes longer than the buffer size.
        # Issue: https://bugs.python.org/issue32228
        self.addCleanup(support.unlink, support.TESTFN)
        with self.open(support.TESTFN, "wb") as f:
            # Fill with some buffer
            f.write(b'\x00' * 10000)
        buffer_sizes = [8192, 4096, 200]
        for buffer_size in buffer_sizes:
            with self.open(support.TESTFN, "r+b", buffering=buffer_size) as f:
                f.write(b'\x00' * (buffer_size + 1))
                # After write write_pos and write_end are set to 0
                f.read(1)
                # read operation makes sure that pos != raw_pos
                f.truncate()
                self.assertEqual(f.tell(), buffer_size + 2)

    @support.requires_resource('cpu')
    def test_threads(self):
        try:
            # Write out many bytes from many threads and test they were
            # all flushed.
            N = 1000
            contents = bytes(range(256)) * N
            sizes = cycle([1, 19])
            n = 0
            queue = deque()
            while n < len(contents):
                size = next(sizes)
                queue.append(contents[n:n+size])
                n += size
            del contents
            # We use a real file object because it allows us to
            # exercise situations where the GIL is released before
            # writing the buffer to the raw streams. This is in addition
            # to concurrency issues due to switching threads in the middle
            # of Python code.
            with self.open(support.TESTFN, self.write_mode, buffering=0) as raw:
                bufio = self.tp(raw, 8)
                errors = []
                def f():
                    try:
                        while True:
                            try:
                                s = queue.popleft()
                            except IndexError:
                                # Queue drained: this worker is done.
                                return
                            bufio.write(s)
                    except Exception as e:
                        errors.append(e)
                        raise
                threads = [threading.Thread(target=f) for x in range(20)]
                with support.start_threads(threads):
                    time.sleep(0.02)  # yield
                self.assertFalse(errors,
                    "the following exceptions were caught: %r" % errors)
                bufio.close()
            with self.open(support.TESTFN, "rb") as f:
                s = f.read()
            # Write order across threads is arbitrary, so only per-value
            # byte counts are compared.
            for i in range(256):
                self.assertEqual(s.count(bytes([i])), N)
        finally:
            support.unlink(support.TESTFN)

    def test_misbehaved_io(self):
        rawio = self.MisbehavedRawIO()
        bufio = self.tp(rawio, 5)
        self.assertRaises(OSError, bufio.seek, 0)
        self.assertRaises(OSError, bufio.tell)
        self.assertRaises(OSError, bufio.write, b"abcdef")

        # Silence destructor error
        bufio.close = lambda: None

    def test_max_buffer_size_removal(self):
        # A third positional argument is rejected with TypeError.
        with self.assertRaises(TypeError):
            self.tp(self.MockRawIO(), 8, 12)

    def test_write_error_on_close(self):
        raw = self.MockRawIO()
        def bad_write(b):
            raise OSError()
        raw.write = bad_write
        b = self.tp(raw)
        b.write(b'spam')
        self.assertRaises(OSError, b.close)  # exception not swallowed
        self.assertTrue(b.closed)

    def test_slow_close_from_thread(self):
        # Issue #31976
        # While another thread is inside close(), write() must raise
        # ValueError and the writer must already report itself closed.
        rawio = self.SlowFlushRawIO()
        bufio = self.tp(rawio, 8)
        t = threading.Thread(target=bufio.close)
        t.start()
        try:
            # Presumably set by SlowFlushRawIO once its flush has started
            # (the class is defined outside this test).
            rawio.in_flush.wait()
            self.assertRaises(ValueError, bufio.write, b'spam')
            self.assertTrue(bufio.closed)
        finally:
            # Always reap the closer thread, even when an assertion above
            # fails, so it cannot outlive the test.
            t.join()
1871
1872
Benjamin Peterson59406a92009-03-26 17:10:29 +00001873
class CBufferedWriterTest(BufferedWriterTest, SizeofTest):
    # Runs the shared BufferedWriterTest cases against io.BufferedWriter and
    # adds checks that only apply to that implementation.
    tp = io.BufferedWriter

    @unittest.skipIf(MEMORY_SANITIZER, "MSan defaults to crashing "
                     "instead of returning NULL for malloc failure.")
    def test_constructor(self):
        BufferedWriterTest.test_constructor(self)
        # The allocation can succeed on 32-bit builds, e.g. with more
        # than 2 GiB RAM and a 64-bit kernel.
        if sys.maxsize <= 0x7FFFFFFF:
            return
        rawio = self.MockRawIO()
        bufio = self.tp(rawio)
        acceptable = (OverflowError, MemoryError, ValueError)
        with self.assertRaises(acceptable):
            bufio.__init__(rawio, sys.maxsize)

    def test_initialization(self):
        # After a failed re-initialisation the object must refuse to write.
        rawio = self.MockRawIO()
        bufio = self.tp(rawio)
        for bad_size in (0, -16, -1):
            with self.assertRaises(ValueError):
                bufio.__init__(rawio, buffer_size=bad_size)
            with self.assertRaises(ValueError):
                bufio.write(b"def")

    def test_garbage_collection(self):
        # C BufferedWriter objects are collected, and collecting them flushes
        # all data to disk.
        # The Python version has __del__, so it ends into gc.garbage instead
        self.addCleanup(support.unlink, support.TESTFN)
        with support.check_warnings(('', ResourceWarning)):
            rawio = self.FileIO(support.TESTFN, "w+b")
            bufio = self.tp(rawio)
            bufio.write(b"123xxx")
            # Self-reference: the object is only reclaimable by the
            # cycle collector.
            bufio.x = bufio
            wr = weakref.ref(bufio)
            del bufio
            support.gc_collect()
        self.assertIsNone(wr(), wr)
        with self.open(support.TESTFN, "rb") as fp:
            self.assertEqual(fp.read(), b"123xxx")

    def test_args_error(self):
        # Issue #17275
        # The TypeError raised for too many arguments names the type.
        with self.assertRaisesRegex(TypeError, "BufferedWriter"):
            self.tp(io.BytesIO(), 1024, 1024, 1024)
1920
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001921
class PyBufferedWriterTest(BufferedWriterTest):
    # Runs the shared BufferedWriterTest cases against pyio.BufferedWriter.
    tp = pyio.BufferedWriter
Guido van Rossuma9e20242007-03-08 00:43:48 +00001924
class BufferedRWPairTest(unittest.TestCase):
    # Shared tests for the reader/writer pair type.  Subclasses set ``tp``
    # to the class under test; the other helpers read from ``self``
    # (MockRawIO, BytesIO, ...) are not defined in this class.

    def test_constructor(self):
        pair = self.tp(self.MockRawIO(), self.MockRawIO())
        self.assertFalse(pair.closed)

    def test_uninitialized(self):
        # An object created with __new__ only can be deleted safely and
        # rejects read()/write() until __init__ has run.
        pair = self.tp.__new__(self.tp)
        del pair
        pair = self.tp.__new__(self.tp)
        self.assertRaisesRegex((ValueError, AttributeError),
                               'uninitialized|has no attribute',
                               pair.read, 0)
        self.assertRaisesRegex((ValueError, AttributeError),
                               'uninitialized|has no attribute',
                               pair.write, b'')
        pair.__init__(self.MockRawIO(), self.MockRawIO())
        self.assertEqual(pair.read(0), b'')
        self.assertEqual(pair.write(b''), 0)

    def test_detach(self):
        pair = self.tp(self.MockRawIO(), self.MockRawIO())
        self.assertRaises(self.UnsupportedOperation, pair.detach)

    def test_constructor_max_buffer_size_removal(self):
        # A fourth positional argument is rejected with TypeError.
        with self.assertRaises(TypeError):
            self.tp(self.MockRawIO(), self.MockRawIO(), 8, 12)

    def test_constructor_with_not_readable(self):
        class NotReadable(MockRawIO):
            def readable(self):
                return False

        self.assertRaises(OSError, self.tp, NotReadable(), self.MockRawIO())

    def test_constructor_with_not_writeable(self):
        class NotWriteable(MockRawIO):
            def writable(self):
                return False

        self.assertRaises(OSError, self.tp, self.MockRawIO(), NotWriteable())

    def test_read(self):
        pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())

        self.assertEqual(pair.read(3), b"abc")
        self.assertEqual(pair.read(1), b"d")
        self.assertEqual(pair.read(), b"ef")
        pair = self.tp(self.BytesIO(b"abc"), self.MockRawIO())
        self.assertEqual(pair.read(None), b"abc")

    def test_readlines(self):
        # A fresh pair is built for every call because readlines() consumes
        # the reader.
        pair = lambda: self.tp(self.BytesIO(b"abc\ndef\nh"), self.MockRawIO())
        self.assertEqual(pair().readlines(), [b"abc\n", b"def\n", b"h"])
        # An explicit None hint must behave like the default (this line
        # used to repeat the no-argument assertion above verbatim).
        self.assertEqual(pair().readlines(None), [b"abc\n", b"def\n", b"h"])
        self.assertEqual(pair().readlines(5), [b"abc\n", b"def\n"])

    def test_read1(self):
        # .read1() is delegated to the underlying reader object, so this test
        # can be shallow.
        pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())

        self.assertEqual(pair.read1(3), b"abc")
        self.assertEqual(pair.read1(), b"def")

    def test_readinto(self):
        for method in ("readinto", "readinto1"):
            with self.subTest(method):
                pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())

                data = byteslike(b'\0' * 5)
                self.assertEqual(getattr(pair, method)(data), 5)
                self.assertEqual(bytes(data), b"abcde")

    def test_write(self):
        w = self.MockRawIO()
        pair = self.tp(self.MockRawIO(), w)

        pair.write(b"abc")
        pair.flush()
        buffer = bytearray(b"def")
        pair.write(buffer)
        # Mutating the caller's buffer after write() must not change what
        # is eventually flushed.
        buffer[:] = b"***"  # Overwrite our copy of the data
        pair.flush()
        self.assertEqual(w._write_stack, [b"abc", b"def"])

    def test_peek(self):
        pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())

        # Only the prefix is checked, so peek() may return more than asked.
        self.assertTrue(pair.peek(3).startswith(b"abc"))
        # Peeking must not consume data.
        self.assertEqual(pair.read(3), b"abc")

    def test_readable(self):
        pair = self.tp(self.MockRawIO(), self.MockRawIO())
        self.assertTrue(pair.readable())

    def test_writeable(self):
        pair = self.tp(self.MockRawIO(), self.MockRawIO())
        self.assertTrue(pair.writable())

    def test_seekable(self):
        # BufferedRWPairs are never seekable, even if their readers and writers
        # are.
        pair = self.tp(self.MockRawIO(), self.MockRawIO())
        self.assertFalse(pair.seekable())

    # .flush() is delegated to the underlying writer object and has been
    # tested in the test_write method.

    def test_close_and_closed(self):
        pair = self.tp(self.MockRawIO(), self.MockRawIO())
        self.assertFalse(pair.closed)
        pair.close()
        self.assertTrue(pair.closed)

    def test_reader_close_error_on_close(self):
        # reader_close raises NameError by referencing an undefined name.
        # The error must propagate from pair.close() while the writer is
        # still closed.
        def reader_close():
            reader_non_existing
        reader = self.MockRawIO()
        reader.close = reader_close
        writer = self.MockRawIO()
        pair = self.tp(reader, writer)
        with self.assertRaises(NameError) as err:
            pair.close()
        self.assertIn('reader_non_existing', str(err.exception))
        self.assertTrue(pair.closed)
        self.assertFalse(reader.closed)
        self.assertTrue(writer.closed)

        # Silence destructor error
        reader.close = lambda: None

    def test_writer_close_error_on_close(self):
        # writer_close raises NameError by referencing an undefined name.
        # The error must propagate from pair.close() while the reader is
        # still closed.
        def writer_close():
            writer_non_existing
        reader = self.MockRawIO()
        writer = self.MockRawIO()
        writer.close = writer_close
        pair = self.tp(reader, writer)
        with self.assertRaises(NameError) as err:
            pair.close()
        self.assertIn('writer_non_existing', str(err.exception))
        self.assertFalse(pair.closed)
        self.assertTrue(reader.closed)
        self.assertFalse(writer.closed)

        # Silence destructor error
        writer.close = lambda: None
        writer = None

        # Ignore BufferedWriter (of the BufferedRWPair) unraisable exception
        with support.catch_unraisable_exception():
            # Ignore BufferedRWPair unraisable exception
            with support.catch_unraisable_exception():
                pair = None
                support.gc_collect()
            support.gc_collect()

    def test_reader_writer_close_error_on_close(self):
        # Both close() methods fail: the reader's error is the one raised
        # and the writer's error is attached as its __context__.
        def reader_close():
            reader_non_existing
        def writer_close():
            writer_non_existing
        reader = self.MockRawIO()
        reader.close = reader_close
        writer = self.MockRawIO()
        writer.close = writer_close
        pair = self.tp(reader, writer)
        with self.assertRaises(NameError) as err:
            pair.close()
        self.assertIn('reader_non_existing', str(err.exception))
        self.assertIsInstance(err.exception.__context__, NameError)
        self.assertIn('writer_non_existing', str(err.exception.__context__))
        self.assertFalse(pair.closed)
        self.assertFalse(reader.closed)
        self.assertFalse(writer.closed)

        # Silence destructor error
        reader.close = lambda: None
        writer.close = lambda: None

    def test_isatty(self):
        # The pair reports a tty when either side does.
        class SelectableIsAtty(MockRawIO):
            def __init__(self, isatty):
                MockRawIO.__init__(self)
                self._isatty = isatty

            def isatty(self):
                return self._isatty

        pair = self.tp(SelectableIsAtty(False), SelectableIsAtty(False))
        self.assertFalse(pair.isatty())

        pair = self.tp(SelectableIsAtty(True), SelectableIsAtty(False))
        self.assertTrue(pair.isatty())

        pair = self.tp(SelectableIsAtty(False), SelectableIsAtty(True))
        self.assertTrue(pair.isatty())

        pair = self.tp(SelectableIsAtty(True), SelectableIsAtty(True))
        self.assertTrue(pair.isatty())

    def test_weakref_clearing(self):
        # Drop the pair first and the weak reference second; there is no
        # assertion, the test passes if the interpreter survives.
        brw = self.tp(self.MockRawIO(), self.MockRawIO())
        ref = weakref.ref(brw)
        brw = None
        ref = None  # Shouldn't segfault.
2132
class CBufferedRWPairTest(BufferedRWPairTest):
    # Run the shared BufferedRWPair tests against io.BufferedRWPair.
    tp = io.BufferedRWPair
Guido van Rossuma9e20242007-03-08 00:43:48 +00002135
class PyBufferedRWPairTest(BufferedRWPairTest):
    # Run the shared BufferedRWPair tests against pyio.BufferedRWPair.
    tp = pyio.BufferedRWPair
Guido van Rossuma9e20242007-03-08 00:43:48 +00002138
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002139
class BufferedRandomTest(BufferedReaderTest, BufferedWriterTest):
    # Shared tests for a buffered stream that is both readable and
    # writable.  Subclasses provide the class under test as ``tp``.
    read_mode = "rb+"
    write_mode = "wb+"

    def test_constructor(self):
        # Run the constructor checks of both parent suites.
        for base in (BufferedReaderTest, BufferedWriterTest):
            base.test_constructor(self)

    def test_uninitialized(self):
        for base in (BufferedReaderTest, BufferedWriterTest):
            base.test_uninitialized(self)

    def test_read_and_write(self):
        mock = self.MockRawIO((b"asdf", b"ghjk"))
        stream = self.tp(mock, 8)

        self.assertEqual(b"as", stream.read(2))
        for chunk in (b"ddd", b"eee"):
            stream.write(chunk)
        self.assertFalse(mock._write_stack)  # Buffer writes
        self.assertEqual(b"ghjk", stream.read())
        self.assertEqual(b"dddeee", mock._write_stack[0])

    def test_seek_and_tell(self):
        backing = self.BytesIO(b"asdfghjkl")
        stream = self.tp(backing)

        self.assertEqual(b"as", stream.read(2))
        self.assertEqual(2, stream.tell())
        stream.seek(0, 0)
        self.assertEqual(b"asdf", stream.read(4))

        stream.write(b"123f")
        stream.seek(0, 0)
        self.assertEqual(b"asdf123fl", stream.read())
        self.assertEqual(9, stream.tell())
        # Seek relative to the end, then relative to the current position.
        stream.seek(-4, 2)
        self.assertEqual(5, stream.tell())
        stream.seek(2, 1)
        self.assertEqual(7, stream.tell())
        self.assertEqual(b"fl", stream.read(11))
        stream.flush()
        self.assertEqual(b"asdf123fl", backing.getvalue())

        # A float offset is rejected.
        self.assertRaises(TypeError, stream.seek, 0.0)

    def check_flush_and_read(self, read_func):
        # Common scenario for the test_flush_and_* tests; read_func(bufio,
        # [n]) is the reading primitive being exercised.
        backing = self.BytesIO(b"abcdefghi")
        stream = self.tp(backing)

        self.assertEqual(b"ab", read_func(stream, 2))
        stream.write(b"12")
        self.assertEqual(b"ef", read_func(stream, 2))
        self.assertEqual(6, stream.tell())
        stream.flush()
        self.assertEqual(6, stream.tell())
        self.assertEqual(b"ghi", read_func(stream))
        # Modify the raw stream behind the buffered object's back.
        backing.seek(0, 0)
        backing.write(b"XYZ")
        # flush() resets the read buffer
        stream.flush()
        stream.seek(0, 0)
        self.assertEqual(b"XYZ", read_func(stream, 3))

    def test_flush_and_read(self):
        def _read(bufio, *args):
            return bufio.read(*args)
        self.check_flush_and_read(_read)

    def test_flush_and_readinto(self):
        def _readinto(bufio, n=-1):
            # A negative n means "read everything": use a large buffer.
            capacity = 9999 if n < 0 else n
            target = bytearray(capacity)
            count = bufio.readinto(target)
            return bytes(target[:count])
        self.check_flush_and_read(_readinto)

    def test_flush_and_peek(self):
        def _peek(bufio, n=-1):
            # This relies on the fact that the buffer can contain the whole
            # raw stream, otherwise peek() can return less.
            peeked = bufio.peek(n)
            if n != -1:
                peeked = peeked[:n]
            # peek() does not advance the position, so do it by hand.
            bufio.seek(len(peeked), 1)
            return peeked
        self.check_flush_and_read(_peek)

    def test_flush_and_write(self):
        backing = self.BytesIO(b"abcdefghi")
        stream = self.tp(backing)

        for chunk in (b"123", b"45"):
            stream.write(chunk)
            stream.flush()
        stream.seek(0, 0)
        self.assertEqual(b"12345fghi", backing.getvalue())
        self.assertEqual(b"12345fghi", stream.read())

    def test_threads(self):
        for base in (BufferedReaderTest, BufferedWriterTest):
            base.test_threads(self)

    def test_writes_and_peek(self):
        def _peek_here(bufio):
            bufio.peek(1)
        self.check_writes(_peek_here)

        def _peek_one_back(bufio):
            # Peek at the previous byte, then restore the position.
            saved = bufio.tell()
            bufio.seek(-1, 1)
            bufio.peek(1)
            bufio.seek(saved, 0)
        self.check_writes(_peek_one_back)

    def test_writes_and_reads(self):
        def _reread(bufio):
            bufio.seek(-1, 1)
            bufio.read(1)
        self.check_writes(_reread)

    def test_writes_and_read1s(self):
        def _reread1(bufio):
            bufio.seek(-1, 1)
            bufio.read1(1)
        self.check_writes(_reread1)

    def test_writes_and_readintos(self):
        def _rereadinto(bufio):
            bufio.seek(-1, 1)
            bufio.readinto(bytearray(1))
        self.check_writes(_rereadinto)

    def test_write_after_readahead(self):
        # Issue #6629: writing after the buffer was filled by readahead should
        # first rewind the raw stream.
        for overwrite_size in [1, 5]:
            backing = self.BytesIO(b"A" * 10)
            stream = self.tp(backing, 4)
            end_pos = overwrite_size + 1
            # Trigger readahead
            self.assertEqual(stream.read(1), b"A")
            self.assertEqual(stream.tell(), 1)
            # Overwriting should rewind the raw stream if it needs so
            stream.write(b"B" * overwrite_size)
            self.assertEqual(stream.tell(), end_pos)
            # If the write size was smaller than the buffer size, flush() and
            # check that rewind happens.
            stream.flush()
            self.assertEqual(stream.tell(), end_pos)
            expected = (b"A" + b"B" * overwrite_size
                        + b"A" * (9 - overwrite_size))
            self.assertEqual(backing.getvalue(), expected)

    def test_write_rewind_write(self):
        # Various combinations of reading / writing / seeking backwards /
        # writing again
        def mutate(bufio, pos1, pos2):
            assert pos2 >= pos1
            # Fill the buffer
            bufio.seek(pos1)
            bufio.read(pos2 - pos1)
            bufio.write(b'\x02')
            # This writes earlier than the previous write, but still inside
            # the buffer.
            bufio.seek(pos1)
            bufio.write(b'\x01')

        original = b"\x80\x81\x82\x83\x84"
        size = len(original)
        for i in range(size):
            for j in range(i, size):
                backing = self.BytesIO(original)
                stream = self.tp(backing, 100)
                mutate(stream, i, j)
                stream.flush()
                # When i == j the later write (value 1) wins.
                expected = bytearray(original)
                expected[j] = 2
                expected[i] = 1
                self.assertEqual(backing.getvalue(), expected,
                                 "failed result for i=%d, j=%d" % (i, j))

    def test_truncate_after_read_or_write(self):
        backing = self.BytesIO(b"A" * 10)
        stream = self.tp(backing, 100)
        self.assertEqual(stream.read(2), b"AA")  # the read buffer gets filled
        self.assertEqual(stream.truncate(), 2)
        self.assertEqual(stream.write(b"BB"), 2)  # the write buffer increases
        self.assertEqual(stream.truncate(), 4)

    def test_misbehaved_io(self):
        for base in (BufferedReaderTest, BufferedWriterTest):
            base.test_misbehaved_io(self)

    def test_interleaved_read_write(self):
        # Test for issue #12213
        with self.BytesIO(b'abcdefgh') as backing, \
                self.tp(backing, 100) as stream:
            stream.write(b"1")
            self.assertEqual(stream.read(1), b'b')
            stream.write(b'2')
            self.assertEqual(stream.read1(1), b'd')
            stream.write(b'3')
            one_byte = bytearray(1)
            stream.readinto(one_byte)
            self.assertEqual(one_byte, b'f')
            stream.write(b'4')
            self.assertEqual(stream.peek(1), b'h')
            stream.flush()
            self.assertEqual(backing.getvalue(), b'1b2d3f4h')

        with self.BytesIO(b'abc') as backing, \
                self.tp(backing, 100) as stream:
            self.assertEqual(stream.read(1), b'a')
            stream.write(b"2")
            self.assertEqual(stream.read(1), b'c')
            stream.flush()
            self.assertEqual(backing.getvalue(), b'a2c')

    def test_interleaved_readline_write(self):
        with self.BytesIO(b'ab\ncdef\ng\n') as backing, \
                self.tp(backing) as stream:
            stream.write(b'1')
            self.assertEqual(stream.readline(), b'b\n')
            stream.write(b'2')
            self.assertEqual(stream.readline(), b'def\n')
            stream.write(b'3')
            self.assertEqual(stream.readline(), b'\n')
            stream.flush()
            self.assertEqual(backing.getvalue(), b'1b\n2def\n3\n')

    # You can't construct a BufferedRandom over a non-seekable stream.
    test_unseekable = None
2367
R David Murray67bfe802013-02-23 21:51:05 -05002368
class CBufferedRandomTest(BufferedRandomTest, SizeofTest):
    # Run the shared BufferedRandom tests against io.BufferedRandom.
    tp = io.BufferedRandom

    @unittest.skipIf(MEMORY_SANITIZER, "MSan defaults to crashing "
                     "instead of returning NULL for malloc failure.")
    def test_constructor(self):
        BufferedRandomTest.test_constructor(self)
        # The allocation can succeed on 32-bit builds, e.g. with more
        # than 2 GiB RAM and a 64-bit kernel.
        if sys.maxsize <= 0x7FFFFFFF:
            return
        rawio = self.MockRawIO()
        bufio = self.tp(rawio)
        # Re-initialising with an absurd buffer size must fail cleanly.
        with self.assertRaises((OverflowError, MemoryError, ValueError)):
            bufio.__init__(rawio, sys.maxsize)

    def test_garbage_collection(self):
        for suite in (CBufferedReaderTest, CBufferedWriterTest):
            suite.test_garbage_collection(self)

    def test_args_error(self):
        # Issue #17275
        self.assertRaisesRegex(TypeError, "BufferedRandom",
                               self.tp, io.BytesIO(), 1024, 1024, 1024)
2392
2393
class PyBufferedRandomTest(BufferedRandomTest):
    # Run the shared BufferedRandom tests against pyio.BufferedRandom.
    tp = pyio.BufferedRandom
2396
2397
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002398# To fully exercise seek/tell, the StatefulIncrementalDecoder has these
2399# properties:
2400# - A single output character can correspond to many bytes of input.
2401# - The number of input bytes to complete the character can be
2402# undetermined until the last input byte is received.
2403# - The number of input bytes can vary depending on previous input.
2404# - A single input byte can correspond to many characters of output.
2405# - The number of output characters can be undetermined until the
2406# last input byte is received.
2407# - The number of output characters can vary depending on previous input.
2408
class StatefulIncrementalDecoder(codecs.IncrementalDecoder):
    """
    For testing seek/tell behavior with a stateful, buffering decoder.

    Input is a sequence of words.  Words may be fixed-length (length set
    by input) or variable-length (period-terminated).  In variable-length
    mode, extra periods are ignored.  Possible words are:
      - 'i' followed by a number sets the input length, I (maximum 99).
        When I is set to 0, words are period-terminated.
      - 'o' followed by a number sets the output length, O (maximum 99).
      - Any other word is converted into a word followed by a period on
        the output.  The output word consists of the input word truncated
        or padded out with hyphens to make its length equal to O.  If O
        is 0, the word is output verbatim without truncating or padding.
    I and O are initially set to 1.  When I changes, any buffered input is
    re-scanned according to the new I.  EOF also terminates the last word.
    """

    def __init__(self, errors='strict'):
        codecs.IncrementalDecoder.__init__(self, errors)
        self.reset()

    def __repr__(self):
        return '<SID %x>' % id(self)

    def reset(self):
        """Return to the initial state: I = O = 1 and nothing buffered."""
        self.i = 1
        self.o = 1
        self.buffer = bytearray()

    def getstate(self):
        """Return (buffered bytes, flags) with I and O packed into flags."""
        i, o = self.i ^ 1, self.o ^ 1 # so that flags = 0 after reset()
        return bytes(self.buffer), i*100 + o

    def setstate(self, state):
        """Restore a state previously returned by getstate()."""
        buffer, flags = state
        self.buffer = bytearray(buffer)
        # Undo the packing done by getstate().
        i, o = divmod(flags, 100)
        self.i, self.o = i ^ 1, o ^ 1

    def decode(self, input, final=False):
        """Decode the bytes in *input* and return the completed words.

        Bytes of a word that is not complete yet stay in self.buffer.
        If *final* is true, a pending word is emitted as well.
        """
        output = ''
        for b in input:
            if self.i == 0: # variable-length, terminated with period
                if b == ord('.'):
                    # A period with nothing buffered is simply ignored.
                    if self.buffer:
                        output += self.process_word()
                else:
                    self.buffer.append(b)
            else: # fixed-length, terminate after self.i bytes
                self.buffer.append(b)
                if len(self.buffer) == self.i:
                    output += self.process_word()
        if final and self.buffer: # EOF terminates the last word
            output += self.process_word()
        return output

    def process_word(self):
        """Consume the buffered word and return the text it produces.

        'i...' and 'o...' words update I and O and produce no output.
        """
        output = ''
        if self.buffer[0] == ord('i'):
            self.i = min(99, int(self.buffer[1:] or 0)) # set input length
        elif self.buffer[0] == ord('o'):
            self.o = min(99, int(self.buffer[1:] or 0)) # set output length
        else:
            output = self.buffer.decode('ascii')
            if len(output) < self.o:
                output += '-'*self.o # pad out with hyphens
            if self.o:
                output = output[:self.o] # truncate to output length
            output += '.'
        self.buffer = bytearray()
        return output

    # Lookups of 'test_decoder' only succeed while this flag is true.
    codecEnabled = False

    @classmethod
    def lookupTestDecoder(cls, name):
        """Codec search function resolving 'test_decoder' to this class.

        Returns None when the codec is disabled or *name* is any other
        codec name.
        """
        if cls.codecEnabled and name == 'test_decoder':
            latin1 = codecs.lookup('latin-1')
            return codecs.CodecInfo(
                name='test_decoder', encode=latin1.encode, decode=None,
                incrementalencoder=None,
                streamreader=None, streamwriter=None,
                incrementaldecoder=cls)
        return None
2493
# Register the previous decoder for testing.
# The lookup function only answers for the name 'test_decoder', and only
# while StatefulIncrementalDecoder.codecEnabled is true.
# Disabled by default, tests will enable it.
codecs.register(StatefulIncrementalDecoder.lookupTestDecoder)
2497
2498
class StatefulIncrementalDecoderTest(unittest.TestCase):
    """
    Make sure the StatefulIncrementalDecoder actually works.
    """

    # Each entry is (input bytes, final flag, expected decoded text).
    test_cases = [
        # I=1, O=1 (fixed-length input == fixed-length output)
        (b'abcd', False, 'a.b.c.d.'),
        # I=0, O=0 (variable-length input, variable-length output)
        (b'oiabcd', True, 'abcd.'),
        # I=0, O=0 (should ignore extra periods)
        (b'oi...abcd...', True, 'abcd.'),
        # I=0, O=6 (variable-length input, fixed-length output)
        (b'i.o6.x.xyz.toolongtofit.', False, 'x-----.xyz---.toolon.'),
        # I=2, O=6 (fixed-length input < fixed-length output)
        (b'i.i2.o6xyz', True, 'xy----.z-----.'),
        # I=6, O=3 (fixed-length input > fixed-length output)
        (b'i.o3.i6.abcdefghijklmnop', True, 'abc.ghi.mno.'),
        # I=0, then 3; O=29, then 15 (with longer output)
        (b'i.o29.a.b.cde.o15.abcdefghijabcdefghij.i3.a.b.c.d.ei00k.l.m', True,
         'a----------------------------.'
         'b----------------------------.'
         'cde--------------------------.'
         'abcdefghijabcde.'
         'a.b------------.'
         '.c.------------.'
         'd.e------------.'
         'k--------------.'
         'l--------------.'
         'm--------------.')
    ]

    def test_decoder(self):
        # Try a few one-shot test cases.
        for data, final, expected in self.test_cases:
            decoder = StatefulIncrementalDecoder()
            self.assertEqual(decoder.decode(data, final), expected)

        # Also test an unfinished decode, followed by forcing EOF.
        decoder = StatefulIncrementalDecoder()
        self.assertEqual(decoder.decode(b'oiabcd'), '')
        self.assertEqual(decoder.decode(b'', 1), 'abcd.')
Guido van Rossum78892e42007-04-06 17:31:18 +00002541
2542class TextIOWrapperTest(unittest.TestCase):
Guido van Rossum9b76da62007-04-11 01:09:03 +00002543
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002544 def setUp(self):
2545 self.testdata = b"AAA\r\nBBB\rCCC\r\nDDD\nEEE\r\n"
2546 self.normalized = b"AAA\nBBB\nCCC\nDDD\nEEE\n".decode("ascii")
Benjamin Petersonee8712c2008-05-20 21:35:26 +00002547 support.unlink(support.TESTFN)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002548
    def tearDown(self):
        # Remove the scratch file a test may have left behind.
        support.unlink(support.TESTFN)
Guido van Rossum9b76da62007-04-11 01:09:03 +00002551
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002552 def test_constructor(self):
2553 r = self.BytesIO(b"\xc3\xa9\n\n")
2554 b = self.BufferedReader(r, 1000)
2555 t = self.TextIOWrapper(b)
Marc-André Lemburg8f36af72011-02-25 15:42:01 +00002556 t.__init__(b, encoding="latin-1", newline="\r\n")
2557 self.assertEqual(t.encoding, "latin-1")
Ezio Melottib3aedd42010-11-20 19:04:17 +00002558 self.assertEqual(t.line_buffering, False)
Marc-André Lemburg8f36af72011-02-25 15:42:01 +00002559 t.__init__(b, encoding="utf-8", line_buffering=True)
2560 self.assertEqual(t.encoding, "utf-8")
Ezio Melottib3aedd42010-11-20 19:04:17 +00002561 self.assertEqual(t.line_buffering, True)
2562 self.assertEqual("\xe9\n", t.readline())
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002563 self.assertRaises(TypeError, t.__init__, b, newline=42)
2564 self.assertRaises(ValueError, t.__init__, b, newline='xyzzy')
2565
Serhiy Storchakaf24131f2015-04-16 11:19:43 +03002566 def test_uninitialized(self):
2567 t = self.TextIOWrapper.__new__(self.TextIOWrapper)
2568 del t
2569 t = self.TextIOWrapper.__new__(self.TextIOWrapper)
2570 self.assertRaises(Exception, repr, t)
2571 self.assertRaisesRegex((ValueError, AttributeError),
2572 'uninitialized|has no attribute',
2573 t.read, 0)
2574 t.__init__(self.MockRawIO())
2575 self.assertEqual(t.read(0), '')
2576
Nick Coghlana9b15242014-02-04 22:11:18 +10002577 def test_non_text_encoding_codecs_are_rejected(self):
2578 # Ensure the constructor complains if passed a codec that isn't
2579 # marked as a text encoding
2580 # http://bugs.python.org/issue20404
2581 r = self.BytesIO()
2582 b = self.BufferedWriter(r)
2583 with self.assertRaisesRegex(LookupError, "is not a text encoding"):
2584 self.TextIOWrapper(b, encoding="hex")
2585
Benjamin Petersond2e0c792009-05-01 20:40:59 +00002586 def test_detach(self):
2587 r = self.BytesIO()
2588 b = self.BufferedWriter(r)
2589 t = self.TextIOWrapper(b)
2590 self.assertIs(t.detach(), b)
2591
2592 t = self.TextIOWrapper(b, encoding="ascii")
2593 t.write("howdy")
2594 self.assertFalse(r.getvalue())
2595 t.detach()
2596 self.assertEqual(r.getvalue(), b"howdy")
2597 self.assertRaises(ValueError, t.detach)
2598
Benjamin Peterson10e76b62014-12-21 20:51:50 -06002599 # Operations independent of the detached stream should still work
2600 repr(t)
2601 self.assertEqual(t.encoding, "ascii")
2602 self.assertEqual(t.errors, "strict")
2603 self.assertFalse(t.line_buffering)
Antoine Pitrou3c2817b2017-06-03 12:32:28 +02002604 self.assertFalse(t.write_through)
Benjamin Peterson10e76b62014-12-21 20:51:50 -06002605
Benjamin Petersonc4c0eae2009-03-09 00:07:03 +00002606 def test_repr(self):
2607 raw = self.BytesIO("hello".encode("utf-8"))
2608 b = self.BufferedReader(raw)
2609 t = self.TextIOWrapper(b, encoding="utf-8")
Antoine Pitrou716c4442009-05-23 19:04:03 +00002610 modname = self.TextIOWrapper.__module__
Max Bernsteinccb7ca72019-05-21 10:09:21 -07002611 self.assertRegex(repr(t),
2612 r"<(%s\.)?TextIOWrapper encoding='utf-8'>" % modname)
Antoine Pitrou716c4442009-05-23 19:04:03 +00002613 raw.name = "dummy"
Max Bernsteinccb7ca72019-05-21 10:09:21 -07002614 self.assertRegex(repr(t),
2615 r"<(%s\.)?TextIOWrapper name='dummy' encoding='utf-8'>" % modname)
Antoine Pitroua4815ca2011-01-09 20:38:15 +00002616 t.mode = "r"
Max Bernsteinccb7ca72019-05-21 10:09:21 -07002617 self.assertRegex(repr(t),
2618 r"<(%s\.)?TextIOWrapper name='dummy' mode='r' encoding='utf-8'>" % modname)
Antoine Pitrou716c4442009-05-23 19:04:03 +00002619 raw.name = b"dummy"
Max Bernsteinccb7ca72019-05-21 10:09:21 -07002620 self.assertRegex(repr(t),
2621 r"<(%s\.)?TextIOWrapper name=b'dummy' mode='r' encoding='utf-8'>" % modname)
Benjamin Petersonc4c0eae2009-03-09 00:07:03 +00002622
Benjamin Peterson10e76b62014-12-21 20:51:50 -06002623 t.buffer.detach()
2624 repr(t) # Should not raise an exception
2625
Serhiy Storchakaa5af6e12017-03-19 19:25:29 +02002626 def test_recursive_repr(self):
2627 # Issue #25455
2628 raw = self.BytesIO()
2629 t = self.TextIOWrapper(raw)
2630 with support.swap_attr(raw, 'name', t):
2631 try:
2632 repr(t) # Should not crash
2633 except RuntimeError:
2634 pass
2635
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002636 def test_line_buffering(self):
2637 r = self.BytesIO()
2638 b = self.BufferedWriter(r, 1000)
2639 t = self.TextIOWrapper(b, newline="\n", line_buffering=True)
Guido van Rossumf64db9f2007-12-06 01:04:26 +00002640 t.write("X")
Ezio Melottib3aedd42010-11-20 19:04:17 +00002641 self.assertEqual(r.getvalue(), b"") # No flush happened
Guido van Rossumf64db9f2007-12-06 01:04:26 +00002642 t.write("Y\nZ")
Ezio Melottib3aedd42010-11-20 19:04:17 +00002643 self.assertEqual(r.getvalue(), b"XY\nZ") # All got flushed
Guido van Rossumf64db9f2007-12-06 01:04:26 +00002644 t.write("A\rB")
Ezio Melottib3aedd42010-11-20 19:04:17 +00002645 self.assertEqual(r.getvalue(), b"XY\nZA\rB")
Guido van Rossumf64db9f2007-12-06 01:04:26 +00002646
Antoine Pitrou3c2817b2017-06-03 12:32:28 +02002647 def test_reconfigure_line_buffering(self):
2648 r = self.BytesIO()
2649 b = self.BufferedWriter(r, 1000)
2650 t = self.TextIOWrapper(b, newline="\n", line_buffering=False)
2651 t.write("AB\nC")
2652 self.assertEqual(r.getvalue(), b"")
2653
2654 t.reconfigure(line_buffering=True) # implicit flush
2655 self.assertEqual(r.getvalue(), b"AB\nC")
2656 t.write("DEF\nG")
2657 self.assertEqual(r.getvalue(), b"AB\nCDEF\nG")
2658 t.write("H")
2659 self.assertEqual(r.getvalue(), b"AB\nCDEF\nG")
2660 t.reconfigure(line_buffering=False) # implicit flush
2661 self.assertEqual(r.getvalue(), b"AB\nCDEF\nGH")
2662 t.write("IJ")
2663 self.assertEqual(r.getvalue(), b"AB\nCDEF\nGH")
2664
2665 # Keeping default value
2666 t.reconfigure()
2667 t.reconfigure(line_buffering=None)
2668 self.assertEqual(t.line_buffering, False)
2669 t.reconfigure(line_buffering=True)
2670 t.reconfigure()
2671 t.reconfigure(line_buffering=None)
2672 self.assertEqual(t.line_buffering, True)
2673
Victor Stinner91106cd2017-12-13 12:29:09 +01002674 @unittest.skipIf(sys.flags.utf8_mode, "utf-8 mode is enabled")
Victor Stinnerf86a5e82012-06-05 13:43:22 +02002675 def test_default_encoding(self):
2676 old_environ = dict(os.environ)
2677 try:
2678 # try to get a user preferred encoding different than the current
2679 # locale encoding to check that TextIOWrapper() uses the current
2680 # locale encoding and not the user preferred encoding
2681 for key in ('LC_ALL', 'LANG', 'LC_CTYPE'):
2682 if key in os.environ:
2683 del os.environ[key]
2684
2685 current_locale_encoding = locale.getpreferredencoding(False)
2686 b = self.BytesIO()
2687 t = self.TextIOWrapper(b)
2688 self.assertEqual(t.encoding, current_locale_encoding)
2689 finally:
2690 os.environ.clear()
2691 os.environ.update(old_environ)
2692
Serhiy Storchaka5cfc79d2014-02-07 10:06:39 +02002693 @support.cpython_only
Victor Stinner91106cd2017-12-13 12:29:09 +01002694 @unittest.skipIf(sys.flags.utf8_mode, "utf-8 mode is enabled")
Serhiy Storchaka78980432013-01-15 01:12:17 +02002695 def test_device_encoding(self):
Serhiy Storchaka5cfc79d2014-02-07 10:06:39 +02002696 # Issue 15989
2697 import _testcapi
Serhiy Storchaka78980432013-01-15 01:12:17 +02002698 b = self.BytesIO()
2699 b.fileno = lambda: _testcapi.INT_MAX + 1
2700 self.assertRaises(OverflowError, self.TextIOWrapper, b)
2701 b.fileno = lambda: _testcapi.UINT_MAX + 1
2702 self.assertRaises(OverflowError, self.TextIOWrapper, b)
2703
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002704 def test_encoding(self):
2705 # Check the encoding attribute is always set, and valid
2706 b = self.BytesIO()
Marc-André Lemburg8f36af72011-02-25 15:42:01 +00002707 t = self.TextIOWrapper(b, encoding="utf-8")
2708 self.assertEqual(t.encoding, "utf-8")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002709 t = self.TextIOWrapper(b)
Serhiy Storchaka0dcd80a2015-08-02 15:17:49 +03002710 self.assertIsNotNone(t.encoding)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002711 codecs.lookup(t.encoding)
2712
2713 def test_encoding_errors_reading(self):
Guido van Rossume7fc50f2007-12-03 22:54:21 +00002714 # (1) default
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002715 b = self.BytesIO(b"abc\n\xff\n")
2716 t = self.TextIOWrapper(b, encoding="ascii")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00002717 self.assertRaises(UnicodeError, t.read)
2718 # (2) explicit strict
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002719 b = self.BytesIO(b"abc\n\xff\n")
2720 t = self.TextIOWrapper(b, encoding="ascii", errors="strict")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00002721 self.assertRaises(UnicodeError, t.read)
2722 # (3) ignore
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002723 b = self.BytesIO(b"abc\n\xff\n")
2724 t = self.TextIOWrapper(b, encoding="ascii", errors="ignore")
Ezio Melottib3aedd42010-11-20 19:04:17 +00002725 self.assertEqual(t.read(), "abc\n\n")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00002726 # (4) replace
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002727 b = self.BytesIO(b"abc\n\xff\n")
2728 t = self.TextIOWrapper(b, encoding="ascii", errors="replace")
Ezio Melottib3aedd42010-11-20 19:04:17 +00002729 self.assertEqual(t.read(), "abc\n\ufffd\n")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00002730
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002731 def test_encoding_errors_writing(self):
Guido van Rossume7fc50f2007-12-03 22:54:21 +00002732 # (1) default
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002733 b = self.BytesIO()
2734 t = self.TextIOWrapper(b, encoding="ascii")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00002735 self.assertRaises(UnicodeError, t.write, "\xff")
2736 # (2) explicit strict
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002737 b = self.BytesIO()
2738 t = self.TextIOWrapper(b, encoding="ascii", errors="strict")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00002739 self.assertRaises(UnicodeError, t.write, "\xff")
2740 # (3) ignore
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002741 b = self.BytesIO()
2742 t = self.TextIOWrapper(b, encoding="ascii", errors="ignore",
Guido van Rossumf64db9f2007-12-06 01:04:26 +00002743 newline="\n")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00002744 t.write("abc\xffdef\n")
2745 t.flush()
Ezio Melottib3aedd42010-11-20 19:04:17 +00002746 self.assertEqual(b.getvalue(), b"abcdef\n")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00002747 # (4) replace
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002748 b = self.BytesIO()
2749 t = self.TextIOWrapper(b, encoding="ascii", errors="replace",
Guido van Rossumf64db9f2007-12-06 01:04:26 +00002750 newline="\n")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00002751 t.write("abc\xffdef\n")
2752 t.flush()
Ezio Melottib3aedd42010-11-20 19:04:17 +00002753 self.assertEqual(b.getvalue(), b"abc?def\n")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00002754
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002755 def test_newlines(self):
Guido van Rossum78892e42007-04-06 17:31:18 +00002756 input_lines = [ "unix\n", "windows\r\n", "os9\r", "last\n", "nonl" ]
2757
2758 tests = [
2759 [ None, [ 'unix\n', 'windows\n', 'os9\n', 'last\n', 'nonl' ] ],
Guido van Rossum8358db22007-08-18 21:39:55 +00002760 [ '', input_lines ],
2761 [ '\n', [ "unix\n", "windows\r\n", "os9\rlast\n", "nonl" ] ],
2762 [ '\r\n', [ "unix\nwindows\r\n", "os9\rlast\nnonl" ] ],
2763 [ '\r', [ "unix\nwindows\r", "\nos9\r", "last\nnonl" ] ],
Guido van Rossum78892e42007-04-06 17:31:18 +00002764 ]
Antoine Pitrou180a3362008-12-14 16:36:46 +00002765 encodings = (
2766 'utf-8', 'latin-1',
2767 'utf-16', 'utf-16-le', 'utf-16-be',
2768 'utf-32', 'utf-32-le', 'utf-32-be',
2769 )
Guido van Rossum78892e42007-04-06 17:31:18 +00002770
Guido van Rossum8358db22007-08-18 21:39:55 +00002771 # Try a range of buffer sizes to test the case where \r is the last
Guido van Rossum78892e42007-04-06 17:31:18 +00002772 # character in TextIOWrapper._pending_line.
2773 for encoding in encodings:
Guido van Rossum8358db22007-08-18 21:39:55 +00002774 # XXX: str.encode() should return bytes
2775 data = bytes(''.join(input_lines).encode(encoding))
Guido van Rossum78892e42007-04-06 17:31:18 +00002776 for do_reads in (False, True):
Guido van Rossum8358db22007-08-18 21:39:55 +00002777 for bufsize in range(1, 10):
2778 for newline, exp_lines in tests:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002779 bufio = self.BufferedReader(self.BytesIO(data), bufsize)
2780 textio = self.TextIOWrapper(bufio, newline=newline,
Guido van Rossum78892e42007-04-06 17:31:18 +00002781 encoding=encoding)
2782 if do_reads:
2783 got_lines = []
2784 while True:
Guido van Rossum76c5d4d2007-04-06 19:10:29 +00002785 c2 = textio.read(2)
Guido van Rossum78892e42007-04-06 17:31:18 +00002786 if c2 == '':
2787 break
Ezio Melottib3aedd42010-11-20 19:04:17 +00002788 self.assertEqual(len(c2), 2)
Guido van Rossum76c5d4d2007-04-06 19:10:29 +00002789 got_lines.append(c2 + textio.readline())
Guido van Rossum78892e42007-04-06 17:31:18 +00002790 else:
Guido van Rossum76c5d4d2007-04-06 19:10:29 +00002791 got_lines = list(textio)
Guido van Rossum78892e42007-04-06 17:31:18 +00002792
2793 for got_line, exp_line in zip(got_lines, exp_lines):
Ezio Melottib3aedd42010-11-20 19:04:17 +00002794 self.assertEqual(got_line, exp_line)
2795 self.assertEqual(len(got_lines), len(exp_lines))
Guido van Rossum78892e42007-04-06 17:31:18 +00002796
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002797 def test_newlines_input(self):
2798 testdata = b"AAA\nBB\x00B\nCCC\rDDD\rEEE\r\nFFF\r\nGGG"
Guido van Rossum8358db22007-08-18 21:39:55 +00002799 normalized = testdata.replace(b"\r\n", b"\n").replace(b"\r", b"\n")
2800 for newline, expected in [
Ezio Melottid8b509b2011-09-28 17:37:55 +03002801 (None, normalized.decode("ascii").splitlines(keepends=True)),
2802 ("", testdata.decode("ascii").splitlines(keepends=True)),
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002803 ("\n", ["AAA\n", "BB\x00B\n", "CCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]),
2804 ("\r\n", ["AAA\nBB\x00B\nCCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]),
2805 ("\r", ["AAA\nBB\x00B\nCCC\r", "DDD\r", "EEE\r", "\nFFF\r", "\nGGG"]),
Guido van Rossum8358db22007-08-18 21:39:55 +00002806 ]:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002807 buf = self.BytesIO(testdata)
2808 txt = self.TextIOWrapper(buf, encoding="ascii", newline=newline)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002809 self.assertEqual(txt.readlines(), expected)
Guido van Rossum8358db22007-08-18 21:39:55 +00002810 txt.seek(0)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002811 self.assertEqual(txt.read(), "".join(expected))
Guido van Rossum8358db22007-08-18 21:39:55 +00002812
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002813 def test_newlines_output(self):
2814 testdict = {
2815 "": b"AAA\nBBB\nCCC\nX\rY\r\nZ",
2816 "\n": b"AAA\nBBB\nCCC\nX\rY\r\nZ",
2817 "\r": b"AAA\rBBB\rCCC\rX\rY\r\rZ",
2818 "\r\n": b"AAA\r\nBBB\r\nCCC\r\nX\rY\r\r\nZ",
2819 }
2820 tests = [(None, testdict[os.linesep])] + sorted(testdict.items())
2821 for newline, expected in tests:
2822 buf = self.BytesIO()
2823 txt = self.TextIOWrapper(buf, encoding="ascii", newline=newline)
2824 txt.write("AAA\nB")
2825 txt.write("BB\nCCC\n")
2826 txt.write("X\rY\r\nZ")
2827 txt.flush()
Ezio Melottib3aedd42010-11-20 19:04:17 +00002828 self.assertEqual(buf.closed, False)
2829 self.assertEqual(buf.getvalue(), expected)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002830
def test_destructor(self):
    # Dropping the last reference to a TextIOWrapper must flush pending
    # text and close the underlying buffer.
    closed_values = []
    base = self.BytesIO

    class MyBytesIO(base):
        def close(self):
            # Record what had been written by the time close() ran.
            closed_values.append(self.getvalue())
            base.close(self)

    b = MyBytesIO()
    t = self.TextIOWrapper(b, encoding="ascii")
    t.write("abc")
    del t
    support.gc_collect()
    self.assertEqual([b"abc"], closed_values)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002844
def test_override_destructor(self):
    # Overridden __del__, close() and flush() must run in that order
    # when the wrapper is garbage collected.
    record = []

    class MyTextIO(self.TextIOWrapper):
        def __del__(self):
            record.append(1)
            try:
                f = super().__del__
            except AttributeError:
                # The base class may not define __del__ at all.
                pass
            else:
                f()

        def close(self):
            record.append(2)
            super().close()

        def flush(self):
            record.append(3)
            super().flush()

    b = self.BytesIO()
    t = MyTextIO(b, encoding="ascii")
    del t
    support.gc_collect()
    self.assertEqual(record, [1, 2, 3])
2867
def test_error_through_destructor(self):
    # Test that the exception state is not modified by a destructor,
    # even if close() fails.
    rawio = self.CloseFailureIO()
    with support.catch_unraisable_exception() as cm:
        with self.assertRaises(AttributeError):
            self.TextIOWrapper(rawio).xyzzy

        # Inspect cm.unraisable while the catching context is still open.
        if not IOBASE_EMITS_UNRAISABLE:
            self.assertIsNone(cm.unraisable)
        elif cm.unraisable is not None:
            self.assertEqual(cm.unraisable.exc_type, OSError)
Guido van Rossum8358db22007-08-18 21:39:55 +00002880
# Systematic tests of the text I/O API
2882
def test_basic_io(self):
    # Basic write/read/seek/tell round trip for several chunk sizes and
    # encodings.  Files are opened with ``with`` so that a failing
    # assertion does not leak an open file handle.
    for chunksize in (1, 2, 3, 4, 5, 15, 16, 17, 31, 32, 33, 63, 64, 65):
        for enc in "ascii", "latin-1", "utf-8":  # , "utf-16-be", "utf-16-le":
            with self.open(support.TESTFN, "w+", encoding=enc) as f:
                f._CHUNK_SIZE = chunksize
                self.assertEqual(f.write("abc"), 3)
            with self.open(support.TESTFN, "r+", encoding=enc) as f:
                f._CHUNK_SIZE = chunksize
                self.assertEqual(f.tell(), 0)
                self.assertEqual(f.read(), "abc")
                # Position just past "abc", reused below.
                cookie = f.tell()
                self.assertEqual(f.seek(0), 0)
                self.assertEqual(f.read(None), "abc")
                f.seek(0)
                self.assertEqual(f.read(2), "ab")
                self.assertEqual(f.read(1), "c")
                self.assertEqual(f.read(1), "")
                self.assertEqual(f.read(), "")
                self.assertEqual(f.tell(), cookie)
                self.assertEqual(f.seek(0), 0)
                self.assertEqual(f.seek(0, 2), cookie)
                self.assertEqual(f.write("def"), 3)
                self.assertEqual(f.seek(cookie), cookie)
                self.assertEqual(f.read(), "def")
                if enc.startswith("utf"):
                    self.multi_line_test(f, enc)
2911
def multi_line_test(self, f, enc):
    # Helper: truncate *f*, write lines of many different lengths while
    # recording tell() before each one, then read them back and check
    # that both the positions and the contents match.  *enc* is accepted
    # for the caller's convenience and is not used here.
    f.seek(0)
    f.truncate()
    sample = "s\xff\u0fff\uffff"
    wlines = []
    for size in (0, 1, 2, 3, 4, 5, 30, 31, 32, 33, 62, 63, 64, 65, 1000):
        # Cycle through the sample characters to build a line of `size`.
        line = "".join(sample[i % len(sample)] for i in range(size)) + "\n"
        wlines.append((f.tell(), line))
        f.write(line)
    f.seek(0)
    rlines = []
    while True:
        pos = f.tell()
        line = f.readline()
        if not line:
            break
        rlines.append((pos, line))
    self.assertEqual(rlines, wlines)
Guido van Rossum9b76da62007-04-11 01:09:03 +00002933
def test_telling(self):
    # tell() must report consistent positions after writes and after
    # readline(), and must be refused while iterating over the file.
    # ``with`` closes the file even when an assertion fails.
    with self.open(support.TESTFN, "w+", encoding="utf-8") as f:
        p0 = f.tell()
        f.write("\xff\n")
        p1 = f.tell()
        f.write("\xff\n")
        p2 = f.tell()
        f.seek(0)
        self.assertEqual(f.tell(), p0)
        self.assertEqual(f.readline(), "\xff\n")
        self.assertEqual(f.tell(), p1)
        self.assertEqual(f.readline(), "\xff\n")
        self.assertEqual(f.tell(), p2)
        f.seek(0)
        for line in f:
            self.assertEqual(line, "\xff\n")
            # Telling in the middle of iteration is expected to fail.
            self.assertRaises(OSError, f.tell)
        self.assertEqual(f.tell(), p2)
2953
def test_seeking(self):
    # Place a multi-byte character two characters before the default
    # chunk size, then check read()/tell()/readline() around it.
    chunk_size = _default_chunk_size()
    prefix_size = chunk_size - 2
    u_prefix = "a" * prefix_size
    prefix = u_prefix.encode("utf-8")
    self.assertEqual(len(u_prefix), len(prefix))
    u_suffix = "\u8888\n"
    suffix = u_suffix.encode("utf-8")
    line = prefix + suffix
    with self.open(support.TESTFN, "wb") as f:
        f.write(line*2)
    with self.open(support.TESTFN, "r", encoding="utf-8") as f:
        s = f.read(prefix_size)
        self.assertEqual(s, str(prefix, "ascii"))
        self.assertEqual(f.tell(), prefix_size)
        self.assertEqual(f.readline(), u_suffix)
Guido van Rossumd76e7792007-04-17 02:38:04 +00002970
def test_seeking_too(self):
    # Regression test for a specific bug
    data = b'\xe0\xbf\xbf\n'
    with self.open(support.TESTFN, "wb") as f:
        f.write(data)
    with self.open(support.TESTFN, "r", encoding="utf-8") as f:
        f._CHUNK_SIZE  # Just test that it exists
        # A chunk smaller than the 3-byte character forces a split read.
        f._CHUNK_SIZE = 2
        f.readline()
        f.tell()
Guido van Rossumd76e7792007-04-17 02:38:04 +00002981
def test_seek_and_tell(self):
    # Test seek/tell using the StatefulIncrementalDecoder.
    # Make test faster by doing smaller seeks
    CHUNK_SIZE = 128

    def check_seek_and_tell_with_data(data, min_pos=0):
        """Tell/seek to various points within a data stream and ensure
        that the decoded data returned by read() is consistent."""
        with self.open(support.TESTFN, 'wb') as f:
            f.write(data)
        with self.open(support.TESTFN, encoding='test_decoder') as f:
            f._CHUNK_SIZE = CHUNK_SIZE
            decoded = f.read()

        for i in range(min_pos, len(decoded) + 1):  # seek positions
            for j in [1, 5, len(decoded) - i]:  # read lengths
                with self.open(support.TESTFN, encoding='test_decoder') as f:
                    self.assertEqual(f.read(i), decoded[:i])
                    cookie = f.tell()
                    self.assertEqual(f.read(j), decoded[i:i + j])
                    f.seek(cookie)
                    self.assertEqual(f.read(), decoded[i:])

    # Enable the test decoder.
    StatefulIncrementalDecoder.codecEnabled = 1

    # Run the tests.
    try:
        # Try each test case.
        for payload, _, _ in StatefulIncrementalDecoderTest.test_cases:
            check_seek_and_tell_with_data(payload)

        # Position each test case so that it crosses a chunk boundary.
        for payload, _, _ in StatefulIncrementalDecoderTest.test_cases:
            offset = CHUNK_SIZE - len(payload)//2
            prefix = b'.'*offset
            # Don't bother seeking into the prefix (takes too long).
            min_pos = offset*2
            check_seek_and_tell_with_data(prefix + payload, min_pos)

    # Ensure our test decoder won't interfere with subsequent tests.
    finally:
        StatefulIncrementalDecoder.codecEnabled = 0
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00003028
def test_multibyte_seek_and_tell(self):
    # seek() to a cookie obtained from tell() must work with a
    # multi-byte encoding.  ``with`` avoids leaking the file on failure.
    with self.open(support.TESTFN, "w", encoding="euc_jp") as f:
        f.write("AB\n\u3046\u3048\n")

    with self.open(support.TESTFN, "r", encoding="euc_jp") as f:
        self.assertEqual(f.readline(), "AB\n")
        p0 = f.tell()
        self.assertEqual(f.readline(), "\u3046\u3048\n")
        p1 = f.tell()
        f.seek(p0)
        self.assertEqual(f.readline(), "\u3046\u3048\n")
        self.assertEqual(f.tell(), p1)
3043
def test_seek_with_encoder_state(self):
    # Write, seek back to an earlier tell() cookie and overwrite, then
    # check what ends up in the file.  ``with`` avoids leaking the file
    # on failure.
    with self.open(support.TESTFN, "w", encoding="euc_jis_2004") as f:
        f.write("\u00e6\u0300")
        p0 = f.tell()
        f.write("\u00e6")
        f.seek(p0)
        f.write("\u0300")

    with self.open(support.TESTFN, "r", encoding="euc_jis_2004") as f:
        self.assertEqual(f.readline(), "\u00e6\u0300\u0300")
3056
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003057 def test_encoded_writes(self):
Alexandre Vassalottia38f73b2008-01-07 18:30:48 +00003058 data = "1234567890"
3059 tests = ("utf-16",
3060 "utf-16-le",
3061 "utf-16-be",
3062 "utf-32",
3063 "utf-32-le",
3064 "utf-32-be")
3065 for encoding in tests:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003066 buf = self.BytesIO()
3067 f = self.TextIOWrapper(buf, encoding=encoding)
Alexandre Vassalottia38f73b2008-01-07 18:30:48 +00003068 # Check if the BOM is written only once (see issue1753).
3069 f.write(data)
3070 f.write(data)
3071 f.seek(0)
Ezio Melottib3aedd42010-11-20 19:04:17 +00003072 self.assertEqual(f.read(), data * 2)
Benjamin Peterson9363a652009-03-05 00:42:09 +00003073 f.seek(0)
Ezio Melottib3aedd42010-11-20 19:04:17 +00003074 self.assertEqual(f.read(), data * 2)
3075 self.assertEqual(buf.getvalue(), (data * 2).encode(encoding))
Alexandre Vassalottia38f73b2008-01-07 18:30:48 +00003076
Benjamin Petersona1b49012009-03-31 23:11:32 +00003077 def test_unreadable(self):
3078 class UnReadable(self.BytesIO):
3079 def readable(self):
3080 return False
3081 txt = self.TextIOWrapper(UnReadable())
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02003082 self.assertRaises(OSError, txt.read)
Benjamin Petersona1b49012009-03-31 23:11:32 +00003083
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003084 def test_read_one_by_one(self):
3085 txt = self.TextIOWrapper(self.BytesIO(b"AA\r\nBB"))
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00003086 reads = ""
3087 while True:
3088 c = txt.read(1)
3089 if not c:
3090 break
3091 reads += c
Ezio Melottib3aedd42010-11-20 19:04:17 +00003092 self.assertEqual(reads, "AA\nBB")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00003093
Benjamin Petersonbf5ff762009-12-13 19:25:34 +00003094 def test_readlines(self):
3095 txt = self.TextIOWrapper(self.BytesIO(b"AA\nBB\nCC"))
3096 self.assertEqual(txt.readlines(), ["AA\n", "BB\n", "CC"])
3097 txt.seek(0)
3098 self.assertEqual(txt.readlines(None), ["AA\n", "BB\n", "CC"])
3099 txt.seek(0)
3100 self.assertEqual(txt.readlines(5), ["AA\n", "BB\n"])
3101
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00003102 # read in amounts equal to TextIOWrapper._CHUNK_SIZE which is 128.
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003103 def test_read_by_chunk(self):
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00003104 # make sure "\r\n" straddles 128 char boundary.
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003105 txt = self.TextIOWrapper(self.BytesIO(b"A" * 127 + b"\r\nB"))
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00003106 reads = ""
3107 while True:
3108 c = txt.read(128)
3109 if not c:
3110 break
3111 reads += c
Ezio Melottib3aedd42010-11-20 19:04:17 +00003112 self.assertEqual(reads, "A"*127+"\nB")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00003113
Antoine Pitrou3ed2cb52012-10-16 23:02:27 +02003114 def test_writelines(self):
3115 l = ['ab', 'cd', 'ef']
3116 buf = self.BytesIO()
3117 txt = self.TextIOWrapper(buf)
3118 txt.writelines(l)
3119 txt.flush()
3120 self.assertEqual(buf.getvalue(), b'abcdef')
3121
3122 def test_writelines_userlist(self):
3123 l = UserList(['ab', 'cd', 'ef'])
3124 buf = self.BytesIO()
3125 txt = self.TextIOWrapper(buf)
3126 txt.writelines(l)
3127 txt.flush()
3128 self.assertEqual(buf.getvalue(), b'abcdef')
3129
3130 def test_writelines_error(self):
3131 txt = self.TextIOWrapper(self.BytesIO())
3132 self.assertRaises(TypeError, txt.writelines, [1, 2, 3])
3133 self.assertRaises(TypeError, txt.writelines, None)
3134 self.assertRaises(TypeError, txt.writelines, b'abc')
3135
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00003136 def test_issue1395_1(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003137 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00003138
3139 # read one char at a time
3140 reads = ""
3141 while True:
3142 c = txt.read(1)
3143 if not c:
3144 break
3145 reads += c
Ezio Melottib3aedd42010-11-20 19:04:17 +00003146 self.assertEqual(reads, self.normalized)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00003147
3148 def test_issue1395_2(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003149 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00003150 txt._CHUNK_SIZE = 4
3151
3152 reads = ""
3153 while True:
3154 c = txt.read(4)
3155 if not c:
3156 break
3157 reads += c
Ezio Melottib3aedd42010-11-20 19:04:17 +00003158 self.assertEqual(reads, self.normalized)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00003159
3160 def test_issue1395_3(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003161 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00003162 txt._CHUNK_SIZE = 4
3163
3164 reads = txt.read(4)
3165 reads += txt.read(4)
3166 reads += txt.readline()
3167 reads += txt.readline()
3168 reads += txt.readline()
Ezio Melottib3aedd42010-11-20 19:04:17 +00003169 self.assertEqual(reads, self.normalized)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00003170
3171 def test_issue1395_4(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003172 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00003173 txt._CHUNK_SIZE = 4
3174
3175 reads = txt.read(4)
3176 reads += txt.read()
Ezio Melottib3aedd42010-11-20 19:04:17 +00003177 self.assertEqual(reads, self.normalized)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00003178
3179 def test_issue1395_5(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003180 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00003181 txt._CHUNK_SIZE = 4
3182
3183 reads = txt.read(4)
3184 pos = txt.tell()
3185 txt.seek(0)
3186 txt.seek(pos)
Ezio Melottib3aedd42010-11-20 19:04:17 +00003187 self.assertEqual(txt.read(4), "BBB\n")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00003188
Ka-Ping Yeeddaa7062008-03-17 20:35:15 +00003189 def test_issue2282(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003190 buffer = self.BytesIO(self.testdata)
3191 txt = self.TextIOWrapper(buffer, encoding="ascii")
Ka-Ping Yeeddaa7062008-03-17 20:35:15 +00003192
3193 self.assertEqual(buffer.seekable(), txt.seekable())
3194
def test_append_bom(self):
    # The BOM is not written again when appending to a non-empty file
    filename = support.TESTFN
    for charset in ('utf-8-sig', 'utf-16', 'utf-32'):
        with self.open(filename, 'w', encoding=charset) as f:
            f.write('aaa')
            # The result is not needed here; the call itself is kept.
            f.tell()
        with self.open(filename, 'rb') as f:
            self.assertEqual(f.read(), 'aaa'.encode(charset))

        with self.open(filename, 'a', encoding=charset) as f:
            f.write('xxx')
        with self.open(filename, 'rb') as f:
            self.assertEqual(f.read(), 'aaaxxx'.encode(charset))
Antoine Pitroue4501852009-05-14 18:55:55 +00003209
def test_seek_bom(self):
    # Same test, but when seeking manually
    filename = support.TESTFN
    for charset in ('utf-8-sig', 'utf-16', 'utf-32'):
        with self.open(filename, 'w', encoding=charset) as f:
            f.write('aaa')
            pos = f.tell()
        with self.open(filename, 'r+', encoding=charset) as f:
            # Write past the existing text first, then overwrite it.
            f.seek(pos)
            f.write('zzz')
            f.seek(0)
            f.write('bbb')
        with self.open(filename, 'rb') as f:
            self.assertEqual(f.read(), 'bbbzzz'.encode(charset))
Antoine Pitroue4501852009-05-14 18:55:55 +00003224
def test_seek_append_bom(self):
    # Same test, but first seek to the start and then to the end
    filename = support.TESTFN
    for charset in ('utf-8-sig', 'utf-16', 'utf-32'):
        with self.open(filename, 'w', encoding=charset) as f:
            f.write('aaa')
        with self.open(filename, 'a', encoding=charset) as f:
            f.seek(0)
            f.seek(0, self.SEEK_END)
            f.write('xxx')
        with self.open(filename, 'rb') as f:
            self.assertEqual(f.read(), 'aaaxxx'.encode(charset))
3237
def test_errors_property(self):
    # ``errors`` is "strict" when not given and otherwise echoes the
    # value passed to open().
    with self.open(support.TESTFN, "w") as f:
        self.assertEqual(f.errors, "strict")
    with self.open(support.TESTFN, "w", errors="replace") as f:
        self.assertEqual(f.errors, "replace")
3243
@support.no_tracing
def test_threads_write(self):
    # Issue6750: concurrent writes could duplicate data
    event = threading.Event()
    with self.open(support.TESTFN, "w", buffering=1) as f:
        def run(n):
            text = "Thread%03d\n" % n
            # Hold every thread here so the writes start together.
            event.wait()
            f.write(text)
        threads = [threading.Thread(target=run, args=(x,))
                   for x in range(20)]
        with support.start_threads(threads, event.set):
            time.sleep(0.02)
    with self.open(support.TESTFN) as f:
        content = f.read()
        for n in range(20):
            # Each thread's line must appear exactly once.
            self.assertEqual(content.count("Thread%03d\n" % n), 1)
Amaury Forgeot d'Arcccd686a2009-08-29 23:00:38 +00003261
Antoine Pitrou6be88762010-05-03 16:48:20 +00003262 def test_flush_error_on_close(self):
Serhiy Storchakaa3712a92015-02-21 00:35:09 +02003263 # Test that text file is closed despite failed flush
3264 # and that flush() is called before file closed.
Antoine Pitrou6be88762010-05-03 16:48:20 +00003265 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Serhiy Storchakaa3712a92015-02-21 00:35:09 +02003266 closed = []
Antoine Pitrou6be88762010-05-03 16:48:20 +00003267 def bad_flush():
Serhiy Storchakaa3712a92015-02-21 00:35:09 +02003268 closed[:] = [txt.closed, txt.buffer.closed]
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02003269 raise OSError()
Antoine Pitrou6be88762010-05-03 16:48:20 +00003270 txt.flush = bad_flush
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02003271 self.assertRaises(OSError, txt.close) # exception not swallowed
Benjamin Peterson68623612012-12-20 11:53:11 -06003272 self.assertTrue(txt.closed)
Serhiy Storchakaa3712a92015-02-21 00:35:09 +02003273 self.assertTrue(txt.buffer.closed)
3274 self.assertTrue(closed) # flush() called
3275 self.assertFalse(closed[0]) # flush() called before file closed
3276 self.assertFalse(closed[1])
Serhiy Storchakac26a1a42015-02-23 00:28:38 +02003277 txt.flush = lambda: None # break reference loop
Antoine Pitrou6be88762010-05-03 16:48:20 +00003278
Serhiy Storchaka8a8f7f92014-06-09 09:13:04 +03003279 def test_close_error_on_close(self):
3280 buffer = self.BytesIO(self.testdata)
3281 def bad_flush():
3282 raise OSError('flush')
3283 def bad_close():
3284 raise OSError('close')
3285 buffer.close = bad_close
3286 txt = self.TextIOWrapper(buffer, encoding="ascii")
3287 txt.flush = bad_flush
3288 with self.assertRaises(OSError) as err: # exception not swallowed
3289 txt.close()
3290 self.assertEqual(err.exception.args, ('close',))
3291 self.assertIsInstance(err.exception.__context__, OSError)
3292 self.assertEqual(err.exception.__context__.args, ('flush',))
3293 self.assertFalse(txt.closed)
3294
Victor Stinner472f7942019-04-12 21:58:24 +02003295 # Silence destructor error
3296 buffer.close = lambda: None
3297 txt.flush = lambda: None
3298
Serhiy Storchaka8a8f7f92014-06-09 09:13:04 +03003299 def test_nonnormalized_close_error_on_close(self):
3300 # Issue #21677
3301 buffer = self.BytesIO(self.testdata)
3302 def bad_flush():
3303 raise non_existing_flush
3304 def bad_close():
3305 raise non_existing_close
3306 buffer.close = bad_close
3307 txt = self.TextIOWrapper(buffer, encoding="ascii")
3308 txt.flush = bad_flush
3309 with self.assertRaises(NameError) as err: # exception not swallowed
3310 txt.close()
3311 self.assertIn('non_existing_close', str(err.exception))
3312 self.assertIsInstance(err.exception.__context__, NameError)
3313 self.assertIn('non_existing_flush', str(err.exception.__context__))
3314 self.assertFalse(txt.closed)
3315
Victor Stinner472f7942019-04-12 21:58:24 +02003316 # Silence destructor error
3317 buffer.close = lambda: None
3318 txt.flush = lambda: None
3319
Antoine Pitrou6be88762010-05-03 16:48:20 +00003320 def test_multi_close(self):
3321 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
3322 txt.close()
3323 txt.close()
3324 txt.close()
3325 self.assertRaises(ValueError, txt.flush)
3326
def test_unseekable(self):
    # tell() and seek() over an unseekable stream must raise
    # UnsupportedOperation.
    txt = self.TextIOWrapper(self.MockUnseekableIO(self.testdata))
    self.assertRaises(self.UnsupportedOperation, txt.tell)
    self.assertRaises(self.UnsupportedOperation, txt.seek, 0)
3331
Antoine Pitrou7f8f4182010-12-21 21:20:59 +00003332 def test_readonly_attributes(self):
3333 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
3334 buf = self.BytesIO(self.testdata)
3335 with self.assertRaises(AttributeError):
3336 txt.buffer = buf
3337
def test_rawio(self):
    # Issue #12591: TextIOWrapper must work with raw I/O objects, so
    # that subprocess.Popen() can have the required unbuffered
    # semantics with universal_newlines=True.
    raw = self.MockRawIO([b'abc', b'def', b'ghi\njkl\nopq\n'])
    txt = self.TextIOWrapper(raw, encoding='ascii', newline='\n')
    # Reads
    self.assertEqual(txt.read(4), 'abcd')
    self.assertEqual(txt.readline(), 'efghi\n')
    self.assertEqual(list(txt), ['jkl\n', 'opq\n'])
3348
def test_rawio_write_through(self):
    # Issue #12591: with write_through=True, writes don't need a flush
    raw = self.MockRawIO([b'abc', b'def', b'ghi\njkl\nopq\n'])
    txt = self.TextIOWrapper(raw, encoding='ascii', newline='\n',
                             write_through=True)
    for piece in ('1', '23\n4', '5'):
        txt.write(piece)
    # No flush() was issued, yet all data has reached the raw object.
    self.assertEqual(b''.join(raw._write_stack), b'123\n45')
3358
Antoine Pitrouc644e7c2014-05-09 00:24:50 +02003359 def test_bufio_write_through(self):
3360 # Issue #21396: write_through=True doesn't force a flush()
3361 # on the underlying binary buffered object.
3362 flush_called, write_called = [], []
3363 class BufferedWriter(self.BufferedWriter):
3364 def flush(self, *args, **kwargs):
3365 flush_called.append(True)
3366 return super().flush(*args, **kwargs)
3367 def write(self, *args, **kwargs):
3368 write_called.append(True)
3369 return super().write(*args, **kwargs)
3370
3371 rawio = self.BytesIO()
3372 data = b"a"
3373 bufio = BufferedWriter(rawio, len(data)*2)
3374 textio = self.TextIOWrapper(bufio, encoding='ascii',
3375 write_through=True)
3376 # write to the buffered io but don't overflow the buffer
3377 text = data.decode('ascii')
3378 textio.write(text)
3379
3380 # buffer.flush is not called with write_through=True
3381 self.assertFalse(flush_called)
3382 # buffer.write *is* called with write_through=True
3383 self.assertTrue(write_called)
3384 self.assertEqual(rawio.getvalue(), b"") # no flush
3385
3386 write_called = [] # reset
3387 textio.write(text * 10) # total content is larger than bufio buffer
3388 self.assertTrue(write_called)
3389 self.assertEqual(rawio.getvalue(), data * 11) # all flushed
3390
def test_reconfigure_write_through(self):
    # reconfigure() can switch write_through on and off; calling it
    # with no argument or with None keeps the current setting.
    raw = self.MockRawIO([])
    t = self.TextIOWrapper(raw, encoding='ascii', newline='\n')
    t.write('1')
    t.reconfigure(write_through=True)  # implied flush
    self.assertEqual(t.write_through, True)
    self.assertEqual(b''.join(raw._write_stack), b'1')
    t.write('23')
    self.assertEqual(b''.join(raw._write_stack), b'123')
    t.reconfigure(write_through=False)
    self.assertEqual(t.write_through, False)
    t.write('45')
    t.flush()
    self.assertEqual(b''.join(raw._write_stack), b'12345')
    # Keeping default value
    t.reconfigure()
    t.reconfigure(write_through=None)
    self.assertEqual(t.write_through, False)
    t.reconfigure(write_through=True)
    t.reconfigure()
    t.reconfigure(write_through=None)
    self.assertEqual(t.write_through, True)
3413
Serhiy Storchaka94dc6732013-02-03 17:03:31 +02003414 def test_read_nonbytes(self):
3415 # Issue #17106
3416 # Crash when underlying read() returns non-bytes
3417 t = self.TextIOWrapper(self.StringIO('a'))
3418 self.assertRaises(TypeError, t.read, 1)
3419 t = self.TextIOWrapper(self.StringIO('a'))
3420 self.assertRaises(TypeError, t.readline)
3421 t = self.TextIOWrapper(self.StringIO('a'))
3422 self.assertRaises(TypeError, t.read)
3423
def test_illegal_encoder(self):
    # Issue 31271: Calling write() while the return value of encoder's
    # encode() is invalid shouldn't cause an assertion failure.
    rot13 = codecs.lookup("rot13")
    # Temporarily mark rot13 as a text encoding so the wrapper accepts it.
    with support.swap_attr(rot13, '_is_text_encoding', True):
        # Use the per-implementation classes rather than the ``io``
        # module directly, so the implementation selected by the test
        # class is the one being exercised.
        t = self.TextIOWrapper(self.BytesIO(b'foo'), encoding="rot13")
    self.assertRaises(TypeError, t.write, 'bar')
3431
def test_illegal_decoder(self):
    # Issue #17106
    # Bypass the early encoding check added in issue 20404
    def _make_illegal_wrapper():
        quopri = codecs.lookup("quopri")
        # swap_attr restores the previous flag value on exit, even if
        # constructing the wrapper raises.
        with support.swap_attr(quopri, '_is_text_encoding', True):
            t = self.TextIOWrapper(self.BytesIO(b'aaaaaa'),
                                   newline='\n', encoding="quopri")
        return t

    # Crash when decoder returns non-string
    t = _make_illegal_wrapper()
    self.assertRaises(TypeError, t.read, 1)
    t = _make_illegal_wrapper()
    self.assertRaises(TypeError, t.readline)
    t = _make_illegal_wrapper()
    self.assertRaises(TypeError, t.read)

    # Issue 31243: calling read() while the return value of decoder's
    # getstate() is invalid should neither crash the interpreter nor
    # raise a SystemError.
    def _make_very_illegal_wrapper(getstate_ret_val):
        class BadDecoder:
            def getstate(self):
                return getstate_ret_val

        def _get_bad_decoder(dummy):
            return BadDecoder()

        quopri = codecs.lookup("quopri")
        with support.swap_attr(quopri, 'incrementaldecoder',
                               _get_bad_decoder):
            return _make_illegal_wrapper()

    for bad_state in (42, (), (1, 2)):
        t = _make_very_illegal_wrapper(bad_state)
        self.assertRaises(TypeError, t.read, 42)
3471
def _check_create_at_shutdown(self, **kwargs):
    # Issue #20037: creating a TextIOWrapper at shutdown
    # shouldn't crash the interpreter.
    #
    # Runs a child interpreter whose script keeps an object alive until
    # exit; that object's __del__ builds a TextIOWrapper with **kwargs and
    # prints "ok".  Returns whatever assert_python_ok() returns.
    script = """if 1:
        import codecs
        import {iomod} as io

        # Avoid looking up codecs at shutdown
        codecs.lookup('utf-8')

        class C:
            def __init__(self):
                self.buf = io.BytesIO()
            def __del__(self):
                io.TextIOWrapper(self.buf, **{kwargs})
                print("ok")
        c = C()
        """.format(iomod=self.io.__name__, kwargs=kwargs)
    return assert_python_ok("-c", script)
3492
@support.requires_type_collecting
def test_create_at_shutdown_without_encoding(self):
    # With no explicit encoding the child process may either succeed or
    # report the implementation-specific error named by shutdown_error.
    _, stdout, stderr = self._check_create_at_shutdown()
    if not stderr:
        self.assertEqual("ok", stdout.decode().strip())
        return
    # Can error out with a RuntimeError if the module state
    # isn't found.
    self.assertIn(self.shutdown_error, stderr.decode())
3502
@support.requires_type_collecting
def test_create_at_shutdown_with_encoding(self):
    # With encoding and errors given explicitly the child process must
    # print "ok" and write nothing to stderr.
    wrapper_options = {'encoding': 'utf-8', 'errors': 'strict'}
    _, stdout, stderr = self._check_create_at_shutdown(**wrapper_options)
    self.assertFalse(stderr)
    self.assertEqual("ok", stdout.decode().strip())
3509
def test_read_byteslike(self):
    # The wrapper must cope with a buffer whose read methods return
    # memoryviews instead of bytes.
    source = MemviewBytesIO(b'Just some random string\n')
    txt = self.TextIOWrapper(source, 'utf-8')

    # TextIOwrapper will not read the full string, because
    # we truncate it to a multiple of the native int size
    # so that we can construct a more complex memoryview.
    truncated = _to_memoryview(source.getvalue()).tobytes()

    self.assertEqual(txt.read(200), truncated.decode('utf-8'))
3520
def test_issue22849(self):
    # Repeatedly try to wrap an object that offers only readable(),
    # writable() and seekable(); whatever exception construction raises
    # is deliberately ignored.  Once tell() is added, construction is
    # attempted again outside any try block.
    class F(object):
        def readable(self):
            return True

        def writable(self):
            return True

        def seekable(self):
            return True

    for _ in range(10):
        try:
            self.TextIOWrapper(F(), encoding='utf-8')
        except Exception:
            pass

    F.tell = lambda x: 0
    wrapper = self.TextIOWrapper(F(), encoding='utf-8')
3535
def test_reconfigure_encoding_read(self):
    # latin1 -> utf8
    # (latin1 can decode utf-8 encoded string)
    payload = 'abc\xe9\n'.encode('latin1') + 'd\xe9f\n'.encode('utf8')
    txt = self.TextIOWrapper(self.BytesIO(payload),
                             encoding='latin1', newline='\n')
    self.assertEqual(txt.readline(), 'abc\xe9\n')
    # After a read, changing the encoding or the newline mode is refused.
    for rejected in ({'encoding': 'utf-8'}, {'newline': None}):
        with self.assertRaises(self.UnsupportedOperation):
            txt.reconfigure(**rejected)
3547
def test_reconfigure_write_fromascii(self):
    # ascii has a specific encodefunc in the C implementation,
    # but utf-8-sig has not. Make sure that we get rid of the
    # cached encodefunc when we switch encoders.
    sink = self.BytesIO()
    txt = self.TextIOWrapper(sink, encoding='ascii', newline='\n')
    txt.write('foo\n')
    txt.reconfigure(encoding='utf-8-sig')
    txt.write('\xe9\n')
    txt.flush()
    # The non-ASCII character must come out UTF-8 encoded.
    expected = b'foo\n' + b'\xc3\xa9\n'
    self.assertEqual(sink.getvalue(), expected)
3559
def test_reconfigure_write(self):
    # latin -> utf8
    sink = self.BytesIO()
    txt = self.TextIOWrapper(sink, encoding='latin1', newline='\n')
    txt.write('abc\xe9\n')
    txt.reconfigure(encoding='utf-8')
    # Text written before the switch has reached the raw stream, encoded
    # with the old codec.
    latin1_part = b'abc\xe9\n'
    self.assertEqual(sink.getvalue(), latin1_part)
    txt.write('d\xe9f\n')
    txt.flush()
    self.assertEqual(sink.getvalue(), latin1_part + b'd\xc3\xa9f\n')

    # ascii -> utf-8-sig: ensure that no BOM is written in the middle of
    # the file
    sink = self.BytesIO()
    txt = self.TextIOWrapper(sink, encoding='ascii', newline='\n')
    txt.write('abc\n')
    txt.reconfigure(encoding='utf-8-sig')
    txt.write('d\xe9f\n')
    txt.flush()
    self.assertEqual(sink.getvalue(), b'abc\n' + b'd\xc3\xa9f\n')
3580
def test_reconfigure_write_non_seekable(self):
    # Same ascii -> utf-8-sig switch as for a seekable stream, but on a
    # BytesIO that claims not to be seekable.
    sink = self.BytesIO()
    sink.seekable = lambda: False
    sink.seek = None
    txt = self.TextIOWrapper(sink, encoding='ascii', newline='\n')
    txt.write('abc\n')
    txt.reconfigure(encoding='utf-8-sig')
    txt.write('d\xe9f\n')
    txt.flush()

    # If the raw stream is not seekable, there'll be a BOM
    bom = b'\xef\xbb\xbf'
    self.assertEqual(sink.getvalue(), b'abc\n' + bom + b'd\xc3\xa9f\n')
3593
def test_reconfigure_defaults(self):
    # Settings that a reconfigure() call does not name keep their value;
    # the one exception checked here is that errors reads back as
    # 'strict' after the encoding has been changed.
    txt = self.TextIOWrapper(self.BytesIO(), 'ascii', 'replace', '\n')

    def check(encoding, errors):
        self.assertEqual(txt.encoding, encoding)
        self.assertEqual(txt.errors, errors)

    txt.reconfigure(encoding=None)
    check('ascii', 'replace')
    txt.write('LF\n')

    txt.reconfigure(newline='\r\n')
    check('ascii', 'replace')

    txt.reconfigure(errors='ignore')
    check('ascii', 'ignore')
    txt.write('CRLF\n')

    txt.reconfigure(encoding='utf-8', newline=None)
    self.assertEqual(txt.errors, 'strict')
    txt.seek(0)
    self.assertEqual(txt.read(), 'LF\nCRLF\n')

    # Only the line written under newline='\r\n' was translated on disk.
    self.assertEqual(txt.detach().getvalue(), b'LF\nCRLF\r\n')
3616
def test_reconfigure_newline(self):
    # Reading: (raw data, newline at construction, newline after
    # reconfigure(), expected first readline() result).
    read_cases = (
        (b'CR\rEOF', '\n', None, 'CR\n'),
        (b'CR\rEOF', '\n', '', 'CR\r'),
        (b'CR\rLF\nEOF', '\r', '\n', 'CR\rLF\n'),
        (b'LF\nCR\rEOF', '\n', '\r', 'LF\nCR\r'),
        (b'CR\rCRLF\r\nEOF', '\r', '\r\n', 'CR\rCRLF\r\n'),
    )
    for data, initial, updated, first_line in read_cases:
        raw = self.BytesIO(data)
        txt = self.TextIOWrapper(raw, 'ascii', newline=initial)
        txt.reconfigure(newline=updated)
        self.assertEqual(txt.readline(), first_line)

    # Writing: each line is written right after switching to the given
    # newline mode, all on the same wrapper.
    write_steps = (
        (None, 'linesep\n'),
        ('', 'LF\n'),
        ('\n', 'LF\n'),
        ('\r', 'CR\n'),
        ('\r\n', 'CRLF\n'),
    )
    txt = self.TextIOWrapper(self.BytesIO(), 'ascii', newline='\r')
    for mode, line in write_steps:
        txt.reconfigure(newline=mode)
        txt.write(line)
    expected = 'linesep' + os.linesep + 'LF\nLF\nCR\rCRLF\r\n'
    self.assertEqual(txt.detach().getvalue().decode('ascii'), expected)
3652
def test_issue25862(self):
    # Assertion failures occurred in tell() after read() and write().
    def fresh_wrapper():
        return self.TextIOWrapper(self.BytesIO(b'test'), encoding='ascii')

    # tell() after a partial read followed by a read to the end
    txt = fresh_wrapper()
    txt.read(1)
    txt.read()
    txt.tell()

    # tell() after a partial read followed by a write
    txt = fresh_wrapper()
    txt.read(1)
    txt.write('x')
    txt.tell()
3663
Serhiy Storchaka94dc6732013-02-03 17:03:31 +02003664
class MemviewBytesIO(io.BytesIO):
    '''BytesIO variant whose read() and read1() hand back memoryviews
       (built by _to_memoryview) instead of bytes objects'''

    def read1(self, len_):
        chunk = super().read1(len_)
        return _to_memoryview(chunk)

    def read(self, len_):
        chunk = super().read(len_)
        return _to_memoryview(chunk)
3674
3675def _to_memoryview(buf):
3676 '''Convert bytes-object *buf* to a non-trivial memoryview'''
3677
3678 arr = array.array('i')
3679 idx = len(buf) - len(buf) % arr.itemsize
3680 arr.frombytes(buf[:idx])
3681 return memoryview(arr)
Benjamin Petersonc9c0f202009-06-30 23:06:06 +00003682
Benjamin Peterson6fd113c2014-11-12 10:23:44 -05003683
class CTextIOWrapperTest(TextIOWrapperTest):
    # Module the shared tests read through ``self.io``.
    io = io
    # Message test_create_at_shutdown_without_encoding accepts on stderr.
    shutdown_error = "RuntimeError: could not find io module state"

    def test_initialization(self):
        raw = self.BytesIO(b"\xc3\xa9\n\n")
        buffered = self.BufferedReader(raw, 1000)
        txt = self.TextIOWrapper(buffered)
        # Re-running __init__ with a bad newline fails, and the wrapper
        # then refuses to read.
        self.assertRaises(ValueError, txt.__init__, buffered,
                          newline='xyzzy')
        self.assertRaises(ValueError, txt.read)

        # repr() of an object that never went through __init__ must raise
        # an exception.
        uninitialized = self.TextIOWrapper.__new__(self.TextIOWrapper)
        self.assertRaises(Exception, repr, uninitialized)

    def test_garbage_collection(self):
        # C TextIOWrapper objects are collected, and collecting them flushes
        # all data to disk.
        # The Python version has __del__, so it ends in gc.garbage instead.
        with support.check_warnings(('', ResourceWarning)):
            raw_file = io.FileIO(support.TESTFN, "wb")
            buffered = self.BufferedWriter(raw_file)
            txt = self.TextIOWrapper(buffered, encoding="ascii")
            txt.write("456def")
            # Reference cycle: only the cyclic collector can free it.
            txt.x = txt
            ref = weakref.ref(txt)
            del txt
            support.gc_collect()
        self.assertIsNone(ref(), ref)
        with self.open(support.TESTFN, "rb") as stored:
            self.assertEqual(stored.read(), b"456def")

    def test_rwpair_cleared_before_textio(self):
        # Issue 13070: TextIOWrapper's finalization would crash when called
        # after the reference to the underlying BufferedRWPair's writer got
        # cleared by the GC.
        for _ in range(1000):
            pair_a = self.BufferedRWPair(self.MockRawIO(), self.MockRawIO())
            txt_a = self.TextIOWrapper(pair_a, encoding="ascii")
            pair_b = self.BufferedRWPair(self.MockRawIO(), self.MockRawIO())
            txt_b = self.TextIOWrapper(pair_b, encoding="ascii")
            # circular references
            txt_a.buddy = txt_b
            txt_b.buddy = txt_a
        support.gc_collect()

    def test_del__CHUNK_SIZE_SystemError(self):
        # Deleting _CHUNK_SIZE must be reported as an AttributeError.
        txt = self.TextIOWrapper(self.BytesIO(), encoding='ascii')
        with self.assertRaises(AttributeError):
            del txt._CHUNK_SIZE
3733
Charles-François Natali42c28cd2011-10-05 19:53:43 +02003734
class PyTextIOWrapperTest(TextIOWrapperTest):
    # Message test_create_at_shutdown_without_encoding accepts on stderr.
    shutdown_error = "LookupError: unknown encoding: ascii"
    # Module the shared tests read through ``self.io``.
    io = pyio
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003738
3739
3740class IncrementalNewlineDecoderTest(unittest.TestCase):
3741
def check_newline_decoding_utf8(self, decoder):
    # UTF-8 specific tests for a newline decoder
    def expect(data, text, **decode_kwargs):
        # We exercise getstate() / setstate() as well as decode():
        # decoding the same input again from the saved state must give
        # the same result.
        snapshot = decoder.getstate()
        self.assertEqual(decoder.decode(data, **decode_kwargs), text)
        decoder.setstate(snapshot)
        self.assertEqual(decoder.decode(data, **decode_kwargs), text)

    def expect_all(steps):
        for data, text in steps:
            expect(data, text)

    # A three-byte character, whole and then fed byte by byte (twice).
    expect(b'\xe8\xa2\x88', "\u8888")
    bytewise = ((b'\xe8', ""), (b'\xa2', ""), (b'\x88', "\u8888"))
    expect_all(bytewise)
    expect_all(bytewise)

    # A truncated character is an error once input is declared final.
    expect(b'\xe8', "")
    self.assertRaises(UnicodeDecodeError, decoder.decode, b'', final=True)

    decoder.reset()
    expect(b'\n', "\n")
    expect(b'\r', "")
    expect(b'', "\n", final=True)
    expect(b'\r', "\n", final=True)

    expect_all((
        (b'\r', ""),
        (b'a', "\na"),

        (b'\r\r\n', "\n\n"),
        (b'\r', ""),
        (b'\r', "\n"),
        (b'\na', "\na"),

        (b'\xe8\xa2\x88\r\n', "\u8888\n"),
        (b'\xe8\xa2\x88', "\u8888"),
        (b'\n', "\n"),
        (b'\xe8\xa2\x88\r', "\u8888"),
        (b'\n', "\n"),
    ))
Guido van Rossum9b76da62007-04-11 01:09:03 +00003783
def check_newline_decoding(self, decoder, encoding):
    # Feed text to *decoder* one unit at a time and check the newline
    # kinds it reports.  With an *encoding* the text is encoded first and
    # fed byte by byte; with encoding None it is fed character by
    # character.
    pieces = []
    if encoding is None:
        encoder = None
    else:
        encoder = codecs.getincrementalencoder(encoding)()

    def feed(text):
        if encoder is None:
            # Decode one char at a time
            units = text
        else:
            # Decode one byte at a time
            units = (bytes([byte]) for byte in encoder.encode(text))
        for unit in units:
            pieces.append(decoder.decode(unit))

    self.assertEqual(decoder.newlines, None)
    steps = (
        ("abc\n\r", '\n'),
        ("\nabc", ('\n', '\r\n')),
        ("abc\r", ('\n', '\r\n')),
        ("abc", ('\r', '\n', '\r\n')),
    )
    for text, seen in steps:
        feed(text)
        self.assertEqual(decoder.newlines, seen)
    feed("abc\r")
    self.assertEqual("".join(pieces), "abc\n\nabcabc\nabcabc")

    # After reset() the decoder must have forgotten the newlines seen.
    decoder.reset()
    payload = "abc"
    if encoder is not None:
        encoder.reset()
        payload = encoder.encode(payload)
    self.assertEqual(decoder.decode(payload), "abc")
    self.assertEqual(decoder.newlines, None)
Antoine Pitrou180a3362008-12-14 16:36:46 +00003816
def test_newline_decoder(self):
    encodings = (
        # None meaning the IncrementalNewlineDecoder takes unicode input
        # rather than bytes input
        None, 'utf-8', 'latin-1',
        'utf-16', 'utf-16-le', 'utf-16-be',
        'utf-32', 'utf-32-le', 'utf-32-be',
    )
    for enc in encodings:
        inner = enc and codecs.getincrementaldecoder(enc)()
        wrapped = self.IncrementalNewlineDecoder(inner, translate=True)
        self.check_newline_decoding(wrapped, enc)

    utf8_inner = codecs.getincrementaldecoder("utf-8")()
    wrapped = self.IncrementalNewlineDecoder(utf8_inner, translate=True)
    self.check_newline_decoding_utf8(wrapped)
    # A state that is not a tuple must be rejected.
    self.assertRaises(TypeError, wrapped.setstate, 42)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003833
def test_newline_bytes(self):
    # Issue 5433: Excessive optimization in IncrementalNewlineDecoder
    # U+0D00 and U+0A00 are not line breaks even though their code points
    # contain the values 0x0D and 0x0A; they must pass through unchanged
    # and must not be recorded as newlines.
    for translate in (False, True):
        decoder = self.IncrementalNewlineDecoder(None, translate=translate)
        self.assertEqual(decoder.newlines, None)
        for char in ("\u0D00", "\u0A00"):
            self.assertEqual(decoder.decode(char), char)
            self.assertEqual(decoder.newlines, None)
3846
def test_translate(self):
    # issue 35062
    # Any non-zero translate value must enable newline translation...
    for flag in (-2, -1, 1, 2):
        inner = codecs.getincrementaldecoder("utf-8")()
        wrapped = self.IncrementalNewlineDecoder(inner, flag)
        self.check_newline_decoding_utf8(wrapped)
    # ...while zero leaves the line endings untouched.
    inner = codecs.getincrementaldecoder("utf-8")()
    wrapped = self.IncrementalNewlineDecoder(inner, translate=0)
    self.assertEqual(wrapped.decode(b"\r\r\n"), "\r\r\n")
3856
class CIncrementalNewlineDecoderTest(IncrementalNewlineDecoderTest):
    # Adds nothing of its own: every test is inherited.  The decoder class
    # read through ``self.IncrementalNewlineDecoder`` is not set here.
    pass
3859
class PyIncrementalNewlineDecoderTest(IncrementalNewlineDecoderTest):
    # Adds nothing of its own: every test is inherited.  The decoder class
    # read through ``self.IncrementalNewlineDecoder`` is not set here.
    pass
Antoine Pitrou180a3362008-12-14 16:36:46 +00003862
Alexandre Vassalotti472f07d2008-01-06 00:34:32 +00003863
Guido van Rossum01a27522007-03-07 01:00:12 +00003864# XXX Tests for open()
Guido van Rossum68bbcd22007-02-27 17:19:33 +00003865
Guido van Rossum5abbf752007-08-27 17:39:33 +00003866class MiscIOTest(unittest.TestCase):
3867
def tearDown(self):
    # Remove the scratch file the tests of this class create.
    scratch = support.TESTFN
    support.unlink(scratch)
3870
def test___all__(self):
    # Every name in __all__ must exist.  Exception names must be
    # Exception subclasses; apart from the open functions and the SEEK_*
    # constants, everything else must derive from IOBase.
    for name in self.io.__all__:
        exported = getattr(self.io, name, None)
        self.assertIsNotNone(exported, name)
        if name in ("open", "open_code"):
            continue
        names_exception = ("error" in name.lower()
                           or name == "UnsupportedOperation")
        if names_exception:
            self.assertTrue(issubclass(exported, Exception), name)
            continue
        if name.startswith("SEEK_"):
            continue
        self.assertTrue(issubclass(exported, self.IOBase))
Benjamin Peterson65676e42008-11-05 21:42:45 +00003881
def test_attributes(self):
    # Check the name and mode attributes exposed by every layer of the
    # objects open() returns.
    path = support.TESTFN

    unbuffered = self.open(path, "wb", buffering=0)
    self.assertEqual(unbuffered.mode, "wb")
    unbuffered.close()

    # Opening in "U" mode is expected to emit a DeprecationWarning.
    with support.check_warnings(('', DeprecationWarning)):
        universal = self.open(path, "U")
    self.assertEqual(universal.name, path)
    self.assertEqual(universal.buffer.name, path)
    self.assertEqual(universal.buffer.raw.name, path)
    self.assertEqual(universal.mode, "U")
    self.assertEqual(universal.buffer.mode, "rb")
    self.assertEqual(universal.buffer.raw.mode, "rb")
    universal.close()

    updating = self.open(path, "w+")
    self.assertEqual(updating.mode, "w+")
    self.assertEqual(updating.buffer.mode, "rb+")  # Does it really matter?
    self.assertEqual(updating.buffer.raw.mode, "rb+")

    # A file opened from a descriptor reports that descriptor as its name.
    from_fd = self.open(updating.fileno(), "wb", closefd=False)
    self.assertEqual(from_fd.mode, "wb")
    self.assertEqual(from_fd.raw.mode, "wb")
    self.assertEqual(from_fd.name, updating.fileno())
    self.assertEqual(from_fd.raw.name, updating.fileno())
    updating.close()
    from_fd.close()
3909
def test_io_after_close(self):
    # Once a file is closed every I/O method must raise ValueError,
    # whatever mode and buffering it was opened with.  Methods that only
    # some layers provide are checked when present.
    open_variants = [
        {"mode": "w"},
        {"mode": "wb"},
        {"mode": "w", "buffering": 1},
        {"mode": "w", "buffering": 2},
        {"mode": "wb", "buffering": 0},
        {"mode": "r"},
        {"mode": "rb"},
        {"mode": "r", "buffering": 1},
        {"mode": "r", "buffering": 2},
        {"mode": "rb", "buffering": 0},
        {"mode": "w+"},
        {"mode": "w+b"},
        {"mode": "w+", "buffering": 1},
        {"mode": "w+", "buffering": 2},
        {"mode": "w+b", "buffering": 0},
    ]
    raises = self.assertRaises
    for kwargs in open_variants:
        closed = self.open(support.TESTFN, **kwargs)
        closed.close()
        # write() needs an argument of the type matching the mode.
        nothing = b"" if "b" in kwargs['mode'] else ""

        raises(ValueError, closed.flush)
        raises(ValueError, closed.fileno)
        raises(ValueError, closed.isatty)
        raises(ValueError, closed.__iter__)
        if hasattr(closed, "peek"):
            raises(ValueError, closed.peek, 1)
        raises(ValueError, closed.read)
        if hasattr(closed, "read1"):
            raises(ValueError, closed.read1, 1024)
            raises(ValueError, closed.read1)
        if hasattr(closed, "readall"):
            raises(ValueError, closed.readall)
        if hasattr(closed, "readinto"):
            raises(ValueError, closed.readinto, bytearray(1024))
        if hasattr(closed, "readinto1"):
            raises(ValueError, closed.readinto1, bytearray(1024))
        raises(ValueError, closed.readline)
        raises(ValueError, closed.readlines)
        raises(ValueError, closed.readlines, 1)
        raises(ValueError, closed.seek, 0)
        raises(ValueError, closed.tell)
        raises(ValueError, closed.truncate)
        raises(ValueError, closed.write, nothing)
        raises(ValueError, closed.writelines, [])
        raises(ValueError, next, closed)
Antoine Pitrou8043cf82009-01-09 19:54:29 +00003956
def test_blockingioerror(self):
    # Various BlockingIOError issues
    # A str subclass is used because its instances accept attributes and
    # weak references.
    class Message(str):
        pass

    message = Message("")
    error = self.BlockingIOError(1, message)
    # Reference cycle between the exception and its argument; the
    # collector must be able to break it.
    message.b = error
    error.c = message
    ref = weakref.ref(message)
    del message, error
    support.gc_collect()
    self.assertIsNone(ref(), ref)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003969
def test_abcs(self):
    # Test the visible base classes are ABCs.
    for name in ("IOBase", "RawIOBase", "BufferedIOBase", "TextIOBase"):
        self.assertIsInstance(getattr(self, name), abc.ABCMeta)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003976
def _check_abc_inheritance(self, abcmodule):
    # Open the scratch file unbuffered, buffered and in text mode; each
    # object must be an IOBase and an instance of exactly one of the three
    # layer ABCs taken from *abcmodule*.
    layers = ("RawIOBase", "BufferedIOBase", "TextIOBase")
    cases = (
        ("wb", {"buffering": 0}, "RawIOBase"),
        ("wb", {}, "BufferedIOBase"),
        ("w", {}, "TextIOBase"),
    )
    for mode, options, own_layer in cases:
        with self.open(support.TESTFN, mode, **options) as fobj:
            self.assertIsInstance(fobj, abcmodule.IOBase)
            for layer in layers:
                if layer == own_layer:
                    check = self.assertIsInstance
                else:
                    check = self.assertNotIsInstance
                check(fobj, getattr(abcmodule, layer))
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003993
def test_abc_inheritance(self):
    # Test implementations inherit from their respective ABCs
    # The test case itself serves as the namespace holding the ABCs.
    abc_namespace = self
    self._check_abc_inheritance(abc_namespace)
3997
def test_abc_inheritance_official(self):
    # Test implementations inherit from the official ABCs of the
    # baseline "io" module.
    official_abcs = io
    self._check_abc_inheritance(official_abcs)
4002
def _check_warn_on_dealloc(self, *args, **kwargs):
    # Open a file with the given arguments, drop the only reference
    # without closing it, and expect a ResourceWarning whose message
    # contains the repr of the file object.
    # NOTE(review): this calls the builtin open(), not self.open.
    leaked = open(*args, **kwargs)
    leaked_repr = repr(leaked)
    with self.assertWarns(ResourceWarning) as caught:
        del leaked
        support.gc_collect()
    message = str(caught.warning.args[0])
    self.assertIn(leaked_repr, message)
4010
def test_warn_on_dealloc(self):
    # Unbuffered binary, buffered binary and text files, in that order.
    variants = (
        ("wb", {"buffering": 0}),
        ("wb", {}),
        ("w", {}),
    )
    for mode, options in variants:
        self._check_warn_on_dealloc(support.TESTFN, mode, **options)
4015
def _check_warn_on_dealloc_fd(self, *args, **kwargs):
    # Same check as _check_warn_on_dealloc(), but for a file object built
    # on the read end of a pipe; then verify that closefd=False silences
    # the warning.
    pipe_fds = []

    def close_pipe_fds():
        # Descriptors already closed by a file object report EBADF,
        # which is expected here; any other error is re-raised.
        for fd in pipe_fds:
            try:
                os.close(fd)
            except OSError as exc:
                if exc.errno != errno.EBADF:
                    raise

    self.addCleanup(close_pipe_fds)

    read_fd, write_fd = os.pipe()
    pipe_fds.extend((read_fd, write_fd))
    self._check_warn_on_dealloc(read_fd, *args, **kwargs)

    # When using closefd=False, there's no warning
    read_fd, write_fd = os.pipe()
    pipe_fds.extend((read_fd, write_fd))
    with support.check_no_resource_warning(self):
        open(read_fd, *args, closefd=False, **kwargs)
Antoine Pitroue033e062010-10-29 10:38:18 +00004034
def test_warn_on_dealloc_fd(self):
    # Unbuffered binary, buffered binary and text files, in that order.
    variants = (
        ("rb", {"buffering": 0}),
        ("rb", {}),
        ("r", {}),
    )
    for mode, options in variants:
        self._check_warn_on_dealloc_fd(mode, **options)
4039
4040
def test_pickling(self):
    # Pickling file objects is forbidden
    # Checked for every pickle protocol and for each way of opening the
    # scratch file listed below.
    open_variants = [
        {"mode": "w"},
        {"mode": "wb"},
        {"mode": "wb", "buffering": 0},
        {"mode": "r"},
        {"mode": "rb"},
        {"mode": "rb", "buffering": 0},
        {"mode": "w+"},
        {"mode": "w+b"},
        {"mode": "w+b", "buffering": 0},
    ]
    protocols = range(pickle.HIGHEST_PROTOCOL + 1)
    for kwargs in open_variants:
        for protocol in protocols:
            with self.open(support.TESTFN, **kwargs) as fobj:
                with self.assertRaises(TypeError):
                    pickle.dumps(fobj, protocol)
4057
def test_nonblock_pipe_write_bigbuf(self):
    # Non-blocking pipe round trip with a 16 KiB buffer.
    bufsize = 16 * 1024
    self._test_nonblock_pipe_write(bufsize)
4060
def test_nonblock_pipe_write_smallbuf(self):
    # Non-blocking pipe round trip with a 1 KiB buffer.
    bufsize = 1024
    self._test_nonblock_pipe_write(bufsize)
4063
Victor Stinner1db9e7b2014-07-29 22:32:47 +02004064 @unittest.skipUnless(hasattr(os, 'set_blocking'),
4065 'os.set_blocking() required for this test')
Antoine Pitrou58fcf9f2011-11-21 20:16:44 +01004066 def _test_nonblock_pipe_write(self, bufsize):
4067 sent = []
4068 received = []
4069 r, w = os.pipe()
Victor Stinner1db9e7b2014-07-29 22:32:47 +02004070 os.set_blocking(r, False)
4071 os.set_blocking(w, False)
Antoine Pitrou58fcf9f2011-11-21 20:16:44 +01004072
4073 # To exercise all code paths in the C implementation we need
4074 # to play with buffer sizes. For instance, if we choose a
4075 # buffer size less than or equal to _PIPE_BUF (4096 on Linux)
4076 # then we will never get a partial write of the buffer.
4077 rf = self.open(r, mode='rb', closefd=True, buffering=bufsize)
4078 wf = self.open(w, mode='wb', closefd=True, buffering=bufsize)
4079
4080 with rf, wf:
4081 for N in 9999, 73, 7574:
4082 try:
4083 i = 0
4084 while True:
4085 msg = bytes([i % 26 + 97]) * N
4086 sent.append(msg)
4087 wf.write(msg)
4088 i += 1
4089
4090 except self.BlockingIOError as e:
4091 self.assertEqual(e.args[0], errno.EAGAIN)
Antoine Pitrou7fe601c2011-11-21 20:22:01 +01004092 self.assertEqual(e.args[2], e.characters_written)
Antoine Pitrou58fcf9f2011-11-21 20:16:44 +01004093 sent[-1] = sent[-1][:e.characters_written]
4094 received.append(rf.read())
4095 msg = b'BLOCKED'
4096 wf.write(msg)
4097 sent.append(msg)
4098
4099 while True:
4100 try:
4101 wf.flush()
4102 break
4103 except self.BlockingIOError as e:
4104 self.assertEqual(e.args[0], errno.EAGAIN)
Antoine Pitrou7fe601c2011-11-21 20:22:01 +01004105 self.assertEqual(e.args[2], e.characters_written)
Antoine Pitrou58fcf9f2011-11-21 20:16:44 +01004106 self.assertEqual(e.characters_written, 0)
4107 received.append(rf.read())
4108
4109 received += iter(rf.read, None)
4110
4111 sent, received = b''.join(sent), b''.join(received)
Serhiy Storchaka0dcd80a2015-08-02 15:17:49 +03004112 self.assertEqual(sent, received)
Antoine Pitrou58fcf9f2011-11-21 20:16:44 +01004113 self.assertTrue(wf.closed)
4114 self.assertTrue(rf.closed)
4115
def test_create_fail(self):
    # 'x' mode fails if file is existing
    # Create the file first, then try to create it again exclusively.
    self.open(support.TESTFN, 'w').close()
    with self.assertRaises(FileExistsError):
        self.open(support.TESTFN, 'x')
4121
def test_create_writes(self):
    # 'x' mode opens for writing
    payload = b"spam"
    with self.open(support.TESTFN, 'xb') as created:
        created.write(payload)
    with self.open(support.TESTFN, 'rb') as reopened:
        self.assertEqual(payload, reopened.read())
4128
def test_open_allargs(self):
    # Regression test: the rawmode parser used to overflow a buffer when
    # given every mode character at once; it must raise ValueError.
    with self.assertRaises(ValueError):
        self.open(support.TESTFN, 'rwax+')
4132
4133
class CMiscIOTest(MiscIOTest):
    # Run the miscellaneous tests against the ``io`` module and add a few
    # checks that only apply to it.
    io = io

    def test_readinto_buffer_overflow(self):
        # Issue #18025: read() hands back far more data than the target
        # buffer can hold; readinto() has to raise ValueError.
        class OversizedReader(self.io.BufferedIOBase):
            def read(self, n=-1):
                return b'x' * 10**6

        target = bytearray(2)
        with self.assertRaises(ValueError):
            OversizedReader().readinto(target)

    def check_daemon_threads_shutdown_deadlock(self, stream_name):
        # Issue #23309: deadlocks at shutdown should be avoided when a
        # daemon thread and the main thread both write to a file.
        #
        # stream_name is the name of the sys attribute ('stdout' or
        # 'stderr') that the child interpreter writes to.
        script = """if 1:
            import sys
            import time
            import threading
            from test.support import SuppressCrashReport

            file = sys.{stream_name}

            def run():
                while True:
                    file.write('.')
                    file.flush()

            crash = SuppressCrashReport()
            crash.__enter__()
            # don't call __exit__(): the crash occurs at Python shutdown

            thread = threading.Thread(target=run)
            thread.daemon = True
            thread.start()

            time.sleep(0.5)
            file.write('!')
            file.flush()
            """.format(stream_name=stream_name)
        result, _ = run_python_until_end("-c", script)
        stderr_text = result.err.decode()

        if result.rc == 0:
            # Clean exit: once the '.' and '!' characters written by the
            # script are stripped, nothing may be left on stderr.
            self.assertFalse(stderr_text.strip('.!'))
            return

        # Failure: should be a fatal error
        expected = (r"Fatal Python error: could not acquire lock "
                    r"for <(_io\.)?BufferedWriter name='<{stream_name}>'> "
                    r"at interpreter shutdown, possibly due to "
                    r"daemon threads".format(stream_name=stream_name))
        self.assertRegex(stderr_text, expected)

    def test_daemon_threads_shutdown_stdout_deadlock(self):
        self.check_daemon_threads_shutdown_deadlock('stdout')

    def test_daemon_threads_shutdown_stderr_deadlock(self):
        self.check_daemon_threads_shutdown_deadlock('stderr')
4191
4192
class PyMiscIOTest(MiscIOTest):
    # Run the inherited miscellaneous tests against the ``pyio`` module.
    io = pyio
Barry Warsaw40e82462008-11-20 20:14:50 +00004195
Antoine Pitroub46b9d52010-08-21 19:09:32 +00004196
@unittest.skipIf(os.name == 'nt', 'POSIX signals required for this test.')
class SignalsTest(unittest.TestCase):
    """Check how io objects behave when a call is interrupted by SIGALRM.

    The implementation under test is reached through ``self.io``, which
    subclasses are expected to provide.
    """

    def setUp(self):
        # Install the default handler and keep the previous one so that
        # tearDown() can put it back.
        self.oldalrm = signal.signal(signal.SIGALRM, self.alarm_interrupt)

    def tearDown(self):
        signal.signal(signal.SIGALRM, self.oldalrm)

    def alarm_interrupt(self, sig, frame):
        # Deliberately raise ZeroDivisionError from inside the handler.
        1/0

    def check_interrupted_write(self, item, bytes, **fdopen_kwargs):
        """Check that a partial write, when it gets interrupted, properly
        invokes the signal handler, and bubbles up the exception raised
        in the latter.

        *item* is the unit of data repeated to build a payload larger than
        support.PIPE_MAX_SIZE, *bytes* is the byte sequence expected to come
        out of the pipe, and *fdopen_kwargs* are passed to self.io.open().
        """
        read_results = []
        def _read():
            s = os.read(r, 1)
            read_results.append(s)

        t = threading.Thread(target=_read)
        t.daemon = True
        r, w = os.pipe()
        fdopen_kwargs["closefd"] = False
        large_data = item * (support.PIPE_MAX_SIZE // len(item) + 1)
        # Bound before the try block so that the cleanup code never trips
        # over an unbound name (hiding the real error) if open() fails.
        wio = None
        try:
            wio = self.io.open(w, **fdopen_kwargs)
            if hasattr(signal, 'pthread_sigmask'):
                # create the thread with SIGALRM signal blocked
                signal.pthread_sigmask(signal.SIG_BLOCK, [signal.SIGALRM])
                try:
                    t.start()
                finally:
                    # Always unblock, even if the thread could not be
                    # started, so SIGALRM is not left masked.
                    signal.pthread_sigmask(signal.SIG_UNBLOCK,
                                           [signal.SIGALRM])
            else:
                t.start()

            # Fill the pipe enough that the write will be blocking.
            # It will be interrupted by the timer armed below.  Since the
            # other thread has read one byte, the low-level write will
            # return with a successful (partial) result rather than an EINTR.
            # The buffered IO layer must check for pending signal
            # handlers, which in this case will invoke alarm_interrupt().
            signal.alarm(1)
            try:
                self.assertRaises(ZeroDivisionError, wio.write, large_data)
            finally:
                signal.alarm(0)
                t.join()
            # We got one byte, get another one and check that it isn't a
            # repeat of the first one.
            read_results.append(os.read(r, 1))
            self.assertEqual(read_results, [bytes[0:1], bytes[1:2]])
        finally:
            os.close(w)
            os.close(r)
            # This is deliberate. If we didn't close the file descriptor
            # before closing wio, wio would try to flush its internal
            # buffer, and block again.
            if wio is not None:
                try:
                    wio.close()
                except OSError as e:
                    if e.errno != errno.EBADF:
                        raise

    def test_interrupted_write_unbuffered(self):
        self.check_interrupted_write(b"xy", b"xy", mode="wb", buffering=0)

    def test_interrupted_write_buffered(self):
        self.check_interrupted_write(b"xy", b"xy", mode="wb")

    def test_interrupted_write_text(self):
        self.check_interrupted_write("xy", b"xy", mode="w", encoding="ascii")

    @support.no_tracing
    def check_reentrant_write(self, data, **fdopen_kwargs):
        """Check a write() issued from a signal handler that interrupts
        another write() on the same object.

        Either the nested call fails with a "reentrant call" RuntimeError,
        or the handler's ZeroDivisionError propagates.
        """
        def on_alarm(*args):
            # Will be called reentrantly from the same thread
            wio.write(data)
            1/0
        signal.signal(signal.SIGALRM, on_alarm)
        r, w = os.pipe()
        wio = self.io.open(w, **fdopen_kwargs)
        try:
            signal.alarm(1)
            # Either the reentrant call to wio.write() fails with RuntimeError,
            # or the signal handler raises ZeroDivisionError.
            with self.assertRaises((ZeroDivisionError, RuntimeError)) as cm:
                while 1:
                    for i in range(100):
                        wio.write(data)
                        wio.flush()
                    # Make sure the buffer doesn't fill up and block further writes
                    os.read(r, len(data) * 100)
            exc = cm.exception
            if isinstance(exc, RuntimeError):
                self.assertTrue(str(exc).startswith("reentrant call"), str(exc))
        finally:
            signal.alarm(0)
            wio.close()
            os.close(r)

    def test_reentrant_write_buffered(self):
        self.check_reentrant_write(b"xy", mode="wb")

    def test_reentrant_write_text(self):
        self.check_reentrant_write("xy", mode="w", encoding="ascii")

    def check_interrupted_read_retry(self, decode, **fdopen_kwargs):
        """Check that a buffered read, when it gets interrupted (either
        returning a partial result or EINTR), properly invokes the signal
        handler and retries if the latter returned successfully.

        *decode* converts the value returned by read() to str for the
        comparison; *fdopen_kwargs* are passed to self.io.open().
        """
        r, w = os.pipe()
        fdopen_kwargs["closefd"] = False
        def alarm_handler(sig, frame):
            os.write(w, b"bar")
        signal.signal(signal.SIGALRM, alarm_handler)
        # Bound before the try block so that cleanup works if open() fails.
        rio = None
        try:
            rio = self.io.open(r, **fdopen_kwargs)
            os.write(w, b"foo")
            signal.alarm(1)
            # Expected behaviour:
            # - first raw read() returns partial b"foo"
            # - second raw read() returns EINTR
            # - third raw read() returns b"bar"
            self.assertEqual(decode(rio.read(6)), "foobar")
        finally:
            signal.alarm(0)
            if rio is not None:
                rio.close()
            os.close(w)
            os.close(r)

    def test_interrupted_read_retry_buffered(self):
        self.check_interrupted_read_retry(lambda x: x.decode('latin1'),
                                          mode="rb")

    def test_interrupted_read_retry_text(self):
        # Explicit encoding so the test does not depend on the locale's
        # default encoding.
        self.check_interrupted_read_retry(lambda x: x,
                                          mode="r", encoding="latin1")

    def check_interrupted_write_retry(self, item, **fdopen_kwargs):
        """Check that a buffered write, when it gets interrupted (either
        returning a partial result or EINTR), properly invokes the signal
        handler and retries if the latter returned successfully.

        *item* is a one-element str or bytes repeated support.PIPE_MAX_SIZE
        times to build the payload; *fdopen_kwargs* are passed to
        self.io.open().
        """
        select = support.import_module("select")

        # A quantity that exceeds the buffer size of an anonymous pipe's
        # write end.
        N = support.PIPE_MAX_SIZE
        r, w = os.pipe()
        fdopen_kwargs["closefd"] = False

        # We need a separate thread to read from the pipe and allow the
        # write() to finish.  This thread is started after the SIGALRM is
        # received (forcing a first EINTR in write()).
        read_results = []
        write_finished = False
        error = None
        def _read():
            nonlocal error
            try:
                while not write_finished:
                    while r in select.select([r], [], [], 1.0)[0]:
                        s = os.read(r, 1024)
                        read_results.append(s)
            except BaseException as exc:
                # Reported to the main thread, which asserts it is None.
                error = exc
        t = threading.Thread(target=_read)
        t.daemon = True
        def alarm1(sig, frame):
            signal.signal(signal.SIGALRM, alarm2)
            signal.alarm(1)
        def alarm2(sig, frame):
            t.start()

        large_data = item * N
        signal.signal(signal.SIGALRM, alarm1)
        # Bound before the try block so that cleanup works if open() fails.
        wio = None
        try:
            wio = self.io.open(w, **fdopen_kwargs)
            signal.alarm(1)
            # Expected behaviour:
            # - first raw write() is partial (because of the limited pipe buffer
            #   and the first alarm)
            # - second raw write() returns EINTR (because of the second alarm)
            # - subsequent write()s are successful (either partial or complete)
            written = wio.write(large_data)
            self.assertEqual(N, written)

            wio.flush()
            write_finished = True
            t.join()

            self.assertIsNone(error)
            self.assertEqual(N, sum(len(x) for x in read_results))
        finally:
            signal.alarm(0)
            write_finished = True
            os.close(w)
            os.close(r)
            # This is deliberate. If we didn't close the file descriptor
            # before closing wio, wio would try to flush its internal
            # buffer, and could block (in case of failure).
            if wio is not None:
                try:
                    wio.close()
                except OSError as e:
                    if e.errno != errno.EBADF:
                        raise

    def test_interrupted_write_retry_buffered(self):
        self.check_interrupted_write_retry(b"x", mode="wb")

    def test_interrupted_write_retry_text(self):
        self.check_interrupted_write_retry("x", mode="w", encoding="latin1")
4409
Antoine Pitrouf3b68b32010-12-03 18:41:39 +00004410
class CSignalsTest(SignalsTest):
    # Run the inherited signal tests against the ``io`` module.
    io = io
4413
class PySignalsTest(SignalsTest):
    # Run the inherited signal tests against the ``pyio`` module.
    io = pyio

    # Handling reentrancy issues would slow down _pyio even more, so the
    # reentrant-write tests are disabled by overriding them with None.
    test_reentrant_write_buffered = test_reentrant_write_text = None
4421
Antoine Pitroub46b9d52010-08-21 19:09:32 +00004422
def load_tests(*args):
    """Build and return the test suite for this module.

    Before the suite is assembled, every listed test class whose name starts
    with "C" gets the public names of ``io`` set as class attributes, and
    every class whose name starts with "Py" gets those of ``pyio``.  Each
    mock class name is bound to its "C"- or "Py"-prefixed variant looked up
    in this module's globals.  Classes with any other prefix are left
    untouched.

    The arguments passed by unittest's load_tests protocol are accepted
    and ignored.
    """
    tests = (CIOTest, PyIOTest, APIMismatchTest,
             CBufferedReaderTest, PyBufferedReaderTest,
             CBufferedWriterTest, PyBufferedWriterTest,
             CBufferedRWPairTest, PyBufferedRWPairTest,
             CBufferedRandomTest, PyBufferedRandomTest,
             StatefulIncrementalDecoderTest,
             CIncrementalNewlineDecoderTest, PyIncrementalNewlineDecoderTest,
             CTextIOWrapperTest, PyTextIOWrapperTest,
             CMiscIOTest, PyMiscIOTest,
             CSignalsTest, PySignalsTest,
             )

    # Put the namespaces of the IO module we are testing and some useful mock
    # classes in the __dict__ of each test.
    mocks = (MockRawIO, MisbehavedRawIO, MockFileIO, CloseFailureIO,
             MockNonBlockWriterIO, MockUnseekableIO, MockRawIOWithoutRead,
             SlowFlushRawIO)
    all_members = io.__all__ + ["IncrementalNewlineDecoder"]
    c_io_ns = {name : getattr(io, name) for name in all_members}
    py_io_ns = {name : getattr(pyio, name) for name in all_members}
    globs = globals()
    c_io_ns.update((x.__name__, globs["C" + x.__name__]) for x in mocks)
    py_io_ns.update((x.__name__, globs["Py" + x.__name__]) for x in mocks)
    # Avoid turning open into a bound method.
    py_io_ns["open"] = pyio.OpenWrapper
    for test in tests:
        if test.__name__.startswith("C"):
            for name, obj in c_io_ns.items():
                setattr(test, name, obj)
        elif test.__name__.startswith("Py"):
            for name, obj in py_io_ns.items():
                setattr(test, name, obj)

    # unittest.makeSuite() is deprecated (and removed in Python 3.13);
    # the default loader's loadTestsFromTestCase() is the replacement.
    loader = unittest.defaultTestLoader
    suite = unittest.TestSuite([loader.loadTestsFromTestCase(test)
                                for test in tests])
    return suite
Guido van Rossum28524c72007-02-27 05:47:44 +00004459
# Run the tests when this file is executed directly as a script.
if __name__ == "__main__":
    unittest.main()