blob: 55686d743983556a48e3fe29abe301a6bd06d0d8 [file] [log] [blame]
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001"""Unit tests for the io module."""
2
3# Tests of io are scattered over the test suite:
4# * test_bufio - tests file buffering
5# * test_memoryio - tests BytesIO and StringIO
6# * test_fileio - tests FileIO
7# * test_file - tests the file interface
8# * test_io - tests everything else in the io module
9# * test_univnewlines - tests universal newline support
10# * test_largefile - tests operations on a file greater than 2**32 bytes
11# (only enabled with -ulargefile)
12
13################################################################################
14# ATTENTION TEST WRITERS!!!
15################################################################################
16# When writing tests for io, it's important to test both the C and Python
17# implementations. This is usually done by writing a base test that refers to
Serhiy Storchakad65c9492015-11-02 14:10:23 +020018# the type it is testing as an attribute. Then it provides custom subclasses to
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000019# test both implementations. This file has lots of examples.
20################################################################################
Guido van Rossum68bbcd22007-02-27 17:19:33 +000021
Victor Stinnerf86a5e82012-06-05 13:43:22 +020022import abc
23import array
24import errno
25import locale
Guido van Rossum8358db22007-08-18 21:39:55 +000026import os
Victor Stinnerf86a5e82012-06-05 13:43:22 +020027import pickle
28import random
29import signal
Guido van Rossum34d69e52007-04-10 20:08:41 +000030import sys
Gregory P. Smithe5796c42018-12-30 20:17:57 -080031import sysconfig
Antoine Pitroua6a4dc82017-09-07 18:56:24 +020032import threading
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +000033import time
Guido van Rossum28524c72007-02-27 05:47:44 +000034import unittest
Antoine Pitroue033e062010-10-29 10:38:18 +000035import warnings
Victor Stinnerf86a5e82012-06-05 13:43:22 +020036import weakref
Antoine Pitrou131a4892012-10-16 22:57:11 +020037from collections import deque, UserList
Victor Stinnerf86a5e82012-06-05 13:43:22 +020038from itertools import cycle, count
Benjamin Petersonee8712c2008-05-20 21:35:26 +000039from test import support
Berker Peksagce643912015-05-06 06:33:17 +030040from test.support.script_helper import assert_python_ok, run_python_until_end
Serhiy Storchakab21d1552018-03-02 11:53:51 +020041from test.support import FakePath
Guido van Rossum76c5d4d2007-04-06 19:10:29 +000042
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +000043import codecs
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000044import io # C implementation of io
45import _pyio as pyio # Python implementation of io
Guido van Rossuma9e20242007-03-08 00:43:48 +000046
try:
    import ctypes
except ImportError:
    def byteslike(*pos, **kw):
        """Fallback without ctypes: wrap the bytes in a signed-char array."""
        return array.array("b", bytes(*pos, **kw))
else:
    class EmptyStruct(ctypes.Structure):
        pass

    def byteslike(*pos, **kw):
        """Create a bytes-like object having no string or sequence methods"""
        payload = bytes(*pos, **kw)
        holder = EmptyStruct()
        # Grow the (initially empty) structure so it can hold the payload,
        # then copy the payload in through a byte-oriented view.
        ctypes.resize(holder, len(payload))
        memoryview(holder).cast("B")[:] = payload
        return holder
62
# Build-configuration strings; an unset variable is treated as empty.
_cflags = sysconfig.get_config_var('CFLAGS') or ''
_config_args = sysconfig.get_config_var('CONFIG_ARGS') or ''

# True when either the compiler flags or the configure arguments request
# the memory sanitizer.
MEMORY_SANITIZER = any((
    '-fsanitize=memory' in _cflags,
    '--with-memory-sanitizer' in _config_args,
))

# Does io.IOBase finalizer log the exception if the close() method fails?
# The exception is ignored silently by default in release build.
IOBASE_EMITS_UNRAISABLE = any((
    hasattr(sys, "gettotalrefcount"),
    sys.flags.dev_mode,
))
Victor Stinner44235042019-04-12 17:06:47 +020073
74
def _default_chunk_size():
    """Get the default TextIOWrapper chunk size"""
    # Any text file will do; this module's own source is always available.
    with open(__file__, encoding="latin-1") as probe:
        chunk_size = probe._CHUNK_SIZE
    return chunk_size
79
80
class MockRawIOWithoutRead:
    """Scripted raw stream that deliberately does not define read().

    Leaving read() out means the default RawIO.read(), which is built on
    readinto(), is the code path that gets exercised.
    """

    def __init__(self, read_stack=()):
        # Chunks still to be handed out by readinto(); None is a valid entry.
        self._read_stack = list(read_stack)
        # Every payload passed to write(), as immutable bytes.
        self._write_stack = []
        self._reads = 0
        # Reads attempted after the script ran out.
        self._extraneous_reads = 0

    # -- capabilities ------------------------------------------------------

    def readable(self):
        return True

    def writable(self):
        return True

    def seekable(self):
        return True

    def fileno(self):
        return 42

    # -- positioning (dummy values) ----------------------------------------

    def seek(self, pos, whence):
        return 0   # wrong but we gotta return something

    def tell(self):
        return 0   # same comment as above

    def truncate(self, pos=None):
        return pos

    # -- data transfer -----------------------------------------------------

    def write(self, b):
        self._write_stack.append(bytes(b))
        return len(b)

    def readinto(self, buf):
        self._reads += 1
        capacity = len(buf)
        if not self._read_stack:
            self._extraneous_reads += 1
            return 0
        head = self._read_stack[0]
        if head is None:
            # A scripted None is consumed and passed straight through.
            del self._read_stack[0]
            return None
        size = len(head)
        if size > capacity:
            # Chunk does not fit: fill the buffer, keep the rest for later.
            buf[:] = head[:capacity]
            self._read_stack[0] = head[capacity:]
            return capacity
        del self._read_stack[0]
        buf[:size] = head
        return size
136
class CMockRawIOWithoutRead(MockRawIOWithoutRead, io.RawIOBase):
    """MockRawIOWithoutRead layered on the C io.RawIOBase."""


class PyMockRawIOWithoutRead(MockRawIOWithoutRead, pyio.RawIOBase):
    """MockRawIOWithoutRead layered on the pure-Python pyio.RawIOBase."""
142
143
class MockRawIO(MockRawIOWithoutRead):
    """MockRawIOWithoutRead extended with an explicit, scripted read()."""

    def read(self, n=None):
        """Return the next scripted chunk; *n* is ignored.

        When the script is exhausted the call is counted in
        ``_extraneous_reads`` and ``b""`` is returned.
        """
        self._reads += 1
        try:
            return self._read_stack.pop(0)
        except IndexError:
            # Only an empty script is expected here.  Anything else
            # (KeyboardInterrupt, SystemExit, ...) must propagate instead
            # of being silently turned into an end-of-file.
            self._extraneous_reads += 1
            return b""
152 return b""
153
class CMockRawIO(MockRawIO, io.RawIOBase):
    """MockRawIO layered on the C io.RawIOBase."""


class PyMockRawIO(MockRawIO, pyio.RawIOBase):
    """MockRawIO layered on the pure-Python pyio.RawIOBase."""
158 pass
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000159
Guido van Rossuma9e20242007-03-08 00:43:48 +0000160
class MisbehavedRawIO(MockRawIO):
    """MockRawIO variant whose methods report deliberately bogus results."""

    def write(self, b):
        # Claim twice as many bytes as the parent reported.
        written = super().write(b)
        return written * 2

    def read(self, n=None):
        # Hand back every chunk doubled.
        chunk = super().read(n)
        return chunk * 2

    def seek(self, pos, whence):
        return -123

    def tell(self):
        return -456

    def readinto(self, buf):
        # Do the real read, but report five times the buffer length.
        super().readinto(buf)
        return len(buf) * 5
176 return len(buf) * 5
177
class CMisbehavedRawIO(MisbehavedRawIO, io.RawIOBase):
    """MisbehavedRawIO layered on the C io.RawIOBase."""


class PyMisbehavedRawIO(MisbehavedRawIO, pyio.RawIOBase):
    """MisbehavedRawIO layered on the pure-Python pyio.RawIOBase."""
182 pass
183
184
class SlowFlushRawIO(MockRawIO):
    """MockRawIO whose flush() signals that it started, then stalls."""

    def __init__(self):
        super().__init__()
        # Set as soon as flush() has been entered.
        self.in_flush = threading.Event()

    def flush(self):
        entered = self.in_flush
        entered.set()
        time.sleep(0.25)
192 time.sleep(0.25)
193
class CSlowFlushRawIO(SlowFlushRawIO, io.RawIOBase):
    """SlowFlushRawIO layered on the C io.RawIOBase."""


class PySlowFlushRawIO(SlowFlushRawIO, pyio.RawIOBase):
    """SlowFlushRawIO layered on the pure-Python pyio.RawIOBase."""
198 pass
199
200
class CloseFailureIO(MockRawIO):
    """MockRawIO whose first close() raises OSError; later calls do nothing."""
    closed = 0

    def close(self):
        if self.closed:
            return
        # Mark as closed *before* failing so a second close() is a no-op.
        self.closed = 1
        raise OSError
Andrew Svetlovf7a17b42012-12-25 16:47:37 +0200207 raise OSError
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000208
class CCloseFailureIO(CloseFailureIO, io.RawIOBase):
    """CloseFailureIO layered on the C io.RawIOBase."""


class PyCloseFailureIO(CloseFailureIO, pyio.RawIOBase):
    """CloseFailureIO layered on the pure-Python pyio.RawIOBase."""
213 pass
214
215
class MockFileIO:
    """Mixin that logs the size of every read in ``read_history``.

    It must be combined with a concrete stream class, which supplies the
    real __init__(), read() and readinto() reached through super().
    """

    def __init__(self, data):
        self.read_history = []
        super().__init__(data)

    def read(self, n=None):
        chunk = super().read(n)
        if chunk is None:
            self.read_history.append(None)
        else:
            self.read_history.append(len(chunk))
        return chunk

    def readinto(self, b):
        count = super().readinto(b)
        self.read_history.append(count)
        return count
230 return res
Guido van Rossum78892e42007-04-06 17:31:18 +0000231
class CMockFileIO(MockFileIO, io.BytesIO):
    """MockFileIO layered on the C io.BytesIO."""


class PyMockFileIO(MockFileIO, pyio.BytesIO):
    """MockFileIO layered on the pure-Python pyio.BytesIO."""
236 pass
237
238
class MockUnseekableIO:
    """Mixin turning a stream into one that refuses every seek operation.

    The concrete subclass must provide an ``UnsupportedOperation``
    attribute naming the exception type to raise.
    """

    def seekable(self):
        return False

    def seek(self, *args):
        error_type = self.UnsupportedOperation
        raise error_type("not seekable")

    def tell(self, *args):
        error_type = self.UnsupportedOperation
        raise error_type("not seekable")

    def truncate(self, *args):
        error_type = self.UnsupportedOperation
        raise error_type("not seekable")
250 raise self.UnsupportedOperation("not seekable")
251
class CMockUnseekableIO(MockUnseekableIO, io.BytesIO):
    """MockUnseekableIO layered on the C io.BytesIO."""
    UnsupportedOperation = io.UnsupportedOperation


class PyMockUnseekableIO(MockUnseekableIO, pyio.BytesIO):
    """MockUnseekableIO layered on the pure-Python pyio.BytesIO."""
    UnsupportedOperation = pyio.UnsupportedOperation
257
258
class MockNonBlockWriterIO:
    """Raw-writer stand-in that can pretend a write would block."""

    def __init__(self):
        self._write_stack = []
        self._blocker_char = None

    def pop_written(self):
        """Return everything written so far and forget it."""
        written = b"".join(self._write_stack)
        self._write_stack.clear()
        return written

    def block_on(self, char):
        """Block when a given char is encountered."""
        self._blocker_char = char

    def readable(self):
        return True

    def writable(self):
        return True

    def seekable(self):
        return True

    def seek(self, pos, whence=0):
        # naive implementation, enough for tests
        return 0

    def write(self, b):
        payload = bytes(b)
        blocker = self._blocker_char
        if blocker:
            try:
                cut = payload.index(blocker)
            except ValueError:
                cut = None   # blocker absent: fall through to a full write
            if cut is not None:
                if cut > 0:
                    # write data up to the first blocker
                    self._write_stack.append(payload[:cut])
                    return cut
                # cancel blocker and indicate would block
                self._blocker_char = None
                return None
        self._write_stack.append(payload)
        return len(payload)
306
class CMockNonBlockWriterIO(MockNonBlockWriterIO, io.RawIOBase):
    """MockNonBlockWriterIO layered on the C io.RawIOBase."""
    BlockingIOError = io.BlockingIOError


class PyMockNonBlockWriterIO(MockNonBlockWriterIO, pyio.RawIOBase):
    """MockNonBlockWriterIO layered on the pure-Python pyio.RawIOBase."""
    BlockingIOError = pyio.BlockingIOError
312
Guido van Rossuma9e20242007-03-08 00:43:48 +0000313
Guido van Rossum28524c72007-02-27 05:47:44 +0000314class IOTest(unittest.TestCase):
315
def setUp(self):
    # Start every test without a leftover scratch file.
    scratch = support.TESTFN
    support.unlink(scratch)
Neal Norwitze7789b12008-03-24 06:18:09 +0000318
def tearDown(self):
    # Remove whatever scratch file the test left behind.
    scratch = support.TESTFN
    support.unlink(scratch)
Guido van Rossum4d0f5a42007-03-07 22:59:39 +0000321
def write_ops(self, f):
    # Drive a writable, seekable binary stream through a fixed script of
    # write/seek/tell/truncate calls, checking every return value.
    check = self.assertEqual

    check(f.write(b"blah."), 5)
    f.truncate(0)
    check(f.tell(), 5)
    f.seek(0)

    check(f.write(b"blah."), 5)
    check(f.seek(0), 0)
    check(f.write(b"Hello."), 6)
    check(f.tell(), 6)
    check(f.seek(-1, 1), 5)
    check(f.tell(), 5)
    scratch = bytearray(b" world\n\n\n")
    check(f.write(scratch), 9)
    scratch[:] = b"*" * 9  # Overwrite our copy of the data
    check(f.seek(0), 0)
    check(f.write(b"h"), 1)
    check(f.seek(-1, 2), 13)
    check(f.tell(), 13)

    check(f.truncate(12), 12)
    check(f.tell(), 13)
    with self.assertRaises(TypeError):
        f.seek(0.0)
Guido van Rossum28524c72007-02-27 05:47:44 +0000345
def read_ops(self, f, buffered=False):
    # Drive a readable, seekable binary stream holding b"hello world\n"
    # through a fixed script of read/readinto/seek/tell calls.  The extra
    # checks under *buffered* use read() without a size and readinto1().
    check = self.assertEqual

    chunk = f.read(5)
    check(chunk, b"hello")
    target = byteslike(chunk)
    check(f.readinto(target), 5)
    check(bytes(target), b" worl")
    target = bytearray(5)
    check(f.readinto(target), 2)
    check(len(target), 5)
    check(target[:2], b"d\n")
    check(f.seek(0), 0)
    check(f.read(20), b"hello world\n")
    check(f.read(1), b"")
    check(f.readinto(byteslike(b"x")), 0)
    check(f.seek(-6, 2), 6)
    check(f.read(5), b"world")
    check(f.read(0), b"")
    check(f.readinto(byteslike()), 0)
    check(f.seek(-6, 1), 5)
    check(f.read(5), b" worl")
    check(f.tell(), 10)
    with self.assertRaises(TypeError):
        f.seek(0.0)
    if not buffered:
        return
    f.seek(0)
    check(f.read(), b"hello world\n")
    f.seek(6)
    check(f.read(), b"world\n")
    check(f.read(), b"")
    f.seek(0)
    target = byteslike(5)
    check(f.readinto1(target), 5)
    check(bytes(target), b"hello")
Guido van Rossum9b76da62007-04-11 01:09:03 +0000378
Guido van Rossum34d69e52007-04-10 20:08:41 +0000379 LARGE = 2**31
380
def large_file_ops(self, f):
    # Exercise seek/tell/write/truncate/read around offset self.LARGE on a
    # stream that must be both readable and writable.  The test is skipped
    # when the initial seek overflows.
    #
    # Use unittest assertions for the preconditions: plain ``assert``
    # statements are stripped when Python runs with -O.
    self.assertTrue(f.readable())
    self.assertTrue(f.writable())
    try:
        self.assertEqual(f.seek(self.LARGE), self.LARGE)
    except (OverflowError, ValueError):
        self.skipTest("no largefile support")
    self.assertEqual(f.tell(), self.LARGE)
    self.assertEqual(f.write(b"xxx"), 3)
    self.assertEqual(f.tell(), self.LARGE + 3)
    self.assertEqual(f.seek(-1, 1), self.LARGE + 2)
    self.assertEqual(f.truncate(), self.LARGE + 2)
    self.assertEqual(f.tell(), self.LARGE + 2)
    self.assertEqual(f.seek(0, 2), self.LARGE + 2)
    self.assertEqual(f.truncate(self.LARGE + 1), self.LARGE + 1)
    # truncate() must not move the position
    self.assertEqual(f.tell(), self.LARGE + 2)
    self.assertEqual(f.seek(0, 2), self.LARGE + 1)
    self.assertEqual(f.seek(-1, 2), self.LARGE)
    self.assertEqual(f.read(2), b"x")
400
def test_invalid_operations(self):
    # Try writing on a file opened in read mode and vice-versa.
    unsupported = self.UnsupportedOperation
    path = support.TESTFN
    for mode in ("w", "wb"):
        with self.open(path, mode) as fp:
            with self.assertRaises(unsupported):
                fp.read()
            with self.assertRaises(unsupported):
                fp.readline()
    with self.open(path, "wb", buffering=0) as fp:
        with self.assertRaises(unsupported):
            fp.read()
        with self.assertRaises(unsupported):
            fp.readline()
    with self.open(path, "rb", buffering=0) as fp:
        with self.assertRaises(unsupported):
            fp.write(b"blah")
        with self.assertRaises(unsupported):
            fp.writelines([b"blah\n"])
    with self.open(path, "rb") as fp:
        with self.assertRaises(unsupported):
            fp.write(b"blah")
        with self.assertRaises(unsupported):
            fp.writelines([b"blah\n"])
    with self.open(path, "r") as fp:
        with self.assertRaises(unsupported):
            fp.write("blah")
        with self.assertRaises(unsupported):
            fp.writelines(["blah\n"])
        # Non-zero seeking from current or end pos
        with self.assertRaises(unsupported):
            fp.seek(1, self.SEEK_CUR)
        with self.assertRaises(unsupported):
            fp.seek(-1, self.SEEK_END)
Benjamin Peterson81971ea2009-05-14 22:01:31 +0000423
def test_optional_abilities(self):
    # Test for OSError when optional APIs are not supported
    # The purpose of this test is to try fileno(), reading, writing and
    # seeking operations with various objects that indicate they do not
    # support these operations.

    def pipe_reader():
        read_fd, write_fd = os.pipe()
        os.close(write_fd)  # So that read() is harmless
        return self.FileIO(read_fd, "r")

    def pipe_writer():
        read_fd, write_fd = os.pipe()
        self.addCleanup(os.close, read_fd)
        # Guarantee that we can write into the pipe without blocking
        drain = threading.Thread(target=os.read, args=(read_fd, 100))
        drain.start()
        self.addCleanup(drain.join)
        return self.FileIO(write_fd, "w")

    def buffered_reader():
        return self.BufferedReader(self.MockUnseekableIO())

    def buffered_writer():
        return self.BufferedWriter(self.MockUnseekableIO())

    def buffered_random():
        return self.BufferedRandom(self.BytesIO())

    def buffered_rw_pair():
        return self.BufferedRWPair(self.MockUnseekableIO(),
                                   self.MockUnseekableIO())

    def text_reader():
        class UnseekableReader(self.MockUnseekableIO):
            writable = self.BufferedIOBase.writable
            write = self.BufferedIOBase.write
        return self.TextIOWrapper(UnseekableReader(), "ascii")

    def text_writer():
        class UnseekableWriter(self.MockUnseekableIO):
            readable = self.BufferedIOBase.readable
            read = self.BufferedIOBase.read
        return self.TextIOWrapper(UnseekableWriter(), "ascii")

    # Each case pairs a factory with the abilities its object advertises:
    # "f" = fileno, "r" = readable, "w" = writable, "s" = seekable.
    cases = [
        (pipe_reader, "fr"),
        (pipe_writer, "fw"),
        (buffered_reader, "r"),
        (buffered_writer, "w"),
        (buffered_random, "rws"),
        (buffered_rw_pair, "rw"),
        (text_reader, "r"),
        (text_writer, "w"),
        (self.BytesIO, "rws"),
        (self.StringIO, "rws"),
    ]
    pipe_factories = (pipe_reader, pipe_writer)

    for factory, abilities in cases:
        with self.subTest(factory), factory() as obj:
            can_read = "r" in abilities
            self.assertEqual(obj.readable(), can_read)
            can_write = "w" in abilities
            self.assertEqual(obj.writable(), can_write)

            # Pick a payload of the type the object accepts.
            if isinstance(obj, self.TextIOBase):
                payload = "3"
            elif isinstance(obj, (self.BufferedIOBase, self.RawIOBase)):
                payload = b"3"
            else:
                self.fail("Unknown base class")

            if "f" in abilities:
                obj.fileno()
            else:
                with self.assertRaises(OSError):
                    obj.fileno()

            if can_read:
                obj.read(1)
                obj.read()
            else:
                with self.assertRaises(OSError):
                    obj.read(1)
                with self.assertRaises(OSError):
                    obj.read()

            if can_write:
                obj.write(payload)
            else:
                with self.assertRaises(OSError):
                    obj.write(payload)

            if sys.platform.startswith("win") and factory in pipe_factories:
                # Pipes seem to appear as seekable on Windows
                continue
            can_seek = "s" in abilities
            self.assertEqual(obj.seekable(), can_seek)

            if can_seek:
                obj.tell()
                obj.seek(0)
            else:
                with self.assertRaises(OSError):
                    obj.tell()
                with self.assertRaises(OSError):
                    obj.seek(0)

            if can_write and can_seek:
                obj.truncate()
                obj.truncate(0)
            else:
                with self.assertRaises(OSError):
                    obj.truncate()
                with self.assertRaises(OSError):
                    obj.truncate(0)
527
def test_open_handles_NUL_chars(self):
    # A file name containing an embedded NUL must be rejected with
    # ValueError, whether it is passed as str or as bytes.
    bad_name = 'foo\0bar'
    with self.assertRaises(ValueError):
        self.open(bad_name, 'w')

    bad_name_bytes = bytes(bad_name, 'ascii')
    with warnings.catch_warnings():
        warnings.simplefilter("ignore", DeprecationWarning)
        with self.assertRaises(ValueError):
            self.open(bad_name_bytes, 'w')
Antoine Pitrou13348842012-01-29 18:36:34 +0100536
def test_raw_file_io(self):
    # Unbuffered binary files: check the advertised abilities, then run
    # the shared write and read scripts.
    path = support.TESTFN
    with self.open(path, "wb", buffering=0) as writer:
        self.assertEqual(writer.readable(), False)
        self.assertEqual(writer.writable(), True)
        self.assertEqual(writer.seekable(), True)
        self.write_ops(writer)
    with self.open(path, "rb", buffering=0) as reader:
        self.assertEqual(reader.readable(), True)
        self.assertEqual(reader.writable(), False)
        self.assertEqual(reader.seekable(), True)
        self.read_ops(reader)
Guido van Rossum28524c72007-02-27 05:47:44 +0000548
def test_buffered_file_io(self):
    # Buffered binary files: check the advertised abilities, then run the
    # shared write script and the buffered variant of the read script.
    path = support.TESTFN
    with self.open(path, "wb") as writer:
        self.assertEqual(writer.readable(), False)
        self.assertEqual(writer.writable(), True)
        self.assertEqual(writer.seekable(), True)
        self.write_ops(writer)
    with self.open(path, "rb") as reader:
        self.assertEqual(reader.readable(), True)
        self.assertEqual(reader.writable(), False)
        self.assertEqual(reader.seekable(), True)
        self.read_ops(reader, True)
Guido van Rossum87429772007-04-10 21:06:59 +0000560
def test_readline(self):
    # readline() with and without a size limit, including a line with an
    # embedded NUL and a final line lacking a newline.
    path = support.TESTFN
    with self.open(path, "wb") as f:
        f.write(b"abc\ndef\nxyzzy\nfoo\x00bar\nanother line")
    # (arguments to readline(), expected result), in call order
    script = (
        ((), b"abc\n"),
        ((10,), b"def\n"),
        ((2,), b"xy"),
        ((4,), b"zzy\n"),
        ((), b"foo\x00bar\n"),
        ((None,), b"another line"),
    )
    with self.open(path, "rb") as f:
        for args, expected in script:
            self.assertEqual(f.readline(*args), expected)
        with self.assertRaises(TypeError):
            f.readline(5.3)
    with self.open(path, "r") as f:
        with self.assertRaises(TypeError):
            f.readline(5.3)
