blob: 102679b1d3424359a292f686cfbe5943249d7378 [file] [log] [blame]
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001"""Unit tests for the io module."""
2
3# Tests of io are scattered over the test suite:
4# * test_bufio - tests file buffering
5# * test_memoryio - tests BytesIO and StringIO
6# * test_fileio - tests FileIO
7# * test_file - tests the file interface
8# * test_io - tests everything else in the io module
9# * test_univnewlines - tests universal newline support
10# * test_largefile - tests operations on a file greater than 2**32 bytes
11# (only enabled with -ulargefile)
12
13################################################################################
14# ATTENTION TEST WRITERS!!!
15################################################################################
16# When writing tests for io, it's important to test both the C and Python
17# implementations. This is usually done by writing a base test that refers to
Serhiy Storchakad65c9492015-11-02 14:10:23 +020018# the type it is testing as an attribute. Then it provides custom subclasses to
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000019# test both implementations. This file has lots of examples.
20################################################################################
Guido van Rossum68bbcd22007-02-27 17:19:33 +000021
Victor Stinnerf86a5e82012-06-05 13:43:22 +020022import abc
23import array
24import errno
25import locale
Guido van Rossum8358db22007-08-18 21:39:55 +000026import os
Victor Stinnerf86a5e82012-06-05 13:43:22 +020027import pickle
28import random
29import signal
Guido van Rossum34d69e52007-04-10 20:08:41 +000030import sys
Gregory P. Smithe5796c42018-12-30 20:17:57 -080031import sysconfig
Antoine Pitroua6a4dc82017-09-07 18:56:24 +020032import threading
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +000033import time
Guido van Rossum28524c72007-02-27 05:47:44 +000034import unittest
Antoine Pitroue033e062010-10-29 10:38:18 +000035import warnings
Victor Stinnerf86a5e82012-06-05 13:43:22 +020036import weakref
Antoine Pitrou131a4892012-10-16 22:57:11 +020037from collections import deque, UserList
Victor Stinnerf86a5e82012-06-05 13:43:22 +020038from itertools import cycle, count
Benjamin Petersonee8712c2008-05-20 21:35:26 +000039from test import support
Berker Peksagce643912015-05-06 06:33:17 +030040from test.support.script_helper import assert_python_ok, run_python_until_end
Serhiy Storchakab21d1552018-03-02 11:53:51 +020041from test.support import FakePath
Guido van Rossum76c5d4d2007-04-06 19:10:29 +000042
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +000043import codecs
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000044import io # C implementation of io
45import _pyio as pyio # Python implementation of io
Guido van Rossuma9e20242007-03-08 00:43:48 +000046
Martin Panter6bb91f32016-05-28 00:41:57 +000047try:
48 import ctypes
49except ImportError:
50 def byteslike(*pos, **kw):
51 return array.array("b", bytes(*pos, **kw))
52else:
53 def byteslike(*pos, **kw):
54 """Create a bytes-like object having no string or sequence methods"""
55 data = bytes(*pos, **kw)
56 obj = EmptyStruct()
57 ctypes.resize(obj, len(data))
58 memoryview(obj).cast("B")[:] = data
59 return obj
60 class EmptyStruct(ctypes.Structure):
61 pass
62
Gregory P. Smithe5796c42018-12-30 20:17:57 -080063_cflags = sysconfig.get_config_var('CFLAGS') or ''
64_config_args = sysconfig.get_config_var('CONFIG_ARGS') or ''
65MEMORY_SANITIZER = (
66 '-fsanitize=memory' in _cflags or
67 '--with-memory-sanitizer' in _config_args
68)
69
Victor Stinnerbc2aa812019-05-23 03:45:09 +020070# Does io.IOBase finalizer log the exception if the close() method fails?
71# The exception is ignored silently by default in release build.
72IOBASE_EMITS_UNRAISABLE = (hasattr(sys, "gettotalrefcount") or sys.flags.dev_mode)
Victor Stinner44235042019-04-12 17:06:47 +020073
74
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000075def _default_chunk_size():
76 """Get the default TextIOWrapper chunk size"""
Marc-André Lemburg8f36af72011-02-25 15:42:01 +000077 with open(__file__, "r", encoding="latin-1") as f:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000078 return f._CHUNK_SIZE
79
80
Antoine Pitrou328ec742010-09-14 18:37:24 +000081class MockRawIOWithoutRead:
82 """A RawIO implementation without read(), so as to exercise the default
83 RawIO.read() which calls readinto()."""
Guido van Rossuma9e20242007-03-08 00:43:48 +000084
Guido van Rossum76c5d4d2007-04-06 19:10:29 +000085 def __init__(self, read_stack=()):
86 self._read_stack = list(read_stack)
87 self._write_stack = []
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000088 self._reads = 0
Antoine Pitrou32cfede2010-08-11 13:31:33 +000089 self._extraneous_reads = 0
Guido van Rossum68bbcd22007-02-27 17:19:33 +000090
Guido van Rossum01a27522007-03-07 01:00:12 +000091 def write(self, b):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000092 self._write_stack.append(bytes(b))
Guido van Rossum01a27522007-03-07 01:00:12 +000093 return len(b)
94
95 def writable(self):
96 return True
97
Guido van Rossum68bbcd22007-02-27 17:19:33 +000098 def fileno(self):
99 return 42
100
101 def readable(self):
102 return True
103
Guido van Rossum01a27522007-03-07 01:00:12 +0000104 def seekable(self):
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000105 return True
106
Guido van Rossum01a27522007-03-07 01:00:12 +0000107 def seek(self, pos, whence):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000108 return 0 # wrong but we gotta return something
Guido van Rossum01a27522007-03-07 01:00:12 +0000109
110 def tell(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000111 return 0 # same comment as above
112
113 def readinto(self, buf):
114 self._reads += 1
115 max_len = len(buf)
116 try:
117 data = self._read_stack[0]
118 except IndexError:
Antoine Pitrou32cfede2010-08-11 13:31:33 +0000119 self._extraneous_reads += 1
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000120 return 0
121 if data is None:
122 del self._read_stack[0]
123 return None
124 n = len(data)
125 if len(data) <= max_len:
126 del self._read_stack[0]
127 buf[:n] = data
128 return n
129 else:
130 buf[:] = data[:max_len]
131 self._read_stack[0] = data[max_len:]
132 return max_len
133
134 def truncate(self, pos=None):
135 return pos
136
Antoine Pitrou328ec742010-09-14 18:37:24 +0000137class CMockRawIOWithoutRead(MockRawIOWithoutRead, io.RawIOBase):
138 pass
139
140class PyMockRawIOWithoutRead(MockRawIOWithoutRead, pyio.RawIOBase):
141 pass
142
143
144class MockRawIO(MockRawIOWithoutRead):
145
146 def read(self, n=None):
147 self._reads += 1
148 try:
149 return self._read_stack.pop(0)
150 except:
151 self._extraneous_reads += 1
152 return b""
153
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000154class CMockRawIO(MockRawIO, io.RawIOBase):
155 pass
156
157class PyMockRawIO(MockRawIO, pyio.RawIOBase):
158 pass
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000159
Guido van Rossuma9e20242007-03-08 00:43:48 +0000160
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000161class MisbehavedRawIO(MockRawIO):
162 def write(self, b):
163 return super().write(b) * 2
164
165 def read(self, n=None):
166 return super().read(n) * 2
167
168 def seek(self, pos, whence):
169 return -123
170
171 def tell(self):
172 return -456
173
174 def readinto(self, buf):
175 super().readinto(buf)
176 return len(buf) * 5
177
178class CMisbehavedRawIO(MisbehavedRawIO, io.RawIOBase):
179 pass
180
181class PyMisbehavedRawIO(MisbehavedRawIO, pyio.RawIOBase):
182 pass
183
184
benfogle9703f092017-11-10 16:03:40 -0500185class SlowFlushRawIO(MockRawIO):
186 def __init__(self):
187 super().__init__()
188 self.in_flush = threading.Event()
189
190 def flush(self):
191 self.in_flush.set()
192 time.sleep(0.25)
193
194class CSlowFlushRawIO(SlowFlushRawIO, io.RawIOBase):
195 pass
196
197class PySlowFlushRawIO(SlowFlushRawIO, pyio.RawIOBase):
198 pass
199
200
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000201class CloseFailureIO(MockRawIO):
202 closed = 0
203
204 def close(self):
205 if not self.closed:
206 self.closed = 1
Andrew Svetlovf7a17b42012-12-25 16:47:37 +0200207 raise OSError
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000208
209class CCloseFailureIO(CloseFailureIO, io.RawIOBase):
210 pass
211
212class PyCloseFailureIO(CloseFailureIO, pyio.RawIOBase):
213 pass
214
215
216class MockFileIO:
Guido van Rossum78892e42007-04-06 17:31:18 +0000217
218 def __init__(self, data):
219 self.read_history = []
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000220 super().__init__(data)
Guido van Rossum78892e42007-04-06 17:31:18 +0000221
222 def read(self, n=None):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000223 res = super().read(n)
Guido van Rossum78892e42007-04-06 17:31:18 +0000224 self.read_history.append(None if res is None else len(res))
225 return res
226
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000227 def readinto(self, b):
228 res = super().readinto(b)
229 self.read_history.append(res)
230 return res
Guido van Rossum78892e42007-04-06 17:31:18 +0000231
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000232class CMockFileIO(MockFileIO, io.BytesIO):
233 pass
Guido van Rossuma9e20242007-03-08 00:43:48 +0000234
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000235class PyMockFileIO(MockFileIO, pyio.BytesIO):
236 pass
237
238
Antoine Pitrou0d739d72010-09-05 23:01:12 +0000239class MockUnseekableIO:
240 def seekable(self):
241 return False
242
243 def seek(self, *args):
244 raise self.UnsupportedOperation("not seekable")
245
246 def tell(self, *args):
247 raise self.UnsupportedOperation("not seekable")
248
Martin Panter754aab22016-03-31 07:21:56 +0000249 def truncate(self, *args):
250 raise self.UnsupportedOperation("not seekable")
251
Antoine Pitrou0d739d72010-09-05 23:01:12 +0000252class CMockUnseekableIO(MockUnseekableIO, io.BytesIO):
253 UnsupportedOperation = io.UnsupportedOperation
254
255class PyMockUnseekableIO(MockUnseekableIO, pyio.BytesIO):
256 UnsupportedOperation = pyio.UnsupportedOperation
257
258
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000259class MockNonBlockWriterIO:
260
261 def __init__(self):
Guido van Rossum01a27522007-03-07 01:00:12 +0000262 self._write_stack = []
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000263 self._blocker_char = None
Guido van Rossuma9e20242007-03-08 00:43:48 +0000264
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000265 def pop_written(self):
266 s = b"".join(self._write_stack)
267 self._write_stack[:] = []
268 return s
269
270 def block_on(self, char):
271 """Block when a given char is encountered."""
272 self._blocker_char = char
273
274 def readable(self):
275 return True
276
277 def seekable(self):
278 return True
Guido van Rossuma9e20242007-03-08 00:43:48 +0000279
Victor Stinnerb589cef2019-06-11 03:10:59 +0200280 def seek(self, pos, whence=0):
281 # naive implementation, enough for tests
282 return 0
283
Guido van Rossum01a27522007-03-07 01:00:12 +0000284 def writable(self):
285 return True
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000286
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000287 def write(self, b):
288 b = bytes(b)
289 n = -1
290 if self._blocker_char:
291 try:
292 n = b.index(self._blocker_char)
293 except ValueError:
294 pass
295 else:
Antoine Pitrou58fcf9f2011-11-21 20:16:44 +0100296 if n > 0:
297 # write data up to the first blocker
298 self._write_stack.append(b[:n])
299 return n
300 else:
301 # cancel blocker and indicate would block
302 self._blocker_char = None
303 return None
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000304 self._write_stack.append(b)
305 return len(b)
306
307class CMockNonBlockWriterIO(MockNonBlockWriterIO, io.RawIOBase):
308 BlockingIOError = io.BlockingIOError
309
310class PyMockNonBlockWriterIO(MockNonBlockWriterIO, pyio.RawIOBase):
311 BlockingIOError = pyio.BlockingIOError
312
Guido van Rossuma9e20242007-03-08 00:43:48 +0000313
Guido van Rossum28524c72007-02-27 05:47:44 +0000314class IOTest(unittest.TestCase):
315
Neal Norwitze7789b12008-03-24 06:18:09 +0000316 def setUp(self):
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000317 support.unlink(support.TESTFN)
Neal Norwitze7789b12008-03-24 06:18:09 +0000318
Guido van Rossum4d0f5a42007-03-07 22:59:39 +0000319 def tearDown(self):
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000320 support.unlink(support.TESTFN)
Guido van Rossum4d0f5a42007-03-07 22:59:39 +0000321
Guido van Rossum28524c72007-02-27 05:47:44 +0000322 def write_ops(self, f):
Guido van Rossum87429772007-04-10 21:06:59 +0000323 self.assertEqual(f.write(b"blah."), 5)
Antoine Pitrou905a2ff2010-01-31 22:47:27 +0000324 f.truncate(0)
325 self.assertEqual(f.tell(), 5)
326 f.seek(0)
327
328 self.assertEqual(f.write(b"blah."), 5)
Guido van Rossum87429772007-04-10 21:06:59 +0000329 self.assertEqual(f.seek(0), 0)
330 self.assertEqual(f.write(b"Hello."), 6)
Guido van Rossum28524c72007-02-27 05:47:44 +0000331 self.assertEqual(f.tell(), 6)
Guido van Rossum87429772007-04-10 21:06:59 +0000332 self.assertEqual(f.seek(-1, 1), 5)
Guido van Rossum28524c72007-02-27 05:47:44 +0000333 self.assertEqual(f.tell(), 5)
Martin Panter6bb91f32016-05-28 00:41:57 +0000334 buffer = bytearray(b" world\n\n\n")
335 self.assertEqual(f.write(buffer), 9)
336 buffer[:] = b"*" * 9 # Overwrite our copy of the data
Guido van Rossum87429772007-04-10 21:06:59 +0000337 self.assertEqual(f.seek(0), 0)
Guido van Rossum2b08b382007-05-08 20:18:39 +0000338 self.assertEqual(f.write(b"h"), 1)
Guido van Rossum87429772007-04-10 21:06:59 +0000339 self.assertEqual(f.seek(-1, 2), 13)
340 self.assertEqual(f.tell(), 13)
Antoine Pitrou905a2ff2010-01-31 22:47:27 +0000341
Guido van Rossum87429772007-04-10 21:06:59 +0000342 self.assertEqual(f.truncate(12), 12)
Antoine Pitrou905a2ff2010-01-31 22:47:27 +0000343 self.assertEqual(f.tell(), 13)
Christian Heimes8e42a0a2007-11-08 18:04:45 +0000344 self.assertRaises(TypeError, f.seek, 0.0)
Guido van Rossum28524c72007-02-27 05:47:44 +0000345
Guido van Rossum9b76da62007-04-11 01:09:03 +0000346 def read_ops(self, f, buffered=False):
347 data = f.read(5)
348 self.assertEqual(data, b"hello")
Martin Panter6bb91f32016-05-28 00:41:57 +0000349 data = byteslike(data)
Guido van Rossum9b76da62007-04-11 01:09:03 +0000350 self.assertEqual(f.readinto(data), 5)
Martin Panter6bb91f32016-05-28 00:41:57 +0000351 self.assertEqual(bytes(data), b" worl")
352 data = bytearray(5)
Guido van Rossum9b76da62007-04-11 01:09:03 +0000353 self.assertEqual(f.readinto(data), 2)
354 self.assertEqual(len(data), 5)
355 self.assertEqual(data[:2], b"d\n")
356 self.assertEqual(f.seek(0), 0)
357 self.assertEqual(f.read(20), b"hello world\n")
358 self.assertEqual(f.read(1), b"")
Martin Panter6bb91f32016-05-28 00:41:57 +0000359 self.assertEqual(f.readinto(byteslike(b"x")), 0)
Guido van Rossum9b76da62007-04-11 01:09:03 +0000360 self.assertEqual(f.seek(-6, 2), 6)
361 self.assertEqual(f.read(5), b"world")
362 self.assertEqual(f.read(0), b"")
Martin Panter6bb91f32016-05-28 00:41:57 +0000363 self.assertEqual(f.readinto(byteslike()), 0)
Guido van Rossum9b76da62007-04-11 01:09:03 +0000364 self.assertEqual(f.seek(-6, 1), 5)
365 self.assertEqual(f.read(5), b" worl")
366 self.assertEqual(f.tell(), 10)
Christian Heimes8e42a0a2007-11-08 18:04:45 +0000367 self.assertRaises(TypeError, f.seek, 0.0)
Guido van Rossum9b76da62007-04-11 01:09:03 +0000368 if buffered:
369 f.seek(0)
370 self.assertEqual(f.read(), b"hello world\n")
371 f.seek(6)
372 self.assertEqual(f.read(), b"world\n")
373 self.assertEqual(f.read(), b"")
Martin Panter6bb91f32016-05-28 00:41:57 +0000374 f.seek(0)
375 data = byteslike(5)
376 self.assertEqual(f.readinto1(data), 5)
377 self.assertEqual(bytes(data), b"hello")
Guido van Rossum9b76da62007-04-11 01:09:03 +0000378
Guido van Rossum34d69e52007-04-10 20:08:41 +0000379 LARGE = 2**31
380
Guido van Rossum53807da2007-04-10 19:01:47 +0000381 def large_file_ops(self, f):
382 assert f.readable()
383 assert f.writable()
Xavier de Gaye877f0362016-11-17 09:20:28 +0100384 try:
385 self.assertEqual(f.seek(self.LARGE), self.LARGE)
386 except (OverflowError, ValueError):
387 self.skipTest("no largefile support")
Guido van Rossum34d69e52007-04-10 20:08:41 +0000388 self.assertEqual(f.tell(), self.LARGE)
Guido van Rossum53807da2007-04-10 19:01:47 +0000389 self.assertEqual(f.write(b"xxx"), 3)
Guido van Rossum34d69e52007-04-10 20:08:41 +0000390 self.assertEqual(f.tell(), self.LARGE + 3)
391 self.assertEqual(f.seek(-1, 1), self.LARGE + 2)
Guido van Rossum87429772007-04-10 21:06:59 +0000392 self.assertEqual(f.truncate(), self.LARGE + 2)
Guido van Rossum34d69e52007-04-10 20:08:41 +0000393 self.assertEqual(f.tell(), self.LARGE + 2)
394 self.assertEqual(f.seek(0, 2), self.LARGE + 2)
Guido van Rossum87429772007-04-10 21:06:59 +0000395 self.assertEqual(f.truncate(self.LARGE + 1), self.LARGE + 1)
Antoine Pitrou905a2ff2010-01-31 22:47:27 +0000396 self.assertEqual(f.tell(), self.LARGE + 2)
Guido van Rossum34d69e52007-04-10 20:08:41 +0000397 self.assertEqual(f.seek(0, 2), self.LARGE + 1)
398 self.assertEqual(f.seek(-1, 2), self.LARGE)
Guido van Rossum53807da2007-04-10 19:01:47 +0000399 self.assertEqual(f.read(2), b"x")
400
Benjamin Peterson81971ea2009-05-14 22:01:31 +0000401 def test_invalid_operations(self):
402 # Try writing on a file opened in read mode and vice-versa.
Antoine Pitrou0d739d72010-09-05 23:01:12 +0000403 exc = self.UnsupportedOperation
Benjamin Peterson81971ea2009-05-14 22:01:31 +0000404 for mode in ("w", "wb"):
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000405 with self.open(support.TESTFN, mode) as fp:
Antoine Pitrou0d739d72010-09-05 23:01:12 +0000406 self.assertRaises(exc, fp.read)
407 self.assertRaises(exc, fp.readline)
408 with self.open(support.TESTFN, "wb", buffering=0) as fp:
409 self.assertRaises(exc, fp.read)
410 self.assertRaises(exc, fp.readline)
411 with self.open(support.TESTFN, "rb", buffering=0) as fp:
412 self.assertRaises(exc, fp.write, b"blah")
413 self.assertRaises(exc, fp.writelines, [b"blah\n"])
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000414 with self.open(support.TESTFN, "rb") as fp:
Antoine Pitrou0d739d72010-09-05 23:01:12 +0000415 self.assertRaises(exc, fp.write, b"blah")
416 self.assertRaises(exc, fp.writelines, [b"blah\n"])
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000417 with self.open(support.TESTFN, "r") as fp:
Antoine Pitrou0d739d72010-09-05 23:01:12 +0000418 self.assertRaises(exc, fp.write, "blah")
419 self.assertRaises(exc, fp.writelines, ["blah\n"])
420 # Non-zero seeking from current or end pos
421 self.assertRaises(exc, fp.seek, 1, self.SEEK_CUR)
422 self.assertRaises(exc, fp.seek, -1, self.SEEK_END)
Benjamin Peterson81971ea2009-05-14 22:01:31 +0000423
Martin Panter754aab22016-03-31 07:21:56 +0000424 def test_optional_abilities(self):
425 # Test for OSError when optional APIs are not supported
426 # The purpose of this test is to try fileno(), reading, writing and
427 # seeking operations with various objects that indicate they do not
428 # support these operations.
429
430 def pipe_reader():
431 [r, w] = os.pipe()
432 os.close(w) # So that read() is harmless
433 return self.FileIO(r, "r")
434
435 def pipe_writer():
436 [r, w] = os.pipe()
437 self.addCleanup(os.close, r)
438 # Guarantee that we can write into the pipe without blocking
439 thread = threading.Thread(target=os.read, args=(r, 100))
440 thread.start()
441 self.addCleanup(thread.join)
442 return self.FileIO(w, "w")
443
444 def buffered_reader():
445 return self.BufferedReader(self.MockUnseekableIO())
446
447 def buffered_writer():
448 return self.BufferedWriter(self.MockUnseekableIO())
449
450 def buffered_random():
451 return self.BufferedRandom(self.BytesIO())
452
453 def buffered_rw_pair():
454 return self.BufferedRWPair(self.MockUnseekableIO(),
455 self.MockUnseekableIO())
456
457 def text_reader():
458 class UnseekableReader(self.MockUnseekableIO):
459 writable = self.BufferedIOBase.writable
460 write = self.BufferedIOBase.write
461 return self.TextIOWrapper(UnseekableReader(), "ascii")
462
463 def text_writer():
464 class UnseekableWriter(self.MockUnseekableIO):
465 readable = self.BufferedIOBase.readable
466 read = self.BufferedIOBase.read
467 return self.TextIOWrapper(UnseekableWriter(), "ascii")
468
469 tests = (
470 (pipe_reader, "fr"), (pipe_writer, "fw"),
471 (buffered_reader, "r"), (buffered_writer, "w"),
472 (buffered_random, "rws"), (buffered_rw_pair, "rw"),
473 (text_reader, "r"), (text_writer, "w"),
474 (self.BytesIO, "rws"), (self.StringIO, "rws"),
475 )
476 for [test, abilities] in tests:
Martin Panter754aab22016-03-31 07:21:56 +0000477 with self.subTest(test), test() as obj:
478 readable = "r" in abilities
479 self.assertEqual(obj.readable(), readable)
480 writable = "w" in abilities
481 self.assertEqual(obj.writable(), writable)
Martin Panter754aab22016-03-31 07:21:56 +0000482
483 if isinstance(obj, self.TextIOBase):
484 data = "3"
485 elif isinstance(obj, (self.BufferedIOBase, self.RawIOBase)):
486 data = b"3"
487 else:
488 self.fail("Unknown base class")
489
490 if "f" in abilities:
491 obj.fileno()
492 else:
493 self.assertRaises(OSError, obj.fileno)
494
495 if readable:
496 obj.read(1)
497 obj.read()
498 else:
499 self.assertRaises(OSError, obj.read, 1)
500 self.assertRaises(OSError, obj.read)
501
502 if writable:
503 obj.write(data)
504 else:
505 self.assertRaises(OSError, obj.write, data)
506
Martin Panter3ee147f2016-03-31 21:05:31 +0000507 if sys.platform.startswith("win") and test in (
Martin Panter0950e6a2016-03-31 10:31:30 +0000508 pipe_reader, pipe_writer):
509 # Pipes seem to appear as seekable on Windows
510 continue
511 seekable = "s" in abilities
512 self.assertEqual(obj.seekable(), seekable)
513
Martin Panter754aab22016-03-31 07:21:56 +0000514 if seekable:
515 obj.tell()
516 obj.seek(0)
517 else:
518 self.assertRaises(OSError, obj.tell)
519 self.assertRaises(OSError, obj.seek, 0)
520
521 if writable and seekable:
522 obj.truncate()
523 obj.truncate(0)
524 else:
525 self.assertRaises(OSError, obj.truncate)
526 self.assertRaises(OSError, obj.truncate, 0)
527
Antoine Pitrou13348842012-01-29 18:36:34 +0100528 def test_open_handles_NUL_chars(self):
529 fn_with_NUL = 'foo\0bar'
Serhiy Storchakad8a14472014-09-06 20:07:17 +0300530 self.assertRaises(ValueError, self.open, fn_with_NUL, 'w')
Victor Stinner47b45572016-03-25 09:07:07 +0100531
532 bytes_fn = bytes(fn_with_NUL, 'ascii')
533 with warnings.catch_warnings():
534 warnings.simplefilter("ignore", DeprecationWarning)
535 self.assertRaises(ValueError, self.open, bytes_fn, 'w')
Antoine Pitrou13348842012-01-29 18:36:34 +0100536
Guido van Rossum28524c72007-02-27 05:47:44 +0000537 def test_raw_file_io(self):
Benjamin Peterson45cec322009-04-24 23:14:50 +0000538 with self.open(support.TESTFN, "wb", buffering=0) as f:
539 self.assertEqual(f.readable(), False)
540 self.assertEqual(f.writable(), True)
541 self.assertEqual(f.seekable(), True)
542 self.write_ops(f)
543 with self.open(support.TESTFN, "rb", buffering=0) as f:
544 self.assertEqual(f.readable(), True)
545 self.assertEqual(f.writable(), False)
546 self.assertEqual(f.seekable(), True)
547 self.read_ops(f)
Guido van Rossum28524c72007-02-27 05:47:44 +0000548
Guido van Rossum87429772007-04-10 21:06:59 +0000549 def test_buffered_file_io(self):
Benjamin Peterson45cec322009-04-24 23:14:50 +0000550 with self.open(support.TESTFN, "wb") as f:
551 self.assertEqual(f.readable(), False)
552 self.assertEqual(f.writable(), True)
553 self.assertEqual(f.seekable(), True)
554 self.write_ops(f)
555 with self.open(support.TESTFN, "rb") as f:
556 self.assertEqual(f.readable(), True)
557 self.assertEqual(f.writable(), False)
558 self.assertEqual(f.seekable(), True)
559 self.read_ops(f, True)
Guido van Rossum87429772007-04-10 21:06:59 +0000560
Guido van Rossum48fc58a2007-06-07 23:45:37 +0000561 def test_readline(self):
Benjamin Peterson45cec322009-04-24 23:14:50 +0000562 with self.open(support.TESTFN, "wb") as f:
563 f.write(b"abc\ndef\nxyzzy\nfoo\x00bar\nanother line")
564 with self.open(support.TESTFN, "rb") as f:
565 self.assertEqual(f.readline(), b"abc\n")
566 self.assertEqual(f.readline(10), b"def\n")
567 self.assertEqual(f.readline(2), b"xy")
568 self.assertEqual(f.readline(4), b"zzy\n")
569 self.assertEqual(f.readline(), b"foo\x00bar\n")
Benjamin Petersonbf5ff762009-12-13 19:25:34 +0000570 self.assertEqual(f.readline(None), b"another line")
Benjamin Peterson45cec322009-04-24 23:14:50 +0000571 self.assertRaises(TypeError, f.readline, 5.3)
572 with self.open(support.TESTFN, "r") as f:
573 self.assertRaises(TypeError, f.readline, 5.3)
Guido van Rossum48fc58a2007-06-07 23:45:37 +0000574
Serhiy Storchakabf623ae2017-04-19 20:03:52 +0300575 def test_readline_nonsizeable(self):
576 # Issue #30061
577 # Crash when readline() returns an object without __len__
578 class R(self.IOBase):
579 def readline(self):
580 return None
581 self.assertRaises((TypeError, StopIteration), next, R())
582
583 def test_next_nonsizeable(self):
584 # Issue #30061
585 # Crash when __next__() returns an object without __len__
586 class R(self.IOBase):
587 def __next__(self):
588 return None
589 self.assertRaises(TypeError, R().readlines, 1)
590
Guido van Rossum28524c72007-02-27 05:47:44 +0000591 def test_raw_bytes_io(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000592 f = self.BytesIO()
Guido van Rossum28524c72007-02-27 05:47:44 +0000593 self.write_ops(f)
594 data = f.getvalue()
595 self.assertEqual(data, b"hello world\n")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000596 f = self.BytesIO(data)
Guido van Rossum9b76da62007-04-11 01:09:03 +0000597 self.read_ops(f, True)
Guido van Rossum28524c72007-02-27 05:47:44 +0000598
Guido van Rossum53807da2007-04-10 19:01:47 +0000599 def test_large_file_ops(self):
Mike53f7a7c2017-12-14 14:04:53 +0300600 # On Windows and Mac OSX this test consumes large resources; It takes
Victor Stinner8c663fd2017-11-08 14:44:44 -0800601 # a long time to build the >2 GiB file and takes >2 GiB of disk space
Guido van Rossum34d69e52007-04-10 20:08:41 +0000602 # therefore the resource must be enabled to run this test.
Victor Stinner937ee9e2018-06-26 02:11:06 +0200603 if sys.platform[:3] == 'win' or sys.platform == 'darwin':
Zachary Ware9fe6d862013-12-08 00:20:35 -0600604 support.requires(
605 'largefile',
606 'test requires %s bytes and a long time to run' % self.LARGE)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000607 with self.open(support.TESTFN, "w+b", 0) as f:
608 self.large_file_ops(f)
609 with self.open(support.TESTFN, "w+b") as f:
610 self.large_file_ops(f)
Guido van Rossum87429772007-04-10 21:06:59 +0000611
612 def test_with_open(self):
Alexey Izbysheva2670562018-10-20 03:22:31 +0300613 for bufsize in (0, 100):
Guido van Rossum87429772007-04-10 21:06:59 +0000614 f = None
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000615 with self.open(support.TESTFN, "wb", bufsize) as f:
Guido van Rossum1f2ca562007-08-27 20:44:15 +0000616 f.write(b"xxx")
Guido van Rossum87429772007-04-10 21:06:59 +0000617 self.assertEqual(f.closed, True)
618 f = None
619 try:
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000620 with self.open(support.TESTFN, "wb", bufsize) as f:
Guido van Rossum87429772007-04-10 21:06:59 +0000621 1/0
622 except ZeroDivisionError:
623 self.assertEqual(f.closed, True)
624 else:
625 self.fail("1/0 didn't raise an exception")
626
Antoine Pitrou08838b62009-01-21 00:55:13 +0000627 # issue 5008
628 def test_append_mode_tell(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000629 with self.open(support.TESTFN, "wb") as f:
Antoine Pitrou08838b62009-01-21 00:55:13 +0000630 f.write(b"xxx")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000631 with self.open(support.TESTFN, "ab", buffering=0) as f:
Antoine Pitrou08838b62009-01-21 00:55:13 +0000632 self.assertEqual(f.tell(), 3)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000633 with self.open(support.TESTFN, "ab") as f:
Antoine Pitrou08838b62009-01-21 00:55:13 +0000634 self.assertEqual(f.tell(), 3)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000635 with self.open(support.TESTFN, "a") as f:
Serhiy Storchaka0dcd80a2015-08-02 15:17:49 +0300636 self.assertGreater(f.tell(), 0)
Antoine Pitrou08838b62009-01-21 00:55:13 +0000637
Guido van Rossum87429772007-04-10 21:06:59 +0000638 def test_destructor(self):
639 record = []
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000640 class MyFileIO(self.FileIO):
Guido van Rossum87429772007-04-10 21:06:59 +0000641 def __del__(self):
642 record.append(1)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000643 try:
644 f = super().__del__
645 except AttributeError:
646 pass
647 else:
648 f()
Guido van Rossum87429772007-04-10 21:06:59 +0000649 def close(self):
650 record.append(2)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000651 super().close()
Guido van Rossum87429772007-04-10 21:06:59 +0000652 def flush(self):
653 record.append(3)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000654 super().flush()
Brett Cannon5a9e91b2010-10-29 23:53:03 +0000655 with support.check_warnings(('', ResourceWarning)):
656 f = MyFileIO(support.TESTFN, "wb")
657 f.write(b"xxx")
658 del f
659 support.gc_collect()
660 self.assertEqual(record, [1, 2, 3])
661 with self.open(support.TESTFN, "rb") as f:
662 self.assertEqual(f.read(), b"xxx")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000663
664 def _check_base_destructor(self, base):
665 record = []
666 class MyIO(base):
667 def __init__(self):
668 # This exercises the availability of attributes on object
669 # destruction.
670 # (in the C version, close() is called by the tp_dealloc
671 # function, not by __del__)
672 self.on_del = 1
673 self.on_close = 2
674 self.on_flush = 3
675 def __del__(self):
676 record.append(self.on_del)
677 try:
678 f = super().__del__
679 except AttributeError:
680 pass
681 else:
682 f()
683 def close(self):
684 record.append(self.on_close)
685 super().close()
686 def flush(self):
687 record.append(self.on_flush)
688 super().flush()
689 f = MyIO()
Guido van Rossum87429772007-04-10 21:06:59 +0000690 del f
Benjamin Peterson24fb1d02009-04-24 23:26:21 +0000691 support.gc_collect()
Guido van Rossum87429772007-04-10 21:06:59 +0000692 self.assertEqual(record, [1, 2, 3])
693
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000694 def test_IOBase_destructor(self):
695 self._check_base_destructor(self.IOBase)
696
697 def test_RawIOBase_destructor(self):
698 self._check_base_destructor(self.RawIOBase)
699
700 def test_BufferedIOBase_destructor(self):
701 self._check_base_destructor(self.BufferedIOBase)
702
703 def test_TextIOBase_destructor(self):
704 self._check_base_destructor(self.TextIOBase)
705
Guido van Rossum87429772007-04-10 21:06:59 +0000706 def test_close_flushes(self):
Benjamin Peterson45cec322009-04-24 23:14:50 +0000707 with self.open(support.TESTFN, "wb") as f:
708 f.write(b"xxx")
709 with self.open(support.TESTFN, "rb") as f:
710 self.assertEqual(f.read(), b"xxx")
Guido van Rossuma9e20242007-03-08 00:43:48 +0000711
Guido van Rossumd4103952007-04-12 05:44:49 +0000712 def test_array_writes(self):
713 a = array.array('i', range(10))
Antoine Pitrou1ce3eb52010-09-01 20:29:34 +0000714 n = len(a.tobytes())
Martin Panter6bb91f32016-05-28 00:41:57 +0000715 def check(f):
716 with f:
717 self.assertEqual(f.write(a), n)
718 f.writelines((a,))
719 check(self.BytesIO())
720 check(self.FileIO(support.TESTFN, "w"))
721 check(self.BufferedWriter(self.MockRawIO()))
722 check(self.BufferedRandom(self.MockRawIO()))
723 check(self.BufferedRWPair(self.MockRawIO(), self.MockRawIO()))
Guido van Rossumd4103952007-04-12 05:44:49 +0000724
Guido van Rossum2dced8b2007-10-30 17:27:30 +0000725 def test_closefd(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000726 self.assertRaises(ValueError, self.open, support.TESTFN, 'w',
Guido van Rossum2dced8b2007-10-30 17:27:30 +0000727 closefd=False)
Guido van Rossuma9e20242007-03-08 00:43:48 +0000728
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000729 def test_read_closed(self):
730 with self.open(support.TESTFN, "w") as f:
Christian Heimesecc42a22008-11-05 19:30:32 +0000731 f.write("egg\n")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000732 with self.open(support.TESTFN, "r") as f:
733 file = self.open(f.fileno(), "r", closefd=False)
Christian Heimesecc42a22008-11-05 19:30:32 +0000734 self.assertEqual(file.read(), "egg\n")
735 file.seek(0)
736 file.close()
737 self.assertRaises(ValueError, file.read)
738
739 def test_no_closefd_with_filename(self):
740 # can't use closefd in combination with a file name
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000741 self.assertRaises(ValueError, self.open, support.TESTFN, "r", closefd=False)
Christian Heimesecc42a22008-11-05 19:30:32 +0000742
743 def test_closefd_attr(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000744 with self.open(support.TESTFN, "wb") as f:
Christian Heimesecc42a22008-11-05 19:30:32 +0000745 f.write(b"egg\n")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000746 with self.open(support.TESTFN, "r") as f:
Christian Heimesecc42a22008-11-05 19:30:32 +0000747 self.assertEqual(f.buffer.raw.closefd, True)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000748 file = self.open(f.fileno(), "r", closefd=False)
Christian Heimesecc42a22008-11-05 19:30:32 +0000749 self.assertEqual(file.buffer.raw.closefd, False)
750
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000751 def test_garbage_collection(self):
752 # FileIO objects are collected, and collecting them flushes
753 # all data to disk.
Benjamin Petersonebe5d8a2010-10-31 01:30:11 +0000754 with support.check_warnings(('', ResourceWarning)):
755 f = self.FileIO(support.TESTFN, "wb")
756 f.write(b"abcxxx")
757 f.f = f
758 wr = weakref.ref(f)
759 del f
760 support.gc_collect()
Serhiy Storchaka0dcd80a2015-08-02 15:17:49 +0300761 self.assertIsNone(wr(), wr)
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000762 with self.open(support.TESTFN, "rb") as f:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000763 self.assertEqual(f.read(), b"abcxxx")
Christian Heimesecc42a22008-11-05 19:30:32 +0000764
Antoine Pitrou7d037a72009-03-29 18:55:12 +0000765 def test_unbounded_file(self):
766 # Issue #1174606: reading from an unbounded stream such as /dev/zero.
767 zero = "/dev/zero"
768 if not os.path.exists(zero):
Antoine Pitrouc50cb8e2009-04-19 00:10:36 +0000769 self.skipTest("{0} does not exist".format(zero))
Antoine Pitrou7d037a72009-03-29 18:55:12 +0000770 if sys.maxsize > 0x7FFFFFFF:
Antoine Pitrouc50cb8e2009-04-19 00:10:36 +0000771 self.skipTest("test can only run in a 32-bit address space")
Antoine Pitrou7d037a72009-03-29 18:55:12 +0000772 if support.real_max_memuse < support._2G:
Victor Stinner8c663fd2017-11-08 14:44:44 -0800773 self.skipTest("test requires at least 2 GiB of memory")
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000774 with self.open(zero, "rb", buffering=0) as f:
Antoine Pitrou7d037a72009-03-29 18:55:12 +0000775 self.assertRaises(OverflowError, f.read)
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000776 with self.open(zero, "rb") as f:
Antoine Pitrou7d037a72009-03-29 18:55:12 +0000777 self.assertRaises(OverflowError, f.read)
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +0000778 with self.open(zero, "r") as f:
Antoine Pitrou7d037a72009-03-29 18:55:12 +0000779 self.assertRaises(OverflowError, f.read)
780
Serhiy Storchakaa3712a92015-02-21 00:35:09 +0200781 def check_flush_error_on_close(self, *args, **kwargs):
782 # Test that the file is closed despite failed flush
783 # and that flush() is called before file closed.
784 f = self.open(*args, **kwargs)
785 closed = []
Antoine Pitrou6be88762010-05-03 16:48:20 +0000786 def bad_flush():
Serhiy Storchakaa3712a92015-02-21 00:35:09 +0200787 closed[:] = [f.closed]
Andrew Svetlovf7a17b42012-12-25 16:47:37 +0200788 raise OSError()
Antoine Pitrou6be88762010-05-03 16:48:20 +0000789 f.flush = bad_flush
Andrew Svetlovf7a17b42012-12-25 16:47:37 +0200790 self.assertRaises(OSError, f.close) # exception not swallowed
Benjamin Peterson68623612012-12-20 11:53:11 -0600791 self.assertTrue(f.closed)
Serhiy Storchakaa3712a92015-02-21 00:35:09 +0200792 self.assertTrue(closed) # flush() called
793 self.assertFalse(closed[0]) # flush() called before file closed
Serhiy Storchakac26a1a42015-02-23 00:28:38 +0200794 f.flush = lambda: None # break reference loop
Serhiy Storchakaa3712a92015-02-21 00:35:09 +0200795
796 def test_flush_error_on_close(self):
797 # raw file
798 # Issue #5700: io.FileIO calls flush() after file closed
799 self.check_flush_error_on_close(support.TESTFN, 'wb', buffering=0)
800 fd = os.open(support.TESTFN, os.O_WRONLY|os.O_CREAT)
801 self.check_flush_error_on_close(fd, 'wb', buffering=0)
802 fd = os.open(support.TESTFN, os.O_WRONLY|os.O_CREAT)
803 self.check_flush_error_on_close(fd, 'wb', buffering=0, closefd=False)
804 os.close(fd)
805 # buffered io
806 self.check_flush_error_on_close(support.TESTFN, 'wb')
807 fd = os.open(support.TESTFN, os.O_WRONLY|os.O_CREAT)
808 self.check_flush_error_on_close(fd, 'wb')
809 fd = os.open(support.TESTFN, os.O_WRONLY|os.O_CREAT)
810 self.check_flush_error_on_close(fd, 'wb', closefd=False)
811 os.close(fd)
812 # text io
813 self.check_flush_error_on_close(support.TESTFN, 'w')
814 fd = os.open(support.TESTFN, os.O_WRONLY|os.O_CREAT)
815 self.check_flush_error_on_close(fd, 'w')
816 fd = os.open(support.TESTFN, os.O_WRONLY|os.O_CREAT)
817 self.check_flush_error_on_close(fd, 'w', closefd=False)
818 os.close(fd)
Antoine Pitrou6be88762010-05-03 16:48:20 +0000819
820 def test_multi_close(self):
821 f = self.open(support.TESTFN, "wb", buffering=0)
822 f.close()
823 f.close()
824 f.close()
825 self.assertRaises(ValueError, f.flush)
826
Antoine Pitrou328ec742010-09-14 18:37:24 +0000827 def test_RawIOBase_read(self):
Sanyam Khurana1b74f9b2017-12-11 19:12:09 +0530828 # Exercise the default limited RawIOBase.read(n) implementation (which
829 # calls readinto() internally).
Antoine Pitrou328ec742010-09-14 18:37:24 +0000830 rawio = self.MockRawIOWithoutRead((b"abc", b"d", None, b"efg", None))
831 self.assertEqual(rawio.read(2), b"ab")
832 self.assertEqual(rawio.read(2), b"c")
833 self.assertEqual(rawio.read(2), b"d")
834 self.assertEqual(rawio.read(2), None)
835 self.assertEqual(rawio.read(2), b"ef")
836 self.assertEqual(rawio.read(2), b"g")
837 self.assertEqual(rawio.read(2), None)
838 self.assertEqual(rawio.read(2), b"")
839
Benjamin Petersonf6f3a352011-09-03 09:26:20 -0400840 def test_types_have_dict(self):
841 test = (
842 self.IOBase(),
843 self.RawIOBase(),
844 self.TextIOBase(),
845 self.StringIO(),
846 self.BytesIO()
847 )
848 for obj in test:
849 self.assertTrue(hasattr(obj, "__dict__"))
850
Ross Lagerwall59142db2011-10-31 20:34:46 +0200851 def test_opener(self):
852 with self.open(support.TESTFN, "w") as f:
853 f.write("egg\n")
854 fd = os.open(support.TESTFN, os.O_RDONLY)
855 def opener(path, flags):
856 return fd
857 with self.open("non-existent", "r", opener=opener) as f:
858 self.assertEqual(f.read(), "egg\n")
859
Barry Warsaw480e2852016-06-08 17:47:26 -0400860 def test_bad_opener_negative_1(self):
861 # Issue #27066.
862 def badopener(fname, flags):
863 return -1
864 with self.assertRaises(ValueError) as cm:
865 open('non-existent', 'r', opener=badopener)
866 self.assertEqual(str(cm.exception), 'opener returned -1')
867
868 def test_bad_opener_other_negative(self):
869 # Issue #27066.
870 def badopener(fname, flags):
871 return -2
872 with self.assertRaises(ValueError) as cm:
873 open('non-existent', 'r', opener=badopener)
874 self.assertEqual(str(cm.exception), 'opener returned -2')
875
Hynek Schlawack2cc71562012-05-25 10:05:53 +0200876 def test_fileio_closefd(self):
877 # Issue #4841
878 with self.open(__file__, 'rb') as f1, \
879 self.open(__file__, 'rb') as f2:
880 fileio = self.FileIO(f1.fileno(), closefd=False)
881 # .__init__() must not close f1
882 fileio.__init__(f2.fileno(), closefd=False)
883 f1.readline()
884 # .close() must not close f2
885 fileio.close()
886 f2.readline()
887
Serhiy Storchakaf10063e2014-06-09 13:32:34 +0300888 def test_nonbuffered_textio(self):
Serhiy Storchaka94a619d2016-02-11 13:11:44 +0200889 with support.check_no_resource_warning(self):
Serhiy Storchakaf10063e2014-06-09 13:32:34 +0300890 with self.assertRaises(ValueError):
891 self.open(support.TESTFN, 'w', buffering=0)
Serhiy Storchakaf10063e2014-06-09 13:32:34 +0300892
893 def test_invalid_newline(self):
Serhiy Storchaka94a619d2016-02-11 13:11:44 +0200894 with support.check_no_resource_warning(self):
Serhiy Storchakaf10063e2014-06-09 13:32:34 +0300895 with self.assertRaises(ValueError):
896 self.open(support.TESTFN, 'w', newline='invalid')
Serhiy Storchakaf10063e2014-06-09 13:32:34 +0300897
Martin Panter6bb91f32016-05-28 00:41:57 +0000898 def test_buffered_readinto_mixin(self):
899 # Test the implementation provided by BufferedIOBase
900 class Stream(self.BufferedIOBase):
901 def read(self, size):
902 return b"12345"
903 read1 = read
904 stream = Stream()
905 for method in ("readinto", "readinto1"):
906 with self.subTest(method):
907 buffer = byteslike(5)
908 self.assertEqual(getattr(stream, method)(buffer), 5)
909 self.assertEqual(bytes(buffer), b"12345")
910
Ethan Furmand62548a2016-06-04 14:38:43 -0700911 def test_fspath_support(self):
Ethan Furmand62548a2016-06-04 14:38:43 -0700912 def check_path_succeeds(path):
913 with self.open(path, "w") as f:
914 f.write("egg\n")
915
916 with self.open(path, "r") as f:
917 self.assertEqual(f.read(), "egg\n")
918
Serhiy Storchakab21d1552018-03-02 11:53:51 +0200919 check_path_succeeds(FakePath(support.TESTFN))
920 check_path_succeeds(FakePath(support.TESTFN.encode('utf-8')))
Ethan Furmand62548a2016-06-04 14:38:43 -0700921
Serhiy Storchakab21d1552018-03-02 11:53:51 +0200922 with self.open(support.TESTFN, "w") as f:
923 bad_path = FakePath(f.fileno())
924 with self.assertRaises(TypeError):
925 self.open(bad_path, 'w')
926
927 bad_path = FakePath(None)
Brett Cannonc78ca1e2016-06-24 12:03:43 -0700928 with self.assertRaises(TypeError):
Ethan Furmand62548a2016-06-04 14:38:43 -0700929 self.open(bad_path, 'w')
930
Serhiy Storchakab21d1552018-03-02 11:53:51 +0200931 bad_path = FakePath(FloatingPointError)
932 with self.assertRaises(FloatingPointError):
933 self.open(bad_path, 'w')
934
Ethan Furmand62548a2016-06-04 14:38:43 -0700935 # ensure that refcounting is correct with some error conditions
936 with self.assertRaisesRegex(ValueError, 'read/write/append mode'):
Serhiy Storchakab21d1552018-03-02 11:53:51 +0200937 self.open(FakePath(support.TESTFN), 'rwxa')
Ethan Furmand62548a2016-06-04 14:38:43 -0700938
Sanyam Khurana1b74f9b2017-12-11 19:12:09 +0530939 def test_RawIOBase_readall(self):
940 # Exercise the default unlimited RawIOBase.read() and readall()
941 # implementations.
942 rawio = self.MockRawIOWithoutRead((b"abc", b"d", b"efg"))
943 self.assertEqual(rawio.read(), b"abcdefg")
944 rawio = self.MockRawIOWithoutRead((b"abc", b"d", b"efg"))
945 self.assertEqual(rawio.readall(), b"abcdefg")
946
947 def test_BufferedIOBase_readinto(self):
948 # Exercise the default BufferedIOBase.readinto() and readinto1()
949 # implementations (which call read() or read1() internally).
950 class Reader(self.BufferedIOBase):
951 def __init__(self, avail):
952 self.avail = avail
953 def read(self, size):
954 result = self.avail[:size]
955 self.avail = self.avail[size:]
956 return result
957 def read1(self, size):
958 """Returns no more than 5 bytes at once"""
959 return self.read(min(size, 5))
960 tests = (
961 # (test method, total data available, read buffer size, expected
962 # read size)
963 ("readinto", 10, 5, 5),
964 ("readinto", 10, 6, 6), # More than read1() can return
965 ("readinto", 5, 6, 5), # Buffer larger than total available
966 ("readinto", 6, 7, 6),
967 ("readinto", 10, 0, 0), # Empty buffer
968 ("readinto1", 10, 5, 5), # Result limited to single read1() call
969 ("readinto1", 10, 6, 5), # Buffer larger than read1() can return
970 ("readinto1", 5, 6, 5), # Buffer larger than total available
971 ("readinto1", 6, 7, 5),
972 ("readinto1", 10, 0, 0), # Empty buffer
973 )
974 UNUSED_BYTE = 0x81
975 for test in tests:
976 with self.subTest(test):
977 method, avail, request, result = test
978 reader = Reader(bytes(range(avail)))
979 buffer = bytearray((UNUSED_BYTE,) * request)
980 method = getattr(reader, method)
981 self.assertEqual(method(buffer), result)
982 self.assertEqual(len(buffer), request)
983 self.assertSequenceEqual(buffer[:result], range(result))
984 unused = (UNUSED_BYTE,) * (request - result)
985 self.assertSequenceEqual(buffer[result:], unused)
986 self.assertEqual(len(reader.avail), avail - result)
987
Zackery Spytz28f07362018-07-17 00:31:44 -0600988 def test_close_assert(self):
989 class R(self.IOBase):
990 def __setattr__(self, name, value):
991 pass
992 def flush(self):
993 raise OSError()
994 f = R()
995 # This would cause an assertion failure.
996 self.assertRaises(OSError, f.close)
997
Victor Stinner472f7942019-04-12 21:58:24 +0200998 # Silence destructor error
999 R.flush = lambda self: None
1000
Hynek Schlawack2cc71562012-05-25 10:05:53 +02001001
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001002class CIOTest(IOTest):
Antoine Pitrou84f1b172011-07-12 21:57:15 +02001003
1004 def test_IOBase_finalize(self):
1005 # Issue #12149: segmentation fault on _PyIOBase_finalize when both a
1006 # class which inherits IOBase and an object of this class are caught
1007 # in a reference cycle and close() is already in the method cache.
1008 class MyIO(self.IOBase):
1009 def close(self):
1010 pass
1011
1012 # create an instance to populate the method cache
1013 MyIO()
1014 obj = MyIO()
1015 obj.obj = obj
1016 wr = weakref.ref(obj)
1017 del MyIO
1018 del obj
1019 support.gc_collect()
Serhiy Storchaka0dcd80a2015-08-02 15:17:49 +03001020 self.assertIsNone(wr(), wr)
Guido van Rossuma9e20242007-03-08 00:43:48 +00001021
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001022class PyIOTest(IOTest):
1023 pass
Guido van Rossum68bbcd22007-02-27 17:19:33 +00001024
Guido van Rossuma9e20242007-03-08 00:43:48 +00001025
Gregory P. Smith1bef9072015-04-14 13:24:34 -07001026@support.cpython_only
1027class APIMismatchTest(unittest.TestCase):
Gregory P. Smith054b0652015-04-14 12:58:05 -07001028
Gregory P. Smith054b0652015-04-14 12:58:05 -07001029 def test_RawIOBase_io_in_pyio_match(self):
1030 """Test that pyio RawIOBase class has all c RawIOBase methods"""
Antoine Pitrou45d61562015-05-20 21:50:59 +02001031 mismatch = support.detect_api_mismatch(pyio.RawIOBase, io.RawIOBase,
1032 ignore=('__weakref__',))
Gregory P. Smith054b0652015-04-14 12:58:05 -07001033 self.assertEqual(mismatch, set(), msg='Python RawIOBase does not have all C RawIOBase methods')
1034
1035 def test_RawIOBase_pyio_in_io_match(self):
1036 """Test that c RawIOBase class has all pyio RawIOBase methods"""
1037 mismatch = support.detect_api_mismatch(io.RawIOBase, pyio.RawIOBase)
1038 self.assertEqual(mismatch, set(), msg='C RawIOBase does not have all Python RawIOBase methods')
1039
1040
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001041class CommonBufferedTests:
1042 # Tests common to BufferedReader, BufferedWriter and BufferedRandom
1043
Benjamin Petersond2e0c792009-05-01 20:40:59 +00001044 def test_detach(self):
1045 raw = self.MockRawIO()
1046 buf = self.tp(raw)
1047 self.assertIs(buf.detach(), raw)
1048 self.assertRaises(ValueError, buf.detach)
1049
Benjamin Peterson10e76b62014-12-21 20:51:50 -06001050 repr(buf) # Should still work
1051
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001052 def test_fileno(self):
1053 rawio = self.MockRawIO()
1054 bufio = self.tp(rawio)
1055
Ezio Melottib3aedd42010-11-20 19:04:17 +00001056 self.assertEqual(42, bufio.fileno())
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001057
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001058 def test_invalid_args(self):
1059 rawio = self.MockRawIO()
1060 bufio = self.tp(rawio)
1061 # Invalid whence
1062 self.assertRaises(ValueError, bufio.seek, 0, -1)
Jesus Cea94363612012-06-22 18:32:07 +02001063 self.assertRaises(ValueError, bufio.seek, 0, 9)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001064
1065 def test_override_destructor(self):
1066 tp = self.tp
1067 record = []
1068 class MyBufferedIO(tp):
1069 def __del__(self):
1070 record.append(1)
1071 try:
1072 f = super().__del__
1073 except AttributeError:
1074 pass
1075 else:
1076 f()
1077 def close(self):
1078 record.append(2)
1079 super().close()
1080 def flush(self):
1081 record.append(3)
1082 super().flush()
1083 rawio = self.MockRawIO()
1084 bufio = MyBufferedIO(rawio)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001085 del bufio
Benjamin Peterson24fb1d02009-04-24 23:26:21 +00001086 support.gc_collect()
Martin Panter754aab22016-03-31 07:21:56 +00001087 self.assertEqual(record, [1, 2, 3])
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001088
1089 def test_context_manager(self):
1090 # Test usability as a context manager
1091 rawio = self.MockRawIO()
1092 bufio = self.tp(rawio)
1093 def _with():
1094 with bufio:
1095 pass
1096 _with()
1097 # bufio should now be closed, and using it a second time should raise
1098 # a ValueError.
1099 self.assertRaises(ValueError, _with)
1100
1101 def test_error_through_destructor(self):
1102 # Test that the exception state is not modified by a destructor,
1103 # even if close() fails.
1104 rawio = self.CloseFailureIO()
Victor Stinner6dbbe742019-05-25 00:09:38 +02001105 with support.catch_unraisable_exception() as cm:
1106 with self.assertRaises(AttributeError):
1107 self.tp(rawio).xyzzy
Victor Stinnerbc2aa812019-05-23 03:45:09 +02001108
1109 if not IOBASE_EMITS_UNRAISABLE:
1110 self.assertIsNone(cm.unraisable)
1111 elif cm.unraisable is not None:
1112 self.assertEqual(cm.unraisable.exc_type, OSError)
Guido van Rossum78892e42007-04-06 17:31:18 +00001113
Antoine Pitrou716c4442009-05-23 19:04:03 +00001114 def test_repr(self):
1115 raw = self.MockRawIO()
1116 b = self.tp(raw)
Max Bernsteinccb7ca72019-05-21 10:09:21 -07001117 clsname = r"(%s\.)?%s" % (self.tp.__module__, self.tp.__qualname__)
1118 self.assertRegex(repr(b), "<%s>" % clsname)
Antoine Pitrou716c4442009-05-23 19:04:03 +00001119 raw.name = "dummy"
Max Bernsteinccb7ca72019-05-21 10:09:21 -07001120 self.assertRegex(repr(b), "<%s name='dummy'>" % clsname)
Antoine Pitrou716c4442009-05-23 19:04:03 +00001121 raw.name = b"dummy"
Max Bernsteinccb7ca72019-05-21 10:09:21 -07001122 self.assertRegex(repr(b), "<%s name=b'dummy'>" % clsname)
Antoine Pitrou716c4442009-05-23 19:04:03 +00001123
Serhiy Storchakaa5af6e12017-03-19 19:25:29 +02001124 def test_recursive_repr(self):
1125 # Issue #25455
1126 raw = self.MockRawIO()
1127 b = self.tp(raw)
1128 with support.swap_attr(raw, 'name', b):
1129 try:
1130 repr(b) # Should not crash
1131 except RuntimeError:
1132 pass
1133
Antoine Pitrou6be88762010-05-03 16:48:20 +00001134 def test_flush_error_on_close(self):
Serhiy Storchakaa3712a92015-02-21 00:35:09 +02001135 # Test that buffered file is closed despite failed flush
1136 # and that flush() is called before file closed.
Antoine Pitrou6be88762010-05-03 16:48:20 +00001137 raw = self.MockRawIO()
Serhiy Storchakaa3712a92015-02-21 00:35:09 +02001138 closed = []
Antoine Pitrou6be88762010-05-03 16:48:20 +00001139 def bad_flush():
Serhiy Storchakaa3712a92015-02-21 00:35:09 +02001140 closed[:] = [b.closed, raw.closed]
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02001141 raise OSError()
Antoine Pitrou6be88762010-05-03 16:48:20 +00001142 raw.flush = bad_flush
1143 b = self.tp(raw)
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02001144 self.assertRaises(OSError, b.close) # exception not swallowed
Benjamin Peterson68623612012-12-20 11:53:11 -06001145 self.assertTrue(b.closed)
Serhiy Storchakaa3712a92015-02-21 00:35:09 +02001146 self.assertTrue(raw.closed)
1147 self.assertTrue(closed) # flush() called
1148 self.assertFalse(closed[0]) # flush() called before file closed
1149 self.assertFalse(closed[1])
Serhiy Storchakac26a1a42015-02-23 00:28:38 +02001150 raw.flush = lambda: None # break reference loop
Benjamin Peterson68623612012-12-20 11:53:11 -06001151
1152 def test_close_error_on_close(self):
1153 raw = self.MockRawIO()
1154 def bad_flush():
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02001155 raise OSError('flush')
Benjamin Peterson68623612012-12-20 11:53:11 -06001156 def bad_close():
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02001157 raise OSError('close')
Benjamin Peterson68623612012-12-20 11:53:11 -06001158 raw.close = bad_close
1159 b = self.tp(raw)
1160 b.flush = bad_flush
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02001161 with self.assertRaises(OSError) as err: # exception not swallowed
Benjamin Peterson68623612012-12-20 11:53:11 -06001162 b.close()
1163 self.assertEqual(err.exception.args, ('close',))
Serhiy Storchaka8a8f7f92014-06-09 09:13:04 +03001164 self.assertIsInstance(err.exception.__context__, OSError)
Benjamin Peterson68623612012-12-20 11:53:11 -06001165 self.assertEqual(err.exception.__context__.args, ('flush',))
1166 self.assertFalse(b.closed)
Antoine Pitrou6be88762010-05-03 16:48:20 +00001167
Victor Stinner472f7942019-04-12 21:58:24 +02001168 # Silence destructor error
1169 raw.close = lambda: None
1170 b.flush = lambda: None
1171
Serhiy Storchaka8a8f7f92014-06-09 09:13:04 +03001172 def test_nonnormalized_close_error_on_close(self):
1173 # Issue #21677
1174 raw = self.MockRawIO()
1175 def bad_flush():
1176 raise non_existing_flush
1177 def bad_close():
1178 raise non_existing_close
1179 raw.close = bad_close
1180 b = self.tp(raw)
1181 b.flush = bad_flush
1182 with self.assertRaises(NameError) as err: # exception not swallowed
1183 b.close()
1184 self.assertIn('non_existing_close', str(err.exception))
1185 self.assertIsInstance(err.exception.__context__, NameError)
1186 self.assertIn('non_existing_flush', str(err.exception.__context__))
1187 self.assertFalse(b.closed)
1188
Victor Stinner472f7942019-04-12 21:58:24 +02001189 # Silence destructor error
1190 b.flush = lambda: None
1191 raw.close = lambda: None
1192
Antoine Pitrou6be88762010-05-03 16:48:20 +00001193 def test_multi_close(self):
1194 raw = self.MockRawIO()
1195 b = self.tp(raw)
1196 b.close()
1197 b.close()
1198 b.close()
1199 self.assertRaises(ValueError, b.flush)
1200
Antoine Pitrou0d739d72010-09-05 23:01:12 +00001201 def test_unseekable(self):
1202 bufio = self.tp(self.MockUnseekableIO(b"A" * 10))
1203 self.assertRaises(self.UnsupportedOperation, bufio.tell)
1204 self.assertRaises(self.UnsupportedOperation, bufio.seek, 0)
1205
Antoine Pitrou7f8f4182010-12-21 21:20:59 +00001206 def test_readonly_attributes(self):
1207 raw = self.MockRawIO()
1208 buf = self.tp(raw)
1209 x = self.MockRawIO()
1210 with self.assertRaises(AttributeError):
1211 buf.raw = x
1212
Guido van Rossum78892e42007-04-06 17:31:18 +00001213
Antoine Pitrou10f0c502012-07-29 19:02:46 +02001214class SizeofTest:
1215
1216 @support.cpython_only
1217 def test_sizeof(self):
1218 bufsize1 = 4096
1219 bufsize2 = 8192
1220 rawio = self.MockRawIO()
1221 bufio = self.tp(rawio, buffer_size=bufsize1)
1222 size = sys.getsizeof(bufio) - bufsize1
1223 rawio = self.MockRawIO()
1224 bufio = self.tp(rawio, buffer_size=bufsize2)
1225 self.assertEqual(sys.getsizeof(bufio), size + bufsize2)
1226
Jesus Ceadc469452012-10-04 12:37:56 +02001227 @support.cpython_only
1228 def test_buffer_freeing(self) :
1229 bufsize = 4096
1230 rawio = self.MockRawIO()
1231 bufio = self.tp(rawio, buffer_size=bufsize)
1232 size = sys.getsizeof(bufio) - bufsize
1233 bufio.close()
1234 self.assertEqual(sys.getsizeof(bufio), size)
Antoine Pitrou10f0c502012-07-29 19:02:46 +02001235
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001236class BufferedReaderTest(unittest.TestCase, CommonBufferedTests):
1237 read_mode = "rb"
Guido van Rossum78892e42007-04-06 17:31:18 +00001238
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001239 def test_constructor(self):
1240 rawio = self.MockRawIO([b"abc"])
1241 bufio = self.tp(rawio)
1242 bufio.__init__(rawio)
1243 bufio.__init__(rawio, buffer_size=1024)
1244 bufio.__init__(rawio, buffer_size=16)
Ezio Melottib3aedd42010-11-20 19:04:17 +00001245 self.assertEqual(b"abc", bufio.read())
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001246 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0)
1247 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16)
1248 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1)
1249 rawio = self.MockRawIO([b"abc"])
1250 bufio.__init__(rawio)
Ezio Melottib3aedd42010-11-20 19:04:17 +00001251 self.assertEqual(b"abc", bufio.read())
Guido van Rossum78892e42007-04-06 17:31:18 +00001252
Serhiy Storchaka61e24932014-02-12 10:52:35 +02001253 def test_uninitialized(self):
1254 bufio = self.tp.__new__(self.tp)
1255 del bufio
1256 bufio = self.tp.__new__(self.tp)
1257 self.assertRaisesRegex((ValueError, AttributeError),
1258 'uninitialized|has no attribute',
1259 bufio.read, 0)
1260 bufio.__init__(self.MockRawIO())
1261 self.assertEqual(bufio.read(0), b'')
1262
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001263 def test_read(self):
Benjamin Petersonbf5ff762009-12-13 19:25:34 +00001264 for arg in (None, 7):
1265 rawio = self.MockRawIO((b"abc", b"d", b"efg"))
1266 bufio = self.tp(rawio)
Ezio Melottib3aedd42010-11-20 19:04:17 +00001267 self.assertEqual(b"abcdefg", bufio.read(arg))
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001268 # Invalid args
1269 self.assertRaises(ValueError, bufio.read, -2)
Guido van Rossum68bbcd22007-02-27 17:19:33 +00001270
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001271 def test_read1(self):
1272 rawio = self.MockRawIO((b"abc", b"d", b"efg"))
1273 bufio = self.tp(rawio)
Ezio Melottib3aedd42010-11-20 19:04:17 +00001274 self.assertEqual(b"a", bufio.read(1))
1275 self.assertEqual(b"b", bufio.read1(1))
1276 self.assertEqual(rawio._reads, 1)
Martin Panterccb2c0e2016-10-20 23:48:14 +00001277 self.assertEqual(b"", bufio.read1(0))
Ezio Melottib3aedd42010-11-20 19:04:17 +00001278 self.assertEqual(b"c", bufio.read1(100))
1279 self.assertEqual(rawio._reads, 1)
1280 self.assertEqual(b"d", bufio.read1(100))
1281 self.assertEqual(rawio._reads, 2)
1282 self.assertEqual(b"efg", bufio.read1(100))
1283 self.assertEqual(rawio._reads, 3)
1284 self.assertEqual(b"", bufio.read1(100))
1285 self.assertEqual(rawio._reads, 4)
Martin Panterccb2c0e2016-10-20 23:48:14 +00001286
1287 def test_read1_arbitrary(self):
1288 rawio = self.MockRawIO((b"abc", b"d", b"efg"))
1289 bufio = self.tp(rawio)
1290 self.assertEqual(b"a", bufio.read(1))
1291 self.assertEqual(b"bc", bufio.read1())
1292 self.assertEqual(b"d", bufio.read1())
1293 self.assertEqual(b"efg", bufio.read1(-1))
1294 self.assertEqual(rawio._reads, 3)
1295 self.assertEqual(b"", bufio.read1())
1296 self.assertEqual(rawio._reads, 4)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001297
1298 def test_readinto(self):
1299 rawio = self.MockRawIO((b"abc", b"d", b"efg"))
1300 bufio = self.tp(rawio)
1301 b = bytearray(2)
Ezio Melottib3aedd42010-11-20 19:04:17 +00001302 self.assertEqual(bufio.readinto(b), 2)
1303 self.assertEqual(b, b"ab")
1304 self.assertEqual(bufio.readinto(b), 2)
1305 self.assertEqual(b, b"cd")
1306 self.assertEqual(bufio.readinto(b), 2)
1307 self.assertEqual(b, b"ef")
1308 self.assertEqual(bufio.readinto(b), 1)
1309 self.assertEqual(b, b"gf")
1310 self.assertEqual(bufio.readinto(b), 0)
1311 self.assertEqual(b, b"gf")
Antoine Pitrou3486a982011-05-12 01:57:53 +02001312 rawio = self.MockRawIO((b"abc", None))
1313 bufio = self.tp(rawio)
1314 self.assertEqual(bufio.readinto(b), 2)
1315 self.assertEqual(b, b"ab")
1316 self.assertEqual(bufio.readinto(b), 1)
1317 self.assertEqual(b, b"cb")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001318
Benjamin Petersona96fea02014-06-22 14:17:44 -07001319 def test_readinto1(self):
1320 buffer_size = 10
1321 rawio = self.MockRawIO((b"abc", b"de", b"fgh", b"jkl"))
1322 bufio = self.tp(rawio, buffer_size=buffer_size)
1323 b = bytearray(2)
1324 self.assertEqual(bufio.peek(3), b'abc')
1325 self.assertEqual(rawio._reads, 1)
1326 self.assertEqual(bufio.readinto1(b), 2)
1327 self.assertEqual(b, b"ab")
1328 self.assertEqual(rawio._reads, 1)
1329 self.assertEqual(bufio.readinto1(b), 1)
1330 self.assertEqual(b[:1], b"c")
1331 self.assertEqual(rawio._reads, 1)
1332 self.assertEqual(bufio.readinto1(b), 2)
1333 self.assertEqual(b, b"de")
1334 self.assertEqual(rawio._reads, 2)
1335 b = bytearray(2*buffer_size)
1336 self.assertEqual(bufio.peek(3), b'fgh')
1337 self.assertEqual(rawio._reads, 3)
1338 self.assertEqual(bufio.readinto1(b), 6)
1339 self.assertEqual(b[:6], b"fghjkl")
1340 self.assertEqual(rawio._reads, 4)
1341
1342 def test_readinto_array(self):
1343 buffer_size = 60
1344 data = b"a" * 26
1345 rawio = self.MockRawIO((data,))
1346 bufio = self.tp(rawio, buffer_size=buffer_size)
1347
1348 # Create an array with element size > 1 byte
1349 b = array.array('i', b'x' * 32)
1350 assert len(b) != 16
1351
1352 # Read into it. We should get as many *bytes* as we can fit into b
1353 # (which is more than the number of elements)
1354 n = bufio.readinto(b)
1355 self.assertGreater(n, len(b))
1356
1357 # Check that old contents of b are preserved
1358 bm = memoryview(b).cast('B')
1359 self.assertLess(n, len(bm))
1360 self.assertEqual(bm[:n], data[:n])
1361 self.assertEqual(bm[n:], b'x' * (len(bm[n:])))
1362
1363 def test_readinto1_array(self):
1364 buffer_size = 60
1365 data = b"a" * 26
1366 rawio = self.MockRawIO((data,))
1367 bufio = self.tp(rawio, buffer_size=buffer_size)
1368
1369 # Create an array with element size > 1 byte
1370 b = array.array('i', b'x' * 32)
1371 assert len(b) != 16
1372
1373 # Read into it. We should get as many *bytes* as we can fit into b
1374 # (which is more than the number of elements)
1375 n = bufio.readinto1(b)
1376 self.assertGreater(n, len(b))
1377
1378 # Check that old contents of b are preserved
1379 bm = memoryview(b).cast('B')
1380 self.assertLess(n, len(bm))
1381 self.assertEqual(bm[:n], data[:n])
1382 self.assertEqual(bm[n:], b'x' * (len(bm[n:])))
1383
Benjamin Petersonbf5ff762009-12-13 19:25:34 +00001384 def test_readlines(self):
1385 def bufio():
1386 rawio = self.MockRawIO((b"abc\n", b"d\n", b"ef"))
1387 return self.tp(rawio)
Ezio Melottib3aedd42010-11-20 19:04:17 +00001388 self.assertEqual(bufio().readlines(), [b"abc\n", b"d\n", b"ef"])
1389 self.assertEqual(bufio().readlines(5), [b"abc\n", b"d\n"])
1390 self.assertEqual(bufio().readlines(None), [b"abc\n", b"d\n", b"ef"])
Benjamin Petersonbf5ff762009-12-13 19:25:34 +00001391
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001392 def test_buffering(self):
Guido van Rossum78892e42007-04-06 17:31:18 +00001393 data = b"abcdefghi"
1394 dlen = len(data)
1395
1396 tests = [
1397 [ 100, [ 3, 1, 4, 8 ], [ dlen, 0 ] ],
1398 [ 100, [ 3, 3, 3], [ dlen ] ],
1399 [ 4, [ 1, 2, 4, 2 ], [ 4, 4, 1 ] ],
1400 ]
1401
1402 for bufsize, buf_read_sizes, raw_read_sizes in tests:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001403 rawio = self.MockFileIO(data)
1404 bufio = self.tp(rawio, buffer_size=bufsize)
Guido van Rossum78892e42007-04-06 17:31:18 +00001405 pos = 0
1406 for nbytes in buf_read_sizes:
Ezio Melottib3aedd42010-11-20 19:04:17 +00001407 self.assertEqual(bufio.read(nbytes), data[pos:pos+nbytes])
Guido van Rossum78892e42007-04-06 17:31:18 +00001408 pos += nbytes
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001409 # this is mildly implementation-dependent
Ezio Melottib3aedd42010-11-20 19:04:17 +00001410 self.assertEqual(rawio.read_history, raw_read_sizes)
Guido van Rossum78892e42007-04-06 17:31:18 +00001411
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001412 def test_read_non_blocking(self):
Guido van Rossum01a27522007-03-07 01:00:12 +00001413 # Inject some None's in there to simulate EWOULDBLOCK
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001414 rawio = self.MockRawIO((b"abc", b"d", None, b"efg", None, None, None))
1415 bufio = self.tp(rawio)
Ezio Melottib3aedd42010-11-20 19:04:17 +00001416 self.assertEqual(b"abcd", bufio.read(6))
1417 self.assertEqual(b"e", bufio.read(1))
1418 self.assertEqual(b"fg", bufio.read())
1419 self.assertEqual(b"", bufio.peek(1))
Victor Stinnera80987f2011-05-25 22:47:16 +02001420 self.assertIsNone(bufio.read())
Ezio Melottib3aedd42010-11-20 19:04:17 +00001421 self.assertEqual(b"", bufio.read())
Guido van Rossum01a27522007-03-07 01:00:12 +00001422
Victor Stinnera80987f2011-05-25 22:47:16 +02001423 rawio = self.MockRawIO((b"a", None, None))
1424 self.assertEqual(b"a", rawio.readall())
1425 self.assertIsNone(rawio.readall())
1426
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001427 def test_read_past_eof(self):
1428 rawio = self.MockRawIO((b"abc", b"d", b"efg"))
1429 bufio = self.tp(rawio)
Guido van Rossum68bbcd22007-02-27 17:19:33 +00001430
Ezio Melottib3aedd42010-11-20 19:04:17 +00001431 self.assertEqual(b"abcdefg", bufio.read(9000))
Guido van Rossum68bbcd22007-02-27 17:19:33 +00001432
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001433 def test_read_all(self):
1434 rawio = self.MockRawIO((b"abc", b"d", b"efg"))
1435 bufio = self.tp(rawio)
Guido van Rossum68bbcd22007-02-27 17:19:33 +00001436
Ezio Melottib3aedd42010-11-20 19:04:17 +00001437 self.assertEqual(b"abcdefg", bufio.read())
Guido van Rossum68bbcd22007-02-27 17:19:33 +00001438
Antoine Pitrou5bc4fa72010-10-14 15:34:31 +00001439 @support.requires_resource('cpu')
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001440 def test_threads(self):
Antoine Pitrou87695762008-08-14 22:44:29 +00001441 try:
1442 # Write out many bytes with exactly the same number of 0's,
1443 # 1's... 255's. This will help us check that concurrent reading
1444 # doesn't duplicate or forget contents.
1445 N = 1000
1446 l = list(range(256)) * N
1447 random.shuffle(l)
1448 s = bytes(bytearray(l))
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +00001449 with self.open(support.TESTFN, "wb") as f:
Antoine Pitrou87695762008-08-14 22:44:29 +00001450 f.write(s)
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +00001451 with self.open(support.TESTFN, self.read_mode, buffering=0) as raw:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001452 bufio = self.tp(raw, 8)
Antoine Pitrou87695762008-08-14 22:44:29 +00001453 errors = []
1454 results = []
1455 def f():
1456 try:
1457 # Intra-buffer read then buffer-flushing read
1458 for n in cycle([1, 19]):
1459 s = bufio.read(n)
1460 if not s:
1461 break
1462 # list.append() is atomic
1463 results.append(s)
1464 except Exception as e:
1465 errors.append(e)
1466 raise
1467 threads = [threading.Thread(target=f) for x in range(20)]
Serhiy Storchaka263dcd22015-04-01 13:01:14 +03001468 with support.start_threads(threads):
1469 time.sleep(0.02) # yield
Antoine Pitrou87695762008-08-14 22:44:29 +00001470 self.assertFalse(errors,
1471 "the following exceptions were caught: %r" % errors)
1472 s = b''.join(results)
1473 for i in range(256):
1474 c = bytes(bytearray([i]))
1475 self.assertEqual(s.count(c), N)
1476 finally:
1477 support.unlink(support.TESTFN)
1478
Antoine Pitrou1e44fec2011-10-04 12:26:20 +02001479 def test_unseekable(self):
1480 bufio = self.tp(self.MockUnseekableIO(b"A" * 10))
1481 self.assertRaises(self.UnsupportedOperation, bufio.tell)
1482 self.assertRaises(self.UnsupportedOperation, bufio.seek, 0)
1483 bufio.read(1)
1484 self.assertRaises(self.UnsupportedOperation, bufio.seek, 0)
1485 self.assertRaises(self.UnsupportedOperation, bufio.tell)
1486
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001487 def test_misbehaved_io(self):
1488 rawio = self.MisbehavedRawIO((b"abc", b"d", b"efg"))
1489 bufio = self.tp(rawio)
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02001490 self.assertRaises(OSError, bufio.seek, 0)
1491 self.assertRaises(OSError, bufio.tell)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001492
Victor Stinnerb589cef2019-06-11 03:10:59 +02001493 # Silence destructor error
1494 bufio.close = lambda: None
1495
Antoine Pitrou32cfede2010-08-11 13:31:33 +00001496 def test_no_extraneous_read(self):
1497 # Issue #9550; when the raw IO object has satisfied the read request,
1498 # we should not issue any additional reads, otherwise it may block
1499 # (e.g. socket).
1500 bufsize = 16
1501 for n in (2, bufsize - 1, bufsize, bufsize + 1, bufsize * 2):
1502 rawio = self.MockRawIO([b"x" * n])
1503 bufio = self.tp(rawio, bufsize)
1504 self.assertEqual(bufio.read(n), b"x" * n)
1505 # Simple case: one raw read is enough to satisfy the request.
1506 self.assertEqual(rawio._extraneous_reads, 0,
1507 "failed for {}: {} != 0".format(n, rawio._extraneous_reads))
1508 # A more complex case where two raw reads are needed to satisfy
1509 # the request.
1510 rawio = self.MockRawIO([b"x" * (n - 1), b"x"])
1511 bufio = self.tp(rawio, bufsize)
1512 self.assertEqual(bufio.read(n), b"x" * n)
1513 self.assertEqual(rawio._extraneous_reads, 0,
1514 "failed for {}: {} != 0".format(n, rawio._extraneous_reads))
1515
Berker Peksagd10d6ae2015-05-12 17:01:05 +03001516 def test_read_on_closed(self):
1517 # Issue #23796
1518 b = io.BufferedReader(io.BytesIO(b"12"))
1519 b.read(1)
1520 b.close()
1521 self.assertRaises(ValueError, b.peek)
1522 self.assertRaises(ValueError, b.read1, 1)
1523
Antoine Pitrou32cfede2010-08-11 13:31:33 +00001524
Antoine Pitrou10f0c502012-07-29 19:02:46 +02001525class CBufferedReaderTest(BufferedReaderTest, SizeofTest):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001526 tp = io.BufferedReader
1527
Gregory P. Smithe5796c42018-12-30 20:17:57 -08001528 @unittest.skipIf(MEMORY_SANITIZER, "MSan defaults to crashing "
1529 "instead of returning NULL for malloc failure.")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001530 def test_constructor(self):
1531 BufferedReaderTest.test_constructor(self)
1532 # The allocation can succeed on 32-bit builds, e.g. with more
Victor Stinner8c663fd2017-11-08 14:44:44 -08001533 # than 2 GiB RAM and a 64-bit kernel.
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001534 if sys.maxsize > 0x7FFFFFFF:
1535 rawio = self.MockRawIO()
1536 bufio = self.tp(rawio)
1537 self.assertRaises((OverflowError, MemoryError, ValueError),
1538 bufio.__init__, rawio, sys.maxsize)
1539
1540 def test_initialization(self):
1541 rawio = self.MockRawIO([b"abc"])
1542 bufio = self.tp(rawio)
1543 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0)
1544 self.assertRaises(ValueError, bufio.read)
1545 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16)
1546 self.assertRaises(ValueError, bufio.read)
1547 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1)
1548 self.assertRaises(ValueError, bufio.read)
1549
1550 def test_misbehaved_io_read(self):
1551 rawio = self.MisbehavedRawIO((b"abc", b"d", b"efg"))
1552 bufio = self.tp(rawio)
1553 # _pyio.BufferedReader seems to implement reading different, so that
1554 # checking this is not so easy.
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02001555 self.assertRaises(OSError, bufio.read, 10)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001556
1557 def test_garbage_collection(self):
1558 # C BufferedReader objects are collected.
1559 # The Python version has __del__, so it ends into gc.garbage instead
Serhiy Storchakae36837c2018-06-04 05:53:56 +03001560 self.addCleanup(support.unlink, support.TESTFN)
Antoine Pitrou796564c2013-07-30 19:59:21 +02001561 with support.check_warnings(('', ResourceWarning)):
1562 rawio = self.FileIO(support.TESTFN, "w+b")
1563 f = self.tp(rawio)
1564 f.f = f
1565 wr = weakref.ref(f)
1566 del f
1567 support.gc_collect()
Serhiy Storchaka0dcd80a2015-08-02 15:17:49 +03001568 self.assertIsNone(wr(), wr)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001569
R David Murray67bfe802013-02-23 21:51:05 -05001570 def test_args_error(self):
1571 # Issue #17275
1572 with self.assertRaisesRegex(TypeError, "BufferedReader"):
1573 self.tp(io.BytesIO(), 1024, 1024, 1024)
1574
1575
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001576class PyBufferedReaderTest(BufferedReaderTest):
1577 tp = pyio.BufferedReader
Antoine Pitrou87695762008-08-14 22:44:29 +00001578
Guido van Rossuma9e20242007-03-08 00:43:48 +00001579
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001580class BufferedWriterTest(unittest.TestCase, CommonBufferedTests):
1581 write_mode = "wb"
Guido van Rossuma9e20242007-03-08 00:43:48 +00001582
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001583 def test_constructor(self):
1584 rawio = self.MockRawIO()
1585 bufio = self.tp(rawio)
1586 bufio.__init__(rawio)
1587 bufio.__init__(rawio, buffer_size=1024)
1588 bufio.__init__(rawio, buffer_size=16)
Ezio Melottib3aedd42010-11-20 19:04:17 +00001589 self.assertEqual(3, bufio.write(b"abc"))
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001590 bufio.flush()
1591 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0)
1592 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16)
1593 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1)
1594 bufio.__init__(rawio)
Ezio Melottib3aedd42010-11-20 19:04:17 +00001595 self.assertEqual(3, bufio.write(b"ghi"))
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001596 bufio.flush()
Ezio Melottib3aedd42010-11-20 19:04:17 +00001597 self.assertEqual(b"".join(rawio._write_stack), b"abcghi")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001598
Serhiy Storchaka61e24932014-02-12 10:52:35 +02001599 def test_uninitialized(self):
1600 bufio = self.tp.__new__(self.tp)
1601 del bufio
1602 bufio = self.tp.__new__(self.tp)
1603 self.assertRaisesRegex((ValueError, AttributeError),
1604 'uninitialized|has no attribute',
1605 bufio.write, b'')
1606 bufio.__init__(self.MockRawIO())
1607 self.assertEqual(bufio.write(b''), 0)
1608
Benjamin Petersond2e0c792009-05-01 20:40:59 +00001609 def test_detach_flush(self):
1610 raw = self.MockRawIO()
1611 buf = self.tp(raw)
1612 buf.write(b"howdy!")
1613 self.assertFalse(raw._write_stack)
1614 buf.detach()
1615 self.assertEqual(raw._write_stack, [b"howdy!"])
1616
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001617 def test_write(self):
Guido van Rossum68bbcd22007-02-27 17:19:33 +00001618 # Write to the buffered IO but don't overflow the buffer.
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001619 writer = self.MockRawIO()
1620 bufio = self.tp(writer, 8)
Guido van Rossum76c5d4d2007-04-06 19:10:29 +00001621 bufio.write(b"abc")
Guido van Rossum76c5d4d2007-04-06 19:10:29 +00001622 self.assertFalse(writer._write_stack)
Martin Panter6bb91f32016-05-28 00:41:57 +00001623 buffer = bytearray(b"def")
1624 bufio.write(buffer)
1625 buffer[:] = b"***" # Overwrite our copy of the data
1626 bufio.flush()
1627 self.assertEqual(b"".join(writer._write_stack), b"abcdef")
Guido van Rossum68bbcd22007-02-27 17:19:33 +00001628
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001629 def test_write_overflow(self):
1630 writer = self.MockRawIO()
1631 bufio = self.tp(writer, 8)
1632 contents = b"abcdefghijklmnop"
1633 for n in range(0, len(contents), 3):
1634 bufio.write(contents[n:n+3])
1635 flushed = b"".join(writer._write_stack)
1636 # At least (total - 8) bytes were implicitly flushed, perhaps more
1637 # depending on the implementation.
Benjamin Petersonc9c0f202009-06-30 23:06:06 +00001638 self.assertTrue(flushed.startswith(contents[:-8]), flushed)
Guido van Rossum68bbcd22007-02-27 17:19:33 +00001639
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001640 def check_writes(self, intermediate_func):
1641 # Lots of writes, test the flushed output is as expected.
1642 contents = bytes(range(256)) * 1000
1643 n = 0
1644 writer = self.MockRawIO()
1645 bufio = self.tp(writer, 13)
1646 # Generator of write sizes: repeat each N 15 times then proceed to N+1
1647 def gen_sizes():
1648 for size in count(1):
1649 for i in range(15):
1650 yield size
1651 sizes = gen_sizes()
1652 while n < len(contents):
1653 size = min(next(sizes), len(contents) - n)
Ezio Melottib3aedd42010-11-20 19:04:17 +00001654 self.assertEqual(bufio.write(contents[n:n+size]), size)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001655 intermediate_func(bufio)
1656 n += size
1657 bufio.flush()
Ezio Melottib3aedd42010-11-20 19:04:17 +00001658 self.assertEqual(contents, b"".join(writer._write_stack))
Guido van Rossum68bbcd22007-02-27 17:19:33 +00001659
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001660 def test_writes(self):
1661 self.check_writes(lambda bufio: None)
Guido van Rossum68bbcd22007-02-27 17:19:33 +00001662
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001663 def test_writes_and_flushes(self):
1664 self.check_writes(lambda bufio: bufio.flush())
Guido van Rossum01a27522007-03-07 01:00:12 +00001665
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001666 def test_writes_and_seeks(self):
1667 def _seekabs(bufio):
1668 pos = bufio.tell()
1669 bufio.seek(pos + 1, 0)
1670 bufio.seek(pos - 1, 0)
1671 bufio.seek(pos, 0)
1672 self.check_writes(_seekabs)
1673 def _seekrel(bufio):
1674 pos = bufio.seek(0, 1)
1675 bufio.seek(+1, 1)
1676 bufio.seek(-1, 1)
1677 bufio.seek(pos, 0)
1678 self.check_writes(_seekrel)
Guido van Rossum01a27522007-03-07 01:00:12 +00001679
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001680 def test_writes_and_truncates(self):
1681 self.check_writes(lambda bufio: bufio.truncate(bufio.tell()))
Guido van Rossum01a27522007-03-07 01:00:12 +00001682
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001683 def test_write_non_blocking(self):
1684 raw = self.MockNonBlockWriterIO()
Benjamin Peterson59406a92009-03-26 17:10:29 +00001685 bufio = self.tp(raw, 8)
Guido van Rossum01a27522007-03-07 01:00:12 +00001686
Ezio Melottib3aedd42010-11-20 19:04:17 +00001687 self.assertEqual(bufio.write(b"abcd"), 4)
1688 self.assertEqual(bufio.write(b"efghi"), 5)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001689 # 1 byte will be written, the rest will be buffered
1690 raw.block_on(b"k")
Ezio Melottib3aedd42010-11-20 19:04:17 +00001691 self.assertEqual(bufio.write(b"jklmn"), 5)
Guido van Rossum01a27522007-03-07 01:00:12 +00001692
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001693 # 8 bytes will be written, 8 will be buffered and the rest will be lost
1694 raw.block_on(b"0")
1695 try:
1696 bufio.write(b"opqrwxyz0123456789")
1697 except self.BlockingIOError as e:
1698 written = e.characters_written
1699 else:
1700 self.fail("BlockingIOError should have been raised")
Ezio Melottib3aedd42010-11-20 19:04:17 +00001701 self.assertEqual(written, 16)
1702 self.assertEqual(raw.pop_written(),
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001703 b"abcdefghijklmnopqrwxyz")
Guido van Rossum01a27522007-03-07 01:00:12 +00001704
Ezio Melottib3aedd42010-11-20 19:04:17 +00001705 self.assertEqual(bufio.write(b"ABCDEFGHI"), 9)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001706 s = raw.pop_written()
1707 # Previously buffered bytes were flushed
1708 self.assertTrue(s.startswith(b"01234567A"), s)
Guido van Rossum01a27522007-03-07 01:00:12 +00001709
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001710 def test_write_and_rewind(self):
1711 raw = io.BytesIO()
1712 bufio = self.tp(raw, 4)
1713 self.assertEqual(bufio.write(b"abcdef"), 6)
1714 self.assertEqual(bufio.tell(), 6)
1715 bufio.seek(0, 0)
1716 self.assertEqual(bufio.write(b"XY"), 2)
1717 bufio.seek(6, 0)
1718 self.assertEqual(raw.getvalue(), b"XYcdef")
1719 self.assertEqual(bufio.write(b"123456"), 6)
1720 bufio.flush()
1721 self.assertEqual(raw.getvalue(), b"XYcdef123456")
Guido van Rossum68bbcd22007-02-27 17:19:33 +00001722
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001723 def test_flush(self):
1724 writer = self.MockRawIO()
1725 bufio = self.tp(writer, 8)
Guido van Rossum76c5d4d2007-04-06 19:10:29 +00001726 bufio.write(b"abc")
1727 bufio.flush()
Ezio Melottib3aedd42010-11-20 19:04:17 +00001728 self.assertEqual(b"abc", writer._write_stack[0])
Guido van Rossum68bbcd22007-02-27 17:19:33 +00001729
Antoine Pitrou131a4892012-10-16 22:57:11 +02001730 def test_writelines(self):
1731 l = [b'ab', b'cd', b'ef']
1732 writer = self.MockRawIO()
1733 bufio = self.tp(writer, 8)
1734 bufio.writelines(l)
1735 bufio.flush()
1736 self.assertEqual(b''.join(writer._write_stack), b'abcdef')
1737
1738 def test_writelines_userlist(self):
1739 l = UserList([b'ab', b'cd', b'ef'])
1740 writer = self.MockRawIO()
1741 bufio = self.tp(writer, 8)
1742 bufio.writelines(l)
1743 bufio.flush()
1744 self.assertEqual(b''.join(writer._write_stack), b'abcdef')
1745
1746 def test_writelines_error(self):
1747 writer = self.MockRawIO()
1748 bufio = self.tp(writer, 8)
1749 self.assertRaises(TypeError, bufio.writelines, [1, 2, 3])
1750 self.assertRaises(TypeError, bufio.writelines, None)
1751 self.assertRaises(TypeError, bufio.writelines, 'abc')
1752
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001753 def test_destructor(self):
1754 writer = self.MockRawIO()
1755 bufio = self.tp(writer, 8)
1756 bufio.write(b"abc")
1757 del bufio
Benjamin Peterson24fb1d02009-04-24 23:26:21 +00001758 support.gc_collect()
Ezio Melottib3aedd42010-11-20 19:04:17 +00001759 self.assertEqual(b"abc", writer._write_stack[0])
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001760
1761 def test_truncate(self):
1762 # Truncate implicitly flushes the buffer.
Serhiy Storchakae36837c2018-06-04 05:53:56 +03001763 self.addCleanup(support.unlink, support.TESTFN)
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +00001764 with self.open(support.TESTFN, self.write_mode, buffering=0) as raw:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001765 bufio = self.tp(raw, 8)
1766 bufio.write(b"abcdef")
1767 self.assertEqual(bufio.truncate(3), 3)
Antoine Pitrou905a2ff2010-01-31 22:47:27 +00001768 self.assertEqual(bufio.tell(), 6)
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +00001769 with self.open(support.TESTFN, "rb", buffering=0) as f:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001770 self.assertEqual(f.read(), b"abc")
1771
Nitish Chandra059f58c2018-01-28 21:30:09 +05301772 def test_truncate_after_write(self):
1773 # Ensure that truncate preserves the file position after
1774 # writes longer than the buffer size.
1775 # Issue: https://bugs.python.org/issue32228
Serhiy Storchakae36837c2018-06-04 05:53:56 +03001776 self.addCleanup(support.unlink, support.TESTFN)
Nitish Chandra059f58c2018-01-28 21:30:09 +05301777 with self.open(support.TESTFN, "wb") as f:
1778 # Fill with some buffer
1779 f.write(b'\x00' * 10000)
1780 buffer_sizes = [8192, 4096, 200]
1781 for buffer_size in buffer_sizes:
1782 with self.open(support.TESTFN, "r+b", buffering=buffer_size) as f:
1783 f.write(b'\x00' * (buffer_size + 1))
1784 # After write write_pos and write_end are set to 0
1785 f.read(1)
1786 # read operation makes sure that pos != raw_pos
1787 f.truncate()
1788 self.assertEqual(f.tell(), buffer_size + 2)
1789
Antoine Pitrou5bc4fa72010-10-14 15:34:31 +00001790 @support.requires_resource('cpu')
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001791 def test_threads(self):
Antoine Pitrou87695762008-08-14 22:44:29 +00001792 try:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001793 # Write out many bytes from many threads and test they were
1794 # all flushed.
1795 N = 1000
1796 contents = bytes(range(256)) * N
1797 sizes = cycle([1, 19])
1798 n = 0
1799 queue = deque()
1800 while n < len(contents):
1801 size = next(sizes)
1802 queue.append(contents[n:n+size])
1803 n += size
1804 del contents
Antoine Pitrou87695762008-08-14 22:44:29 +00001805 # We use a real file object because it allows us to
1806 # exercise situations where the GIL is released before
1807 # writing the buffer to the raw streams. This is in addition
1808 # to concurrency issues due to switching threads in the middle
1809 # of Python code.
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +00001810 with self.open(support.TESTFN, self.write_mode, buffering=0) as raw:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001811 bufio = self.tp(raw, 8)
Antoine Pitrou87695762008-08-14 22:44:29 +00001812 errors = []
1813 def f():
1814 try:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001815 while True:
1816 try:
1817 s = queue.popleft()
1818 except IndexError:
1819 return
Antoine Pitrou87695762008-08-14 22:44:29 +00001820 bufio.write(s)
1821 except Exception as e:
1822 errors.append(e)
1823 raise
1824 threads = [threading.Thread(target=f) for x in range(20)]
Serhiy Storchaka263dcd22015-04-01 13:01:14 +03001825 with support.start_threads(threads):
1826 time.sleep(0.02) # yield
Antoine Pitrou87695762008-08-14 22:44:29 +00001827 self.assertFalse(errors,
1828 "the following exceptions were caught: %r" % errors)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001829 bufio.close()
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +00001830 with self.open(support.TESTFN, "rb") as f:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001831 s = f.read()
1832 for i in range(256):
Ezio Melottib3aedd42010-11-20 19:04:17 +00001833 self.assertEqual(s.count(bytes([i])), N)
Antoine Pitrou87695762008-08-14 22:44:29 +00001834 finally:
1835 support.unlink(support.TESTFN)
1836
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001837 def test_misbehaved_io(self):
1838 rawio = self.MisbehavedRawIO()
1839 bufio = self.tp(rawio, 5)
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02001840 self.assertRaises(OSError, bufio.seek, 0)
1841 self.assertRaises(OSError, bufio.tell)
1842 self.assertRaises(OSError, bufio.write, b"abcdef")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001843
Victor Stinnerb589cef2019-06-11 03:10:59 +02001844 # Silence destructor error
1845 bufio.close = lambda: None
1846
Florent Xicluna109d5732012-07-07 17:03:22 +02001847 def test_max_buffer_size_removal(self):
1848 with self.assertRaises(TypeError):
Benjamin Peterson59406a92009-03-26 17:10:29 +00001849 self.tp(self.MockRawIO(), 8, 12)
Benjamin Peterson59406a92009-03-26 17:10:29 +00001850
Benjamin Peterson68623612012-12-20 11:53:11 -06001851 def test_write_error_on_close(self):
1852 raw = self.MockRawIO()
1853 def bad_write(b):
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02001854 raise OSError()
Benjamin Peterson68623612012-12-20 11:53:11 -06001855 raw.write = bad_write
1856 b = self.tp(raw)
1857 b.write(b'spam')
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02001858 self.assertRaises(OSError, b.close) # exception not swallowed
Benjamin Peterson68623612012-12-20 11:53:11 -06001859 self.assertTrue(b.closed)
1860
benfogle9703f092017-11-10 16:03:40 -05001861 def test_slow_close_from_thread(self):
1862 # Issue #31976
1863 rawio = self.SlowFlushRawIO()
1864 bufio = self.tp(rawio, 8)
1865 t = threading.Thread(target=bufio.close)
1866 t.start()
1867 rawio.in_flush.wait()
1868 self.assertRaises(ValueError, bufio.write, b'spam')
1869 self.assertTrue(bufio.closed)
1870 t.join()
1871
1872
Benjamin Peterson59406a92009-03-26 17:10:29 +00001873
Antoine Pitrou10f0c502012-07-29 19:02:46 +02001874class CBufferedWriterTest(BufferedWriterTest, SizeofTest):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001875 tp = io.BufferedWriter
1876
Gregory P. Smithe5796c42018-12-30 20:17:57 -08001877 @unittest.skipIf(MEMORY_SANITIZER, "MSan defaults to crashing "
1878 "instead of returning NULL for malloc failure.")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001879 def test_constructor(self):
1880 BufferedWriterTest.test_constructor(self)
1881 # The allocation can succeed on 32-bit builds, e.g. with more
Victor Stinner8c663fd2017-11-08 14:44:44 -08001882 # than 2 GiB RAM and a 64-bit kernel.
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001883 if sys.maxsize > 0x7FFFFFFF:
1884 rawio = self.MockRawIO()
1885 bufio = self.tp(rawio)
1886 self.assertRaises((OverflowError, MemoryError, ValueError),
1887 bufio.__init__, rawio, sys.maxsize)
1888
1889 def test_initialization(self):
1890 rawio = self.MockRawIO()
1891 bufio = self.tp(rawio)
1892 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0)
1893 self.assertRaises(ValueError, bufio.write, b"def")
1894 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16)
1895 self.assertRaises(ValueError, bufio.write, b"def")
1896 self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1)
1897 self.assertRaises(ValueError, bufio.write, b"def")
1898
1899 def test_garbage_collection(self):
1900 # C BufferedWriter objects are collected, and collecting them flushes
1901 # all data to disk.
1902 # The Python version has __del__, so it ends into gc.garbage instead
Serhiy Storchakae36837c2018-06-04 05:53:56 +03001903 self.addCleanup(support.unlink, support.TESTFN)
Antoine Pitrou796564c2013-07-30 19:59:21 +02001904 with support.check_warnings(('', ResourceWarning)):
1905 rawio = self.FileIO(support.TESTFN, "w+b")
1906 f = self.tp(rawio)
1907 f.write(b"123xxx")
1908 f.x = f
1909 wr = weakref.ref(f)
1910 del f
1911 support.gc_collect()
Serhiy Storchaka0dcd80a2015-08-02 15:17:49 +03001912 self.assertIsNone(wr(), wr)
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +00001913 with self.open(support.TESTFN, "rb") as f:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001914 self.assertEqual(f.read(), b"123xxx")
1915
R David Murray67bfe802013-02-23 21:51:05 -05001916 def test_args_error(self):
1917 # Issue #17275
1918 with self.assertRaisesRegex(TypeError, "BufferedWriter"):
1919 self.tp(io.BytesIO(), 1024, 1024, 1024)
1920
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001921
1922class PyBufferedWriterTest(BufferedWriterTest):
1923 tp = pyio.BufferedWriter
Guido van Rossuma9e20242007-03-08 00:43:48 +00001924
Guido van Rossum01a27522007-03-07 01:00:12 +00001925class BufferedRWPairTest(unittest.TestCase):
Guido van Rossuma9e20242007-03-08 00:43:48 +00001926
Antoine Pitroucf4c7492009-04-19 00:09:36 +00001927 def test_constructor(self):
1928 pair = self.tp(self.MockRawIO(), self.MockRawIO())
Benjamin Peterson92035012008-12-27 16:00:54 +00001929 self.assertFalse(pair.closed)
Guido van Rossum01a27522007-03-07 01:00:12 +00001930
Serhiy Storchaka61e24932014-02-12 10:52:35 +02001931 def test_uninitialized(self):
1932 pair = self.tp.__new__(self.tp)
1933 del pair
1934 pair = self.tp.__new__(self.tp)
1935 self.assertRaisesRegex((ValueError, AttributeError),
1936 'uninitialized|has no attribute',
1937 pair.read, 0)
1938 self.assertRaisesRegex((ValueError, AttributeError),
1939 'uninitialized|has no attribute',
1940 pair.write, b'')
1941 pair.__init__(self.MockRawIO(), self.MockRawIO())
1942 self.assertEqual(pair.read(0), b'')
1943 self.assertEqual(pair.write(b''), 0)
1944
Benjamin Petersond2e0c792009-05-01 20:40:59 +00001945 def test_detach(self):
1946 pair = self.tp(self.MockRawIO(), self.MockRawIO())
1947 self.assertRaises(self.UnsupportedOperation, pair.detach)
1948
Florent Xicluna109d5732012-07-07 17:03:22 +02001949 def test_constructor_max_buffer_size_removal(self):
1950 with self.assertRaises(TypeError):
Benjamin Peterson59406a92009-03-26 17:10:29 +00001951 self.tp(self.MockRawIO(), self.MockRawIO(), 8, 12)
Benjamin Peterson59406a92009-03-26 17:10:29 +00001952
Antoine Pitroucf4c7492009-04-19 00:09:36 +00001953 def test_constructor_with_not_readable(self):
1954 class NotReadable(MockRawIO):
1955 def readable(self):
1956 return False
1957
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02001958 self.assertRaises(OSError, self.tp, NotReadable(), self.MockRawIO())
Antoine Pitroucf4c7492009-04-19 00:09:36 +00001959
1960 def test_constructor_with_not_writeable(self):
1961 class NotWriteable(MockRawIO):
1962 def writable(self):
1963 return False
1964
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02001965 self.assertRaises(OSError, self.tp, self.MockRawIO(), NotWriteable())
Antoine Pitroucf4c7492009-04-19 00:09:36 +00001966
1967 def test_read(self):
1968 pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())
1969
1970 self.assertEqual(pair.read(3), b"abc")
1971 self.assertEqual(pair.read(1), b"d")
1972 self.assertEqual(pair.read(), b"ef")
Benjamin Petersonbf5ff762009-12-13 19:25:34 +00001973 pair = self.tp(self.BytesIO(b"abc"), self.MockRawIO())
1974 self.assertEqual(pair.read(None), b"abc")
1975
1976 def test_readlines(self):
1977 pair = lambda: self.tp(self.BytesIO(b"abc\ndef\nh"), self.MockRawIO())
1978 self.assertEqual(pair().readlines(), [b"abc\n", b"def\n", b"h"])
1979 self.assertEqual(pair().readlines(), [b"abc\n", b"def\n", b"h"])
1980 self.assertEqual(pair().readlines(5), [b"abc\n", b"def\n"])
Antoine Pitroucf4c7492009-04-19 00:09:36 +00001981
1982 def test_read1(self):
1983 # .read1() is delegated to the underlying reader object, so this test
1984 # can be shallow.
1985 pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())
1986
1987 self.assertEqual(pair.read1(3), b"abc")
Martin Panterccb2c0e2016-10-20 23:48:14 +00001988 self.assertEqual(pair.read1(), b"def")
Antoine Pitroucf4c7492009-04-19 00:09:36 +00001989
1990 def test_readinto(self):
Martin Panter6bb91f32016-05-28 00:41:57 +00001991 for method in ("readinto", "readinto1"):
1992 with self.subTest(method):
1993 pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())
Antoine Pitroucf4c7492009-04-19 00:09:36 +00001994
Serhiy Storchaka5f1a5182016-09-11 14:41:02 +03001995 data = byteslike(b'\0' * 5)
Martin Panter6bb91f32016-05-28 00:41:57 +00001996 self.assertEqual(getattr(pair, method)(data), 5)
1997 self.assertEqual(bytes(data), b"abcde")
Antoine Pitroucf4c7492009-04-19 00:09:36 +00001998
1999 def test_write(self):
2000 w = self.MockRawIO()
2001 pair = self.tp(self.MockRawIO(), w)
2002
2003 pair.write(b"abc")
2004 pair.flush()
Martin Panter6bb91f32016-05-28 00:41:57 +00002005 buffer = bytearray(b"def")
2006 pair.write(buffer)
2007 buffer[:] = b"***" # Overwrite our copy of the data
Antoine Pitroucf4c7492009-04-19 00:09:36 +00002008 pair.flush()
2009 self.assertEqual(w._write_stack, [b"abc", b"def"])
2010
2011 def test_peek(self):
2012 pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())
2013
2014 self.assertTrue(pair.peek(3).startswith(b"abc"))
2015 self.assertEqual(pair.read(3), b"abc")
2016
2017 def test_readable(self):
2018 pair = self.tp(self.MockRawIO(), self.MockRawIO())
2019 self.assertTrue(pair.readable())
2020
2021 def test_writeable(self):
2022 pair = self.tp(self.MockRawIO(), self.MockRawIO())
2023 self.assertTrue(pair.writable())
2024
2025 def test_seekable(self):
2026 # BufferedRWPairs are never seekable, even if their readers and writers
2027 # are.
2028 pair = self.tp(self.MockRawIO(), self.MockRawIO())
2029 self.assertFalse(pair.seekable())
2030
2031 # .flush() is delegated to the underlying writer object and has been
2032 # tested in the test_write method.
2033
2034 def test_close_and_closed(self):
2035 pair = self.tp(self.MockRawIO(), self.MockRawIO())
2036 self.assertFalse(pair.closed)
2037 pair.close()
2038 self.assertTrue(pair.closed)
2039
Serhiy Storchaka7665be62015-03-24 23:21:57 +02002040 def test_reader_close_error_on_close(self):
2041 def reader_close():
2042 reader_non_existing
2043 reader = self.MockRawIO()
2044 reader.close = reader_close
2045 writer = self.MockRawIO()
2046 pair = self.tp(reader, writer)
2047 with self.assertRaises(NameError) as err:
2048 pair.close()
2049 self.assertIn('reader_non_existing', str(err.exception))
2050 self.assertTrue(pair.closed)
2051 self.assertFalse(reader.closed)
2052 self.assertTrue(writer.closed)
2053
Victor Stinner472f7942019-04-12 21:58:24 +02002054 # Silence destructor error
2055 reader.close = lambda: None
2056
Serhiy Storchaka7665be62015-03-24 23:21:57 +02002057 def test_writer_close_error_on_close(self):
2058 def writer_close():
2059 writer_non_existing
2060 reader = self.MockRawIO()
2061 writer = self.MockRawIO()
2062 writer.close = writer_close
2063 pair = self.tp(reader, writer)
2064 with self.assertRaises(NameError) as err:
2065 pair.close()
2066 self.assertIn('writer_non_existing', str(err.exception))
2067 self.assertFalse(pair.closed)
2068 self.assertTrue(reader.closed)
2069 self.assertFalse(writer.closed)
2070
Victor Stinner472f7942019-04-12 21:58:24 +02002071 # Silence destructor error
2072 writer.close = lambda: None
2073
Serhiy Storchaka7665be62015-03-24 23:21:57 +02002074 def test_reader_writer_close_error_on_close(self):
2075 def reader_close():
2076 reader_non_existing
2077 def writer_close():
2078 writer_non_existing
2079 reader = self.MockRawIO()
2080 reader.close = reader_close
2081 writer = self.MockRawIO()
2082 writer.close = writer_close
2083 pair = self.tp(reader, writer)
2084 with self.assertRaises(NameError) as err:
2085 pair.close()
2086 self.assertIn('reader_non_existing', str(err.exception))
2087 self.assertIsInstance(err.exception.__context__, NameError)
2088 self.assertIn('writer_non_existing', str(err.exception.__context__))
2089 self.assertFalse(pair.closed)
2090 self.assertFalse(reader.closed)
2091 self.assertFalse(writer.closed)
2092
Victor Stinner472f7942019-04-12 21:58:24 +02002093 # Silence destructor error
2094 reader.close = lambda: None
2095 writer.close = lambda: None
2096
Antoine Pitroucf4c7492009-04-19 00:09:36 +00002097 def test_isatty(self):
2098 class SelectableIsAtty(MockRawIO):
2099 def __init__(self, isatty):
2100 MockRawIO.__init__(self)
2101 self._isatty = isatty
2102
2103 def isatty(self):
2104 return self._isatty
2105
2106 pair = self.tp(SelectableIsAtty(False), SelectableIsAtty(False))
2107 self.assertFalse(pair.isatty())
2108
2109 pair = self.tp(SelectableIsAtty(True), SelectableIsAtty(False))
2110 self.assertTrue(pair.isatty())
2111
2112 pair = self.tp(SelectableIsAtty(False), SelectableIsAtty(True))
2113 self.assertTrue(pair.isatty())
2114
2115 pair = self.tp(SelectableIsAtty(True), SelectableIsAtty(True))
2116 self.assertTrue(pair.isatty())
Guido van Rossum01a27522007-03-07 01:00:12 +00002117
Benjamin Petersonbbd0a322014-09-29 22:46:57 -04002118 def test_weakref_clearing(self):
2119 brw = self.tp(self.MockRawIO(), self.MockRawIO())
2120 ref = weakref.ref(brw)
2121 brw = None
2122 ref = None # Shouldn't segfault.
2123
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002124class CBufferedRWPairTest(BufferedRWPairTest):
2125 tp = io.BufferedRWPair
Guido van Rossuma9e20242007-03-08 00:43:48 +00002126
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002127class PyBufferedRWPairTest(BufferedRWPairTest):
2128 tp = pyio.BufferedRWPair
Guido van Rossuma9e20242007-03-08 00:43:48 +00002129
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002130
2131class BufferedRandomTest(BufferedReaderTest, BufferedWriterTest):
2132 read_mode = "rb+"
2133 write_mode = "wb+"
2134
2135 def test_constructor(self):
2136 BufferedReaderTest.test_constructor(self)
2137 BufferedWriterTest.test_constructor(self)
2138
Serhiy Storchaka61e24932014-02-12 10:52:35 +02002139 def test_uninitialized(self):
2140 BufferedReaderTest.test_uninitialized(self)
2141 BufferedWriterTest.test_uninitialized(self)
2142
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002143 def test_read_and_write(self):
2144 raw = self.MockRawIO((b"asdf", b"ghjk"))
Benjamin Peterson59406a92009-03-26 17:10:29 +00002145 rw = self.tp(raw, 8)
Guido van Rossum01a27522007-03-07 01:00:12 +00002146
2147 self.assertEqual(b"as", rw.read(2))
2148 rw.write(b"ddd")
2149 rw.write(b"eee")
Guido van Rossum76c5d4d2007-04-06 19:10:29 +00002150 self.assertFalse(raw._write_stack) # Buffer writes
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002151 self.assertEqual(b"ghjk", rw.read())
Ezio Melottib3aedd42010-11-20 19:04:17 +00002152 self.assertEqual(b"dddeee", raw._write_stack[0])
Guido van Rossum01a27522007-03-07 01:00:12 +00002153
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002154 def test_seek_and_tell(self):
2155 raw = self.BytesIO(b"asdfghjkl")
2156 rw = self.tp(raw)
Guido van Rossum01a27522007-03-07 01:00:12 +00002157
Ezio Melottib3aedd42010-11-20 19:04:17 +00002158 self.assertEqual(b"as", rw.read(2))
2159 self.assertEqual(2, rw.tell())
Guido van Rossum01a27522007-03-07 01:00:12 +00002160 rw.seek(0, 0)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002161 self.assertEqual(b"asdf", rw.read(4))
Guido van Rossum01a27522007-03-07 01:00:12 +00002162
Antoine Pitroue05565e2011-08-20 14:39:23 +02002163 rw.write(b"123f")
Guido van Rossum01a27522007-03-07 01:00:12 +00002164 rw.seek(0, 0)
Antoine Pitroue05565e2011-08-20 14:39:23 +02002165 self.assertEqual(b"asdf123fl", rw.read())
Ezio Melottib3aedd42010-11-20 19:04:17 +00002166 self.assertEqual(9, rw.tell())
Guido van Rossum01a27522007-03-07 01:00:12 +00002167 rw.seek(-4, 2)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002168 self.assertEqual(5, rw.tell())
Guido van Rossum01a27522007-03-07 01:00:12 +00002169 rw.seek(2, 1)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002170 self.assertEqual(7, rw.tell())
2171 self.assertEqual(b"fl", rw.read(11))
Antoine Pitroue05565e2011-08-20 14:39:23 +02002172 rw.flush()
2173 self.assertEqual(b"asdf123fl", raw.getvalue())
2174
Christian Heimes8e42a0a2007-11-08 18:04:45 +00002175 self.assertRaises(TypeError, rw.seek, 0.0)
Guido van Rossum01a27522007-03-07 01:00:12 +00002176
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002177 def check_flush_and_read(self, read_func):
2178 raw = self.BytesIO(b"abcdefghi")
2179 bufio = self.tp(raw)
2180
Ezio Melottib3aedd42010-11-20 19:04:17 +00002181 self.assertEqual(b"ab", read_func(bufio, 2))
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002182 bufio.write(b"12")
Ezio Melottib3aedd42010-11-20 19:04:17 +00002183 self.assertEqual(b"ef", read_func(bufio, 2))
2184 self.assertEqual(6, bufio.tell())
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002185 bufio.flush()
Ezio Melottib3aedd42010-11-20 19:04:17 +00002186 self.assertEqual(6, bufio.tell())
2187 self.assertEqual(b"ghi", read_func(bufio))
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002188 raw.seek(0, 0)
2189 raw.write(b"XYZ")
2190 # flush() resets the read buffer
2191 bufio.flush()
2192 bufio.seek(0, 0)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002193 self.assertEqual(b"XYZ", read_func(bufio, 3))
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002194
2195 def test_flush_and_read(self):
2196 self.check_flush_and_read(lambda bufio, *args: bufio.read(*args))
2197
2198 def test_flush_and_readinto(self):
2199 def _readinto(bufio, n=-1):
2200 b = bytearray(n if n >= 0 else 9999)
2201 n = bufio.readinto(b)
2202 return bytes(b[:n])
2203 self.check_flush_and_read(_readinto)
2204
2205 def test_flush_and_peek(self):
2206 def _peek(bufio, n=-1):
2207 # This relies on the fact that the buffer can contain the whole
2208 # raw stream, otherwise peek() can return less.
2209 b = bufio.peek(n)
2210 if n != -1:
2211 b = b[:n]
2212 bufio.seek(len(b), 1)
2213 return b
2214 self.check_flush_and_read(_peek)
2215
2216 def test_flush_and_write(self):
2217 raw = self.BytesIO(b"abcdefghi")
2218 bufio = self.tp(raw)
2219
2220 bufio.write(b"123")
2221 bufio.flush()
2222 bufio.write(b"45")
2223 bufio.flush()
2224 bufio.seek(0, 0)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002225 self.assertEqual(b"12345fghi", raw.getvalue())
2226 self.assertEqual(b"12345fghi", bufio.read())
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002227
2228 def test_threads(self):
2229 BufferedReaderTest.test_threads(self)
2230 BufferedWriterTest.test_threads(self)
2231
2232 def test_writes_and_peek(self):
2233 def _peek(bufio):
2234 bufio.peek(1)
2235 self.check_writes(_peek)
2236 def _peek(bufio):
2237 pos = bufio.tell()
2238 bufio.seek(-1, 1)
2239 bufio.peek(1)
2240 bufio.seek(pos, 0)
2241 self.check_writes(_peek)
2242
2243 def test_writes_and_reads(self):
2244 def _read(bufio):
2245 bufio.seek(-1, 1)
2246 bufio.read(1)
2247 self.check_writes(_read)
2248
2249 def test_writes_and_read1s(self):
2250 def _read1(bufio):
2251 bufio.seek(-1, 1)
2252 bufio.read1(1)
2253 self.check_writes(_read1)
2254
2255 def test_writes_and_readintos(self):
2256 def _read(bufio):
2257 bufio.seek(-1, 1)
2258 bufio.readinto(bytearray(1))
2259 self.check_writes(_read)
2260
Antoine Pitroua0ceb732009-08-06 20:29:56 +00002261 def test_write_after_readahead(self):
2262 # Issue #6629: writing after the buffer was filled by readahead should
2263 # first rewind the raw stream.
2264 for overwrite_size in [1, 5]:
2265 raw = self.BytesIO(b"A" * 10)
2266 bufio = self.tp(raw, 4)
2267 # Trigger readahead
2268 self.assertEqual(bufio.read(1), b"A")
2269 self.assertEqual(bufio.tell(), 1)
2270 # Overwriting should rewind the raw stream if it needs so
2271 bufio.write(b"B" * overwrite_size)
2272 self.assertEqual(bufio.tell(), overwrite_size + 1)
2273 # If the write size was smaller than the buffer size, flush() and
2274 # check that rewind happens.
2275 bufio.flush()
2276 self.assertEqual(bufio.tell(), overwrite_size + 1)
2277 s = raw.getvalue()
2278 self.assertEqual(s,
2279 b"A" + b"B" * overwrite_size + b"A" * (9 - overwrite_size))
2280
Antoine Pitrou7c404892011-05-13 00:13:33 +02002281 def test_write_rewind_write(self):
2282 # Various combinations of reading / writing / seeking backwards / writing again
2283 def mutate(bufio, pos1, pos2):
2284 assert pos2 >= pos1
2285 # Fill the buffer
2286 bufio.seek(pos1)
2287 bufio.read(pos2 - pos1)
2288 bufio.write(b'\x02')
2289 # This writes earlier than the previous write, but still inside
2290 # the buffer.
2291 bufio.seek(pos1)
2292 bufio.write(b'\x01')
2293
2294 b = b"\x80\x81\x82\x83\x84"
2295 for i in range(0, len(b)):
2296 for j in range(i, len(b)):
2297 raw = self.BytesIO(b)
2298 bufio = self.tp(raw, 100)
2299 mutate(bufio, i, j)
2300 bufio.flush()
2301 expected = bytearray(b)
2302 expected[j] = 2
2303 expected[i] = 1
2304 self.assertEqual(raw.getvalue(), expected,
2305 "failed result for i=%d, j=%d" % (i, j))
2306
Antoine Pitrou905a2ff2010-01-31 22:47:27 +00002307 def test_truncate_after_read_or_write(self):
2308 raw = self.BytesIO(b"A" * 10)
2309 bufio = self.tp(raw, 100)
2310 self.assertEqual(bufio.read(2), b"AA") # the read buffer gets filled
2311 self.assertEqual(bufio.truncate(), 2)
2312 self.assertEqual(bufio.write(b"BB"), 2) # the write buffer increases
2313 self.assertEqual(bufio.truncate(), 4)
2314
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002315 def test_misbehaved_io(self):
2316 BufferedReaderTest.test_misbehaved_io(self)
2317 BufferedWriterTest.test_misbehaved_io(self)
2318
Antoine Pitroue05565e2011-08-20 14:39:23 +02002319 def test_interleaved_read_write(self):
2320 # Test for issue #12213
2321 with self.BytesIO(b'abcdefgh') as raw:
2322 with self.tp(raw, 100) as f:
2323 f.write(b"1")
2324 self.assertEqual(f.read(1), b'b')
2325 f.write(b'2')
2326 self.assertEqual(f.read1(1), b'd')
2327 f.write(b'3')
2328 buf = bytearray(1)
2329 f.readinto(buf)
2330 self.assertEqual(buf, b'f')
2331 f.write(b'4')
2332 self.assertEqual(f.peek(1), b'h')
2333 f.flush()
2334 self.assertEqual(raw.getvalue(), b'1b2d3f4h')
2335
2336 with self.BytesIO(b'abc') as raw:
2337 with self.tp(raw, 100) as f:
2338 self.assertEqual(f.read(1), b'a')
2339 f.write(b"2")
2340 self.assertEqual(f.read(1), b'c')
2341 f.flush()
2342 self.assertEqual(raw.getvalue(), b'a2c')
2343
2344 def test_interleaved_readline_write(self):
2345 with self.BytesIO(b'ab\ncdef\ng\n') as raw:
2346 with self.tp(raw) as f:
2347 f.write(b'1')
2348 self.assertEqual(f.readline(), b'b\n')
2349 f.write(b'2')
2350 self.assertEqual(f.readline(), b'def\n')
2351 f.write(b'3')
2352 self.assertEqual(f.readline(), b'\n')
2353 f.flush()
2354 self.assertEqual(raw.getvalue(), b'1b\n2def\n3\n')
2355
Antoine Pitrou0d739d72010-09-05 23:01:12 +00002356 # You can't construct a BufferedRandom over a non-seekable stream.
2357 test_unseekable = None
2358
R David Murray67bfe802013-02-23 21:51:05 -05002359
Antoine Pitrou10f0c502012-07-29 19:02:46 +02002360class CBufferedRandomTest(BufferedRandomTest, SizeofTest):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002361 tp = io.BufferedRandom
2362
Gregory P. Smithe5796c42018-12-30 20:17:57 -08002363 @unittest.skipIf(MEMORY_SANITIZER, "MSan defaults to crashing "
2364 "instead of returning NULL for malloc failure.")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002365 def test_constructor(self):
2366 BufferedRandomTest.test_constructor(self)
2367 # The allocation can succeed on 32-bit builds, e.g. with more
Victor Stinner8c663fd2017-11-08 14:44:44 -08002368 # than 2 GiB RAM and a 64-bit kernel.
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002369 if sys.maxsize > 0x7FFFFFFF:
2370 rawio = self.MockRawIO()
2371 bufio = self.tp(rawio)
2372 self.assertRaises((OverflowError, MemoryError, ValueError),
2373 bufio.__init__, rawio, sys.maxsize)
2374
2375 def test_garbage_collection(self):
2376 CBufferedReaderTest.test_garbage_collection(self)
2377 CBufferedWriterTest.test_garbage_collection(self)
2378
R David Murray67bfe802013-02-23 21:51:05 -05002379 def test_args_error(self):
2380 # Issue #17275
2381 with self.assertRaisesRegex(TypeError, "BufferedRandom"):
2382 self.tp(io.BytesIO(), 1024, 1024, 1024)
2383
2384
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002385class PyBufferedRandomTest(BufferedRandomTest):
2386 tp = pyio.BufferedRandom
2387
2388
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002389# To fully exercise seek/tell, the StatefulIncrementalDecoder has these
2390# properties:
2391# - A single output character can correspond to many bytes of input.
2392# - The number of input bytes to complete the character can be
2393# undetermined until the last input byte is received.
2394# - The number of input bytes can vary depending on previous input.
2395# - A single input byte can correspond to many characters of output.
2396# - The number of output characters can be undetermined until the
2397# last input byte is received.
2398# - The number of output characters can vary depending on previous input.
2399
2400class StatefulIncrementalDecoder(codecs.IncrementalDecoder):
2401 """
2402 For testing seek/tell behavior with a stateful, buffering decoder.
2403
2404 Input is a sequence of words. Words may be fixed-length (length set
2405 by input) or variable-length (period-terminated). In variable-length
2406 mode, extra periods are ignored. Possible words are:
2407 - 'i' followed by a number sets the input length, I (maximum 99).
2408 When I is set to 0, words are space-terminated.
2409 - 'o' followed by a number sets the output length, O (maximum 99).
2410 - Any other word is converted into a word followed by a period on
2411 the output. The output word consists of the input word truncated
2412 or padded out with hyphens to make its length equal to O. If O
2413 is 0, the word is output verbatim without truncating or padding.
2414 I and O are initially set to 1. When I changes, any buffered input is
2415 re-scanned according to the new I. EOF also terminates the last word.
2416 """
2417
2418 def __init__(self, errors='strict'):
Christian Heimesab568872008-03-23 02:11:13 +00002419 codecs.IncrementalDecoder.__init__(self, errors)
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002420 self.reset()
2421
2422 def __repr__(self):
2423 return '<SID %x>' % id(self)
2424
2425 def reset(self):
2426 self.i = 1
2427 self.o = 1
2428 self.buffer = bytearray()
2429
2430 def getstate(self):
2431 i, o = self.i ^ 1, self.o ^ 1 # so that flags = 0 after reset()
2432 return bytes(self.buffer), i*100 + o
2433
2434 def setstate(self, state):
2435 buffer, io = state
2436 self.buffer = bytearray(buffer)
2437 i, o = divmod(io, 100)
2438 self.i, self.o = i ^ 1, o ^ 1
2439
2440 def decode(self, input, final=False):
2441 output = ''
2442 for b in input:
2443 if self.i == 0: # variable-length, terminated with period
2444 if b == ord('.'):
2445 if self.buffer:
2446 output += self.process_word()
2447 else:
2448 self.buffer.append(b)
2449 else: # fixed-length, terminate after self.i bytes
2450 self.buffer.append(b)
2451 if len(self.buffer) == self.i:
2452 output += self.process_word()
2453 if final and self.buffer: # EOF terminates the last word
2454 output += self.process_word()
2455 return output
2456
2457 def process_word(self):
2458 output = ''
2459 if self.buffer[0] == ord('i'):
2460 self.i = min(99, int(self.buffer[1:] or 0)) # set input length
2461 elif self.buffer[0] == ord('o'):
2462 self.o = min(99, int(self.buffer[1:] or 0)) # set output length
2463 else:
2464 output = self.buffer.decode('ascii')
2465 if len(output) < self.o:
2466 output += '-'*self.o # pad out with hyphens
2467 if self.o:
2468 output = output[:self.o] # truncate to output length
2469 output += '.'
2470 self.buffer = bytearray()
2471 return output
2472
Benjamin Petersonad9d48d2008-04-02 21:49:44 +00002473 codecEnabled = False
2474
2475 @classmethod
2476 def lookupTestDecoder(cls, name):
2477 if cls.codecEnabled and name == 'test_decoder':
Antoine Pitrou180a3362008-12-14 16:36:46 +00002478 latin1 = codecs.lookup('latin-1')
Benjamin Petersonad9d48d2008-04-02 21:49:44 +00002479 return codecs.CodecInfo(
Antoine Pitrou180a3362008-12-14 16:36:46 +00002480 name='test_decoder', encode=latin1.encode, decode=None,
Benjamin Petersonad9d48d2008-04-02 21:49:44 +00002481 incrementalencoder=None,
2482 streamreader=None, streamwriter=None,
2483 incrementaldecoder=cls)
2484
2485# Register the previous decoder for testing.
2486# Disabled by default, tests will enable it.
2487codecs.register(StatefulIncrementalDecoder.lookupTestDecoder)
2488
2489
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002490class StatefulIncrementalDecoderTest(unittest.TestCase):
2491 """
2492 Make sure the StatefulIncrementalDecoder actually works.
2493 """
2494
2495 test_cases = [
Ka-Ping Yeed24a5b62008-03-20 10:51:27 +00002496 # I=1, O=1 (fixed-length input == fixed-length output)
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002497 (b'abcd', False, 'a.b.c.d.'),
Ka-Ping Yeed24a5b62008-03-20 10:51:27 +00002498 # I=0, O=0 (variable-length input, variable-length output)
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002499 (b'oiabcd', True, 'abcd.'),
Ka-Ping Yeed24a5b62008-03-20 10:51:27 +00002500 # I=0, O=0 (should ignore extra periods)
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002501 (b'oi...abcd...', True, 'abcd.'),
Ka-Ping Yeed24a5b62008-03-20 10:51:27 +00002502 # I=0, O=6 (variable-length input, fixed-length output)
2503 (b'i.o6.x.xyz.toolongtofit.', False, 'x-----.xyz---.toolon.'),
2504 # I=2, O=6 (fixed-length input < fixed-length output)
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002505 (b'i.i2.o6xyz', True, 'xy----.z-----.'),
Ka-Ping Yeed24a5b62008-03-20 10:51:27 +00002506 # I=6, O=3 (fixed-length input > fixed-length output)
2507 (b'i.o3.i6.abcdefghijklmnop', True, 'abc.ghi.mno.'),
2508 # I=0, then 3; O=29, then 15 (with longer output)
2509 (b'i.o29.a.b.cde.o15.abcdefghijabcdefghij.i3.a.b.c.d.ei00k.l.m', True,
2510 'a----------------------------.' +
2511 'b----------------------------.' +
2512 'cde--------------------------.' +
2513 'abcdefghijabcde.' +
2514 'a.b------------.' +
2515 '.c.------------.' +
2516 'd.e------------.' +
2517 'k--------------.' +
2518 'l--------------.' +
2519 'm--------------.')
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002520 ]
2521
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002522 def test_decoder(self):
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002523 # Try a few one-shot test cases.
2524 for input, eof, output in self.test_cases:
2525 d = StatefulIncrementalDecoder()
Ezio Melottib3aedd42010-11-20 19:04:17 +00002526 self.assertEqual(d.decode(input, eof), output)
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002527
2528 # Also test an unfinished decode, followed by forcing EOF.
2529 d = StatefulIncrementalDecoder()
Ezio Melottib3aedd42010-11-20 19:04:17 +00002530 self.assertEqual(d.decode(b'oiabcd'), '')
2531 self.assertEqual(d.decode(b'', 1), 'abcd.')
Guido van Rossum78892e42007-04-06 17:31:18 +00002532
2533class TextIOWrapperTest(unittest.TestCase):
Guido van Rossum9b76da62007-04-11 01:09:03 +00002534
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002535 def setUp(self):
2536 self.testdata = b"AAA\r\nBBB\rCCC\r\nDDD\nEEE\r\n"
2537 self.normalized = b"AAA\nBBB\nCCC\nDDD\nEEE\n".decode("ascii")
Benjamin Petersonee8712c2008-05-20 21:35:26 +00002538 support.unlink(support.TESTFN)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002539
Guido van Rossumd0712812007-04-11 16:32:43 +00002540 def tearDown(self):
Benjamin Petersonee8712c2008-05-20 21:35:26 +00002541 support.unlink(support.TESTFN)
Guido van Rossum9b76da62007-04-11 01:09:03 +00002542
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002543 def test_constructor(self):
2544 r = self.BytesIO(b"\xc3\xa9\n\n")
2545 b = self.BufferedReader(r, 1000)
2546 t = self.TextIOWrapper(b)
Marc-André Lemburg8f36af72011-02-25 15:42:01 +00002547 t.__init__(b, encoding="latin-1", newline="\r\n")
2548 self.assertEqual(t.encoding, "latin-1")
Ezio Melottib3aedd42010-11-20 19:04:17 +00002549 self.assertEqual(t.line_buffering, False)
Marc-André Lemburg8f36af72011-02-25 15:42:01 +00002550 t.__init__(b, encoding="utf-8", line_buffering=True)
2551 self.assertEqual(t.encoding, "utf-8")
Ezio Melottib3aedd42010-11-20 19:04:17 +00002552 self.assertEqual(t.line_buffering, True)
2553 self.assertEqual("\xe9\n", t.readline())
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002554 self.assertRaises(TypeError, t.__init__, b, newline=42)
2555 self.assertRaises(ValueError, t.__init__, b, newline='xyzzy')
2556
Serhiy Storchakaf24131f2015-04-16 11:19:43 +03002557 def test_uninitialized(self):
2558 t = self.TextIOWrapper.__new__(self.TextIOWrapper)
2559 del t
2560 t = self.TextIOWrapper.__new__(self.TextIOWrapper)
2561 self.assertRaises(Exception, repr, t)
2562 self.assertRaisesRegex((ValueError, AttributeError),
2563 'uninitialized|has no attribute',
2564 t.read, 0)
2565 t.__init__(self.MockRawIO())
2566 self.assertEqual(t.read(0), '')
2567
Nick Coghlana9b15242014-02-04 22:11:18 +10002568 def test_non_text_encoding_codecs_are_rejected(self):
2569 # Ensure the constructor complains if passed a codec that isn't
2570 # marked as a text encoding
2571 # http://bugs.python.org/issue20404
2572 r = self.BytesIO()
2573 b = self.BufferedWriter(r)
2574 with self.assertRaisesRegex(LookupError, "is not a text encoding"):
2575 self.TextIOWrapper(b, encoding="hex")
2576
Benjamin Petersond2e0c792009-05-01 20:40:59 +00002577 def test_detach(self):
2578 r = self.BytesIO()
2579 b = self.BufferedWriter(r)
2580 t = self.TextIOWrapper(b)
2581 self.assertIs(t.detach(), b)
2582
2583 t = self.TextIOWrapper(b, encoding="ascii")
2584 t.write("howdy")
2585 self.assertFalse(r.getvalue())
2586 t.detach()
2587 self.assertEqual(r.getvalue(), b"howdy")
2588 self.assertRaises(ValueError, t.detach)
2589
Benjamin Peterson10e76b62014-12-21 20:51:50 -06002590 # Operations independent of the detached stream should still work
2591 repr(t)
2592 self.assertEqual(t.encoding, "ascii")
2593 self.assertEqual(t.errors, "strict")
2594 self.assertFalse(t.line_buffering)
Antoine Pitrou3c2817b2017-06-03 12:32:28 +02002595 self.assertFalse(t.write_through)
Benjamin Peterson10e76b62014-12-21 20:51:50 -06002596
Benjamin Petersonc4c0eae2009-03-09 00:07:03 +00002597 def test_repr(self):
2598 raw = self.BytesIO("hello".encode("utf-8"))
2599 b = self.BufferedReader(raw)
2600 t = self.TextIOWrapper(b, encoding="utf-8")
Antoine Pitrou716c4442009-05-23 19:04:03 +00002601 modname = self.TextIOWrapper.__module__
Max Bernsteinccb7ca72019-05-21 10:09:21 -07002602 self.assertRegex(repr(t),
2603 r"<(%s\.)?TextIOWrapper encoding='utf-8'>" % modname)
Antoine Pitrou716c4442009-05-23 19:04:03 +00002604 raw.name = "dummy"
Max Bernsteinccb7ca72019-05-21 10:09:21 -07002605 self.assertRegex(repr(t),
2606 r"<(%s\.)?TextIOWrapper name='dummy' encoding='utf-8'>" % modname)
Antoine Pitroua4815ca2011-01-09 20:38:15 +00002607 t.mode = "r"
Max Bernsteinccb7ca72019-05-21 10:09:21 -07002608 self.assertRegex(repr(t),
2609 r"<(%s\.)?TextIOWrapper name='dummy' mode='r' encoding='utf-8'>" % modname)
Antoine Pitrou716c4442009-05-23 19:04:03 +00002610 raw.name = b"dummy"
Max Bernsteinccb7ca72019-05-21 10:09:21 -07002611 self.assertRegex(repr(t),
2612 r"<(%s\.)?TextIOWrapper name=b'dummy' mode='r' encoding='utf-8'>" % modname)
Benjamin Petersonc4c0eae2009-03-09 00:07:03 +00002613
Benjamin Peterson10e76b62014-12-21 20:51:50 -06002614 t.buffer.detach()
2615 repr(t) # Should not raise an exception
2616
Serhiy Storchakaa5af6e12017-03-19 19:25:29 +02002617 def test_recursive_repr(self):
2618 # Issue #25455
2619 raw = self.BytesIO()
2620 t = self.TextIOWrapper(raw)
2621 with support.swap_attr(raw, 'name', t):
2622 try:
2623 repr(t) # Should not crash
2624 except RuntimeError:
2625 pass
2626
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002627 def test_line_buffering(self):
2628 r = self.BytesIO()
2629 b = self.BufferedWriter(r, 1000)
2630 t = self.TextIOWrapper(b, newline="\n", line_buffering=True)
Guido van Rossumf64db9f2007-12-06 01:04:26 +00002631 t.write("X")
Ezio Melottib3aedd42010-11-20 19:04:17 +00002632 self.assertEqual(r.getvalue(), b"") # No flush happened
Guido van Rossumf64db9f2007-12-06 01:04:26 +00002633 t.write("Y\nZ")
Ezio Melottib3aedd42010-11-20 19:04:17 +00002634 self.assertEqual(r.getvalue(), b"XY\nZ") # All got flushed
Guido van Rossumf64db9f2007-12-06 01:04:26 +00002635 t.write("A\rB")
Ezio Melottib3aedd42010-11-20 19:04:17 +00002636 self.assertEqual(r.getvalue(), b"XY\nZA\rB")
Guido van Rossumf64db9f2007-12-06 01:04:26 +00002637
Antoine Pitrou3c2817b2017-06-03 12:32:28 +02002638 def test_reconfigure_line_buffering(self):
2639 r = self.BytesIO()
2640 b = self.BufferedWriter(r, 1000)
2641 t = self.TextIOWrapper(b, newline="\n", line_buffering=False)
2642 t.write("AB\nC")
2643 self.assertEqual(r.getvalue(), b"")
2644
2645 t.reconfigure(line_buffering=True) # implicit flush
2646 self.assertEqual(r.getvalue(), b"AB\nC")
2647 t.write("DEF\nG")
2648 self.assertEqual(r.getvalue(), b"AB\nCDEF\nG")
2649 t.write("H")
2650 self.assertEqual(r.getvalue(), b"AB\nCDEF\nG")
2651 t.reconfigure(line_buffering=False) # implicit flush
2652 self.assertEqual(r.getvalue(), b"AB\nCDEF\nGH")
2653 t.write("IJ")
2654 self.assertEqual(r.getvalue(), b"AB\nCDEF\nGH")
2655
2656 # Keeping default value
2657 t.reconfigure()
2658 t.reconfigure(line_buffering=None)
2659 self.assertEqual(t.line_buffering, False)
2660 t.reconfigure(line_buffering=True)
2661 t.reconfigure()
2662 t.reconfigure(line_buffering=None)
2663 self.assertEqual(t.line_buffering, True)
2664
Victor Stinner91106cd2017-12-13 12:29:09 +01002665 @unittest.skipIf(sys.flags.utf8_mode, "utf-8 mode is enabled")
Victor Stinnerf86a5e82012-06-05 13:43:22 +02002666 def test_default_encoding(self):
2667 old_environ = dict(os.environ)
2668 try:
2669 # try to get a user preferred encoding different than the current
2670 # locale encoding to check that TextIOWrapper() uses the current
2671 # locale encoding and not the user preferred encoding
2672 for key in ('LC_ALL', 'LANG', 'LC_CTYPE'):
2673 if key in os.environ:
2674 del os.environ[key]
2675
2676 current_locale_encoding = locale.getpreferredencoding(False)
2677 b = self.BytesIO()
2678 t = self.TextIOWrapper(b)
2679 self.assertEqual(t.encoding, current_locale_encoding)
2680 finally:
2681 os.environ.clear()
2682 os.environ.update(old_environ)
2683
Serhiy Storchaka5cfc79d2014-02-07 10:06:39 +02002684 @support.cpython_only
Victor Stinner91106cd2017-12-13 12:29:09 +01002685 @unittest.skipIf(sys.flags.utf8_mode, "utf-8 mode is enabled")
Serhiy Storchaka78980432013-01-15 01:12:17 +02002686 def test_device_encoding(self):
Serhiy Storchaka5cfc79d2014-02-07 10:06:39 +02002687 # Issue 15989
2688 import _testcapi
Serhiy Storchaka78980432013-01-15 01:12:17 +02002689 b = self.BytesIO()
2690 b.fileno = lambda: _testcapi.INT_MAX + 1
2691 self.assertRaises(OverflowError, self.TextIOWrapper, b)
2692 b.fileno = lambda: _testcapi.UINT_MAX + 1
2693 self.assertRaises(OverflowError, self.TextIOWrapper, b)
2694
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002695 def test_encoding(self):
2696 # Check the encoding attribute is always set, and valid
2697 b = self.BytesIO()
Marc-André Lemburg8f36af72011-02-25 15:42:01 +00002698 t = self.TextIOWrapper(b, encoding="utf-8")
2699 self.assertEqual(t.encoding, "utf-8")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002700 t = self.TextIOWrapper(b)
Serhiy Storchaka0dcd80a2015-08-02 15:17:49 +03002701 self.assertIsNotNone(t.encoding)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002702 codecs.lookup(t.encoding)
2703
2704 def test_encoding_errors_reading(self):
Guido van Rossume7fc50f2007-12-03 22:54:21 +00002705 # (1) default
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002706 b = self.BytesIO(b"abc\n\xff\n")
2707 t = self.TextIOWrapper(b, encoding="ascii")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00002708 self.assertRaises(UnicodeError, t.read)
2709 # (2) explicit strict
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002710 b = self.BytesIO(b"abc\n\xff\n")
2711 t = self.TextIOWrapper(b, encoding="ascii", errors="strict")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00002712 self.assertRaises(UnicodeError, t.read)
2713 # (3) ignore
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002714 b = self.BytesIO(b"abc\n\xff\n")
2715 t = self.TextIOWrapper(b, encoding="ascii", errors="ignore")
Ezio Melottib3aedd42010-11-20 19:04:17 +00002716 self.assertEqual(t.read(), "abc\n\n")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00002717 # (4) replace
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002718 b = self.BytesIO(b"abc\n\xff\n")
2719 t = self.TextIOWrapper(b, encoding="ascii", errors="replace")
Ezio Melottib3aedd42010-11-20 19:04:17 +00002720 self.assertEqual(t.read(), "abc\n\ufffd\n")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00002721
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002722 def test_encoding_errors_writing(self):
Guido van Rossume7fc50f2007-12-03 22:54:21 +00002723 # (1) default
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002724 b = self.BytesIO()
2725 t = self.TextIOWrapper(b, encoding="ascii")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00002726 self.assertRaises(UnicodeError, t.write, "\xff")
2727 # (2) explicit strict
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002728 b = self.BytesIO()
2729 t = self.TextIOWrapper(b, encoding="ascii", errors="strict")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00002730 self.assertRaises(UnicodeError, t.write, "\xff")
2731 # (3) ignore
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002732 b = self.BytesIO()
2733 t = self.TextIOWrapper(b, encoding="ascii", errors="ignore",
Guido van Rossumf64db9f2007-12-06 01:04:26 +00002734 newline="\n")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00002735 t.write("abc\xffdef\n")
2736 t.flush()
Ezio Melottib3aedd42010-11-20 19:04:17 +00002737 self.assertEqual(b.getvalue(), b"abcdef\n")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00002738 # (4) replace
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002739 b = self.BytesIO()
2740 t = self.TextIOWrapper(b, encoding="ascii", errors="replace",
Guido van Rossumf64db9f2007-12-06 01:04:26 +00002741 newline="\n")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00002742 t.write("abc\xffdef\n")
2743 t.flush()
Ezio Melottib3aedd42010-11-20 19:04:17 +00002744 self.assertEqual(b.getvalue(), b"abc?def\n")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00002745
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002746 def test_newlines(self):
Guido van Rossum78892e42007-04-06 17:31:18 +00002747 input_lines = [ "unix\n", "windows\r\n", "os9\r", "last\n", "nonl" ]
2748
2749 tests = [
2750 [ None, [ 'unix\n', 'windows\n', 'os9\n', 'last\n', 'nonl' ] ],
Guido van Rossum8358db22007-08-18 21:39:55 +00002751 [ '', input_lines ],
2752 [ '\n', [ "unix\n", "windows\r\n", "os9\rlast\n", "nonl" ] ],
2753 [ '\r\n', [ "unix\nwindows\r\n", "os9\rlast\nnonl" ] ],
2754 [ '\r', [ "unix\nwindows\r", "\nos9\r", "last\nnonl" ] ],
Guido van Rossum78892e42007-04-06 17:31:18 +00002755 ]
Antoine Pitrou180a3362008-12-14 16:36:46 +00002756 encodings = (
2757 'utf-8', 'latin-1',
2758 'utf-16', 'utf-16-le', 'utf-16-be',
2759 'utf-32', 'utf-32-le', 'utf-32-be',
2760 )
Guido van Rossum78892e42007-04-06 17:31:18 +00002761
Guido van Rossum8358db22007-08-18 21:39:55 +00002762 # Try a range of buffer sizes to test the case where \r is the last
Guido van Rossum78892e42007-04-06 17:31:18 +00002763 # character in TextIOWrapper._pending_line.
2764 for encoding in encodings:
Guido van Rossum8358db22007-08-18 21:39:55 +00002765 # XXX: str.encode() should return bytes
2766 data = bytes(''.join(input_lines).encode(encoding))
Guido van Rossum78892e42007-04-06 17:31:18 +00002767 for do_reads in (False, True):
Guido van Rossum8358db22007-08-18 21:39:55 +00002768 for bufsize in range(1, 10):
2769 for newline, exp_lines in tests:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002770 bufio = self.BufferedReader(self.BytesIO(data), bufsize)
2771 textio = self.TextIOWrapper(bufio, newline=newline,
Guido van Rossum78892e42007-04-06 17:31:18 +00002772 encoding=encoding)
2773 if do_reads:
2774 got_lines = []
2775 while True:
Guido van Rossum76c5d4d2007-04-06 19:10:29 +00002776 c2 = textio.read(2)
Guido van Rossum78892e42007-04-06 17:31:18 +00002777 if c2 == '':
2778 break
Ezio Melottib3aedd42010-11-20 19:04:17 +00002779 self.assertEqual(len(c2), 2)
Guido van Rossum76c5d4d2007-04-06 19:10:29 +00002780 got_lines.append(c2 + textio.readline())
Guido van Rossum78892e42007-04-06 17:31:18 +00002781 else:
Guido van Rossum76c5d4d2007-04-06 19:10:29 +00002782 got_lines = list(textio)
Guido van Rossum78892e42007-04-06 17:31:18 +00002783
2784 for got_line, exp_line in zip(got_lines, exp_lines):
Ezio Melottib3aedd42010-11-20 19:04:17 +00002785 self.assertEqual(got_line, exp_line)
2786 self.assertEqual(len(got_lines), len(exp_lines))
Guido van Rossum78892e42007-04-06 17:31:18 +00002787
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002788 def test_newlines_input(self):
2789 testdata = b"AAA\nBB\x00B\nCCC\rDDD\rEEE\r\nFFF\r\nGGG"
Guido van Rossum8358db22007-08-18 21:39:55 +00002790 normalized = testdata.replace(b"\r\n", b"\n").replace(b"\r", b"\n")
2791 for newline, expected in [
Ezio Melottid8b509b2011-09-28 17:37:55 +03002792 (None, normalized.decode("ascii").splitlines(keepends=True)),
2793 ("", testdata.decode("ascii").splitlines(keepends=True)),
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002794 ("\n", ["AAA\n", "BB\x00B\n", "CCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]),
2795 ("\r\n", ["AAA\nBB\x00B\nCCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]),
2796 ("\r", ["AAA\nBB\x00B\nCCC\r", "DDD\r", "EEE\r", "\nFFF\r", "\nGGG"]),
Guido van Rossum8358db22007-08-18 21:39:55 +00002797 ]:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002798 buf = self.BytesIO(testdata)
2799 txt = self.TextIOWrapper(buf, encoding="ascii", newline=newline)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002800 self.assertEqual(txt.readlines(), expected)
Guido van Rossum8358db22007-08-18 21:39:55 +00002801 txt.seek(0)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002802 self.assertEqual(txt.read(), "".join(expected))
Guido van Rossum8358db22007-08-18 21:39:55 +00002803
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002804 def test_newlines_output(self):
2805 testdict = {
2806 "": b"AAA\nBBB\nCCC\nX\rY\r\nZ",
2807 "\n": b"AAA\nBBB\nCCC\nX\rY\r\nZ",
2808 "\r": b"AAA\rBBB\rCCC\rX\rY\r\rZ",
2809 "\r\n": b"AAA\r\nBBB\r\nCCC\r\nX\rY\r\r\nZ",
2810 }
2811 tests = [(None, testdict[os.linesep])] + sorted(testdict.items())
2812 for newline, expected in tests:
2813 buf = self.BytesIO()
2814 txt = self.TextIOWrapper(buf, encoding="ascii", newline=newline)
2815 txt.write("AAA\nB")
2816 txt.write("BB\nCCC\n")
2817 txt.write("X\rY\r\nZ")
2818 txt.flush()
Ezio Melottib3aedd42010-11-20 19:04:17 +00002819 self.assertEqual(buf.closed, False)
2820 self.assertEqual(buf.getvalue(), expected)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002821
2822 def test_destructor(self):
2823 l = []
2824 base = self.BytesIO
2825 class MyBytesIO(base):
2826 def close(self):
2827 l.append(self.getvalue())
2828 base.close(self)
2829 b = MyBytesIO()
2830 t = self.TextIOWrapper(b, encoding="ascii")
2831 t.write("abc")
2832 del t
Benjamin Peterson24fb1d02009-04-24 23:26:21 +00002833 support.gc_collect()
Ezio Melottib3aedd42010-11-20 19:04:17 +00002834 self.assertEqual([b"abc"], l)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002835
2836 def test_override_destructor(self):
2837 record = []
2838 class MyTextIO(self.TextIOWrapper):
2839 def __del__(self):
2840 record.append(1)
2841 try:
2842 f = super().__del__
2843 except AttributeError:
2844 pass
2845 else:
2846 f()
2847 def close(self):
2848 record.append(2)
2849 super().close()
2850 def flush(self):
2851 record.append(3)
2852 super().flush()
2853 b = self.BytesIO()
2854 t = MyTextIO(b, encoding="ascii")
2855 del t
Benjamin Peterson24fb1d02009-04-24 23:26:21 +00002856 support.gc_collect()
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002857 self.assertEqual(record, [1, 2, 3])
2858
2859 def test_error_through_destructor(self):
2860 # Test that the exception state is not modified by a destructor,
2861 # even if close() fails.
2862 rawio = self.CloseFailureIO()
Victor Stinner6dbbe742019-05-25 00:09:38 +02002863 with support.catch_unraisable_exception() as cm:
2864 with self.assertRaises(AttributeError):
2865 self.TextIOWrapper(rawio).xyzzy
Victor Stinnerbc2aa812019-05-23 03:45:09 +02002866
2867 if not IOBASE_EMITS_UNRAISABLE:
2868 self.assertIsNone(cm.unraisable)
2869 elif cm.unraisable is not None:
2870 self.assertEqual(cm.unraisable.exc_type, OSError)
Guido van Rossum8358db22007-08-18 21:39:55 +00002871
Guido van Rossum9b76da62007-04-11 01:09:03 +00002872 # Systematic tests of the text I/O API
2873
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002874 def test_basic_io(self):
Guido van Rossum9b76da62007-04-11 01:09:03 +00002875 for chunksize in (1, 2, 3, 4, 5, 15, 16, 17, 31, 32, 33, 63, 64, 65):
Marc-André Lemburg8f36af72011-02-25 15:42:01 +00002876 for enc in "ascii", "latin-1", "utf-8" :# , "utf-16-be", "utf-16-le":
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002877 f = self.open(support.TESTFN, "w+", encoding=enc)
Guido van Rossum9b76da62007-04-11 01:09:03 +00002878 f._CHUNK_SIZE = chunksize
Ezio Melottib3aedd42010-11-20 19:04:17 +00002879 self.assertEqual(f.write("abc"), 3)
Guido van Rossum9b76da62007-04-11 01:09:03 +00002880 f.close()
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002881 f = self.open(support.TESTFN, "r+", encoding=enc)
Guido van Rossum9b76da62007-04-11 01:09:03 +00002882 f._CHUNK_SIZE = chunksize
Ezio Melottib3aedd42010-11-20 19:04:17 +00002883 self.assertEqual(f.tell(), 0)
2884 self.assertEqual(f.read(), "abc")
Guido van Rossum9b76da62007-04-11 01:09:03 +00002885 cookie = f.tell()
Ezio Melottib3aedd42010-11-20 19:04:17 +00002886 self.assertEqual(f.seek(0), 0)
2887 self.assertEqual(f.read(None), "abc")
Benjamin Petersonbf5ff762009-12-13 19:25:34 +00002888 f.seek(0)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002889 self.assertEqual(f.read(2), "ab")
2890 self.assertEqual(f.read(1), "c")
2891 self.assertEqual(f.read(1), "")
2892 self.assertEqual(f.read(), "")
2893 self.assertEqual(f.tell(), cookie)
2894 self.assertEqual(f.seek(0), 0)
2895 self.assertEqual(f.seek(0, 2), cookie)
2896 self.assertEqual(f.write("def"), 3)
2897 self.assertEqual(f.seek(cookie), cookie)
2898 self.assertEqual(f.read(), "def")
Guido van Rossum9b76da62007-04-11 01:09:03 +00002899 if enc.startswith("utf"):
2900 self.multi_line_test(f, enc)
2901 f.close()
2902
2903 def multi_line_test(self, f, enc):
2904 f.seek(0)
2905 f.truncate()
Guido van Rossumef87d6e2007-05-02 19:09:54 +00002906 sample = "s\xff\u0fff\uffff"
Guido van Rossum9b76da62007-04-11 01:09:03 +00002907 wlines = []
Guido van Rossumcba608c2007-04-11 14:19:59 +00002908 for size in (0, 1, 2, 3, 4, 5, 30, 31, 32, 33, 62, 63, 64, 65, 1000):
Guido van Rossum9b76da62007-04-11 01:09:03 +00002909 chars = []
Guido van Rossum805365e2007-05-07 22:24:25 +00002910 for i in range(size):
Guido van Rossum9b76da62007-04-11 01:09:03 +00002911 chars.append(sample[i % len(sample)])
Guido van Rossumef87d6e2007-05-02 19:09:54 +00002912 line = "".join(chars) + "\n"
Guido van Rossum9b76da62007-04-11 01:09:03 +00002913 wlines.append((f.tell(), line))
2914 f.write(line)
Guido van Rossum9b76da62007-04-11 01:09:03 +00002915 f.seek(0)
2916 rlines = []
2917 while True:
2918 pos = f.tell()
2919 line = f.readline()
2920 if not line:
Guido van Rossum9b76da62007-04-11 01:09:03 +00002921 break
2922 rlines.append((pos, line))
Ezio Melottib3aedd42010-11-20 19:04:17 +00002923 self.assertEqual(rlines, wlines)
Guido van Rossum9b76da62007-04-11 01:09:03 +00002924
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002925 def test_telling(self):
Marc-André Lemburg8f36af72011-02-25 15:42:01 +00002926 f = self.open(support.TESTFN, "w+", encoding="utf-8")
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +00002927 p0 = f.tell()
Guido van Rossumef87d6e2007-05-02 19:09:54 +00002928 f.write("\xff\n")
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +00002929 p1 = f.tell()
Guido van Rossumef87d6e2007-05-02 19:09:54 +00002930 f.write("\xff\n")
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +00002931 p2 = f.tell()
2932 f.seek(0)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002933 self.assertEqual(f.tell(), p0)
2934 self.assertEqual(f.readline(), "\xff\n")
2935 self.assertEqual(f.tell(), p1)
2936 self.assertEqual(f.readline(), "\xff\n")
2937 self.assertEqual(f.tell(), p2)
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +00002938 f.seek(0)
2939 for line in f:
Ezio Melottib3aedd42010-11-20 19:04:17 +00002940 self.assertEqual(line, "\xff\n")
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02002941 self.assertRaises(OSError, f.tell)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002942 self.assertEqual(f.tell(), p2)
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +00002943 f.close()
2944
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002945 def test_seeking(self):
2946 chunk_size = _default_chunk_size()
Guido van Rossumd76e7792007-04-17 02:38:04 +00002947 prefix_size = chunk_size - 2
Guido van Rossumef87d6e2007-05-02 19:09:54 +00002948 u_prefix = "a" * prefix_size
Guido van Rossumd76e7792007-04-17 02:38:04 +00002949 prefix = bytes(u_prefix.encode("utf-8"))
Ezio Melottib3aedd42010-11-20 19:04:17 +00002950 self.assertEqual(len(u_prefix), len(prefix))
Guido van Rossumef87d6e2007-05-02 19:09:54 +00002951 u_suffix = "\u8888\n"
Guido van Rossumd76e7792007-04-17 02:38:04 +00002952 suffix = bytes(u_suffix.encode("utf-8"))
2953 line = prefix + suffix
Benjamin Petersonebe5d8a2010-10-31 01:30:11 +00002954 with self.open(support.TESTFN, "wb") as f:
2955 f.write(line*2)
2956 with self.open(support.TESTFN, "r", encoding="utf-8") as f:
2957 s = f.read(prefix_size)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002958 self.assertEqual(s, str(prefix, "ascii"))
2959 self.assertEqual(f.tell(), prefix_size)
2960 self.assertEqual(f.readline(), u_suffix)
Guido van Rossumd76e7792007-04-17 02:38:04 +00002961
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002962 def test_seeking_too(self):
Guido van Rossumd76e7792007-04-17 02:38:04 +00002963 # Regression test for a specific bug
2964 data = b'\xe0\xbf\xbf\n'
Benjamin Petersonebe5d8a2010-10-31 01:30:11 +00002965 with self.open(support.TESTFN, "wb") as f:
2966 f.write(data)
2967 with self.open(support.TESTFN, "r", encoding="utf-8") as f:
2968 f._CHUNK_SIZE # Just test that it exists
2969 f._CHUNK_SIZE = 2
2970 f.readline()
2971 f.tell()
Guido van Rossumd76e7792007-04-17 02:38:04 +00002972
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002973 def test_seek_and_tell(self):
2974 #Test seek/tell using the StatefulIncrementalDecoder.
2975 # Make test faster by doing smaller seeks
2976 CHUNK_SIZE = 128
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002977
Benjamin Peterson5fd871d2009-03-05 00:49:53 +00002978 def test_seek_and_tell_with_data(data, min_pos=0):
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002979 """Tell/seek to various points within a data stream and ensure
2980 that the decoded data returned by read() is consistent."""
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002981 f = self.open(support.TESTFN, 'wb')
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002982 f.write(data)
2983 f.close()
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002984 f = self.open(support.TESTFN, encoding='test_decoder')
2985 f._CHUNK_SIZE = CHUNK_SIZE
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002986 decoded = f.read()
2987 f.close()
2988
Neal Norwitze2b07052008-03-18 19:52:05 +00002989 for i in range(min_pos, len(decoded) + 1): # seek positions
2990 for j in [1, 5, len(decoded) - i]: # read lengths
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002991 f = self.open(support.TESTFN, encoding='test_decoder')
Ezio Melottib3aedd42010-11-20 19:04:17 +00002992 self.assertEqual(f.read(i), decoded[:i])
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002993 cookie = f.tell()
Ezio Melottib3aedd42010-11-20 19:04:17 +00002994 self.assertEqual(f.read(j), decoded[i:i + j])
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002995 f.seek(cookie)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002996 self.assertEqual(f.read(), decoded[i:])
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002997 f.close()
2998
Benjamin Petersonad9d48d2008-04-02 21:49:44 +00002999 # Enable the test decoder.
3000 StatefulIncrementalDecoder.codecEnabled = 1
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00003001
3002 # Run the tests.
3003 try:
3004 # Try each test case.
3005 for input, _, _ in StatefulIncrementalDecoderTest.test_cases:
Benjamin Peterson5fd871d2009-03-05 00:49:53 +00003006 test_seek_and_tell_with_data(input)
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00003007
3008 # Position each test case so that it crosses a chunk boundary.
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00003009 for input, _, _ in StatefulIncrementalDecoderTest.test_cases:
3010 offset = CHUNK_SIZE - len(input)//2
3011 prefix = b'.'*offset
3012 # Don't bother seeking into the prefix (takes too long).
3013 min_pos = offset*2
Benjamin Peterson5fd871d2009-03-05 00:49:53 +00003014 test_seek_and_tell_with_data(prefix + input, min_pos)
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00003015
3016 # Ensure our test decoder won't interfere with subsequent tests.
3017 finally:
Benjamin Petersonad9d48d2008-04-02 21:49:44 +00003018 StatefulIncrementalDecoder.codecEnabled = 0
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00003019
Christopher Thorneac22f6a2018-11-01 10:48:49 +00003020 def test_multibyte_seek_and_tell(self):
3021 f = self.open(support.TESTFN, "w", encoding="euc_jp")
3022 f.write("AB\n\u3046\u3048\n")
3023 f.close()
3024
3025 f = self.open(support.TESTFN, "r", encoding="euc_jp")
3026 self.assertEqual(f.readline(), "AB\n")
3027 p0 = f.tell()
3028 self.assertEqual(f.readline(), "\u3046\u3048\n")
3029 p1 = f.tell()
3030 f.seek(p0)
3031 self.assertEqual(f.readline(), "\u3046\u3048\n")
3032 self.assertEqual(f.tell(), p1)
3033 f.close()
3034
3035 def test_seek_with_encoder_state(self):
3036 f = self.open(support.TESTFN, "w", encoding="euc_jis_2004")
3037 f.write("\u00e6\u0300")
3038 p0 = f.tell()
3039 f.write("\u00e6")
3040 f.seek(p0)
3041 f.write("\u0300")
3042 f.close()
3043
3044 f = self.open(support.TESTFN, "r", encoding="euc_jis_2004")
3045 self.assertEqual(f.readline(), "\u00e6\u0300\u0300")
3046 f.close()
3047
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003048 def test_encoded_writes(self):
Alexandre Vassalottia38f73b2008-01-07 18:30:48 +00003049 data = "1234567890"
3050 tests = ("utf-16",
3051 "utf-16-le",
3052 "utf-16-be",
3053 "utf-32",
3054 "utf-32-le",
3055 "utf-32-be")
3056 for encoding in tests:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003057 buf = self.BytesIO()
3058 f = self.TextIOWrapper(buf, encoding=encoding)
Alexandre Vassalottia38f73b2008-01-07 18:30:48 +00003059 # Check if the BOM is written only once (see issue1753).
3060 f.write(data)
3061 f.write(data)
3062 f.seek(0)
Ezio Melottib3aedd42010-11-20 19:04:17 +00003063 self.assertEqual(f.read(), data * 2)
Benjamin Peterson9363a652009-03-05 00:42:09 +00003064 f.seek(0)
Ezio Melottib3aedd42010-11-20 19:04:17 +00003065 self.assertEqual(f.read(), data * 2)
3066 self.assertEqual(buf.getvalue(), (data * 2).encode(encoding))
Alexandre Vassalottia38f73b2008-01-07 18:30:48 +00003067
Benjamin Petersona1b49012009-03-31 23:11:32 +00003068 def test_unreadable(self):
3069 class UnReadable(self.BytesIO):
3070 def readable(self):
3071 return False
3072 txt = self.TextIOWrapper(UnReadable())
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02003073 self.assertRaises(OSError, txt.read)
Benjamin Petersona1b49012009-03-31 23:11:32 +00003074
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003075 def test_read_one_by_one(self):
3076 txt = self.TextIOWrapper(self.BytesIO(b"AA\r\nBB"))
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00003077 reads = ""
3078 while True:
3079 c = txt.read(1)
3080 if not c:
3081 break
3082 reads += c
Ezio Melottib3aedd42010-11-20 19:04:17 +00003083 self.assertEqual(reads, "AA\nBB")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00003084
Benjamin Petersonbf5ff762009-12-13 19:25:34 +00003085 def test_readlines(self):
3086 txt = self.TextIOWrapper(self.BytesIO(b"AA\nBB\nCC"))
3087 self.assertEqual(txt.readlines(), ["AA\n", "BB\n", "CC"])
3088 txt.seek(0)
3089 self.assertEqual(txt.readlines(None), ["AA\n", "BB\n", "CC"])
3090 txt.seek(0)
3091 self.assertEqual(txt.readlines(5), ["AA\n", "BB\n"])
3092
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00003093 # read in amounts equal to TextIOWrapper._CHUNK_SIZE which is 128.
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003094 def test_read_by_chunk(self):
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00003095 # make sure "\r\n" straddles 128 char boundary.
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003096 txt = self.TextIOWrapper(self.BytesIO(b"A" * 127 + b"\r\nB"))
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00003097 reads = ""
3098 while True:
3099 c = txt.read(128)
3100 if not c:
3101 break
3102 reads += c
Ezio Melottib3aedd42010-11-20 19:04:17 +00003103 self.assertEqual(reads, "A"*127+"\nB")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00003104
Antoine Pitrou3ed2cb52012-10-16 23:02:27 +02003105 def test_writelines(self):
3106 l = ['ab', 'cd', 'ef']
3107 buf = self.BytesIO()
3108 txt = self.TextIOWrapper(buf)
3109 txt.writelines(l)
3110 txt.flush()
3111 self.assertEqual(buf.getvalue(), b'abcdef')
3112
3113 def test_writelines_userlist(self):
3114 l = UserList(['ab', 'cd', 'ef'])
3115 buf = self.BytesIO()
3116 txt = self.TextIOWrapper(buf)
3117 txt.writelines(l)
3118 txt.flush()
3119 self.assertEqual(buf.getvalue(), b'abcdef')
3120
3121 def test_writelines_error(self):
3122 txt = self.TextIOWrapper(self.BytesIO())
3123 self.assertRaises(TypeError, txt.writelines, [1, 2, 3])
3124 self.assertRaises(TypeError, txt.writelines, None)
3125 self.assertRaises(TypeError, txt.writelines, b'abc')
3126
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00003127 def test_issue1395_1(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003128 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00003129
3130 # read one char at a time
3131 reads = ""
3132 while True:
3133 c = txt.read(1)
3134 if not c:
3135 break
3136 reads += c
Ezio Melottib3aedd42010-11-20 19:04:17 +00003137 self.assertEqual(reads, self.normalized)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00003138
3139 def test_issue1395_2(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003140 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00003141 txt._CHUNK_SIZE = 4
3142
3143 reads = ""
3144 while True:
3145 c = txt.read(4)
3146 if not c:
3147 break
3148 reads += c
Ezio Melottib3aedd42010-11-20 19:04:17 +00003149 self.assertEqual(reads, self.normalized)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00003150
3151 def test_issue1395_3(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003152 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00003153 txt._CHUNK_SIZE = 4
3154
3155 reads = txt.read(4)
3156 reads += txt.read(4)
3157 reads += txt.readline()
3158 reads += txt.readline()
3159 reads += txt.readline()
Ezio Melottib3aedd42010-11-20 19:04:17 +00003160 self.assertEqual(reads, self.normalized)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00003161
3162 def test_issue1395_4(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003163 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00003164 txt._CHUNK_SIZE = 4
3165
3166 reads = txt.read(4)
3167 reads += txt.read()
Ezio Melottib3aedd42010-11-20 19:04:17 +00003168 self.assertEqual(reads, self.normalized)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00003169
3170 def test_issue1395_5(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003171 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00003172 txt._CHUNK_SIZE = 4
3173
3174 reads = txt.read(4)
3175 pos = txt.tell()
3176 txt.seek(0)
3177 txt.seek(pos)
Ezio Melottib3aedd42010-11-20 19:04:17 +00003178 self.assertEqual(txt.read(4), "BBB\n")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00003179
Ka-Ping Yeeddaa7062008-03-17 20:35:15 +00003180 def test_issue2282(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003181 buffer = self.BytesIO(self.testdata)
3182 txt = self.TextIOWrapper(buffer, encoding="ascii")
Ka-Ping Yeeddaa7062008-03-17 20:35:15 +00003183
3184 self.assertEqual(buffer.seekable(), txt.seekable())
3185
Antoine Pitroue4501852009-05-14 18:55:55 +00003186 def test_append_bom(self):
3187 # The BOM is not written again when appending to a non-empty file
3188 filename = support.TESTFN
3189 for charset in ('utf-8-sig', 'utf-16', 'utf-32'):
3190 with self.open(filename, 'w', encoding=charset) as f:
3191 f.write('aaa')
3192 pos = f.tell()
3193 with self.open(filename, 'rb') as f:
Ezio Melottib3aedd42010-11-20 19:04:17 +00003194 self.assertEqual(f.read(), 'aaa'.encode(charset))
Antoine Pitroue4501852009-05-14 18:55:55 +00003195
3196 with self.open(filename, 'a', encoding=charset) as f:
3197 f.write('xxx')
3198 with self.open(filename, 'rb') as f:
Ezio Melottib3aedd42010-11-20 19:04:17 +00003199 self.assertEqual(f.read(), 'aaaxxx'.encode(charset))
Antoine Pitroue4501852009-05-14 18:55:55 +00003200
3201 def test_seek_bom(self):
3202 # Same test, but when seeking manually
3203 filename = support.TESTFN
3204 for charset in ('utf-8-sig', 'utf-16', 'utf-32'):
3205 with self.open(filename, 'w', encoding=charset) as f:
3206 f.write('aaa')
3207 pos = f.tell()
3208 with self.open(filename, 'r+', encoding=charset) as f:
3209 f.seek(pos)
3210 f.write('zzz')
3211 f.seek(0)
3212 f.write('bbb')
3213 with self.open(filename, 'rb') as f:
Ezio Melottib3aedd42010-11-20 19:04:17 +00003214 self.assertEqual(f.read(), 'bbbzzz'.encode(charset))
Antoine Pitroue4501852009-05-14 18:55:55 +00003215
Antoine Pitrou85e3ee72015-04-13 20:01:21 +02003216 def test_seek_append_bom(self):
3217 # Same test, but first seek to the start and then to the end
3218 filename = support.TESTFN
3219 for charset in ('utf-8-sig', 'utf-16', 'utf-32'):
3220 with self.open(filename, 'w', encoding=charset) as f:
3221 f.write('aaa')
3222 with self.open(filename, 'a', encoding=charset) as f:
3223 f.seek(0)
3224 f.seek(0, self.SEEK_END)
3225 f.write('xxx')
3226 with self.open(filename, 'rb') as f:
3227 self.assertEqual(f.read(), 'aaaxxx'.encode(charset))
3228
Benjamin Peterson0926ad12009-06-06 18:02:12 +00003229 def test_errors_property(self):
3230 with self.open(support.TESTFN, "w") as f:
3231 self.assertEqual(f.errors, "strict")
3232 with self.open(support.TESTFN, "w", errors="replace") as f:
3233 self.assertEqual(f.errors, "replace")
3234
Brett Cannon31f59292011-02-21 19:29:56 +00003235 @support.no_tracing
Amaury Forgeot d'Arcccd686a2009-08-29 23:00:38 +00003236 def test_threads_write(self):
3237 # Issue6750: concurrent writes could duplicate data
3238 event = threading.Event()
3239 with self.open(support.TESTFN, "w", buffering=1) as f:
3240 def run(n):
3241 text = "Thread%03d\n" % n
3242 event.wait()
3243 f.write(text)
Serhiy Storchaka263dcd22015-04-01 13:01:14 +03003244 threads = [threading.Thread(target=run, args=(x,))
Amaury Forgeot d'Arcccd686a2009-08-29 23:00:38 +00003245 for x in range(20)]
Serhiy Storchaka263dcd22015-04-01 13:01:14 +03003246 with support.start_threads(threads, event.set):
3247 time.sleep(0.02)
Amaury Forgeot d'Arcccd686a2009-08-29 23:00:38 +00003248 with self.open(support.TESTFN) as f:
3249 content = f.read()
3250 for n in range(20):
Ezio Melottib3aedd42010-11-20 19:04:17 +00003251 self.assertEqual(content.count("Thread%03d\n" % n), 1)
Amaury Forgeot d'Arcccd686a2009-08-29 23:00:38 +00003252
Antoine Pitrou6be88762010-05-03 16:48:20 +00003253 def test_flush_error_on_close(self):
Serhiy Storchakaa3712a92015-02-21 00:35:09 +02003254 # Test that text file is closed despite failed flush
3255 # and that flush() is called before file closed.
Antoine Pitrou6be88762010-05-03 16:48:20 +00003256 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Serhiy Storchakaa3712a92015-02-21 00:35:09 +02003257 closed = []
Antoine Pitrou6be88762010-05-03 16:48:20 +00003258 def bad_flush():
Serhiy Storchakaa3712a92015-02-21 00:35:09 +02003259 closed[:] = [txt.closed, txt.buffer.closed]
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02003260 raise OSError()
Antoine Pitrou6be88762010-05-03 16:48:20 +00003261 txt.flush = bad_flush
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02003262 self.assertRaises(OSError, txt.close) # exception not swallowed
Benjamin Peterson68623612012-12-20 11:53:11 -06003263 self.assertTrue(txt.closed)
Serhiy Storchakaa3712a92015-02-21 00:35:09 +02003264 self.assertTrue(txt.buffer.closed)
3265 self.assertTrue(closed) # flush() called
3266 self.assertFalse(closed[0]) # flush() called before file closed
3267 self.assertFalse(closed[1])
Serhiy Storchakac26a1a42015-02-23 00:28:38 +02003268 txt.flush = lambda: None # break reference loop
Antoine Pitrou6be88762010-05-03 16:48:20 +00003269
Serhiy Storchaka8a8f7f92014-06-09 09:13:04 +03003270 def test_close_error_on_close(self):
3271 buffer = self.BytesIO(self.testdata)
3272 def bad_flush():
3273 raise OSError('flush')
3274 def bad_close():
3275 raise OSError('close')
3276 buffer.close = bad_close
3277 txt = self.TextIOWrapper(buffer, encoding="ascii")
3278 txt.flush = bad_flush
3279 with self.assertRaises(OSError) as err: # exception not swallowed
3280 txt.close()
3281 self.assertEqual(err.exception.args, ('close',))
3282 self.assertIsInstance(err.exception.__context__, OSError)
3283 self.assertEqual(err.exception.__context__.args, ('flush',))
3284 self.assertFalse(txt.closed)
3285
Victor Stinner472f7942019-04-12 21:58:24 +02003286 # Silence destructor error
3287 buffer.close = lambda: None
3288 txt.flush = lambda: None
3289
Serhiy Storchaka8a8f7f92014-06-09 09:13:04 +03003290 def test_nonnormalized_close_error_on_close(self):
3291 # Issue #21677
3292 buffer = self.BytesIO(self.testdata)
3293 def bad_flush():
3294 raise non_existing_flush
3295 def bad_close():
3296 raise non_existing_close
3297 buffer.close = bad_close
3298 txt = self.TextIOWrapper(buffer, encoding="ascii")
3299 txt.flush = bad_flush
3300 with self.assertRaises(NameError) as err: # exception not swallowed
3301 txt.close()
3302 self.assertIn('non_existing_close', str(err.exception))
3303 self.assertIsInstance(err.exception.__context__, NameError)
3304 self.assertIn('non_existing_flush', str(err.exception.__context__))
3305 self.assertFalse(txt.closed)
3306
Victor Stinner472f7942019-04-12 21:58:24 +02003307 # Silence destructor error
3308 buffer.close = lambda: None
3309 txt.flush = lambda: None
3310
Antoine Pitrou6be88762010-05-03 16:48:20 +00003311 def test_multi_close(self):
3312 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
3313 txt.close()
3314 txt.close()
3315 txt.close()
3316 self.assertRaises(ValueError, txt.flush)
3317
Antoine Pitrou0d739d72010-09-05 23:01:12 +00003318 def test_unseekable(self):
3319 txt = self.TextIOWrapper(self.MockUnseekableIO(self.testdata))
3320 self.assertRaises(self.UnsupportedOperation, txt.tell)
3321 self.assertRaises(self.UnsupportedOperation, txt.seek, 0)
3322
Antoine Pitrou7f8f4182010-12-21 21:20:59 +00003323 def test_readonly_attributes(self):
3324 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
3325 buf = self.BytesIO(self.testdata)
3326 with self.assertRaises(AttributeError):
3327 txt.buffer = buf
3328
Antoine Pitroue96ec682011-07-23 21:46:35 +02003329 def test_rawio(self):
3330 # Issue #12591: TextIOWrapper must work with raw I/O objects, so
3331 # that subprocess.Popen() can have the required unbuffered
3332 # semantics with universal_newlines=True.
3333 raw = self.MockRawIO([b'abc', b'def', b'ghi\njkl\nopq\n'])
3334 txt = self.TextIOWrapper(raw, encoding='ascii', newline='\n')
3335 # Reads
3336 self.assertEqual(txt.read(4), 'abcd')
3337 self.assertEqual(txt.readline(), 'efghi\n')
3338 self.assertEqual(list(txt), ['jkl\n', 'opq\n'])
3339
3340 def test_rawio_write_through(self):
3341 # Issue #12591: with write_through=True, writes don't need a flush
3342 raw = self.MockRawIO([b'abc', b'def', b'ghi\njkl\nopq\n'])
3343 txt = self.TextIOWrapper(raw, encoding='ascii', newline='\n',
3344 write_through=True)
3345 txt.write('1')
3346 txt.write('23\n4')
3347 txt.write('5')
3348 self.assertEqual(b''.join(raw._write_stack), b'123\n45')
3349
Antoine Pitrouc644e7c2014-05-09 00:24:50 +02003350 def test_bufio_write_through(self):
3351 # Issue #21396: write_through=True doesn't force a flush()
3352 # on the underlying binary buffered object.
3353 flush_called, write_called = [], []
3354 class BufferedWriter(self.BufferedWriter):
3355 def flush(self, *args, **kwargs):
3356 flush_called.append(True)
3357 return super().flush(*args, **kwargs)
3358 def write(self, *args, **kwargs):
3359 write_called.append(True)
3360 return super().write(*args, **kwargs)
3361
3362 rawio = self.BytesIO()
3363 data = b"a"
3364 bufio = BufferedWriter(rawio, len(data)*2)
3365 textio = self.TextIOWrapper(bufio, encoding='ascii',
3366 write_through=True)
3367 # write to the buffered io but don't overflow the buffer
3368 text = data.decode('ascii')
3369 textio.write(text)
3370
3371 # buffer.flush is not called with write_through=True
3372 self.assertFalse(flush_called)
3373 # buffer.write *is* called with write_through=True
3374 self.assertTrue(write_called)
3375 self.assertEqual(rawio.getvalue(), b"") # no flush
3376
3377 write_called = [] # reset
3378 textio.write(text * 10) # total content is larger than bufio buffer
3379 self.assertTrue(write_called)
3380 self.assertEqual(rawio.getvalue(), data * 11) # all flushed
3381
Antoine Pitrou3c2817b2017-06-03 12:32:28 +02003382 def test_reconfigure_write_through(self):
3383 raw = self.MockRawIO([])
3384 t = self.TextIOWrapper(raw, encoding='ascii', newline='\n')
3385 t.write('1')
3386 t.reconfigure(write_through=True) # implied flush
3387 self.assertEqual(t.write_through, True)
3388 self.assertEqual(b''.join(raw._write_stack), b'1')
3389 t.write('23')
3390 self.assertEqual(b''.join(raw._write_stack), b'123')
3391 t.reconfigure(write_through=False)
3392 self.assertEqual(t.write_through, False)
3393 t.write('45')
3394 t.flush()
3395 self.assertEqual(b''.join(raw._write_stack), b'12345')
3396 # Keeping default value
3397 t.reconfigure()
3398 t.reconfigure(write_through=None)
3399 self.assertEqual(t.write_through, False)
3400 t.reconfigure(write_through=True)
3401 t.reconfigure()
3402 t.reconfigure(write_through=None)
3403 self.assertEqual(t.write_through, True)
3404
Serhiy Storchaka94dc6732013-02-03 17:03:31 +02003405 def test_read_nonbytes(self):
3406 # Issue #17106
3407 # Crash when underlying read() returns non-bytes
3408 t = self.TextIOWrapper(self.StringIO('a'))
3409 self.assertRaises(TypeError, t.read, 1)
3410 t = self.TextIOWrapper(self.StringIO('a'))
3411 self.assertRaises(TypeError, t.readline)
3412 t = self.TextIOWrapper(self.StringIO('a'))
3413 self.assertRaises(TypeError, t.read)
3414
Oren Milmana5b4ea12017-08-25 21:14:54 +03003415 def test_illegal_encoder(self):
3416 # Issue 31271: Calling write() while the return value of encoder's
3417 # encode() is invalid shouldn't cause an assertion failure.
3418 rot13 = codecs.lookup("rot13")
3419 with support.swap_attr(rot13, '_is_text_encoding', True):
3420 t = io.TextIOWrapper(io.BytesIO(b'foo'), encoding="rot13")
3421 self.assertRaises(TypeError, t.write, 'bar')
3422
Serhiy Storchaka94dc6732013-02-03 17:03:31 +02003423 def test_illegal_decoder(self):
3424 # Issue #17106
Nick Coghlana9b15242014-02-04 22:11:18 +10003425 # Bypass the early encoding check added in issue 20404
3426 def _make_illegal_wrapper():
3427 quopri = codecs.lookup("quopri")
3428 quopri._is_text_encoding = True
3429 try:
3430 t = self.TextIOWrapper(self.BytesIO(b'aaaaaa'),
3431 newline='\n', encoding="quopri")
3432 finally:
3433 quopri._is_text_encoding = False
3434 return t
Serhiy Storchaka94dc6732013-02-03 17:03:31 +02003435 # Crash when decoder returns non-string
Nick Coghlana9b15242014-02-04 22:11:18 +10003436 t = _make_illegal_wrapper()
Serhiy Storchaka94dc6732013-02-03 17:03:31 +02003437 self.assertRaises(TypeError, t.read, 1)
Nick Coghlana9b15242014-02-04 22:11:18 +10003438 t = _make_illegal_wrapper()
Serhiy Storchaka94dc6732013-02-03 17:03:31 +02003439 self.assertRaises(TypeError, t.readline)
Nick Coghlana9b15242014-02-04 22:11:18 +10003440 t = _make_illegal_wrapper()
Serhiy Storchaka94dc6732013-02-03 17:03:31 +02003441 self.assertRaises(TypeError, t.read)
3442
Oren Milmanba7d7362017-08-29 11:58:27 +03003443 # Issue 31243: calling read() while the return value of decoder's
3444 # getstate() is invalid should neither crash the interpreter nor
3445 # raise a SystemError.
3446 def _make_very_illegal_wrapper(getstate_ret_val):
3447 class BadDecoder:
3448 def getstate(self):
3449 return getstate_ret_val
3450 def _get_bad_decoder(dummy):
3451 return BadDecoder()
3452 quopri = codecs.lookup("quopri")
3453 with support.swap_attr(quopri, 'incrementaldecoder',
3454 _get_bad_decoder):
3455 return _make_illegal_wrapper()
3456 t = _make_very_illegal_wrapper(42)
3457 self.assertRaises(TypeError, t.read, 42)
3458 t = _make_very_illegal_wrapper(())
3459 self.assertRaises(TypeError, t.read, 42)
3460 t = _make_very_illegal_wrapper((1, 2))
3461 self.assertRaises(TypeError, t.read, 42)
3462
Antoine Pitrou712cb732013-12-21 15:51:54 +01003463 def _check_create_at_shutdown(self, **kwargs):
3464 # Issue #20037: creating a TextIOWrapper at shutdown
3465 # shouldn't crash the interpreter.
3466 iomod = self.io.__name__
3467 code = """if 1:
3468 import codecs
3469 import {iomod} as io
3470
3471 # Avoid looking up codecs at shutdown
3472 codecs.lookup('utf-8')
3473
3474 class C:
3475 def __init__(self):
3476 self.buf = io.BytesIO()
3477 def __del__(self):
3478 io.TextIOWrapper(self.buf, **{kwargs})
3479 print("ok")
3480 c = C()
3481 """.format(iomod=iomod, kwargs=kwargs)
3482 return assert_python_ok("-c", code)
3483
Serhiy Storchakaa7930372016-07-03 22:27:26 +03003484 @support.requires_type_collecting
Antoine Pitrou712cb732013-12-21 15:51:54 +01003485 def test_create_at_shutdown_without_encoding(self):
3486 rc, out, err = self._check_create_at_shutdown()
3487 if err:
3488 # Can error out with a RuntimeError if the module state
3489 # isn't found.
Nick Coghlana9b15242014-02-04 22:11:18 +10003490 self.assertIn(self.shutdown_error, err.decode())
Antoine Pitrou712cb732013-12-21 15:51:54 +01003491 else:
3492 self.assertEqual("ok", out.decode().strip())
3493
Serhiy Storchakaa7930372016-07-03 22:27:26 +03003494 @support.requires_type_collecting
Antoine Pitrou712cb732013-12-21 15:51:54 +01003495 def test_create_at_shutdown_with_encoding(self):
3496 rc, out, err = self._check_create_at_shutdown(encoding='utf-8',
3497 errors='strict')
3498 self.assertFalse(err)
3499 self.assertEqual("ok", out.decode().strip())
3500
Antoine Pitroub8503892014-04-29 10:14:02 +02003501 def test_read_byteslike(self):
3502 r = MemviewBytesIO(b'Just some random string\n')
3503 t = self.TextIOWrapper(r, 'utf-8')
3504
3505 # TextIOwrapper will not read the full string, because
3506 # we truncate it to a multiple of the native int size
3507 # so that we can construct a more complex memoryview.
3508 bytes_val = _to_memoryview(r.getvalue()).tobytes()
3509
3510 self.assertEqual(t.read(200), bytes_val.decode('utf-8'))
3511
Benjamin Peterson6c14f232014-11-12 10:19:46 -05003512 def test_issue22849(self):
3513 class F(object):
3514 def readable(self): return True
3515 def writable(self): return True
3516 def seekable(self): return True
3517
3518 for i in range(10):
3519 try:
3520 self.TextIOWrapper(F(), encoding='utf-8')
3521 except Exception:
3522 pass
3523
3524 F.tell = lambda x: 0
3525 t = self.TextIOWrapper(F(), encoding='utf-8')
3526
INADA Naoki507434f2017-12-21 09:59:53 +09003527 def test_reconfigure_encoding_read(self):
3528 # latin1 -> utf8
3529 # (latin1 can decode utf-8 encoded string)
3530 data = 'abc\xe9\n'.encode('latin1') + 'd\xe9f\n'.encode('utf8')
3531 raw = self.BytesIO(data)
3532 txt = self.TextIOWrapper(raw, encoding='latin1', newline='\n')
3533 self.assertEqual(txt.readline(), 'abc\xe9\n')
3534 with self.assertRaises(self.UnsupportedOperation):
3535 txt.reconfigure(encoding='utf-8')
3536 with self.assertRaises(self.UnsupportedOperation):
3537 txt.reconfigure(newline=None)
3538
3539 def test_reconfigure_write_fromascii(self):
3540 # ascii has a specific encodefunc in the C implementation,
3541 # but utf-8-sig has not. Make sure that we get rid of the
3542 # cached encodefunc when we switch encoders.
3543 raw = self.BytesIO()
3544 txt = self.TextIOWrapper(raw, encoding='ascii', newline='\n')
3545 txt.write('foo\n')
3546 txt.reconfigure(encoding='utf-8-sig')
3547 txt.write('\xe9\n')
3548 txt.flush()
3549 self.assertEqual(raw.getvalue(), b'foo\n\xc3\xa9\n')
3550
3551 def test_reconfigure_write(self):
3552 # latin -> utf8
3553 raw = self.BytesIO()
3554 txt = self.TextIOWrapper(raw, encoding='latin1', newline='\n')
3555 txt.write('abc\xe9\n')
3556 txt.reconfigure(encoding='utf-8')
3557 self.assertEqual(raw.getvalue(), b'abc\xe9\n')
3558 txt.write('d\xe9f\n')
3559 txt.flush()
3560 self.assertEqual(raw.getvalue(), b'abc\xe9\nd\xc3\xa9f\n')
3561
3562 # ascii -> utf-8-sig: ensure that no BOM is written in the middle of
3563 # the file
3564 raw = self.BytesIO()
3565 txt = self.TextIOWrapper(raw, encoding='ascii', newline='\n')
3566 txt.write('abc\n')
3567 txt.reconfigure(encoding='utf-8-sig')
3568 txt.write('d\xe9f\n')
3569 txt.flush()
3570 self.assertEqual(raw.getvalue(), b'abc\nd\xc3\xa9f\n')
3571
3572 def test_reconfigure_write_non_seekable(self):
3573 raw = self.BytesIO()
3574 raw.seekable = lambda: False
3575 raw.seek = None
3576 txt = self.TextIOWrapper(raw, encoding='ascii', newline='\n')
3577 txt.write('abc\n')
3578 txt.reconfigure(encoding='utf-8-sig')
3579 txt.write('d\xe9f\n')
3580 txt.flush()
3581
3582 # If the raw stream is not seekable, there'll be a BOM
3583 self.assertEqual(raw.getvalue(), b'abc\n\xef\xbb\xbfd\xc3\xa9f\n')
3584
3585 def test_reconfigure_defaults(self):
3586 txt = self.TextIOWrapper(self.BytesIO(), 'ascii', 'replace', '\n')
3587 txt.reconfigure(encoding=None)
3588 self.assertEqual(txt.encoding, 'ascii')
3589 self.assertEqual(txt.errors, 'replace')
3590 txt.write('LF\n')
3591
3592 txt.reconfigure(newline='\r\n')
3593 self.assertEqual(txt.encoding, 'ascii')
3594 self.assertEqual(txt.errors, 'replace')
3595
3596 txt.reconfigure(errors='ignore')
3597 self.assertEqual(txt.encoding, 'ascii')
3598 self.assertEqual(txt.errors, 'ignore')
3599 txt.write('CRLF\n')
3600
3601 txt.reconfigure(encoding='utf-8', newline=None)
3602 self.assertEqual(txt.errors, 'strict')
3603 txt.seek(0)
3604 self.assertEqual(txt.read(), 'LF\nCRLF\n')
3605
3606 self.assertEqual(txt.detach().getvalue(), b'LF\nCRLF\r\n')
3607
3608 def test_reconfigure_newline(self):
3609 raw = self.BytesIO(b'CR\rEOF')
3610 txt = self.TextIOWrapper(raw, 'ascii', newline='\n')
3611 txt.reconfigure(newline=None)
3612 self.assertEqual(txt.readline(), 'CR\n')
3613 raw = self.BytesIO(b'CR\rEOF')
3614 txt = self.TextIOWrapper(raw, 'ascii', newline='\n')
3615 txt.reconfigure(newline='')
3616 self.assertEqual(txt.readline(), 'CR\r')
3617 raw = self.BytesIO(b'CR\rLF\nEOF')
3618 txt = self.TextIOWrapper(raw, 'ascii', newline='\r')
3619 txt.reconfigure(newline='\n')
3620 self.assertEqual(txt.readline(), 'CR\rLF\n')
3621 raw = self.BytesIO(b'LF\nCR\rEOF')
3622 txt = self.TextIOWrapper(raw, 'ascii', newline='\n')
3623 txt.reconfigure(newline='\r')
3624 self.assertEqual(txt.readline(), 'LF\nCR\r')
3625 raw = self.BytesIO(b'CR\rCRLF\r\nEOF')
3626 txt = self.TextIOWrapper(raw, 'ascii', newline='\r')
3627 txt.reconfigure(newline='\r\n')
3628 self.assertEqual(txt.readline(), 'CR\rCRLF\r\n')
3629
3630 txt = self.TextIOWrapper(self.BytesIO(), 'ascii', newline='\r')
3631 txt.reconfigure(newline=None)
3632 txt.write('linesep\n')
3633 txt.reconfigure(newline='')
3634 txt.write('LF\n')
3635 txt.reconfigure(newline='\n')
3636 txt.write('LF\n')
3637 txt.reconfigure(newline='\r')
3638 txt.write('CR\n')
3639 txt.reconfigure(newline='\r\n')
3640 txt.write('CRLF\n')
3641 expected = 'linesep' + os.linesep + 'LF\nLF\nCR\rCRLF\r\n'
3642 self.assertEqual(txt.detach().getvalue().decode('ascii'), expected)
3643
Zackery Spytz23db9352018-06-29 04:14:58 -06003644 def test_issue25862(self):
3645 # Assertion failures occurred in tell() after read() and write().
3646 t = self.TextIOWrapper(self.BytesIO(b'test'), encoding='ascii')
3647 t.read(1)
3648 t.read()
3649 t.tell()
3650 t = self.TextIOWrapper(self.BytesIO(b'test'), encoding='ascii')
3651 t.read(1)
3652 t.write('x')
3653 t.tell()
3654
Serhiy Storchaka94dc6732013-02-03 17:03:31 +02003655
Antoine Pitroub8503892014-04-29 10:14:02 +02003656class MemviewBytesIO(io.BytesIO):
3657 '''A BytesIO object whose read method returns memoryviews
3658 rather than bytes'''
3659
3660 def read1(self, len_):
3661 return _to_memoryview(super().read1(len_))
3662
3663 def read(self, len_):
3664 return _to_memoryview(super().read(len_))
3665
3666def _to_memoryview(buf):
3667 '''Convert bytes-object *buf* to a non-trivial memoryview'''
3668
3669 arr = array.array('i')
3670 idx = len(buf) - len(buf) % arr.itemsize
3671 arr.frombytes(buf[:idx])
3672 return memoryview(arr)
Benjamin Petersonc9c0f202009-06-30 23:06:06 +00003673
Benjamin Peterson6fd113c2014-11-12 10:23:44 -05003674
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003675class CTextIOWrapperTest(TextIOWrapperTest):
Antoine Pitrou712cb732013-12-21 15:51:54 +01003676 io = io
Steve Dowerfe8f4c92016-09-07 09:31:52 -07003677 shutdown_error = "RuntimeError: could not find io module state"
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003678
3679 def test_initialization(self):
3680 r = self.BytesIO(b"\xc3\xa9\n\n")
3681 b = self.BufferedReader(r, 1000)
3682 t = self.TextIOWrapper(b)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003683 self.assertRaises(ValueError, t.__init__, b, newline='xyzzy')
3684 self.assertRaises(ValueError, t.read)
3685
Benjamin Peterson10e76b62014-12-21 20:51:50 -06003686 t = self.TextIOWrapper.__new__(self.TextIOWrapper)
3687 self.assertRaises(Exception, repr, t)
3688
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003689 def test_garbage_collection(self):
3690 # C TextIOWrapper objects are collected, and collecting them flushes
3691 # all data to disk.
3692 # The Python version has __del__, so it ends in gc.garbage instead.
Antoine Pitrou796564c2013-07-30 19:59:21 +02003693 with support.check_warnings(('', ResourceWarning)):
3694 rawio = io.FileIO(support.TESTFN, "wb")
3695 b = self.BufferedWriter(rawio)
3696 t = self.TextIOWrapper(b, encoding="ascii")
3697 t.write("456def")
3698 t.x = t
3699 wr = weakref.ref(t)
3700 del t
3701 support.gc_collect()
Serhiy Storchaka0dcd80a2015-08-02 15:17:49 +03003702 self.assertIsNone(wr(), wr)
Hirokazu Yamamotoc7d6aa42009-06-18 00:07:14 +00003703 with self.open(support.TESTFN, "rb") as f:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003704 self.assertEqual(f.read(), b"456def")
3705
Charles-François Natali42c28cd2011-10-05 19:53:43 +02003706 def test_rwpair_cleared_before_textio(self):
3707 # Issue 13070: TextIOWrapper's finalization would crash when called
3708 # after the reference to the underlying BufferedRWPair's writer got
3709 # cleared by the GC.
3710 for i in range(1000):
3711 b1 = self.BufferedRWPair(self.MockRawIO(), self.MockRawIO())
3712 t1 = self.TextIOWrapper(b1, encoding="ascii")
3713 b2 = self.BufferedRWPair(self.MockRawIO(), self.MockRawIO())
3714 t2 = self.TextIOWrapper(b2, encoding="ascii")
3715 # circular references
3716 t1.buddy = t2
3717 t2.buddy = t1
3718 support.gc_collect()
3719
Zackery Spytz842acaa2018-12-17 07:52:45 -07003720 def test_del__CHUNK_SIZE_SystemError(self):
3721 t = self.TextIOWrapper(self.BytesIO(), encoding='ascii')
3722 with self.assertRaises(AttributeError):
3723 del t._CHUNK_SIZE
3724
Charles-François Natali42c28cd2011-10-05 19:53:43 +02003725
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003726class PyTextIOWrapperTest(TextIOWrapperTest):
Antoine Pitrou712cb732013-12-21 15:51:54 +01003727 io = pyio
Steve Dower6d5cf8a2016-09-09 09:21:01 -07003728 shutdown_error = "LookupError: unknown encoding: ascii"
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003729
3730
3731class IncrementalNewlineDecoderTest(unittest.TestCase):
3732
3733 def check_newline_decoding_utf8(self, decoder):
Antoine Pitrou180a3362008-12-14 16:36:46 +00003734 # UTF-8 specific tests for a newline decoder
3735 def _check_decode(b, s, **kwargs):
3736 # We exercise getstate() / setstate() as well as decode()
3737 state = decoder.getstate()
Ezio Melottib3aedd42010-11-20 19:04:17 +00003738 self.assertEqual(decoder.decode(b, **kwargs), s)
Antoine Pitrou180a3362008-12-14 16:36:46 +00003739 decoder.setstate(state)
Ezio Melottib3aedd42010-11-20 19:04:17 +00003740 self.assertEqual(decoder.decode(b, **kwargs), s)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00003741
Antoine Pitrou180a3362008-12-14 16:36:46 +00003742 _check_decode(b'\xe8\xa2\x88', "\u8888")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00003743
Antoine Pitrou180a3362008-12-14 16:36:46 +00003744 _check_decode(b'\xe8', "")
3745 _check_decode(b'\xa2', "")
3746 _check_decode(b'\x88', "\u8888")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00003747
Antoine Pitrou180a3362008-12-14 16:36:46 +00003748 _check_decode(b'\xe8', "")
3749 _check_decode(b'\xa2', "")
3750 _check_decode(b'\x88', "\u8888")
3751
3752 _check_decode(b'\xe8', "")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00003753 self.assertRaises(UnicodeDecodeError, decoder.decode, b'', final=True)
3754
Antoine Pitrou180a3362008-12-14 16:36:46 +00003755 decoder.reset()
3756 _check_decode(b'\n', "\n")
3757 _check_decode(b'\r', "")
3758 _check_decode(b'', "\n", final=True)
3759 _check_decode(b'\r', "\n", final=True)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00003760
Antoine Pitrou180a3362008-12-14 16:36:46 +00003761 _check_decode(b'\r', "")
3762 _check_decode(b'a', "\na")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00003763
Antoine Pitrou180a3362008-12-14 16:36:46 +00003764 _check_decode(b'\r\r\n', "\n\n")
3765 _check_decode(b'\r', "")
3766 _check_decode(b'\r', "\n")
3767 _check_decode(b'\na', "\na")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00003768
Antoine Pitrou180a3362008-12-14 16:36:46 +00003769 _check_decode(b'\xe8\xa2\x88\r\n', "\u8888\n")
3770 _check_decode(b'\xe8\xa2\x88', "\u8888")
3771 _check_decode(b'\n', "\n")
3772 _check_decode(b'\xe8\xa2\x88\r', "\u8888")
3773 _check_decode(b'\n', "\n")
Guido van Rossum9b76da62007-04-11 01:09:03 +00003774
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003775 def check_newline_decoding(self, decoder, encoding):
Antoine Pitrou180a3362008-12-14 16:36:46 +00003776 result = []
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003777 if encoding is not None:
3778 encoder = codecs.getincrementalencoder(encoding)()
3779 def _decode_bytewise(s):
3780 # Decode one byte at a time
3781 for b in encoder.encode(s):
3782 result.append(decoder.decode(bytes([b])))
3783 else:
3784 encoder = None
3785 def _decode_bytewise(s):
3786 # Decode one char at a time
3787 for c in s:
3788 result.append(decoder.decode(c))
Ezio Melottib3aedd42010-11-20 19:04:17 +00003789 self.assertEqual(decoder.newlines, None)
Antoine Pitrou180a3362008-12-14 16:36:46 +00003790 _decode_bytewise("abc\n\r")
Ezio Melottib3aedd42010-11-20 19:04:17 +00003791 self.assertEqual(decoder.newlines, '\n')
Antoine Pitrou180a3362008-12-14 16:36:46 +00003792 _decode_bytewise("\nabc")
Ezio Melottib3aedd42010-11-20 19:04:17 +00003793 self.assertEqual(decoder.newlines, ('\n', '\r\n'))
Antoine Pitrou180a3362008-12-14 16:36:46 +00003794 _decode_bytewise("abc\r")
Ezio Melottib3aedd42010-11-20 19:04:17 +00003795 self.assertEqual(decoder.newlines, ('\n', '\r\n'))
Antoine Pitrou180a3362008-12-14 16:36:46 +00003796 _decode_bytewise("abc")
Ezio Melottib3aedd42010-11-20 19:04:17 +00003797 self.assertEqual(decoder.newlines, ('\r', '\n', '\r\n'))
Antoine Pitrou180a3362008-12-14 16:36:46 +00003798 _decode_bytewise("abc\r")
Ezio Melottib3aedd42010-11-20 19:04:17 +00003799 self.assertEqual("".join(result), "abc\n\nabcabc\nabcabc")
Antoine Pitrou180a3362008-12-14 16:36:46 +00003800 decoder.reset()
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003801 input = "abc"
3802 if encoder is not None:
3803 encoder.reset()
3804 input = encoder.encode(input)
Ezio Melottib3aedd42010-11-20 19:04:17 +00003805 self.assertEqual(decoder.decode(input), "abc")
3806 self.assertEqual(decoder.newlines, None)
Antoine Pitrou180a3362008-12-14 16:36:46 +00003807
3808 def test_newline_decoder(self):
3809 encodings = (
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003810 # None meaning the IncrementalNewlineDecoder takes unicode input
3811 # rather than bytes input
3812 None, 'utf-8', 'latin-1',
Antoine Pitrou180a3362008-12-14 16:36:46 +00003813 'utf-16', 'utf-16-le', 'utf-16-be',
3814 'utf-32', 'utf-32-le', 'utf-32-be',
3815 )
3816 for enc in encodings:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003817 decoder = enc and codecs.getincrementaldecoder(enc)()
3818 decoder = self.IncrementalNewlineDecoder(decoder, translate=True)
3819 self.check_newline_decoding(decoder, enc)
Alexandre Vassalotti472f07d2008-01-06 00:34:32 +00003820 decoder = codecs.getincrementaldecoder("utf-8")()
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003821 decoder = self.IncrementalNewlineDecoder(decoder, translate=True)
3822 self.check_newline_decoding_utf8(decoder)
Oren Milman1d1d3e92017-08-20 18:35:36 +03003823 self.assertRaises(TypeError, decoder.setstate, 42)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003824
Antoine Pitrou66913e22009-03-06 23:40:56 +00003825 def test_newline_bytes(self):
3826 # Issue 5433: Excessive optimization in IncrementalNewlineDecoder
3827 def _check(dec):
Ezio Melottib3aedd42010-11-20 19:04:17 +00003828 self.assertEqual(dec.newlines, None)
3829 self.assertEqual(dec.decode("\u0D00"), "\u0D00")
3830 self.assertEqual(dec.newlines, None)
3831 self.assertEqual(dec.decode("\u0A00"), "\u0A00")
3832 self.assertEqual(dec.newlines, None)
Antoine Pitrou66913e22009-03-06 23:40:56 +00003833 dec = self.IncrementalNewlineDecoder(None, translate=False)
3834 _check(dec)
3835 dec = self.IncrementalNewlineDecoder(None, translate=True)
3836 _check(dec)
3837
Xiang Zhangb08746b2018-10-31 19:49:16 +08003838 def test_translate(self):
3839 # issue 35062
3840 for translate in (-2, -1, 1, 2):
3841 decoder = codecs.getincrementaldecoder("utf-8")()
3842 decoder = self.IncrementalNewlineDecoder(decoder, translate)
3843 self.check_newline_decoding_utf8(decoder)
3844 decoder = codecs.getincrementaldecoder("utf-8")()
3845 decoder = self.IncrementalNewlineDecoder(decoder, translate=0)
3846 self.assertEqual(decoder.decode(b"\r\r\n"), "\r\r\n")
3847
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003848class CIncrementalNewlineDecoderTest(IncrementalNewlineDecoderTest):
3849 pass
3850
3851class PyIncrementalNewlineDecoderTest(IncrementalNewlineDecoderTest):
3852 pass
Antoine Pitrou180a3362008-12-14 16:36:46 +00003853
Alexandre Vassalotti472f07d2008-01-06 00:34:32 +00003854
Guido van Rossum01a27522007-03-07 01:00:12 +00003855# XXX Tests for open()
Guido van Rossum68bbcd22007-02-27 17:19:33 +00003856
Guido van Rossum5abbf752007-08-27 17:39:33 +00003857class MiscIOTest(unittest.TestCase):
3858
Barry Warsaw40e82462008-11-20 20:14:50 +00003859 def tearDown(self):
3860 support.unlink(support.TESTFN)
3861
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003862 def test___all__(self):
3863 for name in self.io.__all__:
3864 obj = getattr(self.io, name, None)
Serhiy Storchaka0dcd80a2015-08-02 15:17:49 +03003865 self.assertIsNotNone(obj, name)
Steve Dowerb82e17e2019-05-23 08:45:22 -07003866 if name in ("open", "open_code"):
Guido van Rossum5abbf752007-08-27 17:39:33 +00003867 continue
Benjamin Peterson6a52a9c2009-04-29 22:00:44 +00003868 elif "error" in name.lower() or name == "UnsupportedOperation":
Benjamin Petersonbfb95942009-04-02 01:13:40 +00003869 self.assertTrue(issubclass(obj, Exception), name)
3870 elif not name.startswith("SEEK_"):
3871 self.assertTrue(issubclass(obj, self.IOBase))
Benjamin Peterson65676e42008-11-05 21:42:45 +00003872
Barry Warsaw40e82462008-11-20 20:14:50 +00003873 def test_attributes(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003874 f = self.open(support.TESTFN, "wb", buffering=0)
Ezio Melottib3aedd42010-11-20 19:04:17 +00003875 self.assertEqual(f.mode, "wb")
Barry Warsaw40e82462008-11-20 20:14:50 +00003876 f.close()
3877
Serhiy Storchaka2480c2e2013-11-24 23:13:26 +02003878 with support.check_warnings(('', DeprecationWarning)):
3879 f = self.open(support.TESTFN, "U")
Ezio Melottib3aedd42010-11-20 19:04:17 +00003880 self.assertEqual(f.name, support.TESTFN)
3881 self.assertEqual(f.buffer.name, support.TESTFN)
3882 self.assertEqual(f.buffer.raw.name, support.TESTFN)
3883 self.assertEqual(f.mode, "U")
3884 self.assertEqual(f.buffer.mode, "rb")
3885 self.assertEqual(f.buffer.raw.mode, "rb")
Barry Warsaw40e82462008-11-20 20:14:50 +00003886 f.close()
3887
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003888 f = self.open(support.TESTFN, "w+")
Ezio Melottib3aedd42010-11-20 19:04:17 +00003889 self.assertEqual(f.mode, "w+")
3890 self.assertEqual(f.buffer.mode, "rb+") # Does it really matter?
3891 self.assertEqual(f.buffer.raw.mode, "rb+")
Barry Warsaw40e82462008-11-20 20:14:50 +00003892
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003893 g = self.open(f.fileno(), "wb", closefd=False)
Ezio Melottib3aedd42010-11-20 19:04:17 +00003894 self.assertEqual(g.mode, "wb")
3895 self.assertEqual(g.raw.mode, "wb")
3896 self.assertEqual(g.name, f.fileno())
3897 self.assertEqual(g.raw.name, f.fileno())
Barry Warsaw40e82462008-11-20 20:14:50 +00003898 f.close()
3899 g.close()
3900
Antoine Pitrou8043cf82009-01-09 19:54:29 +00003901 def test_io_after_close(self):
3902 for kwargs in [
3903 {"mode": "w"},
3904 {"mode": "wb"},
3905 {"mode": "w", "buffering": 1},
3906 {"mode": "w", "buffering": 2},
3907 {"mode": "wb", "buffering": 0},
3908 {"mode": "r"},
3909 {"mode": "rb"},
3910 {"mode": "r", "buffering": 1},
3911 {"mode": "r", "buffering": 2},
3912 {"mode": "rb", "buffering": 0},
3913 {"mode": "w+"},
3914 {"mode": "w+b"},
3915 {"mode": "w+", "buffering": 1},
3916 {"mode": "w+", "buffering": 2},
3917 {"mode": "w+b", "buffering": 0},
3918 ]:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003919 f = self.open(support.TESTFN, **kwargs)
Antoine Pitrou8043cf82009-01-09 19:54:29 +00003920 f.close()
3921 self.assertRaises(ValueError, f.flush)
3922 self.assertRaises(ValueError, f.fileno)
3923 self.assertRaises(ValueError, f.isatty)
3924 self.assertRaises(ValueError, f.__iter__)
3925 if hasattr(f, "peek"):
3926 self.assertRaises(ValueError, f.peek, 1)
3927 self.assertRaises(ValueError, f.read)
3928 if hasattr(f, "read1"):
3929 self.assertRaises(ValueError, f.read1, 1024)
Martin Panterccb2c0e2016-10-20 23:48:14 +00003930 self.assertRaises(ValueError, f.read1)
Victor Stinnerb79f28c2011-05-25 22:09:03 +02003931 if hasattr(f, "readall"):
3932 self.assertRaises(ValueError, f.readall)
Antoine Pitrou8043cf82009-01-09 19:54:29 +00003933 if hasattr(f, "readinto"):
3934 self.assertRaises(ValueError, f.readinto, bytearray(1024))
Benjamin Petersona96fea02014-06-22 14:17:44 -07003935 if hasattr(f, "readinto1"):
3936 self.assertRaises(ValueError, f.readinto1, bytearray(1024))
Antoine Pitrou8043cf82009-01-09 19:54:29 +00003937 self.assertRaises(ValueError, f.readline)
3938 self.assertRaises(ValueError, f.readlines)
Xiang Zhang026435c2017-04-15 12:47:28 +08003939 self.assertRaises(ValueError, f.readlines, 1)
Antoine Pitrou8043cf82009-01-09 19:54:29 +00003940 self.assertRaises(ValueError, f.seek, 0)
3941 self.assertRaises(ValueError, f.tell)
3942 self.assertRaises(ValueError, f.truncate)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003943 self.assertRaises(ValueError, f.write,
3944 b"" if "b" in kwargs['mode'] else "")
Antoine Pitrou8043cf82009-01-09 19:54:29 +00003945 self.assertRaises(ValueError, f.writelines, [])
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003946 self.assertRaises(ValueError, next, f)
Antoine Pitrou8043cf82009-01-09 19:54:29 +00003947
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003948 def test_blockingioerror(self):
3949 # Various BlockingIOError issues
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003950 class C(str):
3951 pass
3952 c = C("")
3953 b = self.BlockingIOError(1, c)
3954 c.b = b
3955 b.c = c
3956 wr = weakref.ref(c)
3957 del c, b
Benjamin Peterson24fb1d02009-04-24 23:26:21 +00003958 support.gc_collect()
Serhiy Storchaka0dcd80a2015-08-02 15:17:49 +03003959 self.assertIsNone(wr(), wr)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003960
3961 def test_abcs(self):
3962 # Test the visible base classes are ABCs.
Ezio Melottie9615932010-01-24 19:26:24 +00003963 self.assertIsInstance(self.IOBase, abc.ABCMeta)
3964 self.assertIsInstance(self.RawIOBase, abc.ABCMeta)
3965 self.assertIsInstance(self.BufferedIOBase, abc.ABCMeta)
3966 self.assertIsInstance(self.TextIOBase, abc.ABCMeta)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003967
3968 def _check_abc_inheritance(self, abcmodule):
3969 with self.open(support.TESTFN, "wb", buffering=0) as f:
Ezio Melottie9615932010-01-24 19:26:24 +00003970 self.assertIsInstance(f, abcmodule.IOBase)
3971 self.assertIsInstance(f, abcmodule.RawIOBase)
3972 self.assertNotIsInstance(f, abcmodule.BufferedIOBase)
3973 self.assertNotIsInstance(f, abcmodule.TextIOBase)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003974 with self.open(support.TESTFN, "wb") as f:
Ezio Melottie9615932010-01-24 19:26:24 +00003975 self.assertIsInstance(f, abcmodule.IOBase)
3976 self.assertNotIsInstance(f, abcmodule.RawIOBase)
3977 self.assertIsInstance(f, abcmodule.BufferedIOBase)
3978 self.assertNotIsInstance(f, abcmodule.TextIOBase)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003979 with self.open(support.TESTFN, "w") as f:
Ezio Melottie9615932010-01-24 19:26:24 +00003980 self.assertIsInstance(f, abcmodule.IOBase)
3981 self.assertNotIsInstance(f, abcmodule.RawIOBase)
3982 self.assertNotIsInstance(f, abcmodule.BufferedIOBase)
3983 self.assertIsInstance(f, abcmodule.TextIOBase)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003984
3985 def test_abc_inheritance(self):
3986 # Test implementations inherit from their respective ABCs
3987 self._check_abc_inheritance(self)
3988
3989 def test_abc_inheritance_official(self):
3990 # Test implementations inherit from the official ABCs of the
3991 # baseline "io" module.
3992 self._check_abc_inheritance(io)
3993
Antoine Pitroue033e062010-10-29 10:38:18 +00003994 def _check_warn_on_dealloc(self, *args, **kwargs):
3995 f = open(*args, **kwargs)
3996 r = repr(f)
3997 with self.assertWarns(ResourceWarning) as cm:
3998 f = None
3999 support.gc_collect()
4000 self.assertIn(r, str(cm.warning.args[0]))
4001
4002 def test_warn_on_dealloc(self):
4003 self._check_warn_on_dealloc(support.TESTFN, "wb", buffering=0)
4004 self._check_warn_on_dealloc(support.TESTFN, "wb")
4005 self._check_warn_on_dealloc(support.TESTFN, "w")
4006
4007 def _check_warn_on_dealloc_fd(self, *args, **kwargs):
4008 fds = []
Benjamin Peterson556c7352010-10-31 01:35:43 +00004009 def cleanup_fds():
Antoine Pitroue033e062010-10-29 10:38:18 +00004010 for fd in fds:
4011 try:
4012 os.close(fd)
Andrew Svetlov3438fa42012-12-17 23:35:18 +02004013 except OSError as e:
Antoine Pitroue033e062010-10-29 10:38:18 +00004014 if e.errno != errno.EBADF:
4015 raise
Benjamin Peterson556c7352010-10-31 01:35:43 +00004016 self.addCleanup(cleanup_fds)
4017 r, w = os.pipe()
4018 fds += r, w
4019 self._check_warn_on_dealloc(r, *args, **kwargs)
4020 # When using closefd=False, there's no warning
4021 r, w = os.pipe()
4022 fds += r, w
Serhiy Storchaka94a619d2016-02-11 13:11:44 +02004023 with support.check_no_resource_warning(self):
Benjamin Peterson556c7352010-10-31 01:35:43 +00004024 open(r, *args, closefd=False, **kwargs)
Antoine Pitroue033e062010-10-29 10:38:18 +00004025
4026 def test_warn_on_dealloc_fd(self):
4027 self._check_warn_on_dealloc_fd("rb", buffering=0)
4028 self._check_warn_on_dealloc_fd("rb")
4029 self._check_warn_on_dealloc_fd("r")
4030
4031
Antoine Pitrou243757e2010-11-05 21:15:39 +00004032 def test_pickling(self):
4033 # Pickling file objects is forbidden
4034 for kwargs in [
4035 {"mode": "w"},
4036 {"mode": "wb"},
4037 {"mode": "wb", "buffering": 0},
4038 {"mode": "r"},
4039 {"mode": "rb"},
4040 {"mode": "rb", "buffering": 0},
4041 {"mode": "w+"},
4042 {"mode": "w+b"},
4043 {"mode": "w+b", "buffering": 0},
4044 ]:
4045 for protocol in range(pickle.HIGHEST_PROTOCOL + 1):
4046 with self.open(support.TESTFN, **kwargs) as f:
4047 self.assertRaises(TypeError, pickle.dumps, f, protocol)
4048
Antoine Pitrou58fcf9f2011-11-21 20:16:44 +01004049 def test_nonblock_pipe_write_bigbuf(self):
4050 self._test_nonblock_pipe_write(16*1024)
4051
Antoine Pitrou58fcf9f2011-11-21 20:16:44 +01004052 def test_nonblock_pipe_write_smallbuf(self):
4053 self._test_nonblock_pipe_write(1024)
4054
Victor Stinner1db9e7b2014-07-29 22:32:47 +02004055 @unittest.skipUnless(hasattr(os, 'set_blocking'),
4056 'os.set_blocking() required for this test')
Antoine Pitrou58fcf9f2011-11-21 20:16:44 +01004057 def _test_nonblock_pipe_write(self, bufsize):
4058 sent = []
4059 received = []
4060 r, w = os.pipe()
Victor Stinner1db9e7b2014-07-29 22:32:47 +02004061 os.set_blocking(r, False)
4062 os.set_blocking(w, False)
Antoine Pitrou58fcf9f2011-11-21 20:16:44 +01004063
4064 # To exercise all code paths in the C implementation we need
4065 # to play with buffer sizes. For instance, if we choose a
4066 # buffer size less than or equal to _PIPE_BUF (4096 on Linux)
4067 # then we will never get a partial write of the buffer.
4068 rf = self.open(r, mode='rb', closefd=True, buffering=bufsize)
4069 wf = self.open(w, mode='wb', closefd=True, buffering=bufsize)
4070
4071 with rf, wf:
4072 for N in 9999, 73, 7574:
4073 try:
4074 i = 0
4075 while True:
4076 msg = bytes([i % 26 + 97]) * N
4077 sent.append(msg)
4078 wf.write(msg)
4079 i += 1
4080
4081 except self.BlockingIOError as e:
4082 self.assertEqual(e.args[0], errno.EAGAIN)
Antoine Pitrou7fe601c2011-11-21 20:22:01 +01004083 self.assertEqual(e.args[2], e.characters_written)
Antoine Pitrou58fcf9f2011-11-21 20:16:44 +01004084 sent[-1] = sent[-1][:e.characters_written]
4085 received.append(rf.read())
4086 msg = b'BLOCKED'
4087 wf.write(msg)
4088 sent.append(msg)
4089
4090 while True:
4091 try:
4092 wf.flush()
4093 break
4094 except self.BlockingIOError as e:
4095 self.assertEqual(e.args[0], errno.EAGAIN)
Antoine Pitrou7fe601c2011-11-21 20:22:01 +01004096 self.assertEqual(e.args[2], e.characters_written)
Antoine Pitrou58fcf9f2011-11-21 20:16:44 +01004097 self.assertEqual(e.characters_written, 0)
4098 received.append(rf.read())
4099
4100 received += iter(rf.read, None)
4101
4102 sent, received = b''.join(sent), b''.join(received)
Serhiy Storchaka0dcd80a2015-08-02 15:17:49 +03004103 self.assertEqual(sent, received)
Antoine Pitrou58fcf9f2011-11-21 20:16:44 +01004104 self.assertTrue(wf.closed)
4105 self.assertTrue(rf.closed)
4106
Charles-François Natalidc3044c2012-01-09 22:40:02 +01004107 def test_create_fail(self):
4108 # 'x' mode fails if file is existing
4109 with self.open(support.TESTFN, 'w'):
4110 pass
4111 self.assertRaises(FileExistsError, self.open, support.TESTFN, 'x')
4112
4113 def test_create_writes(self):
4114 # 'x' mode opens for writing
4115 with self.open(support.TESTFN, 'xb') as f:
4116 f.write(b"spam")
4117 with self.open(support.TESTFN, 'rb') as f:
4118 self.assertEqual(b"spam", f.read())
4119
Christian Heimes7b648752012-09-10 14:48:43 +02004120 def test_open_allargs(self):
4121 # there used to be a buffer overflow in the parser for rawmode
4122 self.assertRaises(ValueError, self.open, support.TESTFN, 'rwax+')
4123
4124
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00004125class CMiscIOTest(MiscIOTest):
4126 io = io
4127
Serhiy Storchaka37a79a12013-05-28 16:24:45 +03004128 def test_readinto_buffer_overflow(self):
4129 # Issue #18025
4130 class BadReader(self.io.BufferedIOBase):
4131 def read(self, n=-1):
4132 return b'x' * 10**6
4133 bufio = BadReader()
4134 b = bytearray(2)
4135 self.assertRaises(ValueError, bufio.readinto, b)
4136
Antoine Pitrou25f85d42015-04-13 19:41:47 +02004137 def check_daemon_threads_shutdown_deadlock(self, stream_name):
4138 # Issue #23309: deadlocks at shutdown should be avoided when a
4139 # daemon thread and the main thread both write to a file.
4140 code = """if 1:
4141 import sys
4142 import time
4143 import threading
Victor Stinner2a1aed02017-04-21 17:59:23 +02004144 from test.support import SuppressCrashReport
Antoine Pitrou25f85d42015-04-13 19:41:47 +02004145
4146 file = sys.{stream_name}
4147
4148 def run():
4149 while True:
4150 file.write('.')
4151 file.flush()
4152
Victor Stinner2a1aed02017-04-21 17:59:23 +02004153 crash = SuppressCrashReport()
4154 crash.__enter__()
4155 # don't call __exit__(): the crash occurs at Python shutdown
4156
Antoine Pitrou25f85d42015-04-13 19:41:47 +02004157 thread = threading.Thread(target=run)
4158 thread.daemon = True
4159 thread.start()
4160
4161 time.sleep(0.5)
4162 file.write('!')
4163 file.flush()
4164 """.format_map(locals())
4165 res, _ = run_python_until_end("-c", code)
4166 err = res.err.decode()
4167 if res.rc != 0:
4168 # Failure: should be a fatal error
Max Bernsteinccb7ca72019-05-21 10:09:21 -07004169 pattern = (r"Fatal Python error: could not acquire lock "
4170 r"for <(_io\.)?BufferedWriter name='<{stream_name}>'> "
4171 r"at interpreter shutdown, possibly due to "
4172 r"daemon threads".format_map(locals()))
4173 self.assertRegex(err, pattern)
Antoine Pitrou25f85d42015-04-13 19:41:47 +02004174 else:
4175 self.assertFalse(err.strip('.!'))
4176
4177 def test_daemon_threads_shutdown_stdout_deadlock(self):
4178 self.check_daemon_threads_shutdown_deadlock('stdout')
4179
4180 def test_daemon_threads_shutdown_stderr_deadlock(self):
4181 self.check_daemon_threads_shutdown_deadlock('stderr')
4182
4183
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00004184class PyMiscIOTest(MiscIOTest):
4185 io = pyio
Barry Warsaw40e82462008-11-20 20:14:50 +00004186
Antoine Pitroub46b9d52010-08-21 19:09:32 +00004187
4188@unittest.skipIf(os.name == 'nt', 'POSIX signals required for this test.')
4189class SignalsTest(unittest.TestCase):
4190
4191 def setUp(self):
4192 self.oldalrm = signal.signal(signal.SIGALRM, self.alarm_interrupt)
4193
4194 def tearDown(self):
4195 signal.signal(signal.SIGALRM, self.oldalrm)
4196
4197 def alarm_interrupt(self, sig, frame):
4198 1/0
4199
Antoine Pitroub46b9d52010-08-21 19:09:32 +00004200 def check_interrupted_write(self, item, bytes, **fdopen_kwargs):
4201 """Check that a partial write, when it gets interrupted, properly
Antoine Pitrou707ce822011-02-25 21:24:11 +00004202 invokes the signal handler, and bubbles up the exception raised
4203 in the latter."""
Antoine Pitroub46b9d52010-08-21 19:09:32 +00004204 read_results = []
4205 def _read():
4206 s = os.read(r, 1)
4207 read_results.append(s)
Victor Stinner05c9d312018-12-18 23:52:39 +01004208
Antoine Pitroub46b9d52010-08-21 19:09:32 +00004209 t = threading.Thread(target=_read)
4210 t.daemon = True
4211 r, w = os.pipe()
Benjamin Petersond8fc2e12010-10-31 01:19:53 +00004212 fdopen_kwargs["closefd"] = False
Victor Stinner9c182a12015-03-16 18:03:06 +01004213 large_data = item * (support.PIPE_MAX_SIZE // len(item) + 1)
Antoine Pitroub46b9d52010-08-21 19:09:32 +00004214 try:
4215 wio = self.io.open(w, **fdopen_kwargs)
Victor Stinner05c9d312018-12-18 23:52:39 +01004216 if hasattr(signal, 'pthread_sigmask'):
4217 # create the thread with SIGALRM signal blocked
4218 signal.pthread_sigmask(signal.SIG_BLOCK, [signal.SIGALRM])
4219 t.start()
4220 signal.pthread_sigmask(signal.SIG_UNBLOCK, [signal.SIGALRM])
4221 else:
4222 t.start()
4223
Antoine Pitroub46b9d52010-08-21 19:09:32 +00004224 # Fill the pipe enough that the write will be blocking.
4225 # It will be interrupted by the timer armed above. Since the
4226 # other thread has read one byte, the low-level write will
4227 # return with a successful (partial) result rather than an EINTR.
4228 # The buffered IO layer must check for pending signal
4229 # handlers, which in this case will invoke alarm_interrupt().
Victor Stinner775b2dd2013-07-15 19:53:13 +02004230 signal.alarm(1)
4231 try:
Victor Stinner9c182a12015-03-16 18:03:06 +01004232 self.assertRaises(ZeroDivisionError, wio.write, large_data)
Victor Stinner775b2dd2013-07-15 19:53:13 +02004233 finally:
4234 signal.alarm(0)
Serhiy Storchaka263dcd22015-04-01 13:01:14 +03004235 t.join()
Antoine Pitroub46b9d52010-08-21 19:09:32 +00004236 # We got one byte, get another one and check that it isn't a
4237 # repeat of the first one.
4238 read_results.append(os.read(r, 1))
4239 self.assertEqual(read_results, [bytes[0:1], bytes[1:2]])
4240 finally:
4241 os.close(w)
4242 os.close(r)
4243 # This is deliberate. If we didn't close the file descriptor
4244 # before closing wio, wio would try to flush its internal
4245 # buffer, and block again.
4246 try:
4247 wio.close()
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02004248 except OSError as e:
Antoine Pitroub46b9d52010-08-21 19:09:32 +00004249 if e.errno != errno.EBADF:
4250 raise
4251
4252 def test_interrupted_write_unbuffered(self):
4253 self.check_interrupted_write(b"xy", b"xy", mode="wb", buffering=0)
4254
4255 def test_interrupted_write_buffered(self):
4256 self.check_interrupted_write(b"xy", b"xy", mode="wb")
4257
4258 def test_interrupted_write_text(self):
4259 self.check_interrupted_write("xy", b"xy", mode="w", encoding="ascii")
4260
Brett Cannon31f59292011-02-21 19:29:56 +00004261 @support.no_tracing
Antoine Pitrouf3b68b32010-12-03 18:41:39 +00004262 def check_reentrant_write(self, data, **fdopen_kwargs):
4263 def on_alarm(*args):
4264 # Will be called reentrantly from the same thread
4265 wio.write(data)
4266 1/0
4267 signal.signal(signal.SIGALRM, on_alarm)
4268 r, w = os.pipe()
4269 wio = self.io.open(w, **fdopen_kwargs)
4270 try:
Gregory P. Smithb5ba2032012-06-25 01:13:32 -07004271 signal.alarm(1)
Antoine Pitrouf3b68b32010-12-03 18:41:39 +00004272 # Either the reentrant call to wio.write() fails with RuntimeError,
4273 # or the signal handler raises ZeroDivisionError.
4274 with self.assertRaises((ZeroDivisionError, RuntimeError)) as cm:
4275 while 1:
4276 for i in range(100):
4277 wio.write(data)
4278 wio.flush()
4279 # Make sure the buffer doesn't fill up and block further writes
4280 os.read(r, len(data) * 100)
4281 exc = cm.exception
4282 if isinstance(exc, RuntimeError):
4283 self.assertTrue(str(exc).startswith("reentrant call"), str(exc))
4284 finally:
Victor Stinner9abee722017-09-19 09:36:54 -07004285 signal.alarm(0)
Antoine Pitrouf3b68b32010-12-03 18:41:39 +00004286 wio.close()
4287 os.close(r)
4288
4289 def test_reentrant_write_buffered(self):
4290 self.check_reentrant_write(b"xy", mode="wb")
4291
4292 def test_reentrant_write_text(self):
4293 self.check_reentrant_write("xy", mode="w", encoding="ascii")
4294
Antoine Pitrou707ce822011-02-25 21:24:11 +00004295 def check_interrupted_read_retry(self, decode, **fdopen_kwargs):
4296 """Check that a buffered read, when it gets interrupted (either
4297 returning a partial result or EINTR), properly invokes the signal
4298 handler and retries if the latter returned successfully."""
4299 r, w = os.pipe()
4300 fdopen_kwargs["closefd"] = False
4301 def alarm_handler(sig, frame):
4302 os.write(w, b"bar")
4303 signal.signal(signal.SIGALRM, alarm_handler)
4304 try:
4305 rio = self.io.open(r, **fdopen_kwargs)
4306 os.write(w, b"foo")
Gregory P. Smithb5ba2032012-06-25 01:13:32 -07004307 signal.alarm(1)
Antoine Pitrou707ce822011-02-25 21:24:11 +00004308 # Expected behaviour:
4309 # - first raw read() returns partial b"foo"
4310 # - second raw read() returns EINTR
4311 # - third raw read() returns b"bar"
4312 self.assertEqual(decode(rio.read(6)), "foobar")
4313 finally:
Victor Stinner9abee722017-09-19 09:36:54 -07004314 signal.alarm(0)
Antoine Pitrou707ce822011-02-25 21:24:11 +00004315 rio.close()
4316 os.close(w)
4317 os.close(r)
4318
Antoine Pitrou20db5112011-08-19 20:32:34 +02004319 def test_interrupted_read_retry_buffered(self):
Antoine Pitrou707ce822011-02-25 21:24:11 +00004320 self.check_interrupted_read_retry(lambda x: x.decode('latin1'),
4321 mode="rb")
4322
Antoine Pitrou20db5112011-08-19 20:32:34 +02004323 def test_interrupted_read_retry_text(self):
Antoine Pitrou707ce822011-02-25 21:24:11 +00004324 self.check_interrupted_read_retry(lambda x: x,
4325 mode="r")
4326
Antoine Pitrou707ce822011-02-25 21:24:11 +00004327 def check_interrupted_write_retry(self, item, **fdopen_kwargs):
4328 """Check that a buffered write, when it gets interrupted (either
4329 returning a partial result or EINTR), properly invokes the signal
4330 handler and retries if the latter returned successfully."""
4331 select = support.import_module("select")
Victor Stinner6d3c09f2015-03-16 17:57:49 +01004332
Antoine Pitrou707ce822011-02-25 21:24:11 +00004333 # A quantity that exceeds the buffer size of an anonymous pipe's
4334 # write end.
Antoine Pitroue1a16742013-04-24 23:31:38 +02004335 N = support.PIPE_MAX_SIZE
Antoine Pitrou707ce822011-02-25 21:24:11 +00004336 r, w = os.pipe()
4337 fdopen_kwargs["closefd"] = False
Victor Stinner6d3c09f2015-03-16 17:57:49 +01004338
Antoine Pitrou707ce822011-02-25 21:24:11 +00004339 # We need a separate thread to read from the pipe and allow the
4340 # write() to finish. This thread is started after the SIGALRM is
4341 # received (forcing a first EINTR in write()).
4342 read_results = []
4343 write_finished = False
Serhiy Storchaka9db55002015-03-28 20:38:37 +02004344 error = None
Antoine Pitrou707ce822011-02-25 21:24:11 +00004345 def _read():
Serhiy Storchaka9db55002015-03-28 20:38:37 +02004346 try:
4347 while not write_finished:
4348 while r in select.select([r], [], [], 1.0)[0]:
4349 s = os.read(r, 1024)
4350 read_results.append(s)
4351 except BaseException as exc:
4352 nonlocal error
4353 error = exc
Antoine Pitrou707ce822011-02-25 21:24:11 +00004354 t = threading.Thread(target=_read)
4355 t.daemon = True
4356 def alarm1(sig, frame):
4357 signal.signal(signal.SIGALRM, alarm2)
Gregory P. Smithb5ba2032012-06-25 01:13:32 -07004358 signal.alarm(1)
Antoine Pitrou707ce822011-02-25 21:24:11 +00004359 def alarm2(sig, frame):
4360 t.start()
Victor Stinner6d3c09f2015-03-16 17:57:49 +01004361
4362 large_data = item * N
Antoine Pitrou707ce822011-02-25 21:24:11 +00004363 signal.signal(signal.SIGALRM, alarm1)
4364 try:
4365 wio = self.io.open(w, **fdopen_kwargs)
Gregory P. Smithb5ba2032012-06-25 01:13:32 -07004366 signal.alarm(1)
Antoine Pitrou707ce822011-02-25 21:24:11 +00004367 # Expected behaviour:
4368 # - first raw write() is partial (because of the limited pipe buffer
4369 # and the first alarm)
4370 # - second raw write() returns EINTR (because of the second alarm)
4371 # - subsequent write()s are successful (either partial or complete)
Victor Stinner6d3c09f2015-03-16 17:57:49 +01004372 written = wio.write(large_data)
4373 self.assertEqual(N, written)
4374
Antoine Pitrou707ce822011-02-25 21:24:11 +00004375 wio.flush()
4376 write_finished = True
4377 t.join()
Victor Stinner6d3c09f2015-03-16 17:57:49 +01004378
Serhiy Storchaka9db55002015-03-28 20:38:37 +02004379 self.assertIsNone(error)
Antoine Pitrou707ce822011-02-25 21:24:11 +00004380 self.assertEqual(N, sum(len(x) for x in read_results))
4381 finally:
Victor Stinner9abee722017-09-19 09:36:54 -07004382 signal.alarm(0)
Antoine Pitrou707ce822011-02-25 21:24:11 +00004383 write_finished = True
4384 os.close(w)
4385 os.close(r)
4386 # This is deliberate. If we didn't close the file descriptor
4387 # before closing wio, wio would try to flush its internal
4388 # buffer, and could block (in case of failure).
4389 try:
4390 wio.close()
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02004391 except OSError as e:
Antoine Pitrou707ce822011-02-25 21:24:11 +00004392 if e.errno != errno.EBADF:
4393 raise
4394
Antoine Pitrou20db5112011-08-19 20:32:34 +02004395 def test_interrupted_write_retry_buffered(self):
Antoine Pitrou707ce822011-02-25 21:24:11 +00004396 self.check_interrupted_write_retry(b"x", mode="wb")
4397
Antoine Pitrou20db5112011-08-19 20:32:34 +02004398 def test_interrupted_write_retry_text(self):
Antoine Pitrou707ce822011-02-25 21:24:11 +00004399 self.check_interrupted_write_retry("x", mode="w", encoding="latin1")
4400
Antoine Pitrouf3b68b32010-12-03 18:41:39 +00004401
Antoine Pitroub46b9d52010-08-21 19:09:32 +00004402class CSignalsTest(SignalsTest):
4403 io = io
4404
4405class PySignalsTest(SignalsTest):
4406 io = pyio
4407
Antoine Pitrouf3b68b32010-12-03 18:41:39 +00004408 # Handling reentrancy issues would slow down _pyio even more, so the
4409 # tests are disabled.
4410 test_reentrant_write_buffered = None
4411 test_reentrant_write_text = None
4412
Antoine Pitroub46b9d52010-08-21 19:09:32 +00004413
Ezio Melottidaa42c72013-03-23 16:30:16 +02004414def load_tests(*args):
Gregory P. Smith1bef9072015-04-14 13:24:34 -07004415 tests = (CIOTest, PyIOTest, APIMismatchTest,
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00004416 CBufferedReaderTest, PyBufferedReaderTest,
4417 CBufferedWriterTest, PyBufferedWriterTest,
4418 CBufferedRWPairTest, PyBufferedRWPairTest,
4419 CBufferedRandomTest, PyBufferedRandomTest,
4420 StatefulIncrementalDecoderTest,
4421 CIncrementalNewlineDecoderTest, PyIncrementalNewlineDecoderTest,
4422 CTextIOWrapperTest, PyTextIOWrapperTest,
Antoine Pitroub46b9d52010-08-21 19:09:32 +00004423 CMiscIOTest, PyMiscIOTest,
4424 CSignalsTest, PySignalsTest,
4425 )
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00004426
4427 # Put the namespaces of the IO module we are testing and some useful mock
4428 # classes in the __dict__ of each test.
4429 mocks = (MockRawIO, MisbehavedRawIO, MockFileIO, CloseFailureIO,
benfogle9703f092017-11-10 16:03:40 -05004430 MockNonBlockWriterIO, MockUnseekableIO, MockRawIOWithoutRead,
4431 SlowFlushRawIO)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00004432 all_members = io.__all__ + ["IncrementalNewlineDecoder"]
4433 c_io_ns = {name : getattr(io, name) for name in all_members}
4434 py_io_ns = {name : getattr(pyio, name) for name in all_members}
4435 globs = globals()
4436 c_io_ns.update((x.__name__, globs["C" + x.__name__]) for x in mocks)
4437 py_io_ns.update((x.__name__, globs["Py" + x.__name__]) for x in mocks)
4438 # Avoid turning open into a bound method.
4439 py_io_ns["open"] = pyio.OpenWrapper
4440 for test in tests:
4441 if test.__name__.startswith("C"):
4442 for name, obj in c_io_ns.items():
4443 setattr(test, name, obj)
4444 elif test.__name__.startswith("Py"):
4445 for name, obj in py_io_ns.items():
4446 setattr(test, name, obj)
4447
Ezio Melottidaa42c72013-03-23 16:30:16 +02004448 suite = unittest.TestSuite([unittest.makeSuite(test) for test in tests])
4449 return suite
Guido van Rossum28524c72007-02-27 05:47:44 +00004450
4451if __name__ == "__main__":
Ezio Melottidaa42c72013-03-23 16:30:16 +02004452 unittest.main()