blob: bea4342e7f0f9eb7be957e0d6eba503631981e1c [file] [log] [blame]
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001"""Unit tests for the io module."""
2
3# Tests of io are scattered over the test suite:
4# * test_bufio - tests file buffering
5# * test_memoryio - tests BytesIO and StringIO
6# * test_fileio - tests FileIO
7# * test_file - tests the file interface
8# * test_io - tests everything else in the io module
9# * test_univnewlines - tests universal newline support
10# * test_largefile - tests operations on a file greater than 2**32 bytes
11# (only enabled with -ulargefile)
12
13################################################################################
14# ATTENTION TEST WRITERS!!!
15################################################################################
16# When writing tests for io, it's important to test both the C and Python
17# implementations. This is usually done by writing a base test that refers to
Serhiy Storchakad65c9492015-11-02 14:10:23 +020018# the type it is testing as an attribute. Then it provides custom subclasses to
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000019# test both implementations. This file has lots of examples.
20################################################################################
Guido van Rossum68bbcd22007-02-27 17:19:33 +000021
Victor Stinnerf86a5e82012-06-05 13:43:22 +020022import abc
23import array
24import errno
25import locale
Guido van Rossum8358db22007-08-18 21:39:55 +000026import os
Victor Stinnerf86a5e82012-06-05 13:43:22 +020027import pickle
28import random
29import signal
Guido van Rossum34d69e52007-04-10 20:08:41 +000030import sys
Gregory P. Smithe5796c42018-12-30 20:17:57 -080031import sysconfig
Antoine Pitroua6a4dc82017-09-07 18:56:24 +020032import threading
Guido van Rossumb9c4c3e2007-04-11 16:07:50 +000033import time
Guido van Rossum28524c72007-02-27 05:47:44 +000034import unittest
Antoine Pitroue033e062010-10-29 10:38:18 +000035import warnings
Victor Stinnerf86a5e82012-06-05 13:43:22 +020036import weakref
Antoine Pitrou131a4892012-10-16 22:57:11 +020037from collections import deque, UserList
Victor Stinnerf86a5e82012-06-05 13:43:22 +020038from itertools import cycle, count
Benjamin Petersonee8712c2008-05-20 21:35:26 +000039from test import support
Berker Peksagce643912015-05-06 06:33:17 +030040from test.support.script_helper import assert_python_ok, run_python_until_end
Serhiy Storchakab21d1552018-03-02 11:53:51 +020041from test.support import FakePath
Guido van Rossum76c5d4d2007-04-06 19:10:29 +000042
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +000043import codecs
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +000044import io # C implementation of io
45import _pyio as pyio # Python implementation of io
Guido van Rossuma9e20242007-03-08 00:43:48 +000046
try:
    import ctypes
except ImportError:
    # No ctypes available: fall back to a signed-char array holding the data.
    def byteslike(*pos, **kw):
        raw = bytes(*pos, **kw)
        return array.array("b", raw)
else:
    class EmptyStruct(ctypes.Structure):
        """Field-less ctypes structure; byteslike() resizes instances to fit."""

    def byteslike(*pos, **kw):
        """Return a bytes-like object that has no string or sequence methods.

        The arguments are handed unchanged to bytes(); the resulting data is
        copied into an EmptyStruct instance resized to hold it.
        """
        payload = bytes(*pos, **kw)
        holder = EmptyStruct()
        ctypes.resize(holder, len(payload))
        memoryview(holder).cast("B")[:] = payload
        return holder
62
# True when either the compiler flags or the configure arguments of this
# build mention the memory sanitizer.
_cflags = sysconfig.get_config_var('CFLAGS') or ''
_config_args = sysconfig.get_config_var('CONFIG_ARGS') or ''
MEMORY_SANITIZER = any((
    '-fsanitize=memory' in _cflags,
    '--with-memory-sanitizer' in _config_args,
))

# Does io.IOBase finalizer log the exception if the close() method fails?
# The exception is ignored silently by default in release build.
IOBASE_EMITS_UNRAISABLE = (
    sys.flags.dev_mode or hasattr(sys, "gettotalrefcount")
)
Victor Stinner44235042019-04-12 17:06:47 +020073
74
def _default_chunk_size():
    """Return the _CHUNK_SIZE of a text file opened with default settings."""
    stream = open(__file__, "r", encoding="latin-1")
    try:
        return stream._CHUNK_SIZE
    finally:
        stream.close()
79
80
class MockRawIOWithoutRead:
    """A RawIO implementation without read(), so as to exercise the default
    RawIO.read() which calls readinto().

    *read_stack* is an iterable of chunks that readinto() serves in order;
    a None entry makes one readinto() call return None.  Everything passed
    to write() is recorded, as bytes, in ``_write_stack``.
    """

    def __init__(self, read_stack=()):
        self._read_stack = list(read_stack)
        self._write_stack = []
        self._reads = 0              # number of read calls made so far
        self._extraneous_reads = 0   # read calls made on an empty stack

    def write(self, b):
        """Record a copy of *b* and report it as fully written."""
        self._write_stack.append(bytes(b))
        return len(b)

    def writable(self):
        return True

    def fileno(self):
        return 42

    def readable(self):
        return True

    def seekable(self):
        return True

    def seek(self, pos, whence=0):
        # *whence* defaults to 0 like the seek() of the io classes and of
        # MockNonBlockWriterIO, so one-argument calls work as well.
        return 0   # wrong but we gotta return something

    def tell(self):
        return 0   # same comment as above

    def readinto(self, buf):
        """Copy the next queued chunk into *buf*.

        Returns the number of bytes copied, None when the queued entry is
        None, or 0 when nothing is queued.  A chunk larger than *buf* is
        served piecewise; the remainder stays at the head of the stack.
        """
        self._reads += 1
        max_len = len(buf)
        try:
            data = self._read_stack[0]
        except IndexError:
            self._extraneous_reads += 1
            return 0
        if data is None:
            del self._read_stack[0]
            return None
        n = len(data)
        if n <= max_len:
            del self._read_stack[0]
            buf[:n] = data
            return n
        buf[:] = data[:max_len]
        self._read_stack[0] = data[max_len:]
        return max_len

    def truncate(self, pos=None):
        return pos
136
class CMockRawIOWithoutRead(MockRawIOWithoutRead, io.RawIOBase):
    """MockRawIOWithoutRead combined with the C io.RawIOBase."""
139
class PyMockRawIOWithoutRead(MockRawIOWithoutRead, pyio.RawIOBase):
    """MockRawIOWithoutRead combined with the pure-Python pyio.RawIOBase."""
142
143
class MockRawIO(MockRawIOWithoutRead):
    """MockRawIOWithoutRead plus a read() that hands out whole chunks."""

    def read(self, n=None):
        """Return the next queued chunk, ignoring *n*; b"" when none is left."""
        self._reads += 1
        try:
            return self._read_stack.pop(0)
        except IndexError:
            # Only an exhausted stack counts as EOF.  The previous bare
            # ``except:`` would also have swallowed KeyboardInterrupt and
            # genuine programming errors.
            self._extraneous_reads += 1
            return b""
153
class CMockRawIO(MockRawIO, io.RawIOBase):
    """MockRawIO combined with the C io.RawIOBase."""
156
class PyMockRawIO(MockRawIO, pyio.RawIOBase):
    """MockRawIO combined with the pure-Python pyio.RawIOBase."""
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000159
Guido van Rossuma9e20242007-03-08 00:43:48 +0000160
class MisbehavedRawIO(MockRawIO):
    """MockRawIO whose methods deliberately report bogus results."""

    def write(self, b):
        # Claim twice as many bytes as were recorded.
        written = super().write(b)
        return written * 2

    def read(self, n=None):
        # Hand back the chunk duplicated.
        chunk = super().read(n)
        return chunk * 2

    def seek(self, pos, whence):
        return -123

    def tell(self):
        return -456

    def readinto(self, buf):
        # The real byte count is thrown away; report five times the
        # buffer length instead.
        super().readinto(buf)
        return 5 * len(buf)
177
class CMisbehavedRawIO(MisbehavedRawIO, io.RawIOBase):
    """MisbehavedRawIO combined with the C io.RawIOBase."""
180
class PyMisbehavedRawIO(MisbehavedRawIO, pyio.RawIOBase):
    """MisbehavedRawIO combined with the pure-Python pyio.RawIOBase."""
183
184
class SlowFlushRawIO(MockRawIO):
    """MockRawIO whose flush() signals that it started, then sleeps."""

    def __init__(self):
        # Set by flush() as soon as it is entered.
        self.in_flush = threading.Event()
        super().__init__()

    def flush(self):
        delay = 0.25
        self.in_flush.set()
        time.sleep(delay)
193
class CSlowFlushRawIO(SlowFlushRawIO, io.RawIOBase):
    """SlowFlushRawIO combined with the C io.RawIOBase."""
196
class PySlowFlushRawIO(SlowFlushRawIO, pyio.RawIOBase):
    """SlowFlushRawIO combined with the pure-Python pyio.RawIOBase."""
199
200
class CloseFailureIO(MockRawIO):
    """MockRawIO whose first close() marks it closed and raises OSError."""

    closed = 0

    def close(self):
        if self.closed:
            # Already closed once: later calls are silent no-ops.
            return
        self.closed = 1
        raise OSError
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000208
class CCloseFailureIO(CloseFailureIO, io.RawIOBase):
    """CloseFailureIO combined with the C io.RawIOBase."""
211
class PyCloseFailureIO(CloseFailureIO, pyio.RawIOBase):
    """CloseFailureIO combined with the pure-Python pyio.RawIOBase."""
214
215
class MockFileIO:
    """Mixin that logs the size of every read()/readinto() result.

    It must be combined with a base class providing __init__(data), read()
    and readinto(); the results are appended to ``read_history``.
    """

    def __init__(self, data):
        self.read_history = []
        super().__init__(data)

    def read(self, n=None):
        chunk = super().read(n)
        if chunk is None:
            size = None
        else:
            size = len(chunk)
        self.read_history.append(size)
        return chunk

    def readinto(self, b):
        count = super().readinto(b)
        self.read_history.append(count)
        return count
Guido van Rossum78892e42007-04-06 17:31:18 +0000231
class CMockFileIO(MockFileIO, io.BytesIO):
    """MockFileIO layered over the C io.BytesIO."""
Guido van Rossuma9e20242007-03-08 00:43:48 +0000234
class PyMockFileIO(MockFileIO, pyio.BytesIO):
    """MockFileIO layered over the pure-Python pyio.BytesIO."""
237
238
class MockUnseekableIO:
    """Mixin that makes a stream report itself as not seekable.

    Subclasses must provide an ``UnsupportedOperation`` exception class,
    which seek(), tell() and truncate() raise.
    """

    def seekable(self):
        return False

    def seek(self, *args):
        error = self.UnsupportedOperation("not seekable")
        raise error

    def tell(self, *args):
        error = self.UnsupportedOperation("not seekable")
        raise error

    def truncate(self, *args):
        error = self.UnsupportedOperation("not seekable")
        raise error
251
class CMockUnseekableIO(MockUnseekableIO, io.BytesIO):
    """MockUnseekableIO over the C io.BytesIO."""

    UnsupportedOperation = io.UnsupportedOperation
254
class PyMockUnseekableIO(MockUnseekableIO, pyio.BytesIO):
    """MockUnseekableIO over the pure-Python pyio.BytesIO."""

    UnsupportedOperation = pyio.UnsupportedOperation
257
258
class MockNonBlockWriterIO:
    """Raw writer mock that can pretend a write would block.

    write() records data in ``_write_stack``.  After block_on(char), a write
    containing *char* is cut short just before it, and a write that starts
    with *char* returns None once and clears the blocker.
    """

    def __init__(self):
        self._blocker_char = None
        self._write_stack = []

    def pop_written(self):
        """Return everything recorded so far and empty the record."""
        written = b"".join(self._write_stack)
        del self._write_stack[:]
        return written

    def block_on(self, char):
        """Block when a given char is encountered."""
        self._blocker_char = char

    def readable(self):
        return True

    def seekable(self):
        return True

    def seek(self, pos, whence=0):
        # naive implementation, enough for tests
        return 0

    def writable(self):
        return True

    def write(self, b):
        payload = bytes(b)
        blocker = self._blocker_char
        if blocker:
            try:
                cut = payload.index(blocker)
            except ValueError:
                # Blocker absent: fall through and accept everything.
                cut = -1
            if cut > 0:
                # write data up to the first blocker
                self._write_stack.append(payload[:cut])
                return cut
            if cut == 0:
                # cancel blocker and indicate would block
                self._blocker_char = None
                return None
        self._write_stack.append(payload)
        return len(payload)
306
class CMockNonBlockWriterIO(MockNonBlockWriterIO, io.RawIOBase):
    """MockNonBlockWriterIO combined with the C io.RawIOBase."""

    BlockingIOError = io.BlockingIOError
309
class PyMockNonBlockWriterIO(MockNonBlockWriterIO, pyio.RawIOBase):
    """MockNonBlockWriterIO combined with the pure-Python pyio.RawIOBase."""

    BlockingIOError = pyio.BlockingIOError
312
Guido van Rossuma9e20242007-03-08 00:43:48 +0000313
Guido van Rossum28524c72007-02-27 05:47:44 +0000314class IOTest(unittest.TestCase):
315
def setUp(self):
    # Remove the scratch file before each test.
    scratch = support.TESTFN
    support.unlink(scratch)
Neal Norwitze7789b12008-03-24 06:18:09 +0000318
def tearDown(self):
    # Remove the scratch file after each test.
    scratch = support.TESTFN
    support.unlink(scratch)
Guido van Rossum4d0f5a42007-03-07 22:59:39 +0000321
def write_ops(self, f):
    # Shared write scenario: exercises write(), seek(), tell() and
    # truncate() on *f*, a binary stream supplied by the calling test.
    # The statement order matters; every step checks the position or
    # return value produced by the previous ones.
    self.assertEqual(f.write(b"blah."), 5)
    f.truncate(0)
    # truncate() is expected to leave the position where it was.
    self.assertEqual(f.tell(), 5)
    f.seek(0)

    self.assertEqual(f.write(b"blah."), 5)
    self.assertEqual(f.seek(0), 0)
    self.assertEqual(f.write(b"Hello."), 6)
    self.assertEqual(f.tell(), 6)
    self.assertEqual(f.seek(-1, 1), 5)
    self.assertEqual(f.tell(), 5)
    buffer = bytearray(b" world\n\n\n")
    self.assertEqual(f.write(buffer), 9)
    buffer[:] = b"*" * 9 # Overwrite our copy of the data
    self.assertEqual(f.seek(0), 0)
    self.assertEqual(f.write(b"h"), 1)
    self.assertEqual(f.seek(-1, 2), 13)
    self.assertEqual(f.tell(), 13)

    self.assertEqual(f.truncate(12), 12)
    # Again the position must survive truncate().
    self.assertEqual(f.tell(), 13)
    # A float offset must be rejected.
    self.assertRaises(TypeError, f.seek, 0.0)
Guido van Rossum28524c72007-02-27 05:47:44 +0000345
def read_ops(self, f, buffered=False):
    # Shared read scenario for a binary stream *f* positioned at the start
    # of b"hello world\n".  With buffered=True the unsized read() and
    # readinto1() are exercised as well.
    data = f.read(5)
    self.assertEqual(data, b"hello")
    data = byteslike(data)
    self.assertEqual(f.readinto(data), 5)
    self.assertEqual(bytes(data), b" worl")
    data = bytearray(5)
    # Only two bytes are left; the buffer must keep its length.
    self.assertEqual(f.readinto(data), 2)
    self.assertEqual(len(data), 5)
    self.assertEqual(data[:2], b"d\n")
    self.assertEqual(f.seek(0), 0)
    self.assertEqual(f.read(20), b"hello world\n")
    # At EOF both read() and readinto() report nothing.
    self.assertEqual(f.read(1), b"")
    self.assertEqual(f.readinto(byteslike(b"x")), 0)
    self.assertEqual(f.seek(-6, 2), 6)
    self.assertEqual(f.read(5), b"world")
    # Zero-sized requests.
    self.assertEqual(f.read(0), b"")
    self.assertEqual(f.readinto(byteslike()), 0)
    self.assertEqual(f.seek(-6, 1), 5)
    self.assertEqual(f.read(5), b" worl")
    self.assertEqual(f.tell(), 10)
    # A float offset must be rejected.
    self.assertRaises(TypeError, f.seek, 0.0)
    if buffered:
        f.seek(0)
        self.assertEqual(f.read(), b"hello world\n")
        f.seek(6)
        self.assertEqual(f.read(), b"world\n")
        self.assertEqual(f.read(), b"")
        f.seek(0)
        data = byteslike(5)
        self.assertEqual(f.readinto1(data), 5)
        self.assertEqual(bytes(data), b"hello")
Guido van Rossum9b76da62007-04-11 01:09:03 +0000378
# Offset (2 GiB) that large_file_ops() seeks to before writing.
LARGE = 2**31
380
def large_file_ops(self, f):
    # Shared scenario for offsets at and beyond self.LARGE: *f* must be a
    # readable and writable binary stream.  The test is skipped when
    # seeking that far raises OverflowError or ValueError.
    #
    # The preconditions use unittest assertions instead of bare ``assert``
    # statements, which are removed when Python runs with -O.
    self.assertTrue(f.readable())
    self.assertTrue(f.writable())
    try:
        self.assertEqual(f.seek(self.LARGE), self.LARGE)
    except (OverflowError, ValueError):
        self.skipTest("no largefile support")
    self.assertEqual(f.tell(), self.LARGE)
    self.assertEqual(f.write(b"xxx"), 3)
    self.assertEqual(f.tell(), self.LARGE + 3)
    self.assertEqual(f.seek(-1, 1), self.LARGE + 2)
    # truncate() without argument cuts at the current position ...
    self.assertEqual(f.truncate(), self.LARGE + 2)
    self.assertEqual(f.tell(), self.LARGE + 2)
    self.assertEqual(f.seek(0, 2), self.LARGE + 2)
    # ... while an explicit size must not move the position.
    self.assertEqual(f.truncate(self.LARGE + 1), self.LARGE + 1)
    self.assertEqual(f.tell(), self.LARGE + 2)
    self.assertEqual(f.seek(0, 2), self.LARGE + 1)
    self.assertEqual(f.seek(-1, 2), self.LARGE)
    # Only one byte remains after the truncation.
    self.assertEqual(f.read(2), b"x")
400
def test_invalid_operations(self):
    # Try writing on a file opened in read mode and vice-versa.
    unsupported = self.UnsupportedOperation
    path = support.TESTFN

    def check_unreadable(stream):
        self.assertRaises(unsupported, stream.read)
        self.assertRaises(unsupported, stream.readline)

    def check_unwritable(stream, payload, lines):
        self.assertRaises(unsupported, stream.write, payload)
        self.assertRaises(unsupported, stream.writelines, lines)

    for mode in ("w", "wb"):
        with self.open(path, mode) as fp:
            check_unreadable(fp)
    with self.open(path, "wb", buffering=0) as fp:
        check_unreadable(fp)
    with self.open(path, "rb", buffering=0) as fp:
        check_unwritable(fp, b"blah", [b"blah\n"])
    with self.open(path, "rb") as fp:
        check_unwritable(fp, b"blah", [b"blah\n"])
    with self.open(path, "r") as fp:
        check_unwritable(fp, "blah", ["blah\n"])
        # Non-zero seeking from current or end pos
        self.assertRaises(unsupported, fp.seek, 1, self.SEEK_CUR)
        self.assertRaises(unsupported, fp.seek, -1, self.SEEK_END)
Benjamin Peterson81971ea2009-05-14 22:01:31 +0000423
def test_optional_abilities(self):
    # Test for OSError when optional APIs are not supported
    # The purpose of this test is to try fileno(), reading, writing and
    # seeking operations with various objects that indicate they do not
    # support these operations.

    # Each factory below returns a fresh stream object; the table further
    # down pairs it with the abilities that object is expected to have.

    def pipe_reader():
        [r, w] = os.pipe()
        os.close(w) # So that read() is harmless
        return self.FileIO(r, "r")

    def pipe_writer():
        [r, w] = os.pipe()
        self.addCleanup(os.close, r)
        # Guarantee that we can write into the pipe without blocking
        thread = threading.Thread(target=os.read, args=(r, 100))
        thread.start()
        self.addCleanup(thread.join)
        return self.FileIO(w, "w")

    def buffered_reader():
        return self.BufferedReader(self.MockUnseekableIO())

    def buffered_writer():
        return self.BufferedWriter(self.MockUnseekableIO())

    def buffered_random():
        return self.BufferedRandom(self.BytesIO())

    def buffered_rw_pair():
        return self.BufferedRWPair(self.MockUnseekableIO(),
            self.MockUnseekableIO())

    def text_reader():
        # Take the writing methods from BufferedIOBase instead of the
        # ones MockUnseekableIO inherits.
        class UnseekableReader(self.MockUnseekableIO):
            writable = self.BufferedIOBase.writable
            write = self.BufferedIOBase.write
        return self.TextIOWrapper(UnseekableReader(), "ascii")

    def text_writer():
        # Same trick as text_reader(), applied to the reading methods.
        class UnseekableWriter(self.MockUnseekableIO):
            readable = self.BufferedIOBase.readable
            read = self.BufferedIOBase.read
        return self.TextIOWrapper(UnseekableWriter(), "ascii")

    # Ability letters: "f" = fileno() works, "r" = readable,
    # "w" = writable, "s" = seekable.
    tests = (
        (pipe_reader, "fr"), (pipe_writer, "fw"),
        (buffered_reader, "r"), (buffered_writer, "w"),
        (buffered_random, "rws"), (buffered_rw_pair, "rw"),
        (text_reader, "r"), (text_writer, "w"),
        (self.BytesIO, "rws"), (self.StringIO, "rws"),
    )
    for [test, abilities] in tests:
        with self.subTest(test), test() as obj:
            readable = "r" in abilities
            self.assertEqual(obj.readable(), readable)
            writable = "w" in abilities
            self.assertEqual(obj.writable(), writable)

            # Pick a payload of the type the stream accepts.
            if isinstance(obj, self.TextIOBase):
                data = "3"
            elif isinstance(obj, (self.BufferedIOBase, self.RawIOBase)):
                data = b"3"
            else:
                self.fail("Unknown base class")

            if "f" in abilities:
                obj.fileno()
            else:
                self.assertRaises(OSError, obj.fileno)

            if readable:
                obj.read(1)
                obj.read()
            else:
                self.assertRaises(OSError, obj.read, 1)
                self.assertRaises(OSError, obj.read)

            if writable:
                obj.write(data)
            else:
                self.assertRaises(OSError, obj.write, data)

            if sys.platform.startswith("win") and test in (
                    pipe_reader, pipe_writer):
                # Pipes seem to appear as seekable on Windows
                continue
            seekable = "s" in abilities
            self.assertEqual(obj.seekable(), seekable)

            if seekable:
                obj.tell()
                obj.seek(0)
            else:
                self.assertRaises(OSError, obj.tell)
                self.assertRaises(OSError, obj.seek, 0)

            # truncate() needs the stream to be writable and seekable.
            if writable and seekable:
                obj.truncate()
                obj.truncate(0)
            else:
                self.assertRaises(OSError, obj.truncate)
                self.assertRaises(OSError, obj.truncate, 0)
527
Antoine Pitrou13348842012-01-29 18:36:34 +0100528 def test_open_handles_NUL_chars(self):
529 fn_with_NUL = 'foo\0bar'
Serhiy Storchakad8a14472014-09-06 20:07:17 +0300530 self.assertRaises(ValueError, self.open, fn_with_NUL, 'w')
Victor Stinner47b45572016-03-25 09:07:07 +0100531
532 bytes_fn = bytes(fn_with_NUL, 'ascii')
533 with warnings.catch_warnings():
534 warnings.simplefilter("ignore", DeprecationWarning)
535 self.assertRaises(ValueError, self.open, bytes_fn, 'w')
Antoine Pitrou13348842012-01-29 18:36:34 +0100536
def test_raw_file_io(self):
    # Unbuffered binary files: check the capability flags, then run the
    # shared write and read scenarios.
    path = support.TESTFN

    def check_flags(stream, readable, writable):
        self.assertEqual(stream.readable(), readable)
        self.assertEqual(stream.writable(), writable)
        self.assertEqual(stream.seekable(), True)

    with self.open(path, "wb", buffering=0) as writer:
        check_flags(writer, False, True)
        self.write_ops(writer)
    with self.open(path, "rb", buffering=0) as reader:
        check_flags(reader, True, False)
        self.read_ops(reader)
Guido van Rossum28524c72007-02-27 05:47:44 +0000548
def test_buffered_file_io(self):
    # Buffered binary files: check the capability flags, then run the
    # shared write and (buffered) read scenarios.
    path = support.TESTFN

    def check_flags(stream, readable, writable):
        self.assertEqual(stream.readable(), readable)
        self.assertEqual(stream.writable(), writable)
        self.assertEqual(stream.seekable(), True)

    with self.open(path, "wb") as writer:
        check_flags(writer, False, True)
        self.write_ops(writer)
    with self.open(path, "rb") as reader:
        check_flags(reader, True, False)
        self.read_ops(reader, True)
Guido van Rossum87429772007-04-10 21:06:59 +0000560
def test_readline(self):
    path = support.TESTFN
    with self.open(path, "wb") as f:
        f.write(b"abc\ndef\nxyzzy\nfoo\x00bar\nanother line")
    # (readline() arguments, expected result), consumed in this order.
    steps = (
        ((), b"abc\n"),
        ((10,), b"def\n"),
        ((2,), b"xy"),
        ((4,), b"zzy\n"),
        ((), b"foo\x00bar\n"),
        ((None,), b"another line"),
    )
    with self.open(path, "rb") as f:
        for args, expected in steps:
            self.assertEqual(f.readline(*args), expected)
        # A float size limit must be rejected.
        self.assertRaises(TypeError, f.readline, 5.3)
    with self.open(path, "r") as f:
        self.assertRaises(TypeError, f.readline, 5.3)