Guido van Rossum48fc58a2007-06-07 23:45:37 +0000574
Serhiy Storchakabf623ae2017-04-19 20:03:52 +0300575 def test_readline_nonsizeable(self):
576 # Issue #30061
577 # Crash when readline() returns an object without __len__
578 class R(self.IOBase):
579 def readline(self):
580 return None
581 self.assertRaises((TypeError, StopIteration), next, R())
582
583 def test_next_nonsizeable(self):
584 # Issue #30061
585 # Crash when __next__() returns an object without __len__
586 class R(self.IOBase):
587 def __next__(self):
588 return None
589 self.assertRaises(TypeError, R().readlines, 1)
590
def test_raw_bytes_io(self):
    # Run the shared write script on an empty BytesIO, then the buffered
    # read script on a fresh BytesIO holding the result.
    sink = self.BytesIO()
    self.write_ops(sink)
    written = sink.getvalue()
    self.assertEqual(written, b"hello world\n")
    source = self.BytesIO(written)
    self.read_ops(source, True)
Guido van Rossum9b76da62007-04-11 01:09:03 +0000597 self.read_ops(f, True)
Guido van Rossum28524c72007-02-27 05:47:44 +0000598
def test_large_file_ops(self):
    # On Windows and Mac OSX this test consumes large resources: it takes
    # a long time to build the >2 GiB file and takes >2 GiB of disk space,
    # therefore the resource must be enabled to run this test.
    expensive_here = (sys.platform.startswith('win')
                      or sys.platform == 'darwin')
    if expensive_here:
        support.requires(
            'largefile',
            'test requires %s bytes and a long time to run' % self.LARGE)
    # First unbuffered (buffering=0), then with default buffering.
    for buffering_args in ((0,), ()):
        with self.open(support.TESTFN, "w+b", *buffering_args) as f:
            self.large_file_ops(f)
Guido van Rossum87429772007-04-10 21:06:59 +0000611
def test_with_open(self):
    # Leaving a with-block must close the file, both on normal exit and
    # when the body raises.
    path = support.TESTFN
    for bufsize in (0, 100):
        f = None
        with self.open(path, "wb", bufsize) as f:
            f.write(b"xxx")
        self.assertEqual(f.closed, True)

        f = None
        try:
            with self.open(path, "wb", bufsize) as f:
                1/0
        except ZeroDivisionError:
            self.assertEqual(f.closed, True)
        else:
            self.fail("1/0 didn't raise an exception")
625 self.fail("1/0 didn't raise an exception")
626
Antoine Pitrou08838b62009-01-21 00:55:13 +0000627 # issue 5008
def test_append_mode_tell(self):
    # A file opened for appending must report a position at the end of
    # the existing data right after opening.
    path = support.TESTFN
    with self.open(path, "wb") as f:
        f.write(b"xxx")
    # Binary append: first unbuffered, then with default buffering.
    for extra in ({"buffering": 0}, {}):
        with self.open(path, "ab", **extra) as f:
            self.assertEqual(f.tell(), 3)
    with self.open(path, "a") as f:
        self.assertGreater(f.tell(), 0)
Serhiy Storchaka0dcd80a2015-08-02 15:17:49 +0300636 self.assertGreater(f.tell(), 0)
Antoine Pitrou08838b62009-01-21 00:55:13 +0000637
def test_destructor(self):
    # Dropping the last reference to an unclosed FileIO subclass must run
    # __del__, close() and flush() in that order, and the written data
    # must end up in the file.
    calls = []

    class MyFileIO(self.FileIO):
        def __del__(self):
            calls.append(1)
            # The base class may or may not define __del__.
            inherited = getattr(super(), "__del__", None)
            if inherited is not None:
                inherited()

        def close(self):
            calls.append(2)
            super().close()

        def flush(self):
            calls.append(3)
            super().flush()

    with support.check_warnings(('', ResourceWarning)):
        f = MyFileIO(support.TESTFN, "wb")
        f.write(b"xxx")
        del f
        support.gc_collect()
        self.assertEqual(calls, [1, 2, 3])
        with self.open(support.TESTFN, "rb") as f:
            self.assertEqual(f.read(), b"xxx")
662 self.assertEqual(f.read(), b"xxx")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000663
def _check_base_destructor(self, base):
    # Subclass *base*, drop the only instance and verify that __del__,
    # close() and flush() ran in that order.
    calls = []

    class MyIO(base):
        def __init__(self):
            # This exercises the availability of attributes on object
            # destruction.
            # (in the C version, close() is called by the tp_dealloc
            # function, not by __del__)
            self.on_del = 1
            self.on_close = 2
            self.on_flush = 3

        def __del__(self):
            calls.append(self.on_del)
            # The base class may or may not define __del__.
            inherited = getattr(super(), "__del__", None)
            if inherited is not None:
                inherited()

        def close(self):
            calls.append(self.on_close)
            super().close()

        def flush(self):
            calls.append(self.on_flush)
            super().flush()

    f = MyIO()
    del f
    support.gc_collect()
    self.assertEqual(calls, [1, 2, 3])
Guido van Rossum87429772007-04-10 21:06:59 +0000692 self.assertEqual(record, [1, 2, 3])
693
def test_IOBase_destructor(self):
    # Run the shared destructor-order check against IOBase.
    base = self.IOBase
    self._check_base_destructor(base)
696
def test_RawIOBase_destructor(self):
    # Run the shared destructor-order check against RawIOBase.
    base = self.RawIOBase
    self._check_base_destructor(base)
699
def test_BufferedIOBase_destructor(self):
    # Run the shared destructor-order check against BufferedIOBase.
    base = self.BufferedIOBase
    self._check_base_destructor(base)
702
def test_TextIOBase_destructor(self):
    # Run the shared destructor-order check against TextIOBase.
    base = self.TextIOBase
    self._check_base_destructor(base)
705
def test_close_flushes(self):
    # Data written through a buffered file must be readable back once the
    # with-block has closed it.
    path = support.TESTFN
    with self.open(path, "wb") as writer:
        writer.write(b"xxx")
    with self.open(path, "rb") as reader:
        self.assertEqual(reader.read(), b"xxx")
Guido van Rossuma9e20242007-03-08 00:43:48 +0000711
def test_array_writes(self):
    # write() and writelines() must accept an array.array of ints and
    # write() must report its size in bytes.
    payload = array.array('i', range(10))
    byte_count = len(payload.tobytes())

    # Factories rather than instances, so each stream is created right
    # before it is exercised.
    factories = (
        lambda: self.BytesIO(),
        lambda: self.FileIO(support.TESTFN, "w"),
        lambda: self.BufferedWriter(self.MockRawIO()),
        lambda: self.BufferedRandom(self.MockRawIO()),
        lambda: self.BufferedRWPair(self.MockRawIO(), self.MockRawIO()),
    )
    for make_stream in factories:
        with make_stream() as f:
            self.assertEqual(f.write(payload), byte_count)
            f.writelines((payload,))
Guido van Rossumd4103952007-04-12 05:44:49 +0000724
def test_closefd(self):
    # Opening by file name for writing with closefd=False must raise
    # ValueError.
    with self.assertRaises(ValueError):
        self.open(support.TESTFN, 'w', closefd=False)
Guido van Rossuma9e20242007-03-08 00:43:48 +0000728
def test_read_closed(self):
    # Reading from a closed file must raise ValueError, even if it was
    # opened with closefd=False on a descriptor that is still open.
    path = support.TESTFN
    with self.open(path, "w") as writer:
        writer.write("egg\n")
    with self.open(path, "r") as owner:
        borrowed = self.open(owner.fileno(), "r", closefd=False)
        self.assertEqual(borrowed.read(), "egg\n")
        borrowed.seek(0)
        borrowed.close()
        with self.assertRaises(ValueError):
            borrowed.read()
738
def test_no_closefd_with_filename(self):
    # can't use closefd in combination with a file name
    with self.assertRaises(ValueError):
        self.open(support.TESTFN, "r", closefd=False)
Christian Heimesecc42a22008-11-05 19:30:32 +0000742
def test_closefd_attr(self):
    # The raw layer's "closefd" attribute must mirror the closefd
    # argument that was (explicitly or implicitly) given to open().
    with self.open(support.TESTFN, "wb") as writer:
        writer.write(b"egg\n")
    with self.open(support.TESTFN, "r") as owner:
        # Opened by name: default closefd.
        self.assertEqual(owner.buffer.raw.closefd, True)
        # Opened from the existing descriptor with closefd=False.
        borrowed = self.open(owner.fileno(), "r", closefd=False)
        self.assertEqual(borrowed.buffer.raw.closefd, False)
750
def test_garbage_collection(self):
    # FileIO objects are collected, and collecting them flushes
    # all data to disk.
    with support.check_warnings(('', ResourceWarning)):
        fileobj = self.FileIO(support.TESTFN, "wb")
        fileobj.write(b"abcxxx")
        # Self-reference: the object is now part of a reference cycle.
        fileobj.f = fileobj
        ref = weakref.ref(fileobj)
        # Drop the only local name before asking for a collection.
        del fileobj
        support.gc_collect()
    self.assertIsNone(ref(), ref)
    with self.open(support.TESTFN, "rb") as reader:
        self.assertEqual(reader.read(), b"abcxxx")
Christian Heimesecc42a22008-11-05 19:30:32 +0000764
def test_unbounded_file(self):
    # Issue #1174606: reading from an unbounded stream such as /dev/zero.
    zero = "/dev/zero"

    # Preconditions, checked in this order; each one skips the test.
    if not os.path.exists(zero):
        self.skipTest("{0} does not exist".format(zero))
    if sys.maxsize > 0x7FFFFFFF:
        self.skipTest("test can only run in a 32-bit address space")
    if support.real_max_memuse < support._2G:
        self.skipTest("test requires at least 2 GiB of memory")

    # An unlimited read() must raise OverflowError through the raw,
    # buffered and text layers alike.
    open_variants = (
        ("rb", {"buffering": 0}),
        ("rb", {}),
        ("r", {}),
    )
    for mode, extra in open_variants:
        with self.open(zero, mode, **extra) as stream:
            self.assertRaises(OverflowError, stream.read)
780
def check_flush_error_on_close(self, *args, **kwargs):
    # Helper: open a file with the given arguments, make its flush()
    # fail, and verify that close() still closes the file, propagates
    # the error, and calls flush() before the file is marked closed.
    stream = self.open(*args, **kwargs)
    seen_closed = []

    def failing_flush():
        # Record whether the stream was already closed at flush time.
        seen_closed[:] = [stream.closed]
        raise OSError()

    stream.flush = failing_flush
    with self.assertRaises(OSError):  # exception not swallowed
        stream.close()
    self.assertTrue(stream.closed)
    self.assertTrue(seen_closed)      # flush() called
    self.assertFalse(seen_closed[0])  # flush() called before file closed
    stream.flush = lambda: None  # break reference loop
Serhiy Storchakaa3712a92015-02-21 00:35:09 +0200795
def test_flush_error_on_close(self):
    # Run check_flush_error_on_close() for each I/O layer, opening the
    # target by name, by descriptor, and by descriptor with closefd=False.
    # Issue #5700: io.FileIO calls flush() after file closed
    flags = os.O_WRONLY | os.O_CREAT
    layers = (
        ('wb', {'buffering': 0}),  # raw file
        ('wb', {}),                # buffered io
        ('w', {}),                 # text io
    )
    for mode, extra in layers:
        self.check_flush_error_on_close(support.TESTFN, mode, **extra)
        fd = os.open(support.TESTFN, flags)
        self.check_flush_error_on_close(fd, mode, **extra)
        fd = os.open(support.TESTFN, flags)
        self.check_flush_error_on_close(fd, mode, **extra, closefd=False)
        # With closefd=False the descriptor is still ours to close.
        os.close(fd)
Antoine Pitrou6be88762010-05-03 16:48:20 +0000819
def test_multi_close(self):
    # close() may be called repeatedly; flush() on the closed file must
    # then raise ValueError.
    stream = self.open(support.TESTFN, "wb", buffering=0)
    for _ in range(3):
        stream.close()
    with self.assertRaises(ValueError):
        stream.flush()
826
def test_RawIOBase_read(self):
    # Exercise the default limited RawIOBase.read(n) implementation (which
    # calls readinto() internally).
    rawio = self.MockRawIOWithoutRead((b"abc", b"d", None, b"efg", None))
    # Results expected from eight consecutive read(2) calls, in order.
    expected_results = (b"ab", b"c", b"d", None, b"ef", b"g", None, b"")
    for expected in expected_results:
        self.assertEqual(rawio.read(2), expected)
839
def test_types_have_dict(self):
    # Instances of these io types must expose an instance __dict__.
    type_names = ("IOBase", "RawIOBase", "TextIOBase", "StringIO", "BytesIO")
    # All instances are created up front, then checked one by one.
    instances = tuple(getattr(self, name)() for name in type_names)
    for instance in instances:
        self.assertTrue(hasattr(instance, "__dict__"))
850
def test_opener(self):
    # A custom opener supplies the descriptor, so the file name passed to
    # open() is never used to locate the file.
    with self.open(support.TESTFN, "w") as writer:
        writer.write("egg\n")
    preopened_fd = os.open(support.TESTFN, os.O_RDONLY)

    def opener(path, flags):
        # Ignore both arguments and hand back the pre-opened descriptor.
        return preopened_fd

    with self.open("non-existent", "r", opener=opener) as reader:
        self.assertEqual(reader.read(), "egg\n")
859
def test_bad_opener_negative_1(self):
    # Issue #27066.
    # An opener returning -1 must produce a ValueError naming the value.
    # NOTE(review): this calls the builtin open() rather than self.open(),
    # unlike the neighbouring tests -- confirm that this is intentional.
    def badopener(fname, flags):
        return -1

    with self.assertRaises(ValueError) as caught:
        open('non-existent', 'r', opener=badopener)
    message = str(caught.exception)
    self.assertEqual(message, 'opener returned -1')
867
def test_bad_opener_other_negative(self):
    # Issue #27066.
    # An opener returning -2 must produce a ValueError naming the value.
    # NOTE(review): this calls the builtin open() rather than self.open(),
    # unlike the neighbouring tests -- confirm that this is intentional.
    def badopener(fname, flags):
        return -2

    with self.assertRaises(ValueError) as caught:
        open('non-existent', 'r', opener=badopener)
    message = str(caught.exception)
    self.assertEqual(message, 'opener returned -2')
875
def test_fileio_closefd(self):
    # Issue #4841
    # A FileIO created with closefd=False must leave the descriptors it
    # wraps open, both when re-initialised and when closed.
    with self.open(__file__, 'rb') as first, \
         self.open(__file__, 'rb') as second:
        borrowed = self.FileIO(first.fileno(), closefd=False)
        # .__init__() must not close the first file
        borrowed.__init__(second.fileno(), closefd=False)
        first.readline()
        # .close() must not close the second file
        borrowed.close()
        second.readline()
887
def test_nonbuffered_textio(self):
    # Text mode combined with buffering=0 must raise ValueError, and the
    # failed open() must not emit a resource warning.
    with support.check_no_resource_warning(self), \
         self.assertRaises(ValueError):
        self.open(support.TESTFN, 'w', buffering=0)
Serhiy Storchakaf10063e2014-06-09 13:32:34 +0300892
def test_invalid_newline(self):
    # An unsupported newline argument must raise ValueError, and the
    # failed open() must not emit a resource warning.
    with support.check_no_resource_warning(self), \
         self.assertRaises(ValueError):
        self.open(support.TESTFN, 'w', newline='invalid')
Serhiy Storchakaf10063e2014-06-09 13:32:34 +0300897
def test_buffered_readinto_mixin(self):
    # Test the implementation provided by BufferedIOBase
    class Stream(self.BufferedIOBase):
        # Only read()/read1() are supplied; readinto()/readinto1() come
        # from the base class.
        def read(self, size):
            return b"12345"
        read1 = read

    stream = Stream()
    for method_name in ("readinto", "readinto1"):
        with self.subTest(method_name):
            target = byteslike(5)
            fill = getattr(stream, method_name)
            self.assertEqual(fill(target), 5)
            self.assertEqual(bytes(target), b"12345")
910
def test_fspath_support(self):
    # open() must accept FakePath objects wrapping str or bytes paths,
    # and reject or propagate errors for the unusable ones below.
    def check_path_succeeds(path):
        # Write through the path, then read the same content back.
        with self.open(path, "w") as writer:
            writer.write("egg\n")

        with self.open(path, "r") as reader:
            self.assertEqual(reader.read(), "egg\n")

    for raw_path in (support.TESTFN, support.TESTFN.encode('utf-8')):
        check_path_succeeds(FakePath(raw_path))

    # A FakePath wrapping an integer descriptor is not accepted.
    with self.open(support.TESTFN, "w") as f:
        bad_path = FakePath(f.fileno())
        with self.assertRaises(TypeError):
            self.open(bad_path, 'w')

    # (wrapped value, exception expected from open())
    failing_cases = (
        (None, TypeError),
        (FloatingPointError, FloatingPointError),
    )
    for wrapped, expected_exc in failing_cases:
        bad_path = FakePath(wrapped)
        with self.assertRaises(expected_exc):
            self.open(bad_path, 'w')

    # ensure that refcounting is correct with some error conditions
    with self.assertRaisesRegex(ValueError, 'read/write/append mode'):
        self.open(FakePath(support.TESTFN), 'rwxa')
Ethan Furmand62548a2016-06-04 14:38:43 -0700938
def test_RawIOBase_readall(self):
    # Exercise the default unlimited RawIOBase.read() and readall()
    # implementations.
    for method_name in ("read", "readall"):
        # Fresh mock for each method so both start from the same data.
        rawio = self.MockRawIOWithoutRead((b"abc", b"d", b"efg"))
        read_everything = getattr(rawio, method_name)
        self.assertEqual(read_everything(), b"abcdefg")
946
def test_BufferedIOBase_readinto(self):
    # Exercise the default BufferedIOBase.readinto() and readinto1()
    # implementations (which call read() or read1() internally).
    class Reader(self.BufferedIOBase):
        def __init__(self, avail):
            self.avail = avail

        def read(self, size):
            # Hand out the next "size" bytes and keep the remainder.
            head, tail = self.avail[:size], self.avail[size:]
            self.avail = tail
            return head

        def read1(self, size):
            """Returns no more than 5 bytes at once"""
            return self.read(min(size, 5))

    tests = (
        # (test method, total data available, read buffer size, expected
        # read size)
        ("readinto", 10, 5, 5),
        ("readinto", 10, 6, 6),  # More than read1() can return
        ("readinto", 5, 6, 5),  # Buffer larger than total available
        ("readinto", 6, 7, 6),
        ("readinto", 10, 0, 0),  # Empty buffer
        ("readinto1", 10, 5, 5),  # Result limited to single read1() call
        ("readinto1", 10, 6, 5),  # Buffer larger than read1() can return
        ("readinto1", 5, 6, 5),  # Buffer larger than total available
        ("readinto1", 6, 7, 5),
        ("readinto1", 10, 0, 0),  # Empty buffer
    )
    UNUSED_BYTE = 0x81
    for test in tests:
        with self.subTest(test):
            method_name, avail, request, result = test
            reader = Reader(bytes(range(avail)))
            # Pre-fill the buffer with a marker so untouched bytes can
            # be recognised afterwards.
            buffer = bytearray((UNUSED_BYTE,) * request)
            fill = getattr(reader, method_name)
            self.assertEqual(fill(buffer), result)
            self.assertEqual(len(buffer), request)
            self.assertSequenceEqual(buffer[:result], range(result))
            untouched = (UNUSED_BYTE,) * (request - result)
            self.assertSequenceEqual(buffer[result:], untouched)
            self.assertEqual(len(reader.avail), avail - result)
987
def test_close_assert(self):
    # close() on an object whose attribute assignments are ignored and
    # whose flush() fails must simply propagate the OSError.
    class R(self.IOBase):
        def __setattr__(self, name, value):
            # Discard every attribute assignment.
            pass

        def flush(self):
            raise OSError()

    stream = R()
    # This would cause an assertion failure.
    with self.assertRaises(OSError):
        stream.close()

    # Silence destructor error
    R.flush = lambda self: None
1000
Hynek Schlawack2cc71562012-05-25 10:05:53 +02001001
class CIOTest(IOTest):

    def test_IOBase_finalize(self):
        # Issue #12149: segmentation fault on _PyIOBase_finalize when both a
        # class which inherits IOBase and an object of this class are caught
        # in a reference cycle and close() is already in the method cache.
        class MyIO(self.IOBase):
            def close(self):
                pass

        # create an instance to populate the method cache
        MyIO()
        instance = MyIO()
        # Self-reference puts the instance into a reference cycle.
        instance.obj = instance
        ref = weakref.ref(instance)
        # Drop the class first, then the instance, before collecting.
        del MyIO
        del instance
        support.gc_collect()
        self.assertIsNone(ref(), ref)
Guido van Rossuma9e20242007-03-08 00:43:48 +00001021
class PyIOTest(IOTest):
    # Adds no tests of its own; everything is inherited from IOTest.
    pass
Guido van Rossum68bbcd22007-02-27 17:19:33 +00001024
Guido van Rossuma9e20242007-03-08 00:43:48 +00001025
@support.cpython_only
class APIMismatchTest(unittest.TestCase):
    # Compares the attribute sets of the C and Python RawIOBase classes in
    # both directions using support.detect_api_mismatch().

    def test_RawIOBase_io_in_pyio_match(self):
        """Test that pyio RawIOBase class has all c RawIOBase methods"""
        missing = support.detect_api_mismatch(
            pyio.RawIOBase,
            io.RawIOBase,
            ignore=('__weakref__',),
        )
        self.assertEqual(
            missing,
            set(),
            msg='Python RawIOBase does not have all C RawIOBase methods',
        )

    def test_RawIOBase_pyio_in_io_match(self):
        """Test that c RawIOBase class has all pyio RawIOBase methods"""
        missing = support.detect_api_mismatch(io.RawIOBase, pyio.RawIOBase)
        self.assertEqual(
            missing,
            set(),
            msg='C RawIOBase does not have all Python RawIOBase methods',
        )
1039
1040
class CommonBufferedTests:
    # Tests common to BufferedReader, BufferedWriter and BufferedRandom
    #
    # Mixin: the concrete test class provides "tp" (the buffered type under
    # test) together with the mock raw classes used below.

    def test_detach(self):
        # detach() returns the raw stream once; a second call must fail.
        raw = self.MockRawIO()
        buffered = self.tp(raw)
        self.assertIs(buffered.detach(), raw)
        with self.assertRaises(ValueError):
            buffered.detach()

        repr(buffered)  # Should still work

    def test_fileno(self):
        # fileno() is expected to report 42 for the mock raw stream.
        raw = self.MockRawIO()
        buffered = self.tp(raw)

        self.assertEqual(42, buffered.fileno())

    def test_invalid_args(self):
        raw = self.MockRawIO()
        buffered = self.tp(raw)
        # Invalid whence
        for bad_whence in (-1, 9):
            self.assertRaises(ValueError, buffered.seek, 0, bad_whence)

    def test_override_destructor(self):
        # A subclass overriding __del__, close() and flush() must see them
        # invoked in exactly that order when the object is destroyed.
        base = self.tp
        calls = []

        class MyBufferedIO(base):
            def __del__(self):
                calls.append(1)
                # Chain to the base destructor only if one exists.
                try:
                    parent_del = super().__del__
                except AttributeError:
                    pass
                else:
                    parent_del()

            def close(self):
                calls.append(2)
                super().close()

            def flush(self):
                calls.append(3)
                super().flush()

        raw = self.MockRawIO()
        buffered = MyBufferedIO(raw)
        del buffered
        support.gc_collect()
        self.assertEqual(calls, [1, 2, 3])

    def test_context_manager(self):
        # Test usability as a context manager
        raw = self.MockRawIO()
        buffered = self.tp(raw)

        def enter_and_leave():
            with buffered:
                pass

        enter_and_leave()
        # The object should now be closed, and using it a second time should
        # raise a ValueError.
        with self.assertRaises(ValueError):
            enter_and_leave()

    def test_error_through_destructor(self):
        # Test that the exception state is not modified by a destructor,
        # even if close() fails.
        rawio = self.CloseFailureIO()
        with support.catch_unraisable_exception() as cm:
            with self.assertRaises(AttributeError):
                self.tp(rawio).xyzzy

            # Inspect the captured unraisable exception while still inside
            # the catching context.
            if not IOBASE_EMITS_UNRAISABLE:
                self.assertIsNone(cm.unraisable)
            elif cm.unraisable is not None:
                self.assertEqual(cm.unraisable.exc_type, OSError)

    def test_repr(self):
        # repr() shows the class name and, once the raw stream has one,
        # its name (str or bytes).
        raw = self.MockRawIO()
        buffered = self.tp(raw)
        clsname = r"(%s\.)?%s" % (self.tp.__module__, self.tp.__qualname__)
        self.assertRegex(repr(buffered), "<%s>" % clsname)
        raw.name = "dummy"
        self.assertRegex(repr(buffered), "<%s name='dummy'>" % clsname)
        raw.name = b"dummy"
        self.assertRegex(repr(buffered), "<%s name=b'dummy'>" % clsname)

    def test_recursive_repr(self):
        # Issue #25455
        raw = self.MockRawIO()
        buffered = self.tp(raw)
        # Make the raw stream's name refer back to the buffered object.
        with support.swap_attr(raw, 'name', buffered):
            try:
                repr(buffered)  # Should not crash
            except RuntimeError:
                pass

    def test_flush_error_on_close(self):
        # Test that buffered file is closed despite failed flush
        # and that flush() is called before file closed.
        raw = self.MockRawIO()
        seen_closed = []

        def failing_flush():
            # Record both "closed" flags as they are at flush time.
            seen_closed[:] = [buffered.closed, raw.closed]
            raise OSError()

        raw.flush = failing_flush
        buffered = self.tp(raw)
        with self.assertRaises(OSError):  # exception not swallowed
            buffered.close()
        self.assertTrue(buffered.closed)
        self.assertTrue(raw.closed)
        self.assertTrue(seen_closed)      # flush() called
        self.assertFalse(seen_closed[0])  # flush() called before file closed
        self.assertFalse(seen_closed[1])
        raw.flush = lambda: None  # break reference loop

    def test_close_error_on_close(self):
        # When both flush() and the raw close() fail, the close error is
        # raised with the flush error attached as its __context__.
        raw = self.MockRawIO()

        def failing_flush():
            raise OSError('flush')

        def failing_close():
            raise OSError('close')

        raw.close = failing_close
        buffered = self.tp(raw)
        buffered.flush = failing_flush
        with self.assertRaises(OSError) as err:  # exception not swallowed
            buffered.close()
        raised = err.exception
        self.assertEqual(raised.args, ('close',))
        self.assertIsInstance(raised.__context__, OSError)
        self.assertEqual(raised.__context__.args, ('flush',))
        self.assertFalse(buffered.closed)

        # Silence destructor error
        raw.close = lambda: None
        buffered.flush = lambda: None

    def test_nonnormalized_close_error_on_close(self):
        # Issue #21677
        raw = self.MockRawIO()

        # Both helpers reference undefined names on purpose, so that each
        # one raises NameError.
        def failing_flush():
            raise non_existing_flush

        def failing_close():
            raise non_existing_close

        raw.close = failing_close
        buffered = self.tp(raw)
        buffered.flush = failing_flush
        with self.assertRaises(NameError) as err:  # exception not swallowed
            buffered.close()
        raised = err.exception
        self.assertIn('non_existing_close', str(raised))
        self.assertIsInstance(raised.__context__, NameError)
        self.assertIn('non_existing_flush', str(raised.__context__))
        self.assertFalse(buffered.closed)

        # Silence destructor error
        buffered.flush = lambda: None
        raw.close = lambda: None

    def test_multi_close(self):
        # close() may be called repeatedly; flush() afterwards must fail.
        raw = self.MockRawIO()
        buffered = self.tp(raw)
        for _ in range(3):
            buffered.close()
        with self.assertRaises(ValueError):
            buffered.flush()

    def test_unseekable(self):
        # tell() and seek() over an unseekable raw stream must raise
        # UnsupportedOperation.
        bufio = self.tp(self.MockUnseekableIO(b"A" * 10))
        self.assertRaises(self.UnsupportedOperation, bufio.tell)
        self.assertRaises(self.UnsupportedOperation, bufio.seek, 0)

    def test_readonly_attributes(self):
        # The "raw" attribute cannot be reassigned.
        raw = self.MockRawIO()
        buffered = self.tp(raw)
        replacement = self.MockRawIO()
        with self.assertRaises(AttributeError):
            buffered.raw = replacement