Guido van Rossum48fc58a2007-06-07 23:45:37 +0000574
Serhiy Storchakabf623ae2017-04-19 20:03:52 +0300575 def test_readline_nonsizeable(self):
576 # Issue #30061
577 # Crash when readline() returns an object without __len__
578 class R(self.IOBase):
579 def readline(self):
580 return None
581 self.assertRaises((TypeError, StopIteration), next, R())
582
583 def test_next_nonsizeable(self):
584 # Issue #30061
585 # Crash when __next__() returns an object without __len__
586 class R(self.IOBase):
587 def __next__(self):
588 return None
589 self.assertRaises(TypeError, R().readlines, 1)
590
Guido van Rossum28524c72007-02-27 05:47:44 +0000591 def test_raw_bytes_io(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000592 f = self.BytesIO()
Guido van Rossum28524c72007-02-27 05:47:44 +0000593 self.write_ops(f)
594 data = f.getvalue()
595 self.assertEqual(data, b"hello world\n")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000596 f = self.BytesIO(data)
Guido van Rossum9b76da62007-04-11 01:09:03 +0000597 self.read_ops(f, True)
Guido van Rossum28524c72007-02-27 05:47:44 +0000598
def test_large_file_ops(self):
    # On Windows and Mac OSX this test consumes large resources; it takes
    # a long time to build the >2 GiB file and takes >2 GiB of disk space,
    # therefore the resource must be enabled to run this test.
    on_costly_platform = (sys.platform.startswith('win')
                          or sys.platform == 'darwin')
    if on_costly_platform:
        support.requires(
            'largefile',
            'test requires %s bytes and a long time to run' % self.LARGE)
    path = support.TESTFN
    # First unbuffered, then with default buffering.
    with self.open(path, "w+b", 0) as raw:
        self.large_file_ops(raw)
    with self.open(path, "w+b") as buffered:
        self.large_file_ops(buffered)
Guido van Rossum87429772007-04-10 21:06:59 +0000611
def test_with_open(self):
    # The with statement must close the file on normal exit and when the
    # body raises.
    path = support.TESTFN
    for bufsize in (0, 100):
        f = None
        with self.open(path, "wb", bufsize) as f:
            f.write(b"xxx")
        self.assertEqual(f.closed, True)
        f = None
        raised = False
        try:
            with self.open(path, "wb", bufsize) as f:
                1/0
        except ZeroDivisionError:
            raised = True
        if not raised:
            self.fail("1/0 didn't raise an exception")
        self.assertEqual(f.closed, True)
626
Antoine Pitrou08838b62009-01-21 00:55:13 +0000627 # issue 5008
def test_append_mode_tell(self):
    # A file opened for appending must report a position past the
    # three bytes already present.
    path = support.TESTFN
    with self.open(path, "wb") as f:
        f.write(b"xxx")
    # Binary append, unbuffered and with default buffering.
    for extra in ({"buffering": 0}, {}):
        with self.open(path, "ab", **extra) as f:
            self.assertEqual(f.tell(), 3)
    with self.open(path, "a") as f:
        self.assertGreater(f.tell(), 0)
Antoine Pitrou08838b62009-01-21 00:55:13 +0000637
def test_destructor(self):
    # Dropping the last reference to an open FileIO subclass must run
    # __del__, close() and flush() in that order (recorded as 1, 2, 3)
    # and leave the written data in the file.
    record = []
    class MyFileIO(self.FileIO):
        def __del__(self):
            record.append(1)
            # The base class may or may not define __del__.
            try:
                f = super().__del__
            except AttributeError:
                pass
            else:
                f()
        def close(self):
            record.append(2)
            super().close()
        def flush(self):
            record.append(3)
            super().flush()
    with support.check_warnings(('', ResourceWarning)):
        f = MyFileIO(support.TESTFN, "wb")
        f.write(b"xxx")
        del f
        support.gc_collect()
        self.assertEqual(record, [1, 2, 3])
        with self.open(support.TESTFN, "rb") as f:
            self.assertEqual(f.read(), b"xxx")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000663
def _check_base_destructor(self, base):
    # Helper: subclass *base*, drop the only instance, and check that
    # __del__, close() and flush() ran in that order while the instance
    # attributes were still readable.
    record = []
    class MyIO(base):
        def __init__(self):
            # This exercises the availability of attributes on object
            # destruction.
            # (in the C version, close() is called by the tp_dealloc
            # function, not by __del__)
            self.on_del = 1
            self.on_close = 2
            self.on_flush = 3
        def __del__(self):
            record.append(self.on_del)
            # The base class may or may not define __del__.
            try:
                f = super().__del__
            except AttributeError:
                pass
            else:
                f()
        def close(self):
            record.append(self.on_close)
            super().close()
        def flush(self):
            record.append(self.on_flush)
            super().flush()
    f = MyIO()
    del f
    support.gc_collect()
    self.assertEqual(record, [1, 2, 3])
693
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +0000694 def test_IOBase_destructor(self):
695 self._check_base_destructor(self.IOBase)
696
697 def test_RawIOBase_destructor(self):
698 self._check_base_destructor(self.RawIOBase)
699
700 def test_BufferedIOBase_destructor(self):
701 self._check_base_destructor(self.BufferedIOBase)
702
703 def test_TextIOBase_destructor(self):
704 self._check_base_destructor(self.TextIOBase)
705
def test_close_flushes(self):
    # Data written through a buffered file must be on disk once the
    # with block has closed it.
    path = support.TESTFN
    with self.open(path, "wb") as out:
        out.write(b"xxx")
    with self.open(path, "rb") as src:
        content = src.read()
        self.assertEqual(content, b"xxx")
Guido van Rossuma9e20242007-03-08 00:43:48 +0000711
def test_array_writes(self):
    # write() and writelines() must accept an array of ints and report
    # its size in bytes.
    values = array.array('i', range(10))
    nbytes = len(values.tobytes())

    def check(stream):
        with stream:
            self.assertEqual(stream.write(values), nbytes)
            stream.writelines((values,))

    # Factories are called one at a time so that each stream is created
    # only after the previous one has been checked and closed.
    factories = (
        lambda: self.BytesIO(),
        lambda: self.FileIO(support.TESTFN, "w"),
        lambda: self.BufferedWriter(self.MockRawIO()),
        lambda: self.BufferedRandom(self.MockRawIO()),
        lambda: self.BufferedRWPair(self.MockRawIO(), self.MockRawIO()),
    )
    for make in factories:
        check(make())
Guido van Rossumd4103952007-04-12 05:44:49 +0000724
def test_closefd(self):
    # closefd=False must be rejected when opening a file by name for
    # writing.
    options = {"closefd": False}
    self.assertRaises(ValueError, self.open, support.TESTFN, 'w',
                      **options)
Guido van Rossuma9e20242007-03-08 00:43:48 +0000728
def test_read_closed(self):
    # Reading from a closed file object must raise ValueError, in text
    # and in binary mode, even though the descriptor itself stays open.
    path = support.TESTFN
    with self.open(path, "w") as f:
        f.write("egg\n")
    with self.open(path, "r") as f:
        alias = self.open(f.fileno(), "r", closefd=False)
        self.assertEqual(alias.read(), "egg\n")
        alias.seek(0)
        alias.close()
        self.assertRaises(ValueError, alias.read)
    with self.open(path, "rb") as f:
        alias = self.open(f.fileno(), "rb", closefd=False)
        head = alias.read()[:3]
        self.assertEqual(head, b"egg")
        alias.close()
        self.assertRaises(ValueError, alias.readinto, bytearray(1))
Christian Heimesecc42a22008-11-05 19:30:32 +0000743
def test_no_closefd_with_filename(self):
    # can't use closefd in combination with a file name
    with self.assertRaises(ValueError):
        self.open(support.TESTFN, "r", closefd=False)
Christian Heimesecc42a22008-11-05 19:30:32 +0000747
def test_closefd_attr(self):
    # The raw layer's ``closefd`` attribute is True for a stream opened by
    # name and False for one opened on a descriptor with closefd=False.
    with self.open(support.TESTFN, "wb") as writer:
        writer.write(b"egg\n")
    with self.open(support.TESTFN, "r") as owner:
        self.assertEqual(owner.buffer.raw.closefd, True)
        borrowed = self.open(owner.fileno(), "r", closefd=False)
        self.assertEqual(borrowed.buffer.raw.closefd, False)
755
def test_garbage_collection(self):
    # FileIO objects are collected, and collecting them flushes
    # all data to disk.
    with support.check_warnings(('', ResourceWarning)):
        raw = self.FileIO(support.TESTFN, "wb")
        raw.write(b"abcxxx")
        # Self-reference: the object now sits in a reference cycle.
        raw.f = raw
        ref = weakref.ref(raw)
        del raw
        support.gc_collect()
    self.assertIsNone(ref(), ref)
    with self.open(support.TESTFN, "rb") as reader:
        self.assertEqual(reader.read(), b"abcxxx")
Christian Heimesecc42a22008-11-05 19:30:32 +0000769
def test_unbounded_file(self):
    # Issue #1174606: reading from an unbounded stream such as /dev/zero.
    device = "/dev/zero"
    if not os.path.exists(device):
        self.skipTest("{0} does not exist".format(device))
    if sys.maxsize > 0x7FFFFFFF:
        self.skipTest("test can only run in a 32-bit address space")
    if support.real_max_memuse < support._2G:
        self.skipTest("test requires at least 2 GiB of memory")

    # Same check for buffering=0 binary, default binary, and text mode.
    variants = (
        ("rb", {"buffering": 0}),
        ("rb", {}),
        ("r", {}),
    )
    for mode, options in variants:
        with self.open(device, mode, **options) as stream:
            with self.assertRaises(OverflowError):
                stream.read()
785
def check_flush_error_on_close(self, *args, **kwargs):
    # Test that the file is closed despite failed flush
    # and that flush() is called before file closed.
    #
    # *args / **kwargs are forwarded unchanged to self.open().
    stream = self.open(*args, **kwargs)
    seen_closed = []

    def failing_flush():
        # Record the stream's ``closed`` state at flush time, then fail.
        state = stream.closed
        seen_closed[:] = [state]
        raise OSError()

    stream.flush = failing_flush
    with self.assertRaises(OSError):  # exception not swallowed
        stream.close()
    self.assertTrue(stream.closed)
    self.assertTrue(seen_closed)  # flush() called
    self.assertFalse(seen_closed[0])  # flush() called before file closed
    stream.flush = lambda: None  # break reference loop
Serhiy Storchakaa3712a92015-02-21 00:35:09 +0200800
def test_flush_error_on_close(self):
    # Issue #5700: io.FileIO calls flush() after file closed
    #
    # Each layer is checked three ways: opened by name, opened on a
    # descriptor that the stream owns, and opened on a descriptor that
    # stays owned by this test (closefd=False).
    layers = (
        ('wb', {'buffering': 0}),  # raw file
        ('wb', {}),                # buffered io
        ('w', {}),                 # text io
    )
    flags = os.O_WRONLY | os.O_CREAT
    for mode, options in layers:
        self.check_flush_error_on_close(support.TESTFN, mode, **options)

        fd = os.open(support.TESTFN, flags)
        self.check_flush_error_on_close(fd, mode, **options)

        fd = os.open(support.TESTFN, flags)
        try:
            self.check_flush_error_on_close(fd, mode, closefd=False,
                                            **options)
        finally:
            # With closefd=False the stream never closes the descriptor,
            # so release it here even when the check above fails.
            os.close(fd)
Antoine Pitrou6be88762010-05-03 16:48:20 +0000824
def test_multi_close(self):
    # close() may be called repeatedly; flush() on the closed stream must
    # then raise ValueError.
    stream = self.open(support.TESTFN, "wb", buffering=0)
    for _ in range(3):
        stream.close()
    with self.assertRaises(ValueError):
        stream.flush()
831
def test_RawIOBase_read(self):
    # Exercise the default limited RawIOBase.read(n) implementation (which
    # calls readinto() internally).
    rawio = self.MockRawIOWithoutRead((b"abc", b"d", None, b"efg", None))
    # Results expected from successive read(2) calls, in order.
    expected_results = (b"ab", b"c", b"d", None, b"ef", b"g", None, b"")
    for expected in expected_results:
        self.assertEqual(rawio.read(2), expected)
844
Benjamin Petersonf6f3a352011-09-03 09:26:20 -0400845 def test_types_have_dict(self):
846 test = (
847 self.IOBase(),
848 self.RawIOBase(),
849 self.TextIOBase(),
850 self.StringIO(),
851 self.BytesIO()
852 )
853 for obj in test:
854 self.assertTrue(hasattr(obj, "__dict__"))
855
def test_opener(self):
    # The stream must read from the descriptor returned by the custom
    # opener, whatever path string is passed to open().
    with self.open(support.TESTFN, "w") as writer:
        writer.write("egg\n")
    fd = os.open(support.TESTFN, os.O_RDONLY)

    def opener(path, flags):
        # Ignore both arguments and hand back the pre-opened descriptor.
        return fd

    with self.open("non-existent", "r", opener=opener) as reader:
        contents = reader.read()
        self.assertEqual(contents, "egg\n")
864
def test_bad_opener_negative_1(self):
    # Issue #27066.
    # NOTE(review): this calls the builtin open() rather than self.open,
    # unlike the neighbouring tests -- confirm that is intentional.
    badopener = lambda fname, flags: -1
    with self.assertRaises(ValueError) as caught:
        open('non-existent', 'r', opener=badopener)
    message = str(caught.exception)
    self.assertEqual(message, 'opener returned -1')
872
def test_bad_opener_other_negative(self):
    # Issue #27066.
    # NOTE(review): this calls the builtin open() rather than self.open,
    # unlike the neighbouring tests -- confirm that is intentional.
    badopener = lambda fname, flags: -2
    with self.assertRaises(ValueError) as caught:
        open('non-existent', 'r', opener=badopener)
    message = str(caught.exception)
    self.assertEqual(message, 'opener returned -2')
880
Hynek Schlawack2cc71562012-05-25 10:05:53 +0200881 def test_fileio_closefd(self):
882 # Issue #4841
883 with self.open(__file__, 'rb') as f1, \
884 self.open(__file__, 'rb') as f2:
885 fileio = self.FileIO(f1.fileno(), closefd=False)
886 # .__init__() must not close f1
887 fileio.__init__(f2.fileno(), closefd=False)
888 f1.readline()
889 # .close() must not close f2
890 fileio.close()
891 f2.readline()
892
def test_nonbuffered_textio(self):
    # Text mode with buffering=0 must raise ValueError, and the failed
    # open must not produce a resource warning.
    with support.check_no_resource_warning(self), \
         self.assertRaises(ValueError):
        self.open(support.TESTFN, 'w', buffering=0)
Serhiy Storchakaf10063e2014-06-09 13:32:34 +0300897
def test_invalid_newline(self):
    # An unsupported ``newline`` value must raise ValueError, and the
    # failed open must not produce a resource warning.
    with support.check_no_resource_warning(self), \
         self.assertRaises(ValueError):
        self.open(support.TESTFN, 'w', newline='invalid')
Serhiy Storchakaf10063e2014-06-09 13:32:34 +0300902
def test_buffered_readinto_mixin(self):
    # Test the implementation provided by BufferedIOBase
    class Stream(self.BufferedIOBase):
        # read() and read1() both return the same fixed five bytes.
        def read(self, size):
            return b"12345"
        read1 = read

    source = Stream()
    for method_name in ("readinto", "readinto1"):
        with self.subTest(method_name):
            target = byteslike(5)
            fill = getattr(source, method_name)
            self.assertEqual(fill(target), 5)
            self.assertEqual(bytes(target), b"12345")
915
def test_fspath_support(self):
    # open() must accept FakePath objects wrapping a str or bytes file
    # name, and fail with the expected exception for the other payloads.
    def round_trip(pathlike):
        # Write through the path-like object, then read the text back.
        with self.open(pathlike, "w") as writer:
            writer.write("egg\n")
        with self.open(pathlike, "r") as reader:
            self.assertEqual(reader.read(), "egg\n")

    round_trip(FakePath(support.TESTFN))
    round_trip(FakePath(support.TESTFN.encode('utf-8')))

    # A FakePath wrapping an integer descriptor is rejected.
    with self.open(support.TESTFN, "w") as holder:
        fd_path = FakePath(holder.fileno())
        with self.assertRaises(TypeError):
            self.open(fd_path, 'w')

    # (payload, exception expected from open()) pairs.
    failing_payloads = (
        (None, TypeError),
        (FloatingPointError, FloatingPointError),
    )
    for payload, expected_error in failing_payloads:
        bad_path = FakePath(payload)
        with self.assertRaises(expected_error):
            self.open(bad_path, 'w')

    # ensure that refcounting is correct with some error conditions
    with self.assertRaisesRegex(ValueError, 'read/write/append mode'):
        self.open(FakePath(support.TESTFN), 'rwxa')
Ethan Furmand62548a2016-06-04 14:38:43 -0700943
def test_RawIOBase_readall(self):
    # Exercise the default unlimited RawIOBase.read() and readall()
    # implementations.
    for method_name in ("read", "readall"):
        # A fresh mock for each method, fed the same three chunks.
        rawio = self.MockRawIOWithoutRead((b"abc", b"d", b"efg"))
        read_everything = getattr(rawio, method_name)
        self.assertEqual(read_everything(), b"abcdefg")
951
952 def test_BufferedIOBase_readinto(self):
953 # Exercise the default BufferedIOBase.readinto() and readinto1()
954 # implementations (which call read() or read1() internally).
955 class Reader(self.BufferedIOBase):
956 def __init__(self, avail):
957 self.avail = avail
958 def read(self, size):
959 result = self.avail[:size]
960 self.avail = self.avail[size:]
961 return result
962 def read1(self, size):
963 """Returns no more than 5 bytes at once"""
964 return self.read(min(size, 5))
965 tests = (
966 # (test method, total data available, read buffer size, expected
967 # read size)
968 ("readinto", 10, 5, 5),
969 ("readinto", 10, 6, 6), # More than read1() can return
970 ("readinto", 5, 6, 5), # Buffer larger than total available
971 ("readinto", 6, 7, 6),
972 ("readinto", 10, 0, 0), # Empty buffer
973 ("readinto1", 10, 5, 5), # Result limited to single read1() call
974 ("readinto1", 10, 6, 5), # Buffer larger than read1() can return
975 ("readinto1", 5, 6, 5), # Buffer larger than total available
976 ("readinto1", 6, 7, 5),
977 ("readinto1", 10, 0, 0), # Empty buffer
978 )
979 UNUSED_BYTE = 0x81
980 for test in tests:
981 with self.subTest(test):
982 method, avail, request, result = test
983 reader = Reader(bytes(range(avail)))
984 buffer = bytearray((UNUSED_BYTE,) * request)
985 method = getattr(reader, method)
986 self.assertEqual(method(buffer), result)
987 self.assertEqual(len(buffer), request)
988 self.assertSequenceEqual(buffer[:result], range(result))
989 unused = (UNUSED_BYTE,) * (request - result)
990 self.assertSequenceEqual(buffer[result:], unused)
991 self.assertEqual(len(reader.avail), avail - result)
992
Zackery Spytz28f07362018-07-17 00:31:44 -0600993 def test_close_assert(self):
994 class R(self.IOBase):
995 def __setattr__(self, name, value):
996 pass
997 def flush(self):
998 raise OSError()
999 f = R()
1000 # This would cause an assertion failure.
1001 self.assertRaises(OSError, f.close)
1002
Victor Stinner472f7942019-04-12 21:58:24 +02001003 # Silence destructor error
1004 R.flush = lambda self: None
1005
Hynek Schlawack2cc71562012-05-25 10:05:53 +02001006
class CIOTest(IOTest):

    def test_IOBase_finalize(self):
        # Issue #12149: segmentation fault on _PyIOBase_finalize when both a
        # class which inherits IOBase and an object of this class are caught
        # in a reference cycle and close() is already in the method cache.
        class MyIO(self.IOBase):
            def close(self):
                pass

        # create an instance to populate the method cache
        MyIO()
        instance = MyIO()
        # Self-reference puts the instance in a reference cycle.
        instance.obj = instance
        ref = weakref.ref(instance)
        del MyIO, instance
        support.gc_collect()
        self.assertIsNone(ref(), ref)
Guido van Rossuma9e20242007-03-08 00:43:48 +00001026
class PyIOTest(IOTest):
    # Adds no tests of its own; everything is inherited from IOTest.
    pass
Guido van Rossum68bbcd22007-02-27 17:19:33 +00001029
Guido van Rossuma9e20242007-03-08 00:43:48 +00001030
@support.cpython_only
class APIMismatchTest(unittest.TestCase):

    def test_RawIOBase_io_in_pyio_match(self):
        """Test that pyio RawIOBase class has all c RawIOBase methods"""
        missing = support.detect_api_mismatch(
            pyio.RawIOBase, io.RawIOBase, ignore=('__weakref__',))
        self.assertEqual(
            missing, set(),
            msg='Python RawIOBase does not have all C RawIOBase methods')

    def test_RawIOBase_pyio_in_io_match(self):
        """Test that c RawIOBase class has all pyio RawIOBase methods"""
        missing = support.detect_api_mismatch(io.RawIOBase, pyio.RawIOBase)
        self.assertEqual(
            missing, set(),
            msg='C RawIOBase does not have all Python RawIOBase methods')
1044
1045
class CommonBufferedTests:
    # Tests common to BufferedReader, BufferedWriter and BufferedRandom
    #
    # Every test reads the buffered type under test from ``self.tp`` and
    # builds its raw streams from the mock classes found on ``self``.

    def test_detach(self):
        # detach() returns the raw stream once; a second call must fail.
        raw_stream = self.MockRawIO()
        buffered = self.tp(raw_stream)
        self.assertIs(buffered.detach(), raw_stream)
        with self.assertRaises(ValueError):
            buffered.detach()

        repr(buffered)  # Should still work

    def test_fileno(self):
        # fileno() of the buffered object must equal 42.
        raw_stream = self.MockRawIO()
        buffered = self.tp(raw_stream)

        self.assertEqual(42, buffered.fileno())

    def test_invalid_args(self):
        raw_stream = self.MockRawIO()
        buffered = self.tp(raw_stream)
        # Invalid whence
        for bad_whence in (-1, 9):
            with self.assertRaises(ValueError):
                buffered.seek(0, bad_whence)

    def test_override_destructor(self):
        # __del__, close() and flush() overridden in a subclass must run in
        # that order when the object is destroyed.
        base_type = self.tp
        calls = []

        class MyBufferedIO(base_type):
            def __del__(self):
                calls.append(1)
                # The base type may or may not define __del__.
                parent_del = getattr(super(), "__del__", None)
                if parent_del is not None:
                    parent_del()

            def close(self):
                calls.append(2)
                super().close()

            def flush(self):
                calls.append(3)
                super().flush()

        raw_stream = self.MockRawIO()
        buffered = MyBufferedIO(raw_stream)
        del buffered
        support.gc_collect()
        self.assertEqual(calls, [1, 2, 3])

    def test_context_manager(self):
        # Test usability as a context manager
        raw_stream = self.MockRawIO()
        buffered = self.tp(raw_stream)

        with buffered:
            pass
        # bufio should now be closed, and using it a second time should raise
        # a ValueError.
        with self.assertRaises(ValueError):
            with buffered:
                pass

    def test_error_through_destructor(self):
        # Test that the exception state is not modified by a destructor,
        # even if close() fails.
        raw_stream = self.CloseFailureIO()
        with support.catch_unraisable_exception() as catcher:
            with self.assertRaises(AttributeError):
                self.tp(raw_stream).xyzzy

            # Inspect the captured unraisable exception while still inside
            # the catching context.
            if not IOBASE_EMITS_UNRAISABLE:
                self.assertIsNone(catcher.unraisable)
            elif catcher.unraisable is not None:
                self.assertEqual(catcher.unraisable.exc_type, OSError)

    def test_repr(self):
        # repr() shows the type name and, once the raw stream has one, the
        # raw stream's ``name`` (str or bytes).
        raw_stream = self.MockRawIO()
        buffered = self.tp(raw_stream)
        clsname = r"(%s\.)?%s" % (self.tp.__module__, self.tp.__qualname__)
        self.assertRegex(repr(buffered), "<%s>" % clsname)
        raw_stream.name = "dummy"
        self.assertRegex(repr(buffered), "<%s name='dummy'>" % clsname)
        raw_stream.name = b"dummy"
        self.assertRegex(repr(buffered), "<%s name=b'dummy'>" % clsname)

    def test_recursive_repr(self):
        # Issue #25455
        raw_stream = self.MockRawIO()
        buffered = self.tp(raw_stream)
        # Make the raw stream's name the buffered object itself, so that
        # repr() would recurse.
        with support.swap_attr(raw_stream, 'name', buffered):
            try:
                repr(buffered)  # Should not crash
            except RuntimeError:
                pass

    def test_flush_error_on_close(self):
        # Test that buffered file is closed despite failed flush
        # and that flush() is called before file closed.
        raw_stream = self.MockRawIO()
        seen_closed = []

        def failing_flush():
            # Record both layers' ``closed`` state at flush time, then fail.
            seen_closed[:] = [buffered.closed, raw_stream.closed]
            raise OSError()

        raw_stream.flush = failing_flush
        buffered = self.tp(raw_stream)
        with self.assertRaises(OSError):  # exception not swallowed
            buffered.close()
        self.assertTrue(buffered.closed)
        self.assertTrue(raw_stream.closed)
        self.assertTrue(seen_closed)  # flush() called
        self.assertFalse(seen_closed[0])  # flush() called before file closed
        self.assertFalse(seen_closed[1])
        raw_stream.flush = lambda: None  # break reference loop

    def test_close_error_on_close(self):
        # When both flush() and the raw close() fail, the close error is
        # raised with the flush error attached as its __context__.
        raw_stream = self.MockRawIO()

        def failing_flush():
            raise OSError('flush')

        def failing_close():
            raise OSError('close')

        raw_stream.close = failing_close
        buffered = self.tp(raw_stream)
        buffered.flush = failing_flush
        with self.assertRaises(OSError) as caught:  # exception not swallowed
            buffered.close()
        error = caught.exception
        self.assertEqual(error.args, ('close',))
        self.assertIsInstance(error.__context__, OSError)
        self.assertEqual(error.__context__.args, ('flush',))
        self.assertFalse(buffered.closed)

        # Silence destructor error
        raw_stream.close = lambda: None
        buffered.flush = lambda: None

    def test_nonnormalized_close_error_on_close(self):
        # Issue #21677
        raw_stream = self.MockRawIO()

        # Both helpers raise NameError by referencing an undefined name;
        # the names themselves are checked in the messages below.
        def failing_flush():
            raise non_existing_flush

        def failing_close():
            raise non_existing_close

        raw_stream.close = failing_close
        buffered = self.tp(raw_stream)
        buffered.flush = failing_flush
        with self.assertRaises(NameError) as caught:  # exception not swallowed
            buffered.close()
        error = caught.exception
        self.assertIn('non_existing_close', str(error))
        self.assertIsInstance(error.__context__, NameError)
        self.assertIn('non_existing_flush', str(error.__context__))
        self.assertFalse(buffered.closed)

        # Silence destructor error
        buffered.flush = lambda: None
        raw_stream.close = lambda: None

    def test_multi_close(self):
        # close() may be called repeatedly; flush() afterwards must raise.
        raw_stream = self.MockRawIO()
        buffered = self.tp(raw_stream)
        for _ in range(3):
            buffered.close()
        with self.assertRaises(ValueError):
            buffered.flush()

    def test_unseekable(self):
        # tell() and seek() on top of MockUnseekableIO must raise
        # UnsupportedOperation.
        buffered = self.tp(self.MockUnseekableIO(b"A" * 10))
        with self.assertRaises(self.UnsupportedOperation):
            buffered.tell()
        with self.assertRaises(self.UnsupportedOperation):
            buffered.seek(0)

    def test_readonly_attributes(self):
        # Assigning to the ``raw`` attribute must raise AttributeError.
        raw_stream = self.MockRawIO()
        buffered = self.tp(raw_stream)
        replacement = self.MockRawIO()
        with self.assertRaises(AttributeError):
            buffered.raw = replacement
1217
Guido van Rossum78892e42007-04-06 17:31:18 +00001218
class SizeofTest:

    @support.cpython_only
    def test_sizeof(self):
        # sys.getsizeof() must grow by exactly the difference between the
        # two buffer sizes.
        small, large = 4096, 8192
        small_buffered = self.tp(self.MockRawIO(), buffer_size=small)
        overhead = sys.getsizeof(small_buffered) - small
        large_buffered = self.tp(self.MockRawIO(), buffer_size=large)
        self.assertEqual(sys.getsizeof(large_buffered), overhead + large)

    @support.cpython_only
    def test_buffer_freeing(self):
        # After close(), sys.getsizeof() must no longer include the buffer.
        bufsize = 4096
        raw_stream = self.MockRawIO()
        buffered = self.tp(raw_stream, buffer_size=bufsize)
        overhead = sys.getsizeof(buffered) - bufsize
        buffered.close()
        self.assertEqual(sys.getsizeof(buffered), overhead)
Antoine Pitrou10f0c502012-07-29 19:02:46 +02001240
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001241class BufferedReaderTest(unittest.TestCase, CommonBufferedTests):
1242 read_mode = "rb"
Guido van Rossum78892e42007-04-06 17:31:18 +00001243
def test_constructor(self):
    # __init__() may be called again on an existing object with valid
    # arguments; non-positive buffer sizes must raise ValueError.
    raw_stream = self.MockRawIO([b"abc"])
    buffered = self.tp(raw_stream)
    buffered.__init__(raw_stream)
    for valid_size in (1024, 16):
        buffered.__init__(raw_stream, buffer_size=valid_size)
    self.assertEqual(b"abc", buffered.read())

    for invalid_size in (0, -16, -1):
        with self.assertRaises(ValueError):
            buffered.__init__(raw_stream, buffer_size=invalid_size)

    # The object must still work when re-initialised on a fresh raw stream.
    raw_stream = self.MockRawIO([b"abc"])
    buffered.__init__(raw_stream)
    self.assertEqual(b"abc", buffered.read())
Guido van Rossum78892e42007-04-06 17:31:18 +00001257
def test_uninitialized(self):
    # An object created with __new__() alone can be deleted, fails on
    # read() with one of the accepted errors, and works after __init__().
    buffered_type = self.tp
    discarded = buffered_type.__new__(buffered_type)
    del discarded

    buffered = buffered_type.__new__(buffered_type)
    with self.assertRaisesRegex((ValueError, AttributeError),
                                'uninitialized|has no attribute'):
        buffered.read(0)

    buffered.__init__(self.MockRawIO())
    self.assertEqual(buffered.read(0), b'')
1267
def test_read(self):
    # read(None) and read(7) must both return all seven bytes.
    for size in (None, 7):
        raw_stream = self.MockRawIO((b"abc", b"d", b"efg"))
        buffered = self.tp(raw_stream)
        self.assertEqual(b"abcdefg", buffered.read(size))
    # Invalid args
    # (checked on the object left over from the last loop iteration)
    with self.assertRaises(ValueError):
        buffered.read(-2)
Guido van Rossum68bbcd22007-02-27 17:19:33 +00001275
def test_read1(self):
    # Check what each read1(size) call returns and the raw stream's
    # ``_reads`` counter afterwards.
    raw_stream = self.MockRawIO((b"abc", b"d", b"efg"))
    buffered = self.tp(raw_stream)

    # (method, size, expected data, expected raw_stream._reads or None
    # when the counter is not checked after that step)
    steps = (
        ("read", 1, b"a", None),
        ("read1", 1, b"b", 1),
        ("read1", 0, b"", None),
        ("read1", 100, b"c", 1),
        ("read1", 100, b"d", 2),
        ("read1", 100, b"efg", 3),
        ("read1", 100, b"", 4),
    )
    for method_name, size, expected, raw_reads in steps:
        self.assertEqual(expected, getattr(buffered, method_name)(size))
        if raw_reads is not None:
            self.assertEqual(raw_stream._reads, raw_reads)
Martin Panterccb2c0e2016-10-20 23:48:14 +00001291
def test_read1_arbitrary(self):
    # read1() with no argument or with -1 returns whatever is available.
    raw_stream = self.MockRawIO((b"abc", b"d", b"efg"))
    buffered = self.tp(raw_stream)

    # (method, positional args, expected data, expected raw_stream._reads
    # or None when the counter is not checked after that step)
    steps = (
        ("read", (1,), b"a", None),
        ("read1", (), b"bc", None),
        ("read1", (), b"d", None),
        ("read1", (-1,), b"efg", 3),
        ("read1", (), b"", 4),
    )
    for method_name, args, expected, raw_reads in steps:
        self.assertEqual(expected, getattr(buffered, method_name)(*args))
        if raw_reads is not None:
            self.assertEqual(raw_stream._reads, raw_reads)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001302
def test_readinto(self):
    # readinto() fills a two-byte buffer repeatedly; a short read only
    # overwrites the bytes it actually delivered.
    target = bytearray(2)

    raw_stream = self.MockRawIO((b"abc", b"d", b"efg"))
    buffered = self.tp(raw_stream)
    # (expected return value, expected buffer contents) per call.
    expectations = ((2, b"ab"), (2, b"cd"), (2, b"ef"), (1, b"gf"),
                    (0, b"gf"))
    for count, contents in expectations:
        self.assertEqual(buffered.readinto(target), count)
        self.assertEqual(target, contents)

    # Same buffer again, with a raw stream that yields None after "abc".
    raw_stream = self.MockRawIO((b"abc", None))
    buffered = self.tp(raw_stream)
    for count, contents in ((2, b"ab"), (1, b"cb")):
        self.assertEqual(buffered.readinto(target), count)
        self.assertEqual(target, contents)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001323
def test_readinto1(self):
    # Check readinto1() results together with the raw stream's ``_reads``
    # counter after each step.
    buffer_size = 10
    raw_stream = self.MockRawIO((b"abc", b"de", b"fgh", b"jkl"))
    buffered = self.tp(raw_stream, buffer_size=buffer_size)

    def expect_raw_reads(count):
        self.assertEqual(raw_stream._reads, count)

    small = bytearray(2)
    self.assertEqual(buffered.peek(3), b'abc')
    expect_raw_reads(1)
    self.assertEqual(buffered.readinto1(small), 2)
    self.assertEqual(small, b"ab")
    expect_raw_reads(1)
    self.assertEqual(buffered.readinto1(small), 1)
    self.assertEqual(small[:1], b"c")
    expect_raw_reads(1)
    self.assertEqual(buffered.readinto1(small), 2)
    self.assertEqual(small, b"de")
    expect_raw_reads(2)

    # Now with a destination twice as large as the internal buffer.
    large = bytearray(2 * buffer_size)
    self.assertEqual(buffered.peek(3), b'fgh')
    expect_raw_reads(3)
    self.assertEqual(buffered.readinto1(large), 6)
    self.assertEqual(large[:6], b"fghjkl")
    expect_raw_reads(4)
1346
def test_readinto_array(self):
    # readinto() must count bytes, not elements, when the destination is
    # an array whose items are wider than one byte.
    buffer_size = 60
    data = b"a" * 26
    rawio = self.MockRawIO((data,))
    bufio = self.tp(rawio, buffer_size=buffer_size)

    # Create an array with element size > 1 byte
    b = array.array('i', b'x' * 32)
    # Precondition for the comparison below.  A TestCase assertion is used
    # instead of a bare ``assert`` so the check still runs under
    # ``python -O``, where assert statements are compiled away.
    self.assertNotEqual(len(b), 16)

    # Read into it. We should get as many *bytes* as we can fit into b
    # (which is more than the number of elements)
    n = bufio.readinto(b)
    self.assertGreater(n, len(b))

    # Check that old contents of b are preserved
    bm = memoryview(b).cast('B')
    self.assertLess(n, len(bm))
    self.assertEqual(bm[:n], data[:n])
    self.assertEqual(bm[n:], b'x' * (len(bm[n:])))
1367
def test_readinto1_array(self):
    # readinto1() must count bytes, not elements, when the destination is
    # an array whose items are wider than one byte.
    buffer_size = 60
    data = b"a" * 26
    rawio = self.MockRawIO((data,))
    bufio = self.tp(rawio, buffer_size=buffer_size)

    # Create an array with element size > 1 byte
    b = array.array('i', b'x' * 32)
    # Precondition for the comparison below.  A TestCase assertion is used
    # instead of a bare ``assert`` so the check still runs under
    # ``python -O``, where assert statements are compiled away.
    self.assertNotEqual(len(b), 16)

    # Read into it. We should get as many *bytes* as we can fit into b
    # (which is more than the number of elements)
    n = bufio.readinto1(b)
    self.assertGreater(n, len(b))

    # Check that old contents of b are preserved
    bm = memoryview(b).cast('B')
    self.assertLess(n, len(bm))
    self.assertEqual(bm[:n], data[:n])
    self.assertEqual(bm[n:], b'x' * (len(bm[n:])))
1388
def test_readlines(self):
    # readlines() with no hint, with a hint of 5, and with None.
    def make_buffered():
        # Each call gets its own raw stream holding the same three chunks.
        return self.tp(self.MockRawIO((b"abc\n", b"d\n", b"ef")))

    all_lines = [b"abc\n", b"d\n", b"ef"]
    self.assertEqual(make_buffered().readlines(), all_lines)
    self.assertEqual(make_buffered().readlines(5), [b"abc\n", b"d\n"])
    self.assertEqual(make_buffered().readlines(None), all_lines)
Benjamin Petersonbf5ff762009-12-13 19:25:34 +00001396
def test_buffering(self):
    # For each buffer size, a series of buffered reads must return the
    # right slices and leave the expected read history on the raw stream.
    data = b"abcdefghi"
    dlen = len(data)

    # [buffer size, sizes passed to bufio.read(), expected raw read history]
    tests = [
        [100, [3, 1, 4, 8], [dlen, 0]],
        [100, [3, 3, 3], [dlen]],
        [4, [1, 2, 4, 2], [4, 4, 1]],
    ]

    for bufsize, buffered_sizes, raw_history in tests:
        raw_stream = self.MockFileIO(data)
        buffered = self.tp(raw_stream, buffer_size=bufsize)
        offset = 0
        for size in buffered_sizes:
            expected = data[offset:offset + size]
            self.assertEqual(buffered.read(size), expected)
            offset += size
        # this is mildly implementation-dependent
        self.assertEqual(raw_stream.read_history, raw_history)
Guido van Rossum78892e42007-04-06 17:31:18 +00001416
def test_read_non_blocking(self):
    # Inject some None's in there to simulate EWOULDBLOCK
    raw_stream = self.MockRawIO(
        (b"abc", b"d", None, b"efg", None, None, None))
    buffered = self.tp(raw_stream)
    self.assertEqual(b"abcd", buffered.read(6))
    self.assertEqual(b"e", buffered.read(1))
    self.assertEqual(b"fg", buffered.read())
    self.assertEqual(b"", buffered.peek(1))
    self.assertIsNone(buffered.read())
    self.assertEqual(b"", buffered.read())

    # readall() is called on the raw mock directly here.
    raw_stream = self.MockRawIO((b"a", None, None))
    self.assertEqual(b"a", raw_stream.readall())
    self.assertIsNone(raw_stream.readall())
1431
def test_read_past_eof(self):
    # Requesting far more than the stream holds returns what is there.
    reader = self.tp(self.MockRawIO((b"abc", b"d", b"efg")))
    self.assertEqual(reader.read(9000), b"abcdefg")
Guido van Rossum68bbcd22007-02-27 17:19:33 +00001437
def test_read_all(self):
    # read() without a size concatenates every chunk of the raw stream.
    reader = self.tp(self.MockRawIO((b"abc", b"d", b"efg")))
    self.assertEqual(reader.read(), b"abcdefg")
Guido van Rossum68bbcd22007-02-27 17:19:33 +00001443
@support.requires_resource('cpu')
def test_threads(self):
    try:
        # Write out many bytes with exactly the same number of 0's,
        # 1's... 255's. This will help us check that concurrent reading
        # doesn't duplicate or forget contents.
        N = 1000
        values = list(range(256)) * N
        random.shuffle(values)
        payload = bytes(values)
        with self.open(support.TESTFN, "wb") as out:
            out.write(payload)
        with self.open(support.TESTFN, self.read_mode, buffering=0) as raw:
            bufio = self.tp(raw, 8)
            errors = []
            results = []

            def reader_loop():
                try:
                    # Intra-buffer read then buffer-flushing read
                    for size in cycle([1, 19]):
                        chunk = bufio.read(size)
                        if not chunk:
                            break
                        # list.append() is atomic
                        results.append(chunk)
                except Exception as exc:
                    # Record the failure for the main thread, then let the
                    # exception propagate in this worker as well.
                    errors.append(exc)
                    raise

            workers = [threading.Thread(target=reader_loop)
                       for _ in range(20)]
            with support.start_threads(workers):
                time.sleep(0.02) # yield
            self.assertFalse(errors,
                "the following exceptions were caught: %r" % errors)
            combined = b''.join(results)
            # Every byte value must have been read exactly N times overall.
            for value in range(256):
                self.assertEqual(combined.count(bytes([value])), N)
    finally:
        support.unlink(support.TESTFN)
1483
def test_unseekable(self):
    reader = self.tp(self.MockUnseekableIO(b"A" * 10))
    unsupported = self.UnsupportedOperation
    # tell() and seek() are rejected on a fresh reader...
    with self.assertRaises(unsupported):
        reader.tell()
    with self.assertRaises(unsupported):
        reader.seek(0)
    # ...and still rejected after a read.
    reader.read(1)
    with self.assertRaises(unsupported):
        reader.seek(0)
    with self.assertRaises(unsupported):
        reader.tell()
1491
def test_misbehaved_io(self):
    # With a misbehaving raw stream, seek() and tell() must raise OSError.
    raw = self.MisbehavedRawIO((b"abc", b"d", b"efg"))
    reader = self.tp(raw)
    with self.assertRaises(OSError):
        reader.seek(0)
    with self.assertRaises(OSError):
        reader.tell()

    # Silence destructor error
    reader.close = lambda: None
1500
def test_no_extraneous_read(self):
    # Issue #9550; when the raw IO object has satisfied the read request,
    # we should not issue any additional reads, otherwise it may block
    # (e.g. socket).
    bufsize = 16

    def check(chunks, n):
        # Read n bytes through a fresh reader over `chunks` and verify the
        # raw object recorded no extraneous reads.
        rawio = self.MockRawIO(chunks)
        bufio = self.tp(rawio, bufsize)
        self.assertEqual(bufio.read(n), b"x" * n)
        self.assertEqual(rawio._extraneous_reads, 0,
            "failed for {}: {} != 0".format(n, rawio._extraneous_reads))

    for n in (2, bufsize - 1, bufsize, bufsize + 1, bufsize * 2):
        # Simple case: one raw read is enough to satisfy the request.
        check([b"x" * n], n)
        # A more complex case where two raw reads are needed to satisfy
        # the request.
        check([b"x" * (n - 1), b"x"], n)
1520
Berker Peksagd10d6ae2015-05-12 17:01:05 +03001521 def test_read_on_closed(self):
1522 # Issue #23796
1523 b = io.BufferedReader(io.BytesIO(b"12"))
1524 b.read(1)
1525 b.close()
1526 self.assertRaises(ValueError, b.peek)
1527 self.assertRaises(ValueError, b.read1, 1)
1528
Antoine Pitrou32cfede2010-08-11 13:31:33 +00001529
class CBufferedReaderTest(BufferedReaderTest, SizeofTest):
    # Runs the shared BufferedReaderTest suite against io.BufferedReader and
    # adds checks specific to that implementation.
    tp = io.BufferedReader

    @unittest.skipIf(MEMORY_SANITIZER, "MSan defaults to crashing "
                     "instead of returning NULL for malloc failure.")
    def test_constructor(self):
        BufferedReaderTest.test_constructor(self)
        # The allocation can succeed on 32-bit builds, e.g. with more
        # than 2 GiB RAM and a 64-bit kernel.
        if sys.maxsize <= 0x7FFFFFFF:
            return
        raw = self.MockRawIO()
        reader = self.tp(raw)
        with self.assertRaises((OverflowError, MemoryError, ValueError)):
            reader.__init__(raw, sys.maxsize)

    def test_initialization(self):
        raw = self.MockRawIO([b"abc"])
        reader = self.tp(raw)
        # Re-initialising with an invalid buffer size must fail, and so must
        # a read() attempted right after each failed re-initialisation.
        for bad_size in (0, -16, -1):
            with self.assertRaises(ValueError):
                reader.__init__(raw, buffer_size=bad_size)
            with self.assertRaises(ValueError):
                reader.read()

    def test_misbehaved_io_read(self):
        raw = self.MisbehavedRawIO((b"abc", b"d", b"efg"))
        reader = self.tp(raw)
        # _pyio.BufferedReader seems to implement reading different, so that
        # checking this is not so easy.
        with self.assertRaises(OSError):
            reader.read(10)

    def test_garbage_collection(self):
        # C BufferedReader objects are collected.
        # The Python version has __del__, so it ends into gc.garbage instead
        self.addCleanup(support.unlink, support.TESTFN)
        with support.check_warnings(('', ResourceWarning)):
            raw = self.FileIO(support.TESTFN, "w+b")
            reader = self.tp(raw)
            # Self-reference: puts the object in a reference cycle.
            reader.f = reader
            probe = weakref.ref(reader)
            del reader
            support.gc_collect()
        self.assertIsNone(probe(), probe)

    def test_args_error(self):
        # Issue #17275
        with self.assertRaisesRegex(TypeError, "BufferedReader"):
            self.tp(io.BytesIO(), 1024, 1024, 1024)
1579
1580
class PyBufferedReaderTest(BufferedReaderTest):
    # Runs the shared BufferedReaderTest suite against pyio.BufferedReader.
    tp = pyio.BufferedReader
Antoine Pitrou87695762008-08-14 22:44:29 +00001583
Guido van Rossuma9e20242007-03-08 00:43:48 +00001584
class BufferedWriterTest(unittest.TestCase, CommonBufferedTests):
    # Shared tests for buffered writers.  Subclasses set `tp` to the
    # implementation under test.
    write_mode = "wb"

    def test_constructor(self):
        rawio = self.MockRawIO()
        bufio = self.tp(rawio)
        # Re-running __init__ with valid arguments keeps the object usable.
        bufio.__init__(rawio)
        bufio.__init__(rawio, buffer_size=1024)
        bufio.__init__(rawio, buffer_size=16)
        self.assertEqual(3, bufio.write(b"abc"))
        bufio.flush()
        # Invalid buffer sizes are rejected...
        self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0)
        self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16)
        self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1)
        # ...and a subsequent valid __init__ makes the writer work again.
        bufio.__init__(rawio)
        self.assertEqual(3, bufio.write(b"ghi"))
        bufio.flush()
        self.assertEqual(b"".join(rawio._write_stack), b"abcghi")

    def test_uninitialized(self):
        # Deleting an object created by __new__ alone must not blow up.
        bufio = self.tp.__new__(self.tp)
        del bufio
        bufio = self.tp.__new__(self.tp)
        self.assertRaisesRegex((ValueError, AttributeError),
                               'uninitialized|has no attribute',
                               bufio.write, b'')
        bufio.__init__(self.MockRawIO())
        self.assertEqual(bufio.write(b''), 0)

    def test_detach_flush(self):
        # detach() must push buffered data down to the raw stream.
        raw = self.MockRawIO()
        buf = self.tp(raw)
        buf.write(b"howdy!")
        self.assertFalse(raw._write_stack)
        buf.detach()
        self.assertEqual(raw._write_stack, [b"howdy!"])

    def test_write(self):
        # Write to the buffered IO but don't overflow the buffer.
        writer = self.MockRawIO()
        bufio = self.tp(writer, 8)
        bufio.write(b"abc")
        self.assertFalse(writer._write_stack)
        buffer = bytearray(b"def")
        bufio.write(buffer)
        buffer[:] = b"***" # Overwrite our copy of the data
        bufio.flush()
        self.assertEqual(b"".join(writer._write_stack), b"abcdef")

    def test_write_overflow(self):
        writer = self.MockRawIO()
        bufio = self.tp(writer, 8)
        contents = b"abcdefghijklmnop"
        for n in range(0, len(contents), 3):
            bufio.write(contents[n:n+3])
        flushed = b"".join(writer._write_stack)
        # At least (total - 8) bytes were implicitly flushed, perhaps more
        # depending on the implementation.
        self.assertTrue(flushed.startswith(contents[:-8]), flushed)

    def check_writes(self, intermediate_func):
        # Lots of writes, test the flushed output is as expected.
        # `intermediate_func` is called with the writer after every write.
        contents = bytes(range(256)) * 1000
        n = 0
        writer = self.MockRawIO()
        bufio = self.tp(writer, 13)
        # Generator of write sizes: repeat each N 15 times then proceed to N+1
        def gen_sizes():
            for size in count(1):
                for i in range(15):
                    yield size
        sizes = gen_sizes()
        while n < len(contents):
            size = min(next(sizes), len(contents) - n)
            self.assertEqual(bufio.write(contents[n:n+size]), size)
            intermediate_func(bufio)
            n += size
        bufio.flush()
        self.assertEqual(contents, b"".join(writer._write_stack))

    def test_writes(self):
        self.check_writes(lambda bufio: None)

    def test_writes_and_flushes(self):
        self.check_writes(lambda bufio: bufio.flush())

    def test_writes_and_seeks(self):
        # Seek away and back between writes, first with absolute offsets...
        def _seekabs(bufio):
            pos = bufio.tell()
            bufio.seek(pos + 1, 0)
            bufio.seek(pos - 1, 0)
            bufio.seek(pos, 0)
        self.check_writes(_seekabs)
        # ...then with relative ones.
        def _seekrel(bufio):
            pos = bufio.seek(0, 1)
            bufio.seek(+1, 1)
            bufio.seek(-1, 1)
            bufio.seek(pos, 0)
        self.check_writes(_seekrel)

    def test_writes_and_truncates(self):
        self.check_writes(lambda bufio: bufio.truncate(bufio.tell()))

    def test_write_non_blocking(self):
        raw = self.MockNonBlockWriterIO()
        bufio = self.tp(raw, 8)

        self.assertEqual(bufio.write(b"abcd"), 4)
        self.assertEqual(bufio.write(b"efghi"), 5)
        # 1 byte will be written, the rest will be buffered
        raw.block_on(b"k")
        self.assertEqual(bufio.write(b"jklmn"), 5)

        # 8 bytes will be written, 8 will be buffered and the rest will be lost
        raw.block_on(b"0")
        try:
            bufio.write(b"opqrwxyz0123456789")
        except self.BlockingIOError as e:
            written = e.characters_written
        else:
            self.fail("BlockingIOError should have been raised")
        self.assertEqual(written, 16)
        self.assertEqual(raw.pop_written(),
            b"abcdefghijklmnopqrwxyz")

        self.assertEqual(bufio.write(b"ABCDEFGHI"), 9)
        s = raw.pop_written()
        # Previously buffered bytes were flushed
        self.assertTrue(s.startswith(b"01234567A"), s)

    def test_write_and_rewind(self):
        raw = io.BytesIO()
        bufio = self.tp(raw, 4)
        self.assertEqual(bufio.write(b"abcdef"), 6)
        self.assertEqual(bufio.tell(), 6)
        bufio.seek(0, 0)
        self.assertEqual(bufio.write(b"XY"), 2)
        bufio.seek(6, 0)
        self.assertEqual(raw.getvalue(), b"XYcdef")
        self.assertEqual(bufio.write(b"123456"), 6)
        bufio.flush()
        self.assertEqual(raw.getvalue(), b"XYcdef123456")

    def test_flush(self):
        writer = self.MockRawIO()
        bufio = self.tp(writer, 8)
        bufio.write(b"abc")
        bufio.flush()
        self.assertEqual(b"abc", writer._write_stack[0])

    def test_writelines(self):
        l = [b'ab', b'cd', b'ef']
        writer = self.MockRawIO()
        bufio = self.tp(writer, 8)
        bufio.writelines(l)
        bufio.flush()
        self.assertEqual(b''.join(writer._write_stack), b'abcdef')

    def test_writelines_userlist(self):
        l = UserList([b'ab', b'cd', b'ef'])
        writer = self.MockRawIO()
        bufio = self.tp(writer, 8)
        bufio.writelines(l)
        bufio.flush()
        self.assertEqual(b''.join(writer._write_stack), b'abcdef')

    def test_writelines_error(self):
        # Non-bytes items, a non-iterable, and a str must all be rejected.
        writer = self.MockRawIO()
        bufio = self.tp(writer, 8)
        self.assertRaises(TypeError, bufio.writelines, [1, 2, 3])
        self.assertRaises(TypeError, bufio.writelines, None)
        self.assertRaises(TypeError, bufio.writelines, 'abc')

    def test_destructor(self):
        # Dropping the last reference must flush pending data.
        writer = self.MockRawIO()
        bufio = self.tp(writer, 8)
        bufio.write(b"abc")
        del bufio
        support.gc_collect()
        self.assertEqual(b"abc", writer._write_stack[0])

    def test_truncate(self):
        # Truncate implicitly flushes the buffer.
        self.addCleanup(support.unlink, support.TESTFN)
        with self.open(support.TESTFN, self.write_mode, buffering=0) as raw:
            bufio = self.tp(raw, 8)
            bufio.write(b"abcdef")
            self.assertEqual(bufio.truncate(3), 3)
            self.assertEqual(bufio.tell(), 6)
        with self.open(support.TESTFN, "rb", buffering=0) as f:
            self.assertEqual(f.read(), b"abc")

    def test_truncate_after_write(self):
        # Ensure that truncate preserves the file position after
        # writes longer than the buffer size.
        # Issue: https://bugs.python.org/issue32228
        self.addCleanup(support.unlink, support.TESTFN)
        with self.open(support.TESTFN, "wb") as f:
            # Fill with some buffer
            f.write(b'\x00' * 10000)
        buffer_sizes = [8192, 4096, 200]
        for buffer_size in buffer_sizes:
            with self.open(support.TESTFN, "r+b", buffering=buffer_size) as f:
                f.write(b'\x00' * (buffer_size + 1))
                # After write write_pos and write_end are set to 0
                f.read(1)
                # read operation makes sure that pos != raw_pos
                f.truncate()
                self.assertEqual(f.tell(), buffer_size + 2)

    @support.requires_resource('cpu')
    def test_threads(self):
        try:
            # Write out many bytes from many threads and test they were
            # all flushed.
            N = 1000
            contents = bytes(range(256)) * N
            sizes = cycle([1, 19])
            n = 0
            queue = deque()
            while n < len(contents):
                size = next(sizes)
                queue.append(contents[n:n+size])
                n += size
            del contents
            # We use a real file object because it allows us to
            # exercise situations where the GIL is released before
            # writing the buffer to the raw streams. This is in addition
            # to concurrency issues due to switching threads in the middle
            # of Python code.
            with self.open(support.TESTFN, self.write_mode, buffering=0) as raw:
                bufio = self.tp(raw, 8)
                errors = []
                def f():
                    try:
                        while True:
                            try:
                                s = queue.popleft()
                            except IndexError:
                                return
                            bufio.write(s)
                    except Exception as e:
                        errors.append(e)
                        raise
                threads = [threading.Thread(target=f) for x in range(20)]
                with support.start_threads(threads):
                    time.sleep(0.02) # yield
                self.assertFalse(errors,
                    "the following exceptions were caught: %r" % errors)
                bufio.close()
            with self.open(support.TESTFN, "rb") as f:
                s = f.read()
            for i in range(256):
                self.assertEqual(s.count(bytes([i])), N)
        finally:
            support.unlink(support.TESTFN)

    def test_misbehaved_io(self):
        rawio = self.MisbehavedRawIO()
        bufio = self.tp(rawio, 5)
        self.assertRaises(OSError, bufio.seek, 0)
        self.assertRaises(OSError, bufio.tell)
        self.assertRaises(OSError, bufio.write, b"abcdef")

        # Silence destructor error
        bufio.close = lambda: None

    def test_max_buffer_size_removal(self):
        # A third positional argument is rejected with TypeError.
        with self.assertRaises(TypeError):
            self.tp(self.MockRawIO(), 8, 12)

    def test_write_error_on_close(self):
        raw = self.MockRawIO()
        def bad_write(b):
            raise OSError()
        raw.write = bad_write
        b = self.tp(raw)
        b.write(b'spam')
        self.assertRaises(OSError, b.close) # exception not swallowed
        self.assertTrue(b.closed)

    def test_slow_close_from_thread(self):
        # Issue #31976
        rawio = self.SlowFlushRawIO()
        bufio = self.tp(rawio, 8)
        t = threading.Thread(target=bufio.close)
        t.start()
        try:
            rawio.in_flush.wait()
            self.assertRaises(ValueError, bufio.write, b'spam')
            self.assertTrue(bufio.closed)
        finally:
            # Join even when an assertion above fails, so the closer thread
            # never outlives this test.
            t.join()
1876
1877
Benjamin Peterson59406a92009-03-26 17:10:29 +00001878
class CBufferedWriterTest(BufferedWriterTest, SizeofTest):
    # Runs the shared BufferedWriterTest suite against io.BufferedWriter and
    # adds checks specific to that implementation.
    tp = io.BufferedWriter

    @unittest.skipIf(MEMORY_SANITIZER, "MSan defaults to crashing "
                     "instead of returning NULL for malloc failure.")
    def test_constructor(self):
        BufferedWriterTest.test_constructor(self)
        # The allocation can succeed on 32-bit builds, e.g. with more
        # than 2 GiB RAM and a 64-bit kernel.
        if sys.maxsize <= 0x7FFFFFFF:
            return
        raw = self.MockRawIO()
        writer = self.tp(raw)
        with self.assertRaises((OverflowError, MemoryError, ValueError)):
            writer.__init__(raw, sys.maxsize)

    def test_initialization(self):
        raw = self.MockRawIO()
        writer = self.tp(raw)
        # Re-initialising with an invalid buffer size must fail, and so must
        # a write() attempted right after each failed re-initialisation.
        for bad_size in (0, -16, -1):
            with self.assertRaises(ValueError):
                writer.__init__(raw, buffer_size=bad_size)
            with self.assertRaises(ValueError):
                writer.write(b"def")

    def test_garbage_collection(self):
        # C BufferedWriter objects are collected, and collecting them flushes
        # all data to disk.
        # The Python version has __del__, so it ends into gc.garbage instead
        self.addCleanup(support.unlink, support.TESTFN)
        with support.check_warnings(('', ResourceWarning)):
            raw = self.FileIO(support.TESTFN, "w+b")
            writer = self.tp(raw)
            writer.write(b"123xxx")
            # Self-reference: puts the object in a reference cycle.
            writer.x = writer
            probe = weakref.ref(writer)
            del writer
            support.gc_collect()
        self.assertIsNone(probe(), probe)
        with self.open(support.TESTFN, "rb") as stored:
            self.assertEqual(stored.read(), b"123xxx")

    def test_args_error(self):
        # Issue #17275
        with self.assertRaisesRegex(TypeError, "BufferedWriter"):
            self.tp(io.BytesIO(), 1024, 1024, 1024)