1212
Guido van Rossum78892e42007-04-06 17:31:18 +00001213
class SizeofTest:
    # Mixin checking how sys.getsizeof() accounts for the internal buffer;
    # the concrete test class provides "tp" and "MockRawIO".

    @support.cpython_only
    def test_sizeof(self):
        # The reported size must grow by exactly the difference between
        # the two buffer sizes.
        small, large = 4096, 8192
        rawio = self.MockRawIO()
        bufio = self.tp(rawio, buffer_size=small)
        overhead = sys.getsizeof(bufio) - small
        rawio = self.MockRawIO()
        bufio = self.tp(rawio, buffer_size=large)
        self.assertEqual(sys.getsizeof(bufio), overhead + large)

    @support.cpython_only
    def test_buffer_freeing(self):
        # After close() the buffer no longer counts towards the size.
        bufsize = 4096
        rawio = self.MockRawIO()
        bufio = self.tp(rawio, buffer_size=bufsize)
        overhead = sys.getsizeof(bufio) - bufsize
        bufio.close()
        self.assertEqual(sys.getsizeof(bufio), overhead)
Antoine Pitrou10f0c502012-07-29 19:02:46 +02001235
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001236class BufferedReaderTest(unittest.TestCase, CommonBufferedTests):
1237 read_mode = "rb"
Guido van Rossum78892e42007-04-06 17:31:18 +00001238
def test_constructor(self):
    # __init__() may be called again on an existing object with valid
    # buffer sizes; non-positive sizes must raise ValueError.
    rawio = self.MockRawIO([b"abc"])
    bufio = self.tp(rawio)
    bufio.__init__(rawio)
    for good_size in (1024, 16):
        bufio.__init__(rawio, buffer_size=good_size)
    self.assertEqual(b"abc", bufio.read())
    for bad_size in (0, -16, -1):
        self.assertRaises(ValueError, bufio.__init__, rawio,
                          buffer_size=bad_size)
    # The object is still usable after the failed re-initialisations.
    rawio = self.MockRawIO([b"abc"])
    bufio.__init__(rawio)
    self.assertEqual(b"abc", bufio.read())
Guido van Rossum78892e42007-04-06 17:31:18 +00001252
def test_uninitialized(self):
    # An object created through __new__() without __init__() can be
    # deleted, fails cleanly on read(), and works once initialised.
    tp = self.tp
    bufio = tp.__new__(tp)
    del bufio
    bufio = tp.__new__(tp)
    self.assertRaisesRegex(
        (ValueError, AttributeError),
        'uninitialized|has no attribute',
        bufio.read, 0,
    )
    bufio.__init__(self.MockRawIO())
    self.assertEqual(bufio.read(0), b'')
1262
def test_read(self):
    # read(None) and read(7) must both return all seven bytes.
    for size in (None, 7):
        rawio = self.MockRawIO((b"abc", b"d", b"efg"))
        bufio = self.tp(rawio)
        self.assertEqual(b"abcdefg", bufio.read(size))
    # Invalid args (checked on the last reader created above)
    self.assertRaises(ValueError, bufio.read, -2)
Guido van Rossum68bbcd22007-02-27 17:19:33 +00001270
def test_read1(self):
    # Interleave read()/read1() calls and track how many raw reads the
    # mock has recorded after each step.
    rawio = self.MockRawIO((b"abc", b"d", b"efg"))
    bufio = self.tp(rawio)
    self.assertEqual(bufio.read(1), b"a")
    self.assertEqual(bufio.read1(1), b"b")
    self.assertEqual(rawio._reads, 1)
    self.assertEqual(bufio.read1(0), b"")
    self.assertEqual(bufio.read1(100), b"c")
    self.assertEqual(rawio._reads, 1)
    # (expected data, expected raw read count) for further read1(100) calls
    follow_ups = ((b"d", 2), (b"efg", 3), (b"", 4))
    for expected_data, expected_reads in follow_ups:
        self.assertEqual(bufio.read1(100), expected_data)
        self.assertEqual(rawio._reads, expected_reads)
Martin Panterccb2c0e2016-10-20 23:48:14 +00001286
def test_read1_arbitrary(self):
    # read1() called without a size, or with -1, returns whatever a
    # single step yields.
    rawio = self.MockRawIO((b"abc", b"d", b"efg"))
    bufio = self.tp(rawio)
    self.assertEqual(bufio.read(1), b"a")
    self.assertEqual(bufio.read1(), b"bc")
    self.assertEqual(bufio.read1(), b"d")
    self.assertEqual(bufio.read1(-1), b"efg")
    self.assertEqual(rawio._reads, 3)
    self.assertEqual(bufio.read1(), b"")
    self.assertEqual(rawio._reads, 4)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001297
def test_readinto(self):
    # Repeatedly fill the same two-byte buffer; each step lists the
    # expected return value and the expected buffer contents afterwards.
    target = bytearray(2)

    rawio = self.MockRawIO((b"abc", b"d", b"efg"))
    bufio = self.tp(rawio)
    steps = ((2, b"ab"), (2, b"cd"), (2, b"ef"), (1, b"gf"), (0, b"gf"))
    for expected_count, expected_contents in steps:
        self.assertEqual(bufio.readinto(target), expected_count)
        self.assertEqual(target, expected_contents)

    # Same again with a raw stream that yields None after its data.
    rawio = self.MockRawIO((b"abc", None))
    bufio = self.tp(rawio)
    steps = ((2, b"ab"), (1, b"cb"))
    for expected_count, expected_contents in steps:
        self.assertEqual(bufio.readinto(target), expected_count)
        self.assertEqual(target, expected_contents)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001318
def test_readinto1(self):
    # Track both the data delivered by readinto1() and the number of raw
    # reads recorded by the mock after every step.
    buffer_size = 10
    rawio = self.MockRawIO((b"abc", b"de", b"fgh", b"jkl"))
    bufio = self.tp(rawio, buffer_size=buffer_size)
    target = bytearray(2)
    self.assertEqual(bufio.peek(3), b'abc')
    self.assertEqual(rawio._reads, 1)

    self.assertEqual(bufio.readinto1(target), 2)
    self.assertEqual(target, b"ab")
    self.assertEqual(rawio._reads, 1)

    self.assertEqual(bufio.readinto1(target), 1)
    self.assertEqual(target[:1], b"c")
    self.assertEqual(rawio._reads, 1)

    self.assertEqual(bufio.readinto1(target), 2)
    self.assertEqual(target, b"de")
    self.assertEqual(rawio._reads, 2)

    # Switch to a target twice as large as the internal buffer.
    target = bytearray(2*buffer_size)
    self.assertEqual(bufio.peek(3), b'fgh')
    self.assertEqual(rawio._reads, 3)

    self.assertEqual(bufio.readinto1(target), 6)
    self.assertEqual(target[:6], b"fghjkl")
    self.assertEqual(rawio._reads, 4)
1341
def test_readinto_array(self):
    # readinto() with an array whose items are wider than one byte.
    buffer_size = 60
    data = b"a" * 26
    rawio = self.MockRawIO((data,))
    bufio = self.tp(rawio, buffer_size=buffer_size)

    # Create an array with element size > 1 byte
    target = array.array('i', b'x' * 32)
    assert len(target) != 16

    # Read into it. We should get as many *bytes* as we can fit into the
    # array (which is more than the number of elements)
    nread = bufio.readinto(target)
    self.assertGreater(nread, len(target))

    # Check that the old contents of the array are preserved
    as_bytes = memoryview(target).cast('B')
    self.assertLess(nread, len(as_bytes))
    self.assertEqual(as_bytes[:nread], data[:nread])
    self.assertEqual(as_bytes[nread:], b'x' * (len(as_bytes[nread:])))
1362
def test_readinto1_array(self):
    # readinto1() with an array whose items are wider than one byte.
    buffer_size = 60
    data = b"a" * 26
    rawio = self.MockRawIO((data,))
    bufio = self.tp(rawio, buffer_size=buffer_size)

    # Create an array with element size > 1 byte
    target = array.array('i', b'x' * 32)
    assert len(target) != 16

    # Read into it. We should get as many *bytes* as we can fit into the
    # array (which is more than the number of elements)
    nread = bufio.readinto1(target)
    self.assertGreater(nread, len(target))

    # Check that the old contents of the array are preserved
    as_bytes = memoryview(target).cast('B')
    self.assertLess(nread, len(as_bytes))
    self.assertEqual(as_bytes[:nread], data[:nread])
    self.assertEqual(as_bytes[nread:], b'x' * (len(as_bytes[nread:])))
1383
def test_readlines(self):
    # readlines() with no hint, with a hint of 5, and with None.
    def make_reader():
        # Fresh reader over the same three chunks for every case.
        return self.tp(self.MockRawIO((b"abc\n", b"d\n", b"ef")))

    every_line = [b"abc\n", b"d\n", b"ef"]
    cases = (
        ((), every_line),
        ((5,), [b"abc\n", b"d\n"]),
        ((None,), every_line),
    )
    for call_args, expected in cases:
        self.assertEqual(make_reader().readlines(*call_args), expected)
Benjamin Petersonbf5ff762009-12-13 19:25:34 +00001391
def test_buffering(self):
    # For a given buffer size and sequence of buffered reads, check both
    # the data returned and the read sizes recorded by the raw mock.
    data = b"abcdefghi"
    dlen = len(data)

    # [buffer size, sizes passed to bufio.read(), expected raw read sizes]
    tests = [
        [100, [3, 1, 4, 8], [dlen, 0]],
        [100, [3, 3, 3], [dlen]],
        [4, [1, 2, 4, 2], [4, 4, 1]],
    ]

    for bufsize, buf_read_sizes, raw_read_sizes in tests:
        rawio = self.MockFileIO(data)
        bufio = self.tp(rawio, buffer_size=bufsize)
        start = 0
        for nbytes in buf_read_sizes:
            stop = start + nbytes
            self.assertEqual(bufio.read(nbytes), data[start:stop])
            start = stop
        # this is mildly implementation-dependent
        self.assertEqual(rawio.read_history, raw_read_sizes)
Guido van Rossum78892e42007-04-06 17:31:18 +00001411
def test_read_non_blocking(self):
    # Inject some None's in there to simulate EWOULDBLOCK
    rawio = self.MockRawIO((b"abc", b"d", None, b"efg", None, None, None))
    bufio = self.tp(rawio)
    self.assertEqual(bufio.read(6), b"abcd")
    self.assertEqual(bufio.read(1), b"e")
    self.assertEqual(bufio.read(), b"fg")
    self.assertEqual(bufio.peek(1), b"")
    self.assertIsNone(bufio.read())
    self.assertEqual(bufio.read(), b"")

    # The raw mock's own readall(): data first, then None.
    rawio = self.MockRawIO((b"a", None, None))
    self.assertEqual(rawio.readall(), b"a")
    self.assertIsNone(rawio.readall())
1426
def test_read_past_eof(self):
    # Requesting far more than the raw chunks hold returns all of them
    # joined together.
    bufio = self.tp(self.MockRawIO((b"abc", b"d", b"efg")))

    self.assertEqual(b"abcdefg", bufio.read(9000))
Guido van Rossum68bbcd22007-02-27 17:19:33 +00001432
def test_read_all(self):
    # An unsized read() returns every raw chunk joined together.
    bufio = self.tp(self.MockRawIO((b"abc", b"d", b"efg")))

    self.assertEqual(b"abcdefg", bufio.read())
Guido van Rossum68bbcd22007-02-27 17:19:33 +00001438
@support.requires_resource('cpu')
def test_threads(self):
    try:
        # Write out many bytes with exactly the same number of 0's,
        # 1's... 255's. This will help us check that concurrent reading
        # doesn't duplicate or forget contents.
        N = 1000
        values = list(range(256)) * N
        random.shuffle(values)
        payload = bytes(values)
        with self.open(support.TESTFN, "wb") as out:
            out.write(payload)
        with self.open(support.TESTFN, self.read_mode, buffering=0) as raw:
            bufio = self.tp(raw, 8)
            errors = []
            results = []

            def reader():
                try:
                    # Intra-buffer read then buffer-flushing read
                    for n in cycle([1, 19]):
                        chunk = bufio.read(n)
                        if not chunk:
                            break
                        # list.append() is atomic
                        results.append(chunk)
                except Exception as e:
                    # Record the failure for the main thread, then let the
                    # worker die with the original exception.
                    errors.append(e)
                    raise

            threads = [threading.Thread(target=reader) for _ in range(20)]
            with support.start_threads(threads):
                time.sleep(0.02) # yield
            self.assertFalse(errors,
                "the following exceptions were caught: %r" % errors)
            # Every byte value must have been read exactly N times overall.
            combined = b''.join(results)
            for i in range(256):
                self.assertEqual(combined.count(bytes([i])), N)
    finally:
        support.unlink(support.TESTFN)
1478
def test_unseekable(self):
    # tell() and seek() must be rejected both before and after a read.
    bufio = self.tp(self.MockUnseekableIO(b"A" * 10))
    with self.assertRaises(self.UnsupportedOperation):
        bufio.tell()
    with self.assertRaises(self.UnsupportedOperation):
        bufio.seek(0)
    bufio.read(1)
    with self.assertRaises(self.UnsupportedOperation):
        bufio.seek(0)
    with self.assertRaises(self.UnsupportedOperation):
        bufio.tell()
1486
def test_misbehaved_io(self):
    # seek() and tell() on top of a misbehaving raw object raise OSError.
    bufio = self.tp(self.MisbehavedRawIO((b"abc", b"d", b"efg")))
    with self.assertRaises(OSError):
        bufio.seek(0)
    with self.assertRaises(OSError):
        bufio.tell()

    # Silence destructor error
    bufio.close = lambda: None
1495
def test_no_extraneous_read(self):
    # Issue #9550; when the raw IO object has satisfied the read request,
    # we should not issue any additional reads, otherwise it may block
    # (e.g. socket).
    bufsize = 16

    def check(n, chunks):
        # Read n bytes through a fresh buffered object and verify the raw
        # object recorded no extraneous reads.
        rawio = self.MockRawIO(chunks)
        bufio = self.tp(rawio, bufsize)
        self.assertEqual(bufio.read(n), b"x" * n)
        self.assertEqual(rawio._extraneous_reads, 0,
                         "failed for {}: {} != 0".format(n, rawio._extraneous_reads))

    for n in (2, bufsize - 1, bufsize, bufsize + 1, bufsize * 2):
        # Simple case: one raw read is enough to satisfy the request.
        check(n, [b"x" * n])
        # A more complex case where two raw reads are needed to satisfy
        # the request.
        check(n, [b"x" * (n - 1), b"x"])
1515
def test_read_on_closed(self):
    # Issue #23796
    # NOTE(review): this builds io.BufferedReader directly instead of
    # self.tp, so every subclass exercises the same type here.
    reader = io.BufferedReader(io.BytesIO(b"12"))
    reader.read(1)
    reader.close()
    with self.assertRaises(ValueError):
        reader.peek()
    with self.assertRaises(ValueError):
        reader.read1(1)
1523
Antoine Pitrou32cfede2010-08-11 13:31:33 +00001524
class CBufferedReaderTest(BufferedReaderTest, SizeofTest):
    # Runs the shared reader tests against io.BufferedReader and adds
    # checks that are only made for this implementation.
    tp = io.BufferedReader

    @unittest.skipIf(MEMORY_SANITIZER, "MSan defaults to crashing "
                     "instead of returning NULL for malloc failure.")
    def test_constructor(self):
        BufferedReaderTest.test_constructor(self)
        # The allocation can succeed on 32-bit builds, e.g. with more
        # than 2 GiB RAM and a 64-bit kernel.
        if sys.maxsize <= 0x7FFFFFFF:
            return
        rawio = self.MockRawIO()
        bufio = self.tp(rawio)
        with self.assertRaises((OverflowError, MemoryError, ValueError)):
            bufio.__init__(rawio, sys.maxsize)

    def test_initialization(self):
        # After a failed re-initialization the object must refuse reads.
        rawio = self.MockRawIO([b"abc"])
        bufio = self.tp(rawio)
        for bad_size in (0, -16, -1):
            self.assertRaises(ValueError, bufio.__init__, rawio,
                              buffer_size=bad_size)
            self.assertRaises(ValueError, bufio.read)

    def test_misbehaved_io_read(self):
        bufio = self.tp(self.MisbehavedRawIO((b"abc", b"d", b"efg")))
        # _pyio.BufferedReader seems to implement reading differently, so
        # that checking this is not so easy.
        with self.assertRaises(OSError):
            bufio.read(10)

    def test_garbage_collection(self):
        # C BufferedReader objects are collected.
        # The Python version has __del__, so it ends into gc.garbage instead
        self.addCleanup(support.unlink, support.TESTFN)
        with support.check_warnings(('', ResourceWarning)):
            rawio = self.FileIO(support.TESTFN, "w+b")
            bufio = self.tp(rawio)
            # Self-reference: the object is now part of a reference cycle.
            bufio.f = bufio
            wr = weakref.ref(bufio)
            del bufio
            support.gc_collect()
        self.assertIsNone(wr(), wr)

    def test_args_error(self):
        # Issue #17275
        with self.assertRaisesRegex(TypeError, "BufferedReader"):
            self.tp(io.BytesIO(), 1024, 1024, 1024)
1574
1575
class PyBufferedReaderTest(BufferedReaderTest):
    # Runs the shared reader tests against pyio.BufferedReader.
    tp = pyio.BufferedReader
Antoine Pitrou87695762008-08-14 22:44:29 +00001578
Guido van Rossuma9e20242007-03-08 00:43:48 +00001579
class BufferedWriterTest(unittest.TestCase, CommonBufferedTests):
    # Shared tests for buffered writers.  Subclasses set ``tp`` to the type
    # under test; the mock raw objects are read from attributes of ``self``.
    write_mode = "wb"

    def test_constructor(self):
        # Re-running __init__ with valid sizes keeps the object usable;
        # invalid sizes raise ValueError.
        rawio = self.MockRawIO()
        bufio = self.tp(rawio)
        bufio.__init__(rawio)
        bufio.__init__(rawio, buffer_size=1024)
        bufio.__init__(rawio, buffer_size=16)
        self.assertEqual(3, bufio.write(b"abc"))
        bufio.flush()
        for bad_size in (0, -16, -1):
            self.assertRaises(ValueError, bufio.__init__, rawio,
                              buffer_size=bad_size)
        bufio.__init__(rawio)
        self.assertEqual(3, bufio.write(b"ghi"))
        bufio.flush()
        self.assertEqual(b"".join(rawio._write_stack), b"abcghi")

    def test_uninitialized(self):
        # Create and immediately drop an object whose __init__ never ran.
        bufio = self.tp.__new__(self.tp)
        del bufio
        bufio = self.tp.__new__(self.tp)
        self.assertRaisesRegex((ValueError, AttributeError),
                               'uninitialized|has no attribute',
                               bufio.write, b'')
        bufio.__init__(self.MockRawIO())
        self.assertEqual(bufio.write(b''), 0)

    def test_detach_flush(self):
        # Buffered data reaches the raw object only once detach() is called.
        raw = self.MockRawIO()
        buf = self.tp(raw)
        buf.write(b"howdy!")
        self.assertFalse(raw._write_stack)
        buf.detach()
        self.assertEqual(raw._write_stack, [b"howdy!"])

    def test_write(self):
        # Write to the buffered IO but don't overflow the buffer.
        writer = self.MockRawIO()
        bufio = self.tp(writer, 8)
        bufio.write(b"abc")
        self.assertFalse(writer._write_stack)
        scratch = bytearray(b"def")
        bufio.write(scratch)
        scratch[:] = b"***"  # Overwrite our copy of the data
        bufio.flush()
        self.assertEqual(b"".join(writer._write_stack), b"abcdef")

    def test_write_overflow(self):
        writer = self.MockRawIO()
        bufio = self.tp(writer, 8)
        contents = b"abcdefghijklmnop"
        for start in range(0, len(contents), 3):
            bufio.write(contents[start:start + 3])
        flushed = b"".join(writer._write_stack)
        # At least (total - 8) bytes were implicitly flushed, perhaps more
        # depending on the implementation.
        self.assertTrue(flushed.startswith(contents[:-8]), flushed)

    def check_writes(self, intermediate_func):
        # Lots of writes, test the flushed output is as expected.
        # intermediate_func(bufio) is called after every single write.
        contents = bytes(range(256)) * 1000
        writer = self.MockRawIO()
        bufio = self.tp(writer, 13)
        # Generator of write sizes: repeat each N 15 times then proceed to N+1
        sizes = (size for size in count(1) for _ in range(15))
        n = 0
        while n < len(contents):
            size = min(next(sizes), len(contents) - n)
            self.assertEqual(bufio.write(contents[n:n + size]), size)
            intermediate_func(bufio)
            n += size
        bufio.flush()
        self.assertEqual(contents, b"".join(writer._write_stack))

    def test_writes(self):
        self.check_writes(lambda bufio: None)

    def test_writes_and_flushes(self):
        self.check_writes(lambda bufio: bufio.flush())

    def test_writes_and_seeks(self):
        def _seekabs(bufio):
            # Absolute seeks around the current position, ending back on it.
            pos = bufio.tell()
            bufio.seek(pos + 1, 0)
            bufio.seek(pos - 1, 0)
            bufio.seek(pos, 0)

        def _seekrel(bufio):
            # Relative seeks forward and back, then restore the position.
            pos = bufio.seek(0, 1)
            bufio.seek(+1, 1)
            bufio.seek(-1, 1)
            bufio.seek(pos, 0)

        for hook in (_seekabs, _seekrel):
            self.check_writes(hook)

    def test_writes_and_truncates(self):
        self.check_writes(lambda bufio: bufio.truncate(bufio.tell()))

    def test_write_non_blocking(self):
        raw = self.MockNonBlockWriterIO()
        bufio = self.tp(raw, 8)

        self.assertEqual(bufio.write(b"abcd"), 4)
        self.assertEqual(bufio.write(b"efghi"), 5)
        # 1 byte will be written, the rest will be buffered
        raw.block_on(b"k")
        self.assertEqual(bufio.write(b"jklmn"), 5)

        # 8 bytes will be written, 8 will be buffered and the rest will be lost
        raw.block_on(b"0")
        try:
            bufio.write(b"opqrwxyz0123456789")
        except self.BlockingIOError as e:
            written = e.characters_written
        else:
            self.fail("BlockingIOError should have been raised")
        self.assertEqual(written, 16)
        self.assertEqual(raw.pop_written(),
            b"abcdefghijklmnopqrwxyz")

        self.assertEqual(bufio.write(b"ABCDEFGHI"), 9)
        flushed = raw.pop_written()
        # Previously buffered bytes were flushed
        self.assertTrue(flushed.startswith(b"01234567A"), flushed)

    def test_write_and_rewind(self):
        raw = io.BytesIO()
        bufio = self.tp(raw, 4)
        self.assertEqual(bufio.write(b"abcdef"), 6)
        self.assertEqual(bufio.tell(), 6)
        bufio.seek(0, 0)
        self.assertEqual(bufio.write(b"XY"), 2)
        bufio.seek(6, 0)
        self.assertEqual(raw.getvalue(), b"XYcdef")
        self.assertEqual(bufio.write(b"123456"), 6)
        bufio.flush()
        self.assertEqual(raw.getvalue(), b"XYcdef123456")

    def test_flush(self):
        writer = self.MockRawIO()
        bufio = self.tp(writer, 8)
        bufio.write(b"abc")
        bufio.flush()
        self.assertEqual(b"abc", writer._write_stack[0])

    def test_writelines(self):
        lines = [b'ab', b'cd', b'ef']
        writer = self.MockRawIO()
        bufio = self.tp(writer, 8)
        bufio.writelines(lines)
        bufio.flush()
        self.assertEqual(b''.join(writer._write_stack), b'abcdef')

    def test_writelines_userlist(self):
        lines = UserList([b'ab', b'cd', b'ef'])
        writer = self.MockRawIO()
        bufio = self.tp(writer, 8)
        bufio.writelines(lines)
        bufio.flush()
        self.assertEqual(b''.join(writer._write_stack), b'abcdef')

    def test_writelines_error(self):
        bufio = self.tp(self.MockRawIO(), 8)
        for bad_lines in ([1, 2, 3], None, 'abc'):
            self.assertRaises(TypeError, bufio.writelines, bad_lines)

    def test_destructor(self):
        # Dropping the last reference must push buffered data to the raw
        # object.
        writer = self.MockRawIO()
        bufio = self.tp(writer, 8)
        bufio.write(b"abc")
        del bufio
        support.gc_collect()
        self.assertEqual(b"abc", writer._write_stack[0])

    def test_truncate(self):
        # Truncate implicitly flushes the buffer.
        self.addCleanup(support.unlink, support.TESTFN)
        with self.open(support.TESTFN, self.write_mode, buffering=0) as raw:
            bufio = self.tp(raw, 8)
            bufio.write(b"abcdef")
            self.assertEqual(bufio.truncate(3), 3)
            self.assertEqual(bufio.tell(), 6)
        with self.open(support.TESTFN, "rb", buffering=0) as f:
            self.assertEqual(f.read(), b"abc")

    def test_truncate_after_write(self):
        # Ensure that truncate preserves the file position after
        # writes longer than the buffer size.
        # Issue: https://bugs.python.org/issue32228
        self.addCleanup(support.unlink, support.TESTFN)
        with self.open(support.TESTFN, "wb") as f:
            # Fill with some buffer
            f.write(b'\x00' * 10000)
        for buffer_size in (8192, 4096, 200):
            with self.open(support.TESTFN, "r+b", buffering=buffer_size) as f:
                f.write(b'\x00' * (buffer_size + 1))
                # After write write_pos and write_end are set to 0
                f.read(1)
                # read operation makes sure that pos != raw_pos
                f.truncate()
                self.assertEqual(f.tell(), buffer_size + 2)

    @support.requires_resource('cpu')
    def test_threads(self):
        try:
            # Write out many bytes from many threads and test they were
            # all flushed.
            N = 1000
            contents = bytes(range(256)) * N
            sizes = cycle([1, 19])
            n = 0
            queue = deque()
            while n < len(contents):
                size = next(sizes)
                queue.append(contents[n:n + size])
                n += size
            del contents
            # We use a real file object because it allows us to
            # exercise situations where the GIL is released before
            # writing the buffer to the raw streams. This is in addition
            # to concurrency issues due to switching threads in the middle
            # of Python code.
            with self.open(support.TESTFN, self.write_mode, buffering=0) as raw:
                bufio = self.tp(raw, 8)
                errors = []

                def worker():
                    try:
                        while True:
                            try:
                                chunk = queue.popleft()
                            except IndexError:
                                # Queue drained: nothing left to write.
                                return
                            bufio.write(chunk)
                    except Exception as e:
                        errors.append(e)
                        raise

                threads = [threading.Thread(target=worker) for _ in range(20)]
                with support.start_threads(threads):
                    time.sleep(0.02) # yield
                self.assertFalse(errors,
                    "the following exceptions were caught: %r" % errors)
                bufio.close()
            with self.open(support.TESTFN, "rb") as f:
                written = f.read()
            for i in range(256):
                self.assertEqual(written.count(bytes([i])), N)
        finally:
            support.unlink(support.TESTFN)

    def test_misbehaved_io(self):
        rawio = self.MisbehavedRawIO()
        bufio = self.tp(rawio, 5)
        self.assertRaises(OSError, bufio.seek, 0)
        self.assertRaises(OSError, bufio.tell)
        self.assertRaises(OSError, bufio.write, b"abcdef")

        # Silence destructor error
        bufio.close = lambda: None

    def test_max_buffer_size_removal(self):
        # A third positional argument is rejected with TypeError.
        with self.assertRaises(TypeError):
            self.tp(self.MockRawIO(), 8, 12)

    def test_write_error_on_close(self):
        raw = self.MockRawIO()

        def bad_write(b):
            raise OSError()

        raw.write = bad_write
        b = self.tp(raw)
        b.write(b'spam')
        self.assertRaises(OSError, b.close) # exception not swallowed
        self.assertTrue(b.closed)

    def test_slow_close_from_thread(self):
        # Issue #31976
        rawio = self.SlowFlushRawIO()
        bufio = self.tp(rawio, 8)
        closer = threading.Thread(target=bufio.close)
        closer.start()
        try:
            rawio.in_flush.wait()
            self.assertRaises(ValueError, bufio.write, b'spam')
            self.assertTrue(bufio.closed)
        finally:
            # Always reap the closing thread, even when an assertion above
            # fails, so it is not left running into later tests.
            closer.join()
1871
1872
Benjamin Peterson59406a92009-03-26 17:10:29 +00001873
class CBufferedWriterTest(BufferedWriterTest, SizeofTest):
    # Runs the shared writer tests against io.BufferedWriter and adds
    # checks that are only made for this implementation.
    tp = io.BufferedWriter

    @unittest.skipIf(MEMORY_SANITIZER, "MSan defaults to crashing "
                     "instead of returning NULL for malloc failure.")
    def test_constructor(self):
        BufferedWriterTest.test_constructor(self)
        # The allocation can succeed on 32-bit builds, e.g. with more
        # than 2 GiB RAM and a 64-bit kernel.
        if sys.maxsize <= 0x7FFFFFFF:
            return
        rawio = self.MockRawIO()
        bufio = self.tp(rawio)
        with self.assertRaises((OverflowError, MemoryError, ValueError)):
            bufio.__init__(rawio, sys.maxsize)

    def test_initialization(self):
        # After a failed re-initialization the object must refuse writes.
        rawio = self.MockRawIO()
        bufio = self.tp(rawio)
        for bad_size in (0, -16, -1):
            self.assertRaises(ValueError, bufio.__init__, rawio,
                              buffer_size=bad_size)
            self.assertRaises(ValueError, bufio.write, b"def")

    def test_garbage_collection(self):
        # C BufferedWriter objects are collected, and collecting them flushes
        # all data to disk.
        # The Python version has __del__, so it ends into gc.garbage instead
        self.addCleanup(support.unlink, support.TESTFN)
        with support.check_warnings(('', ResourceWarning)):
            rawio = self.FileIO(support.TESTFN, "w+b")
            bufio = self.tp(rawio)
            bufio.write(b"123xxx")
            # Self-reference: the object is now part of a reference cycle.
            bufio.x = bufio
            wr = weakref.ref(bufio)
            del bufio
            support.gc_collect()
        self.assertIsNone(wr(), wr)
        with self.open(support.TESTFN, "rb") as f:
            self.assertEqual(f.read(), b"123xxx")

    def test_args_error(self):
        # Issue #17275
        with self.assertRaisesRegex(TypeError, "BufferedWriter"):
            self.tp(io.BytesIO(), 1024, 1024, 1024)
1920
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001921
class PyBufferedWriterTest(BufferedWriterTest):
    # Runs the shared writer tests against pyio.BufferedWriter.
    tp = pyio.BufferedWriter
Guido van Rossuma9e20242007-03-08 00:43:48 +00001924
Guido van Rossum01a27522007-03-07 01:00:12 +00001925class BufferedRWPairTest(unittest.TestCase):
Guido van Rossuma9e20242007-03-08 00:43:48 +00001926
Antoine Pitroucf4c7492009-04-19 00:09:36 +00001927 def test_constructor(self):
1928 pair = self.tp(self.MockRawIO(), self.MockRawIO())
Benjamin Peterson92035012008-12-27 16:00:54 +00001929 self.assertFalse(pair.closed)
Guido van Rossum01a27522007-03-07 01:00:12 +00001930
Serhiy Storchaka61e24932014-02-12 10:52:35 +02001931 def test_uninitialized(self):
1932 pair = self.tp.__new__(self.tp)
1933 del pair
1934 pair = self.tp.__new__(self.tp)
1935 self.assertRaisesRegex((ValueError, AttributeError),
1936 'uninitialized|has no attribute',
1937 pair.read, 0)
1938 self.assertRaisesRegex((ValueError, AttributeError),
1939 'uninitialized|has no attribute',
1940 pair.write, b'')
1941 pair.__init__(self.MockRawIO(), self.MockRawIO())
1942 self.assertEqual(pair.read(0), b'')
1943 self.assertEqual(pair.write(b''), 0)
1944
Benjamin Petersond2e0c792009-05-01 20:40:59 +00001945 def test_detach(self):
1946 pair = self.tp(self.MockRawIO(), self.MockRawIO())
1947 self.assertRaises(self.UnsupportedOperation, pair.detach)
1948
Florent Xicluna109d5732012-07-07 17:03:22 +02001949 def test_constructor_max_buffer_size_removal(self):
1950 with self.assertRaises(TypeError):
Benjamin Peterson59406a92009-03-26 17:10:29 +00001951 self.tp(self.MockRawIO(), self.MockRawIO(), 8, 12)
Benjamin Peterson59406a92009-03-26 17:10:29 +00001952
Antoine Pitroucf4c7492009-04-19 00:09:36 +00001953 def test_constructor_with_not_readable(self):
1954 class NotReadable(MockRawIO):
1955 def readable(self):
1956 return False
1957
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02001958 self.assertRaises(OSError, self.tp, NotReadable(), self.MockRawIO())
Antoine Pitroucf4c7492009-04-19 00:09:36 +00001959
1960 def test_constructor_with_not_writeable(self):
1961 class NotWriteable(MockRawIO):
1962 def writable(self):
1963 return False
1964
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02001965 self.assertRaises(OSError, self.tp, self.MockRawIO(), NotWriteable())
Antoine Pitroucf4c7492009-04-19 00:09:36 +00001966
1967 def test_read(self):
1968 pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())
1969
1970 self.assertEqual(pair.read(3), b"abc")
1971 self.assertEqual(pair.read(1), b"d")
1972 self.assertEqual(pair.read(), b"ef")
Benjamin Petersonbf5ff762009-12-13 19:25:34 +00001973 pair = self.tp(self.BytesIO(b"abc"), self.MockRawIO())
1974 self.assertEqual(pair.read(None), b"abc")
1975
1976 def test_readlines(self):
1977 pair = lambda: self.tp(self.BytesIO(b"abc\ndef\nh"), self.MockRawIO())
1978 self.assertEqual(pair().readlines(), [b"abc\n", b"def\n", b"h"])
1979 self.assertEqual(pair().readlines(), [b"abc\n", b"def\n", b"h"])
1980 self.assertEqual(pair().readlines(5), [b"abc\n", b"def\n"])
Antoine Pitroucf4c7492009-04-19 00:09:36 +00001981
1982 def test_read1(self):
1983 # .read1() is delegated to the underlying reader object, so this test
1984 # can be shallow.
1985 pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())
1986
1987 self.assertEqual(pair.read1(3), b"abc")
Martin Panterccb2c0e2016-10-20 23:48:14 +00001988 self.assertEqual(pair.read1(), b"def")
Antoine Pitroucf4c7492009-04-19 00:09:36 +00001989
1990 def test_readinto(self):
Martin Panter6bb91f32016-05-28 00:41:57 +00001991 for method in ("readinto", "readinto1"):
1992 with self.subTest(method):
1993 pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())
Antoine Pitroucf4c7492009-04-19 00:09:36 +00001994
Serhiy Storchaka5f1a5182016-09-11 14:41:02 +03001995 data = byteslike(b'\0' * 5)
Martin Panter6bb91f32016-05-28 00:41:57 +00001996 self.assertEqual(getattr(pair, method)(data), 5)
1997 self.assertEqual(bytes(data), b"abcde")
Antoine Pitroucf4c7492009-04-19 00:09:36 +00001998
1999 def test_write(self):
2000 w = self.MockRawIO()
2001 pair = self.tp(self.MockRawIO(), w)
2002
2003 pair.write(b"abc")
2004 pair.flush()
Martin Panter6bb91f32016-05-28 00:41:57 +00002005 buffer = bytearray(b"def")
2006 pair.write(buffer)
2007 buffer[:] = b"***" # Overwrite our copy of the data
Antoine Pitroucf4c7492009-04-19 00:09:36 +00002008 pair.flush()
2009 self.assertEqual(w._write_stack, [b"abc", b"def"])
2010
2011 def test_peek(self):
2012 pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())
2013
2014 self.assertTrue(pair.peek(3).startswith(b"abc"))
2015 self.assertEqual(pair.read(3), b"abc")
2016
2017 def test_readable(self):
2018 pair = self.tp(self.MockRawIO(), self.MockRawIO())
2019 self.assertTrue(pair.readable())
2020
2021 def test_writeable(self):
2022 pair = self.tp(self.MockRawIO(), self.MockRawIO())
2023 self.assertTrue(pair.writable())
2024
2025 def test_seekable(self):
2026 # BufferedRWPairs are never seekable, even if their readers and writers
2027 # are.
2028 pair = self.tp(self.MockRawIO(), self.MockRawIO())
2029 self.assertFalse(pair.seekable())
2030
2031 # .flush() is delegated to the underlying writer object and has been
2032 # tested in the test_write method.
2033
2034 def test_close_and_closed(self):
2035 pair = self.tp(self.MockRawIO(), self.MockRawIO())
2036 self.assertFalse(pair.closed)
2037 pair.close()
2038 self.assertTrue(pair.closed)
2039
Serhiy Storchaka7665be62015-03-24 23:21:57 +02002040 def test_reader_close_error_on_close(self):
2041 def reader_close():
2042 reader_non_existing
2043 reader = self.MockRawIO()
2044 reader.close = reader_close
2045 writer = self.MockRawIO()
2046 pair = self.tp(reader, writer)
2047 with self.assertRaises(NameError) as err:
2048 pair.close()
2049 self.assertIn('reader_non_existing', str(err.exception))
2050 self.assertTrue(pair.closed)
2051 self.assertFalse(reader.closed)
2052 self.assertTrue(writer.closed)
2053
Victor Stinner472f7942019-04-12 21:58:24 +02002054 # Silence destructor error
2055 reader.close = lambda: None
2056
Serhiy Storchaka7665be62015-03-24 23:21:57 +02002057 def test_writer_close_error_on_close(self):
2058 def writer_close():
2059 writer_non_existing
2060 reader = self.MockRawIO()
2061 writer = self.MockRawIO()
2062 writer.close = writer_close
2063 pair = self.tp(reader, writer)
2064 with self.assertRaises(NameError) as err:
2065 pair.close()
2066 self.assertIn('writer_non_existing', str(err.exception))
2067 self.assertFalse(pair.closed)
2068 self.assertTrue(reader.closed)
2069 self.assertFalse(writer.closed)
2070
Victor Stinner472f7942019-04-12 21:58:24 +02002071 # Silence destructor error
2072 writer.close = lambda: None
Victor Stinner913fa1c2019-06-12 23:57:11 +02002073 writer = None
2074
2075 with support.catch_unraisable_exception():
2076 pair = None
2077 support.gc_collect()
Victor Stinner472f7942019-04-12 21:58:24 +02002078
def test_reader_writer_close_error_on_close(self):
    # Both underlying close() calls fail.  The reader's error is the one
    # that surfaces, with the writer's error chained as its __context__,
    # and none of the three objects is reported as closed.
    def failing_reader_close():
        reader_non_existing

    def failing_writer_close():
        writer_non_existing

    reader = self.MockRawIO()
    writer = self.MockRawIO()
    reader.close = failing_reader_close
    writer.close = failing_writer_close
    pair = self.tp(reader, writer)

    with self.assertRaises(NameError) as caught:
        pair.close()
    self.assertIn('reader_non_existing', str(caught.exception))
    self.assertIsInstance(caught.exception.__context__, NameError)
    self.assertIn('writer_non_existing',
                  str(caught.exception.__context__))
    self.assertFalse(pair.closed)
    self.assertFalse(reader.closed)
    self.assertFalse(writer.closed)

    # Silence destructor error
    reader.close = lambda: None
    writer.close = lambda: None
2101
def test_isatty(self):
    class SelectableIsAtty(MockRawIO):
        # Raw stream whose isatty() answer is fixed at construction.
        def __init__(self, isatty):
            super().__init__()
            self._isatty = isatty

        def isatty(self):
            return self._isatty

    # (reader is a tty, writer is a tty, assertion applied to the pair).
    # The pair must report a tty as soon as either side is one.
    expectations = (
        (False, False, self.assertFalse),
        (True, False, self.assertTrue),
        (False, True, self.assertTrue),
        (True, True, self.assertTrue),
    )
    for reader_tty, writer_tty, check in expectations:
        pair = self.tp(SelectableIsAtty(reader_tty),
                       SelectableIsAtty(writer_tty))
        check(pair.isatty())
Guido van Rossum01a27522007-03-07 01:00:12 +00002122
def test_weakref_clearing(self):
    # Release the pair first and only then the weak reference to it.
    pair = self.tp(self.MockRawIO(), self.MockRawIO())
    weak = weakref.ref(pair)
    pair = None
    weak = None  # Shouldn't segfault.
2128
class CBufferedRWPairTest(BufferedRWPairTest):
    # Run the shared BufferedRWPair tests against the class from ``io``.
    tp = io.BufferedRWPair
Guido van Rossuma9e20242007-03-08 00:43:48 +00002131
class PyBufferedRWPairTest(BufferedRWPairTest):
    # Run the shared BufferedRWPair tests against the class from ``pyio``.
    tp = pyio.BufferedRWPair
Guido van Rossuma9e20242007-03-08 00:43:48 +00002134
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002135
class BufferedRandomTest(BufferedReaderTest, BufferedWriterTest):
    # Shared tests for a read/write buffered object.  The class under test
    # is looked up as ``self.tp``, which concrete subclasses bind.  Every
    # inherited reader and writer test applies; the tests added here mix
    # reads, writes and seeks on a single buffered object.
    read_mode = "rb+"
    write_mode = "wb+"

    def test_constructor(self):
        # Run both parents' versions explicitly.
        BufferedReaderTest.test_constructor(self)
        BufferedWriterTest.test_constructor(self)

    def test_uninitialized(self):
        BufferedReaderTest.test_uninitialized(self)
        BufferedWriterTest.test_uninitialized(self)

    def test_read_and_write(self):
        raw = self.MockRawIO((b"asdf", b"ghjk"))
        rw = self.tp(raw, 8)

        self.assertEqual(b"as", rw.read(2))
        rw.write(b"ddd")
        rw.write(b"eee")
        self.assertFalse(raw._write_stack)  # Buffer writes
        # The read below is followed by the buffered writes showing up on
        # the raw stream as one combined chunk.
        self.assertEqual(b"ghjk", rw.read())
        self.assertEqual(b"dddeee", raw._write_stack[0])

    def test_seek_and_tell(self):
        raw = self.BytesIO(b"asdfghjkl")
        rw = self.tp(raw)

        self.assertEqual(b"as", rw.read(2))
        self.assertEqual(2, rw.tell())
        rw.seek(0, 0)
        self.assertEqual(b"asdf", rw.read(4))

        # Overwrite four bytes starting at offset 4, then re-read it all.
        rw.write(b"123f")
        rw.seek(0, 0)
        self.assertEqual(b"asdf123fl", rw.read())
        self.assertEqual(9, rw.tell())
        # whence=2: offset relative to the end of the stream.
        rw.seek(-4, 2)
        self.assertEqual(5, rw.tell())
        # whence=1: offset relative to the current position.
        rw.seek(2, 1)
        self.assertEqual(7, rw.tell())
        self.assertEqual(b"fl", rw.read(11))
        rw.flush()
        self.assertEqual(b"asdf123fl", raw.getvalue())

        # A float offset is rejected.
        self.assertRaises(TypeError, rw.seek, 0.0)

    def check_flush_and_read(self, read_func):
        # Shared scenario for the test_flush_and_* tests.  ``read_func`` is
        # called as read_func(bufio, n) or read_func(bufio) and must return
        # the bytes consumed from ``bufio``.
        raw = self.BytesIO(b"abcdefghi")
        bufio = self.tp(raw)

        self.assertEqual(b"ab", read_func(bufio, 2))
        bufio.write(b"12")
        self.assertEqual(b"ef", read_func(bufio, 2))
        self.assertEqual(6, bufio.tell())
        bufio.flush()
        self.assertEqual(6, bufio.tell())
        self.assertEqual(b"ghi", read_func(bufio))
        # Modify the raw stream behind the buffered object's back.
        raw.seek(0, 0)
        raw.write(b"XYZ")
        # flush() resets the read buffer
        bufio.flush()
        bufio.seek(0, 0)
        self.assertEqual(b"XYZ", read_func(bufio, 3))

    def test_flush_and_read(self):
        self.check_flush_and_read(lambda bufio, *args: bufio.read(*args))

    def test_flush_and_readinto(self):
        def _readinto(bufio, n=-1):
            # Without an explicit size, use a buffer far larger than the
            # nine-byte stream.
            b = bytearray(n if n >= 0 else 9999)
            n = bufio.readinto(b)
            return bytes(b[:n])
        self.check_flush_and_read(_readinto)

    def test_flush_and_peek(self):
        def _peek(bufio, n=-1):
            # This relies on the fact that the buffer can contain the whole
            # raw stream, otherwise peek() can return less.
            b = bufio.peek(n)
            if n != -1:
                b = b[:n]
            # peek() does not advance the position, so move it by hand.
            bufio.seek(len(b), 1)
            return b
        self.check_flush_and_read(_peek)

    def test_flush_and_write(self):
        raw = self.BytesIO(b"abcdefghi")
        bufio = self.tp(raw)

        bufio.write(b"123")
        bufio.flush()
        bufio.write(b"45")
        bufio.flush()
        bufio.seek(0, 0)
        self.assertEqual(b"12345fghi", raw.getvalue())
        self.assertEqual(b"12345fghi", bufio.read())

    def test_threads(self):
        BufferedReaderTest.test_threads(self)
        BufferedWriterTest.test_threads(self)

    def test_writes_and_peek(self):
        # check_writes is not defined in this class (presumably inherited
        # from BufferedWriterTest -- confirm); it receives a callback that
        # is handed the buffered object.
        def _peek(bufio):
            bufio.peek(1)
        self.check_writes(_peek)
        # Second variant: step back one byte, peek, then restore the
        # original position.
        def _peek(bufio):
            pos = bufio.tell()
            bufio.seek(-1, 1)
            bufio.peek(1)
            bufio.seek(pos, 0)
        self.check_writes(_peek)

    def test_writes_and_reads(self):
        def _read(bufio):
            # Step back one byte and read it again.
            bufio.seek(-1, 1)
            bufio.read(1)
        self.check_writes(_read)

    def test_writes_and_read1s(self):
        def _read1(bufio):
            bufio.seek(-1, 1)
            bufio.read1(1)
        self.check_writes(_read1)

    def test_writes_and_readintos(self):
        def _read(bufio):
            bufio.seek(-1, 1)
            bufio.readinto(bytearray(1))
        self.check_writes(_read)

    def test_write_after_readahead(self):
        # Issue #6629: writing after the buffer was filled by readahead should
        # first rewind the raw stream.
        # The buffer size is 4, so the two overwrite sizes cover a write
        # smaller than the buffer and one larger than it.
        for overwrite_size in [1, 5]:
            raw = self.BytesIO(b"A" * 10)
            bufio = self.tp(raw, 4)
            # Trigger readahead
            self.assertEqual(bufio.read(1), b"A")
            self.assertEqual(bufio.tell(), 1)
            # Overwriting should rewind the raw stream if it needs so
            bufio.write(b"B" * overwrite_size)
            self.assertEqual(bufio.tell(), overwrite_size + 1)
            # If the write size was smaller than the buffer size, flush() and
            # check that rewind happens.
            bufio.flush()
            self.assertEqual(bufio.tell(), overwrite_size + 1)
            s = raw.getvalue()
            self.assertEqual(s,
                b"A" + b"B" * overwrite_size + b"A" * (9 - overwrite_size))

    def test_write_rewind_write(self):
        # Various combinations of reading / writing / seeking backwards / writing again
        def mutate(bufio, pos1, pos2):
            assert pos2 >= pos1
            # Fill the buffer
            bufio.seek(pos1)
            bufio.read(pos2 - pos1)
            bufio.write(b'\x02')
            # This writes earlier than the previous write, but still inside
            # the buffer.
            bufio.seek(pos1)
            bufio.write(b'\x01')

        b = b"\x80\x81\x82\x83\x84"
        for i in range(0, len(b)):
            for j in range(i, len(b)):
                raw = self.BytesIO(b)
                bufio = self.tp(raw, 100)
                mutate(bufio, i, j)
                bufio.flush()
                # Apply the same two writes to a plain copy.  When i == j
                # the later write (value 1) wins, matching mutate().
                expected = bytearray(b)
                expected[j] = 2
                expected[i] = 1
                self.assertEqual(raw.getvalue(), expected,
                                 "failed result for i=%d, j=%d" % (i, j))

    def test_truncate_after_read_or_write(self):
        # truncate() with no argument must return the logical position of
        # the buffered object, as the two expected values below show.
        raw = self.BytesIO(b"A" * 10)
        bufio = self.tp(raw, 100)
        self.assertEqual(bufio.read(2), b"AA")  # the read buffer gets filled
        self.assertEqual(bufio.truncate(), 2)
        self.assertEqual(bufio.write(b"BB"), 2)  # the write buffer increases
        self.assertEqual(bufio.truncate(), 4)

    def test_misbehaved_io(self):
        BufferedReaderTest.test_misbehaved_io(self)
        BufferedWriterTest.test_misbehaved_io(self)

    def test_interleaved_read_write(self):
        # Test for issue #12213
        # Alternate one-byte writes with each kind of one-byte read; every
        # operation must act at the current logical position.
        with self.BytesIO(b'abcdefgh') as raw:
            with self.tp(raw, 100) as f:
                f.write(b"1")
                self.assertEqual(f.read(1), b'b')
                f.write(b'2')
                self.assertEqual(f.read1(1), b'd')
                f.write(b'3')
                buf = bytearray(1)
                f.readinto(buf)
                self.assertEqual(buf, b'f')
                f.write(b'4')
                self.assertEqual(f.peek(1), b'h')
                f.flush()
                self.assertEqual(raw.getvalue(), b'1b2d3f4h')

        # Same idea, but starting with a read instead of a write.
        with self.BytesIO(b'abc') as raw:
            with self.tp(raw, 100) as f:
                self.assertEqual(f.read(1), b'a')
                f.write(b"2")
                self.assertEqual(f.read(1), b'c')
                f.flush()
                self.assertEqual(raw.getvalue(), b'a2c')

    def test_interleaved_readline_write(self):
        # Each one-byte write replaces the first byte of the next line, and
        # readline() then returns the rest of that line.
        with self.BytesIO(b'ab\ncdef\ng\n') as raw:
            with self.tp(raw) as f:
                f.write(b'1')
                self.assertEqual(f.readline(), b'b\n')
                f.write(b'2')
                self.assertEqual(f.readline(), b'def\n')
                f.write(b'3')
                self.assertEqual(f.readline(), b'\n')
                f.flush()
                self.assertEqual(raw.getvalue(), b'1b\n2def\n3\n')

    # You can't construct a BufferedRandom over a non-seekable stream.
    test_unseekable = None