1925
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00001926
class PyBufferedWriterTest(BufferedWriterTest):
    # Runs the shared BufferedWriterTest suite against pyio.BufferedWriter.
    tp = pyio.BufferedWriter
Guido van Rossuma9e20242007-03-08 00:43:48 +00001929
class BufferedRWPairTest(unittest.TestCase):
    # Tests for the reader/writer pair type stored in `self.tp`.

    def test_constructor(self):
        pair = self.tp(self.MockRawIO(), self.MockRawIO())
        self.assertFalse(pair.closed)

    def test_uninitialized(self):
        # Deleting an object created by __new__ alone must not blow up.
        pair = self.tp.__new__(self.tp)
        del pair
        pair = self.tp.__new__(self.tp)
        self.assertRaisesRegex((ValueError, AttributeError),
                               'uninitialized|has no attribute',
                               pair.read, 0)
        self.assertRaisesRegex((ValueError, AttributeError),
                               'uninitialized|has no attribute',
                               pair.write, b'')
        pair.__init__(self.MockRawIO(), self.MockRawIO())
        self.assertEqual(pair.read(0), b'')
        self.assertEqual(pair.write(b''), 0)

    def test_detach(self):
        pair = self.tp(self.MockRawIO(), self.MockRawIO())
        self.assertRaises(self.UnsupportedOperation, pair.detach)

    def test_constructor_max_buffer_size_removal(self):
        # A fourth positional argument is rejected with TypeError.
        with self.assertRaises(TypeError):
            self.tp(self.MockRawIO(), self.MockRawIO(), 8, 12)

    def test_constructor_with_not_readable(self):
        class NotReadable(MockRawIO):
            def readable(self):
                return False

        self.assertRaises(OSError, self.tp, NotReadable(), self.MockRawIO())

    def test_constructor_with_not_writeable(self):
        class NotWriteable(MockRawIO):
            def writable(self):
                return False

        self.assertRaises(OSError, self.tp, self.MockRawIO(), NotWriteable())

    def test_read(self):
        pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())

        self.assertEqual(pair.read(3), b"abc")
        self.assertEqual(pair.read(1), b"d")
        self.assertEqual(pair.read(), b"ef")
        pair = self.tp(self.BytesIO(b"abc"), self.MockRawIO())
        self.assertEqual(pair.read(None), b"abc")

    def test_readlines(self):
        pair = lambda: self.tp(self.BytesIO(b"abc\ndef\nh"), self.MockRawIO())
        self.assertEqual(pair().readlines(), [b"abc\n", b"def\n", b"h"])
        # An explicit hint of None must behave like passing no hint at all.
        self.assertEqual(pair().readlines(None), [b"abc\n", b"def\n", b"h"])
        self.assertEqual(pair().readlines(5), [b"abc\n", b"def\n"])

    def test_read1(self):
        # .read1() is delegated to the underlying reader object, so this test
        # can be shallow.
        pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())

        self.assertEqual(pair.read1(3), b"abc")
        self.assertEqual(pair.read1(), b"def")

    def test_readinto(self):
        for method in ("readinto", "readinto1"):
            with self.subTest(method):
                pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())

                data = byteslike(b'\0' * 5)
                self.assertEqual(getattr(pair, method)(data), 5)
                self.assertEqual(bytes(data), b"abcde")

    def test_write(self):
        w = self.MockRawIO()
        pair = self.tp(self.MockRawIO(), w)

        pair.write(b"abc")
        pair.flush()
        buffer = bytearray(b"def")
        pair.write(buffer)
        buffer[:] = b"***" # Overwrite our copy of the data
        pair.flush()
        self.assertEqual(w._write_stack, [b"abc", b"def"])

    def test_peek(self):
        pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())

        self.assertTrue(pair.peek(3).startswith(b"abc"))
        # peek() must not have consumed anything.
        self.assertEqual(pair.read(3), b"abc")

    def test_readable(self):
        pair = self.tp(self.MockRawIO(), self.MockRawIO())
        self.assertTrue(pair.readable())

    def test_writeable(self):
        pair = self.tp(self.MockRawIO(), self.MockRawIO())
        self.assertTrue(pair.writable())

    def test_seekable(self):
        # BufferedRWPairs are never seekable, even if their readers and writers
        # are.
        pair = self.tp(self.MockRawIO(), self.MockRawIO())
        self.assertFalse(pair.seekable())

    # .flush() is delegated to the underlying writer object and has been
    # tested in the test_write method.

    def test_close_and_closed(self):
        pair = self.tp(self.MockRawIO(), self.MockRawIO())
        self.assertFalse(pair.closed)
        pair.close()
        self.assertTrue(pair.closed)

    def test_reader_close_error_on_close(self):
        def reader_close():
            # Referencing an undefined name raises NameError on purpose.
            reader_non_existing
        reader = self.MockRawIO()
        reader.close = reader_close
        writer = self.MockRawIO()
        pair = self.tp(reader, writer)
        with self.assertRaises(NameError) as err:
            pair.close()
        self.assertIn('reader_non_existing', str(err.exception))
        self.assertTrue(pair.closed)
        self.assertFalse(reader.closed)
        self.assertTrue(writer.closed)

        # Silence destructor error
        reader.close = lambda: None

    def test_writer_close_error_on_close(self):
        def writer_close():
            # Referencing an undefined name raises NameError on purpose.
            writer_non_existing
        reader = self.MockRawIO()
        writer = self.MockRawIO()
        writer.close = writer_close
        pair = self.tp(reader, writer)
        with self.assertRaises(NameError) as err:
            pair.close()
        self.assertIn('writer_non_existing', str(err.exception))
        self.assertFalse(pair.closed)
        self.assertTrue(reader.closed)
        self.assertFalse(writer.closed)

        # Silence destructor error
        writer.close = lambda: None
        writer = None

        # Ignore BufferedWriter (of the BufferedRWPair) unraisable exception
        with support.catch_unraisable_exception():
            # Ignore BufferedRWPair unraisable exception
            with support.catch_unraisable_exception():
                pair = None
                support.gc_collect()
            support.gc_collect()

    def test_reader_writer_close_error_on_close(self):
        def reader_close():
            reader_non_existing
        def writer_close():
            writer_non_existing
        reader = self.MockRawIO()
        reader.close = reader_close
        writer = self.MockRawIO()
        writer.close = writer_close
        pair = self.tp(reader, writer)
        with self.assertRaises(NameError) as err:
            pair.close()
        # The reader's error is the one raised; the writer's error is
        # chained to it as __context__.
        self.assertIn('reader_non_existing', str(err.exception))
        self.assertIsInstance(err.exception.__context__, NameError)
        self.assertIn('writer_non_existing', str(err.exception.__context__))
        self.assertFalse(pair.closed)
        self.assertFalse(reader.closed)
        self.assertFalse(writer.closed)

        # Silence destructor error
        reader.close = lambda: None
        writer.close = lambda: None

    def test_isatty(self):
        class SelectableIsAtty(MockRawIO):
            def __init__(self, isatty):
                MockRawIO.__init__(self)
                self._isatty = isatty

            def isatty(self):
                return self._isatty

        # The pair reports a tty as soon as either side does.
        pair = self.tp(SelectableIsAtty(False), SelectableIsAtty(False))
        self.assertFalse(pair.isatty())

        pair = self.tp(SelectableIsAtty(True), SelectableIsAtty(False))
        self.assertTrue(pair.isatty())

        pair = self.tp(SelectableIsAtty(False), SelectableIsAtty(True))
        self.assertTrue(pair.isatty())

        pair = self.tp(SelectableIsAtty(True), SelectableIsAtty(True))
        self.assertTrue(pair.isatty())

    def test_weakref_clearing(self):
        brw = self.tp(self.MockRawIO(), self.MockRawIO())
        ref = weakref.ref(brw)
        brw = None
        ref = None # Shouldn't segfault.
2136 ref = None # Shouldn't segfault.
2137
class CBufferedRWPairTest(BufferedRWPairTest):
    """Run the BufferedRWPairTest cases against io.BufferedRWPair."""
    tp = io.BufferedRWPair
Guido van Rossuma9e20242007-03-08 00:43:48 +00002140
class PyBufferedRWPairTest(BufferedRWPairTest):
    """Run the BufferedRWPairTest cases against pyio.BufferedRWPair."""
    tp = pyio.BufferedRWPair
Guido van Rossuma9e20242007-03-08 00:43:48 +00002143
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002144
class BufferedRandomTest(BufferedReaderTest, BufferedWriterTest):
    """Test cases for a buffered stream that is both readable and writable.

    The class under test is taken from the ``tp`` attribute, which concrete
    subclasses provide.  Several tests simply run the reader variant of a
    test and then the writer variant.
    """

    read_mode = "rb+"
    write_mode = "wb+"

    def test_constructor(self):
        for base in (BufferedReaderTest, BufferedWriterTest):
            base.test_constructor(self)

    def test_uninitialized(self):
        for base in (BufferedReaderTest, BufferedWriterTest):
            base.test_uninitialized(self)

    def test_read_and_write(self):
        backing = self.MockRawIO((b"asdf", b"ghjk"))
        stream = self.tp(backing, 8)

        self.assertEqual(b"as", stream.read(2))
        for chunk in (b"ddd", b"eee"):
            stream.write(chunk)
        # Buffer writes: nothing has reached the raw object yet.
        self.assertFalse(backing._write_stack)
        self.assertEqual(b"ghjk", stream.read())
        self.assertEqual(b"dddeee", backing._write_stack[0])

    def test_seek_and_tell(self):
        backing = self.BytesIO(b"asdfghjkl")
        stream = self.tp(backing)

        self.assertEqual(b"as", stream.read(2))
        self.assertEqual(2, stream.tell())
        stream.seek(0, 0)
        self.assertEqual(b"asdf", stream.read(4))

        # Overwrite four bytes in the middle, then re-read from the start.
        stream.write(b"123f")
        stream.seek(0, 0)
        self.assertEqual(b"asdf123fl", stream.read())
        self.assertEqual(9, stream.tell())
        # Seek relative to the end, then relative to the current position.
        stream.seek(-4, 2)
        self.assertEqual(5, stream.tell())
        stream.seek(2, 1)
        self.assertEqual(7, stream.tell())
        self.assertEqual(b"fl", stream.read(11))
        stream.flush()
        self.assertEqual(b"asdf123fl", backing.getvalue())

        # A float offset is rejected.
        with self.assertRaises(TypeError):
            stream.seek(0.0)

    def check_flush_and_read(self, read_func):
        # Shared scenario for the test_flush_and_* tests.  read_func is
        # called as read_func(bufio, n) or read_func(bufio) and must return
        # the bytes it consumed from bufio.
        backing = self.BytesIO(b"abcdefghi")
        stream = self.tp(backing)

        self.assertEqual(b"ab", read_func(stream, 2))
        stream.write(b"12")
        self.assertEqual(b"ef", read_func(stream, 2))
        self.assertEqual(6, stream.tell())
        stream.flush()
        self.assertEqual(6, stream.tell())
        self.assertEqual(b"ghi", read_func(stream))
        # Modify the raw stream behind the buffered object's back.
        backing.seek(0, 0)
        backing.write(b"XYZ")
        # flush() resets the read buffer
        stream.flush()
        stream.seek(0, 0)
        self.assertEqual(b"XYZ", read_func(stream, 3))

    def test_flush_and_read(self):
        def _read(bufio, *args):
            return bufio.read(*args)
        self.check_flush_and_read(_read)

    def test_flush_and_readinto(self):
        def _readinto(bufio, n=-1):
            # A negative n means "read everything": use an oversized buffer.
            size = 9999 if n < 0 else n
            target = bytearray(size)
            filled = bufio.readinto(target)
            return bytes(target[:filled])
        self.check_flush_and_read(_readinto)

    def test_flush_and_peek(self):
        def _peek(bufio, n=-1):
            # This relies on the fact that the buffer can contain the whole
            # raw stream, otherwise peek() can return less.
            data = bufio.peek(n)
            if n != -1:
                data = data[:n]
            # peek() does not advance the position; do it by hand.
            bufio.seek(len(data), 1)
            return data
        self.check_flush_and_read(_peek)

    def test_flush_and_write(self):
        backing = self.BytesIO(b"abcdefghi")
        stream = self.tp(backing)

        for chunk in (b"123", b"45"):
            stream.write(chunk)
            stream.flush()
        stream.seek(0, 0)
        self.assertEqual(b"12345fghi", backing.getvalue())
        self.assertEqual(b"12345fghi", stream.read())

    def test_threads(self):
        for base in (BufferedReaderTest, BufferedWriterTest):
            base.test_threads(self)

    def test_writes_and_peek(self):
        def _peek_in_place(bufio):
            bufio.peek(1)
        self.check_writes(_peek_in_place)

        def _peek_one_byte_back(bufio):
            here = bufio.tell()
            bufio.seek(-1, 1)
            bufio.peek(1)
            bufio.seek(here, 0)
        self.check_writes(_peek_one_byte_back)

    def test_writes_and_reads(self):
        def _step_back_and_read(bufio):
            bufio.seek(-1, 1)
            bufio.read(1)
        self.check_writes(_step_back_and_read)

    def test_writes_and_read1s(self):
        def _step_back_and_read1(bufio):
            bufio.seek(-1, 1)
            bufio.read1(1)
        self.check_writes(_step_back_and_read1)

    def test_writes_and_readintos(self):
        def _step_back_and_readinto(bufio):
            bufio.seek(-1, 1)
            bufio.readinto(bytearray(1))
        self.check_writes(_step_back_and_readinto)

    def test_write_after_readahead(self):
        # Issue #6629: writing after the buffer was filled by readahead should
        # first rewind the raw stream.
        for overwrite_size in (1, 5):
            backing = self.BytesIO(b"A" * 10)
            stream = self.tp(backing, 4)
            position_after_write = overwrite_size + 1
            # Trigger readahead
            self.assertEqual(stream.read(1), b"A")
            self.assertEqual(stream.tell(), 1)
            # Overwriting should rewind the raw stream if it needs so
            stream.write(b"B" * overwrite_size)
            self.assertEqual(stream.tell(), position_after_write)
            # If the write size was smaller than the buffer size, flush() and
            # check that rewind happens.
            stream.flush()
            self.assertEqual(stream.tell(), position_after_write)
            expected = b"A" + b"B" * overwrite_size + b"A" * (9 - overwrite_size)
            self.assertEqual(backing.getvalue(), expected)

    def test_write_rewind_write(self):
        # Various combinations of reading / writing / seeking backwards / writing again
        def mutate(bufio, pos1, pos2):
            assert pos2 >= pos1
            # Fill the buffer
            bufio.seek(pos1)
            bufio.read(pos2 - pos1)
            bufio.write(b'\x02')
            # This writes earlier than the previous write, but still inside
            # the buffer.
            bufio.seek(pos1)
            bufio.write(b'\x01')

        original = b"\x80\x81\x82\x83\x84"
        size = len(original)
        for first in range(size):
            for second in range(first, size):
                backing = self.BytesIO(original)
                stream = self.tp(backing, 100)
                mutate(stream, first, second)
                stream.flush()
                # Same order as the writes in mutate(): when both positions
                # coincide, the later write (1) wins.
                expected = bytearray(original)
                expected[second] = 2
                expected[first] = 1
                self.assertEqual(backing.getvalue(), expected,
                                 "failed result for i=%d, j=%d" % (first, second))

    def test_truncate_after_read_or_write(self):
        backing = self.BytesIO(b"A" * 10)
        stream = self.tp(backing, 100)
        # The read buffer gets filled.
        self.assertEqual(stream.read(2), b"AA")
        self.assertEqual(stream.truncate(), 2)
        # The write buffer increases.
        self.assertEqual(stream.write(b"BB"), 2)
        self.assertEqual(stream.truncate(), 4)

    def test_misbehaved_io(self):
        for base in (BufferedReaderTest, BufferedWriterTest):
            base.test_misbehaved_io(self)

    def test_interleaved_read_write(self):
        # Test for issue #12213
        with self.BytesIO(b'abcdefgh') as backing, \
                self.tp(backing, 100) as stream:
            stream.write(b"1")
            self.assertEqual(stream.read(1), b'b')
            stream.write(b'2')
            self.assertEqual(stream.read1(1), b'd')
            stream.write(b'3')
            one_byte = bytearray(1)
            stream.readinto(one_byte)
            self.assertEqual(one_byte, b'f')
            stream.write(b'4')
            self.assertEqual(stream.peek(1), b'h')
            stream.flush()
            self.assertEqual(backing.getvalue(), b'1b2d3f4h')

        with self.BytesIO(b'abc') as backing, \
                self.tp(backing, 100) as stream:
            self.assertEqual(stream.read(1), b'a')
            stream.write(b"2")
            self.assertEqual(stream.read(1), b'c')
            stream.flush()
            self.assertEqual(backing.getvalue(), b'a2c')

    def test_interleaved_readline_write(self):
        with self.BytesIO(b'ab\ncdef\ng\n') as backing, \
                self.tp(backing) as stream:
            # Alternate a one-byte write with reading the rest of the line.
            stream.write(b'1')
            self.assertEqual(stream.readline(), b'b\n')
            stream.write(b'2')
            self.assertEqual(stream.readline(), b'def\n')
            stream.write(b'3')
            self.assertEqual(stream.readline(), b'\n')
            stream.flush()
            self.assertEqual(backing.getvalue(), b'1b\n2def\n3\n')

    # You can't construct a BufferedRandom over a non-seekable stream.
    test_unseekable = None
2372
R David Murray67bfe802013-02-23 21:51:05 -05002373
class CBufferedRandomTest(BufferedRandomTest, SizeofTest):
    """Run the BufferedRandomTest cases against io.BufferedRandom."""

    tp = io.BufferedRandom

    @unittest.skipIf(MEMORY_SANITIZER,
                     "MSan defaults to crashing instead of returning NULL "
                     "for malloc failure.")
    def test_constructor(self):
        BufferedRandomTest.test_constructor(self)
        # The allocation can succeed on 32-bit builds, e.g. with more
        # than 2 GiB RAM and a 64-bit kernel.
        if sys.maxsize <= 0x7FFFFFFF:
            return
        rawio = self.MockRawIO()
        bufio = self.tp(rawio)
        # Re-initialising with a buffer size of sys.maxsize must fail.
        with self.assertRaises((OverflowError, MemoryError, ValueError)):
            bufio.__init__(rawio, sys.maxsize)

    def test_garbage_collection(self):
        for base in (CBufferedReaderTest, CBufferedWriterTest):
            base.test_garbage_collection(self)

    def test_args_error(self):
        # Issue #17275
        with self.assertRaisesRegex(TypeError, "BufferedRandom"):
            self.tp(io.BytesIO(), 1024, 1024, 1024)
2397
2398
class PyBufferedRandomTest(BufferedRandomTest):
    """Run the BufferedRandomTest cases against pyio.BufferedRandom."""
    tp = pyio.BufferedRandom