2363
R David Murray67bfe802013-02-23 21:51:05 -05002364
class CBufferedRandomTest(BufferedRandomTest, SizeofTest):
    # BufferedRandom tests bound to the class from ``io``.
    tp = io.BufferedRandom

    @unittest.skipIf(MEMORY_SANITIZER, "MSan defaults to crashing "
                     "instead of returning NULL for malloc failure.")
    def test_constructor(self):
        BufferedRandomTest.test_constructor(self)
        # The allocation can succeed on 32-bit builds, e.g. with more
        # than 2 GiB RAM and a 64-bit kernel.
        if sys.maxsize <= 0x7FFFFFFF:
            return
        raw = self.MockRawIO()
        buffered = self.tp(raw)
        # Asking for a sys.maxsize-byte buffer must fail with one of these.
        with self.assertRaises((OverflowError, MemoryError, ValueError)):
            buffered.__init__(raw, sys.maxsize)

    def test_garbage_collection(self):
        # Run the reader's check first, then the writer's.
        for parent in (CBufferedReaderTest, CBufferedWriterTest):
            parent.test_garbage_collection(self)

    def test_args_error(self):
        # Issue #17275
        # Too many positional arguments: the TypeError message must name
        # the class.
        with self.assertRaisesRegex(TypeError, "BufferedRandom"):
            self.tp(io.BytesIO(), 1024, 1024, 1024)
2388
2389
class PyBufferedRandomTest(BufferedRandomTest):
    # Run the shared BufferedRandom tests against the class from ``pyio``.
    tp = pyio.BufferedRandom
2392
2393
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002394# To fully exercise seek/tell, the StatefulIncrementalDecoder has these
2395# properties:
2396# - A single output character can correspond to many bytes of input.
2397# - The number of input bytes to complete the character can be
2398# undetermined until the last input byte is received.
2399# - The number of input bytes can vary depending on previous input.
2400# - A single input byte can correspond to many characters of output.
2401# - The number of output characters can be undetermined until the
2402# last input byte is received.
2403# - The number of output characters can vary depending on previous input.
2404
class StatefulIncrementalDecoder(codecs.IncrementalDecoder):
    """
    Stateful, buffering decoder for exercising seek/tell behavior.

    The input is a stream of words.  A word is either fixed-length (the
    length I is set by the input itself) or, when I is 0, variable-length
    and terminated by a period; periods with nothing buffered are ignored.
    Recognized words:
      - 'i' followed by a number sets the input length, I (maximum 99).
        A missing number means 0, i.e. period-terminated words.
      - 'o' followed by a number sets the output length, O (maximum 99).
        A missing number means 0.
      - Any other word is emitted followed by a period.  The emitted word
        is the input word truncated, or padded with hyphens, to exactly O
        characters; when O is 0 the word is emitted unchanged.
    I and O both start at 1.  The buffer is emptied after every completed
    word.  EOF (final=True) also terminates the last word.
    """

    def __init__(self, errors='strict'):
        super().__init__(errors)
        self.reset()

    def __repr__(self):
        return '<SID %x>' % id(self)

    def reset(self):
        """Return to the initial state: I = O = 1 and nothing buffered."""
        self.i = self.o = 1
        self.buffer = bytearray()

    def getstate(self):
        """Return (buffered bytes, flags) with I and O packed into flags."""
        # XOR with 1 so that flags == 0 right after reset().
        flags = (self.i ^ 1) * 100 + (self.o ^ 1)
        return bytes(self.buffer), flags

    def setstate(self, state):
        """Restore a state previously produced by getstate()."""
        pending, flags = state
        self.buffer = bytearray(pending)
        packed_i, packed_o = divmod(flags, 100)
        self.i = packed_i ^ 1
        self.o = packed_o ^ 1

    def decode(self, input, final=False):
        """Consume *input* byte by byte and return the text produced."""
        period = ord('.')
        pieces = []
        for byte in input:
            if self.i == 0 and byte == period:
                # Variable-length mode: a period ends the current word;
                # a period with nothing buffered is skipped.
                if self.buffer:
                    pieces.append(self.process_word())
                continue
            self.buffer.append(byte)
            # Fixed-length mode: the word is complete after self.i bytes.
            if self.i != 0 and len(self.buffer) == self.i:
                pieces.append(self.process_word())
        if final and self.buffer:  # EOF terminates the last word
            pieces.append(self.process_word())
        return ''.join(pieces)

    def process_word(self):
        """Interpret the buffered word, clear the buffer, return its text."""
        head, tail = self.buffer[0], self.buffer[1:]
        result = ''
        if head == ord('i'):
            self.i = min(99, int(tail or 0))  # set input length
        elif head == ord('o'):
            self.o = min(99, int(tail or 0))  # set output length
        else:
            text = self.buffer.decode('ascii')
            if self.o:
                # ljust pads short words with hyphens, the slice cuts
                # long ones down to the output length.
                text = text.ljust(self.o, '-')[:self.o]
            result = text + '.'
        # Only reached when nothing above raised, so a failed word stays
        # in the buffer.
        self.buffer = bytearray()
        return result

    # Switch consulted by lookupTestDecoder(); off by default.
    codecEnabled = False

    @classmethod
    def lookupTestDecoder(cls, name):
        """Codec search function: resolve 'test_decoder' when enabled."""
        if not cls.codecEnabled or name != 'test_decoder':
            return None
        latin1 = codecs.lookup('latin-1')
        return codecs.CodecInfo(
            name='test_decoder', encode=latin1.encode, decode=None,
            incrementalencoder=None,
            streamreader=None, streamwriter=None,
            incrementaldecoder=cls)
2489
# Register the decoder's lookup function as a codec search function.
# The lookup returns nothing while StatefulIncrementalDecoder.codecEnabled
# is False, which is the default; tests enable it when they need it.
codecs.register(StatefulIncrementalDecoder.lookupTestDecoder)
2493
2494
class StatefulIncrementalDecoderTest(unittest.TestCase):
    """
    Sanity checks for the StatefulIncrementalDecoder test helper itself.
    """

    # Each entry is (input bytes, final flag, expected decoded text).
    test_cases = [
        # I=1, O=1 (fixed-length input == fixed-length output)
        (b'abcd', False, 'a.b.c.d.'),
        # I=0, O=0 (variable-length input, variable-length output)
        (b'oiabcd', True, 'abcd.'),
        # I=0, O=0 (should ignore extra periods)
        (b'oi...abcd...', True, 'abcd.'),
        # I=0, O=6 (variable-length input, fixed-length output)
        (b'i.o6.x.xyz.toolongtofit.', False, 'x-----.xyz---.toolon.'),
        # I=2, O=6 (fixed-length input < fixed-length output)
        (b'i.i2.o6xyz', True, 'xy----.z-----.'),
        # I=6, O=3 (fixed-length input > fixed-length output)
        (b'i.o3.i6.abcdefghijklmnop', True, 'abc.ghi.mno.'),
        # I=0, then 3; O=29, then 15 (with longer output)
        (b'i.o29.a.b.cde.o15.abcdefghijabcdefghij.i3.a.b.c.d.ei00k.l.m', True,
         'a----------------------------.'
         'b----------------------------.'
         'cde--------------------------.'
         'abcdefghijabcde.'
         'a.b------------.'
         '.c.------------.'
         'd.e------------.'
         'k--------------.'
         'l--------------.'
         'm--------------.'),
    ]

    def test_decoder(self):
        # One-shot decodes, each with a fresh decoder.
        for data, final, expected in self.test_cases:
            decoder = StatefulIncrementalDecoder()
            self.assertEqual(decoder.decode(data, final), expected)

        # An unfinished decode yields nothing until EOF is forced.
        decoder = StatefulIncrementalDecoder()
        self.assertEqual(decoder.decode(b'oiabcd'), '')
        self.assertEqual(decoder.decode(b'', 1), 'abcd.')
Guido van Rossum78892e42007-04-06 17:31:18 +00002537
2538class TextIOWrapperTest(unittest.TestCase):
Guido van Rossum9b76da62007-04-11 01:09:03 +00002539
def setUp(self):
    # Sample bytes mixing "\r\n", "\r" and "\n" line endings.
    self.testdata = b"AAA\r\nBBB\rCCC\r\nDDD\nEEE\r\n"
    # The same five lines as text, with every line ending written as "\n".
    self.normalized = b"AAA\nBBB\nCCC\nDDD\nEEE\n".decode("ascii")
    # Remove any support.TESTFN file before the test runs.
    support.unlink(support.TESTFN)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002544
def tearDown(self):
    # Remove support.TESTFN in case the test created it.
    support.unlink(support.TESTFN)
Guido van Rossum9b76da62007-04-11 01:09:03 +00002547
def test_constructor(self):
    raw = self.BytesIO(b"\xc3\xa9\n\n")
    buffered = self.BufferedReader(raw, 1000)
    text = self.TextIOWrapper(buffered)

    # Calling __init__ again must apply the new settings.
    text.__init__(buffered, encoding="latin-1", newline="\r\n")
    self.assertEqual(text.encoding, "latin-1")
    self.assertEqual(text.line_buffering, False)
    text.__init__(buffered, encoding="utf-8", line_buffering=True)
    self.assertEqual(text.encoding, "utf-8")
    self.assertEqual(text.line_buffering, True)
    self.assertEqual("\xe9\n", text.readline())

    # Invalid newline arguments are rejected.
    with self.assertRaises(TypeError):
        text.__init__(buffered, newline=42)
    with self.assertRaises(ValueError):
        text.__init__(buffered, newline='xyzzy')
2561
def test_uninitialized(self):
    wrapper_cls = self.TextIOWrapper
    # Create an instance without running __init__ and delete it again.
    text = wrapper_cls.__new__(wrapper_cls)
    del text
    # Using an instance that never ran __init__ must raise.
    text = wrapper_cls.__new__(wrapper_cls)
    with self.assertRaises(Exception):
        repr(text)
    self.assertRaisesRegex((ValueError, AttributeError),
                           'uninitialized|has no attribute',
                           text.read, 0)
    # Once initialized, the same object works.
    text.__init__(self.MockRawIO())
    self.assertEqual(text.read(0), '')
2572
def test_non_text_encoding_codecs_are_rejected(self):
    # Ensure the constructor complains if passed a codec that isn't
    # marked as a text encoding
    # http://bugs.python.org/issue20404
    raw = self.BytesIO()
    buffered = self.BufferedWriter(raw)
    self.assertRaisesRegex(LookupError, "is not a text encoding",
                           self.TextIOWrapper, buffered, encoding="hex")
2581
def test_detach(self):
    raw = self.BytesIO()
    buffered = self.BufferedWriter(raw)
    text = self.TextIOWrapper(buffered)
    # detach() hands back the underlying buffer object.
    self.assertIs(text.detach(), buffered)

    text = self.TextIOWrapper(buffered, encoding="ascii")
    text.write("howdy")
    # Nothing has reached the raw stream yet ...
    self.assertFalse(raw.getvalue())
    # ... but it has once the wrapper is detached.
    text.detach()
    self.assertEqual(raw.getvalue(), b"howdy")
    # Detaching a second time is an error.
    self.assertRaises(ValueError, text.detach)

    # Operations independent of the detached stream should still work
    repr(text)
    self.assertEqual(text.encoding, "ascii")
    self.assertEqual(text.errors, "strict")
    self.assertFalse(text.line_buffering)
    self.assertFalse(text.write_through)
Benjamin Peterson10e76b62014-12-21 20:51:50 -06002601
def test_repr(self):
    raw = self.BytesIO("hello".encode("utf-8"))
    buffered = self.BufferedReader(raw)
    text = self.TextIOWrapper(buffered, encoding="utf-8")
    modname = self.TextIOWrapper.__module__

    # Neither name nor mode set: only the encoding is shown.
    pattern = r"<(%s\.)?TextIOWrapper encoding='utf-8'>" % modname
    self.assertRegex(repr(text), pattern)

    # A name on the raw stream shows up in the repr.
    raw.name = "dummy"
    pattern = r"<(%s\.)?TextIOWrapper name='dummy' encoding='utf-8'>" % modname
    self.assertRegex(repr(text), pattern)

    # So does a mode set on the wrapper.
    text.mode = "r"
    pattern = r"<(%s\.)?TextIOWrapper name='dummy' mode='r' encoding='utf-8'>" % modname
    self.assertRegex(repr(text), pattern)

    # A bytes name is shown with its b'' prefix.
    raw.name = b"dummy"
    pattern = r"<(%s\.)?TextIOWrapper name=b'dummy' mode='r' encoding='utf-8'>" % modname
    self.assertRegex(repr(text), pattern)

    text.buffer.detach()
    repr(text)  # Should not raise an exception
2621
def test_recursive_repr(self):
    # Issue #25455
    stream = self.BytesIO()
    wrapper = self.TextIOWrapper(stream)
    # Make the stream's name the wrapper itself, so computing the repr
    # leads back to the wrapper.
    with support.swap_attr(stream, 'name', wrapper):
        try:
            repr(wrapper)  # Should not crash
        except RuntimeError:
            # A RuntimeError is an accepted outcome.
            pass
2631
def test_line_buffering(self):
    raw = self.BytesIO()
    buffered = self.BufferedWriter(raw, 1000)
    text = self.TextIOWrapper(buffered, newline="\n", line_buffering=True)
    # (text written, raw stream contents expected right afterwards)
    steps = (
        ("X", b""),              # No flush happened
        ("Y\nZ", b"XY\nZ"),      # All got flushed
        ("A\rB", b"XY\nZA\rB"),
    )
    for chunk, flushed in steps:
        text.write(chunk)
        self.assertEqual(raw.getvalue(), flushed)
Guido van Rossumf64db9f2007-12-06 01:04:26 +00002642
def test_reconfigure_line_buffering(self):
    raw = self.BytesIO()
    buffered = self.BufferedWriter(raw, 1000)
    text = self.TextIOWrapper(buffered, newline="\n", line_buffering=False)
    text.write("AB\nC")
    self.assertEqual(raw.getvalue(), b"")

    # Turning line buffering on flushes what is pending ...
    text.reconfigure(line_buffering=True)   # implicit flush
    self.assertEqual(raw.getvalue(), b"AB\nC")
    # ... and from then on a write containing a newline is flushed,
    text.write("DEF\nG")
    self.assertEqual(raw.getvalue(), b"AB\nCDEF\nG")
    # while a write without one is not.
    text.write("H")
    self.assertEqual(raw.getvalue(), b"AB\nCDEF\nG")
    # Turning it off again flushes too.
    text.reconfigure(line_buffering=False)   # implicit flush
    self.assertEqual(raw.getvalue(), b"AB\nCDEF\nGH")
    text.write("IJ")
    self.assertEqual(raw.getvalue(), b"AB\nCDEF\nGH")

    # Keeping default value
    text.reconfigure()
    text.reconfigure(line_buffering=None)
    self.assertEqual(text.line_buffering, False)
    text.reconfigure(line_buffering=True)
    text.reconfigure()
    text.reconfigure(line_buffering=None)
    self.assertEqual(text.line_buffering, True)
2669
@unittest.skipIf(sys.flags.utf8_mode, "utf-8 mode is enabled")
def test_default_encoding(self):
    saved_environ = dict(os.environ)
    try:
        # try to get a user preferred encoding different than the current
        # locale encoding to check that TextIOWrapper() uses the current
        # locale encoding and not the user preferred encoding
        for name in ('LC_ALL', 'LANG', 'LC_CTYPE'):
            os.environ.pop(name, None)

        expected = locale.getpreferredencoding(False)
        raw = self.BytesIO()
        text = self.TextIOWrapper(raw)
        self.assertEqual(text.encoding, expected)
    finally:
        # Put the environment back exactly as it was.
        os.environ.clear()
        os.environ.update(saved_environ)
2688
@support.cpython_only
@unittest.skipIf(sys.flags.utf8_mode, "utf-8 mode is enabled")
def test_device_encoding(self):
    # Issue 15989
    import _testcapi
    raw = self.BytesIO()
    # A fileno() one past INT_MAX, then one past UINT_MAX, must make the
    # constructor raise OverflowError.
    raw.fileno = lambda: _testcapi.INT_MAX + 1
    with self.assertRaises(OverflowError):
        self.TextIOWrapper(raw)
    raw.fileno = lambda: _testcapi.UINT_MAX + 1
    with self.assertRaises(OverflowError):
        self.TextIOWrapper(raw)
2699
def test_encoding(self):
    # Check the encoding attribute is always set, and valid
    raw = self.BytesIO()
    text = self.TextIOWrapper(raw, encoding="utf-8")
    self.assertEqual(text.encoding, "utf-8")
    # Without an explicit encoding, one must still be chosen, and it has
    # to name a codec that can be looked up.
    text = self.TextIOWrapper(raw)
    chosen = text.encoding
    self.assertIsNotNone(chosen)
    codecs.lookup(chosen)
2708
def test_encoding_errors_reading(self):
    payload = b"abc\n\xff\n"

    # (1) default, (2) explicit strict: the bad byte raises
    for options in ({}, {"errors": "strict"}):
        raw = self.BytesIO(payload)
        text = self.TextIOWrapper(raw, encoding="ascii", **options)
        self.assertRaises(UnicodeError, text.read)

    # (3) ignore drops the bad byte, (4) replace substitutes U+FFFD
    lenient = (
        ("ignore", "abc\n\n"),
        ("replace", "abc\n\ufffd\n"),
    )
    for handler, expected in lenient:
        raw = self.BytesIO(payload)
        text = self.TextIOWrapper(raw, encoding="ascii", errors=handler)
        self.assertEqual(text.read(), expected)
Guido van Rossume7fc50f2007-12-03 22:54:21 +00002726
def test_encoding_errors_writing(self):
    # (1) default, (2) explicit strict: an unencodable character raises
    for options in ({}, {"errors": "strict"}):
        raw = self.BytesIO()
        text = self.TextIOWrapper(raw, encoding="ascii", **options)
        self.assertRaises(UnicodeError, text.write, "\xff")

    # (3) ignore drops the character, (4) replace writes "?" instead
    lenient = (
        ("ignore", b"abcdef\n"),
        ("replace", b"abc?def\n"),
    )
    for handler, expected in lenient:
        raw = self.BytesIO()
        text = self.TextIOWrapper(raw, encoding="ascii", errors=handler,
                                  newline="\n")
        text.write("abc\xffdef\n")
        text.flush()
        self.assertEqual(raw.getvalue(), expected)
Guido van Rossume7fc50f2007-12-03 22:54:21 +00002750
def test_newlines(self):
    # Read the same five lines back under every supported ``newline``
    # setting, for several encodings and buffer sizes, both by iterating
    # the wrapper and by mixing read(2) with readline().
    input_lines = [ "unix\n", "windows\r\n", "os9\r", "last\n", "nonl" ]

    # [newline argument, lines expected when reading with it]
    tests = [
        [ None, [ 'unix\n', 'windows\n', 'os9\n', 'last\n', 'nonl' ] ],
        [ '', input_lines ],
        [ '\n', [ "unix\n", "windows\r\n", "os9\rlast\n", "nonl" ] ],
        [ '\r\n', [ "unix\nwindows\r\n", "os9\rlast\nnonl" ] ],
        [ '\r', [ "unix\nwindows\r", "\nos9\r", "last\nnonl" ] ],
    ]
    encodings = (
        'utf-8', 'latin-1',
        'utf-16', 'utf-16-le', 'utf-16-be',
        'utf-32', 'utf-32-le', 'utf-32-be',
    )

    # Try a range of buffer sizes to test the case where \r is the last
    # character in TextIOWrapper._pending_line.
    for encoding in encodings:
        # str.encode() already returns bytes; no extra copy is needed.
        data = ''.join(input_lines).encode(encoding)
        for do_reads in (False, True):
            for bufsize in range(1, 10):
                for newline, exp_lines in tests:
                    bufio = self.BufferedReader(self.BytesIO(data), bufsize)
                    textio = self.TextIOWrapper(bufio, newline=newline,
                                                encoding=encoding)
                    if do_reads:
                        # Every expected line has at least two characters,
                        # so read(2) followed by readline() rebuilds one
                        # line per iteration.
                        got_lines = []
                        while True:
                            c2 = textio.read(2)
                            if c2 == '':
                                break
                            self.assertEqual(len(c2), 2)
                            got_lines.append(c2 + textio.readline())
                    else:
                        got_lines = list(textio)

                    # One comparison covers both content and length; the
                    # message names the failing combination.
                    self.assertEqual(
                        got_lines, exp_lines,
                        "encoding=%r do_reads=%r bufsize=%r newline=%r"
                        % (encoding, do_reads, bufsize, newline))
Guido van Rossum78892e42007-04-06 17:31:18 +00002792
def test_newlines_input(self):
    # readlines() and read() must both honour the ``newline`` argument
    # when decoding input that mixes \n, \r and \r\n terminators.
    testdata = b"AAA\nBB\x00B\nCCC\rDDD\rEEE\r\nFFF\r\nGGG"
    normalized = testdata.replace(b"\r\n", b"\n").replace(b"\r", b"\n")
    cases = [
        (None, normalized.decode("ascii").splitlines(keepends=True)),
        ("", testdata.decode("ascii").splitlines(keepends=True)),
        ("\n", ["AAA\n", "BB\x00B\n", "CCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]),
        ("\r\n", ["AAA\nBB\x00B\nCCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]),
        ("\r", ["AAA\nBB\x00B\nCCC\r", "DDD\r", "EEE\r", "\nFFF\r", "\nGGG"]),
    ]
    for newline, expected in cases:
        txt = self.TextIOWrapper(self.BytesIO(testdata),
                                 encoding="ascii", newline=newline)
        self.assertEqual(txt.readlines(), expected)
        # Rewind and check that a single read() yields the same text.
        txt.seek(0)
        self.assertEqual(txt.read(), "".join(expected))
Guido van Rossum8358db22007-08-18 21:39:55 +00002808
def test_newlines_output(self):
    # Written "\n" characters are translated according to ``newline``;
    # newline=None is expected to behave like newline=os.linesep.
    testdict = {
        "": b"AAA\nBBB\nCCC\nX\rY\r\nZ",
        "\n": b"AAA\nBBB\nCCC\nX\rY\r\nZ",
        "\r": b"AAA\rBBB\rCCC\rX\rY\r\rZ",
        "\r\n": b"AAA\r\nBBB\r\nCCC\r\nX\rY\r\r\nZ",
    }
    tests = [(None, testdict[os.linesep])] + sorted(testdict.items())
    # The text is written in pieces that split a line in the middle.
    chunks = ("AAA\nB", "BB\nCCC\n", "X\rY\r\nZ")
    for newline, expected in tests:
        buf = self.BytesIO()
        txt = self.TextIOWrapper(buf, encoding="ascii", newline=newline)
        for chunk in chunks:
            txt.write(chunk)
        txt.flush()
        self.assertEqual(buf.closed, False)
        self.assertEqual(buf.getvalue(), expected)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002826
def test_destructor(self):
    # Dropping the last reference to a TextIOWrapper must flush pending
    # text and close the underlying buffer.
    seen_at_close = []
    bytesio_cls = self.BytesIO

    class MyBytesIO(bytesio_cls):
        def close(self):
            # Record what had reached the buffer by the time it is closed.
            seen_at_close.append(self.getvalue())
            bytesio_cls.close(self)

    raw = MyBytesIO()
    wrapper = self.TextIOWrapper(raw, encoding="ascii")
    wrapper.write("abc")
    del wrapper
    support.gc_collect()
    self.assertEqual([b"abc"], seen_at_close)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002840
def test_override_destructor(self):
    # Overridden __del__, close() and flush() must be invoked, in that
    # order, when the wrapper is garbage collected.
    record = []

    class MyTextIO(self.TextIOWrapper):
        def __del__(self):
            record.append(1)
            # The base class may or may not define __del__.
            parent_del = getattr(super(), "__del__", None)
            if parent_del is not None:
                parent_del()

        def close(self):
            record.append(2)
            super().close()

        def flush(self):
            record.append(3)
            super().flush()

    raw = self.BytesIO()
    wrapper = MyTextIO(raw, encoding="ascii")
    del wrapper
    support.gc_collect()
    self.assertEqual(record, [1, 2, 3])
2863
def test_error_through_destructor(self):
    # Test that the exception state is not modified by a destructor,
    # even if close() fails.
    failing_raw = self.CloseFailureIO()
    with support.catch_unraisable_exception() as cm:
        with self.assertRaises(AttributeError):
            self.TextIOWrapper(failing_raw).xyzzy

        # Inspect the captured unraisable exception while the outer
        # context manager is still active.
        if IOBASE_EMITS_UNRAISABLE:
            if cm.unraisable is not None:
                self.assertEqual(cm.unraisable.exc_type, OSError)
        else:
            self.assertIsNone(cm.unraisable)
Guido van Rossum8358db22007-08-18 21:39:55 +00002876
Guido van Rossum9b76da62007-04-11 01:09:03 +00002877 # Systematic tests of the text I/O API
2878
def test_basic_io(self):
    # Exercise write/read/seek/tell on a real file for several chunk sizes
    # and encodings.  The files are managed with ``with`` so that a failing
    # assertion cannot leave support.TESTFN open.
    for chunksize in (1, 2, 3, 4, 5, 15, 16, 17, 31, 32, 33, 63, 64, 65):
        for enc in "ascii", "latin-1", "utf-8":  # , "utf-16-be", "utf-16-le":
            with self.open(support.TESTFN, "w+", encoding=enc) as f:
                f._CHUNK_SIZE = chunksize
                self.assertEqual(f.write("abc"), 3)
            with self.open(support.TESTFN, "r+", encoding=enc) as f:
                f._CHUNK_SIZE = chunksize
                self.assertEqual(f.tell(), 0)
                self.assertEqual(f.read(), "abc")
                # Position just past "abc"; reused below as a seek target.
                cookie = f.tell()
                self.assertEqual(f.seek(0), 0)
                self.assertEqual(f.read(None), "abc")
                f.seek(0)
                self.assertEqual(f.read(2), "ab")
                self.assertEqual(f.read(1), "c")
                self.assertEqual(f.read(1), "")
                self.assertEqual(f.read(), "")
                self.assertEqual(f.tell(), cookie)
                self.assertEqual(f.seek(0), 0)
                self.assertEqual(f.seek(0, 2), cookie)
                self.assertEqual(f.write("def"), 3)
                self.assertEqual(f.seek(cookie), cookie)
                self.assertEqual(f.read(), "def")
                if enc.startswith("utf"):
                    self.multi_line_test(f, enc)
2907
def multi_line_test(self, f, enc):
    # Helper: truncate *f*, write lines of many different lengths while
    # recording tell() before each one, then read them back and check that
    # both the positions and the contents match.  *enc* is not used here.
    f.seek(0)
    f.truncate()
    sample = "s\xff\u0fff\uffff"
    wlines = []
    for size in (0, 1, 2, 3, 4, 5, 30, 31, 32, 33, 62, 63, 64, 65, 1000):
        # Cycle through the sample characters to build a line of *size*.
        line = "".join(sample[i % len(sample)] for i in range(size)) + "\n"
        wlines.append((f.tell(), line))
        f.write(line)
    f.seek(0)
    rlines = []
    while True:
        pos = f.tell()
        line = f.readline()
        if not line:
            break
        rlines.append((pos, line))
    self.assertEqual(rlines, wlines)
Guido van Rossum9b76da62007-04-11 01:09:03 +00002929
def test_telling(self):
    # tell() after each write must match tell() after reading the same
    # line back.  The file is managed with ``with`` so that a failing
    # assertion cannot leave support.TESTFN open.
    with self.open(support.TESTFN, "w+", encoding="utf-8") as f:
        p0 = f.tell()
        f.write("\xff\n")
        p1 = f.tell()
        f.write("\xff\n")
        p2 = f.tell()
        f.seek(0)
        self.assertEqual(f.tell(), p0)
        self.assertEqual(f.readline(), "\xff\n")
        self.assertEqual(f.tell(), p1)
        self.assertEqual(f.readline(), "\xff\n")
        self.assertEqual(f.tell(), p2)
        f.seek(0)
        for line in f:
            self.assertEqual(line, "\xff\n")
            # tell() is expected to fail while iteration is in progress.
            self.assertRaises(OSError, f.tell)
        # Once iteration is finished tell() works again.
        self.assertEqual(f.tell(), p2)
2949
def test_seeking(self):
    # Read an ASCII prefix that ends two bytes short of the default chunk
    # size, so the multi-byte character that follows presumably straddles
    # the chunk boundary; tell() and readline() must still be right.
    prefix_size = _default_chunk_size() - 2
    u_prefix = "a" * prefix_size
    prefix = u_prefix.encode("utf-8")
    self.assertEqual(len(u_prefix), len(prefix))
    u_suffix = "\u8888\n"
    line = prefix + u_suffix.encode("utf-8")
    with self.open(support.TESTFN, "wb") as binary_file:
        binary_file.write(line*2)
    with self.open(support.TESTFN, "r", encoding="utf-8") as text_file:
        head = text_file.read(prefix_size)
        self.assertEqual(head, str(prefix, "ascii"))
        self.assertEqual(text_file.tell(), prefix_size)
        self.assertEqual(text_file.readline(), u_suffix)
Guido van Rossumd76e7792007-04-17 02:38:04 +00002966
def test_seeking_too(self):
    # Regression test for a specific bug: tell() after readline() on a
    # multi-byte character with a chunk size smaller than the character.
    data = b'\xe0\xbf\xbf\n'
    with self.open(support.TESTFN, "wb") as binary_file:
        binary_file.write(data)
    with self.open(support.TESTFN, "r", encoding="utf-8") as text_file:
        text_file._CHUNK_SIZE  # Just test that it exists
        text_file._CHUNK_SIZE = 2
        text_file.readline()
        text_file.tell()
Guido van Rossumd76e7792007-04-17 02:38:04 +00002977
def test_seek_and_tell(self):
    # Test seek/tell using the StatefulIncrementalDecoder.
    # Make test faster by doing smaller seeks
    CHUNK_SIZE = 128

    def test_seek_and_tell_with_data(data, min_pos=0):
        """Tell/seek to various points within a data stream and ensure
        that the decoded data returned by read() is consistent."""
        # Every file is opened in a ``with`` block so that a failing
        # assertion cannot leave support.TESTFN open.
        with self.open(support.TESTFN, 'wb') as f:
            f.write(data)
        with self.open(support.TESTFN, encoding='test_decoder') as f:
            f._CHUNK_SIZE = CHUNK_SIZE
            decoded = f.read()

        for i in range(min_pos, len(decoded) + 1):  # seek positions
            for j in [1, 5, len(decoded) - i]:  # read lengths
                with self.open(support.TESTFN,
                               encoding='test_decoder') as f:
                    self.assertEqual(f.read(i), decoded[:i])
                    cookie = f.tell()
                    self.assertEqual(f.read(j), decoded[i:i + j])
                    # Seeking back to the cookie must restore the
                    # decoder state as well as the position.
                    f.seek(cookie)
                    self.assertEqual(f.read(), decoded[i:])

    # Enable the test decoder.
    StatefulIncrementalDecoder.codecEnabled = 1

    # Run the tests.
    try:
        # Try each test case.
        for data, _, _ in StatefulIncrementalDecoderTest.test_cases:
            test_seek_and_tell_with_data(data)

        # Position each test case so that it crosses a chunk boundary.
        for data, _, _ in StatefulIncrementalDecoderTest.test_cases:
            offset = CHUNK_SIZE - len(data)//2
            prefix = b'.'*offset
            # Don't bother seeking into the prefix (takes too long).
            min_pos = offset*2
            test_seek_and_tell_with_data(prefix + data, min_pos)

    # Ensure our test decoder won't interfere with subsequent tests.
    finally:
        StatefulIncrementalDecoder.codecEnabled = 0
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00003024
def test_multibyte_seek_and_tell(self):
    # A tell() cookie taken between lines of a multi-byte encoding must
    # be usable with seek().  Files are managed with ``with`` so that a
    # failing assertion cannot leave support.TESTFN open.
    with self.open(support.TESTFN, "w", encoding="euc_jp") as f:
        f.write("AB\n\u3046\u3048\n")

    with self.open(support.TESTFN, "r", encoding="euc_jp") as f:
        self.assertEqual(f.readline(), "AB\n")
        p0 = f.tell()
        self.assertEqual(f.readline(), "\u3046\u3048\n")
        p1 = f.tell()
        # Go back to the start of the second line and read it again.
        f.seek(p0)
        self.assertEqual(f.readline(), "\u3046\u3048\n")
        self.assertEqual(f.tell(), p1)
3039
def test_seek_with_encoder_state(self):
    # Write, seek back to a tell() cookie and overwrite with a combining
    # character, then check what reads back.  Files are managed with
    # ``with`` so that a failing assertion cannot leave support.TESTFN open.
    with self.open(support.TESTFN, "w", encoding="euc_jis_2004") as f:
        f.write("\u00e6\u0300")
        p0 = f.tell()
        f.write("\u00e6")
        f.seek(p0)
        f.write("\u0300")

    with self.open(support.TESTFN, "r", encoding="euc_jis_2004") as f:
        self.assertEqual(f.readline(), "\u00e6\u0300\u0300")
3052
def test_encoded_writes(self):
    # Two consecutive writes in a UTF-16/UTF-32 encoding must produce the
    # same bytes as encoding the concatenated text once.
    data = "1234567890"
    doubled = data * 2
    tests = ("utf-16",
             "utf-16-le",
             "utf-16-be",
             "utf-32",
             "utf-32-le",
             "utf-32-be")
    for encoding in tests:
        buf = self.BytesIO()
        wrapper = self.TextIOWrapper(buf, encoding=encoding)
        # Check if the BOM is written only once (see issue1753).
        for _ in range(2):
            wrapper.write(data)
        # Read the text back twice, rewinding before each read.
        for _ in range(2):
            wrapper.seek(0)
            self.assertEqual(wrapper.read(), doubled)
        self.assertEqual(buf.getvalue(), doubled.encode(encoding))
Alexandre Vassalottia38f73b2008-01-07 18:30:48 +00003072
def test_unreadable(self):
    # read() on a wrapper whose buffer reports readable() == False
    # must raise OSError.
    class UnReadable(self.BytesIO):
        def readable(self):
            return False

    wrapper = self.TextIOWrapper(UnReadable())
    with self.assertRaises(OSError):
        wrapper.read()
Benjamin Petersona1b49012009-03-31 23:11:32 +00003079
def test_read_one_by_one(self):
    # Reading one character at a time must still translate "\r\n"
    # into a single "\n".
    txt = self.TextIOWrapper(self.BytesIO(b"AA\r\nBB"))
    # Keep calling read(1) until it returns the empty string.
    reads = "".join(iter(lambda: txt.read(1), ""))
    self.assertEqual(reads, "AA\nBB")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00003089
def test_readlines(self):
    # readlines() with no hint, with None, and with a size hint.
    txt = self.TextIOWrapper(self.BytesIO(b"AA\nBB\nCC"))
    all_lines = ["AA\n", "BB\n", "CC"]
    self.assertEqual(txt.readlines(), all_lines)
    txt.seek(0)
    self.assertEqual(txt.readlines(None), all_lines)
    txt.seek(0)
    # With a hint of 5 the last line is not returned.
    self.assertEqual(txt.readlines(5), all_lines[:2])
3097
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00003098 # read in amounts equal to TextIOWrapper._CHUNK_SIZE which is 128.
def test_read_by_chunk(self):
    # make sure "\r\n" straddles 128 char boundary.
    txt = self.TextIOWrapper(self.BytesIO(b"A" * 127 + b"\r\nB"))
    # Read 128 characters at a time until read() returns the empty string.
    reads = "".join(iter(lambda: txt.read(128), ""))
    self.assertEqual(reads, "A"*127+"\nB")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00003109
def test_writelines(self):
    # writelines() with a plain list writes the items back to back.
    raw = self.BytesIO()
    wrapper = self.TextIOWrapper(raw)
    wrapper.writelines(['ab', 'cd', 'ef'])
    wrapper.flush()
    self.assertEqual(raw.getvalue(), b'abcdef')
3117
def test_writelines_userlist(self):
    # writelines() must also accept a UserList rather than a real list.
    raw = self.BytesIO()
    wrapper = self.TextIOWrapper(raw)
    wrapper.writelines(UserList(['ab', 'cd', 'ef']))
    wrapper.flush()
    self.assertEqual(raw.getvalue(), b'abcdef')
3125
def test_writelines_error(self):
    # writelines() must reject non-str items, None, and a bytes object.
    wrapper = self.TextIOWrapper(self.BytesIO())
    for bad_lines in ([1, 2, 3], None, b'abc'):
        self.assertRaises(TypeError, wrapper.writelines, bad_lines)
3131
def test_issue1395_1(self):
    # Reading self.testdata one character at a time must yield
    # self.normalized.
    txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")

    # read one char at a time
    reads = "".join(iter(lambda: txt.read(1), ""))
    self.assertEqual(reads, self.normalized)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00003143
def test_issue1395_2(self):
    # Same as test_issue1395_1, but with a 4-character chunk size and
    # reads of 4 characters.
    txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
    txt._CHUNK_SIZE = 4

    reads = "".join(iter(lambda: txt.read(4), ""))
    self.assertEqual(reads, self.normalized)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00003155
def test_issue1395_3(self):
    # Mix two read(4) calls with three readline() calls; the pieces must
    # add up to self.normalized.
    txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
    txt._CHUNK_SIZE = 4

    pieces = [txt.read(4), txt.read(4)]
    for _ in range(3):
        pieces.append(txt.readline())
    self.assertEqual("".join(pieces), self.normalized)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00003166
def test_issue1395_4(self):
    # A read(4) followed by an unbounded read() must together return
    # self.normalized.
    txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
    txt._CHUNK_SIZE = 4

    head = txt.read(4)
    tail = txt.read()
    self.assertEqual(head + tail, self.normalized)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00003174
def test_issue1395_5(self):
    # A tell() cookie taken after read(4) must survive a round trip
    # through seek(0) and seek(cookie).
    txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
    txt._CHUNK_SIZE = 4

    txt.read(4)  # only advances the position; the text is not needed
    cookie = txt.tell()
    txt.seek(0)
    txt.seek(cookie)
    self.assertEqual(txt.read(4), "BBB\n")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00003184
def test_issue2282(self):
    # The wrapper must report the same seekable() value as its buffer.
    raw = self.BytesIO(self.testdata)
    wrapper = self.TextIOWrapper(raw, encoding="ascii")

    self.assertEqual(raw.seekable(), wrapper.seekable())
3190
def test_append_bom(self):
    # The BOM is not written again when appending to a non-empty file
    filename = support.TESTFN
    for charset in ('utf-8-sig', 'utf-16', 'utf-32'):
        # subTest makes a failure report which charset was involved.
        with self.subTest(charset=charset):
            with self.open(filename, 'w', encoding=charset) as f:
                f.write('aaa')
            with self.open(filename, 'rb') as f:
                self.assertEqual(f.read(), 'aaa'.encode(charset))

            with self.open(filename, 'a', encoding=charset) as f:
                f.write('xxx')
            with self.open(filename, 'rb') as f:
                self.assertEqual(f.read(), 'aaaxxx'.encode(charset))
Antoine Pitroue4501852009-05-14 18:55:55 +00003205
def test_seek_bom(self):
    # Same test, but when seeking manually
    path = support.TESTFN
    charsets = ('utf-8-sig', 'utf-16', 'utf-32')
    for charset in charsets:
        with self.open(path, 'w', encoding=charset) as writer:
            writer.write('aaa')
            end_cookie = writer.tell()
        with self.open(path, 'r+', encoding=charset) as updater:
            # Write at the old end first, then overwrite from the start.
            updater.seek(end_cookie)
            updater.write('zzz')
            updater.seek(0)
            updater.write('bbb')
        with self.open(path, 'rb') as reader:
            self.assertEqual(reader.read(), 'bbbzzz'.encode(charset))
Antoine Pitroue4501852009-05-14 18:55:55 +00003220
def test_seek_append_bom(self):
    # Same test, but first seek to the start and then to the end
    path = support.TESTFN
    charsets = ('utf-8-sig', 'utf-16', 'utf-32')
    for charset in charsets:
        with self.open(path, 'w', encoding=charset) as writer:
            writer.write('aaa')
        with self.open(path, 'a', encoding=charset) as appender:
            appender.seek(0)
            appender.seek(0, self.SEEK_END)
            appender.write('xxx')
        with self.open(path, 'rb') as reader:
            self.assertEqual(reader.read(), 'aaaxxx'.encode(charset))
3233
def test_errors_property(self):
    # The ``errors`` attribute is "strict" when open() is not given an
    # errors argument, and otherwise echoes the argument.
    cases = (
        ({}, "strict"),
        ({"errors": "replace"}, "replace"),
    )
    for open_kwargs, expected in cases:
        with self.open(support.TESTFN, "w", **open_kwargs) as f:
            self.assertEqual(f.errors, expected)
3239
@support.no_tracing
def test_threads_write(self):
    # Issue6750: concurrent writes could duplicate data
    start = threading.Event()
    thread_count = 20
    with self.open(support.TESTFN, "w", buffering=1) as f:
        def run(n):
            # Prepare the line first so all threads write close together
            # once the event is set.
            text = "Thread%03d\n" % n
            start.wait()
            f.write(text)

        threads = []
        for x in range(thread_count):
            threads.append(threading.Thread(target=run, args=(x,)))
        with support.start_threads(threads, start.set):
            time.sleep(0.02)
    with self.open(support.TESTFN) as f:
        content = f.read()
        # Every thread's line must appear exactly once.
        for n in range(thread_count):
            self.assertEqual(content.count("Thread%03d\n" % n), 1)
Amaury Forgeot d'Arcccd686a2009-08-29 23:00:38 +00003257
def test_flush_error_on_close(self):
    # Test that text file is closed despite failed flush
    # and that flush() is called before file closed.
    txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
    state_at_flush = []

    def bad_flush():
        # Record whether the wrapper and its buffer were already closed
        # at the time flush() ran, then fail.
        state_at_flush[:] = [txt.closed, txt.buffer.closed]
        raise OSError()

    txt.flush = bad_flush
    self.assertRaises(OSError, txt.close)  # exception not swallowed
    self.assertTrue(txt.closed)
    self.assertTrue(txt.buffer.closed)
    self.assertTrue(state_at_flush)  # flush() called
    self.assertFalse(state_at_flush[0])  # flush() called before file closed
    self.assertFalse(state_at_flush[1])
    txt.flush = lambda: None  # break reference loop
Antoine Pitrou6be88762010-05-03 16:48:20 +00003274
def test_close_error_on_close(self):
    # When both flush() and the buffer's close() fail, close() must raise
    # the close error with the flush error as its __context__.
    def bad_flush():
        raise OSError('flush')

    def bad_close():
        raise OSError('close')

    raw = self.BytesIO(self.testdata)
    raw.close = bad_close
    txt = self.TextIOWrapper(raw, encoding="ascii")
    txt.flush = bad_flush
    with self.assertRaises(OSError) as err:  # exception not swallowed
        txt.close()
    self.assertEqual(err.exception.args, ('close',))
    self.assertIsInstance(err.exception.__context__, OSError)
    self.assertEqual(err.exception.__context__.args, ('flush',))
    self.assertFalse(txt.closed)

    # Silence destructor error
    raw.close = lambda: None
    txt.flush = lambda: None
3294
def test_nonnormalized_close_error_on_close(self):
    # Issue #21677
    # Both hooks fail with a NameError by referring to an undefined name;
    # the close error must be raised with the flush error as its context.
    def bad_flush():
        raise non_existing_flush

    def bad_close():
        raise non_existing_close

    raw = self.BytesIO(self.testdata)
    raw.close = bad_close
    txt = self.TextIOWrapper(raw, encoding="ascii")
    txt.flush = bad_flush
    with self.assertRaises(NameError) as err:  # exception not swallowed
        txt.close()
    self.assertIn('non_existing_close', str(err.exception))
    self.assertIsInstance(err.exception.__context__, NameError)
    self.assertIn('non_existing_flush', str(err.exception.__context__))
    self.assertFalse(txt.closed)

    # Silence destructor error
    raw.close = lambda: None
    txt.flush = lambda: None
3315
def test_multi_close(self):
    # close() may be called repeatedly; flush() on a closed wrapper
    # raises ValueError.
    txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
    for _ in range(3):
        txt.close()
    with self.assertRaises(ValueError):
        txt.flush()
3322
def test_unseekable(self):
    # tell() and seek() on a wrapper around MockUnseekableIO must raise
    # UnsupportedOperation.
    txt = self.TextIOWrapper(self.MockUnseekableIO(self.testdata))
    with self.assertRaises(self.UnsupportedOperation):
        txt.tell()
    with self.assertRaises(self.UnsupportedOperation):
        txt.seek(0)
3327
def test_readonly_attributes(self):
    # Assigning to the ``buffer`` attribute must raise AttributeError.
    txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
    replacement = self.BytesIO(self.testdata)
    with self.assertRaises(AttributeError):
        txt.buffer = replacement
3333
def test_rawio(self):
    # Issue #12591: TextIOWrapper must work with raw I/O objects, so
    # that subprocess.Popen() can have the required unbuffered
    # semantics with universal_newlines=True.
    chunks = [b'abc', b'def', b'ghi\njkl\nopq\n']
    txt = self.TextIOWrapper(self.MockRawIO(chunks),
                             encoding='ascii', newline='\n')
    # Reads
    self.assertEqual(txt.read(4), 'abcd')
    self.assertEqual(txt.readline(), 'efghi\n')
    remaining = list(txt)
    self.assertEqual(remaining, ['jkl\n', 'opq\n'])
3344
def test_rawio_write_through(self):
    # Issue #12591: with write_through=True, writes don't need a flush
    raw = self.MockRawIO([b'abc', b'def', b'ghi\njkl\nopq\n'])
    txt = self.TextIOWrapper(raw, encoding='ascii', newline='\n',
                             write_through=True)
    for piece in ('1', '23\n4', '5'):
        txt.write(piece)
    # No flush() was called, yet everything has reached the raw object.
    self.assertEqual(b''.join(raw._write_stack), b'123\n45')
3354
def test_bufio_write_through(self):
    # Issue #21396: write_through=True doesn't force a flush()
    # on the underlying binary buffered object.
    flush_calls, write_calls = [], []

    class RecordingWriter(self.BufferedWriter):
        # Records every flush() and write() before delegating.
        def flush(self, *args, **kwargs):
            flush_calls.append(True)
            return super().flush(*args, **kwargs)

        def write(self, *args, **kwargs):
            write_calls.append(True)
            return super().write(*args, **kwargs)

    rawio = self.BytesIO()
    data = b"a"
    bufio = RecordingWriter(rawio, len(data)*2)
    textio = self.TextIOWrapper(bufio, encoding='ascii',
                                write_through=True)
    # write to the buffered io but don't overflow the buffer
    text = data.decode('ascii')
    textio.write(text)

    # buffer.flush is not called with write_through=True
    self.assertFalse(flush_calls)
    # buffer.write *is* called with write_through=True
    self.assertTrue(write_calls)
    self.assertEqual(rawio.getvalue(), b"")  # no flush

    write_calls.clear()  # reset
    textio.write(text * 10)  # total content is larger than bufio buffer
    self.assertTrue(write_calls)
    self.assertEqual(rawio.getvalue(), data * 11)  # all flushed
3386
def test_reconfigure_write_through(self):
    # reconfigure() can switch write_through on and off; omitting the
    # argument or passing None keeps the current setting.
    raw = self.MockRawIO([])
    txt = self.TextIOWrapper(raw, encoding='ascii', newline='\n')

    def written():
        # Everything that has reached the raw object so far.
        return b''.join(raw._write_stack)

    txt.write('1')
    txt.reconfigure(write_through=True)  # implied flush
    self.assertEqual(txt.write_through, True)
    self.assertEqual(written(), b'1')
    txt.write('23')
    self.assertEqual(written(), b'123')
    txt.reconfigure(write_through=False)
    self.assertEqual(txt.write_through, False)
    txt.write('45')
    txt.flush()
    self.assertEqual(written(), b'12345')
    # Keeping default value
    txt.reconfigure()
    txt.reconfigure(write_through=None)
    self.assertEqual(txt.write_through, False)
    txt.reconfigure(write_through=True)
    txt.reconfigure()
    txt.reconfigure(write_through=None)
    self.assertEqual(txt.write_through, True)
3409
def test_read_nonbytes(self):
    # Issue #17106
    # Crash when underlying read() returns non-bytes
    # Each call is made on a fresh wrapper around a StringIO.
    calls = (
        ("read", (1,)),
        ("readline", ()),
        ("read", ()),
    )
    for method_name, args in calls:
        t = self.TextIOWrapper(self.StringIO('a'))
        self.assertRaises(TypeError, getattr(t, method_name), *args)
3419
def test_illegal_encoder(self):
    # Issue 31271: Calling write() while the return value of encoder's
    # encode() is invalid shouldn't cause an assertion failure.
    rot13 = codecs.lookup("rot13")
    # Temporarily mark rot13 as a text encoding so that the wrapper can be
    # created with it.
    with support.swap_attr(rot13, '_is_text_encoding', True):
        # Use the implementation under test (self.TextIOWrapper and
        # self.BytesIO) rather than the ``io`` module directly, so that
        # every implementation this class is run against is exercised.
        t = self.TextIOWrapper(self.BytesIO(b'foo'), encoding="rot13")
    self.assertRaises(TypeError, t.write, 'bar')
3427
def test_illegal_decoder(self):
    # Issue #17106
    # Bypass the early encoding check added in issue 20404
    def _make_illegal_wrapper():
        quopri = codecs.lookup("quopri")
        quopri._is_text_encoding = True
        try:
            return self.TextIOWrapper(self.BytesIO(b'aaaaaa'),
                                      newline='\n', encoding="quopri")
        finally:
            quopri._is_text_encoding = False

    # Crash when decoder returns non-string
    # Each call is made on a fresh wrapper.
    for method_name, args in (("read", (1,)), ("readline", ()), ("read", ())):
        t = _make_illegal_wrapper()
        self.assertRaises(TypeError, getattr(t, method_name), *args)

    # Issue 31243: calling read() while the return value of decoder's
    # getstate() is invalid should neither crash the interpreter nor
    # raise a SystemError.
    def _make_very_illegal_wrapper(getstate_ret_val):
        class BadDecoder:
            def getstate(self):
                return getstate_ret_val

        def _get_bad_decoder(dummy):
            return BadDecoder()

        quopri = codecs.lookup("quopri")
        # The decoder factory is only replaced while the wrapper is built.
        with support.swap_attr(quopri, 'incrementaldecoder',
                               _get_bad_decoder):
            return _make_illegal_wrapper()

    for bad_state in (42, (), (1, 2)):
        t = _make_very_illegal_wrapper(bad_state)
        self.assertRaises(TypeError, t.read, 42)
3467
def _check_create_at_shutdown(self, **kwargs):
    # Issue #20037: creating a TextIOWrapper at shutdown
    # shouldn't crash the interpreter.
    #
    # Runs a child interpreter whose script creates a TextIOWrapper from
    # a __del__ method of an object that is still alive when the script
    # ends.  *kwargs* are forwarded (through their repr) to the
    # TextIOWrapper constructor in the child.  Returns whatever
    # assert_python_ok() returns.
    template = """if 1:
            import codecs
            import {iomod} as io

            # Avoid looking up codecs at shutdown
            codecs.lookup('utf-8')

            class C:
                def __init__(self):
                    self.buf = io.BytesIO()
                def __del__(self):
                    io.TextIOWrapper(self.buf, **{kwargs})
                    print("ok")
            c = C()
            """
    script = template.format(iomod=self.io.__name__, kwargs=kwargs)
    return assert_python_ok("-c", script)
3488
@support.requires_type_collecting
def test_create_at_shutdown_without_encoding(self):
    _, stdout, stderr = self._check_create_at_shutdown()
    if not stderr:
        # The wrapper was created and the child reported success.
        self.assertEqual("ok", stdout.decode().strip())
        return
    # Can error out with a RuntimeError if the module state
    # isn't found.  The exact text to look for comes from the
    # shutdown_error attribute of the concrete test class.
    self.assertIn(self.shutdown_error, stderr.decode())
3498
@support.requires_type_collecting
def test_create_at_shutdown_with_encoding(self):
    # With the encoding and error handler spelled out, the child must
    # print "ok" and write nothing to stderr.
    options = {'encoding': 'utf-8', 'errors': 'strict'}
    _, stdout, stderr = self._check_create_at_shutdown(**options)
    self.assertFalse(stderr)
    self.assertEqual("ok", stdout.decode().strip())
3505
def test_read_byteslike(self):
    # The buffer's read methods return memoryviews instead of bytes.
    source = MemviewBytesIO(b'Just some random string\n')
    wrapper = self.TextIOWrapper(source, 'utf-8')

    # TextIOwrapper will not read the full string, because
    # we truncate it to a multiple of the native int size
    # so that we can construct a more complex memoryview.
    visible = _to_memoryview(source.getvalue()).tobytes()
    expected_text = visible.decode('utf-8')

    self.assertEqual(wrapper.read(200), expected_text)
3516
Benjamin Peterson6c14f232014-11-12 10:19:46 -05003517 def test_issue22849(self):
3518 class F(object):
3519 def readable(self): return True
3520 def writable(self): return True
3521 def seekable(self): return True
3522
3523 for i in range(10):
3524 try:
3525 self.TextIOWrapper(F(), encoding='utf-8')
3526 except Exception:
3527 pass
3528
3529 F.tell = lambda x: 0
3530 t = self.TextIOWrapper(F(), encoding='utf-8')
3531
INADA Naoki507434f2017-12-21 09:59:53 +09003532 def test_reconfigure_encoding_read(self):
3533 # latin1 -> utf8
3534 # (latin1 can decode utf-8 encoded string)
3535 data = 'abc\xe9\n'.encode('latin1') + 'd\xe9f\n'.encode('utf8')
3536 raw = self.BytesIO(data)
3537 txt = self.TextIOWrapper(raw, encoding='latin1', newline='\n')
3538 self.assertEqual(txt.readline(), 'abc\xe9\n')
3539 with self.assertRaises(self.UnsupportedOperation):
3540 txt.reconfigure(encoding='utf-8')
3541 with self.assertRaises(self.UnsupportedOperation):
3542 txt.reconfigure(newline=None)
3543
3544 def test_reconfigure_write_fromascii(self):
3545 # ascii has a specific encodefunc in the C implementation,
3546 # but utf-8-sig has not. Make sure that we get rid of the
3547 # cached encodefunc when we switch encoders.
3548 raw = self.BytesIO()
3549 txt = self.TextIOWrapper(raw, encoding='ascii', newline='\n')
3550 txt.write('foo\n')
3551 txt.reconfigure(encoding='utf-8-sig')
3552 txt.write('\xe9\n')
3553 txt.flush()
3554 self.assertEqual(raw.getvalue(), b'foo\n\xc3\xa9\n')
3555
3556 def test_reconfigure_write(self):
3557 # latin -> utf8
3558 raw = self.BytesIO()
3559 txt = self.TextIOWrapper(raw, encoding='latin1', newline='\n')
3560 txt.write('abc\xe9\n')
3561 txt.reconfigure(encoding='utf-8')
3562 self.assertEqual(raw.getvalue(), b'abc\xe9\n')
3563 txt.write('d\xe9f\n')
3564 txt.flush()
3565 self.assertEqual(raw.getvalue(), b'abc\xe9\nd\xc3\xa9f\n')
3566
3567 # ascii -> utf-8-sig: ensure that no BOM is written in the middle of
3568 # the file
3569 raw = self.BytesIO()
3570 txt = self.TextIOWrapper(raw, encoding='ascii', newline='\n')
3571 txt.write('abc\n')
3572 txt.reconfigure(encoding='utf-8-sig')
3573 txt.write('d\xe9f\n')
3574 txt.flush()
3575 self.assertEqual(raw.getvalue(), b'abc\nd\xc3\xa9f\n')
3576
3577 def test_reconfigure_write_non_seekable(self):
3578 raw = self.BytesIO()
3579 raw.seekable = lambda: False
3580 raw.seek = None
3581 txt = self.TextIOWrapper(raw, encoding='ascii', newline='\n')
3582 txt.write('abc\n')
3583 txt.reconfigure(encoding='utf-8-sig')
3584 txt.write('d\xe9f\n')
3585 txt.flush()
3586
3587 # If the raw stream is not seekable, there'll be a BOM
3588 self.assertEqual(raw.getvalue(), b'abc\n\xef\xbb\xbfd\xc3\xa9f\n')
3589
3590 def test_reconfigure_defaults(self):
3591 txt = self.TextIOWrapper(self.BytesIO(), 'ascii', 'replace', '\n')
3592 txt.reconfigure(encoding=None)
3593 self.assertEqual(txt.encoding, 'ascii')
3594 self.assertEqual(txt.errors, 'replace')
3595 txt.write('LF\n')
3596
3597 txt.reconfigure(newline='\r\n')
3598 self.assertEqual(txt.encoding, 'ascii')
3599 self.assertEqual(txt.errors, 'replace')
3600
3601 txt.reconfigure(errors='ignore')
3602 self.assertEqual(txt.encoding, 'ascii')
3603 self.assertEqual(txt.errors, 'ignore')
3604 txt.write('CRLF\n')
3605
3606 txt.reconfigure(encoding='utf-8', newline=None)
3607 self.assertEqual(txt.errors, 'strict')
3608 txt.seek(0)
3609 self.assertEqual(txt.read(), 'LF\nCRLF\n')
3610
3611 self.assertEqual(txt.detach().getvalue(), b'LF\nCRLF\r\n')
3612
3613 def test_reconfigure_newline(self):
3614 raw = self.BytesIO(b'CR\rEOF')
3615 txt = self.TextIOWrapper(raw, 'ascii', newline='\n')
3616 txt.reconfigure(newline=None)
3617 self.assertEqual(txt.readline(), 'CR\n')
3618 raw = self.BytesIO(b'CR\rEOF')
3619 txt = self.TextIOWrapper(raw, 'ascii', newline='\n')
3620 txt.reconfigure(newline='')
3621 self.assertEqual(txt.readline(), 'CR\r')
3622 raw = self.BytesIO(b'CR\rLF\nEOF')
3623 txt = self.TextIOWrapper(raw, 'ascii', newline='\r')
3624 txt.reconfigure(newline='\n')
3625 self.assertEqual(txt.readline(), 'CR\rLF\n')
3626 raw = self.BytesIO(b'LF\nCR\rEOF')
3627 txt = self.TextIOWrapper(raw, 'ascii', newline='\n')
3628 txt.reconfigure(newline='\r')
3629 self.assertEqual(txt.readline(), 'LF\nCR\r')
3630 raw = self.BytesIO(b'CR\rCRLF\r\nEOF')
3631 txt = self.TextIOWrapper(raw, 'ascii', newline='\r')
3632 txt.reconfigure(newline='\r\n')
3633 self.assertEqual(txt.readline(), 'CR\rCRLF\r\n')
3634
3635 txt = self.TextIOWrapper(self.BytesIO(), 'ascii', newline='\r')
3636 txt.reconfigure(newline=None)
3637 txt.write('linesep\n')
3638 txt.reconfigure(newline='')
3639 txt.write('LF\n')
3640 txt.reconfigure(newline='\n')
3641 txt.write('LF\n')
3642 txt.reconfigure(newline='\r')
3643 txt.write('CR\n')
3644 txt.reconfigure(newline='\r\n')
3645 txt.write('CRLF\n')
3646 expected = 'linesep' + os.linesep + 'LF\nLF\nCR\rCRLF\r\n'
3647 self.assertEqual(txt.detach().getvalue().decode('ascii'), expected)
3648
Zackery Spytz23db9352018-06-29 04:14:58 -06003649 def test_issue25862(self):
3650 # Assertion failures occurred in tell() after read() and write().
3651 t = self.TextIOWrapper(self.BytesIO(b'test'), encoding='ascii')
3652 t.read(1)
3653 t.read()
3654 t.tell()
3655 t = self.TextIOWrapper(self.BytesIO(b'test'), encoding='ascii')
3656 t.read(1)
3657 t.write('x')
3658 t.tell()
3659
Serhiy Storchaka94dc6732013-02-03 17:03:31 +02003660
class MemviewBytesIO(io.BytesIO):
    '''A BytesIO object whose read methods return memoryviews
       rather than bytes.

       The view can be shorter than the data consumed from the stream;
       see _to_memoryview() for the truncation rule.
    '''

    def read1(self, len_=-1):
        '''Like BytesIO.read1(), but return the data as a memoryview.

        *len_* defaults to -1, like the size argument of the method it
        overrides, so the override accepts the same calls.
        '''
        return _to_memoryview(super().read1(len_))

    def read(self, len_=-1):
        '''Like BytesIO.read(), but return the data as a memoryview.

        *len_* defaults to -1, like the size argument of the method it
        overrides, so the override accepts the same calls.
        '''
        return _to_memoryview(super().read(len_))


def _to_memoryview(buf):
    '''Convert bytes-object *buf* to a non-trivial memoryview

    The bytes are loaded into an array of C ints, so the items of the
    resulting view are int-sized rather than single bytes.  Trailing
    bytes that do not fill a whole int are dropped.
    '''

    arr = array.array('i')
    # Largest prefix of buf whose length is a multiple of the item size.
    idx = len(buf) - len(buf) % arr.itemsize
    arr.frombytes(buf[:idx])
    return memoryview(arr)
Benjamin Petersonc9c0f202009-06-30 23:06:06 +00003678
Benjamin Peterson6fd113c2014-11-12 10:23:44 -05003679
class CTextIOWrapperTest(TextIOWrapperTest):
    # Module whose name _check_create_at_shutdown() imports in the child
    # interpreter.
    io = io
    # Text that test_create_at_shutdown_without_encoding accepts on the
    # child's stderr.
    shutdown_error = "RuntimeError: could not find io module state"

    def test_initialization(self):
        raw = self.BytesIO(b"\xc3\xa9\n\n")
        buffered = self.BufferedReader(raw, 1000)
        wrapper = self.TextIOWrapper(buffered)
        # Re-initialising with an invalid newline fails, and reading from
        # the object afterwards raises ValueError as well.
        with self.assertRaises(ValueError):
            wrapper.__init__(buffered, newline='xyzzy')
        with self.assertRaises(ValueError):
            wrapper.read()

        # repr() of an object that never went through __init__ raises.
        wrapper = self.TextIOWrapper.__new__(self.TextIOWrapper)
        with self.assertRaises(Exception):
            repr(wrapper)

    def test_garbage_collection(self):
        # C TextIOWrapper objects are collected, and collecting them flushes
        # all data to disk.
        # The Python version has __del__, so it ends in gc.garbage instead.
        with support.check_warnings(('', ResourceWarning)):
            raw_file = io.FileIO(support.TESTFN, "wb")
            buffered = self.BufferedWriter(raw_file)
            wrapper = self.TextIOWrapper(buffered, encoding="ascii")
            wrapper.write("456def")
            # Self-reference: only the cycle collector can free the wrapper.
            wrapper.x = wrapper
            ref = weakref.ref(wrapper)
            del wrapper
            support.gc_collect()
        self.assertIsNone(ref(), ref)
        with self.open(support.TESTFN, "rb") as stored:
            self.assertEqual(stored.read(), b"456def")

    def test_rwpair_cleared_before_textio(self):
        # Issue 13070: TextIOWrapper's finalization would crash when called
        # after the reference to the underlying BufferedRWPair's writer got
        # cleared by the GC.
        for _ in range(1000):
            pair1 = self.BufferedRWPair(self.MockRawIO(), self.MockRawIO())
            text1 = self.TextIOWrapper(pair1, encoding="ascii")
            pair2 = self.BufferedRWPair(self.MockRawIO(), self.MockRawIO())
            text2 = self.TextIOWrapper(pair2, encoding="ascii")
            # circular references
            text1.buddy = text2
            text2.buddy = text1
        support.gc_collect()

    def test_del__CHUNK_SIZE_SystemError(self):
        # Deleting _CHUNK_SIZE must be refused with AttributeError.
        wrapper = self.TextIOWrapper(self.BytesIO(), encoding='ascii')
        with self.assertRaises(AttributeError):
            del wrapper._CHUNK_SIZE
3729
Charles-François Natali42c28cd2011-10-05 19:53:43 +02003730
class PyTextIOWrapperTest(TextIOWrapperTest):
    # Text that test_create_at_shutdown_without_encoding accepts on the
    # child's stderr.
    shutdown_error = "LookupError: unknown encoding: ascii"
    # Module whose name _check_create_at_shutdown() imports in the child
    # interpreter.
    io = pyio
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003734
3735
3736class IncrementalNewlineDecoderTest(unittest.TestCase):
3737
3738 def check_newline_decoding_utf8(self, decoder):
Antoine Pitrou180a3362008-12-14 16:36:46 +00003739 # UTF-8 specific tests for a newline decoder
3740 def _check_decode(b, s, **kwargs):
3741 # We exercise getstate() / setstate() as well as decode()
3742 state = decoder.getstate()
Ezio Melottib3aedd42010-11-20 19:04:17 +00003743 self.assertEqual(decoder.decode(b, **kwargs), s)
Antoine Pitrou180a3362008-12-14 16:36:46 +00003744 decoder.setstate(state)
Ezio Melottib3aedd42010-11-20 19:04:17 +00003745 self.assertEqual(decoder.decode(b, **kwargs), s)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00003746
Antoine Pitrou180a3362008-12-14 16:36:46 +00003747 _check_decode(b'\xe8\xa2\x88', "\u8888")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00003748
Antoine Pitrou180a3362008-12-14 16:36:46 +00003749 _check_decode(b'\xe8', "")
3750 _check_decode(b'\xa2', "")
3751 _check_decode(b'\x88', "\u8888")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00003752
Antoine Pitrou180a3362008-12-14 16:36:46 +00003753 _check_decode(b'\xe8', "")
3754 _check_decode(b'\xa2', "")
3755 _check_decode(b'\x88', "\u8888")
3756
3757 _check_decode(b'\xe8', "")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00003758 self.assertRaises(UnicodeDecodeError, decoder.decode, b'', final=True)
3759
Antoine Pitrou180a3362008-12-14 16:36:46 +00003760 decoder.reset()
3761 _check_decode(b'\n', "\n")
3762 _check_decode(b'\r', "")
3763 _check_decode(b'', "\n", final=True)
3764 _check_decode(b'\r', "\n", final=True)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00003765
Antoine Pitrou180a3362008-12-14 16:36:46 +00003766 _check_decode(b'\r', "")
3767 _check_decode(b'a', "\na")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00003768
Antoine Pitrou180a3362008-12-14 16:36:46 +00003769 _check_decode(b'\r\r\n', "\n\n")
3770 _check_decode(b'\r', "")
3771 _check_decode(b'\r', "\n")
3772 _check_decode(b'\na', "\na")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00003773
Antoine Pitrou180a3362008-12-14 16:36:46 +00003774 _check_decode(b'\xe8\xa2\x88\r\n', "\u8888\n")
3775 _check_decode(b'\xe8\xa2\x88', "\u8888")
3776 _check_decode(b'\n', "\n")
3777 _check_decode(b'\xe8\xa2\x88\r', "\u8888")
3778 _check_decode(b'\n', "\n")
Guido van Rossum9b76da62007-04-11 01:09:03 +00003779
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003780 def check_newline_decoding(self, decoder, encoding):
Antoine Pitrou180a3362008-12-14 16:36:46 +00003781 result = []
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003782 if encoding is not None:
3783 encoder = codecs.getincrementalencoder(encoding)()
3784 def _decode_bytewise(s):
3785 # Decode one byte at a time
3786 for b in encoder.encode(s):
3787 result.append(decoder.decode(bytes([b])))
3788 else:
3789 encoder = None
3790 def _decode_bytewise(s):
3791 # Decode one char at a time
3792 for c in s:
3793 result.append(decoder.decode(c))
Ezio Melottib3aedd42010-11-20 19:04:17 +00003794 self.assertEqual(decoder.newlines, None)
Antoine Pitrou180a3362008-12-14 16:36:46 +00003795 _decode_bytewise("abc\n\r")
Ezio Melottib3aedd42010-11-20 19:04:17 +00003796 self.assertEqual(decoder.newlines, '\n')
Antoine Pitrou180a3362008-12-14 16:36:46 +00003797 _decode_bytewise("\nabc")
Ezio Melottib3aedd42010-11-20 19:04:17 +00003798 self.assertEqual(decoder.newlines, ('\n', '\r\n'))
Antoine Pitrou180a3362008-12-14 16:36:46 +00003799 _decode_bytewise("abc\r")
Ezio Melottib3aedd42010-11-20 19:04:17 +00003800 self.assertEqual(decoder.newlines, ('\n', '\r\n'))
Antoine Pitrou180a3362008-12-14 16:36:46 +00003801 _decode_bytewise("abc")
Ezio Melottib3aedd42010-11-20 19:04:17 +00003802 self.assertEqual(decoder.newlines, ('\r', '\n', '\r\n'))
Antoine Pitrou180a3362008-12-14 16:36:46 +00003803 _decode_bytewise("abc\r")
Ezio Melottib3aedd42010-11-20 19:04:17 +00003804 self.assertEqual("".join(result), "abc\n\nabcabc\nabcabc")
Antoine Pitrou180a3362008-12-14 16:36:46 +00003805 decoder.reset()
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003806 input = "abc"
3807 if encoder is not None:
3808 encoder.reset()
3809 input = encoder.encode(input)
Ezio Melottib3aedd42010-11-20 19:04:17 +00003810 self.assertEqual(decoder.decode(input), "abc")
3811 self.assertEqual(decoder.newlines, None)
Antoine Pitrou180a3362008-12-14 16:36:46 +00003812
3813 def test_newline_decoder(self):
3814 encodings = (
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003815 # None meaning the IncrementalNewlineDecoder takes unicode input
3816 # rather than bytes input
3817 None, 'utf-8', 'latin-1',
Antoine Pitrou180a3362008-12-14 16:36:46 +00003818 'utf-16', 'utf-16-le', 'utf-16-be',
3819 'utf-32', 'utf-32-le', 'utf-32-be',
3820 )
3821 for enc in encodings:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003822 decoder = enc and codecs.getincrementaldecoder(enc)()
3823 decoder = self.IncrementalNewlineDecoder(decoder, translate=True)
3824 self.check_newline_decoding(decoder, enc)
Alexandre Vassalotti472f07d2008-01-06 00:34:32 +00003825 decoder = codecs.getincrementaldecoder("utf-8")()
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003826 decoder = self.IncrementalNewlineDecoder(decoder, translate=True)
3827 self.check_newline_decoding_utf8(decoder)
Oren Milman1d1d3e92017-08-20 18:35:36 +03003828 self.assertRaises(TypeError, decoder.setstate, 42)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003829
Antoine Pitrou66913e22009-03-06 23:40:56 +00003830 def test_newline_bytes(self):
3831 # Issue 5433: Excessive optimization in IncrementalNewlineDecoder
3832 def _check(dec):
Ezio Melottib3aedd42010-11-20 19:04:17 +00003833 self.assertEqual(dec.newlines, None)
3834 self.assertEqual(dec.decode("\u0D00"), "\u0D00")
3835 self.assertEqual(dec.newlines, None)
3836 self.assertEqual(dec.decode("\u0A00"), "\u0A00")
3837 self.assertEqual(dec.newlines, None)
Antoine Pitrou66913e22009-03-06 23:40:56 +00003838 dec = self.IncrementalNewlineDecoder(None, translate=False)
3839 _check(dec)
3840 dec = self.IncrementalNewlineDecoder(None, translate=True)
3841 _check(dec)
3842
Xiang Zhangb08746b2018-10-31 19:49:16 +08003843 def test_translate(self):
3844 # issue 35062
3845 for translate in (-2, -1, 1, 2):
3846 decoder = codecs.getincrementaldecoder("utf-8")()
3847 decoder = self.IncrementalNewlineDecoder(decoder, translate)
3848 self.check_newline_decoding_utf8(decoder)
3849 decoder = codecs.getincrementaldecoder("utf-8")()
3850 decoder = self.IncrementalNewlineDecoder(decoder, translate=0)
3851 self.assertEqual(decoder.decode(b"\r\r\n"), "\r\r\n")
3852
class CIncrementalNewlineDecoderTest(IncrementalNewlineDecoderTest):
    # Runs the inherited tests unchanged.  The IncrementalNewlineDecoder
    # attribute they read is not assigned in this class body.
    pass
3855
class PyIncrementalNewlineDecoderTest(IncrementalNewlineDecoderTest):
    # Runs the inherited tests unchanged.  The IncrementalNewlineDecoder
    # attribute they read is not assigned in this class body.
    pass
Antoine Pitrou180a3362008-12-14 16:36:46 +00003858
Alexandre Vassalotti472f07d2008-01-06 00:34:32 +00003859
Guido van Rossum01a27522007-03-07 01:00:12 +00003860# XXX Tests for open()
Guido van Rossum68bbcd22007-02-27 17:19:33 +00003861
class MiscIOTest(unittest.TestCase):
    # The module under test is read from self.io; self.open and the io
    # classes and exceptions used below (self.IOBase,
    # self.BlockingIOError, ...) are looked up on the instance and are
    # not defined in this class.

    def tearDown(self):
        # Remove the scratch file that many of the tests create.
        support.unlink(support.TESTFN)

    def test___all__(self):
        for name in self.io.__all__:
            exported = getattr(self.io, name, None)
            self.assertIsNotNone(exported, name)
            if name in ("open", "open_code"):
                # Exempt from the class checks below.
                continue
            if "error" in name.lower() or name == "UnsupportedOperation":
                self.assertTrue(issubclass(exported, Exception), name)
            elif not name.startswith("SEEK_"):
                self.assertTrue(issubclass(exported, self.IOBase))

    def test_attributes(self):
        f = self.open(support.TESTFN, "wb", buffering=0)
        self.assertEqual(f.mode, "wb")
        f.close()

        # Opening with mode "U" is expected to emit a DeprecationWarning.
        with support.check_warnings(('', DeprecationWarning)):
            f = self.open(support.TESTFN, "U")
        for layer in (f, f.buffer, f.buffer.raw):
            self.assertEqual(layer.name, support.TESTFN)
        for layer, mode in zip((f, f.buffer, f.buffer.raw),
                               ("U", "rb", "rb")):
            self.assertEqual(layer.mode, mode)
        f.close()

        f = self.open(support.TESTFN, "w+")
        self.assertEqual(f.mode, "w+")
        # Does it really matter?
        self.assertEqual(f.buffer.mode, "rb+")
        self.assertEqual(f.buffer.raw.mode, "rb+")

        # A second file object on the same descriptor; closefd=False
        # leaves the descriptor to f.
        g = self.open(f.fileno(), "wb", closefd=False)
        for layer in (g, g.raw):
            self.assertEqual(layer.mode, "wb")
        for layer in (g, g.raw):
            self.assertEqual(layer.name, f.fileno())
        f.close()
        g.close()

    def test_io_after_close(self):
        # For each base mode: text, binary, text with buffering 1 and 2,
        # and unbuffered binary.
        open_options = []
        for base in ("w", "r", "w+"):
            open_options += [
                {"mode": base},
                {"mode": base + "b"},
                {"mode": base, "buffering": 1},
                {"mode": base, "buffering": 2},
                {"mode": base + "b", "buffering": 0},
            ]
        for kwargs in open_options:
            f = self.open(support.TESTFN, **kwargs)
            f.close()
            empty = b"" if "b" in kwargs['mode'] else ""
            # (method name, arguments, present on every kind of file?)
            # Methods marked False are only checked when the object has
            # them.
            checks = (
                ("flush", (), True),
                ("fileno", (), True),
                ("isatty", (), True),
                ("__iter__", (), True),
                ("peek", (1,), False),
                ("read", (), True),
                ("read1", (1024,), False),
                ("read1", (), False),
                ("readall", (), False),
                ("readinto", (bytearray(1024),), False),
                ("readinto1", (bytearray(1024),), False),
                ("readline", (), True),
                ("readlines", (), True),
                ("readlines", (1,), True),
                ("seek", (0,), True),
                ("tell", (), True),
                ("truncate", (), True),
                ("write", (empty,), True),
                ("writelines", ([],), True),
            )
            for name, args, always in checks:
                if always or hasattr(f, name):
                    self.assertRaises(ValueError, getattr(f, name), *args)
            self.assertRaises(ValueError, next, f)

    def test_blockingioerror(self):
        # Various BlockingIOError issues
        class C(str):
            pass
        text = C("")
        error = self.BlockingIOError(1, text)
        # Reference cycle between the exception and its argument; it
        # must be collectable.
        text.b = error
        error.c = text
        ref = weakref.ref(text)
        del text, error
        support.gc_collect()
        self.assertIsNone(ref(), ref)

    def test_abcs(self):
        # Test the visible base classes are ABCs.
        for name in ("IOBase", "RawIOBase", "BufferedIOBase", "TextIOBase"):
            self.assertIsInstance(getattr(self, name), abc.ABCMeta)

    def _check_abc_inheritance(self, abcmodule):
        # *abcmodule* is any object exposing IOBase, RawIOBase,
        # BufferedIOBase and TextIOBase as attributes.
        #
        # (positional open() arguments, keyword arguments, and whether
        #  the file is expected to be an instance of the raw, buffered
        #  and text ABC respectively)
        cases = (
            (("wb",), {"buffering": 0}, (True, False, False)),
            (("wb",), {}, (False, True, False)),
            (("w",), {}, (False, False, True)),
        )
        abc_names = ("RawIOBase", "BufferedIOBase", "TextIOBase")
        for args, kwargs, expected in cases:
            with self.open(support.TESTFN, *args, **kwargs) as f:
                self.assertIsInstance(f, abcmodule.IOBase)
                for abc_name, is_instance in zip(abc_names, expected):
                    if is_instance:
                        check = self.assertIsInstance
                    else:
                        check = self.assertNotIsInstance
                    check(f, getattr(abcmodule, abc_name))

    def test_abc_inheritance(self):
        # Test implementations inherit from their respective ABCs
        self._check_abc_inheritance(self)

    def test_abc_inheritance_official(self):
        # Test implementations inherit from the official ABCs of the
        # baseline "io" module.
        self._check_abc_inheritance(io)

    def _check_warn_on_dealloc(self, *args, **kwargs):
        # Dropping the last reference to an unclosed file must emit a
        # ResourceWarning whose message contains the file's repr().
        # NOTE(review): this calls the builtin open(), not self.open --
        # confirm that is intentional.
        stream = open(*args, **kwargs)
        description = repr(stream)
        with self.assertWarns(ResourceWarning) as caught:
            stream = None
            support.gc_collect()
        self.assertIn(description, str(caught.warning.args[0]))

    def test_warn_on_dealloc(self):
        self._check_warn_on_dealloc(support.TESTFN, "wb", buffering=0)
        self._check_warn_on_dealloc(support.TESTFN, "wb")
        self._check_warn_on_dealloc(support.TESTFN, "w")

    def _check_warn_on_dealloc_fd(self, *args, **kwargs):
        opened_fds = []

        def cleanup_fds():
            # Close every pipe end; EBADF means that descriptor was
            # already closed and is ignored.
            for fd in opened_fds:
                try:
                    os.close(fd)
                except OSError as exc:
                    if exc.errno != errno.EBADF:
                        raise
        self.addCleanup(cleanup_fds)

        read_end, write_end = os.pipe()
        opened_fds.extend((read_end, write_end))
        self._check_warn_on_dealloc(read_end, *args, **kwargs)

        # When using closefd=False, there's no warning
        read_end, write_end = os.pipe()
        opened_fds.extend((read_end, write_end))
        with support.check_no_resource_warning(self):
            open(read_end, *args, closefd=False, **kwargs)

    def test_warn_on_dealloc_fd(self):
        self._check_warn_on_dealloc_fd("rb", buffering=0)
        self._check_warn_on_dealloc_fd("rb")
        self._check_warn_on_dealloc_fd("r")

    def test_pickling(self):
        # Pickling file objects is forbidden
        # For each base mode: text, binary and unbuffered binary.
        open_options = []
        for base in ("w", "r", "w+"):
            open_options += [
                {"mode": base},
                {"mode": base + "b"},
                {"mode": base + "b", "buffering": 0},
            ]
        for kwargs in open_options:
            for protocol in range(pickle.HIGHEST_PROTOCOL + 1):
                with self.open(support.TESTFN, **kwargs) as f:
                    with self.assertRaises(TypeError):
                        pickle.dumps(f, protocol)

    def test_nonblock_pipe_write_bigbuf(self):
        self._test_nonblock_pipe_write(16*1024)

    def test_nonblock_pipe_write_smallbuf(self):
        self._test_nonblock_pipe_write(1024)

    @unittest.skipUnless(hasattr(os, 'set_blocking'),
                         'os.set_blocking() required for this test')
    def _test_nonblock_pipe_write(self, bufsize):
        # Push data through a non-blocking pipe using buffered files of
        # size *bufsize*, and check that what comes out equals what was
        # reported as written.
        sent = []
        received = []
        read_fd, write_fd = os.pipe()
        os.set_blocking(read_fd, False)
        os.set_blocking(write_fd, False)

        # To exercise all code paths in the C implementation we need
        # to play with buffer sizes.  For instance, if we choose a
        # buffer size less than or equal to _PIPE_BUF (4096 on Linux)
        # then we will never get a partial write of the buffer.
        rf = self.open(read_fd, mode='rb', closefd=True, buffering=bufsize)
        wf = self.open(write_fd, mode='wb', closefd=True, buffering=bufsize)

        with rf, wf:
            for chunk_len in (9999, 73, 7574):
                try:
                    # Write until the pipe refuses more data.
                    count = 0
                    while True:
                        chunk = bytes([count % 26 + 97]) * chunk_len
                        sent.append(chunk)
                        wf.write(chunk)
                        count += 1

                except self.BlockingIOError as exc:
                    self.assertEqual(exc.args[0], errno.EAGAIN)
                    self.assertEqual(exc.args[2], exc.characters_written)
                    # Only part of the last chunk was accepted.
                    sent[-1] = sent[-1][:exc.characters_written]
                    received.append(rf.read())
                    chunk = b'BLOCKED'
                    wf.write(chunk)
                    sent.append(chunk)

            # Flush, draining the read end each time the pipe is full.
            while True:
                try:
                    wf.flush()
                    break
                except self.BlockingIOError as exc:
                    self.assertEqual(exc.args[0], errno.EAGAIN)
                    self.assertEqual(exc.args[2], exc.characters_written)
                    self.assertEqual(exc.characters_written, 0)
                    received.append(rf.read())

            # Collect what is left, until read() returns None.
            received.extend(iter(rf.read, None))

        self.assertEqual(b''.join(sent), b''.join(received))
        self.assertTrue(wf.closed)
        self.assertTrue(rf.closed)

    def test_create_fail(self):
        # 'x' mode fails if file is existing
        with self.open(support.TESTFN, 'w'):
            pass
        with self.assertRaises(FileExistsError):
            self.open(support.TESTFN, 'x')

    def test_create_writes(self):
        # 'x' mode opens for writing
        payload = b"spam"
        with self.open(support.TESTFN, 'xb') as f:
            f.write(payload)
        with self.open(support.TESTFN, 'rb') as f:
            self.assertEqual(payload, f.read())

    def test_open_allargs(self):
        # there used to be a buffer overflow in the parser for rawmode
        with self.assertRaises(ValueError):
            self.open(support.TESTFN, 'rwax+')
4128
4129
class CMiscIOTest(MiscIOTest):
    # Module under test for the inherited checks.
    io = io

    def test_readinto_buffer_overflow(self):
        # Issue #18025
        # read() returns far more data than the two-byte target buffer
        # can hold; readinto() must raise ValueError.
        class BadReader(self.io.BufferedIOBase):
            def read(self, n=-1):
                return b'x' * 10**6

        reader = BadReader()
        target = bytearray(2)
        with self.assertRaises(ValueError):
            reader.readinto(target)

    def check_daemon_threads_shutdown_deadlock(self, stream_name):
        # Issue #23309: deadlocks at shutdown should be avoided when a
        # daemon thread and the main thread both write to a file.
        #
        # *stream_name* is the name of the sys attribute ('stdout' or
        # 'stderr' in the callers below) that the child writes to.
        template = """if 1:
            import sys
            import time
            import threading
            from test.support import SuppressCrashReport

            file = sys.{stream_name}

            def run():
                while True:
                    file.write('.')
                    file.flush()

            crash = SuppressCrashReport()
            crash.__enter__()
            # don't call __exit__(): the crash occurs at Python shutdown

            thread = threading.Thread(target=run)
            thread.daemon = True
            thread.start()

            time.sleep(0.5)
            file.write('!')
            file.flush()
            """
        script = template.format(stream_name=stream_name)
        res, _ = run_python_until_end("-c", script)
        err = res.err.decode()
        if res.rc == 0:
            # Clean exit: stderr may only contain what the child wrote.
            self.assertFalse(err.strip('.!'))
            return
        # Failure: should be a fatal error
        pattern = (r"Fatal Python error: could not acquire lock "
                   r"for <(_io\.)?BufferedWriter name='<{stream_name}>'> "
                   r"at interpreter shutdown, possibly due to "
                   r"daemon threads".format(stream_name=stream_name))
        self.assertRegex(err, pattern)

    def test_daemon_threads_shutdown_stdout_deadlock(self):
        self.check_daemon_threads_shutdown_deadlock('stdout')

    def test_daemon_threads_shutdown_stderr_deadlock(self):
        self.check_daemon_threads_shutdown_deadlock('stderr')
4187
4188
class PyMiscIOTest(MiscIOTest):
    # Run the shared MiscIOTest cases against the ``pyio`` module.
    io = pyio
Barry Warsaw40e82462008-11-20 20:14:50 +00004191
Antoine Pitroub46b9d52010-08-21 19:09:32 +00004192
@unittest.skipIf(os.name == 'nt', 'POSIX signals required for this test.')
class SignalsTest(unittest.TestCase):
    """Check how file objects behave when SIGALRM interrupts reads/writes.

    The implementation under test is reached through ``self.io``;
    subclasses bind that attribute to the module they want to exercise.
    """

    def setUp(self):
        # Install the raising handler and remember the previous one so
        # tearDown() can put it back.
        self.oldalrm = signal.signal(signal.SIGALRM, self.alarm_interrupt)

    def tearDown(self):
        signal.signal(signal.SIGALRM, self.oldalrm)

    def alarm_interrupt(self, sig, frame):
        # Default SIGALRM handler for these tests: raises ZeroDivisionError.
        1/0

    def check_interrupted_write(self, item, bytes, **fdopen_kwargs):
        """Check that a partial write, when it gets interrupted, properly
        invokes the signal handler, and bubbles up the exception raised
        in the latter.

        *item* is the unit of data repeated to build the oversized write,
        *bytes* is the byte string whose first two bytes are expected to
        come out of the pipe, and *fdopen_kwargs* are passed to
        ``self.io.open()`` (``closefd`` is forced to False).
        """
        read_results = []
        def _read():
            s = os.read(r, 1)
            read_results.append(s)

        t = threading.Thread(target=_read)
        t.daemon = True
        r, w = os.pipe()
        fdopen_kwargs["closefd"] = False
        large_data = item * (support.PIPE_MAX_SIZE // len(item) + 1)
        # Bound before the try block so that the cleanup below does not
        # fail with UnboundLocalError (hiding the real error) when open()
        # itself raises.
        wio = None
        try:
            wio = self.io.open(w, **fdopen_kwargs)
            if hasattr(signal, 'pthread_sigmask'):
                # create the thread with SIGALRM signal blocked
                signal.pthread_sigmask(signal.SIG_BLOCK, [signal.SIGALRM])
                try:
                    t.start()
                finally:
                    # Always unblock, even if the thread failed to start.
                    signal.pthread_sigmask(signal.SIG_UNBLOCK,
                                           [signal.SIGALRM])
            else:
                t.start()

            # Fill the pipe enough that the write will be blocking.
            # It will be interrupted by the timer armed below.  Since the
            # other thread has read one byte, the low-level write will
            # return with a successful (partial) result rather than an EINTR.
            # The buffered IO layer must check for pending signal
            # handlers, which in this case will invoke alarm_interrupt().
            signal.alarm(1)
            try:
                self.assertRaises(ZeroDivisionError, wio.write, large_data)
            finally:
                signal.alarm(0)
                t.join()
            # We got one byte, get another one and check that it isn't a
            # repeat of the first one.
            read_results.append(os.read(r, 1))
            self.assertEqual(read_results, [bytes[0:1], bytes[1:2]])
        finally:
            os.close(w)
            os.close(r)
            # This is deliberate. If we didn't close the file descriptor
            # before closing wio, wio would try to flush its internal
            # buffer, and block again.
            if wio is not None:
                try:
                    wio.close()
                except OSError as e:
                    if e.errno != errno.EBADF:
                        raise

    def test_interrupted_write_unbuffered(self):
        self.check_interrupted_write(b"xy", b"xy", mode="wb", buffering=0)

    def test_interrupted_write_buffered(self):
        self.check_interrupted_write(b"xy", b"xy", mode="wb")

    def test_interrupted_write_text(self):
        self.check_interrupted_write("xy", b"xy", mode="w", encoding="ascii")

    @support.no_tracing
    def check_reentrant_write(self, data, **fdopen_kwargs):
        """Check what happens when a SIGALRM handler writes to a file
        object while the main code is writing to that same object.

        *data* is the chunk written on every call; *fdopen_kwargs* are
        passed to ``self.io.open()``.
        """
        def on_alarm(*args):
            # Will be called reentrantly from the same thread
            wio.write(data)
            1/0
        signal.signal(signal.SIGALRM, on_alarm)
        r, w = os.pipe()
        wio = self.io.open(w, **fdopen_kwargs)
        try:
            signal.alarm(1)
            # Either the reentrant call to wio.write() fails with RuntimeError,
            # or the signal handler raises ZeroDivisionError.
            with self.assertRaises((ZeroDivisionError, RuntimeError)) as cm:
                while 1:
                    for _ in range(100):
                        wio.write(data)
                        wio.flush()
                    # Make sure the buffer doesn't fill up and block further writes
                    os.read(r, len(data) * 100)
            exc = cm.exception
            if isinstance(exc, RuntimeError):
                self.assertTrue(str(exc).startswith("reentrant call"), str(exc))
        finally:
            signal.alarm(0)
            try:
                wio.close()
            finally:
                # Release the read end even if closing wio raised.
                os.close(r)

    def test_reentrant_write_buffered(self):
        self.check_reentrant_write(b"xy", mode="wb")

    def test_reentrant_write_text(self):
        self.check_reentrant_write("xy", mode="w", encoding="ascii")

    def check_interrupted_read_retry(self, decode, **fdopen_kwargs):
        """Check that a buffered read, when it gets interrupted (either
        returning a partial result or EINTR), properly invokes the signal
        handler and retries if the latter returned successfully.

        *decode* converts the value returned by ``read()`` to ``str`` for
        comparison; *fdopen_kwargs* are passed to ``self.io.open()``
        (``closefd`` is forced to False).
        """
        r, w = os.pipe()
        fdopen_kwargs["closefd"] = False
        def alarm_handler(sig, frame):
            os.write(w, b"bar")
        signal.signal(signal.SIGALRM, alarm_handler)
        # Bound before the try block so the cleanup below never trips over
        # an unbound name when open() itself raises.
        rio = None
        try:
            rio = self.io.open(r, **fdopen_kwargs)
            os.write(w, b"foo")
            signal.alarm(1)
            # Expected behaviour:
            # - first raw read() returns partial b"foo"
            # - second raw read() returns EINTR
            # - third raw read() returns b"bar"
            self.assertEqual(decode(rio.read(6)), "foobar")
        finally:
            signal.alarm(0)
            try:
                if rio is not None:
                    rio.close()
            finally:
                # Both pipe ends are released even if closing rio raised.
                os.close(w)
                os.close(r)

    def test_interrupted_read_retry_buffered(self):
        self.check_interrupted_read_retry(lambda x: x.decode('latin1'),
                                          mode="rb")

    def test_interrupted_read_retry_text(self):
        self.check_interrupted_read_retry(lambda x: x,
                                          mode="r")

    def check_interrupted_write_retry(self, item, **fdopen_kwargs):
        """Check that a buffered write, when it gets interrupted (either
        returning a partial result or EINTR), properly invokes the signal
        handler and retries if the latter returned successfully.

        *item* is a one-element unit of data repeated to build the large
        write; *fdopen_kwargs* are passed to ``self.io.open()``
        (``closefd`` is forced to False).
        """
        select = support.import_module("select")

        # A quantity that exceeds the buffer size of an anonymous pipe's
        # write end.
        N = support.PIPE_MAX_SIZE
        r, w = os.pipe()
        fdopen_kwargs["closefd"] = False

        # We need a separate thread to read from the pipe and allow the
        # write() to finish.  This thread is started after the SIGALRM is
        # received (forcing a first EINTR in write()).
        read_results = []
        write_finished = False
        error = None
        def _read():
            nonlocal error
            try:
                while not write_finished:
                    while r in select.select([r], [], [], 1.0)[0]:
                        s = os.read(r, 1024)
                        read_results.append(s)
            except BaseException as exc:
                # Hand the failure over to the main thread for reporting.
                error = exc
        t = threading.Thread(target=_read)
        t.daemon = True
        def alarm1(sig, frame):
            signal.signal(signal.SIGALRM, alarm2)
            signal.alarm(1)
        def alarm2(sig, frame):
            t.start()

        large_data = item * N
        signal.signal(signal.SIGALRM, alarm1)
        # Bound before the try block so that the cleanup below does not
        # fail with UnboundLocalError (hiding the real error) when open()
        # itself raises.
        wio = None
        try:
            wio = self.io.open(w, **fdopen_kwargs)
            signal.alarm(1)
            # Expected behaviour:
            # - first raw write() is partial (because of the limited pipe buffer
            #   and the first alarm)
            # - second raw write() returns EINTR (because of the second alarm)
            # - subsequent write()s are successful (either partial or complete)
            written = wio.write(large_data)
            self.assertEqual(N, written)

            wio.flush()
            write_finished = True
            t.join()

            self.assertIsNone(error)
            self.assertEqual(N, sum(len(x) for x in read_results))
        finally:
            signal.alarm(0)
            write_finished = True
            os.close(w)
            os.close(r)
            # This is deliberate. If we didn't close the file descriptor
            # before closing wio, wio would try to flush its internal
            # buffer, and could block (in case of failure).
            if wio is not None:
                try:
                    wio.close()
                except OSError as e:
                    if e.errno != errno.EBADF:
                        raise

    def test_interrupted_write_retry_buffered(self):
        self.check_interrupted_write_retry(b"x", mode="wb")

    def test_interrupted_write_retry_text(self):
        self.check_interrupted_write_retry("x", mode="w", encoding="latin1")
4405
Antoine Pitrouf3b68b32010-12-03 18:41:39 +00004406
class CSignalsTest(SignalsTest):
    # Run the shared SignalsTest cases against the ``io`` module.
    io = io
4409
class PySignalsTest(SignalsTest):
    # Run the shared SignalsTest cases against the ``pyio`` module.
    io = pyio

    # Handling reentrancy issues would slow down _pyio even more, so the
    # tests are disabled.
    test_reentrant_write_buffered = test_reentrant_write_text = None
4417
Antoine Pitroub46b9d52010-08-21 19:09:32 +00004418
def load_tests(*args):
    """Build the suite for this module (unittest ``load_tests`` hook).

    Before collecting the tests, every test class whose name starts with
    "C" or "Py" gets the public names of the matching io implementation
    (``io`` or ``pyio``) and the matching ``C*``/``Py*`` mock classes set
    as class attributes.  The positional arguments are accepted and
    ignored.

    Returns a ``unittest.TestSuite`` holding the tests of all listed
    classes.
    """
    tests = (CIOTest, PyIOTest, APIMismatchTest,
             CBufferedReaderTest, PyBufferedReaderTest,
             CBufferedWriterTest, PyBufferedWriterTest,
             CBufferedRWPairTest, PyBufferedRWPairTest,
             CBufferedRandomTest, PyBufferedRandomTest,
             StatefulIncrementalDecoderTest,
             CIncrementalNewlineDecoderTest, PyIncrementalNewlineDecoderTest,
             CTextIOWrapperTest, PyTextIOWrapperTest,
             CMiscIOTest, PyMiscIOTest,
             CSignalsTest, PySignalsTest,
             )

    # Put the namespaces of the IO module we are testing and some useful mock
    # classes in the __dict__ of each test.
    mocks = (MockRawIO, MisbehavedRawIO, MockFileIO, CloseFailureIO,
             MockNonBlockWriterIO, MockUnseekableIO, MockRawIOWithoutRead,
             SlowFlushRawIO)
    all_members = io.__all__ + ["IncrementalNewlineDecoder"]
    c_io_ns = {name: getattr(io, name) for name in all_members}
    py_io_ns = {name: getattr(pyio, name) for name in all_members}
    globs = globals()
    c_io_ns.update((x.__name__, globs["C" + x.__name__]) for x in mocks)
    py_io_ns.update((x.__name__, globs["Py" + x.__name__]) for x in mocks)
    # Avoid turning open into a bound method.
    py_io_ns["open"] = pyio.OpenWrapper
    for test in tests:
        # Same precedence as before: the "C" prefix is checked first, and
        # classes with neither prefix are left untouched.
        if test.__name__.startswith("C"):
            namespace = c_io_ns
        elif test.__name__.startswith("Py"):
            namespace = py_io_ns
        else:
            continue
        for name, obj in namespace.items():
            setattr(test, name, obj)

    # unittest.makeSuite() is deprecated (and removed in Python 3.13);
    # TestLoader.loadTestsFromTestCase() collects the same "test*" methods.
    loader = unittest.TestLoader()
    suite = unittest.TestSuite([loader.loadTestsFromTestCase(test)
                                for test in tests])
    return suite
Guido van Rossum28524c72007-02-27 05:47:44 +00004455
# Run the test suite when this file is executed as a script.
if __name__ == "__main__":
    unittest.main()