2401
2402
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00002403# To fully exercise seek/tell, the StatefulIncrementalDecoder has these
2404# properties:
2405# - A single output character can correspond to many bytes of input.
2406# - The number of input bytes to complete the character can be
2407# undetermined until the last input byte is received.
2408# - The number of input bytes can vary depending on previous input.
2409# - A single input byte can correspond to many characters of output.
2410# - The number of output characters can be undetermined until the
2411# last input byte is received.
2412# - The number of output characters can vary depending on previous input.
2413
class StatefulIncrementalDecoder(codecs.IncrementalDecoder):
    """
    For testing seek/tell behavior with a stateful, buffering decoder.

    Input is a sequence of words.  Words may be fixed-length (length set
    by input) or variable-length (period-terminated).  In variable-length
    mode, extra periods are ignored.  Possible words are:
      - 'i' followed by a number sets the input length, I (maximum 99).
        When I is set to 0, words are period-terminated.
      - 'o' followed by a number sets the output length, O (maximum 99).
      - Any other word is converted into a word followed by a period on
        the output.  The output word consists of the input word truncated
        or padded out with hyphens to make its length equal to O.  If O
        is 0, the word is output verbatim without truncating or padding.
    I and O are initially set to 1.  A new value of I applies to the bytes
    that follow the word which set it (the input buffer is empty at that
    point).  EOF also terminates the last word.
    """

    def __init__(self, errors='strict'):
        codecs.IncrementalDecoder.__init__(self, errors)
        self.reset()

    def __repr__(self):
        return '<SID %x>' % id(self)

    def reset(self):
        """Return to the initial state: I = O = 1 and no buffered input."""
        self.i = 1
        self.o = 1
        self.buffer = bytearray()

    def getstate(self):
        """Return ``(buffered_input, flags)`` with I and O packed in flags.

        I and O are each XOR-ed with 1 so that flags is 0 right after
        reset(); flags is then ``i * 100 + o``.
        """
        i, o = self.i ^ 1, self.o ^ 1  # so that flags = 0 after reset()
        return bytes(self.buffer), i*100 + o

    def setstate(self, state):
        """Restore a state previously produced by getstate()."""
        buffer, io = state
        self.buffer = bytearray(buffer)
        i, o = divmod(io, 100)
        self.i, self.o = i ^ 1, o ^ 1

    def decode(self, input, final=False):
        """Consume the bytes of *input* and return the words completed.

        self.i is consulted for every byte because a completed 'i' word
        changes the input mode for the bytes that follow it.  When *final*
        is true, a pending partial word is completed as well.
        """
        pieces = []
        period = ord('.')
        for byte in input:
            if self.i == 0:  # variable-length, terminated with period
                if byte == period:
                    # Extra periods (empty words) are ignored.
                    if self.buffer:
                        pieces.append(self.process_word())
                else:
                    self.buffer.append(byte)
            else:  # fixed-length, terminate after self.i bytes
                self.buffer.append(byte)
                if len(self.buffer) == self.i:
                    pieces.append(self.process_word())
        if final and self.buffer:  # EOF terminates the last word
            pieces.append(self.process_word())
        return ''.join(pieces)

    def process_word(self):
        """Interpret the buffered word, empty the buffer, return its output.

        'i...' and 'o...' words update I and O and produce no output; a
        missing number counts as 0.  Any other word is ASCII-decoded,
        padded with hyphens or truncated to O characters (left untouched
        when O is 0) and followed by a period.
        """
        output = ''
        marker = self.buffer[0]
        if marker == ord('i'):
            self.i = min(99, int(self.buffer[1:] or 0))  # set input length
        elif marker == ord('o'):
            self.o = min(99, int(self.buffer[1:] or 0))  # set output length
        else:
            output = self.buffer.decode('ascii')
            if self.o:
                # pad out with hyphens, then truncate to output length
                output = output.ljust(self.o, '-')[:self.o]
            output += '.'
        self.buffer = bytearray()
        return output

    # Switch consulted by lookupTestDecoder(); off unless a test sets it.
    codecEnabled = False

    @classmethod
    def lookupTestDecoder(cls, name):
        """Codec search function for the name 'test_decoder'.

        Returns a CodecInfo that uses this class as incremental decoder and
        latin-1 for encoding, but only while ``codecEnabled`` is true and
        *name* is 'test_decoder'; returns None otherwise.
        """
        if not cls.codecEnabled or name != 'test_decoder':
            return None
        latin1 = codecs.lookup('latin-1')
        return codecs.CodecInfo(
            name='test_decoder', encode=latin1.encode, decode=None,
            incrementalencoder=None,
            streamreader=None, streamwriter=None,
            incrementaldecoder=cls)
2498
# Register StatefulIncrementalDecoder's codec search function for testing.
# The lookup only answers while StatefulIncrementalDecoder.codecEnabled is
# true; it is false by default, so tests have to enable it explicitly.
codecs.register(StatefulIncrementalDecoder.lookupTestDecoder)
2502
2503
class StatefulIncrementalDecoderTest(unittest.TestCase):
    """
    Make sure the StatefulIncrementalDecoder actually works.
    """

    # Each entry is (input bytes, final flag passed to decode(), expected
    # decoded text).
    test_cases = [
        # I=1, O=1 (fixed-length input == fixed-length output)
        (b'abcd', False, 'a.b.c.d.'),
        # I=0, O=0 (variable-length input, variable-length output)
        (b'oiabcd', True, 'abcd.'),
        # I=0, O=0 (should ignore extra periods)
        (b'oi...abcd...', True, 'abcd.'),
        # I=0, O=6 (variable-length input, fixed-length output)
        (b'i.o6.x.xyz.toolongtofit.', False, 'x-----.xyz---.toolon.'),
        # I=2, O=6 (fixed-length input < fixed-length output)
        (b'i.i2.o6xyz', True, 'xy----.z-----.'),
        # I=6, O=3 (fixed-length input > fixed-length output)
        (b'i.o3.i6.abcdefghijklmnop', True, 'abc.ghi.mno.'),
        # I=0, then 3; O=29, then 15 (with longer output)
        (b'i.o29.a.b.cde.o15.abcdefghijabcdefghij.i3.a.b.c.d.ei00k.l.m', True,
         'a----------------------------.'
         'b----------------------------.'
         'cde--------------------------.'
         'abcdefghijabcde.'
         'a.b------------.'
         '.c.------------.'
         'd.e------------.'
         'k--------------.'
         'l--------------.'
         'm--------------.')
    ]

    def test_decoder(self):
        # Try a few one-shot test cases.
        for data, final, expected in self.test_cases:
            decoder = StatefulIncrementalDecoder()
            self.assertEqual(decoder.decode(data, final), expected)

        # Also test an unfinished decode, followed by forcing EOF.
        decoder = StatefulIncrementalDecoder()
        self.assertEqual(decoder.decode(b'oiabcd'), '')
        self.assertEqual(decoder.decode(b'', True), 'abcd.')
Guido van Rossum78892e42007-04-06 17:31:18 +00002546
2547class TextIOWrapperTest(unittest.TestCase):
Guido van Rossum9b76da62007-04-11 01:09:03 +00002548
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002549 def setUp(self):
2550 self.testdata = b"AAA\r\nBBB\rCCC\r\nDDD\nEEE\r\n"
2551 self.normalized = b"AAA\nBBB\nCCC\nDDD\nEEE\n".decode("ascii")
Benjamin Petersonee8712c2008-05-20 21:35:26 +00002552 support.unlink(support.TESTFN)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00002553
    def tearDown(self):
        # Same cleanup call as in setUp(), this time after the test ran.
        support.unlink(support.TESTFN)
Guido van Rossum9b76da62007-04-11 01:09:03 +00002556
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002557 def test_constructor(self):
2558 r = self.BytesIO(b"\xc3\xa9\n\n")
2559 b = self.BufferedReader(r, 1000)
2560 t = self.TextIOWrapper(b)
Marc-André Lemburg8f36af72011-02-25 15:42:01 +00002561 t.__init__(b, encoding="latin-1", newline="\r\n")
2562 self.assertEqual(t.encoding, "latin-1")
Ezio Melottib3aedd42010-11-20 19:04:17 +00002563 self.assertEqual(t.line_buffering, False)
Marc-André Lemburg8f36af72011-02-25 15:42:01 +00002564 t.__init__(b, encoding="utf-8", line_buffering=True)
2565 self.assertEqual(t.encoding, "utf-8")
Ezio Melottib3aedd42010-11-20 19:04:17 +00002566 self.assertEqual(t.line_buffering, True)
2567 self.assertEqual("\xe9\n", t.readline())
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002568 self.assertRaises(TypeError, t.__init__, b, newline=42)
2569 self.assertRaises(ValueError, t.__init__, b, newline='xyzzy')
2570
Serhiy Storchakaf24131f2015-04-16 11:19:43 +03002571 def test_uninitialized(self):
2572 t = self.TextIOWrapper.__new__(self.TextIOWrapper)
2573 del t
2574 t = self.TextIOWrapper.__new__(self.TextIOWrapper)
2575 self.assertRaises(Exception, repr, t)
2576 self.assertRaisesRegex((ValueError, AttributeError),
2577 'uninitialized|has no attribute',
2578 t.read, 0)
2579 t.__init__(self.MockRawIO())
2580 self.assertEqual(t.read(0), '')
2581
Nick Coghlana9b15242014-02-04 22:11:18 +10002582 def test_non_text_encoding_codecs_are_rejected(self):
2583 # Ensure the constructor complains if passed a codec that isn't
2584 # marked as a text encoding
2585 # http://bugs.python.org/issue20404
2586 r = self.BytesIO()
2587 b = self.BufferedWriter(r)
2588 with self.assertRaisesRegex(LookupError, "is not a text encoding"):
2589 self.TextIOWrapper(b, encoding="hex")
2590
Benjamin Petersond2e0c792009-05-01 20:40:59 +00002591 def test_detach(self):
2592 r = self.BytesIO()
2593 b = self.BufferedWriter(r)
2594 t = self.TextIOWrapper(b)
2595 self.assertIs(t.detach(), b)
2596
2597 t = self.TextIOWrapper(b, encoding="ascii")
2598 t.write("howdy")
2599 self.assertFalse(r.getvalue())
2600 t.detach()
2601 self.assertEqual(r.getvalue(), b"howdy")
2602 self.assertRaises(ValueError, t.detach)
2603
Benjamin Peterson10e76b62014-12-21 20:51:50 -06002604 # Operations independent of the detached stream should still work
2605 repr(t)
2606 self.assertEqual(t.encoding, "ascii")
2607 self.assertEqual(t.errors, "strict")
2608 self.assertFalse(t.line_buffering)
Antoine Pitrou3c2817b2017-06-03 12:32:28 +02002609 self.assertFalse(t.write_through)
Benjamin Peterson10e76b62014-12-21 20:51:50 -06002610
Benjamin Petersonc4c0eae2009-03-09 00:07:03 +00002611 def test_repr(self):
2612 raw = self.BytesIO("hello".encode("utf-8"))
2613 b = self.BufferedReader(raw)
2614 t = self.TextIOWrapper(b, encoding="utf-8")
Antoine Pitrou716c4442009-05-23 19:04:03 +00002615 modname = self.TextIOWrapper.__module__
Max Bernsteinccb7ca72019-05-21 10:09:21 -07002616 self.assertRegex(repr(t),
2617 r"<(%s\.)?TextIOWrapper encoding='utf-8'>" % modname)
Antoine Pitrou716c4442009-05-23 19:04:03 +00002618 raw.name = "dummy"
Max Bernsteinccb7ca72019-05-21 10:09:21 -07002619 self.assertRegex(repr(t),
2620 r"<(%s\.)?TextIOWrapper name='dummy' encoding='utf-8'>" % modname)
Antoine Pitroua4815ca2011-01-09 20:38:15 +00002621 t.mode = "r"
Max Bernsteinccb7ca72019-05-21 10:09:21 -07002622 self.assertRegex(repr(t),
2623 r"<(%s\.)?TextIOWrapper name='dummy' mode='r' encoding='utf-8'>" % modname)
Antoine Pitrou716c4442009-05-23 19:04:03 +00002624 raw.name = b"dummy"
Max Bernsteinccb7ca72019-05-21 10:09:21 -07002625 self.assertRegex(repr(t),
2626 r"<(%s\.)?TextIOWrapper name=b'dummy' mode='r' encoding='utf-8'>" % modname)
Benjamin Petersonc4c0eae2009-03-09 00:07:03 +00002627
Benjamin Peterson10e76b62014-12-21 20:51:50 -06002628 t.buffer.detach()
2629 repr(t) # Should not raise an exception
2630
Serhiy Storchakaa5af6e12017-03-19 19:25:29 +02002631 def test_recursive_repr(self):
2632 # Issue #25455
2633 raw = self.BytesIO()
2634 t = self.TextIOWrapper(raw)
2635 with support.swap_attr(raw, 'name', t):
2636 try:
2637 repr(t) # Should not crash
2638 except RuntimeError:
2639 pass
2640
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002641 def test_line_buffering(self):
2642 r = self.BytesIO()
2643 b = self.BufferedWriter(r, 1000)
2644 t = self.TextIOWrapper(b, newline="\n", line_buffering=True)
Guido van Rossumf64db9f2007-12-06 01:04:26 +00002645 t.write("X")
Ezio Melottib3aedd42010-11-20 19:04:17 +00002646 self.assertEqual(r.getvalue(), b"") # No flush happened
Guido van Rossumf64db9f2007-12-06 01:04:26 +00002647 t.write("Y\nZ")
Ezio Melottib3aedd42010-11-20 19:04:17 +00002648 self.assertEqual(r.getvalue(), b"XY\nZ") # All got flushed
Guido van Rossumf64db9f2007-12-06 01:04:26 +00002649 t.write("A\rB")
Ezio Melottib3aedd42010-11-20 19:04:17 +00002650 self.assertEqual(r.getvalue(), b"XY\nZA\rB")
Guido van Rossumf64db9f2007-12-06 01:04:26 +00002651
Antoine Pitrou3c2817b2017-06-03 12:32:28 +02002652 def test_reconfigure_line_buffering(self):
2653 r = self.BytesIO()
2654 b = self.BufferedWriter(r, 1000)
2655 t = self.TextIOWrapper(b, newline="\n", line_buffering=False)
2656 t.write("AB\nC")
2657 self.assertEqual(r.getvalue(), b"")
2658
2659 t.reconfigure(line_buffering=True) # implicit flush
2660 self.assertEqual(r.getvalue(), b"AB\nC")
2661 t.write("DEF\nG")
2662 self.assertEqual(r.getvalue(), b"AB\nCDEF\nG")
2663 t.write("H")
2664 self.assertEqual(r.getvalue(), b"AB\nCDEF\nG")
2665 t.reconfigure(line_buffering=False) # implicit flush
2666 self.assertEqual(r.getvalue(), b"AB\nCDEF\nGH")
2667 t.write("IJ")
2668 self.assertEqual(r.getvalue(), b"AB\nCDEF\nGH")
2669
2670 # Keeping default value
2671 t.reconfigure()
2672 t.reconfigure(line_buffering=None)
2673 self.assertEqual(t.line_buffering, False)
2674 t.reconfigure(line_buffering=True)
2675 t.reconfigure()
2676 t.reconfigure(line_buffering=None)
2677 self.assertEqual(t.line_buffering, True)
2678
Victor Stinner91106cd2017-12-13 12:29:09 +01002679 @unittest.skipIf(sys.flags.utf8_mode, "utf-8 mode is enabled")
Victor Stinnerf86a5e82012-06-05 13:43:22 +02002680 def test_default_encoding(self):
2681 old_environ = dict(os.environ)
2682 try:
2683 # try to get a user preferred encoding different than the current
2684 # locale encoding to check that TextIOWrapper() uses the current
2685 # locale encoding and not the user preferred encoding
2686 for key in ('LC_ALL', 'LANG', 'LC_CTYPE'):
2687 if key in os.environ:
2688 del os.environ[key]
2689
2690 current_locale_encoding = locale.getpreferredencoding(False)
2691 b = self.BytesIO()
2692 t = self.TextIOWrapper(b)
2693 self.assertEqual(t.encoding, current_locale_encoding)
2694 finally:
2695 os.environ.clear()
2696 os.environ.update(old_environ)
2697
Serhiy Storchaka5cfc79d2014-02-07 10:06:39 +02002698 @support.cpython_only
Victor Stinner91106cd2017-12-13 12:29:09 +01002699 @unittest.skipIf(sys.flags.utf8_mode, "utf-8 mode is enabled")
Serhiy Storchaka78980432013-01-15 01:12:17 +02002700 def test_device_encoding(self):
Serhiy Storchaka5cfc79d2014-02-07 10:06:39 +02002701 # Issue 15989
2702 import _testcapi
Serhiy Storchaka78980432013-01-15 01:12:17 +02002703 b = self.BytesIO()
2704 b.fileno = lambda: _testcapi.INT_MAX + 1
2705 self.assertRaises(OverflowError, self.TextIOWrapper, b)
2706 b.fileno = lambda: _testcapi.UINT_MAX + 1
2707 self.assertRaises(OverflowError, self.TextIOWrapper, b)
2708
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002709 def test_encoding(self):
2710 # Check the encoding attribute is always set, and valid
2711 b = self.BytesIO()
Marc-André Lemburg8f36af72011-02-25 15:42:01 +00002712 t = self.TextIOWrapper(b, encoding="utf-8")
2713 self.assertEqual(t.encoding, "utf-8")
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002714 t = self.TextIOWrapper(b)
Serhiy Storchaka0dcd80a2015-08-02 15:17:49 +03002715 self.assertIsNotNone(t.encoding)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002716 codecs.lookup(t.encoding)
2717
2718 def test_encoding_errors_reading(self):
Guido van Rossume7fc50f2007-12-03 22:54:21 +00002719 # (1) default
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002720 b = self.BytesIO(b"abc\n\xff\n")
2721 t = self.TextIOWrapper(b, encoding="ascii")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00002722 self.assertRaises(UnicodeError, t.read)
2723 # (2) explicit strict
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002724 b = self.BytesIO(b"abc\n\xff\n")
2725 t = self.TextIOWrapper(b, encoding="ascii", errors="strict")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00002726 self.assertRaises(UnicodeError, t.read)
2727 # (3) ignore
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002728 b = self.BytesIO(b"abc\n\xff\n")
2729 t = self.TextIOWrapper(b, encoding="ascii", errors="ignore")
Ezio Melottib3aedd42010-11-20 19:04:17 +00002730 self.assertEqual(t.read(), "abc\n\n")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00002731 # (4) replace
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002732 b = self.BytesIO(b"abc\n\xff\n")
2733 t = self.TextIOWrapper(b, encoding="ascii", errors="replace")
Ezio Melottib3aedd42010-11-20 19:04:17 +00002734 self.assertEqual(t.read(), "abc\n\ufffd\n")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00002735
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002736 def test_encoding_errors_writing(self):
Guido van Rossume7fc50f2007-12-03 22:54:21 +00002737 # (1) default
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002738 b = self.BytesIO()
2739 t = self.TextIOWrapper(b, encoding="ascii")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00002740 self.assertRaises(UnicodeError, t.write, "\xff")
2741 # (2) explicit strict
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002742 b = self.BytesIO()
2743 t = self.TextIOWrapper(b, encoding="ascii", errors="strict")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00002744 self.assertRaises(UnicodeError, t.write, "\xff")
2745 # (3) ignore
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002746 b = self.BytesIO()
2747 t = self.TextIOWrapper(b, encoding="ascii", errors="ignore",
Guido van Rossumf64db9f2007-12-06 01:04:26 +00002748 newline="\n")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00002749 t.write("abc\xffdef\n")
2750 t.flush()
Ezio Melottib3aedd42010-11-20 19:04:17 +00002751 self.assertEqual(b.getvalue(), b"abcdef\n")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00002752 # (4) replace
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002753 b = self.BytesIO()
2754 t = self.TextIOWrapper(b, encoding="ascii", errors="replace",
Guido van Rossumf64db9f2007-12-06 01:04:26 +00002755 newline="\n")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00002756 t.write("abc\xffdef\n")
2757 t.flush()
Ezio Melottib3aedd42010-11-20 19:04:17 +00002758 self.assertEqual(b.getvalue(), b"abc?def\n")
Guido van Rossume7fc50f2007-12-03 22:54:21 +00002759
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002760 def test_newlines(self):
Guido van Rossum78892e42007-04-06 17:31:18 +00002761 input_lines = [ "unix\n", "windows\r\n", "os9\r", "last\n", "nonl" ]
2762
2763 tests = [
2764 [ None, [ 'unix\n', 'windows\n', 'os9\n', 'last\n', 'nonl' ] ],
Guido van Rossum8358db22007-08-18 21:39:55 +00002765 [ '', input_lines ],
2766 [ '\n', [ "unix\n", "windows\r\n", "os9\rlast\n", "nonl" ] ],
2767 [ '\r\n', [ "unix\nwindows\r\n", "os9\rlast\nnonl" ] ],
2768 [ '\r', [ "unix\nwindows\r", "\nos9\r", "last\nnonl" ] ],
Guido van Rossum78892e42007-04-06 17:31:18 +00002769 ]
Antoine Pitrou180a3362008-12-14 16:36:46 +00002770 encodings = (
2771 'utf-8', 'latin-1',
2772 'utf-16', 'utf-16-le', 'utf-16-be',
2773 'utf-32', 'utf-32-le', 'utf-32-be',
2774 )
Guido van Rossum78892e42007-04-06 17:31:18 +00002775
Guido van Rossum8358db22007-08-18 21:39:55 +00002776 # Try a range of buffer sizes to test the case where \r is the last
Guido van Rossum78892e42007-04-06 17:31:18 +00002777 # character in TextIOWrapper._pending_line.
2778 for encoding in encodings:
Guido van Rossum8358db22007-08-18 21:39:55 +00002779 # XXX: str.encode() should return bytes
2780 data = bytes(''.join(input_lines).encode(encoding))
Guido van Rossum78892e42007-04-06 17:31:18 +00002781 for do_reads in (False, True):
Guido van Rossum8358db22007-08-18 21:39:55 +00002782 for bufsize in range(1, 10):
2783 for newline, exp_lines in tests:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002784 bufio = self.BufferedReader(self.BytesIO(data), bufsize)
2785 textio = self.TextIOWrapper(bufio, newline=newline,
Guido van Rossum78892e42007-04-06 17:31:18 +00002786 encoding=encoding)
2787 if do_reads:
2788 got_lines = []
2789 while True:
Guido van Rossum76c5d4d2007-04-06 19:10:29 +00002790 c2 = textio.read(2)
Guido van Rossum78892e42007-04-06 17:31:18 +00002791 if c2 == '':
2792 break
Ezio Melottib3aedd42010-11-20 19:04:17 +00002793 self.assertEqual(len(c2), 2)
Guido van Rossum76c5d4d2007-04-06 19:10:29 +00002794 got_lines.append(c2 + textio.readline())
Guido van Rossum78892e42007-04-06 17:31:18 +00002795 else:
Guido van Rossum76c5d4d2007-04-06 19:10:29 +00002796 got_lines = list(textio)
Guido van Rossum78892e42007-04-06 17:31:18 +00002797
2798 for got_line, exp_line in zip(got_lines, exp_lines):
Ezio Melottib3aedd42010-11-20 19:04:17 +00002799 self.assertEqual(got_line, exp_line)
2800 self.assertEqual(len(got_lines), len(exp_lines))
Guido van Rossum78892e42007-04-06 17:31:18 +00002801
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002802 def test_newlines_input(self):
2803 testdata = b"AAA\nBB\x00B\nCCC\rDDD\rEEE\r\nFFF\r\nGGG"
Guido van Rossum8358db22007-08-18 21:39:55 +00002804 normalized = testdata.replace(b"\r\n", b"\n").replace(b"\r", b"\n")
2805 for newline, expected in [
Ezio Melottid8b509b2011-09-28 17:37:55 +03002806 (None, normalized.decode("ascii").splitlines(keepends=True)),
2807 ("", testdata.decode("ascii").splitlines(keepends=True)),
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002808 ("\n", ["AAA\n", "BB\x00B\n", "CCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]),
2809 ("\r\n", ["AAA\nBB\x00B\nCCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]),
2810 ("\r", ["AAA\nBB\x00B\nCCC\r", "DDD\r", "EEE\r", "\nFFF\r", "\nGGG"]),
Guido van Rossum8358db22007-08-18 21:39:55 +00002811 ]:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002812 buf = self.BytesIO(testdata)
2813 txt = self.TextIOWrapper(buf, encoding="ascii", newline=newline)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002814 self.assertEqual(txt.readlines(), expected)
Guido van Rossum8358db22007-08-18 21:39:55 +00002815 txt.seek(0)
Ezio Melottib3aedd42010-11-20 19:04:17 +00002816 self.assertEqual(txt.read(), "".join(expected))
Guido van Rossum8358db22007-08-18 21:39:55 +00002817
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002818 def test_newlines_output(self):
2819 testdict = {
2820 "": b"AAA\nBBB\nCCC\nX\rY\r\nZ",
2821 "\n": b"AAA\nBBB\nCCC\nX\rY\r\nZ",
2822 "\r": b"AAA\rBBB\rCCC\rX\rY\r\rZ",
2823 "\r\n": b"AAA\r\nBBB\r\nCCC\r\nX\rY\r\r\nZ",
2824 }
2825 tests = [(None, testdict[os.linesep])] + sorted(testdict.items())
2826 for newline, expected in tests:
2827 buf = self.BytesIO()
2828 txt = self.TextIOWrapper(buf, encoding="ascii", newline=newline)
2829 txt.write("AAA\nB")
2830 txt.write("BB\nCCC\n")
2831 txt.write("X\rY\r\nZ")
2832 txt.flush()
Ezio Melottib3aedd42010-11-20 19:04:17 +00002833 self.assertEqual(buf.closed, False)
2834 self.assertEqual(buf.getvalue(), expected)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002835
def test_destructor(self):
    # Dropping the last reference to the wrapper must close the buffer,
    # and the pending text must already be in the buffer at that point.
    captured = []
    parent = self.BytesIO

    class MyBytesIO(parent):
        def close(self):
            # Record the buffer contents as seen at close time.
            captured.append(self.getvalue())
            parent.close(self)

    raw = MyBytesIO()
    wrapper = self.TextIOWrapper(raw, encoding="ascii")
    wrapper.write("abc")
    del wrapper
    support.gc_collect()
    self.assertEqual([b"abc"], captured)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00002849
def test_override_destructor(self):
    # A subclass overriding __del__, close() and flush() must see them
    # invoked in that order when the object is destroyed.
    calls = []

    class MyTextIO(self.TextIOWrapper):
        def __del__(self):
            calls.append(1)
            # The base class may or may not define __del__.
            inherited = getattr(super(), "__del__", None)
            if inherited is not None:
                inherited()

        def close(self):
            calls.append(2)
            super().close()

        def flush(self):
            calls.append(3)
            super().flush()

    raw = self.BytesIO()
    wrapper = MyTextIO(raw, encoding="ascii")
    del wrapper
    support.gc_collect()
    self.assertEqual(calls, [1, 2, 3])
2872
def test_error_through_destructor(self):
    # Test that the exception state is not modified by a destructor,
    # even if close() fails.
    failing_raw = self.CloseFailureIO()
    with support.catch_unraisable_exception() as cm:
        with self.assertRaises(AttributeError):
            self.TextIOWrapper(failing_raw).xyzzy

        # Inspect the captured unraisable exception while the catcher
        # is still active.
        unraisable = cm.unraisable
        if IOBASE_EMITS_UNRAISABLE:
            if unraisable is not None:
                self.assertEqual(unraisable.exc_type, OSError)
        else:
            self.assertIsNone(unraisable)
Guido van Rossum8358db22007-08-18 21:39:55 +00002885
Guido van Rossum9b76da62007-04-11 01:09:03 +00002886 # Systematic tests of the text I/O API
2887
def test_basic_io(self):
    # Write, read, seek and tell on a small text file for several chunk
    # sizes and encodings.  Files are opened with ``with`` so a failing
    # assertion cannot leave a handle open on support.TESTFN.
    for chunksize in (1, 2, 3, 4, 5, 15, 16, 17, 31, 32, 33, 63, 64, 65):
        # NOTE(review): "utf-16-be" and "utf-16-le" were commented out of
        # this list in the original; the reason is not visible here.
        for enc in ("ascii", "latin-1", "utf-8"):
            with self.open(support.TESTFN, "w+", encoding=enc) as f:
                f._CHUNK_SIZE = chunksize
                self.assertEqual(f.write("abc"), 3)
            with self.open(support.TESTFN, "r+", encoding=enc) as f:
                f._CHUNK_SIZE = chunksize
                self.assertEqual(f.tell(), 0)
                self.assertEqual(f.read(), "abc")
                # Position after the whole content; reused as a seek
                # target below.
                cookie = f.tell()
                self.assertEqual(f.seek(0), 0)
                self.assertEqual(f.read(None), "abc")
                f.seek(0)
                self.assertEqual(f.read(2), "ab")
                self.assertEqual(f.read(1), "c")
                self.assertEqual(f.read(1), "")
                self.assertEqual(f.read(), "")
                self.assertEqual(f.tell(), cookie)
                self.assertEqual(f.seek(0), 0)
                self.assertEqual(f.seek(0, 2), cookie)
                self.assertEqual(f.write("def"), 3)
                self.assertEqual(f.seek(cookie), cookie)
                self.assertEqual(f.read(), "def")
                if enc.startswith("utf"):
                    self.multi_line_test(f, enc)
2916
def multi_line_test(self, f, enc):
    # Helper: empty *f*, write lines of many different lengths while
    # recording tell() before each one, then read them back and check
    # that both the positions and the line contents match.
    # *enc* is accepted but not used by this helper.
    f.seek(0)
    f.truncate()
    alphabet = "s\xff\u0fff\uffff"
    period = len(alphabet)
    written = []
    for size in (0, 1, 2, 3, 4, 5, 30, 31, 32, 33, 62, 63, 64, 65, 1000):
        text = "".join(alphabet[k % period] for k in range(size)) + "\n"
        written.append((f.tell(), text))
        f.write(text)
    f.seek(0)
    seen = []
    while True:
        offset = f.tell()
        text = f.readline()
        if not text:
            break
        seen.append((offset, text))
    self.assertEqual(seen, written)
Guido van Rossum9b76da62007-04-11 01:09:03 +00002938
def test_telling(self):
    # tell() must report the same positions while reading back as it did
    # while writing, and must be refused during iteration.  The file is
    # opened with ``with`` so a failing assertion cannot leak the handle.
    with self.open(support.TESTFN, "w+", encoding="utf-8") as f:
        p0 = f.tell()
        f.write("\xff\n")
        p1 = f.tell()
        f.write("\xff\n")
        p2 = f.tell()
        f.seek(0)
        self.assertEqual(f.tell(), p0)
        self.assertEqual(f.readline(), "\xff\n")
        self.assertEqual(f.tell(), p1)
        self.assertEqual(f.readline(), "\xff\n")
        self.assertEqual(f.tell(), p2)
        f.seek(0)
        for line in f:
            self.assertEqual(line, "\xff\n")
            # While iterating with next(), tell() is expected to fail.
            self.assertRaises(OSError, f.tell)
        # Once iteration is exhausted tell() works again.
        self.assertEqual(f.tell(), p2)
2958
def test_seeking(self):
    # Build a line whose multi-byte suffix starts two characters before
    # the default chunk size, then read the prefix and the suffix.
    chunk_size = _default_chunk_size()
    prefix_size = chunk_size - 2
    u_prefix = "a" * prefix_size
    prefix = u_prefix.encode("utf-8")
    self.assertEqual(len(u_prefix), len(prefix))
    u_suffix = "\u8888\n"
    line = prefix + u_suffix.encode("utf-8")
    with self.open(support.TESTFN, "wb") as f:
        f.write(line * 2)
    with self.open(support.TESTFN, "r", encoding="utf-8") as f:
        head = f.read(prefix_size)
        self.assertEqual(head, prefix.decode("ascii"))
        self.assertEqual(f.tell(), prefix_size)
        self.assertEqual(f.readline(), u_suffix)
Guido van Rossumd76e7792007-04-17 02:38:04 +00002975
def test_seeking_too(self):
    # Regression test for a specific bug
    payload = b'\xe0\xbf\xbf\n'
    with self.open(support.TESTFN, "wb") as sink:
        sink.write(payload)
    with self.open(support.TESTFN, "r", encoding="utf-8") as reader:
        reader._CHUNK_SIZE  # Just test that it exists
        # A chunk size smaller than the 3-byte character forces the
        # character to be split across chunks.
        reader._CHUNK_SIZE = 2
        reader.readline()
        reader.tell()
Guido van Rossumd76e7792007-04-17 02:38:04 +00002986
def test_seek_and_tell(self):
    # Test seek/tell using the StatefulIncrementalDecoder.
    # Make test faster by doing smaller seeks
    CHUNK_SIZE = 128

    def check_seek_and_tell_with_data(data, min_pos=0):
        """Tell/seek to various points within a data stream and ensure
        that the decoded data returned by read() is consistent."""
        # Every file is opened with ``with`` so that a failing assertion
        # inside the loops cannot leave handles open on support.TESTFN.
        with self.open(support.TESTFN, 'wb') as f:
            f.write(data)
        with self.open(support.TESTFN, encoding='test_decoder') as f:
            f._CHUNK_SIZE = CHUNK_SIZE
            decoded = f.read()

        for i in range(min_pos, len(decoded) + 1):  # seek positions
            for j in [1, 5, len(decoded) - i]:  # read lengths
                with self.open(support.TESTFN,
                               encoding='test_decoder') as f:
                    self.assertEqual(f.read(i), decoded[:i])
                    cookie = f.tell()
                    self.assertEqual(f.read(j), decoded[i:i + j])
                    f.seek(cookie)
                    self.assertEqual(f.read(), decoded[i:])

    # Enable the test decoder.
    StatefulIncrementalDecoder.codecEnabled = 1

    # Run the tests.
    try:
        # Try each test case.
        for test_input, _, _ in StatefulIncrementalDecoderTest.test_cases:
            check_seek_and_tell_with_data(test_input)

        # Position each test case so that it crosses a chunk boundary.
        for test_input, _, _ in StatefulIncrementalDecoderTest.test_cases:
            offset = CHUNK_SIZE - len(test_input) // 2
            prefix = b'.' * offset
            # Don't bother seeking into the prefix (takes too long).
            min_pos = offset * 2
            check_seek_and_tell_with_data(prefix + test_input, min_pos)

    # Ensure our test decoder won't interfere with subsequent tests.
    finally:
        StatefulIncrementalDecoder.codecEnabled = 0
Ka-Ping Yeef44c7e82008-03-18 04:51:32 +00003033
def test_multibyte_seek_and_tell(self):
    # A tell() cookie taken between two lines of a multibyte-encoded
    # file must seek back to the start of the second line.  Files are
    # opened with ``with`` so a failing assertion cannot leak a handle.
    with self.open(support.TESTFN, "w", encoding="euc_jp") as f:
        f.write("AB\n\u3046\u3048\n")

    with self.open(support.TESTFN, "r", encoding="euc_jp") as f:
        self.assertEqual(f.readline(), "AB\n")
        p0 = f.tell()
        self.assertEqual(f.readline(), "\u3046\u3048\n")
        p1 = f.tell()
        f.seek(p0)
        self.assertEqual(f.readline(), "\u3046\u3048\n")
        self.assertEqual(f.tell(), p1)
3048
def test_seek_with_encoder_state(self):
    # Write, take a tell() cookie, write more, seek back to the cookie
    # and overwrite; then check what the file decodes to.  Files are
    # opened with ``with`` so a failing step cannot leak a handle.
    with self.open(support.TESTFN, "w", encoding="euc_jis_2004") as f:
        f.write("\u00e6\u0300")
        p0 = f.tell()
        f.write("\u00e6")
        f.seek(p0)
        f.write("\u0300")

    with self.open(support.TESTFN, "r", encoding="euc_jis_2004") as f:
        self.assertEqual(f.readline(), "\u00e6\u0300\u0300")
3061
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003062 def test_encoded_writes(self):
Alexandre Vassalottia38f73b2008-01-07 18:30:48 +00003063 data = "1234567890"
3064 tests = ("utf-16",
3065 "utf-16-le",
3066 "utf-16-be",
3067 "utf-32",
3068 "utf-32-le",
3069 "utf-32-be")
3070 for encoding in tests:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003071 buf = self.BytesIO()
3072 f = self.TextIOWrapper(buf, encoding=encoding)
Alexandre Vassalottia38f73b2008-01-07 18:30:48 +00003073 # Check if the BOM is written only once (see issue1753).
3074 f.write(data)
3075 f.write(data)
3076 f.seek(0)
Ezio Melottib3aedd42010-11-20 19:04:17 +00003077 self.assertEqual(f.read(), data * 2)
Benjamin Peterson9363a652009-03-05 00:42:09 +00003078 f.seek(0)
Ezio Melottib3aedd42010-11-20 19:04:17 +00003079 self.assertEqual(f.read(), data * 2)
3080 self.assertEqual(buf.getvalue(), (data * 2).encode(encoding))
Alexandre Vassalottia38f73b2008-01-07 18:30:48 +00003081
Benjamin Petersona1b49012009-03-31 23:11:32 +00003082 def test_unreadable(self):
3083 class UnReadable(self.BytesIO):
3084 def readable(self):
3085 return False
3086 txt = self.TextIOWrapper(UnReadable())
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02003087 self.assertRaises(OSError, txt.read)
Benjamin Petersona1b49012009-03-31 23:11:32 +00003088
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003089 def test_read_one_by_one(self):
3090 txt = self.TextIOWrapper(self.BytesIO(b"AA\r\nBB"))
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00003091 reads = ""
3092 while True:
3093 c = txt.read(1)
3094 if not c:
3095 break
3096 reads += c
Ezio Melottib3aedd42010-11-20 19:04:17 +00003097 self.assertEqual(reads, "AA\nBB")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00003098
Benjamin Petersonbf5ff762009-12-13 19:25:34 +00003099 def test_readlines(self):
3100 txt = self.TextIOWrapper(self.BytesIO(b"AA\nBB\nCC"))
3101 self.assertEqual(txt.readlines(), ["AA\n", "BB\n", "CC"])
3102 txt.seek(0)
3103 self.assertEqual(txt.readlines(None), ["AA\n", "BB\n", "CC"])
3104 txt.seek(0)
3105 self.assertEqual(txt.readlines(5), ["AA\n", "BB\n"])
3106
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00003107 # read in amounts equal to TextIOWrapper._CHUNK_SIZE which is 128.
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003108 def test_read_by_chunk(self):
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00003109 # make sure "\r\n" straddles 128 char boundary.
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003110 txt = self.TextIOWrapper(self.BytesIO(b"A" * 127 + b"\r\nB"))
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00003111 reads = ""
3112 while True:
3113 c = txt.read(128)
3114 if not c:
3115 break
3116 reads += c
Ezio Melottib3aedd42010-11-20 19:04:17 +00003117 self.assertEqual(reads, "A"*127+"\nB")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00003118
Antoine Pitrou3ed2cb52012-10-16 23:02:27 +02003119 def test_writelines(self):
3120 l = ['ab', 'cd', 'ef']
3121 buf = self.BytesIO()
3122 txt = self.TextIOWrapper(buf)
3123 txt.writelines(l)
3124 txt.flush()
3125 self.assertEqual(buf.getvalue(), b'abcdef')
3126
3127 def test_writelines_userlist(self):
3128 l = UserList(['ab', 'cd', 'ef'])
3129 buf = self.BytesIO()
3130 txt = self.TextIOWrapper(buf)
3131 txt.writelines(l)
3132 txt.flush()
3133 self.assertEqual(buf.getvalue(), b'abcdef')
3134
3135 def test_writelines_error(self):
3136 txt = self.TextIOWrapper(self.BytesIO())
3137 self.assertRaises(TypeError, txt.writelines, [1, 2, 3])
3138 self.assertRaises(TypeError, txt.writelines, None)
3139 self.assertRaises(TypeError, txt.writelines, b'abc')
3140
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00003141 def test_issue1395_1(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003142 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00003143
3144 # read one char at a time
3145 reads = ""
3146 while True:
3147 c = txt.read(1)
3148 if not c:
3149 break
3150 reads += c
Ezio Melottib3aedd42010-11-20 19:04:17 +00003151 self.assertEqual(reads, self.normalized)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00003152
3153 def test_issue1395_2(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003154 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00003155 txt._CHUNK_SIZE = 4
3156
3157 reads = ""
3158 while True:
3159 c = txt.read(4)
3160 if not c:
3161 break
3162 reads += c
Ezio Melottib3aedd42010-11-20 19:04:17 +00003163 self.assertEqual(reads, self.normalized)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00003164
3165 def test_issue1395_3(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003166 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00003167 txt._CHUNK_SIZE = 4
3168
3169 reads = txt.read(4)
3170 reads += txt.read(4)
3171 reads += txt.readline()
3172 reads += txt.readline()
3173 reads += txt.readline()
Ezio Melottib3aedd42010-11-20 19:04:17 +00003174 self.assertEqual(reads, self.normalized)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00003175
3176 def test_issue1395_4(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003177 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00003178 txt._CHUNK_SIZE = 4
3179
3180 reads = txt.read(4)
3181 reads += txt.read()
Ezio Melottib3aedd42010-11-20 19:04:17 +00003182 self.assertEqual(reads, self.normalized)
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00003183
3184 def test_issue1395_5(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003185 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00003186 txt._CHUNK_SIZE = 4
3187
3188 reads = txt.read(4)
3189 pos = txt.tell()
3190 txt.seek(0)
3191 txt.seek(pos)
Ezio Melottib3aedd42010-11-20 19:04:17 +00003192 self.assertEqual(txt.read(4), "BBB\n")
Amaury Forgeot d'Arc1ff99102007-11-19 20:34:10 +00003193
Ka-Ping Yeeddaa7062008-03-17 20:35:15 +00003194 def test_issue2282(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003195 buffer = self.BytesIO(self.testdata)
3196 txt = self.TextIOWrapper(buffer, encoding="ascii")
Ka-Ping Yeeddaa7062008-03-17 20:35:15 +00003197
3198 self.assertEqual(buffer.seekable(), txt.seekable())
3199
def test_append_bom(self):
    # The BOM is not written again when appending to a non-empty file
    path = support.TESTFN

    def raw_contents():
        # Return the file's bytes exactly as stored on disk.
        with self.open(path, 'rb') as stream:
            return stream.read()

    for charset in ('utf-8-sig', 'utf-16', 'utf-32'):
        with self.open(path, 'w', encoding=charset) as stream:
            stream.write('aaa')
            # NOTE(review): the result is unused; the call is kept to
            # preserve the original sequence of operations.
            stream.tell()
        self.assertEqual(raw_contents(), 'aaa'.encode(charset))

        with self.open(path, 'a', encoding=charset) as stream:
            stream.write('xxx')
        self.assertEqual(raw_contents(), 'aaaxxx'.encode(charset))
Antoine Pitroue4501852009-05-14 18:55:55 +00003214
def test_seek_bom(self):
    # Same test, but when seeking manually
    path = support.TESTFN
    for charset in ('utf-8-sig', 'utf-16', 'utf-32'):
        with self.open(path, 'w', encoding=charset) as writer:
            writer.write('aaa')
            end_cookie = writer.tell()
        with self.open(path, 'r+', encoding=charset) as updater:
            # Write after the existing text first, then overwrite it
            # from the start of the file.
            updater.seek(end_cookie)
            updater.write('zzz')
            updater.seek(0)
            updater.write('bbb')
        with self.open(path, 'rb') as reader:
            stored = reader.read()
            self.assertEqual(stored, 'bbbzzz'.encode(charset))
Antoine Pitroue4501852009-05-14 18:55:55 +00003229
def test_seek_append_bom(self):
    # Same test, but first seek to the start and then to the end
    path = support.TESTFN
    for charset in ('utf-8-sig', 'utf-16', 'utf-32'):
        with self.open(path, 'w', encoding=charset) as writer:
            writer.write('aaa')
        with self.open(path, 'a', encoding=charset) as appender:
            appender.seek(0)
            appender.seek(0, self.SEEK_END)
            appender.write('xxx')
        with self.open(path, 'rb') as reader:
            stored = reader.read()
            self.assertEqual(stored, 'aaaxxx'.encode(charset))
3242
def test_errors_property(self):
    # Each entry: (extra open() keywords, expected ``errors`` attribute).
    cases = (
        ({}, "strict"),
        ({"errors": "replace"}, "replace"),
    )
    for extra, expected in cases:
        with self.open(support.TESTFN, "w", **extra) as f:
            self.assertEqual(f.errors, expected)
3248
@support.no_tracing
def test_threads_write(self):
    # Issue6750: concurrent writes could duplicate data
    thread_count = 20
    start_signal = threading.Event()
    with self.open(support.TESTFN, "w", buffering=1) as f:
        def run(n):
            text = "Thread%03d\n" % n
            # All threads block here until they are released together.
            start_signal.wait()
            f.write(text)

        workers = []
        for index in range(thread_count):
            workers.append(threading.Thread(target=run, args=(index,)))
        with support.start_threads(workers, start_signal.set):
            time.sleep(0.02)
    with self.open(support.TESTFN) as f:
        content = f.read()
        # Every thread's line must appear exactly once.
        for index in range(thread_count):
            occurrences = content.count("Thread%03d\n" % index)
            self.assertEqual(occurrences, 1)
Amaury Forgeot d'Arcccd686a2009-08-29 23:00:38 +00003266
Antoine Pitrou6be88762010-05-03 16:48:20 +00003267 def test_flush_error_on_close(self):
Serhiy Storchakaa3712a92015-02-21 00:35:09 +02003268 # Test that text file is closed despite failed flush
3269 # and that flush() is called before file closed.
Antoine Pitrou6be88762010-05-03 16:48:20 +00003270 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
Serhiy Storchakaa3712a92015-02-21 00:35:09 +02003271 closed = []
Antoine Pitrou6be88762010-05-03 16:48:20 +00003272 def bad_flush():
Serhiy Storchakaa3712a92015-02-21 00:35:09 +02003273 closed[:] = [txt.closed, txt.buffer.closed]
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02003274 raise OSError()
Antoine Pitrou6be88762010-05-03 16:48:20 +00003275 txt.flush = bad_flush
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02003276 self.assertRaises(OSError, txt.close) # exception not swallowed
Benjamin Peterson68623612012-12-20 11:53:11 -06003277 self.assertTrue(txt.closed)
Serhiy Storchakaa3712a92015-02-21 00:35:09 +02003278 self.assertTrue(txt.buffer.closed)
3279 self.assertTrue(closed) # flush() called
3280 self.assertFalse(closed[0]) # flush() called before file closed
3281 self.assertFalse(closed[1])
Serhiy Storchakac26a1a42015-02-23 00:28:38 +02003282 txt.flush = lambda: None # break reference loop
Antoine Pitrou6be88762010-05-03 16:48:20 +00003283
Serhiy Storchaka8a8f7f92014-06-09 09:13:04 +03003284 def test_close_error_on_close(self):
3285 buffer = self.BytesIO(self.testdata)
3286 def bad_flush():
3287 raise OSError('flush')
3288 def bad_close():
3289 raise OSError('close')
3290 buffer.close = bad_close
3291 txt = self.TextIOWrapper(buffer, encoding="ascii")
3292 txt.flush = bad_flush
3293 with self.assertRaises(OSError) as err: # exception not swallowed
3294 txt.close()
3295 self.assertEqual(err.exception.args, ('close',))
3296 self.assertIsInstance(err.exception.__context__, OSError)
3297 self.assertEqual(err.exception.__context__.args, ('flush',))
3298 self.assertFalse(txt.closed)
3299
Victor Stinner472f7942019-04-12 21:58:24 +02003300 # Silence destructor error
3301 buffer.close = lambda: None
3302 txt.flush = lambda: None
3303
Serhiy Storchaka8a8f7f92014-06-09 09:13:04 +03003304 def test_nonnormalized_close_error_on_close(self):
3305 # Issue #21677
3306 buffer = self.BytesIO(self.testdata)
3307 def bad_flush():
3308 raise non_existing_flush
3309 def bad_close():
3310 raise non_existing_close
3311 buffer.close = bad_close
3312 txt = self.TextIOWrapper(buffer, encoding="ascii")
3313 txt.flush = bad_flush
3314 with self.assertRaises(NameError) as err: # exception not swallowed
3315 txt.close()
3316 self.assertIn('non_existing_close', str(err.exception))
3317 self.assertIsInstance(err.exception.__context__, NameError)
3318 self.assertIn('non_existing_flush', str(err.exception.__context__))
3319 self.assertFalse(txt.closed)
3320
Victor Stinner472f7942019-04-12 21:58:24 +02003321 # Silence destructor error
3322 buffer.close = lambda: None
3323 txt.flush = lambda: None
3324
Antoine Pitrou6be88762010-05-03 16:48:20 +00003325 def test_multi_close(self):
3326 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
3327 txt.close()
3328 txt.close()
3329 txt.close()
3330 self.assertRaises(ValueError, txt.flush)
3331
def test_unseekable(self):
    # tell() and seek() on a wrapper around MockUnseekableIO must
    # raise UnsupportedOperation.
    wrapper = self.TextIOWrapper(self.MockUnseekableIO(self.testdata))
    with self.assertRaises(self.UnsupportedOperation):
        wrapper.tell()
    with self.assertRaises(self.UnsupportedOperation):
        wrapper.seek(0)
3336
Antoine Pitrou7f8f4182010-12-21 21:20:59 +00003337 def test_readonly_attributes(self):
3338 txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
3339 buf = self.BytesIO(self.testdata)
3340 with self.assertRaises(AttributeError):
3341 txt.buffer = buf
3342
def test_rawio(self):
    # Issue #12591: TextIOWrapper must work with raw I/O objects, so
    # that subprocess.Popen() can have the required unbuffered
    # semantics with universal_newlines=True.
    chunks = [b'abc', b'def', b'ghi\njkl\nopq\n']
    wrapper = self.TextIOWrapper(self.MockRawIO(chunks),
                                 encoding='ascii', newline='\n')
    # Reads
    self.assertEqual(wrapper.read(4), 'abcd')
    self.assertEqual(wrapper.readline(), 'efghi\n')
    remaining = list(wrapper)
    self.assertEqual(remaining, ['jkl\n', 'opq\n'])
3353
def test_rawio_write_through(self):
    # Issue #12591: with write_through=True, writes don't need a flush
    raw = self.MockRawIO([b'abc', b'def', b'ghi\njkl\nopq\n'])
    wrapper = self.TextIOWrapper(raw, encoding='ascii', newline='\n',
                                 write_through=True)
    for piece in ('1', '23\n4', '5'):
        wrapper.write(piece)
    # No flush() was issued, yet the mock's write stack holds the data.
    delivered = b''.join(raw._write_stack)
    self.assertEqual(delivered, b'123\n45')
3363
Antoine Pitrouc644e7c2014-05-09 00:24:50 +02003364 def test_bufio_write_through(self):
3365 # Issue #21396: write_through=True doesn't force a flush()
3366 # on the underlying binary buffered object.
3367 flush_called, write_called = [], []
3368 class BufferedWriter(self.BufferedWriter):
3369 def flush(self, *args, **kwargs):
3370 flush_called.append(True)
3371 return super().flush(*args, **kwargs)
3372 def write(self, *args, **kwargs):
3373 write_called.append(True)
3374 return super().write(*args, **kwargs)
3375
3376 rawio = self.BytesIO()
3377 data = b"a"
3378 bufio = BufferedWriter(rawio, len(data)*2)
3379 textio = self.TextIOWrapper(bufio, encoding='ascii',
3380 write_through=True)
3381 # write to the buffered io but don't overflow the buffer
3382 text = data.decode('ascii')
3383 textio.write(text)
3384
3385 # buffer.flush is not called with write_through=True
3386 self.assertFalse(flush_called)
3387 # buffer.write *is* called with write_through=True
3388 self.assertTrue(write_called)
3389 self.assertEqual(rawio.getvalue(), b"") # no flush
3390
3391 write_called = [] # reset
3392 textio.write(text * 10) # total content is larger than bufio buffer
3393 self.assertTrue(write_called)
3394 self.assertEqual(rawio.getvalue(), data * 11) # all flushed
3395
def test_reconfigure_write_through(self):
    # Toggle write_through with reconfigure() and check both the
    # attribute and what has reached the raw object.
    raw = self.MockRawIO([])
    wrapper = self.TextIOWrapper(raw, encoding='ascii', newline='\n')

    def delivered():
        # Everything the mock raw object has been asked to write.
        return b''.join(raw._write_stack)

    wrapper.write('1')
    wrapper.reconfigure(write_through=True)  # implied flush
    self.assertEqual(wrapper.write_through, True)
    self.assertEqual(delivered(), b'1')
    wrapper.write('23')
    self.assertEqual(delivered(), b'123')
    wrapper.reconfigure(write_through=False)
    self.assertEqual(wrapper.write_through, False)
    wrapper.write('45')
    wrapper.flush()
    self.assertEqual(delivered(), b'12345')
    # Keeping default value
    wrapper.reconfigure()
    wrapper.reconfigure(write_through=None)
    self.assertEqual(wrapper.write_through, False)
    wrapper.reconfigure(write_through=True)
    wrapper.reconfigure()
    wrapper.reconfigure(write_through=None)
    self.assertEqual(wrapper.write_through, True)
3418
Serhiy Storchaka94dc6732013-02-03 17:03:31 +02003419 def test_read_nonbytes(self):
3420 # Issue #17106
3421 # Crash when underlying read() returns non-bytes
3422 t = self.TextIOWrapper(self.StringIO('a'))
3423 self.assertRaises(TypeError, t.read, 1)
3424 t = self.TextIOWrapper(self.StringIO('a'))
3425 self.assertRaises(TypeError, t.readline)
3426 t = self.TextIOWrapper(self.StringIO('a'))
3427 self.assertRaises(TypeError, t.read)
3428
def test_illegal_encoder(self):
    # Issue 31271: Calling write() while the return value of encoder's
    # encode() is invalid shouldn't cause an assertion failure.
    rot13 = codecs.lookup("rot13")
    # Temporarily mark rot13 as a text encoding so the wrapper accepts it.
    with support.swap_attr(rot13, '_is_text_encoding', True):
        # Use the classes selected by the test class rather than the
        # ``io`` module directly, so the implementation under test is
        # the one this test class is meant to exercise.
        t = self.TextIOWrapper(self.BytesIO(b'foo'), encoding="rot13")
    self.assertRaises(TypeError, t.write, 'bar')
3436
Serhiy Storchaka94dc6732013-02-03 17:03:31 +02003437 def test_illegal_decoder(self):
3438 # Issue #17106
Nick Coghlana9b15242014-02-04 22:11:18 +10003439 # Bypass the early encoding check added in issue 20404
3440 def _make_illegal_wrapper():
3441 quopri = codecs.lookup("quopri")
3442 quopri._is_text_encoding = True
3443 try:
3444 t = self.TextIOWrapper(self.BytesIO(b'aaaaaa'),
3445 newline='\n', encoding="quopri")
3446 finally:
3447 quopri._is_text_encoding = False
3448 return t
Serhiy Storchaka94dc6732013-02-03 17:03:31 +02003449 # Crash when decoder returns non-string
Nick Coghlana9b15242014-02-04 22:11:18 +10003450 t = _make_illegal_wrapper()
Serhiy Storchaka94dc6732013-02-03 17:03:31 +02003451 self.assertRaises(TypeError, t.read, 1)
Nick Coghlana9b15242014-02-04 22:11:18 +10003452 t = _make_illegal_wrapper()
Serhiy Storchaka94dc6732013-02-03 17:03:31 +02003453 self.assertRaises(TypeError, t.readline)
Nick Coghlana9b15242014-02-04 22:11:18 +10003454 t = _make_illegal_wrapper()
Serhiy Storchaka94dc6732013-02-03 17:03:31 +02003455 self.assertRaises(TypeError, t.read)
3456
Oren Milmanba7d7362017-08-29 11:58:27 +03003457 # Issue 31243: calling read() while the return value of decoder's
3458 # getstate() is invalid should neither crash the interpreter nor
3459 # raise a SystemError.
3460 def _make_very_illegal_wrapper(getstate_ret_val):
3461 class BadDecoder:
3462 def getstate(self):
3463 return getstate_ret_val
3464 def _get_bad_decoder(dummy):
3465 return BadDecoder()
3466 quopri = codecs.lookup("quopri")
3467 with support.swap_attr(quopri, 'incrementaldecoder',
3468 _get_bad_decoder):
3469 return _make_illegal_wrapper()
3470 t = _make_very_illegal_wrapper(42)
3471 self.assertRaises(TypeError, t.read, 42)
3472 t = _make_very_illegal_wrapper(())
3473 self.assertRaises(TypeError, t.read, 42)
3474 t = _make_very_illegal_wrapper((1, 2))
3475 self.assertRaises(TypeError, t.read, 42)
3476
def _check_create_at_shutdown(self, **kwargs):
    """Run a child interpreter that creates a TextIOWrapper from __del__.

    *kwargs* are interpolated into the TextIOWrapper() call of the child
    script.  The result of assert_python_ok() is returned unchanged.
    """
    # Issue #20037: creating a TextIOWrapper at shutdown
    # shouldn't crash the interpreter.
    template = """if 1:
        import codecs
        import {iomod} as io

        # Avoid looking up codecs at shutdown
        codecs.lookup('utf-8')

        class C:
            def __init__(self):
                self.buf = io.BytesIO()
            def __del__(self):
                io.TextIOWrapper(self.buf, **{kwargs})
                print("ok")
        c = C()
        """
    script = template.format(iomod=self.io.__name__, kwargs=kwargs)
    return assert_python_ok("-c", script)
3497
@support.requires_type_collecting
def test_create_at_shutdown_without_encoding(self):
    """With no encoding given, accept either "ok" or the shutdown error."""
    _rc, stdout, stderr = self._check_create_at_shutdown()
    if not stderr:
        self.assertEqual("ok", stdout.decode().strip())
        return
    # Can error out with a RuntimeError if the module state
    # isn't found.
    self.assertIn(self.shutdown_error, stderr.decode())
3507
@support.requires_type_collecting
def test_create_at_shutdown_with_encoding(self):
    """With encoding and errors given, the child prints "ok" and no errors."""
    options = {'encoding': 'utf-8', 'errors': 'strict'}
    _rc, stdout, stderr = self._check_create_at_shutdown(**options)
    self.assertFalse(stderr)
    self.assertEqual("ok", stdout.decode().strip())
3514
def test_read_byteslike(self):
    """A buffer whose reads return memoryviews can still be decoded."""
    source = MemviewBytesIO(b'Just some random string\n')
    wrapper = self.TextIOWrapper(source, 'utf-8')

    # TextIOwrapper will not read the full string, because
    # we truncate it to a multiple of the native int size
    # so that we can construct a more complex memoryview.
    truncated = _to_memoryview(source.getvalue()).tobytes()
    expected_text = truncated.decode('utf-8')

    self.assertEqual(wrapper.read(200), expected_text)
3525
Benjamin Peterson6c14f232014-11-12 10:19:46 -05003526 def test_issue22849(self):
3527 class F(object):
3528 def readable(self): return True
3529 def writable(self): return True
3530 def seekable(self): return True
3531
3532 for i in range(10):
3533 try:
3534 self.TextIOWrapper(F(), encoding='utf-8')
3535 except Exception:
3536 pass
3537
3538 F.tell = lambda x: 0
3539 t = self.TextIOWrapper(F(), encoding='utf-8')
3540
INADA Naoki507434f2017-12-21 09:59:53 +09003541 def test_reconfigure_encoding_read(self):
3542 # latin1 -> utf8
3543 # (latin1 can decode utf-8 encoded string)
3544 data = 'abc\xe9\n'.encode('latin1') + 'd\xe9f\n'.encode('utf8')
3545 raw = self.BytesIO(data)
3546 txt = self.TextIOWrapper(raw, encoding='latin1', newline='\n')
3547 self.assertEqual(txt.readline(), 'abc\xe9\n')
3548 with self.assertRaises(self.UnsupportedOperation):
3549 txt.reconfigure(encoding='utf-8')
3550 with self.assertRaises(self.UnsupportedOperation):
3551 txt.reconfigure(newline=None)
3552
3553 def test_reconfigure_write_fromascii(self):
3554 # ascii has a specific encodefunc in the C implementation,
3555 # but utf-8-sig has not. Make sure that we get rid of the
3556 # cached encodefunc when we switch encoders.
3557 raw = self.BytesIO()
3558 txt = self.TextIOWrapper(raw, encoding='ascii', newline='\n')
3559 txt.write('foo\n')
3560 txt.reconfigure(encoding='utf-8-sig')
3561 txt.write('\xe9\n')
3562 txt.flush()
3563 self.assertEqual(raw.getvalue(), b'foo\n\xc3\xa9\n')
3564
3565 def test_reconfigure_write(self):
3566 # latin -> utf8
3567 raw = self.BytesIO()
3568 txt = self.TextIOWrapper(raw, encoding='latin1', newline='\n')
3569 txt.write('abc\xe9\n')
3570 txt.reconfigure(encoding='utf-8')
3571 self.assertEqual(raw.getvalue(), b'abc\xe9\n')
3572 txt.write('d\xe9f\n')
3573 txt.flush()
3574 self.assertEqual(raw.getvalue(), b'abc\xe9\nd\xc3\xa9f\n')
3575
3576 # ascii -> utf-8-sig: ensure that no BOM is written in the middle of
3577 # the file
3578 raw = self.BytesIO()
3579 txt = self.TextIOWrapper(raw, encoding='ascii', newline='\n')
3580 txt.write('abc\n')
3581 txt.reconfigure(encoding='utf-8-sig')
3582 txt.write('d\xe9f\n')
3583 txt.flush()
3584 self.assertEqual(raw.getvalue(), b'abc\nd\xc3\xa9f\n')
3585
3586 def test_reconfigure_write_non_seekable(self):
3587 raw = self.BytesIO()
3588 raw.seekable = lambda: False
3589 raw.seek = None
3590 txt = self.TextIOWrapper(raw, encoding='ascii', newline='\n')
3591 txt.write('abc\n')
3592 txt.reconfigure(encoding='utf-8-sig')
3593 txt.write('d\xe9f\n')
3594 txt.flush()
3595
3596 # If the raw stream is not seekable, there'll be a BOM
3597 self.assertEqual(raw.getvalue(), b'abc\n\xef\xbb\xbfd\xc3\xa9f\n')
3598
3599 def test_reconfigure_defaults(self):
3600 txt = self.TextIOWrapper(self.BytesIO(), 'ascii', 'replace', '\n')
3601 txt.reconfigure(encoding=None)
3602 self.assertEqual(txt.encoding, 'ascii')
3603 self.assertEqual(txt.errors, 'replace')
3604 txt.write('LF\n')
3605
3606 txt.reconfigure(newline='\r\n')
3607 self.assertEqual(txt.encoding, 'ascii')
3608 self.assertEqual(txt.errors, 'replace')
3609
3610 txt.reconfigure(errors='ignore')
3611 self.assertEqual(txt.encoding, 'ascii')
3612 self.assertEqual(txt.errors, 'ignore')
3613 txt.write('CRLF\n')
3614
3615 txt.reconfigure(encoding='utf-8', newline=None)
3616 self.assertEqual(txt.errors, 'strict')
3617 txt.seek(0)
3618 self.assertEqual(txt.read(), 'LF\nCRLF\n')
3619
3620 self.assertEqual(txt.detach().getvalue(), b'LF\nCRLF\r\n')
3621
3622 def test_reconfigure_newline(self):
3623 raw = self.BytesIO(b'CR\rEOF')
3624 txt = self.TextIOWrapper(raw, 'ascii', newline='\n')
3625 txt.reconfigure(newline=None)
3626 self.assertEqual(txt.readline(), 'CR\n')
3627 raw = self.BytesIO(b'CR\rEOF')
3628 txt = self.TextIOWrapper(raw, 'ascii', newline='\n')
3629 txt.reconfigure(newline='')
3630 self.assertEqual(txt.readline(), 'CR\r')
3631 raw = self.BytesIO(b'CR\rLF\nEOF')
3632 txt = self.TextIOWrapper(raw, 'ascii', newline='\r')
3633 txt.reconfigure(newline='\n')
3634 self.assertEqual(txt.readline(), 'CR\rLF\n')
3635 raw = self.BytesIO(b'LF\nCR\rEOF')
3636 txt = self.TextIOWrapper(raw, 'ascii', newline='\n')
3637 txt.reconfigure(newline='\r')
3638 self.assertEqual(txt.readline(), 'LF\nCR\r')
3639 raw = self.BytesIO(b'CR\rCRLF\r\nEOF')
3640 txt = self.TextIOWrapper(raw, 'ascii', newline='\r')
3641 txt.reconfigure(newline='\r\n')
3642 self.assertEqual(txt.readline(), 'CR\rCRLF\r\n')
3643
3644 txt = self.TextIOWrapper(self.BytesIO(), 'ascii', newline='\r')
3645 txt.reconfigure(newline=None)
3646 txt.write('linesep\n')
3647 txt.reconfigure(newline='')
3648 txt.write('LF\n')
3649 txt.reconfigure(newline='\n')
3650 txt.write('LF\n')
3651 txt.reconfigure(newline='\r')
3652 txt.write('CR\n')
3653 txt.reconfigure(newline='\r\n')
3654 txt.write('CRLF\n')
3655 expected = 'linesep' + os.linesep + 'LF\nLF\nCR\rCRLF\r\n'
3656 self.assertEqual(txt.detach().getvalue().decode('ascii'), expected)
3657
Zackery Spytz23db9352018-06-29 04:14:58 -06003658 def test_issue25862(self):
3659 # Assertion failures occurred in tell() after read() and write().
3660 t = self.TextIOWrapper(self.BytesIO(b'test'), encoding='ascii')
3661 t.read(1)
3662 t.read()
3663 t.tell()
3664 t = self.TextIOWrapper(self.BytesIO(b'test'), encoding='ascii')
3665 t.read(1)
3666 t.write('x')
3667 t.tell()
3668
Serhiy Storchaka94dc6732013-02-03 17:03:31 +02003669
class MemviewBytesIO(io.BytesIO):
    """BytesIO variant whose read() and read1() return memoryviews.

    The bytes obtained from the parent class are passed through
    _to_memoryview() before being returned.
    """

    def read1(self, len_):
        chunk = super().read1(len_)
        return _to_memoryview(chunk)

    def read(self, len_):
        chunk = super().read(len_)
        return _to_memoryview(chunk)
3679
def _to_memoryview(buf):
    """Return a memoryview over an array of C ints built from *buf*.

    Trailing bytes that do not fill a whole array item are dropped, so
    the view can be shorter than *buf*.
    """
    ints = array.array('i')
    whole_items = len(buf) // ints.itemsize
    ints.frombytes(buf[:whole_items * ints.itemsize])
    return memoryview(ints)
Benjamin Petersonc9c0f202009-06-30 23:06:06 +00003687
Benjamin Peterson6fd113c2014-11-12 10:23:44 -05003688
class CTextIOWrapperTest(TextIOWrapperTest):
    """TextIOWrapper tests bound to the ``io`` module, plus extra checks."""

    # Module under test.
    io = io
    # Text the at-shutdown test looks for in the child's stderr.
    shutdown_error = "RuntimeError: could not find io module state"

    def test_initialization(self):
        """A failed re-__init__ or a missing __init__ leaves it unusable."""
        raw = self.BytesIO(b"\xc3\xa9\n\n")
        buffered = self.BufferedReader(raw, 1000)
        wrapper = self.TextIOWrapper(buffered)
        with self.assertRaises(ValueError):
            wrapper.__init__(buffered, newline='xyzzy')
        with self.assertRaises(ValueError):
            wrapper.read()

        # An instance that never went through __init__.
        uninitialized = self.TextIOWrapper.__new__(self.TextIOWrapper)
        with self.assertRaises(Exception):
            repr(uninitialized)

    def test_garbage_collection(self):
        """A wrapper in a reference cycle is collected and its data flushed."""
        # C TextIOWrapper objects are collected, and collecting them flushes
        # all data to disk.
        # The Python version has __del__, so it ends in gc.garbage instead.
        with support.check_warnings(('', ResourceWarning)):
            raw_file = io.FileIO(support.TESTFN, "wb")
            buffered = self.BufferedWriter(raw_file)
            wrapper = self.TextIOWrapper(buffered, encoding="ascii")
            wrapper.write("456def")
            wrapper.x = wrapper  # self-reference: only the GC can free it
            ref = weakref.ref(wrapper)
            del wrapper
            support.gc_collect()
        self.assertIsNone(ref(), ref)
        with self.open(support.TESTFN, "rb") as stream:
            self.assertEqual(stream.read(), b"456def")

    def test_rwpair_cleared_before_textio(self):
        """Collecting mutually referencing wrappers over RWPairs is safe."""
        # Issue 13070: TextIOWrapper's finalization would crash when called
        # after the reference to the underlying BufferedRWPair's writer got
        # cleared by the GC.
        for _ in range(1000):
            pair_a = self.BufferedRWPair(self.MockRawIO(), self.MockRawIO())
            text_a = self.TextIOWrapper(pair_a, encoding="ascii")
            pair_b = self.BufferedRWPair(self.MockRawIO(), self.MockRawIO())
            text_b = self.TextIOWrapper(pair_b, encoding="ascii")
            # circular references
            text_a.buddy = text_b
            text_b.buddy = text_a
        support.gc_collect()

    def test_del__CHUNK_SIZE_SystemError(self):
        """Deleting _CHUNK_SIZE raises AttributeError."""
        wrapper = self.TextIOWrapper(self.BytesIO(), encoding='ascii')
        with self.assertRaises(AttributeError):
            del wrapper._CHUNK_SIZE
3738
Charles-François Natali42c28cd2011-10-05 19:53:43 +02003739
class PyTextIOWrapperTest(TextIOWrapperTest):
    """TextIOWrapper tests bound to the ``pyio`` module."""

    # Module under test.
    io = pyio
    # Text the at-shutdown test looks for in the child's stderr.
    shutdown_error = "LookupError: unknown encoding: ascii"
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003743
3744
class IncrementalNewlineDecoderTest(unittest.TestCase):
    """Tests for ``self.IncrementalNewlineDecoder``.

    The class under test is read from the ``IncrementalNewlineDecoder``
    attribute, which is not defined in this class.
    NOTE(review): presumably it is supplied by the test loader or by
    subclasses elsewhere in the file; confirm.
    """

    def check_newline_decoding_utf8(self, decoder):
        """Exercise *decoder*, a newline decoder wrapping a UTF-8 decoder."""
        # UTF-8 specific tests for a newline decoder
        def _check_decode(b, s, **kwargs):
            # We exercise getstate() / setstate() as well as decode()
            state = decoder.getstate()
            self.assertEqual(decoder.decode(b, **kwargs), s)
            decoder.setstate(state)
            self.assertEqual(decoder.decode(b, **kwargs), s)

        _check_decode(b'\xe8\xa2\x88', "\u8888")

        _check_decode(b'\xe8', "")
        _check_decode(b'\xa2', "")
        _check_decode(b'\x88', "\u8888")

        _check_decode(b'\xe8', "")
        _check_decode(b'\xa2', "")
        _check_decode(b'\x88', "\u8888")

        _check_decode(b'\xe8', "")
        self.assertRaises(UnicodeDecodeError, decoder.decode, b'', final=True)

        decoder.reset()
        _check_decode(b'\n', "\n")
        _check_decode(b'\r', "")
        _check_decode(b'', "\n", final=True)
        _check_decode(b'\r', "\n", final=True)

        _check_decode(b'\r', "")
        _check_decode(b'a', "\na")

        _check_decode(b'\r\r\n', "\n\n")
        _check_decode(b'\r', "")
        _check_decode(b'\r', "\n")
        _check_decode(b'\na', "\na")

        _check_decode(b'\xe8\xa2\x88\r\n', "\u8888\n")
        _check_decode(b'\xe8\xa2\x88', "\u8888")
        _check_decode(b'\n', "\n")
        _check_decode(b'\xe8\xa2\x88\r', "\u8888")
        _check_decode(b'\n', "\n")

    def check_newline_decoding(self, decoder, encoding):
        """Feed *decoder* one unit at a time and check ``newlines`` tracking.

        When *encoding* is None the decoder is fed str input character by
        character; otherwise the text is encoded with *encoding* and fed
        byte by byte.
        """
        result = []
        if encoding is not None:
            encoder = codecs.getincrementalencoder(encoding)()

            def _decode_bytewise(s):
                # Decode one byte at a time
                for b in encoder.encode(s):
                    result.append(decoder.decode(bytes([b])))
        else:
            encoder = None

            def _decode_bytewise(s):
                # Decode one char at a time
                for c in s:
                    result.append(decoder.decode(c))
        self.assertEqual(decoder.newlines, None)
        _decode_bytewise("abc\n\r")
        self.assertEqual(decoder.newlines, '\n')
        _decode_bytewise("\nabc")
        self.assertEqual(decoder.newlines, ('\n', '\r\n'))
        _decode_bytewise("abc\r")
        self.assertEqual(decoder.newlines, ('\n', '\r\n'))
        _decode_bytewise("abc")
        self.assertEqual(decoder.newlines, ('\r', '\n', '\r\n'))
        _decode_bytewise("abc\r")
        self.assertEqual("".join(result), "abc\n\nabcabc\nabcabc")
        decoder.reset()
        # Named "data" rather than "input" to avoid shadowing the builtin.
        data = "abc"
        if encoder is not None:
            encoder.reset()
            data = encoder.encode(data)
        self.assertEqual(decoder.decode(data), "abc")
        self.assertEqual(decoder.newlines, None)

    def test_newline_decoder(self):
        """Run the generic newline checks for several encodings."""
        encodings = (
            # None meaning the IncrementalNewlineDecoder takes unicode input
            # rather than bytes input
            None, 'utf-8', 'latin-1',
            'utf-16', 'utf-16-le', 'utf-16-be',
            'utf-32', 'utf-32-le', 'utf-32-be',
        )
        for enc in encodings:
            # subTest() names the failing encoding and lets the remaining
            # encodings still run.
            with self.subTest(encoding=enc):
                if enc is None:
                    decoder = None
                else:
                    decoder = codecs.getincrementaldecoder(enc)()
                decoder = self.IncrementalNewlineDecoder(decoder,
                                                         translate=True)
                self.check_newline_decoding(decoder, enc)
        decoder = codecs.getincrementaldecoder("utf-8")()
        decoder = self.IncrementalNewlineDecoder(decoder, translate=True)
        self.check_newline_decoding_utf8(decoder)
        self.assertRaises(TypeError, decoder.setstate, 42)

    def test_newline_bytes(self):
        """U+0D00 and U+0A00 must not be mistaken for CR or LF."""
        # Issue 5433: Excessive optimization in IncrementalNewlineDecoder
        def _check(dec):
            self.assertEqual(dec.newlines, None)
            self.assertEqual(dec.decode("\u0D00"), "\u0D00")
            self.assertEqual(dec.newlines, None)
            self.assertEqual(dec.decode("\u0A00"), "\u0A00")
            self.assertEqual(dec.newlines, None)
        dec = self.IncrementalNewlineDecoder(None, translate=False)
        _check(dec)
        dec = self.IncrementalNewlineDecoder(None, translate=True)
        _check(dec)

    def test_translate(self):
        """Any non-zero *translate* enables translation; zero disables it."""
        # issue 35062
        for translate in (-2, -1, 1, 2):
            with self.subTest(translate=translate):
                decoder = codecs.getincrementaldecoder("utf-8")()
                decoder = self.IncrementalNewlineDecoder(decoder, translate)
                self.check_newline_decoding_utf8(decoder)
        decoder = codecs.getincrementaldecoder("utf-8")()
        decoder = self.IncrementalNewlineDecoder(decoder, translate=0)
        self.assertEqual(decoder.decode(b"\r\r\n"), "\r\r\n")
3861
class CIncrementalNewlineDecoderTest(IncrementalNewlineDecoderTest):
    """Subclass that adds nothing to IncrementalNewlineDecoderTest.

    NOTE(review): presumably the C implementation's attributes are
    attached to this class elsewhere in the file; confirm.
    """
3864
class PyIncrementalNewlineDecoderTest(IncrementalNewlineDecoderTest):
    """Subclass that adds nothing to IncrementalNewlineDecoderTest.

    NOTE(review): presumably the Python implementation's attributes are
    attached to this class elsewhere in the file; confirm.
    """
Antoine Pitrou180a3362008-12-14 16:36:46 +00003867
Alexandre Vassalotti472f07d2008-01-06 00:34:32 +00003868
Guido van Rossum01a27522007-03-07 01:00:12 +00003869# XXX Tests for open()
Guido van Rossum68bbcd22007-02-27 17:19:33 +00003870
Guido van Rossum5abbf752007-08-27 17:39:33 +00003871class MiscIOTest(unittest.TestCase):
3872
Barry Warsaw40e82462008-11-20 20:14:50 +00003873 def tearDown(self):
3874 support.unlink(support.TESTFN)
3875
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003876 def test___all__(self):
3877 for name in self.io.__all__:
3878 obj = getattr(self.io, name, None)
Serhiy Storchaka0dcd80a2015-08-02 15:17:49 +03003879 self.assertIsNotNone(obj, name)
Steve Dowerb82e17e2019-05-23 08:45:22 -07003880 if name in ("open", "open_code"):
Guido van Rossum5abbf752007-08-27 17:39:33 +00003881 continue
Benjamin Peterson6a52a9c2009-04-29 22:00:44 +00003882 elif "error" in name.lower() or name == "UnsupportedOperation":
Benjamin Petersonbfb95942009-04-02 01:13:40 +00003883 self.assertTrue(issubclass(obj, Exception), name)
3884 elif not name.startswith("SEEK_"):
3885 self.assertTrue(issubclass(obj, self.IOBase))
Benjamin Peterson65676e42008-11-05 21:42:45 +00003886
Barry Warsaw40e82462008-11-20 20:14:50 +00003887 def test_attributes(self):
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003888 f = self.open(support.TESTFN, "wb", buffering=0)
Ezio Melottib3aedd42010-11-20 19:04:17 +00003889 self.assertEqual(f.mode, "wb")
Barry Warsaw40e82462008-11-20 20:14:50 +00003890 f.close()
3891
Serhiy Storchaka2480c2e2013-11-24 23:13:26 +02003892 with support.check_warnings(('', DeprecationWarning)):
3893 f = self.open(support.TESTFN, "U")
Ezio Melottib3aedd42010-11-20 19:04:17 +00003894 self.assertEqual(f.name, support.TESTFN)
3895 self.assertEqual(f.buffer.name, support.TESTFN)
3896 self.assertEqual(f.buffer.raw.name, support.TESTFN)
3897 self.assertEqual(f.mode, "U")
3898 self.assertEqual(f.buffer.mode, "rb")
3899 self.assertEqual(f.buffer.raw.mode, "rb")
Barry Warsaw40e82462008-11-20 20:14:50 +00003900 f.close()
3901
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003902 f = self.open(support.TESTFN, "w+")
Ezio Melottib3aedd42010-11-20 19:04:17 +00003903 self.assertEqual(f.mode, "w+")
3904 self.assertEqual(f.buffer.mode, "rb+") # Does it really matter?
3905 self.assertEqual(f.buffer.raw.mode, "rb+")
Barry Warsaw40e82462008-11-20 20:14:50 +00003906
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003907 g = self.open(f.fileno(), "wb", closefd=False)
Ezio Melottib3aedd42010-11-20 19:04:17 +00003908 self.assertEqual(g.mode, "wb")
3909 self.assertEqual(g.raw.mode, "wb")
3910 self.assertEqual(g.name, f.fileno())
3911 self.assertEqual(g.raw.name, f.fileno())
Barry Warsaw40e82462008-11-20 20:14:50 +00003912 f.close()
3913 g.close()
3914
Benjamin Peterson9788f972019-11-12 15:54:19 -08003915 def test_open_pipe_with_append(self):
3916 # bpo-27805: Ignore ESPIPE from lseek() in open().
3917 r, w = os.pipe()
3918 self.addCleanup(os.close, r)
3919 f = self.open(w, 'a')
3920 self.addCleanup(f.close)
3921 # Check that the file is marked non-seekable. On Windows, however, lseek
3922 # somehow succeeds on pipes.
3923 if sys.platform != 'win32':
3924 self.assertFalse(f.seekable())
3925
Antoine Pitrou8043cf82009-01-09 19:54:29 +00003926 def test_io_after_close(self):
3927 for kwargs in [
3928 {"mode": "w"},
3929 {"mode": "wb"},
3930 {"mode": "w", "buffering": 1},
3931 {"mode": "w", "buffering": 2},
3932 {"mode": "wb", "buffering": 0},
3933 {"mode": "r"},
3934 {"mode": "rb"},
3935 {"mode": "r", "buffering": 1},
3936 {"mode": "r", "buffering": 2},
3937 {"mode": "rb", "buffering": 0},
3938 {"mode": "w+"},
3939 {"mode": "w+b"},
3940 {"mode": "w+", "buffering": 1},
3941 {"mode": "w+", "buffering": 2},
3942 {"mode": "w+b", "buffering": 0},
3943 ]:
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003944 f = self.open(support.TESTFN, **kwargs)
Antoine Pitrou8043cf82009-01-09 19:54:29 +00003945 f.close()
3946 self.assertRaises(ValueError, f.flush)
3947 self.assertRaises(ValueError, f.fileno)
3948 self.assertRaises(ValueError, f.isatty)
3949 self.assertRaises(ValueError, f.__iter__)
3950 if hasattr(f, "peek"):
3951 self.assertRaises(ValueError, f.peek, 1)
3952 self.assertRaises(ValueError, f.read)
3953 if hasattr(f, "read1"):
3954 self.assertRaises(ValueError, f.read1, 1024)
Martin Panterccb2c0e2016-10-20 23:48:14 +00003955 self.assertRaises(ValueError, f.read1)
Victor Stinnerb79f28c2011-05-25 22:09:03 +02003956 if hasattr(f, "readall"):
3957 self.assertRaises(ValueError, f.readall)
Antoine Pitrou8043cf82009-01-09 19:54:29 +00003958 if hasattr(f, "readinto"):
3959 self.assertRaises(ValueError, f.readinto, bytearray(1024))
Benjamin Petersona96fea02014-06-22 14:17:44 -07003960 if hasattr(f, "readinto1"):
3961 self.assertRaises(ValueError, f.readinto1, bytearray(1024))
Antoine Pitrou8043cf82009-01-09 19:54:29 +00003962 self.assertRaises(ValueError, f.readline)
3963 self.assertRaises(ValueError, f.readlines)
Xiang Zhang026435c2017-04-15 12:47:28 +08003964 self.assertRaises(ValueError, f.readlines, 1)
Antoine Pitrou8043cf82009-01-09 19:54:29 +00003965 self.assertRaises(ValueError, f.seek, 0)
3966 self.assertRaises(ValueError, f.tell)
3967 self.assertRaises(ValueError, f.truncate)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003968 self.assertRaises(ValueError, f.write,
3969 b"" if "b" in kwargs['mode'] else "")
Antoine Pitrou8043cf82009-01-09 19:54:29 +00003970 self.assertRaises(ValueError, f.writelines, [])
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003971 self.assertRaises(ValueError, next, f)
Antoine Pitrou8043cf82009-01-09 19:54:29 +00003972
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003973 def test_blockingioerror(self):
3974 # Various BlockingIOError issues
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003975 class C(str):
3976 pass
3977 c = C("")
3978 b = self.BlockingIOError(1, c)
3979 c.b = b
3980 b.c = c
3981 wr = weakref.ref(c)
3982 del c, b
Benjamin Peterson24fb1d02009-04-24 23:26:21 +00003983 support.gc_collect()
Serhiy Storchaka0dcd80a2015-08-02 15:17:49 +03003984 self.assertIsNone(wr(), wr)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003985
3986 def test_abcs(self):
3987 # Test the visible base classes are ABCs.
Ezio Melottie9615932010-01-24 19:26:24 +00003988 self.assertIsInstance(self.IOBase, abc.ABCMeta)
3989 self.assertIsInstance(self.RawIOBase, abc.ABCMeta)
3990 self.assertIsInstance(self.BufferedIOBase, abc.ABCMeta)
3991 self.assertIsInstance(self.TextIOBase, abc.ABCMeta)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003992
3993 def _check_abc_inheritance(self, abcmodule):
3994 with self.open(support.TESTFN, "wb", buffering=0) as f:
Ezio Melottie9615932010-01-24 19:26:24 +00003995 self.assertIsInstance(f, abcmodule.IOBase)
3996 self.assertIsInstance(f, abcmodule.RawIOBase)
3997 self.assertNotIsInstance(f, abcmodule.BufferedIOBase)
3998 self.assertNotIsInstance(f, abcmodule.TextIOBase)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00003999 with self.open(support.TESTFN, "wb") as f:
Ezio Melottie9615932010-01-24 19:26:24 +00004000 self.assertIsInstance(f, abcmodule.IOBase)
4001 self.assertNotIsInstance(f, abcmodule.RawIOBase)
4002 self.assertIsInstance(f, abcmodule.BufferedIOBase)
4003 self.assertNotIsInstance(f, abcmodule.TextIOBase)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00004004 with self.open(support.TESTFN, "w") as f:
Ezio Melottie9615932010-01-24 19:26:24 +00004005 self.assertIsInstance(f, abcmodule.IOBase)
4006 self.assertNotIsInstance(f, abcmodule.RawIOBase)
4007 self.assertNotIsInstance(f, abcmodule.BufferedIOBase)
4008 self.assertIsInstance(f, abcmodule.TextIOBase)
Benjamin Peterson4fa88fa2009-03-04 00:14:51 +00004009
4010 def test_abc_inheritance(self):
4011 # Test implementations inherit from their respective ABCs
4012 self._check_abc_inheritance(self)
4013
4014 def test_abc_inheritance_official(self):
4015 # Test implementations inherit from the official ABCs of the
4016 # baseline "io" module.
4017 self._check_abc_inheritance(io)
4018
Antoine Pitroue033e062010-10-29 10:38:18 +00004019 def _check_warn_on_dealloc(self, *args, **kwargs):
4020 f = open(*args, **kwargs)
4021 r = repr(f)
4022 with self.assertWarns(ResourceWarning) as cm:
4023 f = None
4024 support.gc_collect()
4025 self.assertIn(r, str(cm.warning.args[0]))
4026
4027 def test_warn_on_dealloc(self):
4028 self._check_warn_on_dealloc(support.TESTFN, "wb", buffering=0)
4029 self._check_warn_on_dealloc(support.TESTFN, "wb")
4030 self._check_warn_on_dealloc(support.TESTFN, "w")
4031
4032 def _check_warn_on_dealloc_fd(self, *args, **kwargs):
4033 fds = []
Benjamin Peterson556c7352010-10-31 01:35:43 +00004034 def cleanup_fds():
Antoine Pitroue033e062010-10-29 10:38:18 +00004035 for fd in fds:
4036 try:
4037 os.close(fd)
Andrew Svetlov3438fa42012-12-17 23:35:18 +02004038 except OSError as e:
Antoine Pitroue033e062010-10-29 10:38:18 +00004039 if e.errno != errno.EBADF:
4040 raise
Benjamin Peterson556c7352010-10-31 01:35:43 +00004041 self.addCleanup(cleanup_fds)
4042 r, w = os.pipe()
4043 fds += r, w
4044 self._check_warn_on_dealloc(r, *args, **kwargs)
4045 # When using closefd=False, there's no warning
4046 r, w = os.pipe()
4047 fds += r, w
Serhiy Storchaka94a619d2016-02-11 13:11:44 +02004048 with support.check_no_resource_warning(self):
Benjamin Peterson556c7352010-10-31 01:35:43 +00004049 open(r, *args, closefd=False, **kwargs)
Antoine Pitroue033e062010-10-29 10:38:18 +00004050
4051 def test_warn_on_dealloc_fd(self):
4052 self._check_warn_on_dealloc_fd("rb", buffering=0)
4053 self._check_warn_on_dealloc_fd("rb")
4054 self._check_warn_on_dealloc_fd("r")
4055
4056
Antoine Pitrou243757e2010-11-05 21:15:39 +00004057 def test_pickling(self):
4058 # Pickling file objects is forbidden
4059 for kwargs in [
4060 {"mode": "w"},
4061 {"mode": "wb"},
4062 {"mode": "wb", "buffering": 0},
4063 {"mode": "r"},
4064 {"mode": "rb"},
4065 {"mode": "rb", "buffering": 0},
4066 {"mode": "w+"},
4067 {"mode": "w+b"},
4068 {"mode": "w+b", "buffering": 0},
4069 ]:
4070 for protocol in range(pickle.HIGHEST_PROTOCOL + 1):
4071 with self.open(support.TESTFN, **kwargs) as f:
4072 self.assertRaises(TypeError, pickle.dumps, f, protocol)
4073
Antoine Pitrou58fcf9f2011-11-21 20:16:44 +01004074 def test_nonblock_pipe_write_bigbuf(self):
4075 self._test_nonblock_pipe_write(16*1024)
4076
Antoine Pitrou58fcf9f2011-11-21 20:16:44 +01004077 def test_nonblock_pipe_write_smallbuf(self):
4078 self._test_nonblock_pipe_write(1024)
4079
@unittest.skipUnless(hasattr(os, 'set_blocking'),
                     'os.set_blocking() required for this test')
def _test_nonblock_pipe_write(self, bufsize):
    """Push data through a non-blocking pipe and check nothing is lost.

    Both pipe ends are wrapped in buffered files of size *bufsize*.  The
    writer is driven until it raises BlockingIOError, the reader drains
    the pipe, and at the end everything the writer accepted must equal
    everything the reader returned.
    """
    r, w = os.pipe()
    for fd in (r, w):
        os.set_blocking(fd, False)

    # To exercise all code paths in the C implementation we need
    # to play with buffer sizes.  For instance, if we choose a
    # buffer size less than or equal to _PIPE_BUF (4096 on Linux)
    # then we will never get a partial write of the buffer.
    rf = self.open(r, mode='rb', closefd=True, buffering=bufsize)
    wf = self.open(w, mode='wb', closefd=True, buffering=bufsize)

    sent = []
    received = []
    with rf, wf:
        for chunk_len in (9999, 73, 7574):
            counter = 0
            try:
                # Keep writing single-letter chunks until the pipe and
                # the buffer are both full.
                while True:
                    payload = bytes([counter % 26 + 97]) * chunk_len
                    sent.append(payload)
                    wf.write(payload)
                    counter += 1

            except self.BlockingIOError as exc:
                self.assertEqual(exc.args[0], errno.EAGAIN)
                self.assertEqual(exc.args[2], exc.characters_written)
                # Only the accepted prefix of the last chunk counts as sent.
                sent[-1] = sent[-1][:exc.characters_written]
                received.append(rf.read())
                marker = b'BLOCKED'
                wf.write(marker)
                sent.append(marker)

        flushed = False
        while not flushed:
            try:
                wf.flush()
            except self.BlockingIOError as exc:
                self.assertEqual(exc.args[0], errno.EAGAIN)
                self.assertEqual(exc.args[2], exc.characters_written)
                self.assertEqual(exc.characters_written, 0)
                # Make room in the pipe so the next flush can progress.
                received.append(rf.read())
            else:
                flushed = True

        # Drain whatever is left; read() yields None once nothing more
        # is available.
        received.extend(iter(rf.read, None))

    sent, received = b''.join(sent), b''.join(received)
    self.assertEqual(sent, received)
    self.assertTrue(wf.closed)
    self.assertTrue(rf.closed)
4131
def test_create_fail(self):
    # 'x' (exclusive creation) mode must fail when the file already exists.
    self.open(support.TESTFN, 'w').close()
    with self.assertRaises(FileExistsError):
        self.open(support.TESTFN, 'x')
4137
def test_create_writes(self):
    # A file opened in 'x' mode is writable: what is written through it
    # must be readable back afterwards.
    payload = b"spam"
    with self.open(support.TESTFN, 'xb') as created:
        created.write(payload)
    with self.open(support.TESTFN, 'rb') as reopened:
        contents = reopened.read()
    self.assertEqual(payload, contents)
4144
def test_open_allargs(self):
    # Regression test: there used to be a buffer overflow in the parser
    # for rawmode, so a mode string combining every flag must be rejected
    # cleanly with ValueError.
    with self.assertRaises(ValueError):
        self.open(support.TESTFN, 'rwax+')
4148
4149
class CMiscIOTest(MiscIOTest):
    """MiscIOTest bound to the ``io`` module, plus tests specific to it."""

    io = io

    def test_readinto_buffer_overflow(self):
        # Issue #18025: readinto() must reject a read() result that is
        # larger than the destination buffer instead of overflowing it.
        class BadReader(self.io.BufferedIOBase):
            def read(self, n=-1):
                return b'x' * 10**6

        reader = BadReader()
        target = bytearray(2)
        with self.assertRaises(ValueError):
            reader.readinto(target)

    def check_daemon_threads_shutdown_deadlock(self, stream_name):
        """Run a child interpreter that writes to ``sys.<stream_name>``
        from both a daemon thread and the main thread, then exits.

        Issue #23309: deadlocks at shutdown should be avoided.  The child
        must either exit cleanly with nothing but '.' and '!' characters
        on stderr, or die with the documented fatal error.
        """
        script = """if 1:
            import sys
            import time
            import threading
            from test.support import SuppressCrashReport

            file = sys.{stream_name}

            def run():
                while True:
                    file.write('.')
                    file.flush()

            crash = SuppressCrashReport()
            crash.__enter__()
            # don't call __exit__(): the crash occurs at Python shutdown

            thread = threading.Thread(target=run)
            thread.daemon = True
            thread.start()

            time.sleep(0.5)
            file.write('!')
            file.flush()
            """.format(stream_name=stream_name)
        result, _ = run_python_until_end("-c", script)
        stderr_text = result.err.decode()

        if result.rc == 0:
            # Clean exit: stderr may only hold the characters written by
            # the child itself.
            self.assertFalse(stderr_text.strip('.!'))
            return

        # Failure: should be a fatal error
        expected = (r"Fatal Python error: could not acquire lock "
                    r"for <(_io\.)?BufferedWriter name='<{stream_name}>'> "
                    r"at interpreter shutdown, possibly due to "
                    r"daemon threads").format(stream_name=stream_name)
        self.assertRegex(stderr_text, expected)

    def test_daemon_threads_shutdown_stdout_deadlock(self):
        self.check_daemon_threads_shutdown_deadlock('stdout')

    def test_daemon_threads_shutdown_stderr_deadlock(self):
        self.check_daemon_threads_shutdown_deadlock('stderr')
4207
4208
class PyMiscIOTest(MiscIOTest):
    # Run the inherited MiscIOTest cases against the module bound to `pyio`
    # (the counterpart of CMiscIOTest, which uses `io`).
    io = pyio
Barry Warsaw40e82462008-11-20 20:14:50 +00004211
Antoine Pitroub46b9d52010-08-21 19:09:32 +00004212
@unittest.skipIf(os.name == 'nt', 'POSIX signals required for this test.')
class SignalsTest(unittest.TestCase):
    """Check how file objects behave when their I/O is interrupted by SIGALRM.

    Every helper opens its file through ``self.io.open``, so concrete
    subclasses must provide an ``io`` attribute naming the implementation
    under test.
    """

    def setUp(self):
        # Install the raising handler and remember the previous one so that
        # tearDown() can restore it.
        self.oldalrm = signal.signal(signal.SIGALRM, self.alarm_interrupt)

    def tearDown(self):
        signal.signal(signal.SIGALRM, self.oldalrm)

    def alarm_interrupt(self, sig, frame):
        # Default SIGALRM handler for these tests: raise ZeroDivisionError so
        # the test can observe that the handler actually ran.
        1/0

    def check_interrupted_write(self, item, bytes, **fdopen_kwargs):
        """Check that a partial write, when it gets interrupted, properly
        invokes the signal handler, and bubbles up the exception raised
        in the latter.

        *item* is the unit repeated to build the written data, *bytes* is
        its encoded form as expected on the read end of the pipe (the name
        shadows the builtin but is kept for keyword compatibility), and
        *fdopen_kwargs* are forwarded to ``self.io.open``.
        """
        read_results = []
        def _read():
            s = os.read(r, 1)
            read_results.append(s)

        t = threading.Thread(target=_read)
        t.daemon = True
        r, w = os.pipe()
        fdopen_kwargs["closefd"] = False
        large_data = item * (support.PIPE_MAX_SIZE // len(item) + 1)
        # Bound before the try block so the cleanup below never trips over
        # an unbound name if open() itself fails.
        wio = None
        try:
            wio = self.io.open(w, **fdopen_kwargs)
            if hasattr(signal, 'pthread_sigmask'):
                # create the thread with SIGALRM signal blocked
                signal.pthread_sigmask(signal.SIG_BLOCK, [signal.SIGALRM])
                t.start()
                signal.pthread_sigmask(signal.SIG_UNBLOCK, [signal.SIGALRM])
            else:
                t.start()

            # Fill the pipe enough that the write will be blocking.
            # It will be interrupted by the timer armed below. Since the
            # other thread has read one byte, the low-level write will
            # return with a successful (partial) result rather than an EINTR.
            # The buffered IO layer must check for pending signal
            # handlers, which in this case will invoke alarm_interrupt().
            signal.alarm(1)
            try:
                self.assertRaises(ZeroDivisionError, wio.write, large_data)
            finally:
                signal.alarm(0)
                t.join()
            # We got one byte, get another one and check that it isn't a
            # repeat of the first one.
            read_results.append(os.read(r, 1))
            self.assertEqual(read_results, [bytes[0:1], bytes[1:2]])
        finally:
            os.close(w)
            os.close(r)
            # This is deliberate. If we didn't close the file descriptor
            # before closing wio, wio would try to flush its internal
            # buffer, and block again.
            if wio is not None:
                try:
                    wio.close()
                except OSError as e:
                    if e.errno != errno.EBADF:
                        raise

    def test_interrupted_write_unbuffered(self):
        self.check_interrupted_write(b"xy", b"xy", mode="wb", buffering=0)

    def test_interrupted_write_buffered(self):
        self.check_interrupted_write(b"xy", b"xy", mode="wb")

    def test_interrupted_write_text(self):
        self.check_interrupted_write("xy", b"xy", mode="w", encoding="ascii")

    @support.no_tracing
    def check_reentrant_write(self, data, **fdopen_kwargs):
        """Check that a write() issued from a signal handler while the same
        object is already inside write() either fails with a "reentrant
        call" RuntimeError or lets the handler's ZeroDivisionError through.
        """
        def on_alarm(*args):
            # Will be called reentrantly from the same thread
            wio.write(data)
            1/0
        signal.signal(signal.SIGALRM, on_alarm)
        r, w = os.pipe()
        wio = self.io.open(w, **fdopen_kwargs)
        try:
            signal.alarm(1)
            # Either the reentrant call to wio.write() fails with RuntimeError,
            # or the signal handler raises ZeroDivisionError.
            with self.assertRaises((ZeroDivisionError, RuntimeError)) as cm:
                while 1:
                    for i in range(100):
                        wio.write(data)
                        wio.flush()
                    # Make sure the buffer doesn't fill up and block further writes
                    os.read(r, len(data) * 100)
            exc = cm.exception
            if isinstance(exc, RuntimeError):
                self.assertTrue(str(exc).startswith("reentrant call"), str(exc))
        finally:
            signal.alarm(0)
            # Close the read end even if closing the writer raises, so the
            # descriptor is never leaked.
            try:
                wio.close()
            finally:
                os.close(r)

    def test_reentrant_write_buffered(self):
        self.check_reentrant_write(b"xy", mode="wb")

    def test_reentrant_write_text(self):
        self.check_reentrant_write("xy", mode="w", encoding="ascii")

    def check_interrupted_read_retry(self, decode, **fdopen_kwargs):
        """Check that a buffered read, when it gets interrupted (either
        returning a partial result or EINTR), properly invokes the signal
        handler and retries if the latter returned successfully.

        *decode* converts the value returned by read() to ``str`` so that
        binary and text modes can share the same assertion.
        """
        r, w = os.pipe()
        fdopen_kwargs["closefd"] = False
        def alarm_handler(sig, frame):
            os.write(w, b"bar")
        signal.signal(signal.SIGALRM, alarm_handler)
        # Bound before the try block so the cleanup below never trips over
        # an unbound name if open() itself fails.
        rio = None
        try:
            rio = self.io.open(r, **fdopen_kwargs)
            os.write(w, b"foo")
            signal.alarm(1)
            # Expected behaviour:
            # - first raw read() returns partial b"foo"
            # - second raw read() returns EINTR
            # - third raw read() returns b"bar"
            self.assertEqual(decode(rio.read(6)), "foobar")
        finally:
            signal.alarm(0)
            try:
                if rio is not None:
                    rio.close()
            finally:
                os.close(w)
                os.close(r)

    def test_interrupted_read_retry_buffered(self):
        self.check_interrupted_read_retry(lambda x: x.decode('latin1'),
                                          mode="rb")

    def test_interrupted_read_retry_text(self):
        self.check_interrupted_read_retry(lambda x: x,
                                          mode="r")

    def check_interrupted_write_retry(self, item, **fdopen_kwargs):
        """Check that a buffered write, when it gets interrupted (either
        returning a partial result or EINTR), properly invokes the signal
        handler and retries if the latter returned successfully.

        *item* is a one-element ``bytes`` or ``str`` repeated to build the
        payload; *fdopen_kwargs* are forwarded to ``self.io.open``.
        """
        select = support.import_module("select")

        # A quantity that exceeds the buffer size of an anonymous pipe's
        # write end.
        N = support.PIPE_MAX_SIZE
        r, w = os.pipe()
        fdopen_kwargs["closefd"] = False

        # We need a separate thread to read from the pipe and allow the
        # write() to finish.  This thread is started after the SIGALRM is
        # received (forcing a first EINTR in write()).
        read_results = []
        write_finished = False
        error = None
        def _read():
            try:
                while not write_finished:
                    while r in select.select([r], [], [], 1.0)[0]:
                        s = os.read(r, 1024)
                        read_results.append(s)
            except BaseException as exc:
                # Hand the failure over to the main thread, which asserts
                # that no error occurred.
                nonlocal error
                error = exc
        t = threading.Thread(target=_read)
        t.daemon = True
        def alarm1(sig, frame):
            # First alarm: only re-arm, so the second one hits the retry.
            signal.signal(signal.SIGALRM, alarm2)
            signal.alarm(1)
        def alarm2(sig, frame):
            # Second alarm: start draining the pipe so write() can finish.
            t.start()

        large_data = item * N
        signal.signal(signal.SIGALRM, alarm1)
        # Bound before the try block so the cleanup below never trips over
        # an unbound name if open() itself fails.
        wio = None
        try:
            wio = self.io.open(w, **fdopen_kwargs)
            signal.alarm(1)
            # Expected behaviour:
            # - first raw write() is partial (because of the limited pipe buffer
            #   and the first alarm)
            # - second raw write() returns EINTR (because of the second alarm)
            # - subsequent write()s are successful (either partial or complete)
            written = wio.write(large_data)
            self.assertEqual(N, written)

            wio.flush()
            write_finished = True
            t.join()

            self.assertIsNone(error)
            self.assertEqual(N, sum(len(x) for x in read_results))
        finally:
            signal.alarm(0)
            write_finished = True
            os.close(w)
            os.close(r)
            # This is deliberate. If we didn't close the file descriptor
            # before closing wio, wio would try to flush its internal
            # buffer, and could block (in case of failure).
            if wio is not None:
                try:
                    wio.close()
                except OSError as e:
                    if e.errno != errno.EBADF:
                        raise

    def test_interrupted_write_retry_buffered(self):
        self.check_interrupted_write_retry(b"x", mode="wb")

    def test_interrupted_write_retry_text(self):
        self.check_interrupted_write_retry("x", mode="w", encoding="latin1")
4425
Antoine Pitrouf3b68b32010-12-03 18:41:39 +00004426
class CSignalsTest(SignalsTest):
    # Run the inherited SignalsTest cases against the `io` module.
    io = io
4429
class PySignalsTest(SignalsTest):
    """SignalsTest bound to the module imported as ``pyio``."""

    io = pyio

    # Handling reentrancy issues would slow down _pyio even more, so the
    # tests are disabled.
    test_reentrant_write_buffered = test_reentrant_write_text = None
4437
Antoine Pitroub46b9d52010-08-21 19:09:32 +00004438
def load_tests(*args):
    """Build the test suite for this module.

    Before the suite is assembled, every ``C*`` test class receives the
    public names of ``io`` (plus the C variants of the mock classes) as
    class attributes, and every ``Py*`` test class receives the matching
    names from ``pyio``.  Classes whose name has neither prefix are left
    untouched.

    The positional arguments are accepted and ignored, so the function can
    be called with or without the arguments of unittest's ``load_tests``
    hook.  Returns a ``unittest.TestSuite``.
    """
    tests = (CIOTest, PyIOTest, APIMismatchTest,
             CBufferedReaderTest, PyBufferedReaderTest,
             CBufferedWriterTest, PyBufferedWriterTest,
             CBufferedRWPairTest, PyBufferedRWPairTest,
             CBufferedRandomTest, PyBufferedRandomTest,
             StatefulIncrementalDecoderTest,
             CIncrementalNewlineDecoderTest, PyIncrementalNewlineDecoderTest,
             CTextIOWrapperTest, PyTextIOWrapperTest,
             CMiscIOTest, PyMiscIOTest,
             CSignalsTest, PySignalsTest,
             )

    # Put the namespaces of the IO module we are testing and some useful mock
    # classes in the __dict__ of each test.
    mocks = (MockRawIO, MisbehavedRawIO, MockFileIO, CloseFailureIO,
             MockNonBlockWriterIO, MockUnseekableIO, MockRawIOWithoutRead,
             SlowFlushRawIO)
    all_members = io.__all__ + ["IncrementalNewlineDecoder"]
    c_io_ns = {name: getattr(io, name) for name in all_members}
    py_io_ns = {name: getattr(pyio, name) for name in all_members}
    globs = globals()
    # Each mock has "C"- and "Py"-prefixed variants at module level.
    c_io_ns.update((x.__name__, globs["C" + x.__name__]) for x in mocks)
    py_io_ns.update((x.__name__, globs["Py" + x.__name__]) for x in mocks)
    # Avoid turning open into a bound method.
    py_io_ns["open"] = pyio.OpenWrapper
    for test in tests:
        if test.__name__.startswith("C"):
            for name, obj in c_io_ns.items():
                setattr(test, name, obj)
        elif test.__name__.startswith("Py"):
            for name, obj in py_io_ns.items():
                setattr(test, name, obj)

    # unittest.makeSuite() is deprecated (and removed in Python 3.13); a
    # fresh TestLoader performs the same collection.
    loader = unittest.TestLoader()
    suite = unittest.TestSuite([loader.loadTestsFromTestCase(test)
                                for test in tests])
    return suite
Guido van Rossum28524c72007-02-27 05:47:44 +00004475
# Run the tests through unittest's command-line entry point when this file
# is executed as a script.
if __name__ == "__main__":
    unittest.main()